In [1]:
import sys
import os

# Resolve the directory one level above the current working directory so that
# project modules living there (e.g. `utilities`) can be imported below.
parent_dir = os.path.abspath(os.path.join(os.getcwd(), '..'))

# Only append when missing: re-running this cell would otherwise keep adding
# duplicate entries to sys.path.
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

In [2]:
from utilities import read_files
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from openai import OpenAI
from umap import UMAP
import csv
from sklearn.manifold import TSNE
from sklearn.cluster import KMeans
from sklearn.cluster import AgglomerativeClustering
from sklearn.cluster import HDBSCAN
from sklearn.cluster import ward_tree
from sklearn.metrics import silhouette_score
from sklearn.metrics import davies_bouldin_score
from sklearn.metrics import calinski_harabasz_score
from sklearn.decomposition import PCA
import torch
from transformers import AutoTokenizer, AutoModel    

In [3]:
def generate_embeddings(tokenizer, model, code_snippet):
    """Embed code by mean-pooling the model's last hidden state.

    Args:
        tokenizer: Callable that turns ``code_snippet`` into model inputs; it is
            called with ``return_tensors="pt"``, padding, and truncation to 512
            tokens.
        model: Callable accepting the tokenizer output as keyword arguments and
            returning an object with a ``last_hidden_state`` tensor.
        code_snippet: The text (or batch of texts) to embed.

    Returns:
        A tensor with one pooled embedding row per input sequence.
    """
    inputs = tokenizer(code_snippet, return_tensors="pt", padding=True, truncation=True, max_length=512)

    # Inference only: no gradients are needed.
    with torch.no_grad():
        outputs = model(**inputs)

    token_embeddings = outputs.last_hidden_state

    attention_mask = inputs.get("attention_mask")
    if attention_mask is None:
        # No mask available: fall back to the plain mean over the token axis.
        return torch.mean(token_embeddings, dim=1)

    # Weight every token by its attention mask so that padding positions (mask
    # 0) do not dilute the average when several snippets are embedded together.
    # For a single unpadded snippet this equals the plain mean.
    mask = attention_mask.unsqueeze(-1).to(token_embeddings.dtype)
    summed = (token_embeddings * mask).sum(dim=1)
    token_counts = mask.sum(dim=1).clamp(min=1.0)  # avoid division by zero

    code_embedding = summed / token_counts

    return code_embedding

In [4]:
def embed_files_openai(df, embedding_model = "text-embedding-3-large"):
    """Embed every entry of ``df['text']`` through the OpenAI embeddings API.

    Args:
        df: DataFrame with a ``text`` column holding the strings to embed.
        embedding_model: Name of the embedding model passed to the API.

    Returns:
        A Series aligned with ``df`` where each value is the ``embedding`` of
        the first item in the API response for that row's text.
    """
    # Build the client once instead of once per row; one request is still
    # issued per row.
    client = OpenAI()

    def _embed(text):
        response = client.embeddings.create(input=text, model=embedding_model)
        return response.data[0].embedding

    return df['text'].apply(_embed)

def embed_files_graphcodebert(df):
    """Embed every entry of ``df['text']`` with the GraphCodeBERT base checkpoint.

    The tokenizer and model are loaded once and then applied to each row via
    ``generate_embeddings``.

    Args:
        df: DataFrame with a ``text`` column holding the strings to embed.

    Returns:
        A Series aligned with ``df``; each value is the first row of the tensor
        returned by ``generate_embeddings`` for that text.
    """
    checkpoint = "microsoft/graphcodebert-base"
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    encoder = AutoModel.from_pretrained(checkpoint)

    def _embed_one(snippet):
        # Index 0 selects the first row of the returned embedding tensor.
        return generate_embeddings(tokenizer, encoder, snippet)[0]

    return df['text'].apply(_embed_one)


