In [1]:
import numpy as np
import matplotlib.pyplot as plt
import ipywidgets as widgets
import seaborn as sn
import pandas as pd

from scipy import signal
import pywt

import os
import time
import datetime
import pickle
import ipywidgets as widgets

from tensorflow.keras import models
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import classification_report, accuracy_score

from platform import python_version
print(python_version())
import warnings
warnings.simplefilter("ignore")

3.8.8


# Read and Prepare Data

In [2]:
def readData(accDir, annotFile):
    """Load the accelerometer CSV files and the annotation workbook.

    Parameters
    ----------
    accDir : str
        Directory that holds the accelerometer CSV files. A trailing path
        separator is optional.
    annotFile : str
        Path of the Excel workbook with the annotations; every sheet is read.

    Returns
    -------
    empatica_dict : dict
        Maps the integer parsed from each CSV file name to the array that
        ``np.genfromtxt`` reads from that file.
    annot_dict : dict
        Maps each sheet name to its DataFrame with NaN-containing rows dropped
        (the original note mentions rows where ladder 2 is missing).

    Raises
    ------
    ValueError
        If a CSV file name is not a number once the ``.csv`` extension and the
        ``ACC`` marker are removed.
    """
    empatica_dict = {}
    # os.listdir gives no ordering guarantee; sorting keeps the dict order reproducible.
    for fname in sorted(os.listdir(accDir)):
        if not fname.endswith('.csv'):
            continue
        stem = os.path.splitext(fname)[0]
        # Remove the "ACC" marker explicitly: str.strip("ACC.csv") would treat the
        # argument as a set of characters to trim, not as a literal suffix.
        key = int(float(stem.replace("ACC", "")))
        # os.path.join works whether or not accDir ends with a separator.
        empatica_dict[key] = np.genfromtxt(os.path.join(accDir, fname), delimiter=',')
    sheets = pd.read_excel(annotFile, sheet_name=None)
    annot_dict = {sheet_name: sheet.dropna() for sheet_name, sheet in sheets.items()}
    return empatica_dict, annot_dict

def getLabeledDict(empatica_dict, annot_dict, subject_ids, SR):
    """Attach a task label to every accelerometer sample of each subject.

    Parameters
    ----------
    empatica_dict : dict
        Per-subject arrays as returned by ``readData``. Element ``[0, 0]`` is
        read as the recording start time in epoch seconds, and rows from index
        2 onward are used as the three-axis signal.
    annot_dict : dict
        Per-subject annotation frames keyed ``'P<id>'`` with the columns
        ``taskName`` and ``startTime_global`` (``mm/dd/yy HH:MM:SS`` strings).
        Rows are consumed in pairs: even rows open a task, odd rows close it.
    subject_ids : iterable of int
        Subjects to process.
    SR : int
        Samples per second, used to convert time offsets into sample indices.

    Returns
    -------
    labeled_dict : dict
        Subject id -> DataFrame with float columns ``X``, ``Y``, ``Z``, ``Mag``
        and an object column ``label`` (``None`` where no task applies).
    taskInd_dict : dict
        Subject id -> list of sample indices, one per annotation row.

    Notes
    -----
    ``time.mktime`` interprets the annotation time stamps in the local time
    zone of the machine running the notebook, so the computed indices depend
    on that setting.
    """
    labeled_dict = {}
    taskInd_dict = {}
    for sub_id in subject_ids:  # `sub_id` instead of `id`, which shadows the builtin
        raw = empatica_dict[sub_id]
        start_time = int(raw[0, 0])
        acc = raw[2:, :]
        annot = annot_dict['P' + str(sub_id)]

        # "walk_start" / "walk_end" -> "walk"
        label = [name.replace("_end", "").replace("_start", "") for name in annot.taskName.tolist()]
        # Expand the two-digit year to "20yy" before parsing, then convert to epoch seconds.
        task_time = [
            time.mktime(datetime.datetime.strptime(stamp[:6] + '20' + stamp[6:], "%m/%d/%Y %H:%M:%S").timetuple())
            for stamp in annot.startTime_global.tolist()
        ]
        task_ind = [int(t - start_time) * SR for t in task_time]
        taskInd_dict[sub_id] = task_ind

        # Samples outside every (start, end) pair keep the initial None.
        label_tmp = np.empty(acc.shape[0], dtype=object)
        for pair_no, (begin, end) in enumerate(zip(task_ind[0::2], task_ind[1::2])):
            label_tmp[begin:end] = label[2 * pair_no]

        acc_mag = np.sqrt(np.sum(acc**2, axis=1))[:, None]
        # Build the numeric frame first and attach the labels afterwards: stacking
        # the float signal with the object label array would turn X/Y/Z/Mag into
        # object-dtype columns.
        frame = pd.DataFrame(np.hstack((acc, acc_mag)), columns=['X', 'Y', 'Z', 'Mag'])
        frame['label'] = label_tmp
        labeled_dict[sub_id] = frame
    return labeled_dict, taskInd_dict

