In [1]:
# cnn model
import numpy as np
from numpy import mean
from numpy import std
from numpy import dstack
from pandas import read_csv
from matplotlib import pyplot
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Flatten
from keras.layers import Dropout
from keras.layers.convolutional import Conv1D
from keras.layers.convolutional import MaxPooling1D
from keras.utils import to_categorical
import os
from sliding_window import sliding_window

In [2]:
# Number of sensor channels: the first SENSOR_CHANNELS columns of each data file
# are used as the model's input features (see load_dataset_group).
# NOTE(review): the earlier comment mentioned "gyro X,Y,Z" (3 axes) but the value
# is 6 -- presumably 3-axis accelerometer + 3-axis gyroscope; confirm against the data.
SENSOR_CHANNELS = 6
# Number of classes in the gesture recognition problem
NUM_CLASSES = 12

# Length of the sliding window used to segment the data, in samples.
# At the stated sampling rate of 50 Hz: 200 * 1/50 s = 4 seconds per window.
SLIDING_WINDOW_LENGTH = 200
# Step of the sliding window used to segment the data, in samples
# (100 is half of SLIDING_WINDOW_LENGTH).
SLIDING_WINDOW_STEP = 100


# Working directory at the time this cell runs; load_dataset_group builds the
# data file paths from it, so the *data_combined.txt files must live there.
cwd = os.getcwd()

In [3]:
# load a single file as a numpy array
def load_file(filepath):
    """Read a whitespace-delimited, header-less text file into a numpy array.

    Args:
        filepath: path of the text file to read.

    Returns:
        The parsed table as produced by ``DataFrame.values`` (one row per
        line of the file, one column per whitespace-separated field).
    """
    # sep=r'\s+' is the supported equivalent of the deprecated
    # delim_whitespace=True (deprecated in pandas 2.2).
    dataframe = read_csv(filepath, header=None, sep=r'\s+')
    return dataframe.values

In [4]:
# load a list of files and return as a 3d numpy array
def load_group(filenames, prefix=''):
    """Load several files and stack them depth-wise into one array.

    Args:
        filenames: iterable of file names; each is appended to ``prefix``.
        prefix: string prepended to every file name before loading.

    Returns:
        The result of ``dstack`` over the arrays returned by ``load_file``,
        i.e. each file occupies one index of the third dimension.
    """
    arrays = [load_file(prefix + name) for name in filenames]
    # each loaded file becomes one slice along the 3rd axis
    return dstack(arrays)

In [5]:
def load_dataset_group(dataset_type):
    """Load one combined split file and separate inputs from labels.

    The file read is ``<cwd>/<dataset_type>data_combined.txt``.

    Args:
        dataset_type: value substituted into the file name (the notebook
            passes 'train' and 'test').

    Returns:
        A pair ``(X, y)``: the first ``SENSOR_CHANNELS`` columns of the file,
        and its last column.
    """
    filename = '/{}data_combined.txt'.format(dataset_type)
    table = load_file(cwd + filename)
    features = table[:, :SENSOR_CHANNELS]
    labels = table[:, -1]
    return features, labels

In [6]:
# load the dataset, returns train and test X and y elements
def load_dataset(prefix=''):
    """Load the train and test splits and report their shapes.

    Args:
        prefix: accepted for interface compatibility; not used by this function.

    Returns:
        The tuple ``(trainX, trainy, testX, testy)``.
    """
    splits = []
    # train is loaded (and reported) first, then test
    for heading, dataset_type in (("Train set:", 'train'), ("Test set:", 'test')):
        features, targets = load_dataset_group(dataset_type)
        print(heading + str(features.shape) + str(targets.shape))
        splits.extend((features, targets))

    return tuple(splits)

In [7]:
def har_sliding_window(data_x, data_y, ws, ss):
    """Segment sensor data and labels into windows.

    Args:
        data_x: 2-D array of sensor readings (``data_x.shape[1]`` is used as
            the window width).
        data_y: label array, windowed with the same length and step.
        ws: window length passed to ``sliding_window``.
        ss: window step passed to ``sliding_window``.

    Returns:
        A pair ``(windows, labels)``: the windowed inputs cast to float32, and
        one label per window (the last label inside that window) cast to uint8.

    NOTE(review): the exact output layout depends on the project-local
    ``sliding_window`` helper -- confirm against that module.
    """
    n_columns = data_x.shape[1]
    windows = sliding_window(data_x, (ws, n_columns), (ss, 1))
    label_windows = sliding_window(data_y, ws, ss)
    # a window is labelled with the label of its final sample
    last_labels = np.asarray([[segment[-1]] for segment in label_windows])
    flat_labels = last_labels.reshape(len(last_labels))
    return windows.astype(np.float32), flat_labels.astype(np.uint8)

