In [None]:
import itertools
import numpy as np
from tqdm import tqdm

from sklearn.cluster import KMeans, AgglomerativeClustering, DBSCAN, HDBSCAN
from sklearn.mixture import GaussianMixture
from sklearn.metrics import silhouette_score, davies_bouldin_score, calinski_harabasz_score

In [None]:
from DL_Datasets import *
from DL_Models import *
from DL_ModelTrainer import ModelTrainer

# Build the dataset, the network and the trainer object used by the cells below.
dataset = ImageDataset("heredenene")
model = PowerOf2s256andAbove()

# One keyword per line so individual hyper-parameters are easy to spot and tweak.
mt = ModelTrainer(
    num_of_epochs=1,
    lr=0.001,
    batch_size=16,
    loss_type="mse",
    dataset=dataset,
    model=model,
    ckpt_path="heredenene_PowerOf2s256andAbove_mse_05:16:19:01:53/min_loss:0.06544473022222519_epoch:19.pth",
)

In [None]:
# Call train() on the ModelTrainer built in the setup cell.
# NOTE(review): presumably resumes from the `ckpt_path` given there — confirm in DL_ModelTrainer.
mt.train()

In [None]:
# Ask the trainer for extracted features; as the last expression of the cell the result is displayed.
# NOTE(review): (0, 10) is presumably a start/end sample range — TODO confirm against DL_ModelTrainer.
mt.get_features(0, 10)

# Clustering

