In [1]:
%matplotlib inline

from dask.distributed import Client
import dask.dataframe as dd
import matplotlib.pyplot as plt
import joblib
import numpy as np
import pandas as pd

In [2]:
# Start and connect to local client

client = Client(n_workers=4)
# client = Client("scheduler-address:8786")  # connecting to remote cluster

In [3]:
client

0,1
Client  Scheduler: tcp://127.0.0.1:34985  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 5.18 GB


# 1. Data Preparation

## a) Load Data

In [4]:
# Read in files

In [5]:
def read_file(filepath):
    """Lazily load header-less, comma-separated sensor files into a Dask DataFrame.

    Parameters
    ----------
    filepath : str
        Path or glob pattern handed straight to ``dd.read_csv``.

    Returns
    -------
    dask.dataframe.DataFrame
        Columns ``subject_id``, ``activity_code``, ``timestamp``, ``x_coord``,
        ``y_coord``, ``z_coord`` plus ``timestamp_dt``, which is ``timestamp``
        passed through ``dd.to_datetime``.
    """
    column_names = ['subject_id', 'activity_code', 'timestamp', 'x_coord', 'y_coord', 'z_coord']

    raw = dd.read_csv(filepath, sep=',', header=None)
    raw.columns = column_names

    # The last field of each record carries a trailing ';' (line terminator
    # workaround), so strip it before casting the column to float.
    z_as_float = raw['z_coord'].str.replace(";", "").astype('float64')

    # timestamp_dt is appended as a new column; z_coord is overwritten in place.
    return raw.assign(
        timestamp_dt=dd.to_datetime(raw['timestamp']),
        z_coord=z_as_float,
    )

In [6]:
# phone data
phone_accel_df = read_file('wisdm-dataset/raw/phone/accel/*.txt')
phone_gyro_df = read_file('wisdm-dataset/raw/phone/gyro/*.txt')

# watch data
watch_accel_df = read_file('wisdm-dataset/raw/watch/accel/*.txt')
watch_gyro_df = read_file('wisdm-dataset/raw/watch/gyro/*.txt')

In [7]:
watch_gyro_df.dtypes

subject_id                int64
activity_code            object
timestamp                 int64
x_coord                 float64
y_coord                 float64
z_coord                 float64
timestamp_dt     datetime64[ns]
dtype: object

## b) Exploratory Data Analysis

In [None]:
def plot_subject_activity(df, title_append=None):
    """Plot the x/y/z sensor traces of one subject performing one activity.

    Parameters
    ----------
    df : dask.dataframe.DataFrame
        Frame already filtered down to a single subject and activity. It must
        contain ``subject_id``, ``activity_code``, ``x_coord``, ``y_coord`` and
        ``z_coord``.
    title_append : optional
        Extra text shown in parentheses after the figure title.

    Raises
    ------
    ValueError
        If ``df`` contains no rows.
    """
    axis_columns = ['x_coord', 'y_coord', 'z_coord']

    # Materialise the needed columns exactly once. The previous version called
    # len(df) and .compute() three times each (re-running the graph every
    # time) and read the title fields via df.head(1)[...][0], which only looks
    # at the first partition and assumes the first row carries index label 0.
    data = df[['subject_id', 'activity_code'] + axis_columns].compute()
    if data.empty:
        raise ValueError('No rows to plot: the supplied dataframe is empty.')

    # Positional access: the index labels of a filtered frame are arbitrary.
    subject_id = data['subject_id'].iloc[0]
    activity_code = data['activity_code'].iloc[0]
    title = f'Subject {subject_id} Performing Activity {activity_code}'
    if title_append is not None:
        title += f' ({title_append})'

    intervals = range(len(data))
    fig, ax = plt.subplots(3, figsize=(20,10))
    fig.suptitle(title)
    for axis, column in zip(ax, axis_columns):
        axis.plot(intervals, data[column].to_numpy())
        axis.set_title(column)
    # Only the bottom panel gets the x label, as before.
    ax[2].set_xlabel('Interval')

