In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import MinMaxScaler
from pyspark.sql.types import IntegerType
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml.linalg import SparseVector, DenseVector
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.ml.evaluation import ClusteringEvaluator

conf = SparkConf().setMaster('local[1]').setAppName('my app')
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.appName('unsupervised-learning').getOrCreate()

In [30]:
data = spark.read.format("libsvm").load("/home/tutorial/PySparkWork/data/pendigits.txt")

In [31]:
test_data = spark.read.format("libsvm").load("/home/tutorial/PySparkWork/data/pendigitsT.txt")

### (a) Normalization

In [32]:
def normalize(data):
    scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
    model = scaler.fit(data)
    data = model.transform(data)
    return data

In [33]:
data = data.sample(0.003)
data = normalize(data)

### (b) A distance calculation function

In [105]:
from pyspark.sql.functions import udf
@udf
def cal_dist(x1, x2):
    dist = x1.squared_distance(x2)
    print(dist)
    return dist


### (c) Centroids initialization function

In [22]:
def centroid_init(data):
    centroids = data[["scaledFeatures"]].sample(False, 1.0, seed=0).limit(2)
    return centroids

In [45]:
c1 = centroid_init(data)

In [48]:
c1.count()

2

### (d) Closest centroid searching

In [109]:
dataC = get_closest(data,c1)

