In [None]:
import csv
import random
import numpy as np
import scipy.linalg as la
import matplotlib.pyplot as plt

from sklearn import metrics
from metric_learn import LMNN
from metric_learn import NCA
from sklearn.datasets import load_iris
from sklearn.cluster import KMeans
from sklearn.cluster import DBSCAN

# 3D scatter plot
import pylab
from mpl_toolkits.mplot3d import Axes3D

%matplotlib inline  

In [None]:
def confusionMatrixFromRandomClusters(truth, prediction):
    """Build a truth-vs-cluster contingency table and align cluster ids with classes.

    Cluster ids produced by a clustering algorithm are arbitrary, so the columns
    of the table are reordered (and the predicted labels renamed accordingly) so
    that, where possible, cluster ``j`` is the one overlapping most with truth
    class ``j``.

    Parameters:
        truth: sequence of integer class labels; they are used directly as row
            indices, so they are expected to be 0..k-1.
        prediction: sequence of integer cluster labels, same length as ``truth``.
            They are used directly as column indices; a negative label therefore
            lands in the last column(s) through numpy's negative indexing.

    Returns:
        A tuple ``(table, labels)`` where ``table`` has one row per distinct truth
        label and one column per distinct predicted label (columns reordered),
        and ``labels`` is ``prediction`` renamed with the same reordering.

    Raises:
        ValueError: if ``truth`` and ``prediction`` differ in length.
    """
    if len(truth) != len(prediction):
        raise ValueError("Truth labels and prediction labels must have same length")

    truthLength = len(set(truth))
    predictionLength = len(set(prediction))

    M = np.zeros(shape=(truthLength, predictionLength))
    for t, p in zip(truth, prediction):
        M[t, p] += 1

    # rename (reorder) clusters so that they match the truth classes
    permOrdered = _matchClustersToClasses(M)

    # return matrix with clusters ordered/renamed
    return (M[:, permOrdered], renameLabels(prediction, permOrdered))

def _matchClustersToClasses(M):
    """Greedily pair truth rows of M with distinct columns, largest overlap first.

    Returns an integer array that is always a permutation of the column indices:
    first the columns matched to rows (in row order), then the unmatched columns
    in ascending order. A plain per-row argmax is not enough here because two
    rows can prefer the same column, which would duplicate that column.
    """
    rows, cols = M.shape
    # mergesort is stable: equal counts are visited in row-major order, which
    # reproduces np.argmax's first-occurrence rule when row maxima are distinct.
    visitOrder = np.argsort(-M, axis=None, kind="mergesort")

    rowToCol = {}
    usedCols = set()
    for flatIndex in visitOrder:
        r, c = divmod(int(flatIndex), cols)
        if r in rowToCol or c in usedCols:
            continue
        rowToCol[r] = c
        usedCols.add(c)
        if len(usedCols) == min(rows, cols):
            break

    matched = [rowToCol[r] for r in sorted(rowToCol)]
    unmatched = [c for c in range(cols) if c not in usedCols]
    return np.array(matched + unmatched, dtype=int)

def renameLabels(labels, permutation):
    """Rename non-negative labels so that old label ``permutation[j]`` becomes ``j``.

    Negative labels are returned unchanged.
    """
    # argsort of a permutation is its inverse: ordering[old] == new
    ordering = np.argsort(permutation)
    # negative prediction can be returned by DBSCAN
    return [x if x < 0 else ordering[x] for x in labels]

def sizesFunction(x, truth, numberOfClusters):
    """Pick the marker size for one point from its predicted and true label.

    Returns 50 when the prediction ``x`` equals ``truth``, 200 when it differs
    but lies in the closed range 0..numberOfClusters, and 100 for anything else
    (negative labels or labels above ``numberOfClusters``).
    """
    if x == truth:
        # correct cluster/label
        return 50
    withinKnownRange = 0 <= x <= numberOfClusters
    # 200: incorrect cluster/label, 100: new cluster
    return 200 if withinKnownRange else 100

def sizesForMislabeledData(data, truth, numberOfClusters = -1):
    """Compute one marker size per point by applying sizesFunction pairwise.

    ``data`` holds the predicted labels and ``truth`` the reference labels.
    When ``numberOfClusters`` is left at -1 it is replaced by ``max(truth)``.
    """
    if numberOfClusters == -1:
        numberOfClusters = max(truth)

    sizes = []
    for predicted, expected in zip(data, truth):
        sizes.append(sizesFunction(predicted, expected, numberOfClusters))
    return sizes

