In [None]:
import permetrics
import pickle
import pandas as pd
from sklearn import preprocessing, metrics
import numpy as np
from glob import glob
import os
import warnings
warnings.filterwarnings("ignore")

In [None]:
# Folder layout used by this notebook, all relative to the working directory:
#   data_folder     -> "<dataset>_X.csv" / "<dataset>_y.csv" files read by the scoring loop
#   result_folder   -> "<dataset>_<model>.pkl" files holding the stored predictions
#   baseline_folder -> "<dataset>_baseline.csv" files written by the scoring loop
root_folder = "Synthetic datasets"
result_folder = os.path.join(root_folder, "results")
data_folder = os.path.join(root_folder, "data")
baseline_folder = os.path.join(root_folder, "baseline")

# exist_ok=True keeps the cell idempotent without a separate (racy) existence check
os.makedirs(baseline_folder, exist_ok=True)

In [None]:
def dunn_index(X, y_pred):
    """Dunn index of a clustering (to be maximised).

    Re-implemented here because the permetrics library often fails to compute it.

    The index is the smallest Euclidean distance between two samples of
    different clusters divided by the largest Euclidean distance between two
    samples of the same cluster.

    Parameters
    ----------
    X : array-like of samples, passed to ``metrics.pairwise_distances``.
    y_pred : array-like of cluster labels, one per sample. Labels may be any
        values; they do not need to be the contiguous integers ``0..K-1``.

    Returns
    -------
    float
        ``d_min / d_max``. With a single cluster there is no between-cluster
        distance and ``d_min`` stays at ``inf``.
    """
    y_pred = np.asarray(y_pred)
    # Compute the distances on this dataset
    distances = metrics.pairwise_distances(X, metric="euclidean")
    # One boolean mask per label actually present: iterating over range(K)
    # would silently assume the labels are exactly 0..K-1 and crash otherwise
    masks = [y_pred==label for label in np.unique(y_pred)]
    K = len(masks)

    # Closest distance between every pair of distinct clusters
    # (only the upper triangle is filled, the rest stays at +inf)
    cluster_d = np.ones((K, K))*np.inf
    for i in range(K):
        rows = distances[masks[i]]
        for j in range(i+1, K):
            cluster_d[i,j] = rows[:,masks[j]].min()

    d_min = cluster_d.min()

    # Diameter of a cluster = largest distance between two of its members
    cluster_diameters = np.zeros(K)
    for i in range(K):
        cluster_diameters[i] = distances[masks[i]][:,masks[i]].max()

    d_max = cluster_diameters.max()

    return d_min/d_max
    

In [None]:
def c_index(X, y_pred):
    """C index of a clustering (to be minimised).

    With ``S_w`` the sum of the ``N_w`` within-cluster pairwise Euclidean
    distances, ``S_min`` the sum of the ``N_w`` smallest pairwise distances of
    the whole dataset and ``S_max`` the sum of the ``N_w`` largest ones, the
    index is ``(S_w - S_min) / (S_max - S_min)``.

    Parameters
    ----------
    X : array-like of samples, passed to ``metrics.pairwise_distances``.
    y_pred : array-like of cluster labels, one per sample.

    Returns
    -------
    float
        The C index, or NaN when it is undefined because ``S_max == S_min``
        (e.g. every cluster is a singleton, or there is a single cluster).
    """
    y_pred = np.asarray(y_pred)
    # Compute the distances on this dataset
    distances = metrics.pairwise_distances(X, metric="euclidean")

    # Accumulate the within-cluster distances and the number of within-cluster pairs
    N_w = 0
    S_w = 0
    for k in np.unique(y_pred):
        indices = y_pred==k
        S_w += distances[indices][:,indices].sum()/2 # divide by two: all pairs have been seen twice
        n_cluster = int(indices.sum())
        N_w += n_cluster*(n_cluster-1)//2

    # Extract all non-diagonal distances, each pair once, in increasing order
    distances = distances[np.triu_indices(len(distances),k=1)]
    distances.sort()
    S_min = distances[:N_w].sum()
    # Slice from the front: `distances[-N_w:]` would select *every* pair when N_w == 0
    S_max = distances[len(distances)-N_w:].sum()

    denominator = S_max-S_min
    if denominator == 0:
        # No spread between the best and worst possible sums: the index is undefined
        return np.nan
    return (S_w-S_min)/denominator