def prepare_and_process(in_path, out_path, embdding_function = embed_files_openai):
    """Read files, embed their contents and pickle the resulting DataFrame.

    Args:
        in_path: Location handed to ``read_files`` to obtain the file paths.
        out_path: Destination of the pickled DataFrame.
        embdding_function: Callable taking the DataFrame and returning the
            values for its ``embedding`` column.

    Returns:
        The DataFrame with columns ``file``, ``text``, ``token_count`` and
        ``embedding``, restricted to rows whose ``token_count`` is below 8000.
    """
    files = read_files(in_path)

    rows = []
    for file in files:
        # Read each file exactly once and close the handle deterministically.
        with open(file, "r") as handle:
            text = handle.read()
        # "token_count" is the number of space-separated pieces, not a
        # model-tokenizer count.
        rows.append([file, text, len(text.split(" "))])

    df = pd.DataFrame(rows, columns=["file", "text", "token_count"])

    # NOTE(review): the 8000 cut-off presumably guards the embedding model's
    # input limit; confirm. `.copy()` makes the filtered frame independent so
    # the column assignment below does not write into a slice of the original.
    df = df[df["token_count"] < 8000].copy()
    df["embedding"] = embdding_function(df)
    df.to_pickle(out_path)
    return df

def load_df(path):
    """Load a pickled DataFrame from ``path`` and return it.

    Unpickling can execute arbitrary code, so only use this on pickle files
    from a trusted source.
    """
    frame = pd.read_pickle(path)
    return frame

In [5]:
# Embed the example files with each backend; every call also pickles its
# resulting frame to the given output path.
df_oa = prepare_and_process(
    in_path="../data/examples",
    out_path="./pickles/df_text_embedding_large.pkl",
    embdding_function=embed_files_openai,
)
df_gb = prepare_and_process(
    in_path="../data/examples",
    out_path="./pickles/df_graphcodebert.pkl",
    embdding_function=embed_files_graphcodebert,
)

Some weights of RobertaModel were not initialized from the model checkpoint at microsoft/graphcodebert-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [6]:
def run_clusterings(df, n_clusters=10, min_cluster_size=5, random_state=0):
    """Cluster the embeddings with HDBSCAN, KMeans and agglomerative clustering.

    The label columns ``hdbscan_labels``, ``kmeans_labels`` and ``agg_labels``
    are written onto ``df`` itself (the frame is modified in place).

    Args:
        df: DataFrame with an ``embedding`` column.
        n_clusters: Number of clusters for KMeans and agglomerative clustering.
        min_cluster_size: ``min_cluster_size`` passed to HDBSCAN.
        random_state: Seed passed to KMeans.

    Returns:
        The same ``df`` object, with the three label columns added.
    """
    # Materialise the embedding list once; all three estimators share it.
    embeddings = df["embedding"].to_list()

    # HDBSCAN
    clusterer = HDBSCAN(min_cluster_size=min_cluster_size, metric='cosine')
    clusterer.fit(embeddings)
    df["hdbscan_labels"] = clusterer.labels_

    # KMeans
    kmeans = KMeans(n_clusters=n_clusters, random_state=random_state).fit(embeddings)
    df["kmeans_labels"] = kmeans.labels_

    # Agglomerative Clustering
    agg = AgglomerativeClustering(n_clusters=n_clusters).fit(embeddings)
    df["agg_labels"] = agg.labels_

    return df

In [7]:
# Attach the HDBSCAN / KMeans / agglomerative label columns to both frames.
df_oa, df_gb = [run_clusterings(frame) for frame in (df_oa, df_gb)]

In [8]:
def _score_labelling(embeddings, labels):
    """Return [silhouette, Davies-Bouldin, Calinski-Harabasz] for one labelling."""
    n_labels = len(set(labels))
    # The three metrics are only defined for 2 <= n_labels <= n_samples - 1.
    # Report NaN instead of raising so that one degenerate labelling (e.g. a
    # clustering that produced a single label) does not abort the evaluation.
    if not 2 <= n_labels <= len(labels) - 1:
        return [float("nan")] * 3
    return [
        silhouette_score(embeddings, labels),
        davies_bouldin_score(embeddings, labels),
        calinski_harabasz_score(embeddings, labels),
    ]


