# Apache SparkML

In [None]:
!git clone https://github.com/wchill/HMP_Dataset.git

In [1]:
!ls HMP_Dataset

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20190625055659-0002
KERNEL_ID = f1a70b62-72d7-4cdc-a3c6-634ee21fd5c9
Brush_teeth	Drink_glass  Liedown_bed  Sitdown_chair  final.py
Climb_stairs	Eat_meat     MANUAL.txt   Standup_chair  impdata.py
Comb_hair	Eat_soup     Pour_water   Use_telephone
Descend_stairs	Getup_bed    README.txt   Walk


In [2]:
!ls HMP_Dataset/Brush_teeth

Accelerometer-2011-04-11-13-28-18-brush_teeth-f1.txt
Accelerometer-2011-04-11-13-29-54-brush_teeth-f1.txt
Accelerometer-2011-05-30-08-35-11-brush_teeth-f1.txt
Accelerometer-2011-05-30-09-36-50-brush_teeth-f1.txt
Accelerometer-2011-05-30-10-34-16-brush_teeth-m1.txt
Accelerometer-2011-05-30-21-10-57-brush_teeth-f1.txt
Accelerometer-2011-05-30-21-55-04-brush_teeth-m2.txt
Accelerometer-2011-05-31-15-16-47-brush_teeth-f1.txt
Accelerometer-2011-06-02-10-42-22-brush_teeth-f1.txt
Accelerometer-2011-06-02-10-45-50-brush_teeth-f1.txt
Accelerometer-2011-06-06-10-45-27-brush_teeth-f1.txt
Accelerometer-2011-06-06-10-48-05-brush_teeth-f1.txt


In [3]:
from pyspark.sql.types import StructType, StructField, IntegerType

In [4]:
schema = StructType([ 
    StructField('x', IntegerType(), True),
    StructField('y', IntegerType(), True),
    StructField('z', IntegerType(), True)
])

In [5]:
import os

file_list = os.listdir('HMP_Dataset')
file_list_filtered = [s for s in file_list if '_' in s]

df = None

In [6]:
file_list_filtered

['Brush_teeth',
 'Climb_stairs',
 'Comb_hair',
 'Descend_stairs',
 'Drink_glass',
 'Eat_meat',
 'Eat_soup',
 'Getup_bed',
 'Liedown_bed',
 'Pour_water',
 'Sitdown_chair',
 'Standup_chair',
 'Use_telephone']

In [7]:
from pyspark.sql.functions import lit

for category in file_list_filtered:
    data_files = os.listdir('HMP_Dataset/'+category)
    for data_file in data_files:
        temp_df = spark.read.option('header','false').option('delimiter',' ').csv('HMP_Dataset/'+category+'/'+data_file,schema=schema)
        temp_df = temp_df.withColumn('class', lit(category))
        temp_df = temp_df.withColumn('source', lit(data_file))
        
        if df is None:
            df = temp_df
        else:
            df = df.union(temp_df)

In [8]:
df.show()

+---+---+---+-----------+--------------------+
|  x|  y|  z|      class|              source|
+---+---+---+-----------+--------------------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|
| 20| 50| 35|Brush_teeth|Accelerometer-201...|
| 22| 52| 34|Brush_teeth|Accelerometer-201...|
| 22| 50| 34|Brush_teeth|Accelerometer-201...|
| 22| 51| 35|Brush_teeth|Accelerometer-201...|
| 21| 51| 33|Brush_teeth|Accelerometer-201...|
| 20| 50| 34|Brush_teeth|Accelerometer-201...|
| 21| 49| 33|Brush_teeth|Accelerometer-201...|
| 21| 49| 33|Brush_teeth|Accelerometer-201...|
| 20| 51| 35|Brush_teeth|Accelerometer-201...|
| 18| 49| 34|Brush_teeth|Accelerometer-201...|
| 19| 48| 34|Brush_teeth|Accelerometer-201...|
| 16| 53| 34|Brush_teeth|Accelerometer-201...|
| 18| 52| 35|

In [9]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol = 'class', outputCol = 'ClassIndex')
indexed = indexer.fit(df).transform(df)

indexed.show()

+---+---+---+-----------+--------------------+----------+
|  x|  y|  z|      class|              source|ClassIndex|
+---+---+---+-----------+--------------------+----------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 20| 50| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 50| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 22| 51| 35|Brush_teeth|Accelerometer-201...|       5.0|
| 21| 51| 33|Brush_teeth|Accelerometer-201...|       5.0|
| 20| 50| 34|Brush_teeth|Accelerometer-201...|       5.0|
| 21| 49| 33|Brush_teeth|Accelerometer-201...|       5.0|
| 21| 49| 33|Brush_teeth|Accelerometer-201...|       5.0|
| 20| 51| 35|B

### One-Hot Encoding

In [10]:
from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCol = 'ClassIndex', outputCol = 'categoryVec')
encoded = encoder.transform(indexed)
encoded.show()

+---+---+---+-----------+--------------------+----------+--------------+
|  x|  y|  z|      class|              source|ClassIndex|   categoryVec|
+---+---+---+-----------+--------------------+----------+--------------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 20| 50| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 50| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 22| 51| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|
| 21| 51| 33|Brush_teeth|Accelerometer-201...|     

In [12]:
#Since SparkML only work with vector objects
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler(inputCols = ['x','y','x'], outputCol = 'features')

features_vectorized = vectorAssembler.transform(encoded)


In [13]:
#Normalize data for simplicity

from pyspark.ml.feature import Normalizer

normalizer = Normalizer(inputCol ='features', outputCol = 'feature_norm', p=1.0)

normalized_data = normalizer.transform(features_vectorized)

## Pipeline

In [14]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages = [indexer, encoder, vectorAssembler, normalizer])

model = pipeline.fit(df)

prediction = model.transform(df)

prediction.show()

+---+---+---+-----------+--------------------+----------+--------------+----------------+--------------------+
|  x|  y|  z|      class|              source|ClassIndex|   categoryVec|        features|        feature_norm|
+---+---+---+-----------+--------------------+----------+--------------+----------------+--------------------+
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,49.0,22.0]|[0.23655913978494...|
| 22| 49| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,49.0,22.0]|[0.23655913978494...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,22.0]|[0.22916666666666...|
| 22| 52| 35|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,52.0,22.0]|[0.22916666666666...|
| 21| 52| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[21.0,52.0,21.0]|[0.22340425531914...|
| 22| 51| 34|Brush_teeth|Accelerometer-201...|       5.0|(12,[5],[1.0])|[22.0,51.0,22.0]|[0.23157894736842...|
|