In [None]:
def mclain_rao_index(X, y_pred):
    """McClain-Rao index of a clustering (to be minimised).

    Ratio between the mean within-cluster pairwise Euclidean distance and the
    mean between-cluster pairwise Euclidean distance.

    X is the sample matrix handed to ``metrics.pairwise_distances``; y_pred
    holds one cluster label per sample and is compared element-wise against
    each label, so a NumPy array is expected.
    """
    pairwise = metrics.pairwise_distances(X, metric="euclidean")

    # Total number of unordered sample pairs
    n_samples = len(y_pred)
    total_pairs = n_samples*(n_samples-1)//2

    labels = np.unique(y_pred)
    within_pairs = 0
    within_sum = 0
    between_sum = 0
    for position, label in enumerate(labels):
        members = y_pred==label
        rows = pairwise[members]
        # The square block counts every within-cluster pair twice, hence the halving
        within_sum += rows[:,members].sum()/2
        size = members.sum()
        within_pairs += size*(size-1)//2
        # Each pair of distinct clusters is visited once (only later labels)
        for other_label in labels[position+1:]:
            between_sum += rows[:,y_pred==other_label].sum()

    between_pairs = total_pairs-within_pairs
    mean_within = within_sum/within_pairs
    mean_between = between_sum/between_pairs
    return mean_within/mean_between

In [None]:
def pbm_index(X, y_pred):
    """PBM index of a clustering.

    Computed as ``(E_t * D / E_w / K) ** 2`` where ``E_t`` is the sum of the
    distances from every sample to the centroid of the whole dataset, ``E_w``
    the sum of the distances from every sample to the centroid of its own
    cluster, ``D`` the largest distance between two cluster centroids and
    ``K`` the number of clusters.

    X must support boolean-mask indexing and ``.mean(0, keepdims=True)``
    (a NumPy array); y_pred holds one cluster label per sample.
    """
    labels = np.unique(y_pred)

    # Dispersion of the samples around the centroid of the whole dataset
    global_centroid = X.mean(0, keepdims=True)
    total_dispersion = metrics.pairwise_distances(X, global_centroid).sum()

    # Dispersion of the samples around the centroid of their own cluster
    centroids = []
    within_dispersion = 0
    for label in labels:
        members = X[y_pred==label]
        centroid = members.mean(0, keepdims=True)
        centroids.append(centroid)
        within_dispersion += metrics.pairwise_distances(members, centroid).sum()

    # Largest separation between two cluster centroids
    stacked_centroids = np.concatenate(centroids, axis=0)
    max_separation = metrics.pairwise_distances(stacked_centroids).max()

    return np.square(total_dispersion*max_separation/within_dispersion/len(labels))

In [None]:
def wemmert_gancarski_index(X, y_pred):
    """Wemmert-Gancarski index of a clustering.

    For every sample, ``R`` is the distance to the centroid of its own cluster
    divided by the distance to the nearest centroid of another cluster. Each
    cluster contributes ``max(0, 1 - mean(R))`` weighted by its number of
    samples, and the weighted sum is divided by the total number of samples.

    Parameters
    ----------
    X : sample matrix supporting boolean-mask indexing and
        ``.mean(0, keepdims=True)`` (a NumPy array).
    y_pred : array-like of cluster labels, one per sample.

    Returns
    -------
    float
        The size-weighted average of the per-cluster scores.

    Raises
    ------
    ValueError
        If fewer than two clusters are present (no "other" centroid exists).
    """
    y_pred = np.asarray(y_pred)
    cluster_ids = np.unique(y_pred)
    if len(cluster_ids) < 2:
        raise ValueError("The Wemmert-Gancarski index requires at least two clusters")

    cluster_centroids = [X[y_pred==k].mean(0, keepdims=True) for k in cluster_ids]
    # Column i holds the distance of every sample to the centroid of cluster_ids[i]
    distances_to_centroids = metrics.pairwise_distances(X, np.concatenate(cluster_centroids, axis=0))

    J = 0
    for i, k in enumerate(cluster_ids):
        indices = y_pred==k
        cluster_distances = distances_to_centroids[indices]
        # Per-sample nearest *other* centroid: the minimum must be taken row by row,
        # not once over the whole cluster
        nearest_other = np.delete(cluster_distances, i, axis=1).min(axis=1)
        R = cluster_distances[:,i] / nearest_other
        # Weight by the cluster size (`indices` is a mask over all samples, so
        # its length is the dataset size, not the cluster size)
        J += max(0, 1-R.mean()) * indices.sum()

    return J/len(y_pred)

