**Inspired by** https://machinelearningmastery.com/how-to-develop-rnn-models-for-human-activity-recognition-time-series-classification/

> Using the raw dataset instead (https://archive.ics.uci.edu/ml/datasets/Smartphone-Based+Recognition+of+Human+Activities+and+Postural+Transitions).

We'll be focusing on the WALKING activity.

In [None]:
import numpy as np
import pandas as pd
# from functools import reduce

# Root folder of the downloaded dataset, relative to this notebook.
data_location = '../datasets/uci-har-raw/'
# Sub-folder holding labels.txt and the per-experiment acc/gyro text logs.
data_location_raw = data_location + 'RawData/'
# Activity label ids to keep. Presumably 1 = WALKING, the activity this
# notebook focuses on — TODO confirm against the dataset's activity list.
# NOTE(review): run_experiment() below defines its own local list with the
# same name, so this module-level value is not what it reads.
labels_of_interest = [1]

def extract(file, begin, end):
    """Read a raw tri-axial sensor log and return rows ``begin`` to ``end - 1``.

    Parameters
    ----------
    file : str or path-like
        Whitespace-separated text file with three columns and no header row.
    begin, end : int
        Positional row bounds; ``end`` is exclusive, as in a Python slice.

    Returns
    -------
    pandas.DataFrame
        Independent copy with columns ``x``, ``y``, ``z``. The index keeps
        the row positions of the original file.
    """
    # sep=r'\s+' is the supported spelling of the deprecated
    # delim_whitespace=True flag.
    partial_df = pd.read_csv(file, sep=r'\s+', skipinitialspace=True, header=None)
    partial_df.columns = ['x', 'y', 'z']

    # Return a copy so callers can rename or add columns without pandas
    # raising SettingWithCopyWarning on a slice of the full log.
    return partial_df.iloc[begin:end].copy()

def get_sensors_data(row):
    """Load the accelerometer and gyroscope samples for one labelled segment.

    ``row`` must provide ``experiment``, ``user``, ``begin`` and ``end``.
    For each sensor the file ``<sensor>_expNN_userNN.txt`` under
    ``data_location_raw`` is read and rows ``begin``..``end`` (both
    included) are kept. Columns are prefixed with the sensor name, and each
    sensor frame also carries ``<sensor>_user`` / ``<sensor>_exp`` columns.

    Returns the accelerometer frame joined (on index) with the gyroscope one.
    """
    experiment_id = row['experiment']
    user_id = row['user']
    first_row = row['begin']
    # extract() treats its upper bound as exclusive; +1 keeps the 'end' row.
    stop_row = row['end'] + 1

    def load(sensor):
        path = data_location_raw + sensor + '_exp' + "{:02}".format(experiment_id) + '_user' + "{:02}".format(user_id) + '.txt'
        readings = extract(path, first_row, stop_row)
        readings.columns = [f'{sensor}_x', f'{sensor}_y', f'{sensor}_z']
        # Validation & segmentation purposes
        readings[f'{sensor}_user'] = user_id
        readings[f'{sensor}_exp'] = experiment_id
        return readings

    acc_frame = load('acc')
    gyro_frame = load('gyro')

    return acc_frame.join(gyro_frame)

def run_experiment():
    """Collect the sensor samples of every labelled segment of interest.

    Reads ``labels.txt`` from ``data_location_raw`` (columns: experiment,
    user, label, begin, end), keeps the rows whose label is in
    ``labels_of_interest`` and loads each segment with ``get_sensors_data``.

    Returns
    -------
    pandas.DataFrame
        All segments stacked under a fresh 0..n-1 index, with a trailing
        ``label`` column holding each segment's own label. If no segment
        matches, an empty frame that only has the ``label`` column.
    """
    labels_of_interest = [1]

    # sep=r'\s+' is the supported spelling of the deprecated
    # delim_whitespace=True flag.
    df_guide = pd.read_csv(data_location_raw + 'labels.txt', sep=r'\s+', header=None)
    df_guide.columns = ['experiment', 'user', 'label', 'begin', 'end']

    df_target = df_guide[df_guide['label'].isin(labels_of_interest)]

    # Tag each segment with its own label so the result stays correct when
    # more than one label is selected.
    frames = [
        get_sensors_data(row).assign(label=row['label'])
        for _, row in df_target.iterrows()
    ]

    # pd.concat raises on an empty list, so handle "nothing matched" here.
    if not frames:
        return pd.DataFrame(columns=['label'])

    # One concat instead of DataFrame.append per segment: append was removed
    # in pandas 2.0 and re-copied the accumulated frame on every iteration.
    return pd.concat(frames, ignore_index=True, sort=False)

# Load every segment of interest once; the dataset-building cell below works
# on a copy of this frame.
result = run_experiment()

### Dataset building

In [None]:
df_raw = result.copy()

# Positional rename: x/y/z/user/experiment for the accelerometer, the same
# five for the gyroscope, then the label.
df_raw.columns = ["acx", "acy", "acz", "acu", "ace", "gyx", "gyy", "gyz", "gyu", "gye", "label"]
# Collapse each tri-axial signal into a single value: the Euclidean magnitude
# of its (x, y, z) vector.
df_raw['sac'] = np.sqrt(df_raw['acx'] ** 2 + df_raw['acy'] ** 2 + df_raw['acz'] ** 2)
df_raw['sgy'] = np.sqrt(df_raw['gyx'] ** 2 + df_raw['gyy'] ** 2 + df_raw['gyz'] ** 2)

# The per-axis columns are no longer needed. The accelerometer user and
# experiment columns duplicate the gyroscope ones, so only the latter stay.
df_raw.drop(["acx", "acy", "acz", "acu", "ace", "gyx", "gyy", "gyz"], axis=1, inplace=True)
df_raw.rename(columns={'gyu': 'user', 'gye': 'experiment'}, inplace=True)
# Running sample number. len() counts rows directly, whereas count()[0]
# counted non-null values of the first column through a deprecated
# positional Series lookup.
df_raw['seq'] = np.arange(len(df_raw), dtype='int64')
# NOTE: a set_index(df_raw['seq'], drop=False) call used to follow here, but
# its result was discarded, so the row index never changed. It is left
# untouched on purpose.

df_raw.plot.line('seq', ['sac', 'sgy'], alpha=0.5)
df_raw.plot.scatter(x='sac', y='sgy')

## Train / Test set (TODO: validation missing.)

In [None]:
from keras.utils import to_categorical

def get_sets(train_users=range(1, 22), test_users=range(22, 31)):
    """Split ``df_raw`` by user id into train/test features and one-hot labels.

    Parameters
    ----------
    train_users, test_users : iterable of int
        User ids assigned to each set. The defaults reproduce the original
        split: users 1-21 for training and 22-30 for testing.

    Returns
    -------
    tuple
        ``(X_train, trainy, X_test, testy)``. The X frames hold the columns
        of ``df_raw`` left after removing ``seq``, ``label``, ``user`` and
        ``experiment``. The y arrays are the labels shifted to start at 0
        and passed through ``to_categorical``.

    Also prints ``X_train.info()`` as a quick sanity check.
    """
    bookkeeping_columns = ['seq', 'label', 'user', 'experiment']

    def split(users):
        subset = df_raw[df_raw['user'].isin(users)]
        # zero-offset class values, then one hot encode y
        targets = to_categorical(subset['label'].values - 1)
        # drop() without inplace returns a new frame, so the filtered slice
        # of df_raw is never mutated (no SettingWithCopyWarning).
        return subset.drop(columns=bookkeeping_columns), targets

    X_train, trainy = split(train_users)
    X_test, testy = split(test_users)

    X_train.info()

    return X_train, trainy, X_test, testy

## Windowing

In [None]:
# TODO: optimize this block.

def windowed():
    """Return the train/test sets with features shaped for the LSTM.

    Calls ``get_sets()`` and converts each feature frame into an array of
    shape ``(samples, 1, 2)``: one time step per sample, holding the
    ``sac`` and ``sgy`` values in that order. The label arrays are passed
    through unchanged.

    NOTE: no multi-sample windowing is performed yet; every "window" is a
    single reading.

    Returns
    -------
    tuple
        ``(trainX, trainy, testX, testy)``.
    """

    def as_single_step_windows(frame):
        # (samples, 2) -> (samples, 1, 2). This has the same layout the
        # previous dstack + moveaxis sequence produced, without depending on
        # a bare `dstack` name that is only imported in a later cell.
        features = pd.DataFrame(frame, columns=['sac', 'sgy']).to_numpy()
        return features[:, np.newaxis, :]

    trainX, trainy, testX, testy = get_sets()

    return as_single_step_windows(trainX), trainy, as_single_step_windows(testX), testy

    # agg_array = np.array([0,0,0,0,0,0,0,0])
    # for i in range(0,len(df_raw), _HALF_SEC):
    #     window_measure = df_raw.iloc[i:i+4,7]
    #     window_measure_gy = df_raw.iloc[i:i+4,8]
    #     window_time = df_raw.iloc[i:i+4,6]
    #     to_add = np.array(
    #         [window_measure.mean(),
    #         window_measure.max(),
    #         window_measure.min(),
    #         window_measure_gy.mean(),
    #         window_measure_gy.max(),
    #         window_measure_gy.min(),
    #         window_time.min(),
    #         window_time.max()])
    #     agg_array = np.vstack((agg_array, to_add))
    # agg_array = np.delete(agg_array, 0, axis=0)

## LSTM

In [None]:
# lstm model
from numpy import mean
from numpy import std
from numpy import dstack
from pandas import read_csv
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Dropout
from keras.layers import LSTM
from keras.utils import to_categorical
from matplotlib import pyplot

# [removed dataset loading code...]

# fit and evaluate a model

def evaluate_model(trainX, trainy, testX, testy, epochs=15, batch_size=64, verbose=0):
    """Fit a fresh LSTM classifier and return its accuracy on the test data.

    Parameters
    ----------
    trainX, testX : array
        Indexed as (samples, timesteps, features); the last two dimensions
        of ``trainX`` define the network input shape.
    trainy, testy : array
        One-hot encoded targets; ``trainy.shape[1]`` sets the output size.
    epochs, batch_size, verbose : int
        Training settings forwarded to ``model.fit``. ``batch_size`` is
        also used for evaluation. The defaults are the values that used to
        be hard-coded here.

    Returns
    -------
    The accuracy value reported by ``model.evaluate`` on the test data.
    """
    n_timesteps, n_features = trainX.shape[1], trainX.shape[2]
    n_outputs = trainy.shape[1]

    model = Sequential()
    model.add(LSTM(100, input_shape=(n_timesteps, n_features), return_sequences=False))
    model.add(Dropout(0.5))
    model.add(Dense(100, activation='relu'))
    model.add(Dense(n_outputs, activation='softmax'))
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    # fit network
    model.fit(trainX, trainy, epochs=epochs, batch_size=batch_size, verbose=verbose)
    # evaluate model (always silent; only the metric is of interest)
    _, accuracy = model.evaluate(testX, testy, batch_size=batch_size, verbose=0)
    return accuracy

# summarize scores
def summarize_results(scores):
    """Print the raw scores, then their mean and standard deviation."""
    print(scores)
    average = mean(scores)
    spread = std(scores)
    print('Accuracy: %.3f%% (+/-%.3f)' % (average, spread))

# run an experiment
def run_experiment(repeats=10):
    """Train and evaluate the model ``repeats`` times and report the scores.

    The data is loaded once through ``windowed()``. Every run prints its
    accuracy as a percentage, and the collected percentages are finally
    handed to ``summarize_results``.
    """
    # load data
    trainX, trainy, testX, testy = windowed()

    # repeat experiment
    scores = []
    for run_number in range(1, repeats + 1):
        accuracy_pct = evaluate_model(trainX, trainy, testX, testy) * 100.0
        print('>#%d: %.3f' % (run_number, accuracy_pct))
        scores.append(accuracy_pct)

    # summarize results
    summarize_results(scores)

# run the experiment with the default number of repeats (10)
run_experiment()