In [None]:
# default_exp cluster

# fe_utils.cluster
> Module for clustering optimization: Bayesian (hyperopt) search over UMAP + HDBSCAN parameters

In [None]:
# hide
from nbdev.showdoc import*
from functools import partial
import hdbscan
from hyperopt import hp
from hyperopt import fmin, tpe, space_eval
from hyperopt import Trials
from hyperopt import STATUS_OK
import numpy as np
import os
import pandas as pd
from sklearn.preprocessing import normalize
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import make_scorer
from sklearn.metrics.cluster import adjusted_rand_score
from sklearn.metrics.cluster import homogeneity_completeness_v_measure
from sklearn.metrics.cluster import homogeneity_score, v_measure_score, silhouette_score 
from sklearn.metrics.cluster import completeness_score, adjusted_mutual_info_score
import time
import umap


In [None]:
# export

def load_embedding(path):
    """
    Read saved embeddings back from a numpy file.

    `path`: location of the file, handed directly to `np.load`
    Returns whatever `np.load` produces for that file.
    """
    return np.load(path)
    

In [None]:
# export

from hyperopt import fmin, tpe, space_eval

class BayesClusterTrainer():
    """
    A trainer for cluster optimization runs: a hyperopt search over the
    parameters of a UMAP reduction followed by HDBSCAN clustering.

    Inputs:
        `space`: `dict` containing relevant parameter spaces for `hdbscan` and `umap`.
            `objective` and `train` read the keys `n_neighbors`, `n_components`,
            `min_cluster_size`, `cluster_selection_epsilon`, `metric` and
            `cluster_selection_method` from every sampled point.
        `cost_fn_params`: stored on the instance; no method of this class reads it yet.
        `embeddings`: data handed to `umap.UMAP(...).fit_transform`.
        `labels`: reference labels, used only for the diagnostic metrics printed
            by `score_clusters` (they do not enter the loss).
    """

    def __init__(self, space, cost_fn_params, embeddings, labels, *args, **kwargs):
        # Extra positional / keyword arguments are accepted and ignored.
        self.space = space
        self.cost_fn_params = cost_fn_params

        self.embeddings = embeddings
        self.labels = labels

        # Bookkeeping containers; nothing in this class fills them yet.
        self.logs = []
        self.run = dict()

    def generate_clusters(self, embeddings,
                        min_cluster_size,
                        cluster_selection_epsilon,
                        cluster_selection_method,
                        metric,
                        n_neighbors,
                        n_components, 
                        random_state = 42):
        """
        Generate HDBSCAN cluster object after reducing embedding dimensionality with UMAP

        `metric` is forwarded to HDBSCAN only; the UMAP step always uses `'cosine'`.
        `random_state` seeds the UMAP reduction.
        Returns the fitted `hdbscan.HDBSCAN` object.
        """
        reducer = umap.UMAP(n_neighbors=n_neighbors,
                            n_components=n_components,
                            metric='cosine',
                            random_state=random_state)
        umap_embeddings = reducer.fit_transform(embeddings)

        # gen_min_span_tree=True is kept so that `relative_validity_`, which
        # `score_clusters` reads, is available on the fitted object.
        clusterer = hdbscan.HDBSCAN(min_cluster_size=min_cluster_size,
                                    metric=metric,
                                    cluster_selection_epsilon=cluster_selection_epsilon,
                                    cluster_selection_method=cluster_selection_method,
                                    gen_min_span_tree=True)

        return clusterer.fit(umap_embeddings)

    def objective(self, params, embeddings, labels):
        """
        Objective function for hyperopt to minimize.

        Clusters `embeddings` with the sampled `params`, scores the result with
        `score_clusters` and prints the parameters that were tried.
        Returns the `{'loss': ..., 'status': STATUS_OK}` dict hyperopt expects.
        """
        # Function-scope import: `pprint` is not imported at module level.
        from pprint import pprint

        # Both helpers are methods, so they must be reached through `self`.
        clusters = self.generate_clusters(embeddings,
                                          n_neighbors=params['n_neighbors'],
                                          n_components=params['n_components'],
                                          min_cluster_size=params['min_cluster_size'],
                                          cluster_selection_epsilon=params['cluster_selection_epsilon'],
                                          metric=params['metric'],
                                          cluster_selection_method=params['cluster_selection_method'],
                                          random_state=42)

        # The reference labels arrive in the `labels` parameter.
        loss = self.score_clusters(clusters, labels)

        pprint(params)

        return {'loss': loss, 'status': STATUS_OK}

    def score_clusters(self, clusters, y):
        """
        Returns the cost of a fitted hdbscan cluster object (lower is better).

        The cost averages five terms: mean cluster persistence, relative
        validity and mean membership probability count negatively (rewarded),
        while the fraction of points labelled -1 and the mean outlier score
        count positively (penalised).

        `y`: reference labels. They are used only to print external clustering
        metrics for inspection and do not influence the returned score.
        """
        cluster_labels = clusters.labels_

        # Share of points labelled -1 (the noise label).
        penalty = (cluster_labels == -1).sum() / len(cluster_labels)
        pers = clusters.cluster_persistence_.mean(0)
        val = clusters.relative_validity_
        outlier = clusters.outlier_scores_.mean(0)
        prob = clusters.probabilities_.mean(0)

        score = -1*(val + prob + pers) + (penalty + outlier)
        score = score/5

        fns = [adjusted_rand_score, homogeneity_completeness_v_measure, homogeneity_score,
               v_measure_score, completeness_score, adjusted_mutual_info_score]

        print(f"SCORE: {score}")
        for fn in fns:
            # sklearn's signature is (labels_true, labels_pred). With the
            # arguments reversed, homogeneity and completeness would be
            # printed under each other's names.
            print(f"{fn.__name__} : {fn(y, cluster_labels)}")
        print("-"*20)

        return score

    def train(self, max_evals=100, algo=tpe.suggest):
        """
        Perform bayesian search on hyperopt hyperparameter space to minimize objective function

        `max_evals`: number of evaluations handed to `fmin`.
        `algo`: hyperopt suggestion algorithm.
        Returns `(best_params, best_clusters, trials)`, where `best_clusters` is
        a fresh clustering of `self.embeddings` fitted with the best parameters.
        """
        trials = Trials()
        fmin_objective = partial(self.objective, embeddings=self.embeddings, labels=self.labels)
        best = fmin(fmin_objective,
                    space=self.space,
                    algo=algo,
                    max_evals=max_evals,
                    trials=trials)

        # `space_eval` turns the result of `fmin` back into concrete parameter values.
        best_params = space_eval(self.space, best)
        print('best:')
        print(best_params)
        print("-"*20)
        print("-"*20)

        # `generate_clusters` is a method, so it must be called on `self`.
        best_clusters = self.generate_clusters(self.embeddings,
                                               n_neighbors=best_params['n_neighbors'],
                                               n_components=best_params['n_components'],
                                               min_cluster_size=best_params['min_cluster_size'],
                                               cluster_selection_epsilon=best_params['cluster_selection_epsilon'],
                                               metric=best_params['metric'],
                                               cluster_selection_method=best_params['cluster_selection_method'])

        return best_params, best_clusters, trials

    def fit(self):
        """
        Placeholder entry point: prints a banner only and does not call `train`.
        """
        print('*' * 10)
        print('TRAINING NOW!')
        print('*' * 10)
        