In [None]:
# Indices computed with the implementations defined in this notebook instead of permetrics.
custom_score_fcts = {
    "DI": dunn_index,
    "CI": c_index,
    "McRao": mclain_rao_index,
    "PBM": pbm_index,
    "WG": wemmert_gancarski_index,
}
# permetrics attributes whose name ends with "I" ...
score_fcts = [x for x in dir(permetrics.ClusteringMetric) if x[-1]=="I"]
# ... plus the custom indices: names such as "McRao", "PBM" and "WG" do not end
# with "I", so without this line they would never be evaluated.
score_fcts += [name for name in custom_score_fcts if name not in score_fcts]
smaller_is_better = ["BHI", "XBI", "DBI", "BRI", "KDI", "SSEI", "MSEI", "DHI", "BI", "HI", "CI", "McRao"]
bigger_is_better = ["DRI", "DI", "CHI", "LDRI", "LSRI", "SI", "RSI", "DBCVI","PBM", "WG"]

# For every dataset, score every stored prediction of every model and write one
# long-format CSV (Dataset, Model, Run, Score, Value) into the baseline folder.
for dataset_file in glob(os.path.join(data_folder, "*_X.csv")):
    all_scores = []
    print(dataset_file)
    dataset_name = os.path.basename(dataset_file)[:-6] # strip the "_X.csv" suffix
    baseline_file = os.path.join(baseline_folder, f"{dataset_name}_baseline.csv")
    # Already scored datasets are skipped: delete the CSV to force a re-computation
    if os.path.exists(baseline_file):
        continue
    X = pd.read_csv(dataset_file).to_numpy()
    y_true = pd.read_csv(dataset_file.replace("_X.csv", "_y.csv")).to_numpy().reshape(-1)

    for prediction_file in glob(os.path.join(result_folder, dataset_name+"_*.pkl")):
        # The model name is the last "_"-separated token, without the ".pkl" extension
        model_name = prediction_file.split("_")[-1][:-4]
        print(f"\tModel = {model_name}")
        # NOTE(review): pickle.load can execute arbitrary code; only load result
        # files produced by trusted runs.
        with open(prediction_file, "rb") as file:
            predictions = pickle.load(file)

        for i, y_pred in enumerate(predictions):
            y_pred = preprocessing.LabelEncoder().fit_transform(y_pred) # This tackles DBSCAN -1 cluster
            evaluator = permetrics.ClusteringMetric(y_pred=y_pred.tolist(), X=X)
            for score_fct in score_fcts:
                try:
                    if score_fct in custom_score_fcts:
                        value = custom_score_fcts[score_fct](X, y_pred)
                    else:
                        value = getattr(evaluator, score_fct)()
                except Exception:
                    # Best effort: an index that cannot be computed is recorded as NaN.
                    # `Exception` (not a bare except) keeps Ctrl-C / kernel interrupt working.
                    value = np.nan
                if score_fct in smaller_is_better:
                    value = -value # To leverage a positive correlation when the metric goes on the right track
                all_scores += [{
                    "Dataset":dataset_name,
                    "Model":model_name,
                    "Run":i,
                    "Score":score_fct,
                    "Value":value
                }]
            # Ground-truth agreement of this run, stored alongside the internal indices
            all_scores += [{
                "Dataset":dataset_name,
                "Model":model_name,
                "Run":i,
                "Score":"ARI",
                "Value":metrics.adjusted_rand_score(y_true, y_pred)
            }]
    pd.DataFrame(all_scores).to_csv(baseline_file, index=False)
                    