In [None]:
# default_exp utils.clusters

# clusters

In [None]:
# export

from scipy.signal import find_peaks
from sklearn.cluster import MiniBatchKMeans,AgglomerativeClustering,\
                            SpectralClustering,DBSCAN,OPTICS,AffinityPropagation,\
                            AgglomerativeClustering,Birch
from sklearn.metrics import silhouette_score,calinski_harabasz_score,davies_bouldin_score
import numpy as np

In [None]:
# export

def clusters_annotation(df,method,params):
    """Fit a scikit-learn clustering estimator on ``df`` and return its labels.

    Parameters
    ----------
    df : array-like
        Data handed unchanged to the estimator's ``fit_predict``.
    method : class
        The estimator *class* (not an instance and not its name), one of
        MiniBatchKMeans, AgglomerativeClustering, SpectralClustering, Birch,
        DBSCAN, OPTICS or AffinityPropagation.
    params : dict
        Only the keys relevant to ``method`` are read; any other key is ignored:

        * ``'n_clusters'`` (required) for MiniBatchKMeans,
          AgglomerativeClustering, SpectralClustering and Birch;
        * ``'eps'`` (required) for DBSCAN and OPTICS;
        * ``'damping'`` and ``'preference'`` (both optional) for
          AffinityPropagation. When absent they fall back to 0.5 and None,
          which are the defaults documented by scikit-learn.

    Returns
    -------
    The value returned by ``fit_predict(df)`` (one cluster label per sample).

    Raises
    ------
    ValueError
        If ``method`` is not one of the supported estimator classes.
    KeyError
        If a required key is missing from ``params``.
    """
    n_cluster_based = (MiniBatchKMeans, AgglomerativeClustering, SpectralClustering, Birch)
    eps_based = (DBSCAN, OPTICS)

    if method in n_cluster_based:
        cluster = method(n_clusters=params['n_clusters'])
    elif method in eps_based:
        # NOTE(review): per the scikit-learn docs OPTICS only uses `eps` when
        # cluster_method='dbscan'; with its default 'xi' extraction this value
        # presumably has no effect -- confirm whether that is intended.
        cluster = method(eps=params['eps'])
    elif method is AffinityPropagation:
        cluster = method(damping=params.get('damping', 0.5),
                         preference=params.get('preference'))
    else:
        # Single source of truth: anything not dispatched above is unsupported.
        raise ValueError('method should be in sklearn.cluster.*, e.g. DBSCAN')
    return cluster.fit_predict(df)

# Lookup table: assessment metric name -> sklearn.metrics scoring function.
ass_methods = dict(
    silhouette_score=silhouette_score,
    calinski_harabasz_score=calinski_harabasz_score,
    davies_bouldin_score=davies_bouldin_score,
)

# Lookup table: estimator name -> sklearn.cluster estimator class.
# Each key appears exactly once (the literal previously repeated
# 'AgglomerativeClustering', which silently overwrote the first entry).
cluster_methods={
    'MiniBatchKMeans':MiniBatchKMeans,
    'AgglomerativeClustering':AgglomerativeClustering,
    'SpectralClustering':SpectralClustering,
    'DBSCAN':DBSCAN,
    'OPTICS':OPTICS,
    'AffinityPropagation':AffinityPropagation,
    'Birch':Birch
}

In [None]:
from sklearn import datasets
import pandas as pd
from simplebitk.utils.plots import scatter_plots_for_reduce_dimensional

# Demo: label the iris samples with DBSCAN and plot the result.
iris = datasets.load_iris()

X = iris.data
y = iris.target  # not used further in this cell
df = pd.DataFrame(X,columns=['x1','x2','x3','x4'])
# The labels are computed from the four feature columns only: the right-hand
# side is evaluated before the 'dbscan' column is added to the frame.
df['dbscan']=clusters_annotation(df,cluster_methods['DBSCAN'],{'eps':0.3})
# Presumably draws x1 against x2 coloured by the DBSCAN label -- the helper is
# project code not visible here; verify against simplebitk.utils.plots.
scatter_plots_for_reduce_dimensional(df,'x1',
                                     'x2',hue='dbscan')

In [None]:
# export

def find_peak_valley(sequence,peak=True):
    """Return the indices of the local maxima (or minima) of a 1-D sequence.

    Parameters
    ----------
    sequence : array-like
        One-dimensional numeric data. Lists, tuples and arrays of any numeric
        dtype are accepted.
    peak : bool, default True
        If True, look for local maxima; if False, look for local minima
        (implemented as the maxima of the negated sequence).

    Returns
    -------
    numpy.ndarray
        Indices found by ``scipy.signal.find_peaks`` with its default settings.
    """
    # Convert up front so that negation below also works for plain lists and
    # does not wrap around for unsigned/boolean input. find_peaks casts its
    # input to float64 itself, so this does not change the maxima case.
    values = np.asarray(sequence, dtype=np.float64)
    if not peak:
        values = -values
    peaks, _ = find_peaks(values)
    return peaks
    
    
def find_best_cluster_number(df,cluster_method,params,ass_method=silhouette_score):
    """Sweep one clustering parameter and locate the first local optimum of a score.

    For MiniBatchKMeans, AgglomerativeClustering, SpectralClustering and Birch
    the swept parameter is ``n_clusters`` over ``range(2, 20)``; for DBSCAN and
    OPTICS it is ``eps`` over ``np.arange(0.1, 4, 0.2)``. Each setting is
    clustered with ``clusters_annotation`` and scored with ``ass_method``.

    Parameters
    ----------
    df : array-like
        Data to cluster; passed to ``clusters_annotation`` and ``ass_method``.
    cluster_method : class
        One of the estimator classes listed above.
    params : dict
        Extra parameters forwarded to ``clusters_annotation``. The swept key is
        overridden on a private copy; the caller's dict is left untouched.
    ass_method : callable, default silhouette_score
        ``silhouette_score`` or ``calinski_harabasz_score`` (local maxima are
        searched) or ``davies_bouldin_score`` (local minima are searched).

    Returns
    -------
    (best, records, peaks)
        ``records`` is an array of ``[parameter_value, score]`` rows, ``peaks``
        the indices of the local optima in ``records`` and ``best`` the row at
        ``peaks[0]``. If no optimum is found, ``best`` is the full ``records``
        array (kept for backward compatibility).

    Raises
    ------
    ValueError
        If ``ass_method`` or ``cluster_method`` is not supported.
    """
    # Validate the metric first so a bad argument fails before any clustering.
    if ass_method in (silhouette_score, calinski_harabasz_score):
        look_for_maxima = True
    elif ass_method is davies_bouldin_score:
        look_for_maxima = False
    else:
        raise ValueError('ass method can only be one of [silhouette_score,calinski_harabasz_score,davies_bouldin_score]')

    if cluster_method in (MiniBatchKMeans, AgglomerativeClustering, SpectralClustering, Birch):
        swept_key, candidates = 'n_clusters', range(2, 20)
    elif cluster_method in (DBSCAN, OPTICS):
        swept_key, candidates = 'eps', np.arange(0.1, 4, 0.2)
    else:
        # Previously this fell through with no records and crashed with an
        # IndexError when slicing the empty array.
        raise ValueError('cluster method can only be one of [MiniBatchKMeans,AgglomerativeClustering,SpectralClustering,Birch,DBSCAN,OPTICS]')

    # Work on a copy so the caller's dict is not modified by the sweep.
    trial_params = dict(params or {})

    records = []
    for value in candidates:
        trial_params[swept_key] = value
        clustering = clusters_annotation(df, cluster_method, trial_params)
        # The sklearn metrics require 2 <= n_labels <= n_samples - 1. This
        # covers the all-noise case (every label -1) as well as a single
        # cluster or one label per sample, which would otherwise raise.
        n_labels = len(np.unique(clustering))
        if 2 <= n_labels <= len(clustering) - 1:
            score = ass_method(df, clustering)
        else:
            # NOTE(review): 0 is kept as the placeholder for backward
            # compatibility; for davies_bouldin_score lower is better, so such
            # entries may register as valleys -- confirm this is acceptable.
            score = 0
        records.append([value, score])

    records = np.array(records)
    peaks = find_peak_valley(records[:,1], look_for_maxima)
    if len(peaks) == 0:
        return records,records,peaks
    return records[peaks[0]],records,peaks
            


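The cells below use `find_best_cluster_number` to search for the best clustering parameter (the number of clusters, or `eps` for DBSCAN/OPTICS) under each assessment score.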

In [None]:
import matplotlib.pyplot as plt


In [None]:
# Synthetic data from a seeded generator so that this cell (and the cells
# below that reuse X) give the same result after Restart & Run All.
X=np.random.default_rng(42).normal(3,4,(100,4))
i=silhouette_score
# For DBSCAN the function sweeps 'eps'; the 'n_clusters' entry is not read.
a,records,peaks = find_best_cluster_number(X,DBSCAN,{'n_clusters':3,'eps':0.3},ass_method=i)

# Score against the swept parameter, with the detected local maxima marked.
plt.plot(records[:,0],records[:,1])
plt.plot(records[peaks,0], records[peaks,1], "x")

print(a)
print(peaks)


[ 2.9        -0.01688904]
[14 18]


In [None]:
# Same DBSCAN eps sweep as above, scored with the Calinski-Harabasz index.
# Reuses X defined in an earlier cell.
i=calinski_harabasz_score
# For DBSCAN the function sweeps 'eps'; the 'n_clusters' entry is not read.
a,records,peaks = find_best_cluster_number(X,DBSCAN,{'n_clusters':3,'eps':0.3},ass_method=i)

# Score against the swept parameter, with the detected local maxima marked.
plt.plot(records[:,0],records[:,1])
plt.plot(records[peaks,0], records[peaks,1], "x")

# a is the [parameter, score] row at the first detected maximum
# (or the whole records array when none was found).
print(a)
print(peaks)

[2.9        5.05533082]
[14 18]


In [None]:
# Same DBSCAN eps sweep as above, scored with the Davies-Bouldin index.
# Reuses X defined in an earlier cell.
i=davies_bouldin_score
# For DBSCAN the function sweeps 'eps'; the 'n_clusters' entry is not read.
a,records,peaks = find_best_cluster_number(X,DBSCAN,{'n_clusters':3,'eps':0.3},ass_method=i)

# Score against the swept parameter; for this metric the function searches
# for local minima, which are the points marked here.
plt.plot(records[:,0],records[:,1])
plt.plot(records[peaks,0], records[peaks,1], "x")

# a is the [parameter, score] row at the first detected minimum
# (or the whole records array when none was found).
print(a)
print(peaks)

[3.5       3.2772862]
[17]


In [None]:
import sys