def evaluate_clusters(df):
    """Score the three clusterings stored on ``df``.

    Args:
        df: DataFrame with an ``embedding`` column and the label columns
            ``hdbscan_labels``, ``kmeans_labels`` and ``agg_labels``.

    Returns:
        A dict with keys ``"hdbscan"``, ``"kmeans"`` and ``"agg"``; each value
        is the list ``[silhouette, davies_bouldin, calinski_harabasz]``. All
        three entries are NaN when a labelling has fewer than 2 clusters or as
        many clusters as samples.
    """
    embeddings = df["embedding"].to_list()

    # NOTE(review): labels are scored exactly as stored. If HDBSCAN marks noise
    # points with -1, they are treated as one more cluster here; confirm that
    # this is intended.
    return {
        "hdbscan": _score_labelling(embeddings, df["hdbscan_labels"]),
        "kmeans": _score_labelling(embeddings, df["kmeans_labels"]),
        "agg": _score_labelling(embeddings, df["agg_labels"]),
    }


In [9]:
def visualize_with_UMAP(df):
    """Project the embeddings to 2-D with UMAP and store the coordinates on ``df``.

    Args:
        df: DataFrame with an ``embedding`` column.

    Returns:
        The same ``df`` object, with ``viz_x`` and ``viz_y`` columns written
        (any existing values in those columns are overwritten).
    """
    reducer = UMAP(n_components=2, random_state=0)
    projected = reducer.fit_transform(df["embedding"].to_list())

    # Output component 0 becomes the x coordinate, component 1 the y coordinate.
    for axis, column in enumerate(("viz_x", "viz_y")):
        df[column] = projected[:, axis]

    return df

def visualize_with_pca(df, random_state=0):
    """Project the embeddings to 2-D with PCA and store the coordinates on ``df``.

    Args:
        df: DataFrame with an ``embedding`` column.
        random_state: Seed passed to PCA so the projection is reproducible when
            a randomized SVD solver is selected; the default of 0 matches the
            seed used by ``visualize_with_UMAP``.

    Returns:
        The same ``df`` object, with ``viz_x`` and ``viz_y`` columns written
        (any existing values in those columns are overwritten).
    """
    pca = PCA(n_components=2, random_state=random_state)
    pca_results = pca.fit_transform(df["embedding"].to_list())

    # component one is x, component two is y
    df["viz_x"] = pca_results[:,0]
    df["viz_y"] = pca_results[:,1]

    return df


In [10]:
def append_explanations_to_html(out_path):
    """Append a short HTML legend describing the three cluster scores.

    The legend is appended to the end of the existing file at ``out_path``
    (the file is opened in append mode).

    Args:
        out_path: Path of the HTML file to extend.
    """
    # The Calinski-Harabasz index is the ratio of between-cluster dispersion to
    # within-cluster dispersion (higher is better); the legend states it in
    # that order.
    explanations = """
    <div style="margin-top: 20px; font-family: Arial, sans-serif;">
        <h3>Score Descriptions:</h3>
        <ul>
            <li><b>Silhouette Score:</b> Measures how similar a point is to its own cluster compared to other clusters. Higher values indicate better-defined clusters.</li>
            <li><b>Davies-Bouldin Score:</b> A measure of cluster separation and compactness. Lower values are better.</li>
            <li><b>Calinski-Harabasz Score:</b> Measures the ratio of between-cluster dispersion to within-cluster dispersion. Higher values indicate better clustering.</li>
        </ul>
    </div>
    """

    # Explicit encoding so the write does not depend on the platform default.
    with open(out_path, "a", encoding="utf-8") as html_file:
        html_file.write(explanations)

def _cluster_scatter(df, label_column, hover_prefix, trace_name):
    """Build the 2-D scatter trace for one clustering, coloured by its labels."""
    # zip over the two columns instead of iterrows: same hover strings, and no
    # nested same-quote f-strings (which only parse on Python 3.12+).
    hover_text = [
        f"{hover_prefix}{label}, File :{file}"
        for label, file in zip(df[label_column], df["file"])
    ]
    return go.Scatter(
        x=df["viz_x"],
        y=df["viz_y"],
        mode='markers',
        marker=dict(color=df[label_column], showscale=True),
        text=hover_text,
        hoverinfo="text",
        name=trace_name,
    )


def _score_table(scores):
    """Build the two-column table listing the three scores of one clustering."""
    formatted = [f"{score:.4f}" for score in scores]
    return go.Table(
        header=dict(values=["Score", "Value"]),
        cells=dict(values=[
            ["Silhouette Score", "Davies Bouldin Score", "Calinski Harabasz Score"],
            formatted,
        ]),
    )