def edgesForMislabeledData(data, truth):
    """Return one edge colour per point: 'black' where labels agree, 'red' otherwise."""
    edgeColors = []
    for predicted, expected in zip(data, truth):
        if predicted == expected:
            edgeColors.append('black')
        else:
            edgeColors.append('red')
    return edgeColors


In [None]:
def MahalanobisDist(x, y):
    """Mahalanobis distance of each point (x[i], y[i]) from the mean of the sample.

    The 2x2 covariance of the two coordinate sequences is estimated with
    ``np.cov`` and inverted; each point is then centred on the per-coordinate
    means and its distance under the inverse covariance is computed.

    Parameters:
        x, y: equal-length 1-D sequences holding the two coordinates.

    Returns:
        A list with one distance per point.

    Raises:
        numpy.linalg.LinAlgError: if the covariance matrix is singular.
    """
    covariance_xy = np.cov(x, y, rowvar=0)
    inv_covariance_xy = np.linalg.inv(covariance_xy)

    # centre each coordinate on its own mean; one row per point
    diff_xy = np.column_stack((
        np.asarray(x, dtype=float) - np.mean(x),
        np.asarray(y, dtype=float) - np.mean(y),
    ))

    # d_i^T * inv_cov * d_i for every row i at once
    squared = np.einsum('ij,jk,ik->i', diff_xy, inv_covariance_xy, diff_xy)
    return list(np.sqrt(squared))

In [None]:
def randnClusters(M):
    """Draw normally distributed clusters and stack them into one array.

    Each entry of ``M`` is ``[N, (scale, offset), (scale, offset), ...]``: ``N``
    standard-normal samples are drawn per dimension, then every dimension is
    multiplied by its scale and shifted by its offset. Clusters are stacked
    vertically in the order given. An empty ``M`` yields an empty list.
    """
    ret = []
    for spec in M:
        count, axes = spec[0], spec[1:]
        samples = np.random.randn(count, len(axes))

        # apply the per-dimension scale and offset to all columns at once
        scales = np.array([axis[0] for axis in axes])
        offsets = np.array([axis[1] for axis in axes])
        samples = samples * scales + offsets

        ret = samples if len(ret) == 0 else np.vstack([ret, samples])

    return ret

In [None]:
# Three synthetic 2-D clusters of 50 points each.
# Row layout: [size, (scale, offset) for column 0, (scale, offset) for column 1]
pars = [
    [50, (10, 50), (20, 10)],
    [50, (10, -5), (2, -10)],
    [50, (10, -5), (2, 10)],
]

genClusters = randnClusters(pars)
x = genClusters[:, 0]
y = genClusters[:, 1]

# raw clusters: red, blue and green in generation order
plt.scatter(x, y, color=[c for c in ('r', 'b', 'g') for _ in range(50)])

# Matrix square root of the inverse covariance of the two coordinates
covariance_xy = np.cov(x, y, rowvar=0)
A = np.linalg.inv(covariance_xy)
A12 = la.sqrtm(A)

# A12 is 2x2 and genClusters is Nx2: multiply A12 by the 2xN transpose,
# then transpose the 2xN product back to one point per row.
scaled = np.dot(A12, genClusters.T).T

# rescaled points are overlaid on the same axes in brown
plt.scatter(scaled[:, 0], scaled[:, 1], color='brown')
plt.show()

In [None]:
# Class labels for the synthetic data: 50 zeros, then 50 ones, then 50 twos
target = [label for label in (0, 1, 2) for _ in range(50)]

# Fit LMNN on the synthetic clusters with 10 target neighbours
lmnn = LMNN(k=10, verbose=False)
lmnn.fit(genClusters, target)

In [None]:
# Pass the synthetic clusters through the fitted LMNN model
tra = lmnn.transform(genClusters)

# presumably the learned transformation matrix -- confirm against metric_learn docs
print(lmnn.L)

# same red / blue / green colouring as the raw cluster plot, 50 points per colour
plt.scatter(tra[:, 0], tra[:, 1], color=[c for c in ('r', 'b', 'g') for _ in range(50)])

In [None]:
# Load the iris data set: X holds the features, Y the class labels
iris_data = load_iris()
X, Y = iris_data['data'], iris_data['target']

# class 0 -> red, class 1 -> green, anything else -> blue
Ycolors = [{0: "r", 1: "g"}.get(label, "b") for label in Y]

