Welcome to exercise one of week four of “Apache Spark for Scalable Machine Learning on BigData”. In this exercise we’ll work on classification.

Let’s create our DataFrame again:


In [1]:
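from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Create (or reuse, when the cell is re-run) a SparkContext and wrap it in a
# SparkSession, the entry point for the DataFrame API
conf = SparkConf()
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)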

In [2]:
# Uncomment to delete files from previous runs
# !rm -f hmp.parquet*

# Uncomment to download the file containing the data in Parquet format
# !wget https://github.com/IBM/coursera/raw/master/hmp.parquet

# Create a DataFrame from it
df = spark.read.parquet('hmp.parquet')

# Register it as a temporary view so it can also be queried with SQL
df.createOrReplaceTempView('df')

Since this is supervised learning, let’s split our data into a training set (80%) and a test set (20%).

In [5]:
# randomSplit assigns rows at random, so the split is only approximately 80/20
# and differs between runs unless a seed is passed
df_train, df_test = df.randomSplit([0.8, 0.2])
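To see why the split sizes are only approximately 80/20, here is a minimal plain-Python sketch of the idea behind a weighted random split: every row is assigned independently to one split with probability proportional to its weight (PySpark's randomSplit also normalizes weights that don't sum to 1). The function name random_split is hypothetical, and Spark's actual implementation samples per partition, so this is an illustration only.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row independently to one split, with probability
    proportional to that split's weight (weights are normalized)."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point rounding
    return splits


train, test = random_split(range(10_000), [0.8, 0.2], seed=42)
print(len(train), len(test))  # close to, but usually not exactly, 8000 and 2000
```

Passing the same seed reproduces the same split, which is also what the seed argument of randomSplit is for.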

In [7]:
df_train.show()

+---+---+---+--------------------+--------------+
|  x|  y|  z|              source|         class|
+---+---+---+--------------------+--------------+
|  0| 12| 39|Accelerometer-201...| Sitdown_chair|
|  0| 16| 31|Accelerometer-201...|     Getup_bed|
|  0| 17| 36|Accelerometer-201...|   Brush_teeth|
|  0| 24| 35|Accelerometer-201...| Sitdown_chair|
|  0| 26| 15|Accelerometer-201...|  Climb_stairs|
|  0| 26| 42|Accelerometer-201...|   Brush_teeth|
|  0| 27| 31|Accelerometer-201...| Sitdown_chair|
|  0| 27| 37|Accelerometer-201...|   Brush_teeth|
|  0| 27| 41|Accelerometer-201...|   Brush_teeth|
|  0| 28| 28|Accelerometer-201...|   Brush_teeth|
|  0| 29| 17|Accelerometer-201...|     Getup_bed|
|  0| 29| 25|Accelerometer-201...|     Getup_bed|
|  0| 29| 25|Accelerometer-201...|  Climb_stairs|
|  0| 29| 32|Accelerometer-201...|Descend_stairs|
|  0| 29| 34|Accelerometer-201...|          Walk|
|  0| 29| 37|Accelerometer-201...|   Brush_teeth|
|  0| 29| 38|Accelerometer-201...|   Brush_teeth|


Again, we can reuse our feature engineering pipeline.

In [32]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, Normalizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

# Encode the string column "class" as a numeric column "label"
indexer = StringIndexer(inputCol="class", outputCol="label")

In [24]:
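# StringIndexer is similar to scikit-learn's LabelEncoder, but by default it
# orders labels by frequency: the most frequent class gets index 0.0
indexer.fit(df_train).transform(df_train).show()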

+---+---+---+--------------------+--------------+-----+
|  x|  y|  z|              source|         class|label|
+---+---+---+--------------------+--------------+-----+
|  0| 12| 39|Accelerometer-201...| Sitdown_chair|  8.0|
|  0| 16| 31|Accelerometer-201...|     Getup_bed|  1.0|
|  0| 17| 36|Accelerometer-201...|   Brush_teeth|  6.0|
|  0| 24| 35|Accelerometer-201...| Sitdown_chair|  8.0|
|  0| 26| 15|Accelerometer-201...|  Climb_stairs|  4.0|
|  0| 26| 42|Accelerometer-201...|   Brush_teeth|  6.0|
|  0| 27| 31|Accelerometer-201...| Sitdown_chair|  8.0|
|  0| 27| 37|Accelerometer-201...|   Brush_teeth|  6.0|
|  0| 27| 41|Accelerometer-201...|   Brush_teeth|  6.0|
|  0| 28| 28|Accelerometer-201...|   Brush_teeth|  6.0|
|  0| 29| 17|Accelerometer-201...|     Getup_bed|  1.0|
|  0| 29| 25|Accelerometer-201...|     Getup_bed|  1.0|
|  0| 29| 25|Accelerometer-201...|  Climb_stairs|  4.0|
|  0| 29| 32|Accelerometer-201...|Descend_stairs| 10.0|
|  0| 29| 34|Accelerometer-201...|          Walk
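The label indices above are not alphabetical. A minimal plain-Python sketch of StringIndexer's default frequency-based ordering ("frequencyDesc"): count each class and give index 0.0 to the most frequent one. The function string_index is hypothetical, and breaking ties alphabetically is an assumption of this sketch.

```python
from collections import Counter


def string_index(labels):
    """Map each distinct string to a float index, most frequent first.

    Ties are broken alphabetically (an assumption of this sketch).
    """
    counts = Counter(labels)
    order = sorted(counts, key=lambda s: (-counts[s], s))
    return {s: float(i) for i, s in enumerate(order)}


classes = ["Brush_teeth", "Getup_bed", "Brush_teeth", "Walk", "Getup_bed", "Brush_teeth"]
mapping = string_index(classes)
print(mapping)                       # {'Brush_teeth': 0.0, 'Getup_bed': 1.0, 'Walk': 2.0}
print([mapping[c] for c in classes])  # [0.0, 1.0, 0.0, 2.0, 1.0, 0.0]
```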

In [25]:
# VectorAssembler is a pure transformer (there is nothing to fit): it combines
# the x, y and z columns into a single vector column called "features"
vectorAssembler = VectorAssembler(inputCols=["x", "y", "z"],
                                  outputCol="features")
vectorAssembler.transform(df_train).show()

+---+---+---+--------------------+--------------+---------------+
|  x|  y|  z|              source|         class|       features|
+---+---+---+--------------------+--------------+---------------+
|  0| 12| 39|Accelerometer-201...| Sitdown_chair|[0.0,12.0,39.0]|
|  0| 16| 31|Accelerometer-201...|     Getup_bed|[0.0,16.0,31.0]|
|  0| 17| 36|Accelerometer-201...|   Brush_teeth|[0.0,17.0,36.0]|
|  0| 24| 35|Accelerometer-201...| Sitdown_chair|[0.0,24.0,35.0]|
|  0| 26| 15|Accelerometer-201...|  Climb_stairs|[0.0,26.0,15.0]|
|  0| 26| 42|Accelerometer-201...|   Brush_teeth|[0.0,26.0,42.0]|
|  0| 27| 31|Accelerometer-201...| Sitdown_chair|[0.0,27.0,31.0]|
|  0| 27| 37|Accelerometer-201...|   Brush_teeth|[0.0,27.0,37.0]|
|  0| 27| 41|Accelerometer-201...|   Brush_teeth|[0.0,27.0,41.0]|
|  0| 28| 28|Accelerometer-201...|   Brush_teeth|[0.0,28.0,28.0]|
|  0| 29| 17|Accelerometer-201...|     Getup_bed|[0.0,29.0,17.0]|
|  0| 29| 25|Accelerometer-201...|     Getup_bed|[0.0,29.0,25.0]|
|  0| 29| 

In [26]:
# Normalizer scales each feature vector to unit L1 norm (p=1.0); its input is
# the output of the VectorAssembler
normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
normalizer.transform(vectorAssembler.transform(df_train)).show()

+---+---+---+--------------------+--------------+---------------+--------------------+
|  x|  y|  z|              source|         class|       features|       features_norm|
+---+---+---+--------------------+--------------+---------------+--------------------+
|  0| 12| 39|Accelerometer-201...| Sitdown_chair|[0.0,12.0,39.0]|[0.0,0.2352941176...|
|  0| 16| 31|Accelerometer-201...|     Getup_bed|[0.0,16.0,31.0]|[0.0,0.3404255319...|
|  0| 17| 36|Accelerometer-201...|   Brush_teeth|[0.0,17.0,36.0]|[0.0,0.3207547169...|
|  0| 24| 35|Accelerometer-201...| Sitdown_chair|[0.0,24.0,35.0]|[0.0,0.4067796610...|
|  0| 26| 15|Accelerometer-201...|  Climb_stairs|[0.0,26.0,15.0]|[0.0,0.6341463414...|
|  0| 26| 42|Accelerometer-201...|   Brush_teeth|[0.0,26.0,42.0]|[0.0,0.3823529411...|
|  0| 27| 31|Accelerometer-201...| Sitdown_chair|[0.0,27.0,31.0]|[0.0,0.4655172413...|
|  0| 27| 37|Accelerometer-201...|   Brush_teeth|[0.0,27.0,37.0]|[0.0,0.421875,0.5...|
|  0| 27| 41|Accelerometer-201...|   Brush_
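The features_norm values can be checked by hand: with p=1.0 each vector is divided by the sum of the absolute values of its entries, so [0, 12, 39] becomes [0, 12/51, 39/51] ≈ [0, 0.2353, 0.7647]. A plain-Python sketch of both steps; the helper names assemble and lp_normalize are hypothetical, and leaving an all-zero vector unchanged is a choice of this sketch.

```python
def assemble(row, cols=("x", "y", "z")):
    """Concatenate numeric columns into one float vector, like VectorAssembler."""
    return [float(row[c]) for c in cols]


def lp_normalize(vector, p=1.0):
    """Scale a vector to unit p-norm, as Normalizer(p=...) does for each row."""
    norm = sum(abs(v) ** p for v in vector) ** (1.0 / p)
    if norm == 0.0:
        return list(vector)  # this sketch leaves an all-zero vector unchanged
    return [v / norm for v in vector]


row = {"x": 0, "y": 12, "z": 39}
print(lp_normalize(assemble(row)))  # [0.0, 0.23529411764705882, 0.7647058823529411]
```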

In [50]:
# Alternatively, chain the three stages into a single Pipeline
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer])

# Keep only the features (X) and the label (y)
input_df = pipeline.fit(df_train).transform(df_train).select('features_norm', 'label')
input_df.show()

+--------------------+-----+
|       features_norm|label|
+--------------------+-----+
|[0.0,0.2352941176...|  8.0|
|[0.0,0.3404255319...|  1.0|
|[0.0,0.3207547169...|  6.0|
|[0.0,0.4067796610...|  8.0|
|[0.0,0.6341463414...|  4.0|
|[0.0,0.3823529411...|  6.0|
|[0.0,0.4655172413...|  8.0|
|[0.0,0.421875,0.5...|  6.0|
|[0.0,0.3970588235...|  6.0|
|       [0.0,0.5,0.5]|  6.0|
|[0.0,0.6304347826...|  1.0|
|[0.0,0.5370370370...|  1.0|
|[0.0,0.5370370370...|  4.0|
|[0.0,0.4754098360...| 10.0|
|[0.0,0.4603174603...|  0.0|
|[0.0,0.4393939393...|  6.0|
|[0.0,0.4328358208...|  6.0|
|[0.0,0.4264705882...|  6.0|
|[0.0,0.4027777777...|  6.0|
|[0.0,0.3866666666...|  6.0|
+--------------------+-----+
only showing top 20 rows
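Conceptually, fitting a pipeline runs its stages in order: a stage that has something to learn (like StringIndexer) is fitted on the data it receives, and every stage then transforms that data for the next one. The fitted pipeline can then be applied to the test set, which is why the test data is later transformed with a pipeline fitted on df_train. A simplified plain-Python sketch; all class and function names are hypothetical stand-ins, not the Spark API.

```python
from collections import Counter


class LabelIndexerModel:
    """Hypothetical fitted stage: applies a learned class -> index mapping."""

    def __init__(self, mapping):
        self.mapping = mapping

    def transform(self, rows):
        # An unseen class raises KeyError here
        return [dict(r, label=self.mapping[r["class"]]) for r in rows]


class LabelIndexer:
    """Hypothetical estimator: learns the mapping from the data it is fitted on."""

    def fit(self, rows):
        counts = Counter(r["class"] for r in rows)
        order = sorted(counts, key=lambda c: (-counts[c], c))
        return LabelIndexerModel({c: float(i) for i, c in enumerate(order)})


class Assembler:
    """Hypothetical transformer: nothing to learn, so it has no fit()."""

    def transform(self, rows):
        return [dict(r, features=[float(r["x"]), float(r["y"]), float(r["z"])]) for r in rows]


def fit_pipeline(stages, rows):
    """Fit stages in order, feeding each stage the output of the previous one."""
    fitted = []
    for stage in stages:
        model = stage.fit(rows) if hasattr(stage, "fit") else stage
        rows = model.transform(rows)
        fitted.append(model)
    return fitted


def transform_pipeline(fitted, rows):
    for model in fitted:
        rows = model.transform(rows)
    return rows


train = [
    {"x": 0, "y": 12, "z": 39, "class": "Sitdown_chair"},
    {"x": 0, "y": 16, "z": 31, "class": "Getup_bed"},
    {"x": 0, "y": 24, "z": 35, "class": "Sitdown_chair"},
]
test = [{"x": 0, "y": 29, "z": 17, "class": "Getup_bed"}]

fitted = fit_pipeline([LabelIndexer(), Assembler()], train)
print(transform_pipeline(fitted, test))
```

The label mapping is learned from the training rows only. In this sketch a class never seen in training makes transform raise a KeyError; StringIndexer's default handleInvalid="error" likewise fails on unseen labels.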



In [56]:
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr.params

[Param(parent='LogisticRegression_5c517ac7f3ef', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'),
 Param(parent='LogisticRegression_5c517ac7f3ef', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'),
 Param(parent='LogisticRegression_5c517ac7f3ef', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'),
 Param(parent='LogisticRegression_5c517ac7f3ef', name='featuresCol', doc='features column name.'),
 Param(parent='LogisticRegression_5c517ac7f3ef', name='fitIntercept', doc='whether to fit an intercept term.'),
 Param(parent='LogisticRegression_5c517ac7f3ef', name='labelCol', doc='label column name.'),
 Param(parent='LogisticRegression_5c517ac7f3ef', name='lowerBoundsOnCoefficients', doc='The lower bounds on coefficients if fitting under bound cons
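regParam and elasticNetParam together define the regularization penalty. In Spark's formulation, with λ = regParam and α = elasticNetParam, the penalty is λ(α‖w‖₁ + (1 − α)/2 ‖w‖₂²), so α = 1 is pure L1 (lasso) and α = 0 is pure L2 (ridge); the intercept is not penalized. A plain-Python sketch of that formula, where the weight vector is made up for illustration:

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """reg_param * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)."""
    alpha = elastic_net_param
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (alpha * l1 + (1.0 - alpha) / 2.0 * l2_squared)


w = [0.5, -1.0, 2.0]  # hypothetical coefficients
print(elastic_net_penalty(w, reg_param=0.3, elastic_net_param=0.8))  # about 0.9975
```

With the settings used here (regParam=0.3, elasticNetParam=0.8) the penalty is mostly L1, which pushes many coefficients toward zero.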

In [78]:
# Fit logistic regression on the preprocessed training data
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8,
                        featuresCol='features_norm')
model = lr.fit(input_df)
prediction_train = model.transform(input_df)

# Transform the test set with the pipeline fitted on the training set
pipeline_model = pipeline.fit(df_train)
test_df = pipeline_model.transform(df_test).select('features_norm', 'label')
prediction_test = model.transform(test_df)

# Print the coefficients and intercept for logistic regression
# print("Coefficients: " + str(model.coefficientMatrix))
# print("Intercept: " + str(model.interceptVector))

We use LogisticRegression, a simple linear classifier, to obtain a baseline for classification performance.

The schema of the prediction DataFrame shows three additional columns: rawPrediction and probability hold per-class scores, and prediction contains the class our model considers most likely (as a numeric label index).

In [81]:
prediction_test.printSchema()

root
 |-- features_norm: vector (nullable = true)
 |-- label: double (nullable = false)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)
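For multinomial logistic regression (which family="auto" selects when there are more than two classes), the three columns are related: rawPrediction holds one score (margin) per class, probability is the softmax of those scores, and prediction is the index of the largest probability. A plain-Python sketch with made-up scores for three classes:

```python
import math


def softmax(raw):
    m = max(raw)  # subtract the maximum for numerical stability
    exps = [math.exp(r - m) for r in raw]
    total = sum(exps)
    return [e / total for e in exps]


def predict(raw):
    """Return (probability vector, predicted label index as a float)."""
    probs = softmax(raw)
    best = max(range(len(probs)), key=probs.__getitem__)
    return probs, float(best)


probs, prediction = predict([0.2, 1.5, -0.3])  # hypothetical rawPrediction
print(prediction)  # 1.0
```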



Let’s evaluate performance using the built-in functionality of Apache SparkML.

In [83]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Compares the "label" and "prediction" columns (the evaluator's defaults)
MulticlassClassificationEvaluator().setMetricName("accuracy").evaluate(prediction_test)

0.20750648073656924
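With no weight column set, the "accuracy" metric is simply the fraction of rows whose prediction equals the label. A plain-Python sketch with made-up labels and predictions:

```python
def accuracy(labels, predictions):
    """Fraction of positions where the prediction equals the label."""
    if not labels or len(labels) != len(predictions):
        raise ValueError("labels and predictions must be non-empty and of equal length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


print(accuracy([8.0, 1.0, 6.0, 8.0], [8.0, 6.0, 6.0, 1.0]))  # 0.5
```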

So we get about 20% right. This is not bad for a baseline: random guessing would give us only about 7%. Of course, we need to improve. You might have noticed that we’re dealing with a time series here, and we’re not making use of that fact right now, since we look at each training example individually. But this is fine for now. More advanced courses like “Advanced Machine Learning and Signal Processing” (https://www.coursera.org/learn/advanced-machine-learning-signal-processing/) will teach you how to improve accuracy to nearly 100% using techniques like the Fourier transform or the wavelet transform. Let’s skip this for now.

In the code cell below, please use the RandomForest classifier (you might need to play with the “numTrees” parameter). You should get an accuracy of around 44%. More on RandomForest can be found here:

https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
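The 7% figure matches uniform random guessing over 14 classes (1/14 ≈ 0.071); the count of 14 is an assumption inferred from that figure and can be checked with df.select('class').distinct().count(). A stricter reference point, not mentioned in the course, is always predicting the most frequent class. A plain-Python sketch of both baselines, with made-up labels for the second:

```python
from collections import Counter


def uniform_guess_accuracy(num_classes):
    """Expected accuracy of guessing a class uniformly at random."""
    return 1.0 / num_classes


def majority_class_accuracy(labels):
    """Accuracy of always predicting the most frequent label."""
    counts = Counter(labels)
    return max(counts.values()) / len(labels)


print(round(uniform_guess_accuracy(14), 4))              # 0.0714
print(majority_class_accuracy([6.0, 6.0, 6.0, 8.0, 1.0]))  # 0.6
```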


In [86]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# The feature and label columns are set in the constructor
rf = RandomForestClassifier(labelCol="label", featuresCol="features_norm", numTrees=10)

# Fit on the preprocessed training data
model = rf.fit(input_df)

# Predict on the training set, and on the test set transformed with the
# pipeline fitted on the training set
prediction_train = model.transform(input_df)
test_df = pipeline.fit(df_train).transform(df_test).select('features_norm', 'label')
prediction_test = model.transform(test_df)

evaluator = MulticlassClassificationEvaluator().setMetricName("accuracy")
accuracy_train = evaluator.evaluate(prediction_train)
accuracy_test = evaluator.evaluate(prediction_test)
print("Accuracy train {:.4f}\nAccuracy Test {:.4f}".format(accuracy_train, accuracy_test))

Accuracy train 0.4237
Accuracy Test 0.4226
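numTrees controls how many trees vote. As far as I know, Spark's RandomForestClassificationModel uses soft voting: each tree contributes the class distribution of the leaf the sample falls into, normalized to sum to 1; these votes are summed and normalized into probabilities, and the prediction is the most probable class. A plain-Python sketch of that aggregation, with made-up leaf class counts for three trees and three classes (the function name forest_predict is hypothetical):

```python
def forest_predict(tree_class_counts):
    """Aggregate per-tree leaf class counts by soft voting.

    Returns (probability vector, predicted class index as a float).
    """
    num_classes = len(tree_class_counts[0])
    votes = [0.0] * num_classes
    for counts in tree_class_counts:
        total = sum(counts)
        if total:
            # Each tree contributes its normalized class distribution
            for i, c in enumerate(counts):
                votes[i] += c / total
    total_votes = sum(votes)
    if total_votes == 0:
        raise ValueError("no tree produced any votes")
    probs = [v / total_votes for v in votes]
    prediction = float(max(range(num_classes), key=probs.__getitem__))
    return probs, prediction


trees = [[8, 2, 0], [3, 5, 2], [6, 1, 3]]  # hypothetical leaf class counts
probs, prediction = forest_predict(trees)
print(prediction)  # 0.0
```

The train and test accuracies above are almost identical, which suggests the forest is not overfitting; with only the raw x, y and z of each sample as features, more trees alone may not help much.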