In [None]:
show_doc(BayesClusterTrainer.generate_clusters)

<h4 id="BayesClusterTrainer.generate_clusters" class="doc_header"><code>BayesClusterTrainer.generate_clusters</code><a href="__main__.py#L22" class="source_link" style="float:right">[source]</a></h4>

> <code>BayesClusterTrainer.generate_clusters</code>(**`embeddings`**, **`min_cluster_size`**, **`cluster_selection_epsilon`**, **`cluster_selection_method`**, **`metric`**, **`n_neighbors`**, **`n_components`**, **`random_state`**=*`42`*)

Generate HDBSCAN cluster object after reducing embedding dimensionality with UMAP

In [None]:
show_doc(BayesClusterTrainer.objective)

<h4 id="BayesClusterTrainer.objective" class="doc_header"><code>BayesClusterTrainer.objective</code><a href="__main__.py#L47" class="source_link" style="float:right">[source]</a></h4>

> <code>BayesClusterTrainer.objective</code>(**`params`**, **`embeddings`**, **`labels`**)

Objective function for hyperopt to minimize, which incorporates constraints
on the number of clusters we want to identify

In [None]:
show_doc(BayesClusterTrainer.score_clusters)

<h4 id="BayesClusterTrainer.score_clusters" class="doc_header"><code>BayesClusterTrainer.score_clusters</code><a href="__main__.py#L72" class="source_link" style="float:right">[source]</a></h4>

> <code>BayesClusterTrainer.score_clusters</code>(**`clusters`**, **`y`**)

Returns the label count and cost of a given cluster supplied from running hdbscan

In [None]:
show_doc(BayesClusterTrainer.train)

<h4 id="BayesClusterTrainer.train" class="doc_header"><code>BayesClusterTrainer.train</code><a href="__main__.py#L97" class="source_link" style="float:right">[source]</a></h4>

> <code>BayesClusterTrainer.train</code>(**`max_evals`**=*`100`*, **`algo`**=*`suggest`*)

Perform bayesian search on hyperopt hyperparameter space to minimize objective function

Example:

In [None]:
# Placeholder inputs: `fit()` below only prints a banner, so `space` does not
# have to be a real hyperopt search space for this example.
space = {'some_param': range(10)}
cost_fn_param = {'a':0.3, 'b': 0.7}
emb = np.random.randn(32, 768)
# `randint` excludes `high`, so (0, 1) would yield only zeros; (0, 2) gives binary labels.
labels = np.random.randint(0, 2, 32)

trainer = BayesClusterTrainer(space, cost_fn_param, emb, labels)
trainer.fit()

**********
TRAINING NOW!
**********


Tests:

In [None]:
# Sanity checks on the example inputs: both must be exactly `dict` (not a subclass).
assert type(cost_fn_param) is dict
assert type(space) is dict