Welcome to exercise two of week three of “Apache Spark for Scalable Machine Learning on BigData”. In this exercise we’ll work on clustering.

Let’s create our DataFrame again:


In [15]:
# delete files from previous runs
!rm -f hmp.parquet*

# download the file containing the data in PARQUET format
!wget https://github.com/IBM/coursera/raw/master/hmp.parquet
    
# create a dataframe out of it
df = spark.read.parquet('hmp.parquet')

# register a corresponding query table
df.createOrReplaceTempView('df')

--2019-10-21 21:20:55--  https://github.com/IBM/coursera/raw/master/hmp.parquet
Resolving github.com (github.com)... 140.82.114.3
Connecting to github.com (github.com)|140.82.114.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/IBM/coursera/master/hmp.parquet [following]
--2019-10-21 21:20:55--  https://raw.githubusercontent.com/IBM/coursera/master/hmp.parquet
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.48.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.48.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 932997 (911K) [application/octet-stream]
Saving to: 'hmp.parquet'


2019-10-21 21:20:56 (23.0 MB/s) - 'hmp.parquet' saved [932997/932997]



Let’s reuse our feature engineering pipeline.

In [17]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, Normalizer
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline

indexer = StringIndexer(inputCol="class", outputCol="classIndex")
encoder = OneHotEncoder(inputCol="classIndex", outputCol="categoryVec")
vectorAssembler = VectorAssembler(inputCols=["x","y","z"],
                                  outputCol="features")
normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)

pipeline = Pipeline(stages=[indexer, encoder, vectorAssembler, normalizer])
model = pipeline.fit(df)
prediction = model.transform(df)
prediction.show()

+---+---+---+--------------------+-----------+----------+--------------+----------------+--------------------+
|  x|  y|  z|              source|      class|classIndex|   categoryVec|        features|       features_norm|
+---+---+---+--------------------+-----------+----------+--------------+----------------+--------------------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[21.0,52.0,34.0]|[0.19626168224299...|
| 22| 51| 34|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,51.0,34.0]|[0.20560747663551...|
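+---+---+---+--------------------+-----------+----------+--------------+----------------+--------------------+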
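
The features_norm values in the table come from the Normalizer stage with p=1.0, which divides each vector by the sum of the absolute values of its components (its L1 norm). As a quick sanity check, here is a minimal plain-Python sketch that reproduces the first component of the first row. The helper name l1_normalize is made up for illustration and is not part of Spark.

```python
def l1_normalize(vector):
    """Divide each component by the L1 norm (sum of absolute values)."""
    norm = sum(abs(v) for v in vector)
    if norm == 0:
        return list(vector)  # this sketch leaves an all-zero vector unchanged
    return [v / norm for v in vector]

# first row of the table above: x=22, y=49, z=35
row = [22.0, 49.0, 35.0]
normalized = l1_normalize(row)

# 22 / (22 + 49 + 35) = 22 / 106, which starts with 0.2075...
print(normalized[0])
```

After this step every row sums to 1, so only the relative proportions of x, y and z remain and the overall magnitude is discarded.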

Now let’s create a new pipeline for kmeans.

In [18]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

kmeans = KMeans(featuresCol="features").setK(14).setSeed(1)
pipeline = Pipeline(stages=[vectorAssembler, kmeans])
model = pipeline.fit(df)
predictions = model.transform(df)

evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.41244594513295846


We have 14 different movement patterns in the dataset, so setting K of KMeans to 14 is a natural starting point. Please experiment with different values for K as well. Do you find a sweet spot? Silhouette ranges from -1 to 1, and the closer it gets to 1, the better the clustering.

https://en.wikipedia.org/wiki/Silhouette_(clustering)
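
To make the score more concrete, the following is a minimal plain-Python sketch of the silhouette coefficient s = (b - a) / max(a, b) for a single point, where a is the mean distance to the other points in its own cluster and b is the mean distance to the points of the nearest other cluster. It uses squared Euclidean distance, matching the label printed above. The toy points and helper names are assumptions for illustration, and this brute-force loop is only meant to show the idea, not how ClusteringEvaluator is implemented.

```python
def sq_dist(p, q):
    """Squared Euclidean distance between two points."""
    return sum((a - b) ** 2 for a, b in zip(p, q))

def silhouette_of_point(i, points, labels):
    """Silhouette coefficient of points[i] given one cluster label per point."""
    own = labels[i]
    same = [sq_dist(points[i], points[j])
            for j in range(len(points)) if labels[j] == own and j != i]
    if not same:
        return 0.0  # convention used here: a single-point cluster scores 0
    a = sum(same) / len(same)
    # mean distance to every other cluster; b is the smallest of those means
    b = min(
        sum(sq_dist(points[i], points[j])
            for j in range(len(points)) if labels[j] == c) / labels.count(c)
        for c in set(labels) if c != own
    )
    if max(a, b) == 0:
        return 0.0  # all relevant points coincide
    return (b - a) / max(a, b)

def mean_silhouette(points, labels):
    """Average silhouette over all points."""
    n = len(points)
    return sum(silhouette_of_point(i, points, labels) for i in range(n)) / n

# toy data (hypothetical): two tight, well-separated groups
points = [(0.0, 0.0), (0.0, 1.0), (10.0, 10.0), (10.0, 11.0)]
good = mean_silhouette(points, [0, 0, 1, 1])  # labels follow the groups
bad = mean_silhouette(points, [0, 1, 0, 1])   # labels cut across the groups
print(good, bad)
```

The labelling that follows the two groups scores close to 1, while the labelling that cuts across them scores below 0.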


In [9]:
# please change the pipeline to check performance for different K, feel free to use a loop
KList = list(range(5, 26))  # K = 5 .. 25

for i in KList:
    kmeans = KMeans(featuresCol="features").setK(i).setSeed(1)
    pipeline = Pipeline(stages=[vectorAssembler, kmeans])
    model = pipeline.fit(df)
    predictions = model.transform(df)

    evaluator = ClusteringEvaluator()

    silhouette = evaluator.evaluate(predictions)
    print("Silhouette with squared euclidean distance = " + str(i) +" ---- " + str(silhouette))

Silhouette with squared euclidean distance = 5 ---- 0.5937447997439024
Silhouette with squared euclidean distance = 6 ---- 0.592463658820136
Silhouette with squared euclidean distance = 7 ---- 0.5484627422401509
Silhouette with squared euclidean distance = 8 ---- 0.46686489256383346
Silhouette with squared euclidean distance = 9 ---- 0.48034893889849645
Silhouette with squared euclidean distance = 10 ---- 0.47370428136987536
Silhouette with squared euclidean distance = 11 ---- 0.4819049717562352
Silhouette with squared euclidean distance = 12 ---- 0.40964155503229643
Silhouette with squared euclidean distance = 13 ---- 0.4153293521373778
Silhouette with squared euclidean distance = 14 ---- 0.41244594513295846
Silhouette with squared euclidean distance = 15 ---- 0.41771495579360896
Silhouette with squared euclidean distance = 16 ---- 0.39594610810727193
Silhouette with squared euclidean distance = 17 ---- 0.40512075095291467
Silhouette with squared euclidean distance = 18 ---- 0.4058090
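
Reading the sweet spot off a long printout is error-prone, so one option is to collect the scores and let Python pick the maximum. The sketch below is an assumption about how one might post-process the results: it hard-codes the fully printed scores for K = 5 to 17 from the output above (rounded to four decimals) instead of rerunning Spark, and the variable names are hypothetical.

```python
# silhouette per K, copied from the printed output above and rounded
scores = {
    5: 0.5937, 6: 0.5925, 7: 0.5485, 8: 0.4669, 9: 0.4803,
    10: 0.4737, 11: 0.4819, 12: 0.4096, 13: 0.4153, 14: 0.4124,
    15: 0.4177, 16: 0.3959, 17: 0.4051,
}

# K with the highest silhouette
best_k = max(scores, key=scores.get)

# all K values ordered from best to worst score
ranked = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)

print("best K:", best_k, "silhouette:", scores[best_k])
print("top 3:", ranked[:3])
```

Inside the notebook loop, the same dictionary could be filled with `scores[i] = silhouette` instead of being typed in by hand.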

Now please extend the pipeline to work on the normalized features. You need to tell KMeans to use the normalized feature column and add the normalizer stage to the pipeline.

In [10]:
KList = list(range(5, 26))  # K = 5 .. 25
for i in KList:
    kmeans = KMeans(featuresCol="features_norm").setK(i).setSeed(1)
    pipeline = Pipeline(stages=[vectorAssembler, normalizer, kmeans])
    model = pipeline.fit(df)

    predictions = model.transform(df)

    evaluator = ClusteringEvaluator()

    silhouette = evaluator.evaluate(predictions)
    print("Silhouette with squared euclidean distance = " + str(i) +" ---- " + str(silhouette))

Silhouette with squared euclidean distance = 5 ---- 0.5378473434364454
Silhouette with squared euclidean distance = 6 ---- 0.3616039650566081
Silhouette with squared euclidean distance = 7 ---- 0.3388334984297795
Silhouette with squared euclidean distance = 8 ---- 0.35346131260617686
Silhouette with squared euclidean distance = 9 ---- 0.3320686157150071
Silhouette with squared euclidean distance = 10 ---- 0.31921981409325373
Silhouette with squared euclidean distance = 11 ---- 0.3166261086889984
Silhouette with squared euclidean distance = 12 ---- 0.2524553751769574
Silhouette with squared euclidean distance = 13 ---- 0.2811747980314105
Silhouette with squared euclidean distance = 14 ---- 0.2668998965895519
Silhouette with squared euclidean distance = 15 ---- 0.20781470594423448
Silhouette with squared euclidean distance = 16 ---- 0.19826724173804225
Silhouette with squared euclidean distance = 17 ---- 0.21748769321069447
Silhouette with squared euclidean distance = 18 ---- 0.192197127

Sometimes rescaling a feature helps. Here we inflate x by multiplying it by 10; let’s see whether the silhouette score increases.

In [11]:
from pyspark.sql.functions import col
# replace column x with x * 10, leaving all other columns unchanged
df_denormalized = df.withColumn('x', col('x') * 10)

In [12]:
kmeans = KMeans(featuresCol="features").setK(14).setSeed(1)
pipeline = Pipeline(stages=[vectorAssembler, kmeans])
model = pipeline.fit(df_denormalized)
predictions = model.transform(df_denormalized)

evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.5709023393004293
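
A possible explanation for the change is that KMeans relies on Euclidean distance by default, so stretching one axis changes which points count as close. The following minimal sketch, with made-up points rather than rows from the dataset, shows how multiplying x by 10 can flip the nearest neighbour of a point.

```python
def sq_dist(p, q):
    """Squared Euclidean distance between two points."""
    return sum((a - b) ** 2 for a, b in zip(p, q))

def scale_x(p, factor=10):
    """Multiply only the x component of an (x, y, z) point."""
    x, y, z = p
    return (x * factor, y, z)

def nearest(target, candidates):
    """Candidate with the smallest squared distance to target."""
    return min(candidates, key=lambda c: sq_dist(target, c))

# hypothetical points, not taken from hmp.parquet
target = (20, 50, 35)
close_in_x = (21, 58, 35)  # close in x, far in y
close_in_y = (24, 51, 35)  # close in y, farther in x

before = nearest(target, [close_in_x, close_in_y])
after = nearest(scale_x(target), [scale_x(close_in_x), scale_x(close_in_y)])

print("nearest before scaling:", before)
print("nearest after scaling: ", after)
```

Before scaling, the point that is close in y wins. After scaling, differences in x dominate the distance and the point that is close in x becomes the nearest one.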


Apache SparkML can be used to try many different algorithms and parametrizations using the same pipeline. Please change the code below to use GaussianMixture instead of KMeans, using the following link for reference.

https://spark.apache.org/docs/latest/ml-clustering.html#gaussian-mixture-model-gmm


In [19]:
from pyspark.ml.clustering import GaussianMixture

gmm = GaussianMixture(featuresCol = "features").setK(14).setSeed(1)
pipeline = Pipeline(stages=[vectorAssembler, gmm])

model = pipeline.fit(df)

predictions = model.transform(df)

evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.15906267433367427
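
Unlike KMeans, which assigns each point to exactly one centroid, a Gaussian mixture assigns each point a probability of belonging to every component. The sketch below illustrates that idea in one dimension with plain Python. The two components, their weights, and the helper names are invented for illustration and are not taken from the fitted model.

```python
import math

def normal_pdf(x, mean, std):
    """Density of a 1-D normal distribution at x."""
    z = (x - mean) / std
    return math.exp(-0.5 * z * z) / (std * math.sqrt(2 * math.pi))

def responsibilities(x, components):
    """Probability of each (weight, mean, std) component given the point x."""
    weighted = [w * normal_pdf(x, m, s) for w, m, s in components]
    total = sum(weighted)
    return [v / total for v in weighted]

# hypothetical mixture: two equally weighted components centred at 0 and 4
components = [(0.5, 0.0, 1.0), (0.5, 4.0, 1.0)]

near_first = responsibilities(0.5, components)  # clearly belongs to the first
midpoint = responsibilities(2.0, components)    # halfway between both means

print(near_first)
print(midpoint)
```

A point near one mean is assigned almost entirely to that component, while a point halfway between the means is split evenly. In Spark, the transformed DataFrame should also carry a probability column with these per-component values, alongside the hard prediction that ClusteringEvaluator scores.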