In [8]:
# fit and evaluate a model
def evaluate_model(trainX, trainy, testX, testy, epochs=100, batch_size=100, verbose=0):
    """Build, train and score a fresh 1-D convolutional classifier.

    Args:
        trainX: training inputs indexed as (samples, timesteps, channels).
        trainy: one-hot training targets indexed as (samples, classes).
        testX: test inputs, same layout as ``trainX``.
        testy: one-hot test targets, same layout as ``trainy``.
        epochs: number of training epochs (default 100, as before).
        batch_size: batch size used for both fitting and evaluation
            (default 100, as before).
        verbose: Keras verbosity flag for ``fit`` and ``evaluate``
            (default 0, as before).

    Returns:
        The accuracy metric reported by ``model.evaluate`` on the test data.
    """
    # Take the network dimensions from the data actually passed in instead of
    # the notebook-level constants, so the model cannot disagree with its input.
    n_timesteps, n_features = trainX.shape[1], trainX.shape[2]
    n_outputs = trainy.shape[1]

    model = Sequential()
    model.add(Conv1D(filters=128, kernel_size=5, activation='relu', input_shape=(n_timesteps, n_features)))
    model.add(Conv1D(filters=128, kernel_size=5, activation='relu'))
    model.add(Conv1D(filters=128, kernel_size=5, activation='relu'))
    model.add(Conv1D(filters=128, kernel_size=5, activation='relu'))
    model.add(Dropout(0.5))
    model.add(MaxPooling1D(pool_size=3))
    model.add(Flatten())
    model.add(Dense(100, activation='relu'))
    model.add(Dense(n_outputs, activation='softmax'))
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    # fit network
    model.fit(trainX, trainy, epochs=epochs, batch_size=batch_size, verbose=verbose)
    # evaluate model; the first returned value is the loss, which is not needed
    _, accuracy = model.evaluate(testX, testy, batch_size=batch_size, verbose=verbose)
    return accuracy

# summarize scores
def summarize_results(scores):
    """Print the raw scores followed by their mean and standard deviation.

    Args:
        scores: sequence of numeric scores (the notebook passes accuracies
            already scaled to percent).
    """
    print(scores)
    average = mean(scores)
    spread = std(scores)
    summary = 'Accuracy: %.3f%% (+/-%.3f)' % (average, spread)
    print(summary)

In [9]:
# run an experiment
def run_experiment(repeats=10):
    """Load the data, window it, and train/evaluate the model several times.

    Args:
        repeats: number of independent train/evaluate runs (default 10).

    Prints the data shapes, the accuracy (in percent) of each run, and a
    final summary via ``summarize_results``. Returns nothing.
    """
    # load data
    trainX, trainy, testX, testy = load_dataset()

    # Sensor data is segmented using a sliding window mechanism
    trainX, trainy = har_sliding_window(trainX, trainy, SLIDING_WINDOW_LENGTH, SLIDING_WINDOW_STEP)
    testX, testy = har_sliding_window(testX, testy, SLIDING_WINDOW_LENGTH, SLIDING_WINDOW_STEP)

    # Data is reshaped to (windows, timesteps, channels) for input to Conv1D
    trainX = trainX.reshape((-1, SLIDING_WINDOW_LENGTH, SENSOR_CHANNELS))
    testX = testX.reshape((-1, SLIDING_WINDOW_LENGTH, SENSOR_CHANNELS))

    # zero-offset class values
    # (assumes labels in the files run from 1 to NUM_CLASSES -- TODO confirm)
    trainy = trainy - 1
    testy = testy - 1
    # One-hot encode y, e.g. class index 8 -> [0 0 0 0 0 0 0 0 1 0 0 0].
    # num_classes is pinned so both splits get the same width even when one
    # of them does not contain the highest class; without it the width is
    # inferred from the largest label present in each array.
    trainy = to_categorical(trainy, num_classes=NUM_CLASSES)
    testy = to_categorical(testy, num_classes=NUM_CLASSES)

    print(" ..after sliding and reshaping, train data: inputs {0}, targets {1}".format(trainX.shape, trainy.shape))
    print(" ..after sliding and reshaping, test data : inputs {0}, targets {1}".format(testX.shape, testy.shape))

    # repeat experiment; every repeat trains a new model from scratch
    scores = list()
    for r in range(repeats):
        score = evaluate_model(trainX, trainy, testX, testy)
        # evaluate_model's accuracy is scaled here to a percentage
        score = score * 100.0
        print('>#%d: %.3f' % (r+1, score))
        scores.append(score)
    # summarize results
    summarize_results(scores)

# run the experiment with the default of 10 repeats; executing this cell
# loads the data and trains the model once per repeat
run_experiment()

Train set:(273942, 6)(273942,)
Test set:(125368, 6)(125368,)
 ..after sliding and reshaping, train data: inputs (2738, 200, 6), targets (2738, 12)
 ..after sliding and reshaping, test data : inputs (1252, 200, 6), targets (1252, 12)
200 6 12
>#1: 26.917
200 6 12
>#2: 27.077
200 6 12
>#3: 28.195
200 6 12
>#4: 29.553
200 6 12
>#5: 27.316
200 6 12
>#6: 28.834
200 6 12
>#7: 28.115
200 6 12
>#8: 27.396
200 6 12
>#9: 29.233
200 6 12
>#10: 26.997
[26.916933059692383, 27.07667648792267, 28.194889426231384, 29.55271601676941, 27.316293120384216, 28.833866119384766, 28.115016222000122, 27.39616632461548, 29.2332261800766, 26.996806263923645]
Accuracy: 27.963% (+/-0.924)
