In [1]:
#variables for data paths
#Root folder of the WISDM data on DBFS: the phone/watch sensor csv folders and activity_key.txt live under it
data_path = "dbfs:/FileStore/tmp/wisdm"

In [2]:
import pyspark.sql.functions as fn #contains wide array of data manipulation functions
from pyspark.sql import Row #optimized storage format for spark dataframes
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType, FloatType #data types
from glob import glob

In [3]:
#While not required it is a best practice to specify a schema when reading csv files.
#Fields are listed in the order the csv columns are read; every field is nullable (StructField default).
schema = StructType([
    StructField("Subject-id", IntegerType()),
    StructField("ActivityLabel", StringType()),
    StructField("Timestamp", LongType()),
    StructField("x", FloatType()),
    StructField("y", FloatType()),
    StructField("z", FloatType()),
])

In [4]:
#spark automatically reads data from an entire directory.
#Every path is derived from data_path so the dataset location is configured in one place.
df_phone_accel = spark.read.schema(schema).csv(data_path + '/phone/accel/')
df_phone_gyro = spark.read.schema(schema).csv(data_path + '/phone/gyro/')

df_watch_accel = spark.read.schema(schema).csv(data_path + '/watch/accel/')
df_watch_gyro = spark.read.schema(schema).csv(data_path + '/watch/gyro/')

In [5]:
#Let's get the activity key mapping using the lower level RDD API

#Read the activity key into an rdd (same root folder as the sensor csv files)
rdd_act = sc.textFile(data_path + '/activity_key.txt')

#Useful lines have the form "<description> = <label>". Split on '=' and strip the surrounding
#whitespace, so stray spaces cannot silently break the later join on ActivityLabel.
#Lines that do not split into exactly two non-empty parts (e.g. blank lines) are skipped.
rdd_act = rdd_act \
.map(lambda line: [part.strip() for part in line.split("=")]) \
.filter(lambda parts: len(parts) == 2 and all(parts)) \
.map(lambda parts: Row(ActivityDescription = parts[0], ActivityLabel = parts[1]))

#An rdd of Rows can be easily converted into a dataframe
df_act = rdd_act.toDF()

In [6]:
#schema for watch accelerometer data (the six columns declared in `schema`; Device/Sensor are added later)
df_watch_accel.printSchema()

In [7]:
#schema for the activity key data frame (ActivityDescription and ActivityLabel, inferred from the Row objects)
df_act.printSchema()

In [8]:
#Tag every reading with the device and sensor it came from so the four streams can be combined
def _tag_source(sensor_df, device, sensor):
  """Return sensor_df with constant 'Device' and 'Sensor' columns appended (in that order)."""
  return sensor_df.withColumn('Device', fn.lit(device)).withColumn('Sensor', fn.lit(sensor))

df_phone_accel = _tag_source(df_phone_accel, "Phone", "Accel")
df_phone_gyro = _tag_source(df_phone_gyro, "Phone", "Gyro")
df_watch_accel = _tag_source(df_watch_accel, "Watch", "Accel")
df_watch_gyro = _tag_source(df_watch_gyro, "Watch", "Gyro")


In [9]:
#Stack the four tagged streams into a single dataframe.
#union matches columns by position; all four frames share the same column layout.
df_combined = (
    df_phone_accel
    .union(df_phone_gyro)
    .union(df_watch_accel)
    .union(df_watch_gyro)
)

In [10]:
#join with activity key to get the activity descriptions.
#df_act is a tiny lookup table built from an RDD, so Spark may not be able to estimate its size and
#choose a broadcast join on its own; hint it explicitly to avoid shuffling the large sensor data.
df_combined = df_combined.join(fn.broadcast(df_act),"ActivityLabel","inner")

In [11]:
display(df_combined)  # preview: readings tagged with Device/Sensor plus their ActivityDescription

