## Import libraries

In [1]:
import matplotlib.pyplot as plt
from scipy.fftpack import fft
from scipy import signal
from enum import Enum
import tensorflow as tf
import pandas as pd
import numpy as np
import random
import keras
import sys

# Input representations compared in this notebook. The member values (0-5) are
# used as positions into the X_train / X_test / X_phone / X_arduino lists.
DOMAINS = Enum(
    'DOMAINS',
    ['Time_Nn', 'Time_Ny', 'Freq_Nn', 'Freq_Ny', 'Both_Nn', 'Both_Ny'],
    start=0,
)

# File stems of the nine signals, grouped by sensor and ordered x, y, z.
SIGNALS = [
    f"{sensor}_{axis}"
    for sensor in ("body_acc", "body_gyro", "total_acc")
    for axis in "xyz"
]

# Class index (argmax position of a one-hot target row) -> activity name.
ACTIVITIES = dict(enumerate((
    'WALKING',
    'WALKING_UPSTAIRS',
    'WALKING_DOWNSTAIRS',
    'SITTING',
    'STANDING',
    'LAYING',
)))

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
Using TensorFlow backend.


## Load data

In [2]:
from scipy.fftpack import fft

In [3]:
def _read_csv(subset, signal):
    """Read one whitespace-delimited, header-less text file into a DataFrame.

    Parameters
    ----------
    subset : str
        'p' reads ``./Data_smartphone/{signal}.txt``,
        'a' reads ``./Data_arduino/{signal}.txt``,
        any other value reads ``./Data_UCI/{signal}_{subset}.txt``.
    signal : str
        File stem, e.g. one of the entries of SIGNALS or 'y'.

    Returns
    -------
    pandas.DataFrame
        The file contents with integer column labels (``header=None``).
    """
    if subset == 'p':
        path = f'./Data_smartphone/{signal}.txt'
    elif subset == 'a':
        path = f'./Data_arduino/{signal}.txt'
    else:
        path = f'./Data_UCI/{signal}_{subset}.txt'

    # sep=r'\s+' is the documented equivalent of delim_whitespace=True, which
    # newer pandas releases deprecate.
    return pd.read_csv(path, sep=r'\s+', header=None)

#------------------------------------------------------------------------------
def Load_data(subset):
    """Load every signal file of one subset.

    Returns a list with one NumPy array per entry of SIGNALS, in SIGNALS
    order; each array is the ``to_numpy()`` view of the file read by
    ``_read_csv(subset, name)``.
    """
    return [_read_csv(subset, name).to_numpy() for name in SIGNALS]

def Load_target(subset):
    """Load the 'y' file of a subset and one-hot encode it.

    Column 0 of the file, minus one, selects rows of a 6x6 integer identity
    matrix, so a label of 1 becomes ``[1, 0, 0, 0, 0, 0]``.
    """
    zero_based_labels = _read_csv(subset, 'y')[0] - 1
    identity = np.eye(6, dtype=int)
    return identity[zero_based_labels]

In [4]:
# UCI: signals and one-hot targets for the 'train' and 'test' subsets
raw_train = Load_data('train')
raw_test = Load_data('test')
raw_y_train = Load_target('train')
y_test = Load_target('test')

# Smartphone recordings (subset code 'p')
raw_phone = Load_data('p')
y_phone = Load_target('p')

# Arduino recordings (subset code 'a')
raw_arduino = Load_data('a')
y_arduino = Load_target('a')

## Preprocess

In [5]:
# Signal-set selector passed to Expand(): 'n' keeps the 9 signals, 'e' yields 12.
expand = 'e'

# A fixed seed makes the row permutation identical on every run, so the
# training/validation partition taken by validation_split is reproducible.
# A private RandomState is used so NumPy's global random state is untouched.
SHUFFLE_SEED = 42
shuffle_index = np.random.RandomState(SHUFFLE_SEED).permutation(raw_y_train.shape[0])

In [6]:
def _normalization(data):
    """Standardise every row of a 2-D array to zero mean and unit variance.

    The mean and (population) standard deviation are computed along axis 1.
    Rows whose standard deviation is 0 are returned as all zeros; they are
    never left as uninitialised memory.

    Returns a new float array with the same shape as ``data``.
    """
    data = np.asarray(data, dtype=float)
    mean = data.mean(axis=1, keepdims=True)
    std = data.std(axis=1, keepdims=True)

    normalized = np.zeros(data.shape)
    # `where` skips the division for constant rows, leaving their zeros in place.
    np.divide(data - mean, std, out=normalized, where=(std != 0))
    return normalized

