In [1]:
from IPython.display import Markdown, display
def printmd(string):
    """Show ``string`` in the cell output as a red, level-1 Markdown heading."""
    red_heading = Markdown('# <span style="color:red">' + string + '</span>')
    display(red_heading)


# A name called 'sc' already being defined means the runtime injected a Spark
# context, i.e. this is a Spark notebook rather than the default runtime.
if 'sc' in globals() or 'sc' in locals():
    printmd('<<<<<!!!!! It seems that you are running in a IBM Watson Studio Apache Spark Notebook. Please run it in an IBM Watson Studio Default Runtime (without Apache Spark) !!!!!>>>>>')


Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200515132124-0000
KERNEL_ID = 3e415489-7edd-4571-b15f-250bf4846c3f


# <span style="color:red"><<<<<!!!!! It seems that you are running in a IBM Watson Studio Apache Spark Notebook. Please run it in an IBM Watson Studio Default Runtime (without Apache Spark) !!!!!>>>>></span>

In [None]:
!pip install pyspark==2.4.5

In [None]:
try:
    from pyspark import SparkConf
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
except ImportError:
    # A failed import most likely means the kernel has not been restarted
    # since pyspark was installed; tell the user instead of showing a traceback.
    printmd('<<<<<!!!!! Please restart your kernel after installing Apache Spark !!!!!>>>>>')

In [None]:
# Reuse an existing SparkContext if there is one; otherwise start one with the
# master set to "local[*]".
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

# Get (or lazily create) the SparkSession used for all DataFrame work below.
spark = SparkSession.builder.getOrCreate()

Welcome to exercise two of week three of “Apache Spark for Scalable Machine Learning on BigData”. In this exercise we’ll work on clustering.

Let’s create our DataFrame again:


In [2]:
# delete files from previous runs
!rm -f hmp.parquet*

# download the file containing the data in PARQUET format
!wget https://github.com/IBM/coursera/raw/master/hmp.parquet
    
# create a dataframe out of it
df = spark.read.parquet('hmp.parquet')

# register a corresponding query table
df.createOrReplaceTempView('df')

--2020-05-15 13:21:43--  https://github.com/IBM/coursera/raw/master/hmp.parquet
Resolving github.com (github.com)... 140.82.112.3
Connecting to github.com (github.com)|140.82.112.3|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://github.com/IBM/skillsnetwork/raw/master/hmp.parquet [following]
--2020-05-15 13:21:43--  https://github.com/IBM/skillsnetwork/raw/master/hmp.parquet
Reusing existing connection to github.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/IBM/skillsnetwork/master/hmp.parquet [following]
--2020-05-15 13:21:44--  https://raw.githubusercontent.com/IBM/skillsnetwork/master/hmp.parquet
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 199.232.8.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|199.232.8.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 932997 (911K) [application/octet-stream]
Saving 

Let’s reuse our feature engineering pipeline.

In [18]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    Normalizer,
    OneHotEncoder,
    StringIndexer,
    VectorAssembler,
)
from pyspark.ml.linalg import Vectors

# Label handling: string class -> numeric index -> one-hot vector.
indexer = StringIndexer(inputCol="class", outputCol="classIndex")
encoder = OneHotEncoder(inputCol="classIndex", outputCol="categoryVec")

# Feature handling: pack x, y, z into one vector, then normalize it with p=1.0.
vectorAssembler = VectorAssembler(inputCols=["x", "y", "z"], outputCol="features")
normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)

# Fit all four stages on df and show the transformed rows.
pipeline = Pipeline(
    stages=[
        indexer,
        encoder,
        vectorAssembler,
        normalizer,
    ]
)
model = pipeline.fit(df)
prediction = model.transform(df)
prediction.show()

