In [1]:
from IPython.display import Markdown, display
def printmd(string):
    display(Markdown('# <span style="color:red">'+string+'</span>'))


if ('sc' in locals() or 'sc' in globals()):
    printmd('<<<<<!!!!! It seems that you are running in a IBM Watson Studio Apache Spark Notebook. Please run it in an IBM Watson Studio Default Runtime (without Apache Spark) !!!!!>>>>>')


In [2]:
!pip install pyspark==2.4.5

Collecting pyspark==2.4.5
[?25l  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
[K     |████████████████████████████████| 217.8MB 165kB/s  eta 0:00:01
[?25hCollecting py4j==0.10.7 (from pyspark==2.4.5)
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 34.2MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Stored in directory: /home/dsxuser/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5


In [3]:
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession
except ImportError as e:
    printmd('<<<<<!!!!! Please restart your kernel after installing Apache Spark !!!!!>>>>>')

In [4]:
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

spark = SparkSession \
    .builder \
    .getOrCreate()

In [26]:
# delete files from previous runs
!rm -f hmp.parquet*

# download the file containing the data in PARQUET format
!wget https://github.com/IBM/coursera/raw/master/hmp.parquet
    
# create a dataframe out of it
df = spark.read.parquet('hmp.parquet')

# register a corresponding query table
df.createOrReplaceTempView('df')

--2020-05-02 10:24:30--  https://github.com/IBM/coursera/raw/master/hmp.parquet
Resolving github.com (github.com)... 140.82.118.3
Connecting to github.com (github.com)|140.82.118.3|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://github.com/IBM/skillsnetwork/raw/master/hmp.parquet [following]
--2020-05-02 10:24:30--  https://github.com/IBM/skillsnetwork/raw/master/hmp.parquet
Reusing existing connection to github.com:443.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/IBM/skillsnetwork/master/hmp.parquet [following]
--2020-05-02 10:24:30--  https://raw.githubusercontent.com/IBM/skillsnetwork/master/hmp.parquet
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.16.133
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.16.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 932997 (911K) [application/octet-stream]
Savin

## This is just an example demonstrating how to use spark API for Classification
## This is subquential data, so the prediction cannot be made on one row and have to consider the entire scenario

Since this is supervised learning, let’s split our data into train (80%) and test (20%) set.

In [31]:
splits = df.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

df_train.createOrReplaceTempView('df_train')
df_test.createOrReplaceTempView('df_test')

Again, we can re-use our feature engineering pipeline

In [37]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler


indexer = StringIndexer(inputCol="class", outputCol="label")

vectorAssembler = VectorAssembler(inputCols=["x","y","z"],
                                  outputCol="features")


minmaxscaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

Now we use LogisticRegression, a simple and basic linear classifier to obtain a classification performance baseline.

In [40]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

lr = LogisticRegression(labelCol="label", featuresCol="scaledFeatures", maxIter=10, regParam=0.3, elasticNetParam=0.8)
pipeline = Pipeline(stages=[indexer, vectorAssembler, minmaxscaler,lr])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)

If we look at the schema of the prediction dataframe we see that there is an additional column called prediction which contains the best guess for the class our model predicts.

In [9]:
prediction.printSchema()

root
 |-- x: integer (nullable = true)
 |-- y: integer (nullable = true)
 |-- z: integer (nullable = true)
 |-- source: string (nullable = true)
 |-- class: string (nullable = true)
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- features_norm: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



Let’s evaluate performance by using a build-in functionality of Apache SparkML.

In [41]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
MulticlassClassificationEvaluator().setMetricName("accuracy").evaluate(prediction) 

0.20599499107152885

So we get 20% right. This is not bad for a baseline. Note that random guessing would give us only 7%. Of course we need to improve. You might have notices that we’re dealing with a time series here. And we’re not making use of that fact right now as we look at each training example only individually. But this is ok for now. More advanced courses like “Advanced Machine Learning and Signal Processing” (https://www.coursera.org/learn/advanced-machine-learning-signal-processing/) will teach you how to improve accuracy to the nearly 100% by using algorithms like Fourier transformation or wavelet transformation. But let’s skip this for now. In the following cell, please use the RandomForest classifier (you might need to play with the “numTrees” parameter) in the code cell below. You should get an accuracy of around 44%. More on RandomForest can be found here:

https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier


In [32]:
spark.sql('select class,count(*) from df_test group by class').show()

+--------------+--------+
|         class|count(1)|
+--------------+--------+
| Use_telephone|    3001|
| Standup_chair|    5149|
|      Eat_meat|    6290|
|     Getup_bed|    9124|
|   Drink_glass|    8477|
|    Pour_water|    8392|
|     Comb_hair|    4664|
|          Walk|   18342|
|  Climb_stairs|    8011|
| Sitdown_chair|    4910|
|   Liedown_bed|    2293|
|Descend_stairs|    3036|
|   Brush_teeth|    6038|
|      Eat_soup|    1314|
+--------------+--------+



In [36]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler

minmaxscaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
rf = RandomForestClassifier(labelCol="label", featuresCol="scaledFeatures", numTrees=10)

pipeline_rf = Pipeline(stages=[indexer, vectorAssembler, minmaxscaler,rf])
model_rf = pipeline_rf.fit(df_train)
prediction_rf = model_rf.transform(df_test)

prediction_rf.show()
print(MulticlassClassificationEvaluator().setMetricName("accuracy").evaluate(prediction_rf))



+---+---+---+--------------------+--------------+-----+---------------+--------------------+--------------------+--------------------+----------+
|  x|  y|  z|              source|         class|label|       features|      scaledFeatures|       rawPrediction|         probability|prediction|
+---+---+---+--------------------+--------------+-----+---------------+--------------------+--------------------+--------------------+----------+
|  0| 12| 39|Accelerometer-201...| Sitdown_chair|  8.0|[0.0,12.0,39.0]|[0.0,0.1904761904...|[2.80737689904376...|[0.28073768990437...|       1.0|
|  0| 16| 31|Accelerometer-201...|     Getup_bed|  1.0|[0.0,16.0,31.0]|[0.0,0.2539682539...|[2.35006707679124...|[0.23500670767912...|       1.0|
|  0| 25| 30|Accelerometer-201...|   Brush_teeth|  6.0|[0.0,25.0,30.0]|[0.0,0.3968253968...|[2.35006707679124...|[0.23500670767912...|       1.0|
|  0| 25| 40|Accelerometer-201...|   Brush_teeth|  6.0|[0.0,25.0,40.0]|[0.0,0.3968253968...|[2.78545303175419...|[0.27854530