In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import pickle as pk
from collections import defaultdict
import re

from sentence_transformers import SentenceTransformer
from scipy.cluster.hierarchy import dendrogram, linkage
from scipy.cluster.hierarchy import fcluster
from sklearn.metrics import confusion_matrix
from sklearn.cluster import KMeans

from sklearn.metrics import adjusted_rand_score
from sklearn.metrics import normalized_mutual_info_score
from sklearn.metrics import homogeneity_completeness_v_measure

  from tqdm.autonotebook import tqdm, trange


In [2]:
def get_clusters(embeddings, texts, cluster_assignment, printclusters):
    """Helper function for perform_hierarchicalClustering()
    Args:
        embeddings (list): List of embeddings
        texts (list): List of texts
        cluster_assignment (list): cluster assignment for each text/embedding
        printcluster (bool): True will print all texts in each cluster
    Returns:
        response_to_cluster dict: text to cluster number
        cluster_to_response dict: cluster number to list of texts in that cluster
        num_clusters int: number of clusters
        min_similarities dict: cluster to min text similiarity
    """

    cluster_to_response = defaultdict(list)                                 # make cluster_to_response
    cluster_to_embeddings = defaultdict(list)                               # make cluster_to_embeddings (used for min sim)
    for ind, cluster_num in enumerate(cluster_assignment):
        cluster_to_response[cluster_num].append(texts[ind])
        cluster_to_embeddings[cluster_num].append(embeddings[ind])
    
    response_to_cluster = dict(zip(texts, cluster_assignment))              # make response_to_cluster 
    
    num_clusters = len(np.unique(cluster_assignment))                       # find num clusters

    min_similarities = {}   
    mean_similarities = {}                                                   # find min paiwise text similarity in each cluster
                                                                            # find mean paiwise text similarity in each cluster
    for cl in cluster_to_embeddings:
        stacked = np.array(cluster_to_embeddings[cl])
        sim = stacked @ stacked.T
        np.fill_diagonal(sim, np.inf)
        min_sim = np.min(sim)
        min_similarities[cl] = min_sim
        mean_sim = np.mean(sim)
        mean_similarities[cl] = mean_sim

    # code to get interpretation of each cluster 
    
    if printclusters:                                                       # print clusters if True
        for cluster_num, responses in cluster_to_response.items():
            print(f"Cluster {cluster_num}")
            print(responses, end="\n\n")

    return response_to_cluster, cluster_to_response, num_clusters, min_similarities, mean_similarities

def perform_hierarchicalClustering(embeddings, texts, method, printclusters, ax=None):
    """Performs hierarchical clustering
    Args:
        embeddings (list): List of embeddings
        texts (list): List of texts
        ax: axes for plotting
        type (tuple): a: distance/maxclust based clustering, b: distance threshold/max number of clusters
        printcluster (bool): True will print all texts in each cluster
    Calls:
        get_clusters()
    """
    linked = linkage(embeddings, 'ward')                                                                                                    # 'ward' distance for measuring distance between clusters
    
    if ax is not None:
        # Elbow plot -- Plot mean minsim/number of clusters as a fn of cut-off distance
        mean_minsemsim = []
        num_clusters = []
        for cod in np.linspace(1, 8, 30):
            cluster_assignment = fcluster(linked, t=cod, criterion='distance')
            _, _, _, minsims, _ = get_clusters(embeddings, texts, cluster_assignment, False)
            mean_minsemsim.append(np.mean(list(minsims.values())))
            num_clusters.append(len(np.unique(cluster_assignment)))
    
        dendrogram(linked, orientation='top', labels=texts, distance_sort='descending', show_leaf_counts=False, no_labels=True, ax=ax[0])       # Make dendogram
        ax[1].plot(np.linspace(1, 8, 30), mean_minsemsim)
        ax[1].set_xlabel("Cut-off distance"); ax[1].set_ylabel("Mean cluster semantic similarity")
        ax[2].plot(np.linspace(1, 8, 30), num_clusters)
        ax[2].set_xlabel("Cut-off distance"); ax[2].set_ylabel("Number of clusters")

    cluster_assignment = fcluster(linked, t=method[1], criterion=method[0])         # assign clusters
    print(cluster_assignment)
    print(type(cluster_assignment))
    print(type(cluster_assignment[0]))
    return get_clusters(embeddings, texts, cluster_assignment, printclusters)