[0.19230769230769232,0.8888888888888888,0.15254237288135594,0.3846153846153846,0.1923076923076923,0.0,0.0,0.0,0.09722222222222221,0.0,0.9491525423728814,0.5,1.0,0.9620253164556962,0.0,1.0]
[1]
[1.0,1.0,0.0,0.9743589743589743,0.0,0.4133333333333334,1.0,0.7125,0.7222222222222222,1.0,0.8135593220338984,0.9375,0.0,0.20253164556962025,0.047619047619047616,0.0]
[1, 1]
[0.6538461538461539,0.48148148148148145,0.13559322033898305,0.0,0.4615384615384615,0.10666666666666667,0.24,0.0,0.0,0.0,0.0,1.0,0.125,1.0,1.0,1.0]
[1, 1, 1]
[0.0,0.0,1.0,1.0,1.0,1.0,0.84,1.0,1.0,0.4666666666666667,1.0,0.0,0.10714285714285714,0.0,0.25396825396825395,0.0]
[1, 1, 1, 1]
+-----+-----+--------------------+--------------------+-----+
|index|label|            features|      scaledFeatures|value|
+-----+-----+--------------------+--------------------+-----+
|    0|  8.0|(16,[0,1,2,3,4,5,...|[0.19230769230769...|    1|
|    1|  4.0|(16,[0,1,3,4,5,6,...|[1.0,1.0,0.0,0.97...|    1|
|    2|  5.0|(16,[0,1,2,3,4,5,...|[0.6538

In [21]:
def get_closest(data, centroids):
    l = []
    data = data.withColumn("index", monotonically_increasing_id())
    for i in range(data.count()):
        print(data.collect()[i][2])
        nearest_center = 0
        nearest_center_dist = None
        for j in range(centroids.count()):
            dist = centroids.collect()[j][0].squared_distance(data.collect()[i][2])

            if nearest_center_dist == None:
                nearest_center_dist = dist
                nearest_center = j
            elif nearest_center_dist > dist:
                nearest_center_dist = dist
                nearest_center = j

        l.append(j)
        print(l)
    

    
    dfl = spark.createDataFrame(l, IntegerType())
    dfl = dfl.withColumn("index", monotonically_increasing_id())
    
    data = data.drop("value").join(dfl, on=["index"]).alias("x")
    data.show()
    data.dtypes
    return data
        

def get_closest2(data, centroids):
    l = []
    data = data.drop("index").withColumn("index", monotonically_increasing_id())
    for i in range(data.count()):
        print(data.collect()[i]["scaledFeatures"])

        nearest_center = 0
        nearest_center_dist = None
        for j in range(centroids.count()):
            dist = centroids.collect()[j]["averages_vector"].squared_distance(data.collect()[i]["scaledFeatures"])

            if nearest_center_dist == None:
                nearest_center_dist = dist
                nearest_center = j
            elif nearest_center_dist > dist:
                nearest_center_dist = dist
                nearest_center = j
        l.append(j)
        print(l)
        
    
    dfl = spark.createDataFrame(l, IntegerType())
    dfl = dfl.withColumn("index", monotonically_increasing_id())
    data = data.drop("value").join(dfl, on=["index"])
    data.show()
    data.dtypes
    return data

### (e) Centroid updating function

In [60]:
c2 = mean_centroid(dataC, c1)

+-----+-----+--------------------+--------------------+-----+--------------------+
|index|label|            features|      scaledFeatures|value|      features_array|
+-----+-----+--------------------+--------------------+-----+--------------------+
|    0|  8.0|(16,[0,1,2,3,4,5,...|[0.19230769230769...|    1|[0.1923077, 0.888...|
|    1|  4.0|(16,[0,1,3,4,5,6,...|[1.0,1.0,0.0,0.97...|    1|[1.0, 1.0, 0.0, 0...|
|    2|  5.0|(16,[0,1,2,3,4,5,...|[0.65384615384615...|    1|[0.65384614, 0.48...|
|    3|  1.0|(16,[1,2,3,4,5,6,...|[0.0,0.0,1.0,1.0,...|    1|[0.0, 0.0, 1.0, 1...|
+-----+-----+--------------------+--------------------+-----+--------------------+

+--------------------+
|            averages|
+--------------------+
|[0.46153846010565...|
+--------------------+

+-----------------+
|         averages|
+-----------------+
|[,,,,,,,,,,,,,,,]|
+-----------------+



In [19]:
def mean_centroid(data, centroid):
    dense_to_array_udf = F.udf(dense_to_array, T.ArrayType(T.FloatType()))
    
    data = data.withColumn('features_array', dense_to_array_udf('scaledFeatures'))
    data.show()
    dfct1 = data.where(data["value"]==1)
    dfct2 = data.where(data["value"]==2)
    dfct1 = dfct1.agg(F.array(*[F.avg(F.col('features_array')[i]) for i in range(16)]).alias("averages"))
    dfct2 = dfct2.agg(F.array(*[F.avg(F.col('features_array')[i]) for i in range(16)]).alias("averages"))
    dfct1.show()
    dfct2.show()
    centroid = dfct1.union(dfct2)
    
    C1 = centroid.collect()[0][0]
    C2 = centroid.collect()[1][0]
    Cx = [float(0) if v is None else v for v in C1]
    Cx2 = [float(0) if v is None else v for v in C2]
    
    C =[]
    C.append(Cx)
    C.append(Cx2)
    
    dfC = spark.createDataFrame(C)
    
    centroid = dfC.select(array('_1','_2','_3','_4','_5','_6','_7','_8','_9','_10','_11','_12','_13','_14','_15','_16').alias('avg'))

    
    to_vector = udf(lambda a: Vectors.dense(a), VectorUDT())
    new_centroid = centroid.withColumn("averages_vector", to_vector("avg"))
    
    return new_centroid

In [20]:
def dense_to_array(v):
  new_array = list([float(x) for x in v])
  return new_array

### (f) The 𝑘-means main function

In [35]:
c_final = kmeans(data,2,2)

[0.0,0.7391304347826086,0.41000000000000003,1.0,0.73,0.8,0.6808510638297872,0.4,0.49,0.0,0.4,0.16883116883116883,0.48484848484848486,0.42,1.0,0.46]
[1]
[0.44,0.6304347826086957,0.7000000000000001,1.0,0.93,0.77,0.6276595744680851,0.5,0.93,0.4835164835164836,1.0,0.2077922077922078,0.5252525252525253,0.0,0.0,0.03]
[1, 1]
[1.0,1.0,0.55,0.8064516129032258,0.2,0.61,0.0,0.28,0.19,0.0,0.66,0.18181818181818182,0.6161616161616162,0.43,0.17,0.43]
[1, 1, 1]
[0.0,0.4782608695652174,0.42,1.0,0.51,0.91,0.4574468085106383,0.52,0.46,0.14285714285714288,1.0,0.09090909090909091,0.7070707070707072,0.02,0.31,0.0]
[1, 1, 1, 1]
[0.65,0.6086956521739131,0.93,1.0,0.93,0.71,0.7340425531914894,0.44,0.36,0.2197802197802198,0.0,0.0,0.4747474747474748,0.12,1.0,0.16]
[1, 1, 1, 1, 1]
[0.0,0.7826086956521738,0.41000000000000003,1.0,0.8300000000000001,0.88,0.6914893617021276,0.65,0.6900000000000001,0.47252747252747257,1.0,0.28571428571428575,0.6565656565656566,0.05,0.14,0.0]
[1, 1, 1, 1, 1, 1]
[0.71,0.21739130434782608

[1, 1, 1]
[0.0,0.4782608695652174,0.42,1.0,0.51,0.91,0.4574468085106383,0.52,0.46,0.14285714285714288,1.0,0.09090909090909091,0.7070707070707072,0.02,0.31,0.0]
[1, 1, 1, 1]
[0.65,0.6086956521739131,0.93,1.0,0.93,0.71,0.7340425531914894,0.44,0.36,0.2197802197802198,0.0,0.0,0.4747474747474748,0.12,1.0,0.16]
[1, 1, 1, 1, 1]
[0.0,0.7826086956521738,0.41000000000000003,1.0,0.8300000000000001,0.88,0.6914893617021276,0.65,0.6900000000000001,0.47252747252747257,1.0,0.28571428571428575,0.6565656565656566,0.05,0.14,0.0]
[1, 1, 1, 1, 1, 1]
[0.71,0.21739130434782608,0.55,1.0,0.0,0.98,0.20212765957446807,0.6900000000000001,0.74,0.8791208791208792,1.0,0.6363636363636364,0.6666666666666667,0.1,0.13,0.0]
[1, 1, 1, 1, 1, 1, 1]
[0.73,0.30434782608695654,1.0,1.0,0.48,1.0,0.2127659574468085,0.71,0.79,0.8461538461538463,0.84,0.5974025974025975,0.4747474747474748,0.15,0.0,0.0]
[1, 1, 1, 1, 1, 1, 1, 1]
[0.0,0.8260869565217391,0.42,0.9516129032258064,0.86,1.0,0.7659574468085106,0.54,0.6,0.0,0.4,0.0,0.55555555

[1, 1, 1, 1, 1, 1]
[0.0,0.7826086956521738,0.41000000000000003,1.0,0.8300000000000001,0.88,0.6914893617021276,0.65,0.6900000000000001,0.47252747252747257,1.0,0.28571428571428575,0.6565656565656566,0.05,0.14,0.0]
[1, 1, 1, 1, 1, 1, 1]
[0.44,0.6304347826086957,0.7000000000000001,1.0,0.93,0.77,0.6276595744680851,0.5,0.93,0.4835164835164836,1.0,0.2077922077922078,0.5252525252525253,0.0,0.0,0.03]
[1, 1, 1, 1, 1, 1, 1, 1]
[0.0,0.0,0.62,0.6290322580645161,1.0,1.0,0.7978723404255319,0.77,0.75,0.5824175824175825,0.85,0.38961038961038963,1.0,0.06,0.98,0.0]
[1, 1, 1, 1, 1, 1, 1, 1, 1]
[0.0,0.4782608695652174,0.42,1.0,0.51,0.91,0.4574468085106383,0.52,0.46,0.14285714285714288,1.0,0.09090909090909091,0.7070707070707072,0.02,0.31,0.0]
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[0.17,0.6086956521739131,0.01,0.5483870967741935,0.4,0.58,0.8617021276595744,0.28,0.05,0.0,0.0,0.5974025974025975,0.0,0.93,1.0,1.0]
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[0.0,0.8260869565217391,0.42,0.9516129032258064,0.86,1.0,0.7659574468085

In [36]:
c_final.show()

+--------------------+--------------------+
|                 avg|     averages_vector|
+--------------------+--------------------+
|[0.34590908817269...|[0.34590908817269...|
|[0.0, 0.0, 0.0, 0...|[0.0,0.0,0.0,0.0,...|
+--------------------+--------------------+



In [18]:
def kmeans(data, k, max_iter):
#     data = data.withColumn("index", monotonically_increasing_id())
#     data.show()
    
    centroids = centroid_init(data)

#     centroids = data[["scaledFeatures"]].sample(False, 0.1, seed=0).limit(k)
    
    clustered_data = get_closest(data, centroids)
    
    for i in range(max_iter):
        centroids = mean_centroid(clustered_data, centroids)
        clustered_data = get_closest2(clustered_data, centroids)
        
    return centroids

### sample data 

In [None]:
# sD2 = scaledDataX.sample(0.0025)
# sD2 = sD2.withColumn("index", monotonically_increasing_id())
# mcX = scaledDataX[["scaledFeatures"]].sample(False, 0.1, seed=0).limit(3)
# sD2 = get_closest(sD2,mcX)
# mcX_n = mean_centroid(sD2,mcX)

### (g) The prediction function

In [37]:
def predict(data, centroids):
    l = []
    for i in range(data.count()):
        print(data.collect()[i][2])
        
        nearest_center = 0
        nearest_center_dist = None
        for j in range(centroids.count()):
            dist = centroids.collect()[j]["averages_vector"].squared_distance(data.collect()[i]["scaledFeatures"])
            if nearest_center_dist == None:
                nearest_center_dist = dist
                nearest_center = j
            elif nearest_center_dist > dist:
                nearest_center_dist = dist
                nearest_center = j
        l.append(j)
        print(l)
    
    dfl = spark.createDataFrame(l, IntegerType())
    dfl = dfl.withColumn("index", monotonically_increasing_id())
    
    data = data.drop("value").join(dfl, on=["index"]).alias("x")
    data = data.drop("features").withColumnRenamed("value", "prediction")
    data = data.withColumnRenamed("scaledFeatures", "features")
    data.show()
    return data

In [38]:
data_T = test_data.sample(0.003) 
data_T = normalize(data_T)
data_T = data_T.withColumn("index", monotonically_increasing_id())
prediction = predict(data_T,c_final) 

[0.62,0.7272727272727273,0.8061224489795917,0.0,0.89247311827957,0.9999999999999999,0.56,0.6857142857142857,0.97,0.8181818181818182,1.0,0.3962264150943396,0.2962962962962963,0.0,0.0,1.0]
[1]
[0.0,0.48484848484848486,0.3469387755102041,1.0,0.4516129032258065,0.3469387755102041,0.28,0.0,0.0,0.0,0.25,0.0,0.5,0.20588235294117646,1.0,0.4375]
[1, 1]
[0.15,0.4545454545454546,0.42857142857142855,1.0,0.6774193548387097,0.6938775510204082,0.63,0.4714285714285714,1.0,0.6103896103896105,0.8200000000000001,0.32075471698113206,0.07407407407407407,0.0,0.0,0.8125]
[1, 1, 1]
[0.25,0.0,0.5510204081632653,1.0,0.5268817204301076,0.5510204081632653,0.51,0.05714285714285714,0.2,0.03896103896103896,0.0,0.16981132075471697,0.2962962962962963,0.11764705882352941,1.0,0.0]
[1, 1, 1, 1]
[0.53,0.8787878787878788,0.9999999999999999,1.0,1.0,0.6938775510204082,0.67,0.5428571428571428,0.71,0.5714285714285715,0.78,0.4150943396226415,0.18518518518518517,0.1764705882352941,0.0,0.0]
[1, 1, 1, 1, 1]
[0.91,0.939393939393939

### (h) The evaluation function evaluate

In [43]:
score = evaluate(added_data)

Silhouette with squared euclidean distance = -0.14098710418265964


In [42]:
def evaluate(data):
    evaluator = ClusteringEvaluator()
    data = data.withColumnRenamed("value", "prediction")
    SC_score = evaluator.evaluate(data)
    print("Silhouette with squared euclidean distance = " + str(SC_score))
    return SC_score

### just in case 

In [41]:
added_data = add_cluster(prediction)

In [40]:
def add_cluster(data):    
    data = data.select("features", "prediction")
    featureAndPredictions = map(lambda x: (Vectors.dense(x[0]), x[1]),
        [([0.0, 0.5, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8, 0.8], 0)])
    dummy = spark.createDataFrame(featureAndPredictions, ["features", "prediction"])
    UN = data.union(dummy)
    UN.collect()
    return UN

### compare

In [113]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
kmeans = KMeans(k=2)
model = kmeans.fit(data)

# Make predictions
predictions = model.transform(data)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.34516803051992223


In [114]:
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[25.38275998 83.27599842 40.3471728  89.41202056 65.69039146 69.77580071
 55.64926849 35.45274812 29.9430605  14.75879794 16.06524318 23.01383946
 44.94859628 38.56030051 90.84776592 43.28272044]
[43.49748238 85.39415911 39.83162135 79.56717019 43.33353474 62.63444109
 49.34400806 49.25881168 70.97804632 43.90513595 84.51822759 42.2858006
 59.66666667 34.39516616 24.38429003 21.06888218]


In [44]:
c_final.select("averages_vector").collect()

[Row(averages_vector=DenseVector([0.3459, 0.5672, 0.5214, 0.7757, 0.59, 0.7355, 0.4947, 0.4741, 0.5509, 0.3462, 0.6395, 0.3861, 0.5459, 0.3232, 0.5014, 0.2591])),
 Row(averages_vector=DenseVector([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]))]

#### The silhouette score for the manual implementation of kmeans clustering is -0.14098710418265964
#### The silhouette score for the implementation of kmeans clustering from the ml library was 0.34516803051992223

#### A higher score means that the point in the dataset are well match to their own cluster and poorly match to the neighbor cluster. It is clear that the kmeans algorithm from ml library is far superior and more well written.

#### The cluster center for manual implementation of kmeans clustering and from ml library are shown about in the output. The manual one is not as good as it is unable to even discover a second cluster, but some of the reasons due to a smaller dataset as the time needed to run through the entire dataset is way too long. 
#### The value of the densevector of each cluster are also very far apart, indicating that the clusters found from both implementation also varies hugely.