# feature column 3 on the horizontal axis, column 2 on the vertical axis
plt.scatter(X[:, 3], X[:, 2], color=Ycolors)

# Learn two metrics on the labelled iris data
lmnn = LMNN(k=20, learn_rate=1e-6, verbose=True)
lmnn.fit(X, Y)

nca = NCA()
nca.fit(X, Y)

print(lmnn.L)
print(lmnn.metric())
print(nca.metric())

In [None]:
# Iris features after the LMNN transform (Xt is reused by the clustering cells)
Xt = lmnn.transform(X)

# same column pair (3 vs 2) as the raw iris scatter plot, coloured by true class
fig, ax = plt.subplots()
ax.scatter(Xt[:, 3], Xt[:, 2], color=Ycolors)
plt.show()

In [None]:
# Iris features after the NCA transform (Xtnca is reused by the clustering cells)
Xtnca = nca.transform(X)

# same column pair (3 vs 2) as the raw iris scatter plot, coloured by true class
fig, ax = plt.subplots()
ax.scatter(Xtnca[:, 3], Xtnca[:, 2], color=Ycolors)
plt.show()

In [None]:
# K-means on the raw iris features.
# random_state is fixed so that re-running the notebook gives the same clustering.
neigh = KMeans(n_clusters=3, random_state=0)
neigh.fit(X)

# contingency table with cluster ids aligned to the true classes
contTable, predictedLabels = confusionMatrixFromRandomClusters(Y, neigh.labels_)

# RGBA palette given on a 0-255 scale and normalised to 0-1 for matplotlib.
# alpha is defined here so this cell does not depend on a later cell having run.
alpha = 200
colors = [(254, 232, 37, alpha), (39, 148, 140, alpha), (119, 63, 72, alpha)]
colors = [[channel / 255.0 for channel in rgba] for rgba in colors]

# correctly clustered points get their cluster colour,
# mislabeled points get the colour of their true class
YpredColors = [colors[p] if t == p else colors[t] for t, p in zip(Y, predictedLabels)]

# marker size and edge colour both highlight the mislabeled points
plt.scatter(X[:,0], X[:,2],
            s=sizesForMislabeledData(predictedLabels, Y),
            c=YpredColors,
            edgecolor=edgesForMislabeledData(predictedLabels, Y))

score = metrics.adjusted_rand_score(Y, neigh.labels_)
print(score)
print(contTable)

In [None]:
# K-means on the LMNN-transformed iris features.
# random_state is fixed so that re-running the notebook gives the same clustering.
neigh = KMeans(n_clusters=3, random_state=0)
neigh.fit(Xt)

# contingency table with cluster ids aligned to the true classes
contTable, predictedLabels = confusionMatrixFromRandomClusters(Y, neigh.labels_)

# RGBA palette given on a 0-255 scale and normalised to 0-1 for matplotlib
alpha = 200
colors = [(254, 232, 37, alpha), (39, 148, 140, alpha), (119, 63, 72, alpha)]
colors = [[channel / 255.0 for channel in rgba] for rgba in colors]

# correctly clustered points get their cluster colour,
# mislabeled points get the colour of their true class
YpredColors = [colors[p] if t == p else colors[t] for t, p in zip(Y, predictedLabels)]

# marker size and edge colour both highlight the mislabeled points
plt.scatter(Xt[:,0], Xt[:,2],
            s=sizesForMislabeledData(predictedLabels, Y),
            c=YpredColors,
            edgecolor=edgesForMislabeledData(predictedLabels, Y))

score = metrics.adjusted_rand_score(Y, neigh.labels_)
print(score)
print(contTable)

In [None]:
# K-means on the NCA-transformed iris features.
# random_state is fixed so that re-running the notebook gives the same clustering.
neigh = KMeans(n_clusters=3, random_state=0)
neigh.fit(Xtnca)

# contingency table with cluster ids aligned to the true classes
contTable, predictedLabels = confusionMatrixFromRandomClusters(Y, neigh.labels_)

# correctly clustered points get their cluster colour, mislabeled points are red
colors = ["green", "blue", "yellow", "purple", "pink"]
YpredColors = [colors[p] if t == p else "r" for t, p in zip(Y, predictedLabels)]

# Create the 3-D axes through the figure: constructing Axes3D(fig) directly no
# longer attaches the axes to the figure on current matplotlib, leaving an empty plot.
ax = pylab.figure().add_subplot(111, projection='3d')
ax.scatter(Xtnca[:,1], Xtnca[:,2], Xtnca[:,3], s=sizesForMislabeledData(predictedLabels, Y), c=YpredColors)