In [None]:
temp = phone_gyro_df[(phone_gyro_df['subject_id'] == 1600) & (phone_gyro_df['activity_code'] == 'A')]
plot_subject_activity(temp, 'Phone Gyro')

In [None]:
# Print the row count of each raw sensor frame, one line per frame.
for frame_label, frame in (
    ('phone_accel_df', phone_accel_df),
    ('phone_gyro_df', phone_gyro_df),
    ('watch_accel_df', watch_accel_df),
    ('watch_gyro_df', watch_gyro_df),
):
    print(f'len({frame_label}):\t{len(frame)}')

Row counts match the description in the provided documentation. That said, one might intuitively expect equally sized accelerometer and gyroscope data for each device, but that is not the case. If we want to use the sensor data from a device in conjunction, we will have to ensure an equal number of rows (len(phone_accel_df) == len(phone_gyro_df)). We will attempt to join on the timestamps to accomplish this.

In [None]:
phone_accel_sub_act_interval_counts = phone_accel_df.groupby(['subject_id', 'activity_code']).size().rename('count').reset_index()
len(phone_accel_sub_act_interval_counts)
# Expected result if all subjects performed all activities: 51 x 18 = 918

In [None]:
phone_gyro_sub_act_interval_counts = phone_gyro_df.groupby(['subject_id', 'activity_code']).size().rename('count').reset_index()
len(phone_gyro_sub_act_interval_counts)
# Expected result if all subjects performed all activities: 51 x 18 = 918

In [None]:
phone_accel_sub_act_counts = phone_accel_sub_act_interval_counts.groupby('subject_id').size().rename('activity_count').reset_index()
phone_accel_sub_act_counts[phone_accel_sub_act_counts['activity_count'] < 18].compute()
# Our slackers that didn't perform all 18 activities with phone accel

In [None]:
phone_gyro_sub_act_counts = phone_gyro_sub_act_interval_counts.groupby('subject_id').size().rename('activity_count').reset_index()
phone_gyro_sub_act_counts[phone_gyro_sub_act_counts['activity_count'] < 18].compute()
# Our slackers that didn't perform all 18 activities with phone gyro

Additional Takeaways:
- Not all subjects performed all activities with phone sensors
- Some subjects may have performed certain activities with only 1 of the phone sensors

In [None]:
# Join gyroscope and accelerometer data by device

In [8]:
# Timestamps between sensors not guaranteed to be "perfectly" aligned
# Including activity_code in inner_join will only grab data when available for both sensors
combined_phone_df = phone_accel_df.merge(phone_gyro_df, on=["subject_id", "activity_code", "timestamp_dt"], how="inner", suffixes=['_accel', '_gyro'])
combined_phone_df.head()

Unnamed: 0,subject_id,activity_code,timestamp_accel,x_coord_accel,y_coord_accel,z_coord_accel,timestamp_dt,timestamp_gyro,x_coord_gyro,y_coord_gyro,z_coord_gyro
0,1600,A,252208623536856,0.889343,4.531982,-0.228958,1970-01-03 22:03:28.623536856,252208623536856,0.910889,-0.612137,0.197403
1,1600,A,252208875306876,1.651108,13.003159,-2.630463,1970-01-03 22:03:28.875306876,252208875306876,1.082046,0.425598,0.237305
2,1600,A,252212249025138,-0.697052,11.57724,3.471695,1970-01-03 22:03:32.249025138,252212249025138,-0.653595,0.831863,0.515823
3,1600,A,252217435498358,-0.366928,8.423248,3.045731,1970-01-03 22:03:37.435498358,252217435498358,-0.254837,-0.243484,-0.491852
4,1600,A,252217939038397,-7.973221,15.072464,-0.96936,1970-01-03 22:03:37.939038397,252217939038397,-1.388626,-0.085068,0.710587


In [9]:
# Cleanup columns
combined_phone_df = combined_phone_df.drop(['timestamp_gyro'], axis=1)
combined_phone_df = combined_phone_df.rename(columns={'timestamp_accel': 'timestamp'})
combined_phone_df.head()