def permform_KMeans(embeddings, texts, method, printclusters):
    kmeans = KMeans(n_clusters=method[1], random_state=42)
    kmeans.fit(embeddings)
    cluster_assignment = kmeans.labels_
    return get_clusters(embeddings, texts, cluster_assignment, printclusters)

def calculate_cosine_similarity(embedding1, embedding2):
    """Calculates cosine similarity between two embeddings
    Args:
        embedding1: first embedding
        embedding2: second embedding
    Returns:
        cosine similarity
    """
    if np.any(embedding1) and np.any(embedding2):
        return np.dot(embedding1, embedding2)       # embeddings are already normalised so no need to divide by norm
    else:
        return 0

def normalize(embedding):
    norm = np.linalg.norm(embedding)
    if norm == 0:
        return embedding  # Return as-is if it's a zero vector
    return embedding / norm

def get_similarity_matrix(embeddings):
    embedding_matrix = np.array(embeddings)
    # Since embeddings are already normalized, the cosine similarity is just the dot product
    similarity_matrix = np.dot(embedding_matrix, embedding_matrix.T)
    return similarity_matrix

def plot_heatmap(matrix, texts, title):
    plt.figure(figsize=(8, 6))
    sns.heatmap(matrix, cmap="coolwarm", annot=True)
    plt.title(title)
    plt.show()

def write_category_SS_jump(taskid, model, context, embeddings, response_to_cluster):
    data.loc[data["task"] == taskid, f"category_{model}_{context}"] = data[data["task"] == taskid]["response"].apply(lambda x: response_to_cluster.get(x))
    data.loc[data["task"] == taskid, f"SS_{model}_{context}"] = data[data["task"] == taskid].apply(lambda row: calculate_cosine_similarity(embeddings.get(row['response']), embeddings.get(row['previous_response'])), axis=1)
    

    data[f"jump_cat_{model}_{context}"] = (~(data[f"category_{model}_{context}"].diff() == 0)).astype(int)
    data[f"jump_SS_{model}_{context}"] = (data[f"SS_{model}_{context}"] < 0.8).astype(int)
    
    # print(sum(pd.isna(data[data["task"] == taskid][f"jump_cat_{model}_{context}"])))
    # print(sum(pd.isna(data[data["task"] == taskid][f"jump_SS_{model}_{context}"])))
    # data[f"jump_cat_{model}_{context}"].fillna(0, inplace=True)
    # data[f"jump_SS_{model}_{context}"].fillna(0, inplace=True)

    data.loc[data["task"] == taskid, f'jump_{model}_{context}'] = data[f"jump_cat_{model}_{context}"] & data[f"jump_SS_{model}_{context}"]

def get_cluster_agreement(cluster_assignment1, cluster_assignment2):
    ari = adjusted_rand_score(cluster_assignment1, cluster_assignment2)
    nmi = normalized_mutual_info_score(cluster_assignment1, cluster_assignment2)
    homogeneity, completeness, v_measure = homogeneity_completeness_v_measure(cluster_assignment1, cluster_assignment2)
    print(ari, nmi, homogeneity, completeness, v_measure)

def get_jump_agreement(data, task, jumpcol1, jumpcol2):
    cm = confusion_matrix(data[data["task"] == task][jumpcol1], data[data["task"] == task][jumpcol2])
    print(cm)
    TN, FP, FN, TP = cm.ravel()

    # Calculate TPR, FPR, TNR, FNR
    TPR = TP / (TP + FN)  # Sensitivity, Recall
    FPR = FP / (FP + TN)
    TNR = TN / (TN + FP)  # Specificity
    FNR = FN / (TP + FN)

    print(f"True Positive Rate (TPR): {TPR:.2f}")
    print(f"True Negative Rate (TNR): {TNR:.2f}")
    print(f"False Positive Rate (FPR): {FPR:.2f}")
    print(f"False Negative Rate (FNR): {FNR:.2f}")