score = metrics.adjusted_rand_score(Y, neigh.labels_)
print(score)
print(contTable)

In [None]:
# DBSCAN on the raw iris features
neigh = DBSCAN(eps=0.4, min_samples=4)
neigh.fit(X)

# contingency table with cluster ids aligned to the true classes
contTable, predictedLabels = confusionMatrixFromRandomClusters(Y, neigh.labels_)

# Create the 3-D axes through the figure: constructing Axes3D(fig) directly no
# longer attaches the axes to the figure on current matplotlib, leaving an empty plot.
ax = pylab.figure().add_subplot(111, projection='3d')
# points are coloured by their renamed cluster label
ax.scatter(X[:,1], X[:,2], X[:,3], s=sizesForMislabeledData(predictedLabels, Y), c=predictedLabels)

score = metrics.adjusted_rand_score(Y, neigh.labels_)
print(score)
print(contTable)

In [None]:
# DBSCAN on the LMNN-transformed iris features
neigh = DBSCAN(eps=0.55, min_samples=4)
neigh.fit(Xt)

# contingency table with cluster ids aligned to the true classes
contTable, predictedLabels = confusionMatrixFromRandomClusters(Y, neigh.labels_)

# Create the 3-D axes through the figure: constructing Axes3D(fig) directly no
# longer attaches the axes to the figure on current matplotlib, leaving an empty plot.
ax = pylab.figure().add_subplot(111, projection='3d')
# points are coloured by the raw DBSCAN label
ax.scatter(Xt[:,1], Xt[:,2], Xt[:,3], s=sizesForMislabeledData(predictedLabels, Y), c=neigh.labels_)

score = metrics.adjusted_rand_score(Y, neigh.labels_)
print(score)
print(contTable)

In [None]:
import warnings
# NOTE: this silences every warning for the rest of the kernel session
warnings.filterwarnings('ignore')

# Matrix returned by the fitted LMNN model, fetched once instead of once per pair of points
lmnnMetric = lmnn.metric()

def lmnnDistance(a, b):
    """Distance sqrt((a - b)^T M (a - b)) between two samples, M being lmnn.metric().

    A callable DBSCAN metric is invoked with two 1-D sample vectors and must
    return a single number, so the quadratic form is evaluated directly here.
    """
    diff = a - b
    squared = np.dot(np.dot(diff, lmnnMetric), diff)
    # rounding can push the quadratic form marginally below zero
    return np.sqrt(max(squared, 0.0))

# DBSCAN on the raw iris features, measuring distance with the LMNN metric
neigh = DBSCAN(eps=0.55, min_samples=4, metric=lmnnDistance)
neigh.fit(X)

# contingency table with cluster ids aligned to the true classes
contTable, predictedLabels = confusionMatrixFromRandomClusters(Y, neigh.labels_)

# Create the 3-D axes through the figure: constructing Axes3D(fig) directly no
# longer attaches the axes to the figure on current matplotlib, leaving an empty plot.
ax = pylab.figure().add_subplot(111, projection='3d')
# points are coloured by the raw DBSCAN label
ax.scatter(X[:,1], X[:,2], X[:,3], s=sizesForMislabeledData(predictedLabels, Y), c=neigh.labels_)

score = metrics.adjusted_rand_score(Y, neigh.labels_)
print(score)
print(contTable)

In [None]:
import warnings
# NOTE: this silences every warning for the rest of the kernel session
warnings.filterwarnings('ignore')

# DBSCAN on the NCA-transformed iris features
neigh = DBSCAN(eps=0.55, min_samples=4)
neigh.fit(Xtnca)

# contingency table with cluster ids aligned to the true classes
contTable, predictedLabels = confusionMatrixFromRandomClusters(Y, neigh.labels_)

# Create the 3-D axes through the figure: constructing Axes3D(fig) directly no
# longer attaches the axes to the figure on current matplotlib, leaving an empty plot.
ax = pylab.figure().add_subplot(111, projection='3d')
# points are coloured by the raw DBSCAN label
ax.scatter(Xtnca[:,1], Xtnca[:,2], Xtnca[:,3], s=sizesForMislabeledData(predictedLabels, Y), c=neigh.labels_)

score = metrics.adjusted_rand_score(Y, neigh.labels_)
print(score)
print(contTable)