Unnamed: 0,subject_id,activity_code,timestamp,x_coord_accel,y_coord_accel,z_coord_accel,timestamp_dt,x_coord_gyro,y_coord_gyro,z_coord_gyro
0,1600,A,252208623536856,0.889343,4.531982,-0.228958,1970-01-03 22:03:28.623536856,0.910889,-0.612137,0.197403
1,1600,A,252208875306876,1.651108,13.003159,-2.630463,1970-01-03 22:03:28.875306876,1.082046,0.425598,0.237305
2,1600,A,252212249025138,-0.697052,11.57724,3.471695,1970-01-03 22:03:32.249025138,-0.653595,0.831863,0.515823
3,1600,A,252217435498358,-0.366928,8.423248,3.045731,1970-01-03 22:03:37.435498358,-0.254837,-0.243484,-0.491852
4,1600,A,252217939038397,-7.973221,15.072464,-0.96936,1970-01-03 22:03:37.939038397,-1.388626,-0.085068,0.710587


In [None]:
len(combined_phone_df)

In [None]:
combined_phone_df.isna().sum().compute()

In [None]:
# Correlate numeric columns only. combined_phone_df still carries
# activity_code (object) and timestamp_dt (datetime); newer pandas (>= 2.0)
# no longer drops non-numeric columns silently in corr(), so select them
# explicitly instead of relying on version-dependent defaults.
corr_matrix = combined_phone_df.select_dtypes(include=['number']).corr()
corr_matrix.compute().style.background_gradient(cmap='coolwarm')

In [None]:
combined_phone_df.set_index('timestamp_dt').groupby(['subject_id', 'activity_code', pd.Grouper(freq='3S')]).size().head(50) # 3S = 3 seconds

In [10]:
phone_grouped_df = combined_phone_df.set_index('timestamp_dt').groupby(['subject_id', 'activity_code', pd.Grouper(freq='3S')])

In [11]:
phone_grouped_means_df = phone_grouped_df.mean()
phone_grouped_means_df.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,timestamp,x_coord_accel,y_coord_accel,z_coord_accel,x_coord_gyro,y_coord_gyro,z_coord_gyro
subject_id,activity_code,timestamp_dt,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
1611,F,1970-01-01 00:19:21,1163260000000.0,8.729862,3.539461,-2.11156,-0.038073,0.060864,0.023982
1611,F,1970-01-01 00:19:24,1165501000000.0,8.820844,3.439254,-1.733797,-0.003111,0.006763,0.003343
1611,F,1970-01-01 00:19:27,1168497000000.0,8.867655,3.398671,-1.600897,-0.000189,0.00211,0.001944
1611,F,1970-01-01 00:19:30,1171493000000.0,8.887162,3.378646,-1.541527,-0.00044,0.000399,0.003515
1611,F,1970-01-01 00:19:33,1174489000000.0,8.889435,3.361615,-1.537518,0.001218,0.000198,0.000884


In [None]:
len(phone_grouped_means_df)

In [None]:
# Join watch accelerometer and gyroscope readings the same way as the phone
# data: include activity_code in the key so rows are only paired when both
# sensors recorded the same activity, and use explicit suffixes so the
# resulting columns match the combined_phone_df schema (*_accel / *_gyro).
combined_watch_df = watch_accel_df.merge(
    watch_gyro_df,
    on=["subject_id", "activity_code", "timestamp_dt"],
    how="inner",
    suffixes=['_accel', '_gyro'],
)
# Preview a few rows only; computing the full join would pull every matched
# row into local memory and into the notebook output.
combined_watch_df.head()

In [None]:
# Group by subject id, activity, time interval window (3s) - mean, std x, y, z with group by

# https://stackoverflow.com/questions/35898667/group-by-time-and-other-column-in-pandas

In [None]:
# Join back together for single dataframe 

# 2. Model Selection & Training

