In [21]:
import os

os.environ["OMP_NUM_THREADS"] = "1"

import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import adjusted_rand_score, silhouette_score
from sklearn.model_selection import KFold
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import make_blobs
from scipy.spatial.distance import cdist
import joblib


In [22]:
# Make synthetic data

# Parameters for data generation
n_samples = 200    # Number of data points
n_features = 768   # Dimensionality of each data point
centers = 4        # Number of clusters
cluster_std = 5.0  # Standard deviation of each blob around its center
noise_level = 30   # Standard deviation of the extra Gaussian noise added below
random_seed = 42   # Shared seed so the whole cell is reproducible on re-run

# Generate synthetic data
X, y = make_blobs(n_samples=n_samples,
                  n_features=n_features,
                  centers=centers,
                  cluster_std=cluster_std,
                  random_state=random_seed)

# Adding noise to the data.
# A dedicated, seeded Generator is used instead of the global np.random state:
# the blobs were already seeded, but the noise was not, so X_noisy used to
# change on every execution of this cell.
rng = np.random.default_rng(random_seed)
noise = rng.normal(0, noise_level, X.shape)
X_noisy = X + noise

In [23]:


class TreeClusteringClassifier:
    """K-means clustering wrapped as a classifier.

    The number of clusters is chosen by silhouette score, the best K-means
    model is selected across K-fold splits by Adjusted Rand Index (ARI) against
    the provided labels, and each cluster is then mapped to the name of the
    majority class among its members.

    Args:
        max_clusters: Largest number of clusters to try (inclusive).
        min_clusters: Smallest number of clusters to try (inclusive).
        class_names: Optional sequence of display names; integer label ``i``
            is reported as ``class_names[i]``. Defaults to
            ``['1', '2', '3', '4']``.
    """

    def __init__(self, max_clusters=10, min_clusters=2, class_names=None):
        self.max_clusters = max_clusters
        self.min_clusters = min_clusters
        self.kmeans = None
        self.scaler = StandardScaler()
        # Index i of this list is the display name for integer label i.
        self.class_names = list(class_names) if class_names is not None else ['1', '2', '3', '4']
        self.class_mapping = None

    def find_optimal_clusters(self, embeddings):
        """Return the cluster count with the highest silhouette score.

        Args:
            embeddings: 2-D array-like of samples to cluster.

        Returns:
            int: The best ``k`` within the tried range (the smallest such
            ``k`` if several tie).

        Raises:
            ValueError: If no valid ``k`` exists for the given data/range.
        """
        n_samples = len(embeddings)
        # The silhouette score is only defined for 2 <= k <= n_samples - 1,
        # so clamp the configured range instead of crashing mid-search.
        lowest_k = max(self.min_clusters, 2)
        highest_k = min(self.max_clusters, n_samples - 1)
        if highest_k < lowest_k:
            raise ValueError(
                f"Cannot search for clusters: need min_clusters <= max_clusters and "
                f"at least 3 samples (got range {self.min_clusters}-{self.max_clusters}, "
                f"{n_samples} samples)."
            )

        silhouette_scores = []
        for k in range(lowest_k, highest_k + 1):
            kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
            cluster_labels = kmeans.fit_predict(embeddings)
            silhouette_scores.append(silhouette_score(embeddings, cluster_labels))

        # argmax returns the first maximum, i.e. the smallest k among ties.
        return lowest_k + int(np.argmax(silhouette_scores))

    def train_and_save_model(self, embeddings, labels, model_path, n_splits=5):
        """Select the best K-means model across folds, then persist it.

        For each fold the cluster count is chosen on the training part, a
        K-means model is fitted on it, and the validation part is scored with
        ARI. The model with the highest ARI (fewest clusters on ties) is kept,
        a cluster-to-class mapping is built on the full data, and everything is
        written to ``model_path``.

        Args:
            embeddings: 2-D array-like of raw (unscaled) samples.
            labels: 1-D array-like of non-negative integer class labels.
            model_path: Destination file for the joblib dump.
            n_splits: Number of K-fold splits.

        Raises:
            ValueError: If ``embeddings`` and ``labels`` differ in length.
        """
        embeddings = np.asarray(embeddings)
        # Fancy indexing below (labels[val_idx]) needs an ndarray, not a list.
        labels = np.asarray(labels)
        if len(embeddings) != len(labels):
            raise ValueError(
                f"embeddings and labels must have the same length "
                f"(got {len(embeddings)} and {len(labels)})."
            )

        kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)

        # NOTE(review): the scaler is fitted on ALL data before splitting, so
        # validation folds influence the scaling statistics. It is kept this
        # way because the same scaler is the one saved and used at predict time.
        embeddings_scaled = self.scaler.fit_transform(embeddings)

        best_ari = -np.inf
        best_model = None
        best_n_clusters = 0

        for fold, (train_idx, val_idx) in enumerate(kf.split(embeddings_scaled)):
            print(f"Fold {fold + 1}/{n_splits}")

            train_embeddings = embeddings_scaled[train_idx]
            val_embeddings = embeddings_scaled[val_idx]
            val_labels = labels[val_idx]

            n_clusters = self.find_optimal_clusters(train_embeddings)
            print(f"Optimal number of clusters: {n_clusters}")

            kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
            kmeans.fit(train_embeddings)

            val_predictions = kmeans.predict(val_embeddings)
            ari = adjusted_rand_score(val_labels, val_predictions)
            print(f"Fold {fold + 1} Adjusted Rand Index: {ari:.4f}")

            # Prefer higher ARI; on an exact tie prefer the simpler model.
            if ari > best_ari or (ari == best_ari and n_clusters < best_n_clusters):
                best_ari = ari
                best_model = kmeans
                best_n_clusters = n_clusters

        print(f"Best Adjusted Rand Index: {best_ari:.4f}")
        print(f"Best number of clusters: {best_n_clusters}")
        self.kmeans = best_model
        self.class_mapping = self.map_clusters_to_classes(embeddings, labels)
        self.save_model(model_path)

    def save_model(self, path):
        """Dump the K-means model, scaler and class mapping to ``path`` via joblib."""
        joblib.dump({'kmeans': self.kmeans, 'scaler': self.scaler, 'class_mapping': self.class_mapping}, path)
        print(f"Model saved to {path}")

    def load_model(self, path):
        """Restore the K-means model, scaler and class mapping from ``path``.

        If the file does not exist a message is printed and the instance is
        left unchanged.
        """
        if os.path.exists(path):
            # SECURITY: joblib.load unpickles arbitrary objects; only load
            # model files that come from a trusted source.
            model_data = joblib.load(path)
            self.kmeans = model_data['kmeans']
            self.scaler = model_data['scaler']
            self.class_mapping = model_data['class_mapping']
            print(f"Model loaded from {path}")
        else:
            print(f"No saved model found at {path}")

    def predict(self, embeddings):
        """Assign each sample to a cluster and return a confidence score.

        Args:
            embeddings: 2-D array-like of raw (unscaled) samples.

        Returns:
            tuple: ``(cluster_labels, confidence)`` where ``confidence`` is the
            largest entry of a softmax over negative Euclidean distances to
            the cluster centers (a heuristic, not a calibrated probability).

        Raises:
            ValueError: If no model has been trained or loaded.
        """
        if self.kmeans is None:
            raise ValueError("Model not trained or loaded. Please train or load a model first.")
        embeddings_scaled = self.scaler.transform(embeddings)
        cluster_labels = self.kmeans.predict(embeddings_scaled)
        distances = cdist(embeddings_scaled, self.kmeans.cluster_centers_, 'euclidean')
        # Subtract the per-row minimum before exponentiating. The softmax is
        # unchanged mathematically, but exp(-d) can no longer underflow to 0
        # for every center at once (which produced 0/0 = NaN).
        shifted = distances - distances.min(axis=1, keepdims=True)
        probabilities = np.exp(-shifted)
        probabilities /= probabilities.sum(axis=1, keepdims=True)
        return cluster_labels, np.max(probabilities, axis=1)

    def predict_with_class_names(self, embeddings):
        """Predict class names for ``embeddings``.

        Returns:
            list: ``(class_name, confidence)`` pairs, one per sample. Clusters
            without a mapped class are reported as ``'unknown'``.

        Raises:
            ValueError: If no model or no class mapping is available.
        """
        cluster_labels, probabilities = self.predict(embeddings)
        if self.class_mapping is None:
            raise ValueError("Class mapping not available. Please train or load a model first.")
        predictions = [self.class_mapping.get(label, 'unknown') for label in cluster_labels]
        return list(zip(predictions, probabilities))

    def map_clusters_to_classes(self, embeddings, labels):
        """Map every cluster to the name of its majority class.

        Args:
            embeddings: 2-D array-like of raw (unscaled) samples.
            labels: 1-D array-like of non-negative integer labels aligned
                with ``embeddings``.

        Returns:
            dict: ``{cluster_index: class_name}``. Clusters that receive no
            sample are omitted.

        Raises:
            ValueError: If no model is available or the inputs differ in length.
            IndexError: If a majority label has no entry in ``class_names``.
        """
        if self.kmeans is None:
            raise ValueError("Model not trained or loaded. Please train or load a model first.")

        labels = np.asarray(labels)
        # Assign clusters by predicting on the data that `labels` describes.
        # kmeans.labels_ must NOT be used here: it only covers the training
        # subset of the winning fold, so its positions do not line up with
        # the full `labels` array.
        assignments = self.kmeans.predict(self.scaler.transform(embeddings))
        if len(assignments) != len(labels):
            raise ValueError(
                f"embeddings and labels must have the same length "
                f"(got {len(assignments)} and {len(labels)})."
            )

        cluster_to_classes = {}
        for cluster in range(self.kmeans.n_clusters):
            member_labels = labels[assignments == cluster]
            if member_labels.size == 0:
                # No evidence for this cluster; leave it unmapped.
                continue
            most_common_class = int(np.bincount(member_labels).argmax())
            if most_common_class >= len(self.class_names):
                raise IndexError(
                    f"Label {most_common_class} has no entry in class_names "
                    f"(only {len(self.class_names)} names configured)."
                )
            cluster_to_classes[cluster] = self.class_names[most_common_class]

        return cluster_to_classes


In [None]:

def main():
    """Train, save, reload and demo the clustering classifier.

    Uses the notebook-level ``X_noisy``/``y`` as training data and a random
    subset of the noise-free ``X`` as the "new" samples to classify.
    """
    embeddings = X_noisy
    labels = y
    model_path = 'best_tree_clustering_model.joblib'

    classifier = TreeClusteringClassifier(max_clusters=10, min_clusters=2)

    classifier.train_and_save_model(embeddings, labels, model_path)

    # Reload from disk to exercise the save/load round trip.
    classifier.load_model(model_path)

    # Seeded generator so the printed demo is identical on every run, and a
    # capped sample size so sampling without replacement cannot fail when X
    # has fewer than 100 rows.
    rng = np.random.default_rng(42)
    sample_size = min(100, len(X))
    indices = rng.choice(len(X), size=sample_size, replace=False)
    new_embeddings = X[indices]

    predictions = classifier.predict_with_class_names(new_embeddings)

    print("\nPredictions for new data:")
    for i, (class_name, probability) in enumerate(predictions):
        print(f"Sample {i + 1}: Predicted class: {class_name}, Probability: {probability:.4f}")

if __name__ == "__main__":
    main()