ActivityLabel,Subject-id,Timestamp,x,y,z,Device,Sensor,ActivityDescription
K,1619,170441359106820,-4.0761285,-5.349245,6.987478,Phone,Accel,drinking
K,1619,170441361139788,-4.073136,-5.353435,6.9647326,Phone,Accel,drinking
K,1619,170441399800568,-4.086902,-5.3540335,7.029975,Phone,Accel,drinking
K,1619,170441439899892,-4.086902,-5.3282957,7.0144124,Phone,Accel,drinking
K,1619,170441481587652,-4.0701427,-5.33488,7.032369,Phone,Accel,drinking
K,1619,170441524838485,-4.0653543,-5.3366756,7.0341644,Phone,Accel,drinking
K,1619,170441561311038,-4.1000705,-5.3342814,7.007828,Phone,Accel,drinking
K,1619,170441601223588,-4.0683475,-5.3049526,7.029975,Phone,Accel,drinking
K,1619,170441641278850,-4.0384197,-5.345654,7.0305734,Phone,Accel,drinking
K,1619,170441679208330,-4.045004,-5.341464,7.0132155,Phone,Accel,drinking


In [12]:
#create a temporary view to run some SQL queries on it
#(registered as "wisdmData" in the current Spark session and queried by the %sql cells below)
df_combined.createOrReplaceTempView("wisdmData")

In [13]:
%sql
-- Peek at a handful of rows from the combined view
SELECT *
FROM wisdmData
LIMIT 5

ActivityLabel,Subject-id,Timestamp,x,y,z,Device,Sensor,ActivityDescription
K,1650,358188655208000,0.7759864,3.781736,8.653206,Phone,Accel,drinking
K,1650,358188675272000,1.1352394,4.3325906,7.199429,Phone,Accel,drinking
K,1650,358188695368000,0.91729254,4.9073954,7.8604546,Phone,Accel,drinking
K,1650,358188715464000,0.3065625,5.352869,8.382568,Phone,Accel,drinking
K,1650,358188735708000,-0.3879932,6.090535,7.814949,Phone,Accel,drinking


In [14]:
%sql
-- Number of readings per activity, across all devices and sensors
SELECT ActivityDescription, count(1)
FROM wisdmData
GROUP BY ActivityDescription

ActivityDescription,count(1)
drinking,901381
sandwich,857571
kicking,882417
sitting,875030
soup,869704
standing,882587
jogging,862281
dribbling,882716
catch,868766
clapping,869905


In [15]:
%sql
-- Raw watch accelerometer trace for subject 1601 while walking
SELECT Timestamp, x, y, z
FROM wisdmData
WHERE `Subject-id` = 1601
  AND Device = "Watch"
  AND Sensor = "Accel"
  AND ActivityDescription = "walking"

Timestamp,x,y,z
1896411611733301,-2.969708,-1.9493291,10.726623
1896411661695801,-3.4868555,-2.420987,11.660361
1896411711658874,-2.8260558,-2.8543375,9.792884
1896411761623926,-3.3072903,-3.076998,9.926959
1896411811593717,-3.9968204,-2.8471546,9.280524
1896411861613316,-3.4102411,-1.8727146,8.22947
1896411911642791,-2.0766706,-0.67321956,6.4817023
1896411961677199,-1.4733319,0.07377134,5.3061495
1896412011712879,-1.8683752,-0.24465749,5.2606597
1896412061637572,-2.3591864,0.38501757,3.8911762


In [17]:
#we can leverage the pandas UDF functionality to run a pandas profiling report for each device and sensor in a distributed fashion

import pandas_profiling

dbutils.fs.mkdirs(data_path+'/profiles')