def plot_clusters(df, out_path, visualization_function):
    """Write an HTML figure comparing the KMeans and agglomerative clusterings.

    The figure is a 2x2 grid: the top row holds one scatter plot per clustering
    in the 2-D coordinates produced by ``visualization_function``; the bottom
    row holds one score table per clustering. HDBSCAN labels are evaluated by
    ``evaluate_clusters`` but not drawn here.

    Args:
        df: DataFrame with ``embedding``, ``file`` and the cluster label
            columns; it is passed to ``visualization_function``.
        out_path: Path of the HTML file to write.
        visualization_function: Callable taking ``df`` and returning a frame
            with ``viz_x`` / ``viz_y`` columns.
    """
    df = visualization_function(df)

    # Define specs for mixed types: 'xy' for scatter plots and 'domain' for tables
    specs = [
        [{"type": "xy"}, {"type": "xy"}],  # Scatter plots in the first row
        [{"type": "domain"}, {"type": "domain"}]  # Tables in the second row
    ]

    # Titles are assigned to the grid cells in row-major order, so they must
    # follow the traces added below: KMeans on the left, agglomerative on the
    # right, in both rows.
    fig = make_subplots(
        rows=2, cols=2,
        specs=specs,
        subplot_titles=(
            "KMeans",
            "Agglomerative Clustering",
            "KMeans scores",
            "Agglomerative Clustering scores",
        )
    )

    # Scatter plots with hover text showing the cluster label and file
    fig.add_trace(
        _cluster_scatter(df, "kmeans_labels", "Cluster: ", "KMeans"),
        row=1, col=1
    )
    fig.add_trace(
        _cluster_scatter(df, "agg_labels", "Cluster : ", "Agglomerative"),
        row=1, col=2
    )

    fig.update_layout(
        plot_bgcolor="#f4f4f4",
        paper_bgcolor="#f4f4f4"
    )

    # Evaluate clusters
    cluster_eval = evaluate_clusters(df)

    fig.add_trace(_score_table(cluster_eval["kmeans"]), row=2, col=1)
    fig.add_trace(_score_table(cluster_eval["agg"]), row=2, col=2)

    # Save to HTML
    fig.write_html(out_path)

    # Append explanations
    append_explanations_to_html(out_path)


In [11]:
# OpenAI embeddings: one figure per 2-D projection method.
plot_clusters(
    df=df_oa,
    out_path="./html_figures/clusterings_openai_UMAP.html",
    visualization_function=visualize_with_UMAP,
)
plot_clusters(
    df=df_oa,
    out_path="./html_figures/clusterings_openai_PCA.html",
    visualization_function=visualize_with_pca,
)

# GraphCodeBERT embeddings: same two figures.
plot_clusters(
    df=df_gb,
    out_path="./html_figures/clusterings_graphcodebert_UMAP.html",
    visualization_function=visualize_with_UMAP,
)
plot_clusters(
    df=df_gb,
    out_path="./html_figures/clusterings_graphcodebert_PCA.html",
    visualization_function=visualize_with_pca,
)

  warn(

n_jobs value 1 overridden to 1 by setting random_state. Use no seed for parallelism.



In [12]:
# Re-save both frames to their pickle files now that the cluster label and
# visualization columns have been added.
df_gb.to_pickle(path="./pickles/df_graphcodebert.pkl")
df_oa.to_pickle(path="./pickles/df_text_embedding_large.pkl")

In [14]:
# One pie chart of files per cluster for each clustering of the GraphCodeBERT
# frame; each entry is (label column, chart title, output HTML path).
pie_specs = (
    ("hdbscan_labels", "HDBSCAN Clusters", "./html_figures/hdbscan_clusters.html"),
    ("kmeans_labels", "KMeans Clusters", "./html_figures/kmeans_clusters.html"),
    ("agg_labels", "Agglomerative Clusters", "./html_figures/agg_clusters.html"),
)

for label_column, chart_title, html_path in pie_specs:
    fig = px.pie(df_gb, names=label_column, title=chart_title)
    fig.update_layout(
        title=chart_title,
        plot_bgcolor="#f4f4f4",
        paper_bgcolor="#f4f4f4"
    )
    fig.write_html(html_path)