In [None]:
class DL_Clustering():
    """Grid search over scikit-learn clustering estimators on extracted features.

    For the chosen ``method`` a candidate estimator is built for every
    combination of the relevant hyper-parameter lists, every candidate is
    fitted on the feature matrix and scored with three internal validity
    indices (silhouette, Davies-Bouldin, Calinski-Harabasz), and the candidate
    with the best combined score is kept.

    Args:
        model_trainer: object that is *called* with no arguments in
            ``calculate_batch_similarity`` and must return a mapping whose keys
            are sample identifiers and whose values are feature vectors.
        method (str): one of "kmeans", "hierarchy", "DBSCAN", "gaussian",
            "HDBSCAN".
        number_of_clusters (list): candidate cluster counts
            (kmeans / hierarchy / gaussian).
        max_iter (list): candidate iteration limits (kmeans / gaussian).
        DBSCAN_eps (list): candidate ``eps`` values (DBSCAN).
        DBSCAN_min_samples (list): candidate ``min_samples`` values (DBSCAN).
        HDBSCAN_min_cluster_size (list): candidate ``min_cluster_size`` values.
        HDBSCAN_max_cluster_size (list): candidate ``max_cluster_size`` values.
    """

    # Methods understood by get_models(); kept in one place so validation and
    # model construction cannot drift apart.
    _VALID_METHODS = ("kmeans", "hierarchy", "DBSCAN", "gaussian", "HDBSCAN")

    def __init__(self, model_trainer, method, number_of_clusters=[None], max_iter=[200], DBSCAN_eps=[0.5],
                DBSCAN_min_samples=[5], HDBSCAN_min_cluster_size=[5], HDBSCAN_max_cluster_size=[None]):
        self.model_trainer = model_trainer
        self.method = method
        # The signature keeps its list defaults for backward compatibility, so
        # every list is copied: mutating an instance attribute must never leak
        # into the shared default object (and thus into later instances).
        self.number_of_clusters = list(number_of_clusters)
        self.max_iter = list(max_iter)
        self.DBSCAN_eps = list(DBSCAN_eps)
        self.DBSCAN_min_samples = list(DBSCAN_min_samples)
        self.HDBSCAN_min_cluster_size = list(HDBSCAN_min_cluster_size)
        self.HDBSCAN_max_cluster_size = list(HDBSCAN_max_cluster_size)

    def __str__(self, verbose=0):
        """casting to string method for printing/debugging object attributes

        Returns:
            str: object attribute information
        """
        attributes = vars(self)
        attr_strings = [f"{key}: {value}" for key, value in attributes.items()]
        return "-"*70 + "\n" + "\n".join(attr_strings) + "\n" + "-"*70

    def arguman_check(self, verbose=0):
        """Validate the constructor arguments.

        Raises:
            ValueError: if ``self.method`` is not a supported clustering method.
        """
        if self.method not in self._VALID_METHODS:
            raise ValueError(
                f"Unknown clustering method {self.method!r}; "
                f"expected one of {list(self._VALID_METHODS)}"
            )

    def _build_param_grid(self):
        """Return the list of hyper-parameter combinations for ``self.method``.

        Returns:
            list: tuples of parameters (or bare cluster counts for
            "hierarchy"); an empty list for an unknown method.
        """
        if self.method in ("kmeans", "gaussian"):
            return list(itertools.product(self.number_of_clusters, self.max_iter))
        if self.method == "hierarchy":
            # Single hyper-parameter, so the grid is just the list of counts.
            return list(self.number_of_clusters)
        if self.method == "DBSCAN":
            return list(itertools.product(self.DBSCAN_eps, self.DBSCAN_min_samples))
        if self.method == "HDBSCAN":
            return list(itertools.product(self.HDBSCAN_min_cluster_size, self.HDBSCAN_max_cluster_size))
        return []

    @staticmethod
    def _min_max_normalize(scores):
        """Rescale ``scores`` to [0, 1]; a constant vector maps to all zeros.

        The constant case (e.g. a single candidate model) would otherwise
        divide by zero and produce NaNs that poison the combined score.
        """
        scores = np.asarray(scores, dtype=float)
        lowest = scores.min()
        span = scores.max() - lowest
        if span == 0:
            return np.zeros_like(scores)
        return (scores - lowest) / span

    def get_models(self, verbose=0):
        """Instantiate one unfitted estimator per hyper-parameter combination.

        Args:
            verbose (int): forwarded to estimators that accept ``verbose``
                (KMeans, GaussianMixture).

        Returns:
            list: unfitted scikit-learn estimators.

        Raises:
            ValueError: if ``self.method`` is not supported.
        """
        self.arguman_check()
        param_grid = self._build_param_grid()

        models = []
        for params in param_grid:
            if self.method == "kmeans":
                n_clusters, max_iter = params
                models.append(KMeans(n_clusters=n_clusters, max_iter=max_iter, verbose=verbose))
            elif self.method == "hierarchy":
                n_clusters = params
                models.append(AgglomerativeClustering(n_clusters=n_clusters))
            elif self.method == "DBSCAN":
                eps, min_samples = params
                models.append(DBSCAN(eps=eps, min_samples=min_samples))
            elif self.method == "gaussian":
                n_clusters, max_iter = params
                models.append(GaussianMixture(n_components=n_clusters, max_iter=max_iter, verbose=verbose))
            elif self.method == "HDBSCAN":
                min_cluster_size, max_cluster_size = params
                models.append(HDBSCAN(min_cluster_size=min_cluster_size, max_cluster_size=max_cluster_size))

        return models

    def find_best_model(self, verbose=0):
        """Fit every candidate in ``self.models`` on ``self.reps`` and pick the best.

        Each metric is min-max normalised across candidates and combined as
        ``silhouette - davies_bouldin + calinski_harabasz`` (Davies-Bouldin is
        subtracted because lower values are better for that index).

        Returns:
            the estimator from ``self.models`` with the highest combined score.

        Raises:
            ValueError: if there are no candidate models.
        """
        if not self.models:
            raise ValueError("No candidate models to evaluate: the parameter grid is empty.")

        silhouette_scores, db_scores, ch_scores = [], [], []
        for model in tqdm(self.models):
            labels = model.fit_predict(self.reps)

            silhouette_scores.append(silhouette_score(self.reps, labels))
            db_scores.append(davies_bouldin_score(self.reps, labels))
            ch_scores.append(calinski_harabasz_score(self.reps, labels))

        silhouette_scores = self._min_max_normalize(silhouette_scores)
        db_scores = self._min_max_normalize(db_scores)
        ch_scores = self._min_max_normalize(ch_scores)

        combined_scores = silhouette_scores - db_scores + ch_scores
        best_model = self.models[int(np.argmax(combined_scores))]

        return best_model

    def clusters_from_labels(self, verbose=0):
        """Group ``self.paths`` by the cluster id in ``self.labels``.

        Returns:
            dict: cluster id -> list of entries from ``self.paths``.
        """
        clusters = {}
        for file, cluster_id in zip(self.paths, self.labels):
            clusters.setdefault(cluster_id, []).append(file)
        return clusters

    def calculate_batch_similarity(self, verbose=0):
        """Extract features, run the grid search and store the final labels.

        Sets ``self.paths``, ``self.reps``, ``self.models``, ``self.best_model``
        and ``self.labels``.
        """
        # model_trainer is invoked as a callable and must return a mapping.
        features = self.model_trainer()
        self.paths = list(features.keys())
        self.reps = np.array(list(features.values()))

        self.models = self.get_models()
        self.best_model = self.find_best_model()
        # NOTE(review): the winner is fitted a second time here; for estimators
        # with random initialisation these labels may differ from the scored ones.
        self.labels = self.best_model.fit_predict(self.reps)

    def calculate_template_similarity(self, verbose=0):
        """Placeholder; not implemented yet."""
        pass

    def merge_clusters_my_templates(self, verbose=0):
        """Placeholder; not implemented yet."""
        pass

    def create_clusters(self, verbose=0):
        """Group samples by label and hand the groups to ``write_clusters``.

        NOTE(review): ``write_clusters``, ``batch_idx``, ``outliers``,
        ``self.result_container_folder`` and ``self.transfer`` are not defined
        in the visible part of this notebook — confirm they are provided
        elsewhere (e.g. by a star import) before relying on this method.
        """
        clusters = self.clusters_from_labels()
        write_clusters(clusters, batch_idx, self.result_container_folder, outliers, self.transfer, verbose=verbose-1)
        if verbose > 0:
            print("-"*70)

    def process(self, verbose=0):
        """Placeholder; not implemented yet."""
        pass

    def __call__(self, verbose=0):
        """Placeholder; not implemented yet."""
        pass