@fn.pandas_udf(df_combined.schema, fn.PandasUDFType.GROUPED_MAP)
def profile_data(pdf):
  """Write a pandas-profiling HTML report for one (Device, Sensor) group.

  The report is built from a random sample of at most 1000 rows (fixed seed, so
  re-runs produce the same report) and written to
  /dbfs/FileStore/tmp/wisdm/profiles/<Device>_<Sensor>_profile.html.
  Returns the group unchanged so the output matches df_combined.schema.
  """
  # Every row in the group shares Device/Sensor; positional access avoids depending
  # on the index labels of the pandas frame Spark passes in.
  device = pdf['Device'].iloc[0]
  sensor = pdf['Sensor'].iloc[0]
  # sample() without replacement raises if n > len(pdf), so cap n at the group size.
  sample = pdf.sample(n=min(1000, len(pdf)), random_state=42)
  profile = pandas_profiling.ProfileReport(sample)
  profile.to_file(output_file="/dbfs/FileStore/tmp/wisdm/profiles/{}_{}_profile.html".format(device,sensor))
  return pdf

In [18]:
#using a pandas UDF is similar to how you'd do it in pandas
#Since spark uses lazy evaluation we have to call an action to execute the profiling:
#count() forces profile_data to run once per (Device, Sensor) group; the count itself is just a row total
df_combined.groupBy("Device","Sensor").apply(profile_data).count()

In [19]:
#let's log the reports into MLFlow
import mlflow

#Collect the report files first, so the logged count matches what is actually uploaded
#(sorted for a deterministic upload order).
profile_paths = sorted(glob('/dbfs/FileStore/tmp/wisdm/profiles/*.html'))

with mlflow.start_run():
  mlflow.log_param("pandas_profiles", len(profile_paths))
  for p in profile_paths:
    mlflow.log_artifact(p)

# Handling Missing Values

In [21]:
#there are no missing values in the dataset so we'll create some for x:
#each row draws a seeded uniform random number and x is blanked when it is below 0.2 (~20% of rows)
df_nulls = df_watch_accel.withColumn(
    'x',
    fn.when(fn.rand(5) < 0.2, fn.lit(None)).otherwise(fn.col('x'))
)

In [22]:
display(df_nulls.where((fn.col("Subject-id") == 1646) & (fn.col("ActivityLabel") == 'A' )).limit(10))  # inspect injected nulls for one subject/activity

Subject-id,ActivityLabel,Timestamp,x,y,z,Device,Sensor
1646,A,1470028569293284,,1.469142,3.2790089,Watch,Accel
1646,A,1470028618793284,,-1.6505028,-0.9084499,Watch,Accel
1646,A,1470028668293284,5.168782,-4.3128552,-6.0057054,Watch,Accel
1646,A,1470028717793284,2.8128877,-4.0806174,-7.6984057,Watch,Accel
1646,A,1470028767293284,1.5367782,-5.2920837,-9.225906,Watch,Accel
1646,A,1470028816793284,-1.7720085,-6.189909,-8.414272,Watch,Accel
1646,A,1470028866293284,0.86400753,-9.568128,-5.3760304,Watch,Accel
1646,A,1470028915793284,3.2390556,-9.955989,-1.0856209,Watch,Accel
1646,A,1470028965293284,,-4.391864,3.5136406,Watch,Accel
1646,A,1470029014793284,3.5981858,-8.46919,10.133608,Watch,Accel


In [23]:
#Forward fill: carry the last non-null x forward within each subject/activity, in time order.
#Leading nulls (nothing earlier in the partition) stay null.
from pyspark.sql import Window
#unboundedPreceding looks back over the whole partition, so a value is found however long a run of nulls is
window_spec = (Window
               .partitionBy(["Subject-id","ActivityLabel"])
               .orderBy("Timestamp")
               .rowsBetween(Window.unboundedPreceding, Window.currentRow))

display(df_nulls.withColumn('x',fn.last('x',ignorenulls=True)
                            .over(window_spec))
        .where((fn.col("Subject-id") == 1646) & (fn.col("ActivityLabel") == 'A' ))
        .limit(10))

