In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.sql.functions import max, count, col, avg


In [7]:
spark = SparkSession\
        .builder\
        .appName("KMeansApp")\
        .getOrCreate()

In [8]:
data_path = "/Users/sabarnikundu/Downloads/drive_stats_2019_Q1/*.csv" 

In [9]:
dataset = spark.read \
    .format("csv") \
    .option("header", "true") \
    .load(data_path)


In [10]:
# dataset.show()

In [11]:
f_data = dataset.select("smart_1_normalized")
f_data = f_data.withColumn("smart_1_normalized", f_data["smart_1_normalized"].cast("float"))
f_data.show()

+------------------+
|smart_1_normalized|
+------------------+
|             117.0|
|              80.0|
|              84.0|
|              79.0|
|             100.0|
|              83.0|
|              83.0|
|              76.0|
|              81.0|
|              84.0|
|             115.0|
|              81.0|
|              65.0|
|              84.0|
|              84.0|
|             100.0|
|             100.0|
|             114.0|
|             100.0|
|             100.0|
+------------------+
only showing top 20 rows



In [12]:
Assembler = VectorAssembler(inputCols=["smart_1_normalized"], outputCol="features")
f_data = Assembler.setHandleInvalid("skip").transform(f_data).na.drop()
f_data.show()

+------------------+--------+
|smart_1_normalized|features|
+------------------+--------+
|             117.0| [117.0]|
|              80.0|  [80.0]|
|              84.0|  [84.0]|
|              79.0|  [79.0]|
|             100.0| [100.0]|
|              83.0|  [83.0]|
|              83.0|  [83.0]|
|              76.0|  [76.0]|
|              81.0|  [81.0]|
|              84.0|  [84.0]|
|             115.0| [115.0]|
|              81.0|  [81.0]|
|              65.0|  [65.0]|
|              84.0|  [84.0]|
|              84.0|  [84.0]|
|             100.0| [100.0]|
|             100.0| [100.0]|
|             114.0| [114.0]|
|             100.0| [100.0]|
|             100.0| [100.0]|
+------------------+--------+
only showing top 20 rows



In [13]:
mod = []
werror = []
kval = [3, 4, 5]
for i in kval:
    
    print('Value of K = ', i)
    
    kmeans = KMeans().setK(i).setSeed(1)
    model = kmeans.fit(f_data)
    mod.append(model)

    # Evaluate clustering by computing Within Set Sum of Squared Errors.
    wssse = model.computeCost(f_data)
    werror.append(wssse)
    print("Within Set Sum of Squared Errors = " + str(wssse))

    # Printing centers of clusters
    centers = model.clusterCenters()
    print("Cluster Centers: ")
    for c in centers:
        print(c)

Value of K =  3
Within Set Sum of Squared Errors = 55604477.41484555
Cluster Centers: 
[108.68525421]
[81.77618785]
[74.64574811]
Value of K =  4
Within Set Sum of Squared Errors = 30803672.288310237
Cluster Centers: 
[74.64574811]
[118.76206306]
[81.77417717]
[100.50193946]
Value of K =  5
Within Set Sum of Squared Errors = 2955709.6775352894
Cluster Centers: 
[81.48975836]
[199.99681451]
[116.0926346]
[100.37475395]
[73.74329291]


In [14]:
model = mod[1] #choosing k=4 as it gives the reliable distribution

In [15]:
f_data.show(10)

predictions = model.transform(f_data)
colnames = predictions.schema.names

centers = model.clusterCenters()
predictions = predictions.rdd.map(lambda x: (x[0], x[2], int(centers[x[2]][0]))).toDF([colnames[0], 'cluster', 'center'])


+------------------+--------+
|smart_1_normalized|features|
+------------------+--------+
|             117.0| [117.0]|
|              80.0|  [80.0]|
|              84.0|  [84.0]|
|              79.0|  [79.0]|
|             100.0| [100.0]|
|              83.0|  [83.0]|
|              83.0|  [83.0]|
|              76.0|  [76.0]|
|              81.0|  [81.0]|
|              84.0|  [84.0]|
+------------------+--------+
only showing top 10 rows



In [16]:
predictions.show()

+------------------+-------+------+
|smart_1_normalized|cluster|center|
+------------------+-------+------+
|             117.0|      1|   118|
|              80.0|      2|    81|
|              84.0|      2|    81|
|              79.0|      2|    81|
|             100.0|      3|   100|
|              83.0|      2|    81|
|              83.0|      2|    81|
|              76.0|      0|    74|
|              81.0|      2|    81|
|              84.0|      2|    81|
|             115.0|      1|   118|
|              81.0|      2|    81|
|              65.0|      0|    74|
|              84.0|      2|    81|
|              84.0|      2|    81|
|             100.0|      3|   100|
|             100.0|      3|   100|
|             114.0|      1|   118|
|             100.0|      3|   100|
|             100.0|      3|   100|
+------------------+-------+------+
only showing top 20 rows



In [17]:
colnames = predictions.schema.names
dist = predictions.rdd.map(lambda x: (x[0], x[1], x[2], abs(x[2]-x[0]))).toDF(colnames + ['distance'])
dist.show(5)

+------------------+-------+------+--------+
|smart_1_normalized|cluster|center|distance|
+------------------+-------+------+--------+
|             117.0|      1|   118|     1.0|
|              80.0|      2|    81|     1.0|
|              84.0|      2|    81|     3.0|
|              79.0|      2|    81|     2.0|
|             100.0|      3|   100|     0.0|
+------------------+-------+------+--------+
only showing top 5 rows



In [18]:
Max_dist = dist.groupBy(dist.cluster).agg(avg(dist.distance).alias('average_distance'))
Max_dist.show(5)

+-------+------------------+
|cluster|  average_distance|
+-------+------------------+
|      0|2.8143438507478464|
|      1| 4.696678068723983|
|      3|0.5074446171081638|
|      2| 1.450772255669031|
+-------+------------------+



In [20]:
dist.registerTempTable('distances')
Max_dist.registerTempTable('clusters')
anomalies = spark.sql("select smart_1_normalized as anomalies from distances inner join clusters on distances.cluster = clusters.cluster where distances.smart_1_normalized > (distances.center + clusters.average_distance + 1)")
anomalies.show()

+---------+
|anomalies|
+---------+
|     78.0|
|     78.0|
|     78.0|
|     78.0|
|     78.0|
|     78.0|
|     78.0|
|     78.0|
|     78.0|
|     78.0|
|     78.0|
|     78.0|
|     78.0|
|     78.0|
|     78.0|
|     78.0|
|     78.0|
|     78.0|
|     78.0|
|     78.0|
+---------+
only showing top 20 rows