In [12]:
# Train/test split of the per-window mean features (phone_grouped_means_df).
import dask_ml.model_selection
# Shuffled 80/20 split; random_state is fixed so the partition is repeatable.
# Both halves still carry activity_code (the label) in their index at this
# point -- it is separated from the features in later cells.
X_train, X_test = dask_ml.model_selection.train_test_split(phone_grouped_means_df, shuffle = True, random_state=0, test_size = 0.2, train_size=0.8)

In [13]:
X_train.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,timestamp,x_coord_accel,y_coord_accel,z_coord_accel,x_coord_gyro,y_coord_gyro,z_coord_gyro
subject_id,activity_code,timestamp_dt,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
1633,L,1970-01-09 00:25:36,692737500000000.0,-0.037645,-8.722009,-4.797704,-0.000569,-0.008274,0.000912
1624,R,1970-01-06 23:18:57,515938500000000.0,7.010596,-4.339497,-5.392845,-0.001075,-0.00046,0.00131
1634,B,1970-01-01 21:42:30,78151510000000.0,-3.533785,7.675023,-1.576748,0.074562,0.06289,-0.047029
1650,Q,1970-01-05 03:11:54,357116200000000.0,-4.233556,-4.638135,-7.096383,0.001411,0.004661,0.000613
1613,I,1970-01-03 01:02:06,176527500000000.0,2.339335,5.202153,7.674111,-0.001065,8.2e-05,0.023716


In [14]:
len(X_train)

37447

In [15]:
len(X_test)

9217

In [16]:
# Turn the index levels (subject_id, activity_code, timestamp_dt) back into
# ordinary columns so the label column can be pulled out of the training data.
X_train_reset = X_train.reset_index()
# Training labels: the activity code of each window.
X_train_labels = X_train_reset['activity_code']
# Remove both timestamp columns and the label from the feature matrix.
# NOTE(review): subject_id is kept as a feature here -- confirm that is intended.
X_train_reset = X_train_reset.drop(['timestamp_dt', 'timestamp', 'activity_code'], axis=1)
X_train_reset.head()

Unnamed: 0,subject_id,x_coord_accel,y_coord_accel,z_coord_accel,x_coord_gyro,y_coord_gyro,z_coord_gyro
0,1633,-0.037645,-8.722009,-4.797704,-0.000569,-0.008274,0.000912
1,1624,7.010596,-4.339497,-5.392845,-0.001075,-0.00046,0.00131
2,1634,-3.533785,7.675023,-1.576748,0.074562,0.06289,-0.047029
3,1650,-4.233556,-4.638135,-7.096383,0.001411,0.004661,0.000613
4,1613,2.339335,5.202153,7.674111,-0.001065,8.2e-05,0.023716


In [17]:
X_train_labels.head()

0    L
1    R
2    B
3    Q
4    I
Name: activity_code, dtype: object

In [18]:
# Fit a random forest on the training features / activity labels.
from sklearn.ensemble import RandomForestClassifier
# Fixed random_state so repeated runs build the same forest.
randomForestClassifier = RandomForestClassifier(random_state=0)
# Run the fit inside joblib's 'dask' parallel backend context.
with joblib.parallel_backend('dask'):
    randomForestClassifier.fit(X_train_reset, X_train_labels)
# TODO: hyperparameter tuning depending on model selected (if time permits)

# 3. Model Test

In [19]:
X_test.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,timestamp,x_coord_accel,y_coord_accel,z_coord_accel,x_coord_gyro,y_coord_gyro,z_coord_gyro
subject_id,activity_code,timestamp_dt,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
1609,D,1970-01-02 12:56:27,132988500000000.0,2.684975,-3.660209,-9.150366,0.003215,-0.001748,0.001136
1610,D,1970-01-01 04:21:27,15688510000000.0,8.921557,3.392493,-0.587196,0.000492,-5e-05,-0.000329
1648,D,1970-01-06 08:30:36,462637200000000.0,-9.315328,-0.51501,-1.785061,0.001336,0.001535,-0.00074
1644,H,1970-01-22 00:43:15,1816996000000000.0,-0.158696,-9.247032,-0.988553,0.021166,-0.006608,-0.001528
1606,L,1970-01-02 11:11:12,126673500000000.0,-9.905204,-0.660528,-0.536415,0.006033,-0.01479,-0.003439