Subject-id,ActivityLabel,Timestamp,x,y,z,Device,Sensor
1646,A,1470028569293284,,1.469142,3.2790089,Watch,Accel
1646,A,1470028618793284,,-1.6505028,-0.9084499,Watch,Accel
1646,A,1470028668293284,5.168782,-4.3128552,-6.0057054,Watch,Accel
1646,A,1470028717793284,2.8128877,-4.0806174,-7.6984057,Watch,Accel
1646,A,1470028767293284,1.5367782,-5.2920837,-9.225906,Watch,Accel
1646,A,1470028816793284,-1.7720085,-6.189909,-8.414272,Watch,Accel
1646,A,1470028866293284,0.86400753,-9.568128,-5.3760304,Watch,Accel
1646,A,1470028915793284,3.2390556,-9.955989,-1.0856209,Watch,Accel
1646,A,1470028965293284,3.2390556,-4.391864,3.5136406,Watch,Accel
1646,A,1470029014793284,3.5981858,-8.46919,10.133608,Watch,Accel


In [24]:
#Backfill: replace a null x with the next non-null x within each subject/activity, in time order.
#Trailing nulls (nothing later in the partition) stay null.
from pyspark.sql import Window
#unboundedFollowing looks ahead over the rest of the partition, so a value is found however long a run of nulls is
window_spec = (Window
               .partitionBy(["Subject-id","ActivityLabel"])
               .orderBy("Timestamp")
               .rowsBetween(1, Window.unboundedFollowing))

display(df_nulls.withColumn('x',fn.when(fn.isnull('x'),fn.first('x',ignorenulls=True)
                            .over(window_spec)).otherwise(fn.col('x')))
        .where((fn.col("Subject-id") == 1646) & (fn.col("ActivityLabel") == 'A' ))
        .limit(10))

Subject-id,ActivityLabel,Timestamp,x,y,z,Device,Sensor
1646,A,1470028569293284,5.168782,1.469142,3.2790089,Watch,Accel
1646,A,1470028618793284,5.168782,-1.6505028,-0.9084499,Watch,Accel
1646,A,1470028668293284,5.168782,-4.3128552,-6.0057054,Watch,Accel
1646,A,1470028717793284,2.8128877,-4.0806174,-7.6984057,Watch,Accel
1646,A,1470028767293284,1.5367782,-5.2920837,-9.225906,Watch,Accel
1646,A,1470028816793284,-1.7720085,-6.189909,-8.414272,Watch,Accel
1646,A,1470028866293284,0.86400753,-9.568128,-5.3760304,Watch,Accel
1646,A,1470028915793284,3.2390556,-9.955989,-1.0856209,Watch,Accel
1646,A,1470028965293284,3.5981858,-4.391864,3.5136406,Watch,Accel
1646,A,1470029014793284,3.5981858,-8.46919,10.133608,Watch,Accel


In [25]:
#mean fill: a null x is replaced by the mean of x over its whole subject/activity partition
#(non-null x values are kept as they are)
window_spec = Window.partitionBy("Subject-id", "ActivityLabel")

display(df_nulls
        .withColumn('x', fn.coalesce(fn.col('x'), fn.avg('x').over(window_spec)))
        .where((fn.col("Subject-id") == 1646) & (fn.col("ActivityLabel") == 'A' ))
        .limit(10))

Subject-id,ActivityLabel,Timestamp,x,y,z,Device,Sensor
1646,A,1470028569293284,7.003262799626448,1.469142,3.2790089,Watch,Accel
1646,A,1470028618793284,7.003262799626448,-1.6505028,-0.9084499,Watch,Accel
1646,A,1470028668293284,5.168782234191895,-4.3128552,-6.0057054,Watch,Accel
1646,A,1470028717793284,2.812887668609619,-4.0806174,-7.6984057,Watch,Accel
1646,A,1470028767293284,1.536778211593628,-5.2920837,-9.225906,Watch,Accel
1646,A,1470028816793284,-1.7720085382461548,-6.189909,-8.414272,Watch,Accel
1646,A,1470028866293284,0.8640075325965881,-9.568128,-5.3760304,Watch,Accel
1646,A,1470028915793284,3.239055633544922,-9.955989,-1.0856209,Watch,Accel
1646,A,1470028965293284,7.003262799626448,-4.391864,3.5136406,Watch,Accel
1646,A,1470029014793284,3.598185777664185,-8.46919,10.133608,Watch,Accel