In [3]:
data = pd.read_csv("../csvs/data_humans_allresponses.csv")
texts_autbrick = data[data["task"] == 2]["response"].unique().tolist()
texts_autpaperclip = data[data["task"] == 3]["response"].unique().tolist()
texts_vf = data[data["task"] == 1]["response"].unique().tolist()
texts = [texts_autbrick, texts_autpaperclip, texts_vf]
tasks = ["autbrick", "autpaperclip", "vf"]
taskid = [2, 3, 1]

In [4]:
models = ["qwen", "stella", "gtelarge", "jxm"]
contexts = ["noshortcontext", "nearshortcontext", "farshortcontext"]
# nocontext - only text embedding
# nearcontext - task keyword (brick/paperclip/animal) + text embedding
# farcontext - different task keyword (animal/brick/paperclip) + text embedding

In [5]:
printclusters = True

In [6]:
for i, textset in enumerate(texts):
    for model in models:
        for context in contexts:
            print(tasks[i], model, context)
            embeddings = pk.load(open(f"../embeddings/embeddings_{model}_{tasks[i]}_{context}.pk", "rb"))
            response_to_cluster, cluster_to_response, num_clusters, minsim, meansim = perform_hierarchicalClustering(list(embeddings.values()), textset, ("maxclust", 25), printclusters)    # or ("distance", 1.89)
            print(response_to_cluster)
            print("Num clusters =", num_clusters)
            write_category_SS_jump(taskid[i], model, context, embeddings, response_to_cluster)

autbrick qwen noshortcontext
[ 1  1  1 ... 14  3 15]
<class 'numpy.ndarray'>
<class 'numpy.int32'>
Cluster 1
['house', 'bridge', 'school', 'hotel', 'riot', 'room', 'houses', 'tables', 'art', 'chair', 'destroy', 'weights', 'weapon', 'stamp', 'gift', 'draw', 'break', 'sink', 'traffic', 'weight', 'heat', 'stairs', 'path', 'street', 'castle', 'church', 'wall', 'stone', 'floor', 'villa', 'pillar', 'pole', 'gate', 'sell', 'save', 'paint', 'roof', 'throw', 'build', 'beat', 'drop', 'hang', 'packing', 'cook', 'balance', 'sign', 'cut', 'transport', 'freeze', 'football', 'fork', 'knife', 'comb', 'brush', 'telephone', 'play', 'canvas', 'paper', 'hat', 'cap', 'close', 'sit', 'sand', 'exchange', 'builder', 'oven', 'red', 'brown', 'industry', 'factory', 'hot', 'trade', 'design', 'bread', 'pizza', 'drawing', 'stack', 'hammer', 'chalk', 'collect', 'pack', 'playing', 'lay', 'dig', 'stand', 'table', 'trash', 'plate', 'rock', 'warehouse', 'support', 'seat', 'hit', 'push', 'stop', 'write', 'container', 'ha

KeyboardInterrupt: 

In [None]:
model1 = "gtelarge"
context1 = "farshortcontext"
model2 = "stella"
context2 = "farshortcontext"

get_cluster_agreement(data[f"category_{model1}_{context1}"], data[f"category_{model2}_{context2}"])

In [None]:
model1 = "gtelarge"
context1 = "noshortcontext"
model2 = "gtelarge"
context2 = "nearshortcontext"

ari = adjusted_rand_score(data[f"category_{model1}_{context1}"].tolist(), data[f"category_{model2}_{context2}"].tolist())
print(ari)

In [None]:
model1 = "gtelarge"
context1 = "noshortcontext"
model2 = "gtelarge"
context2 = "farshortcontext"

ari = adjusted_rand_score(data[f"category_{model1}_{context1}"].tolist(), data[f"category_{model2}_{context2}"].tolist())
print(ari)

In [None]:
model1 = "gtelarge"
context1 = "nearshortcontext"
model2 = "gtelarge"
context2 = "farshortcontext"

ari = adjusted_rand_score(data[f"category_{model1}_{context1}"].tolist(), data[f"category_{model2}_{context2}"].tolist())
print(ari)

In [None]:
model1 = "gtelarge"
context1 = "noshortcontext"
model2 = "stella"
context2 = "noshortcontext"

ari = adjusted_rand_score(data[f"category_{model1}_{context1}"].tolist(), data[f"category_{model2}_{context2}"].tolist())
print(ari)

In [None]:
model1 = "stella"
context1 = "noshortcontext"
model2 = "qwen"
context2 = "noshortcontext"