In [20]:
# Turn the index levels (subject_id, activity_code, timestamp_dt) back into
# ordinary columns so the label column can be pulled out of the test data.
X_test_reset = X_test.reset_index()
# Test labels: the activity code of each window.
X_test_labels = X_test_reset['activity_code']
# Remove both timestamp columns and the label so the test features have the
# same columns as the training features.
X_test_reset = X_test_reset.drop(['timestamp_dt', 'timestamp', 'activity_code'], axis=1)
X_test_reset.head()

Unnamed: 0,subject_id,x_coord_accel,y_coord_accel,z_coord_accel,x_coord_gyro,y_coord_gyro,z_coord_gyro
0,1609,2.684975,-3.660209,-9.150366,0.003215,-0.001748,0.001136
1,1610,8.921557,3.392493,-0.587196,0.000492,-5e-05,-0.000329
2,1648,-9.315328,-0.51501,-1.785061,0.001336,0.001535,-0.00074
3,1644,-0.158696,-9.247032,-0.988553,0.021166,-0.006608,-0.001528
4,1606,-9.905204,-0.660528,-0.536415,0.006033,-0.01479,-0.003439


In [21]:
X_test_labels.head()

0    D
1    D
2    D
3    H
4    L
Name: activity_code, dtype: object

In [None]:
# test / validate and provide final accuracy
# with joblib.parallel_backend('dask'):
#     score = randomForestClassifier.score(X_test_reset, X_test_labels)
# score

In [None]:
# from sklearn.metrics import confusion_matrix
# with joblib.parallel_backend('dask'):
#     confusion_matrix(X_test_labels, randomForestClassifier.predict(X_test_reset) )

In [22]:
# Metrics used in the following cells to evaluate the predictions.
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score
# Predict an activity code for every test window, inside joblib's 'dask'
# parallel backend context (same backend as used for fitting).
with joblib.parallel_backend('dask'):
    predicted_labels = randomForestClassifier.predict(X_test_reset)

In [23]:
confusion_matrix(X_test_labels, predicted_labels)

array([[407,   9,  24,   1,   1,   0,   0,   0,   0,   0,   2,   4,  14,
          7,   5,   0,   1,  14],
       [ 20, 439,  20,   1,   1,   1,   0,   3,   4,   1,   0,   1,  13,
         12,   4,   0,   0,   3],
       [ 43,  18, 331,   2,   8,   1,   0,   1,   1,   1,   1,   0,  47,
         16,  15,   0,   0,   9],
       [  1,   0,   2, 537,   0,   1,   6,   2,   0,   2,   1,   5,   0,
          1,   1,   1,   1,   1],
       [  7,   1,   6,   1, 480,   0,   1,   0,   1,   0,   0,   1,   5,
          7,   6,   0,   2,  15],
       [  2,   3,   0,   3,   8, 499,   3,   2,   2,   1,   2,   2,   0,
          1,   2,   4,   2,   4],
       [  3,   0,   1,   5,   3,   2, 499,   6,   4,   4,   5,   2,   3,
          1,   1,   0,   5,   2],
       [  0,   2,   2,   5,   1,   5,  12, 414,   7,   9,   7,   5,   0,
          1,   1,   1,   4,   2],
       [  2,   0,   0,   3,   0,   4,  10,  15, 400,   5,  11,  10,   1,
          1,   1,   4,   5,   3],
       [  1,   1,   0,   2,   2,   3,

In [24]:
accuracy_score(X_test_labels, predicted_labels)

0.8342193772377129

# 4. Results & Conclusion

In [None]:
# Summarize final results and conclusion - reinclude any helpful charts/graphs

In [None]:
client.shutdown()