In [26]:
#Fill using pandas: forward fill x inside each (Subject-id, ActivityLabel) group
@fn.pandas_udf(df_nulls.schema, fn.PandasUDFType.GROUPED_MAP)
def ffill(pdf):
  """Forward-fill missing x values within one (Subject-id, ActivityLabel) group.

  Rows are sorted by Timestamp first, so "forward" means forward in time.
  Leading nulls (no earlier value in the group) stay null.
  """
  # Spark does not guarantee row order inside a grouped-map batch; sort before filling.
  pdf = pdf.sort_values('Timestamp').reset_index(drop=True)
  # Series.ffill() replaces fillna(method='ffill'), which is deprecated in recent pandas.
  pdf['x'] = pdf['x'].ffill()
  return pdf

display(df_nulls.groupBy(["Subject-id","ActivityLabel"])
        .apply(ffill)
        .where((fn.col("Subject-id") == 1646) & (fn.col("ActivityLabel") == 'A' ))
        .limit(10))

Subject-id,ActivityLabel,Timestamp,x,y,z,Device,Sensor
1646,A,1470028569293284,,1.469142,3.2790089,Watch,Accel
1646,A,1470028618793284,,-1.6505028,-0.9084499,Watch,Accel
1646,A,1470028668293284,5.168782,-4.3128552,-6.0057054,Watch,Accel
1646,A,1470028717793284,2.8128877,-4.0806174,-7.6984057,Watch,Accel
1646,A,1470028767293284,1.5367782,-5.2920837,-9.225906,Watch,Accel
1646,A,1470028816793284,-1.7720085,-6.189909,-8.414272,Watch,Accel
1646,A,1470028866293284,0.86400753,-9.568128,-5.3760304,Watch,Accel
1646,A,1470028915793284,3.2390556,-9.955989,-1.0856209,Watch,Accel
1646,A,1470028965293284,3.2390556,-4.391864,3.5136406,Watch,Accel
1646,A,1470029014793284,3.5981858,-8.46919,10.133608,Watch,Accel


# Machine Learning
Let's recreate what we previously did with pandas and scikit-learn, this time with Spark ML.

In [28]:
#Let's fix the timestamps: raw values are divided by 1e9 (treated as nanoseconds), rounded to whole
#seconds and cast to Spark timestamps so they can be bucketed with fn.window.
#NOTE: not idempotent - re-running this cell divides the already-converted column again.
df_watch_accel, df_watch_gyro = (
  sensor_df.withColumn("Timestamp", fn.to_timestamp(fn.round(fn.col("Timestamp") / 1000000000)))
  for sensor_df in (df_watch_accel, df_watch_gyro)
)

In [29]:
#Generate aggregate functions for every combination of specified fields and functions
from itertools import product
def generate_agg_funcs(fields,functions,sensor):
  """Build one aliased aggregate expression per (field, function) pair.

  Pairs follow itertools.product order: every function for the first field,
  then every function for the next field, and so on. Each expression is
  aliased "{field}_{sensor}_{function __name__}", e.g. "x_ac_avg".
  Returns a list (empty when fields or functions is empty).
  """
  return [fun(fld).alias("{}_{}_{}".format(fld, sensor, fun.__name__))
          for fld, fun in product(fields, functions)]

In [30]:
#Summarise each watch sensor stream into non-overlapping 10-second windows per subject/activity,
#computing min/avg/max of x, y and z; `tag` ("gr"/"ac") keeps the feature names of the two sensors distinct
def _window_aggregate(sensor_df, tag):
  return (sensor_df
          .groupBy("Subject-id", "ActivityLabel", "Sensor", fn.window("Timestamp", "10 seconds"))
          .agg(*generate_agg_funcs(["x", "y", "z"], [fn.min, fn.avg, fn.max], tag)))