+---+---+---+--------------------+-----------+----------+--------------+----------------+--------------------+
|  x|  y|  z|              source|      class|classIndex|   categoryVec|        features|       features_norm|
+---+---+---+--------------------+-----------+----------+--------------+----------------+--------------------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,49.0,35.0]|[0.20754716981132...|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,52.0,35.0]|[0.20183486238532...|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[21.0,52.0,34.0]|[0.19626168224299...|
| 22| 51| 34|Accelerometer-201...|Brush_teeth|       6.0|(13,[6],[1.0])|[22.0,51.0,34.0]|[0.20560747663551...|
|

Now let’s create a new pipeline for kmeans.

In [15]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Cluster the assembled "features" vectors into K = 14 groups with the seed
# fixed at 1. The pipeline reuses the vectorAssembler configured earlier.
kmeans = KMeans(featuresCol="features", k=14, seed=1)
pipeline = Pipeline(stages=[vectorAssembler, kmeans])
model = pipeline.fit(df)
predictions = model.transform(df)

# Score the clustering with a default-configured ClusteringEvaluator.
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.41244594513295846


In [6]:
# Show the first rows together with the cluster id in the "prediction" column.
predictions.show(n=20, truncate=True)

+---+---+---+--------------------+-----------+----------------+----------+
|  x|  y|  z|              source|      class|        features|prediction|
+---+---+---+--------------------+-----------+----------------+----------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|[22.0,49.0,35.0]|         7|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|[22.0,49.0,35.0]|         7|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|[22.0,52.0,35.0]|         7|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|[22.0,52.0,35.0]|         7|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|[21.0,52.0,34.0]|         7|
| 22| 51| 34|Accelerometer-201...|Brush_teeth|[22.0,51.0,34.0]|         7|
| 20| 50| 35|Accelerometer-201...|Brush_teeth|[20.0,50.0,35.0]|         7|
| 22| 52| 34|Accelerometer-201...|Brush_teeth|[22.0,52.0,34.0]|         7|
| 22| 50| 34|Accelerometer-201...|Brush_teeth|[22.0,50.0,34.0]|         7|
| 22| 51| 35|Accelerometer-201...|Brush_teeth|[22.0,51.0,35.0]|         7|
| 21| 51| 33|Acceleromete

We have 14 different movement patterns in the dataset, so setting K of KMeans to 14 is a good idea. But please experiment with different values for K: do you find a sweet spot? The closer the Silhouette score gets to 1, the better.

https://en.wikipedia.org/wiki/Silhouette_(clustering)


In [16]:
# Change the pipeline to check performance for different K; a loop does the sweep.
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# A single evaluator is created once and reused for every K.
evaluator = ClusteringEvaluator()

for k in range(11, 14):
    # Same pipeline as before (vectorAssembler -> KMeans); only K changes.
    kmeans = KMeans(featuresCol="features", k=k, seed=1)
    pipeline = Pipeline(stages=[vectorAssembler, kmeans])
    model = pipeline.fit(df)
    predictions = model.transform(df)
    silhouette = evaluator.evaluate(predictions)

    print("for k=", k)
    print("Silhouette with squared euclidean distance=" + str(silhouette))


for k= 11
Silhouette with squared euclidean distance=0.4819049717562352
for k= 12
Silhouette with squared euclidean distance=0.40964155503229643
for k= 13
Silhouette with squared euclidean distance=0.4153293521373778


Now please extend the pipeline to work on the normalized features. You need to tell KMeans to use the normalized feature column and change the pipeline in order to contain the normalizer stage as well.

In [20]:
# Stage order matters: vectorAssembler produces "features", normalizer turns
# that into "features_norm", and KMeans then clusters on "features_norm".
# Both vectorAssembler and normalizer are the instances configured earlier.
kmeans = KMeans(featuresCol="features_norm", k=14, seed=1)
pipeline = Pipeline(stages=[vectorAssembler, normalizer, kmeans])
model = pipeline.fit(df)
predictions = model.transform(df)

# NOTE(review): the evaluator is left at its defaults, so it appears to score
# the clusters on the "features" column rather than "features_norm" — confirm
# that this is the intended comparison.
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))


Silhouette with squared euclidean distance = 0.2668998965895519


Sometimes, inflating the dataset helps. Here we multiply x by 10; let’s see if the performance increases.

In [23]:
from pyspark.sql.functions import col

# Multiply x by 10 and keep every other column as is (x ends up last).
# The scaled expression is aliased explicitly instead of renaming Spark's
# auto-generated column name '(x * 10)': withColumnRenamed is a silent no-op
# when the name does not match, which would leave the frame without an 'x'
# column.
df_denormalized = df.select(
    [col(name) for name in df.columns if name != 'x']
    + [(col('x') * 10).alias('x')]
)
df_denormalized.show()

+---+---+--------------------+-----------+---+
|  y|  z|              source|      class|  x|
+---+---+--------------------+-----------+---+
| 49| 35|Accelerometer-201...|Brush_teeth|220|
| 49| 35|Accelerometer-201...|Brush_teeth|220|
| 52| 35|Accelerometer-201...|Brush_teeth|220|
| 52| 35|Accelerometer-201...|Brush_teeth|220|
| 52| 34|Accelerometer-201...|Brush_teeth|210|
| 51| 34|Accelerometer-201...|Brush_teeth|220|
| 50| 35|Accelerometer-201...|Brush_teeth|200|
| 52| 34|Accelerometer-201...|Brush_teeth|220|
| 50| 34|Accelerometer-201...|Brush_teeth|220|
| 51| 35|Accelerometer-201...|Brush_teeth|220|
| 51| 33|Accelerometer-201...|Brush_teeth|210|
| 50| 34|Accelerometer-201...|Brush_teeth|200|
| 49| 33|Accelerometer-201...|Brush_teeth|210|
| 49| 33|Accelerometer-201...|Brush_teeth|210|
| 51| 35|Accelerometer-201...|Brush_teeth|200|
| 49| 34|Accelerometer-201...|Brush_teeth|180|
| 48| 34|Accelerometer-201...|Brush_teeth|190|
| 53| 34|Accelerometer-201...|Brush_teeth|160|
| 52| 35|Acce

In [22]:
# Same KMeans setup as before (K = 14, seed 1), now fitted on the frame whose
# x column was multiplied by 10.
kmeans = KMeans(featuresCol="features", k=14, seed=1)
pipeline = Pipeline(stages=[vectorAssembler, kmeans])
model = pipeline.fit(df_denormalized)
predictions = model.transform(df_denormalized)

# Score the clustering with a default-configured ClusteringEvaluator.
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.5709023393004293


Apache SparkML can be used to try many different algorithms and parametrizations using the same pipeline. Please change the code below to use GaussianMixture over KMeans. Please use the following link for your reference.

https://spark.apache.org/docs/latest/ml-clustering.html#gaussian-mixture-model-gmm


In [24]:
from pyspark.ml.clustering import GaussianMixture

# Swap KMeans for a Gaussian mixture model; the rest of the pipeline and the
# evaluation stay the same (K = 14, seed 1, assembled "features" column).
gmm = GaussianMixture(featuresCol="features", k=14, seed=1)
pipeline = Pipeline(stages=[vectorAssembler, gmm])
model = pipeline.fit(df)
predictions = model.transform(df)

# Score the clustering with a default-configured ClusteringEvaluator.
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.15906267433367427