In [3]:
# Raw accelerometer arrays and per-subject annotation sheets.
sepAccDict, sepAnnotDict = readData(
    accDir='./Data/Acc Data/',
    annotFile='./Data/Annotation Data/separate.xlsx',
)

# Subjects 8 through 44 are processed.
sepSubIDs = [*range(8, 45)]

# Element [1, 0] of subject 8's array is taken as SR and reused for every subject.
SR = int(sepAccDict[8][1, 0])

# Per-sample labels plus the annotation sample indices for each subject.
sepLabeledDict_, sepTaskIndDict = getLabeledDict(sepAccDict, sepAnnotDict, sepSubIDs, SR)

## Apply Low Pass Filter

In [4]:
# Low-pass filter every subject's signal columns with a Butterworth design.
n = 4                # filter order
fc = 2               # cut-off, expressed in the same unit as SR
w = fc / (SR / 2)    # cut-off relative to half of SR, as passed to signal.butter
b, a = signal.butter(n, w, 'low')

signal_cols = ['X', 'Y', 'Z', 'Mag']

# filtfilt runs along axis 0, i.e. down each column, on everything except `label`.
sepLabeledDict_filtered = {
    key: signal.filtfilt(b, a, x=frame.drop(columns='label'), axis=0)
    for key, frame in sepLabeledDict_.items()
}

# Wrap the filtered arrays in DataFrames again and re-attach the original labels.
sepLabeledDict_filtered_dfs = {
    key: pd.DataFrame(filtered, columns=signal_cols).assign(label=sepLabeledDict_[key].label)
    for key, filtered in sepLabeledDict_filtered.items()
}

# dropna discards rows with a missing value, which removes the unlabeled samples.
filt_noNA_dict = {key: frame.dropna() for key, frame in sepLabeledDict_filtered_dfs.items()}

# Load Trained Model

In [5]:
# Artefacts stored under trained_model/.
model = models.load_model('trained_model/tf_model/')

# NOTE: unpickling can execute arbitrary code; only load pickles from a trusted source.
test_subs = pd.read_pickle('trained_model/test_subs.pickle')
history = pd.read_pickle('trained_model/keras_history.pickle')
scaler = pd.read_pickle('trained_model/scaler.pickle')

# Class names and the integer code LabelEncoder assigns to each of them.
labels_categorical = [
    'sit', 'stand', 'walk', 'hoist', 'lift',
    'push', 'type', 'ladder', 'electricPanel', 'overhead',
]
labels_encoded = LabelEncoder().fit_transform(labels_categorical).tolist()

# Integer code -> class name, used to decode the model's argmax output.
labels_dict = dict(zip(labels_encoded, labels_categorical))

## Activity Classification on Test (Unseen) Subjects

In [6]:
# Windowing and wavelet settings are the same for every subject, so they are set once.
tasks = ['sit', 'stand', 'walk', 'hoist', 'lift', 'push', 'type', 'ladder1', 'ladder2', 'electricPanel', 'overhead']
winLen = 320            # samples per window
scales = range(1,200)   # CWT scales
waveletname = 'morl'