df_watch_gr_agg = _window_aggregate(df_watch_gyro, "gr")
df_watch_ac_agg = _window_aggregate(df_watch_accel, "ac")

In [31]:
display(df_watch_ac_agg.limit(10))  # one row per subject/activity/sensor/10-second window

Subject-id,ActivityLabel,Sensor,window,x_ac_min,x_ac_avg,x_ac_max,y_ac_min,y_ac_avg,y_ac_max,z_ac_min,z_ac_avg,z_ac_max
1629,A,Accel,"List(1970-01-11T10:21:30.000+0000, 1970-01-11T10:21:40.000+0000)",1.6018705,9.137697841397566,15.272762,-9.903914,-3.347363476432971,-0.5832874,-5.1904798,1.7458085529135532,6.6608186
1629,F,Accel,"List(1970-01-05T09:07:00.000+0000, 1970-01-05T09:07:10.000+0000)",-2.5276785,-1.284932913916621,0.587178,-12.617743,-9.16157512759688,-7.096713,-0.17657238,3.093418779120368,5.1170077
1629,K,Accel,"List(1970-01-05T09:22:10.000+0000, 1970-01-05T09:22:20.000+0000)",-8.149862,-2.582617012088868,6.505046,-14.40247,-6.122367544553766,-2.9294555,-0.61156887,3.905074778906607,12.623578
1621,G,Accel,"List(1970-01-03T02:04:50.000+0000, 1970-01-03T02:05:00.000+0000)",-11.9831295,-7.59980076789856,-3.6297593,-1.7004818,1.7931134255230428,5.3265004,-3.89791,5.042924982085824,11.5734215
1621,I,Accel,"List(1970-01-03T02:02:40.000+0000, 1970-01-03T02:02:50.000+0000)",-14.29114,-7.626484150998294,3.0763996,-10.563816,-2.4000674937386064,2.8485014,-5.061492,0.9482654821593316,10.306889
1621,M,Accel,"List(1970-01-03T01:31:10.000+0000, 1970-01-03T01:31:20.000+0000)",2.30816,10.269557123184205,19.421316,-10.834211,-2.7481784277944827,13.481301,-7.2687964,-0.4010168134071864,9.277532
1621,P,Accel,"List(1970-01-03T01:22:10.000+0000, 1970-01-03T01:22:20.000+0000)",-19.804688,6.196394369353666,19.421316,-19.649662,-0.6229145247396545,19.57634,-19.583971,0.7631786155203978,19.64203
1624,C,Accel,"List(1970-01-04T09:11:40.000+0000, 1970-01-04T09:11:50.000+0000)",-11.475109,-1.9421963656126564,16.936882,-16.28506,-6.577478063454972,1.5134348,-7.5107603,-2.227269415308105,2.382081
1624,G,Accel,"List(1970-01-04T08:48:10.000+0000, 1970-01-04T08:48:20.000+0000)",-9.929652,-7.424412020403354,-4.3391914,-12.650662,-5.365499818113758,0.5102642,-5.8741736,-1.3838068894099036,5.744887
1624,I,Accel,"List(1970-01-04T09:07:20.000+0000, 1970-01-04T09:07:30.000+0000)",-8.379407,-4.125017997026443,-1.4027029,-12.897265,-6.926234000623226,0.24690205,1.0796354,4.589487131237984,7.5032783


In [32]:
#join accelerometer and gyroscope windows, then attach the activity descriptions.
#Each aggregate carries a constant "Sensor" column ("Accel" / "Gyro"). Drop it on both sides so the
#result does not end up with two ambiguous "Sensor" columns.
#df_act is a tiny lookup table, so it is broadcast.
df_watch_agg = (df_watch_ac_agg.drop("Sensor")
                .join(df_watch_gr_agg.drop("Sensor"),on=["Subject-id","ActivityLabel","window"])
                .join(fn.broadcast(df_act),on="ActivityLabel")
               )