def _FFT(data):
    """Return the FFT magnitude along the last axis, divided by 128.

    NOTE(review): 128 matches the window length shown in the printed array
    shapes of this notebook; other window lengths are scaled by the same
    constant.
    """
    return np.abs(fft(data)) / 128

def _shuffle(data, index):
    """Return ``data`` indexed by ``index``."""
    return data[index]

#------------------------------------------------------------------------------
def Expand(expand, data):
    """Select the signal arrays that make up the model input.

    The first nine entries of ``data`` are always kept. When ``expand`` is
    'e', the first three entries are appended again, giving twelve entries.
    Returns a new list; the arrays themselves are not copied.
    """
    new_data = [data[i] for i in range(9)]
    if expand == 'e':
        new_data.extend(data[i] for i in range(3))
    return new_data

def Normalize(data):
    """Apply ``_normalization`` to every array in ``data``; returns a new list."""
    return [_normalization(signal_block) for signal_block in data]

def Time_to_Freq(data):
    """Apply ``_FFT`` to every array in ``data``; returns a new list."""
    return [_FFT(signal_block) for signal_block in data]

def Reshape(data):
    """Stack a list of equally shaped 2-D arrays into a single-channel 4-D array.

    The list index becomes axis 2 and a trailing axis of length 1 is added:
    ``result[i, j, k, 0] == data[k][i][j]``.
    """
    stacked = np.asarray(data)
    list_axis_last = stacked.transpose(1, 2, 0)
    return list_axis_last[..., np.newaxis]

def Combine(time, freq):
    """Merge two parallel lists of 2-D arrays into one two-channel 4-D array.

    ``result[i, j, k, 0] == time[k][i][j]`` and
    ``result[i, j, k, 1] == freq[k][i][j]``.
    """
    time_channel = np.transpose(time, (1, 2, 0))
    freq_channel = np.transpose(freq, (1, 2, 0))
    return np.stack([time_channel, freq_channel], axis=-1)

def Shuffle(index, data, channel, y):
    """Reorder ``data`` along its first axis according to ``index``.

    Parameters
    ----------
    index : sequence of int
        Row order; ``result[i] == data[index[i]]``.
    data : array-like
        Array to reorder (features or targets).
    channel, y :
        Unused. Kept so existing call sites keep working; every channel is
        reordered by the single gather below, whatever these values are.

    Returns
    -------
    numpy.ndarray
        A new float64 array. For a permutation ``index`` it has the same
        shape as ``data``.
    """
    rows = np.asarray(index, dtype=np.intp)
    # One fancy-index gather replaces the per-row, per-channel Python loop and
    # cannot leave uninitialised entries behind.
    return np.asarray(data, dtype=float)[rows]

In [7]:
# UCI: pick the signal set, then build the row-standardised variant of it
time_Nn_train = Expand(expand, raw_train)
time_Nn_test = Expand(expand, raw_test)
time_Ny_train = Normalize(time_Nn_train)
time_Ny_test = Normalize(time_Nn_test)

# Smartphone: same two steps
time_Nn_phone = Expand(expand, raw_phone)
time_Ny_phone = Normalize(time_Nn_phone)

# Arduino: same two steps
time_Nn_arduino = Expand(expand, raw_arduino)
time_Ny_arduino = Normalize(time_Nn_arduino)

In [8]:
# UCI: frequency-domain version of the plain (Nn) and standardised (Ny) signals
freq_Nn_train = Time_to_Freq(time_Nn_train)
freq_Nn_test = Time_to_Freq(time_Nn_test)
freq_Ny_train = Time_to_Freq(time_Ny_train)
freq_Ny_test = Time_to_Freq(time_Ny_test)

# Smartphone
freq_Nn_phone = Time_to_Freq(time_Nn_phone)
freq_Ny_phone = Time_to_Freq(time_Ny_phone)

# Arduino
freq_Nn_arduino = Time_to_Freq(time_Nn_arduino)
freq_Ny_arduino = Time_to_Freq(time_Ny_arduino)

In [9]:
def _domain_inputs(time_Nn, time_Ny, freq_Nn, freq_Ny):
    """Build the six model inputs of one dataset.

    The list positions follow the DOMAINS enum:
    Time_Nn, Time_Ny, Freq_Nn, Freq_Ny, Both_Nn, Both_Ny.
    """
    return [
        Reshape(time_Nn),
        Reshape(time_Ny),
        Reshape(freq_Nn),
        Reshape(freq_Ny),
        Combine(time_Nn, freq_Nn),
        Combine(time_Ny, freq_Ny),
    ]