cwt_dict = {}    # subject -> array of shape (n_windows, n_scales, winLen, 3)
label_dict = {}  # subject -> one task name per window
for sub in sorted(test_subs):
    sig_ = filt_noNA_dict[sub]
    sig = sig_[sig_.label.isin(tasks)]
    cwt_list = []
    window_labels = []
    # All samples sharing a label are pooled, then cut into consecutive winLen-sample chunks.
    for label, label_df in sig.groupby(by='label'):
        for _win, window_df in label_df.groupby(by=np.arange(label_df.shape[0])//winLen):
            if window_df.shape[0] != winLen:
                continue  # trailing chunk shorter than a full window
            window_labels.append(label)
            ########### cwt transform
            # pywt.cwt returns one row per scale, so the buffer is sized with
            # len(scales); max(scales) only matches when the scales start at 1.
            tmp = np.zeros((len(scales), winLen, 3))
            for i in range(3):  # first three columns: X, Y, Z
                tmp_sig = window_df.values[:,i]
                coeff, _freq = pywt.cwt(tmp_sig, scales, waveletname, 1/SR)
                tmp[:,:,i] = coeff
            cwt_list.append(tmp)
    cwt_dict[sub] = np.array(cwt_list)
    # 'ladder1' / 'ladder2' collapse into 'ladder'.
    label_dict[sub] = [elem.replace("1", "").replace("2", "") for elem in window_labels]

In [7]:
pred_labels_dict = {}   # subject -> predicted class name per window
true_pred_dict = {}     # subject -> DataFrame with columns 'true' and 'predicted'
for sub in sorted(test_subs):
    ############## predict label
    # The scaler loaded with the model is used; a per-subject np.max scaling
    # was tried earlier and is disabled.
    pred_labels_dict[sub] = {}   # placeholder, replaced by the decoded list below
    scaled_cwt = cwt_dict[sub] / scaler
    pred_label = model.predict(scaled_cwt)
    numeric_pred = np.argmax(pred_label, axis=1)
    pred_labels_dict[sub] = [labels_dict[code] for code in numeric_pred]
    true_pred_dict[sub] = pd.DataFrame({
        'true': label_dict[sub],
        'predicted': pred_labels_dict[sub],
    })

################### Store Reports
report_df_dict = {}     # subject -> rounded classification report
acc = {}
for sub in sorted(test_subs):
    pairs = true_pred_dict[sub]
    report_dict = classification_report(pairs.true, pairs.predicted, output_dict=True)
    report_frame = pd.DataFrame(report_dict)
    # Keep the per-class columns and the accuracy column; drop the averages and the support row.
    report_frame = report_frame.drop(columns=['macro avg', 'weighted avg'], index='support')
    report_df_dict[sub] = report_frame.round(decimals=2)

# Results
## Plot Confusion Matrix for Each Subject

In [8]:
# One tab per test subject, each showing a confusion matrix and the classification report.
ordered_subs = sorted(test_subs)
sub_tab = [widgets.Output() for _sub in ordered_subs]
tab = widgets.Tab(sub_tab)
for i, sub in enumerate(ordered_subs):
    tab.set_title(i, 'Subject ' + str(sub))
    y_true = true_pred_dict[sub].true
    y_pred = true_pred_dict[sub].predicted
    # Use one explicit, sorted label list for both the matrix and its tick labels.
    # Without `labels`, confusion_matrix covers the sorted union of true and
    # predicted classes, which can differ from true.unique() in length or order
    # whenever the model predicts a class the subject never performed.
    class_names = sorted(set(y_true) | set(y_pred))
    with sub_tab[i]:
        cm = confusion_matrix(y_true, y_pred, labels=class_names)
        disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
        fig, ax = plt.subplots(1, figsize=(10, 10))
        disp.plot(ax=ax, cmap=plt.cm.Blues)
        plt.show()
        display(report_df_dict[sub])
display(tab)

Tab(children=(Output(), Output(), Output(), Output(), Output(), Output(), Output(), Output()), _titles={'0': '…

## Export Confusion Matrix

In [None]:
# Subject whose confusion matrix is exported as CSV and PNG.
sub = 8
class_names = true_pred_dict[sub].true.unique()
cm = confusion_matrix(true_pred_dict[sub].true, true_pred_dict[sub].predicted, labels=class_names)
df_cm = pd.DataFrame(cm, index=class_names, columns=class_names)
# The file name follows `sub` (it was hard-coded to "cm_19.csv"), matching the PNG name below.
df_cm.to_csv('./outputs/CNN classification/cm_' + str(sub) + '.csv')

# Short tick labels for the figure.
short_names = {'electricPanel':'EP', 'hoist':'H', 'ladder':'Ld', 'lift':'Lf', 'overhead':'OH',
               'push':'P', 'sit':'St', 'stand':'Sd', 'type':'Tp', 'walk':'W'}
df_cm = df_cm.rename(columns=short_names, index=short_names)

f, ax = plt.subplots(1, figsize=(7,7))
# fmt='d' prints the integer counts as-is; the default '.2g' switches to
# scientific notation for counts of 100 or more. ax=ax draws on this figure explicitly.
sn.heatmap(df_cm, cmap=plt.cm.Blues, annot=True, fmt='d', cbar=False, annot_kws={"fontsize":20}, ax=ax)
ax.tick_params(axis='both', labelsize=20)
ax.tick_params(axis='y', rotation=0)
ax.set_ylabel('True Label', fontsize=20)
ax.set_xlabel('Predicted Label', fontsize=20)
ax.set_title( 'Confusion Matrix', fontsize=25)
f.tight_layout(pad=0.1)
f.savefig('./outputs/cnf_mtrx/cnf ' + str(sub) + '.png')

## Export Subject-specific Classification Report

In [336]:
# Write one CSV per subject, named after the subject id.
for sub, report in report_df_dict.items():
    report.to_csv('./outputs/CNN classification/' + str(sub) + '.csv')

# Show subject 8's report as the cell output.
report_df_dict[8]

## Overall Test Subject Accuracy Excluding The Left-handed Subject

In [14]:
# Pool the true/predicted pairs of every test subject except subject 19,
# then score all windows together.
true_pred_dict_list = [frame for key, frame in true_pred_dict.items() if key != 19]
test_true_pred_df = pd.concat(true_pred_dict_list, ignore_index=True)
overall_test_acc = accuracy_score(
    test_true_pred_df.true,
    test_true_pred_df.predicted,
)
print(overall_test_acc)

0.9642160052049447