In [33]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, IndexToString 
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

In [34]:
#for every spark ML Model, you need to create a dataframe that contains 2 columns: 
# - features column which contains a vector of all of your features
# - label column which contains the target you're trying to predict

#Feature names follow generate_agg_funcs' "{axis}_{sensor}_{stat}" aliases: all accelerometer ("ac")
#features first, then gyroscope ("gr"), each axis x/y/z with min/avg/max (same order as before)
feature_cols = ["{}_{}_{}".format(axis, sensor, stat)
                for sensor in ("ac", "gr")
                for axis in ("x", "y", "z")
                for stat in ("min", "avg", "max")]

#Assemble all features into a single features column
acc = VectorAssembler(inputCols=feature_cols, outputCol="features")

#Create numeric label for every activity.
#Fitted on all rows (train + test) so every activity gets an index; this only builds the label mapping.
si = StringIndexer(inputCol="ActivityDescription",outputCol="label").fit(df_watch_agg)
#Random forest with 100 trees (other params default); a fixed seed makes results reproducible
rf = RandomForestClassifier(numTrees=100, seed=42)
#convert index back to string
ist = IndexToString(inputCol="prediction", outputCol="PredictedActivity",
                               labels=si.labels)
#merge the 4 stages into a pipeline - similar to an sklearn pipeline
pipe = Pipeline(stages=[acc,si,rf,ist])

In [35]:
#Split by subject rather than by row, so no subject contributes windows to both sets
is_train_subject = fn.col("Subject-id") <= 1635
train_df = df_watch_agg.where(is_train_subject)
test_df = df_watch_agg.where(~is_train_subject)

In [36]:
#fit the whole pipeline (assembler, indexer, random forest, label converter) on the training subjects
#(Subject-id <= 1635 - presumably the first 36 subjects if ids start at 1600; confirm)
model = pipe.fit(train_df)

In [37]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Score the held-out subjects, then compare the numeric prediction against the numeric label
predictions = model.transform(test_df)

evaluator = MulticlassClassificationEvaluator(labelCol="label",
                                              predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(accuracy)

In [38]:
#Count (true activity, predicted activity) pairs: one row per true activity,
#one column per predicted activity; combinations that never occur come out null
confusion_matrix = (
    predictions
    .select("ActivityDescription", "PredictedActivity")
    .groupBy("ActivityDescription")
    .pivot("PredictedActivity")
    .count()
)

In [39]:
pd_cf = confusion_matrix.toPandas()  # collect the small pivoted count table to the driver as pandas

In [40]:
#Fixed display order for the confusion-matrix rows and columns (18 activity descriptions)
activities = ("walking jogging stairs sitting standing typing "
              "teeth soup chips pasta drinking sandwich "
              "kicking catch dribbling writing clapping folding").split()

In [41]:
#Arrange the confusion matrix in the fixed activity order. reindex (unlike .loc with a label list)
#tolerates activities that never occur as a true or predicted label in the test set: their missing
#row/column becomes NaN, and fillna(0) turns those and all other absent pairs into zero counts.
pd_cf = pd_cf.set_index('ActivityDescription').reindex(index=activities, columns=activities).fillna(0)

In [42]:
import seaborn as sns
import matplotlib.pyplot as plt

#Heatmap of the confusion matrix: rows are true activities, columns are predicted activities
fig, ax = plt.subplots(figsize=(12, 7))
sns.heatmap(pd_cf, cmap="Reds", annot=True, annot_kws={"size": 10}, fmt=".0f", linewidths=.5, ax=ax)
ax.set(title="Random forest confusion matrix (held-out subjects)",
       xlabel="Predicted activity", ylabel="True activity")
sns.despine(fig=fig)
fig.tight_layout()
display(fig)