X_train = _domain_inputs(time_Nn_train, time_Ny_train, freq_Nn_train, freq_Ny_train)
X_test = _domain_inputs(time_Nn_test, time_Ny_test, freq_Nn_test, freq_Ny_test)
X_phone = _domain_inputs(time_Nn_phone, time_Ny_phone, freq_Nn_phone, freq_Ny_phone)
X_arduino = _domain_inputs(time_Nn_arduino, time_Ny_arduino, freq_Nn_arduino, freq_Ny_arduino)

# Only the training set is reordered; features and targets use the same
# permutation so rows stay aligned.
for index, features in enumerate(X_train):
    X_train[index] = Shuffle(shuffle_index, features, features.shape[3], False)
y_train = Shuffle(shuffle_index, raw_y_train, 1, True)

In [10]:
# Shape check: the first input variant and the targets of every dataset.
for features, targets in (
    (X_train, y_train),
    (X_test, y_test),
    (X_phone, y_phone),
    (X_arduino, y_arduino),
):
    print(features[0].shape)
    print(targets.shape)

(7352, 128, 12, 1)
(7352, 6)
(2947, 128, 12, 1)
(2947, 6)
(39, 128, 12, 1)
(39, 6)
(153, 128, 12, 1)
(153, 6)


## Helper functions

In [11]:
def show_train_history(H):
    """Plot validation loss and validation accuracy for a sequence of runs.

    ``H`` is a sequence of objects exposing a ``history`` dict with the keys
    'val_loss' and 'val_accuracy'. One figure is shown per metric, with one
    curve per element of ``H``.
    """
    panels = (
        ('val_loss', 'Validation Loss', 'Loss'),
        ('val_accuracy', 'Validation Accuracy', 'Accuracy'),
    )
    for metric_key, title, y_label in panels:
        for run in H:
            plt.plot(run.history[metric_key])
        plt.title(title)
        plt.ylabel(y_label)
        plt.xlabel('Epoch')
        # plt.legend(['BS:64', 'BS:64', 'BS:128', 'BS:128', 'BS:256', 'BS:256'], loc='upper right')
        plt.show()
        plt.clf()

## Neural network

In [12]:
from keras.models import Sequential
from keras.layers import Activation,Dense,Dropout,Flatten,Conv2D,MaxPooling2D,Flatten,LSTM
from keras.layers.normalization import BatchNormalization
from keras.optimizers import Adam
from keras import regularizers

