Welcome to exercise one of week four of “Apache Spark for Scalable Machine Learning on BigData”. In this exercise we’ll work on classification.

Let’s create our DataFrame again:


This notebook is designed to run in an IBM Watson Studio Apache Spark runtime. If you are running it in an IBM Watson Studio standard runtime or outside Watson Studio, we install Apache Spark in local mode for test purposes only. Please don't use it in production.

In [None]:
!pip install --upgrade pip

In [1]:
if not ('sc' in locals() or 'sc' in globals()):
    print('It seems you are not running in an IBM Watson Studio Apache Spark Notebook. You might be running in an IBM Watson Studio Default Runtime or outside IBM Watson Studio. Therefore installing a local Apache Spark environment for you. Please do not use it in production.')

    # install PySpark with the interpreter that runs this notebook;
    # pip's internal `main` function is no longer importable from `pip` since pip 10
    import subprocess
    import sys
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'pyspark==2.4.5'])

    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession

    # local mode, using all available cores
    sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

    spark = SparkSession \
        .builder \
        .getOrCreate()

It seems you are not running in an IBM Watson Studio Apache Spark Notebook. You might be running in an IBM Watson Studio Default Runtime or outside IBM Watson Studio. Therefore installing a local Apache Spark environment for you. Please do not use it in production.




In [2]:
# # delete files from previous runs
# !rm -f hmp.parquet*

# # download the file containing the data in PARQUET format
# !wget https://github.com/IBM/coursera/raw/master/hmp.parquet
    
# create a dataframe out of it
df = spark.read.parquet('hmp.parquet')

# register a corresponding query table
df.createOrReplaceTempView('df')

Since this is supervised learning, let’s split our data into a training set (80%) and a test set (20%).

In [3]:
splits = df.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]
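
As a rough illustration of why the two sets are only approximately 80% and 20% of the data, the plain-Python sketch below assigns each row to a split with an independent random draw. The helper name `random_split` and the seed value are assumptions made for this example and are not part of the notebook; Spark samples per partition, so it will not reproduce these exact rows.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split by an independent uniform draw."""
    total = sum(weights)
    # cumulative upper bound of each split, e.g. [0.8, 1.0]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    bounds[-1] = 1.0  # guard against floating-point rounding
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            if draw < upper:
                splits[i].append(row)
                break
    return splits

rows = list(range(10000))  # stand-in for DataFrame rows
train, test = random_split(rows, [0.8, 0.2], seed=42)
print(len(train), len(test))
```

In the notebook itself, `df.randomSplit([0.8, 0.2], seed=42)` makes the split repeatable between runs; the seed value 42 is arbitrary.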

Again, we can reuse our feature engineering pipeline.

In [5]:
df.show()

+---+---+---+--------------------+-----------+
|  x|  y|  z|              source|      class|
+---+---+---+--------------------+-----------+
| 22| 49| 35|Accelerometer-201...|Brush_teeth|
| 22| 49| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 35|Accelerometer-201...|Brush_teeth|
| 21| 52| 34|Accelerometer-201...|Brush_teeth|
| 22| 51| 34|Accelerometer-201...|Brush_teeth|
| 20| 50| 35|Accelerometer-201...|Brush_teeth|
| 22| 52| 34|Accelerometer-201...|Brush_teeth|
| 22| 50| 34|Accelerometer-201...|Brush_teeth|
| 22| 51| 35|Accelerometer-201...|Brush_teeth|
| 21| 51| 33|Accelerometer-201...|Brush_teeth|
| 20| 50| 34|Accelerometer-201...|Brush_teeth|
| 21| 49| 33|Accelerometer-201...|Brush_teeth|
| 21| 49| 33|Accelerometer-201...|Brush_teeth|
| 20| 51| 35|Accelerometer-201...|Brush_teeth|
| 18| 49| 34|Accelerometer-201...|Brush_teeth|
| 19| 48| 34|Accelerometer-201...|Brush_teeth|
| 16| 53| 34|Accelerometer-201...|Brush_teeth|
| 18| 52| 35|

In [6]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer

# map the string column "class" to a numeric "label" column
indexer = StringIndexer(inputCol="class", outputCol="label")

# combine the x, y and z columns into a single feature vector
vectorAssembler = VectorAssembler(inputCols=["x","y","z"],
                                  outputCol="features")

# scale each feature vector to unit L1 norm (p=1.0)
normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)
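
To make the effect of `Normalizer` with `p=1.0` concrete, here is a minimal plain-Python sketch of L1 normalization: every entry is divided by the sum of the absolute values in the vector. The function name `l1_normalize` is an assumption made for this illustration and is not a Spark API; the input is the feature vector of the first row in the `prediction.show()` output of this notebook.

```python
def l1_normalize(vector):
    """Scale a vector so that the absolute values of its entries sum to 1."""
    norm = sum(abs(v) for v in vector)
    if norm == 0:
        return list(vector)  # this sketch leaves an all-zero vector unchanged
    return [v / norm for v in vector]

features = [0.0, 11.0, 38.0]
features_norm = l1_normalize(features)
print(features_norm)
```

The second entry is 11/49, roughly 0.2245, which agrees with the `features_norm` value Spark reports for that row.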

Now we use LogisticRegression, a simple linear classifier, to obtain a baseline for classification performance.

In [7]:
df_train.show()

+---+---+---+--------------------+--------------+
|  x|  y|  z|              source|         class|
+---+---+---+--------------------+--------------+
|  0| 10| 28|Accelerometer-201...|     Getup_bed|
|  0| 12| 39|Accelerometer-201...| Sitdown_chair|
|  0| 15| 39|Accelerometer-201...|   Brush_teeth|
|  0| 16| 31|Accelerometer-201...|     Getup_bed|
|  0| 23| 36|Accelerometer-201...|   Brush_teeth|
|  0| 24| 35|Accelerometer-201...| Sitdown_chair|
|  0| 25| 40|Accelerometer-201...|   Brush_teeth|
|  0| 26| 15|Accelerometer-201...|  Climb_stairs|
|  0| 26| 42|Accelerometer-201...|   Brush_teeth|
|  0| 27| 33|Accelerometer-201...|     Getup_bed|
|  0| 27| 37|Accelerometer-201...|   Brush_teeth|
|  0| 27| 39|Accelerometer-201...|   Brush_teeth|
|  0| 27| 41|Accelerometer-201...|   Brush_teeth|
|  0| 28| 28|Accelerometer-201...|   Brush_teeth|
|  0| 29| 17|Accelerometer-201...|     Getup_bed|
|  0| 29| 25|Accelerometer-201...|     Getup_bed|
|  0| 29| 25|Accelerometer-201...|  Climb_stairs|


In [8]:
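from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Note: LogisticRegression reads its input from the default featuresCol="features";
# pass featuresCol="features_norm" to train on the normalized vectors instead.
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# chain feature engineering and the classifier into one pipeline
pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, lr])

# fit on the training set, then predict on the held-out test set
model = pipeline.fit(df_train)
prediction = model.transform(df_test)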

If we look at the schema of the prediction DataFrame, we see an additional column called prediction, which contains the model's best guess for the class of each row.

In [9]:
prediction.printSchema()

root
 |-- x: integer (nullable = true)
 |-- y: integer (nullable = true)
 |-- z: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- class: string (nullable = true)
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- features_norm: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)
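
As a minimal sketch of how the `probability` and `prediction` columns relate, the predicted label is the index of the largest class probability, assuming no custom thresholds are set on the classifier. The function name `predict_from_probability` and the probability values are hypothetical and only serve this illustration.

```python
def predict_from_probability(probability):
    """Return the index of the largest class probability as a float label."""
    best_index = max(range(len(probability)), key=lambda i: probability[i])
    return float(best_index)

example_probability = [0.21, 0.19, 0.60]  # hypothetical values, not from the model
print(predict_from_probability(example_probability))  # 2.0
```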



In [10]:
prediction.show()

+---+---+---+--------------------+--------------+-----+---------------+--------------------+--------------------+--------------------+----------+
|  x|  y|  z|              source|         class|label|       features|       features_norm|       rawPrediction|         probability|prediction|
+---+---+---+--------------------+--------------+-----+---------------+--------------------+--------------------+--------------------+----------+
|  0| 11| 38|Accelerometer-201...| Sitdown_chair|  8.0|[0.0,11.0,38.0]|[0.0,0.2244897959...|[1.25607900059940...|[0.20682507997930...|       0.0|
|  0| 17| 36|Accelerometer-201...|   Brush_teeth|  6.0|[0.0,17.0,36.0]|[0.0,0.3207547169...|[1.25607900059940...|[0.20682507997930...|       0.0|
|  0| 25| 30|Accelerometer-201...|   Brush_teeth|  6.0|[0.0,25.0,30.0]|[0.0,0.4545454545...|[1.25607900059940...|[0.20682507997930...|       0.0|
|  0| 27| 31|Accelerometer-201...| Sitdown_chair|  8.0|[0.0,27.0,31.0]|[0.0,0.4655172413...|[1.25607900059940...|[0.20682507

Let’s evaluate performance using the built-in functionality of Apache SparkML.

In [11]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# accuracy: fraction of test rows where "prediction" equals "label"
MulticlassClassificationEvaluator().setMetricName("accuracy").evaluate(prediction)

0.2057135826221183
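
For reference, the accuracy metric is simply the share of rows whose predicted label equals the true label. The plain-Python sketch below shows the computation on a handful of toy values; the function name `accuracy` and the data are assumptions made for this illustration and are not taken from the evaluator's implementation.

```python
def accuracy(labels, predictions):
    """Fraction of positions where the prediction equals the label."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    correct = sum(1 for label, pred in zip(labels, predictions) if label == pred)
    return correct / len(labels)

# toy values: a model that predicts class 0.0 for every row
labels = [8.0, 6.0, 6.0, 8.0, 0.0]
predictions = [0.0, 0.0, 0.0, 0.0, 0.0]
print(accuracy(labels, predictions))  # 0.2
```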

So we get about 20.6% right. This is not bad for a baseline: random guessing would give us only about 7%. Of course, we need to improve.

You might have noticed that we’re dealing with a time series here, and we’re not making use of that fact right now because we look at each training example individually. But this is OK for now. More advanced courses like “Advanced Machine Learning and Signal Processing” (https://www.coursera.org/learn/advanced-machine-learning-signal-processing/) will teach you how to improve accuracy to nearly 100% by using techniques like the Fourier transform or the wavelet transform. But let’s skip this for now.

In the code cell below, please use the RandomForest classifier (you might need to play with the “numTrees” parameter). You should get an accuracy of around 44%. More on RandomForest can be found here:

https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
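
The 7% random-guessing figure quoted above can be checked with a short calculation: guessing uniformly among K classes is right with probability 1/K. The value of 14 classes below is an assumption inferred from that 7% figure and is not stated in this notebook.

```python
num_classes = 14  # assumption: inferred from the 7% figure, not stated in the notebook
random_guess_accuracy = 1 / num_classes

baseline_accuracy = 0.2057135826221183  # logistic regression accuracy reported above

print(round(random_guess_accuracy, 4))
# how many times better than uniform random guessing the baseline is
print(round(baseline_accuracy / random_guess_accuracy, 2))
```

In the notebook, `df.select('class').distinct().count()` returns the actual number of classes and can replace the assumed value.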


In [None]:
$$