In [13]:
def CNN_model(channel):
    """Build and compile the CNN classifier.

    Parameters
    ----------
    channel : int
        Size of the last input axis; the model input shape is
        ``(128, 12, channel)``.

    Returns
    -------
    A compiled Sequential model: three Conv2D -> ReLU -> BatchNormalization ->
    MaxPooling2D stages, then Dropout(0.5), Flatten and a 6-unit softmax Dense
    layer with L2(0.01) kernel regularisation. It is compiled with
    Adam(lr=0.01), categorical cross-entropy loss and the accuracy metric.
    """
    conv_stages = [
        dict(filters=32, kernel_size=(13, 6), strides=(1, 3), input_shape=(128, 12, channel)),
        dict(filters=64, kernel_size=(13, 3), strides=(1, 1)),
        dict(filters=128, kernel_size=(12, 1), strides=(1, 1)),
    ]

    model = Sequential()
    for conv_kwargs in conv_stages:
        model.add(Conv2D(**conv_kwargs))
        model.add(Activation('relu'))
        model.add(BatchNormalization(momentum=0.9))
        # Pooling halves the first spatial axis only.
        model.add(MaxPooling2D(pool_size=(2, 1), strides=(2, 1)))

    model.add(Dropout(0.5))
    model.add(Flatten())
    model.add(Dense(6, activation='softmax', kernel_regularizer=regularizers.l2(0.01)))

    model.compile(
        optimizer=Adam(lr=0.01),
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model

def confusion_matrix(Y_true, Y_pred):
    """Cross-tabulate true against predicted activity names.

    Both arguments are 2-D score/one-hot arrays; the argmax of each row is
    looked up in ACTIVITIES. Rows of the result are labelled 'True' and
    columns 'Pred'.

    See https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.crosstab.html
    """
    def _activity_names(scores):
        class_ids = np.argmax(scores, axis=1)
        return pd.Series([ACTIVITIES[class_id] for class_id in class_ids])

    true_names = _activity_names(Y_true)
    pred_names = _activity_names(Y_pred)
    return pd.crosstab(true_names, pred_names, rownames=['True'], colnames=['Pred'])

In [14]:
def training(X_train, y_train, validation_split, BS, Epochs):
    """Create a fresh CNN_model and fit it.

    The model's channel count is taken from ``X_train.shape[3]``. ``BS`` is
    the batch size and ``Epochs`` the number of epochs; ``validation_split``
    is forwarded to ``fit`` and batches are shuffled.

    Returns ``(model, history)`` where ``history`` is the value returned by
    ``model.fit``.
    """
    n_channels = X_train.shape[3]
    model = CNN_model(n_channels)
    fit_options = dict(
        validation_split=validation_split,
        shuffle=True,
        batch_size=BS,
        epochs=Epochs,
    )
    history = model.fit(X_train, y_train, **fit_options)
    return model, history

In [26]:
# Train on the Time_Nn inputs (validation split 0.15, batch size 300, 15 epochs),
# then score that one model on the UCI test, smartphone and Arduino sets.
domain_idx = DOMAINS.Time_Nn.value
model, history = training(X_train[domain_idx], y_train, 0.15, 300, 15)

for heading, tag, inputs, targets in (
    ("\nUCI data", "Test", X_test, y_test),
    ("\nSmartphone data", "Phone", X_phone, y_phone),
    ("\nArduino data", "Arduino", X_arduino, y_arduino),
):
    print(heading)
    score = model.evaluate(inputs[domain_idx], targets)
    print(tag + " accuracy: ", score[1])
    print(tag + ' predict:')
    print(confusion_matrix(targets, model.predict(inputs[domain_idx])))

Train on 6249 samples, validate on 1103 samples
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
UCI data

Test accuracy:  0.9175432920455933
Test predict:
Pred                LAYING  SITTING  STANDING  WALKING  WALKING_DOWNSTAIRS  \
True                                                                         
LAYING                 537        0         0        0                   0   
SITTING                  0      399        87        0                   0   
STANDING                 0       88       437        5                   0   
WALKING                  0        0         0      477                  18   
WALKING_DOWNSTAIRS       0        0         0        3                 416   
WALKING_UPSTAIRS         0        0         0        2                  31   

Pred                WALKING_UPSTAIRS  
True                                  
LAYING             

In [27]:
# Train on the Time_Ny inputs (validation split 0.15, batch size 300, 15 epochs),
# then score that one model on the UCI test, smartphone and Arduino sets.
domain_idx = DOMAINS.Time_Ny.value
model, history = training(X_train[domain_idx], y_train, 0.15, 300, 15)

for heading, tag, inputs, targets in (
    ("\nUCI data:", "Test", X_test, y_test),
    ("\nSmartphone data:", "Phone", X_phone, y_phone),
    ("\nArduino data:", "Arduino", X_arduino, y_arduino),
):
    print(heading)
    score = model.evaluate(inputs[domain_idx], targets)
    print(tag + " accuracy: ", score[1])
    print(tag + ' predict:')
    print(confusion_matrix(targets, model.predict(inputs[domain_idx])))

Train on 6249 samples, validate on 1103 samples
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
UCI data

Test accuracy:  0.894808292388916
Test predict:
Pred                LAYING  SITTING  STANDING  WALKING  WALKING_DOWNSTAIRS  \
True                                                                         
LAYING                 423       49        64        1                   0   
SITTING                 20      419        52        0                   0   
STANDING                 8       31       493        0                   0   
WALKING                  0        4        27      465                   0   
WALKING_DOWNSTAIRS       3        1         8        0                 403   
WALKING_UPSTAIRS         0       29         1        0                   7   

Pred                WALKING_UPSTAIRS  
True                                  
LAYING              

In [30]:
# Train on the Freq_Nn inputs (validation split 0.15, batch size 300, 15 epochs),
# then score that one model on the UCI test, smartphone and Arduino sets.
domain_idx = DOMAINS.Freq_Nn.value
model, history = training(X_train[domain_idx], y_train, 0.15, 300, 15)

for heading, tag, inputs, targets in (
    ("\nUCI data:", "Test", X_test, y_test),
    ("\nSmartphone data:", "Phone", X_phone, y_phone),
    ("\nArduino data:", "Arduino", X_arduino, y_arduino),
):
    print(heading)
    score = model.evaluate(inputs[domain_idx], targets)
    print(tag + " accuracy: ", score[1])
    print(tag + ' predict:')
    print(confusion_matrix(targets, model.predict(inputs[domain_idx])))

Train on 6249 samples, validate on 1103 samples
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15
UCI data

Test accuracy:  0.8571428656578064
Test predict:
Pred                LAYING  SITTING  STANDING  WALKING  WALKING_DOWNSTAIRS  \
True                                                                         
LAYING                 537        0         0        0                   0   
SITTING                  0      146       344        0                   0   
STANDING                 0        3       528        0                   0   
WALKING                  0        0         0      489                   6   
WALKING_DOWNSTAIRS       0        0         0       17                 356   
WALKING_UPSTAIRS         1        0         0        0                   0   

Pred                WALKING_UPSTAIRS  
True                                  
LAYING             

In [31]:
# Train on the Freq_Ny inputs (validation split 0.15, batch size 300, 15 epochs),
# then score that one model on the UCI test, smartphone and Arduino sets.
domain_idx = DOMAINS.Freq_Ny.value
model, history = training(X_train[domain_idx], y_train, 0.15, 300, 15)

for heading, tag, inputs, targets in (
    ("\nUCI data:", "Test", X_test, y_test),
    ("\nSmartphone data:", "Phone", X_phone, y_phone),
    ("\nArduino data:", "Arduino", X_arduino, y_arduino),
):
    print(heading)
    score = model.evaluate(inputs[domain_idx], targets)
    print(tag + " accuracy: ", score[1])
    print(tag + ' predict:')
    print(confusion_matrix(targets, model.predict(inputs[domain_idx])))

Train on 6249 samples, validate on 1103 samples
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15

UCI data
Test accuracy:  0.7465218901634216
Test predict:
Pred                LAYING  SITTING  STANDING  WALKING  WALKING_DOWNSTAIRS  \
True                                                                         
LAYING                 379       80        52       19                   7   
SITTING                 11      393        74        9                   2   
STANDING                 7       95       422        7                   0   
WALKING                  0        0         0      495                   0   
WALKING_DOWNSTAIRS       0        0         0      214                 203   
WALKING_UPSTAIRS         0        1         0      152                  10   

Pred                WALKING_UPSTAIRS  
True                                  
LAYING             

In [32]:
# Train on the Both_Nn inputs (validation split 0.15, batch size 300, 15 epochs),
# then score that one model on the UCI test, smartphone and Arduino sets.
domain_idx = DOMAINS.Both_Nn.value
model, history = training(X_train[domain_idx], y_train, 0.15, 300, 15)

for heading, tag, inputs, targets in (
    ("\nUCI data:", "Test", X_test, y_test),
    ("\nSmartphone data:", "Phone", X_phone, y_phone),
    ("\nArduino data:", "Arduino", X_arduino, y_arduino),
):
    print(heading)
    score = model.evaluate(inputs[domain_idx], targets)
    print(tag + " accuracy: ", score[1])
    print(tag + ' predict:')
    print(confusion_matrix(targets, model.predict(inputs[domain_idx])))

Train on 6249 samples, validate on 1103 samples
Epoch 1/15
Epoch 2/15
Epoch 3/15
Epoch 4/15
Epoch 5/15
Epoch 6/15
Epoch 7/15
Epoch 8/15
Epoch 9/15
Epoch 10/15
Epoch 11/15
Epoch 12/15
Epoch 13/15
Epoch 14/15
Epoch 15/15

UCI data
Test accuracy:  0.9117746949195862
Test predict:
Pred                LAYING  SITTING  STANDING  WALKING  WALKING_DOWNSTAIRS  \
True                                                                         
LAYING                 536        0         0        0                   0   
SITTING                  0      452        36        0                   0   
STANDING                 0      134       395        2                   0   
WALKING                  0        0         0      486                   8   
WALKING_DOWNSTAIRS       0        0         0       23                 397   
WALKING_UPSTAIRS         0        0         0       31                  19   

Pred                WALKING_UPSTAIRS  
True                                  
LAYING             

In [34]:
# Train on the Both_Ny inputs (validation split 0.15, batch size 300, 15 epochs),
# then score that one model on the UCI test, smartphone and Arduino sets.
domain_idx = DOMAINS.Both_Ny.value
model, history = training(X_train[domain_idx], y_train, 0.15, 300, 15)

for heading, tag, inputs, targets in (
    ("\nUCI data:", "Test", X_test, y_test),
    ("\nSmartphone data:", "Phone", X_phone, y_phone),
    ("\nArduino data:", "Arduino", X_arduino, y_arduino),
):
    print(heading)
    score = model.evaluate(inputs[domain_idx], targets)
    print(tag + " accuracy: ", score[1])
    print(tag + ' predict:')
    print(confusion_matrix(targets, model.predict(inputs[domain_idx])))

Train on 6249 samples, validate on 1103 samples
Epoch 1/15

KeyboardInterrupt: 