In [1]:
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import tensorflow as tf  # Version 1.0.0
from sklearn import metrics
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import os
import time


In [3]:
# Colab-only setup: mount Google Drive at /gdrive so the cells below can keep
# the dataset and the generated figures there.
from google.colab import drive
drive.mount('/gdrive')
# List the mount point to confirm the drive is visible.
!ls /gdrive

Drive already mounted at /gdrive; to attempt to forcibly remount, call drive.mount("/gdrive", force_remount=True).
MyDrive  Shareddrives


# Shared Code


## Get the data and split into train + test

In [4]:
import os
BASE_PATH = '/gdrive/My Drive/colab_files/final_project/'
if not os.path.exists(BASE_PATH):
    os.makedirs(BASE_PATH)
    print('Creating base path')
else: 
  print('Base path ' + BASE_PATH + ' already exists')

os.chdir(BASE_PATH)

if not os.path.exists("UCI HAR Dataset.zip"):
    print("Downloading...")
    !wget "https://archive.ics.uci.edu/ml/machine-learning-databases/00240/UCI HAR Dataset.zip"
    print("Downloading done.\n")
    !unzip -nq "UCI HAR Dataset.zip"
else:
    print("Dataset already downloaded. Did not download twice.\n")

DATASET_PATH = "UCI HAR Dataset/"

Base path /gdrive/My Drive/colab_files/final_project/ already exists
Dataset already downloaded. Did not download twice.



In [5]:
# Useful Constants

# File-name prefixes of the inertial-signal files; each one becomes one input
# channel of the networks (three sensors x three axes, in this order).
INPUT_SIGNAL_TYPES = [
    "{}_{}_".format(sensor, axis)
    for sensor in ("body_acc", "body_gyro", "total_acc")
    for axis in ("x", "y", "z")
]

# Class names; the position in the list is the 0-based class id.
LABELS = list((
    "WALKING",
    "WALKING_UPSTAIRS",
    "WALKING_DOWNSTAIRS",
    "SITTING",
    "STANDING",
    "LAYING",
    "CANE",
))

In [6]:
# Sub-folders of the dataset holding the two splits.
TRAIN, TEST = "train/", "test/"


# Load "X" (the neural network's training and testing inputs)

def load_X(X_signals_paths):
    """Load the per-signal text files into a single input array.

    Args:
        X_signals_paths: iterable of text-file paths, one per signal. Every
            non-blank line of a file holds the whitespace-separated samples
            of one series.

    Returns:
        float32 array of shape (n_series, n_steps, n_signals).
    """
    X_signals = []

    for signal_type_path in X_signals_paths:
        # `with` closes the file even when a row fails to parse.
        with open(signal_type_path, 'r') as signal_file:
            X_signals.append([
                # split() with no argument accepts any run of whitespace, so
                # columns separated by one, two or more spaces all parse.
                np.array(row.split(), dtype=np.float32)
                for row in signal_file
                if row.strip()  # ignore blank lines (e.g. a trailing newline)
            ])

    # (n_signals, n_series, n_steps) -> (n_series, n_steps, n_signals)
    return np.transpose(np.array(X_signals), (1, 2, 0))

# One file per signal type and split, e.g. ".../train/Inertial Signals/body_acc_x_train.txt".
X_train_signals_paths = [
    "{}{}Inertial Signals/{}train.txt".format(DATASET_PATH, TRAIN, signal_name)
    for signal_name in INPUT_SIGNAL_TYPES
]
X_test_signals_paths = [
    "{}{}Inertial Signals/{}test.txt".format(DATASET_PATH, TEST, signal_name)
    for signal_name in INPUT_SIGNAL_TYPES
]

X_train, X_test = (
    load_X(split_paths)
    for split_paths in (X_train_signals_paths, X_test_signals_paths)
)


# Load "y" (the neural network's training and testing outputs)

def load_y(y_path):
    """Load the class labels of one split.

    Args:
        y_path: path of a text file with one 1-based integer label per line.

    Returns:
        int32 array of shape (n_series, 1) holding 0-based class ids.
    """
    # `with` closes the file even when a row fails to parse.
    with open(y_path, 'r') as label_file:
        y_ = np.array(
            # split() tolerates surrounding/repeated whitespace; blank lines
            # are skipped so a trailing newline does not break the parse.
            [row.split() for row in label_file if row.strip()],
            dtype=np.int32
        )

    # Subtract 1 from each output class for friendly 0-based indexing
    return y_ - 1

# Label files live next to the "Inertial Signals" folder of each split.
y_train_path, y_test_path = (
    DATASET_PATH + split_dir + label_file
    for split_dir, label_file in ((TRAIN, "y_train.txt"), (TEST, "y_test.txt"))
)

y_train = load_y(y_train_path)
y_test = load_y(y_test_path)

In [7]:
# Quick sanity check of the loaded arrays.
for loaded in (X_train, y_train, X_test):
    print(np.shape(loaded))
# Values per training series: 8469504 / 7352 == 1152 == 128 * 9.
print(8469504 / 7352)
print(y_train[83])

(7352, 128, 9)
(7352, 1)
(2947, 128, 9)
1152.0
[0]


In [8]:
# Input Data

# X_train is (series, timesteps, signals): 7352 x 128 x 9 for this dataset.
training_data_count, n_steps, n_input = X_train.shape
test_data_count = X_test.shape[0]  # 2947 testing series


# LSTM Neural Network's internal structure

n_hidden = 32  # features in each hidden layer
n_classes = 7  # must match len(LABELS)


# Training

learning_rate = 0.0025
lambda_loss_amount = 0.0015  # weight of the L2 penalty added to the loss
training_iters = 300 * training_data_count  # i.e. 300 passes over the training set
batch_size = 1500
display_iter = 30000  # evaluate on the test set every `display_iter` samples


# Some debugging info

print("Some useful info to get an insight on dataset's shape and normalisation:")
print("(X shape, y shape, every X's mean, every X's standard deviation)")
print(X_test.shape, y_test.shape, X_test.mean(), X_test.std())
print("The dataset is therefore properly normalised, as expected, but not yet one-hot encoded.")

Some useful info to get an insight on dataset's shape and normalisation:
(X shape, y shape, every X's mean, every X's standard deviation)
(2947, 128, 9) (2947, 1) 0.09913992 0.39567086
The dataset is therefore properly normalised, as expected, but not yet one-hot encoded.


# LSTM Section

## Create the LSTM Model

In [None]:
def LSTM_RNN(_X, _weights, _biases):
    """Build the graph of a two-layer stacked LSTM classifier.

    Reads the module-level `n_input`, `n_steps` and `n_hidden`.

    Args:
        _X: input tensor; the graph cell of this notebook feeds a placeholder
            of shape (batch_size, n_steps, n_input).
        _weights: dict with the 'hidden' and 'out' weight matrices.
        _biases: dict with the 'hidden' and 'out' bias vectors.

    Returns:
        The un-normalised class scores (logits) computed from the output of
        the last time step.
    """
    # Note, some code of this notebook is inspired from a slightly different
    # RNN architecture used on another dataset, some of the credits goes to
    # "aymericdamien" under the MIT license.

    # NOTE: the transpose/reshape/split below run for every batch; this could
    # be optimised by shaping the dataset once up front.
    # input shape: (batch_size, n_steps, n_input)
    _X = tf.transpose(_X, [1, 0, 2])  # permute n_steps and batch_size
    # Reshape to prepare input to hidden activation
    _X = tf.reshape(_X, [-1, n_input])
    # new shape: (n_steps*batch_size, n_input)

    # ReLU activation, thanks to Yu Zhao for adding this improvement here:
    _X = tf.nn.relu(tf.matmul(_X, _weights['hidden']) + _biases['hidden'])
    # Split data because rnn cell needs a list of inputs for the RNN inner loop
    _X = tf.split(_X, n_steps, 0)
    # new shape: n_steps * (batch_size, n_hidden)

    # Define two stacked LSTM cells (two recurrent layers deep) with tensorflow
    lstm_cell_1 = tf.compat.v1.nn.rnn_cell.BasicLSTMCell(n_hidden, forget_bias=1.0, state_is_tuple=True)
    lstm_cell_2 = tf.compat.v1.nn.rnn_cell.BasicLSTMCell(n_hidden, forget_bias=1.0, state_is_tuple=True)
    lstm_cells = tf.compat.v1.nn.rnn_cell.MultiRNNCell([lstm_cell_1, lstm_cell_2], state_is_tuple=True)
    # Get LSTM cell output: one tensor per time step
    outputs, states = tf.compat.v1.nn.static_rnn(lstm_cells, _X, dtype=tf.float32)

    # Keep only the last time step's output ("many-to-one" classifier)
    lstm_last_output = outputs[-1]

    # Linear activation: no softmax here, the loss applies it to these logits
    return tf.matmul(lstm_last_output, _weights['out']) + _biases['out']


def extract_batch_size(_train, step, batch_size):
    """Fetch the `step`-th batch of `batch_size` rows from `_train`.

    Rows are taken consecutively and wrap around to the start of the array
    once its end is reached.

    Args:
        _train: array whose first axis indexes the samples (X or y data).
        step: 1-based batch counter.
        batch_size: number of rows to return.

    Returns:
        float64 array of shape (batch_size,) + _train.shape[1:].

    Raises:
        ValueError: if `_train` contains no samples.
    """
    n_samples = len(_train)
    if n_samples == 0:
        raise ValueError("cannot extract a batch from an empty array")

    # Row indices of the batch, wrapped modulo the dataset size; one fancy
    # index replaces a Python-level copy of every row.
    first_row = (step - 1) * batch_size
    row_indices = (first_row + np.arange(batch_size)) % n_samples

    # Always hand back float64, whatever the dtype of the input.
    return np.asarray(_train)[row_indices].astype(np.float64)


def one_hot(y_, n_classes=n_classes):
    """One-hot encode an array of class ids.

    Example with n_classes=6:
        one_hot(np.array([[5], [0], [3]]), n_classes=6)
        -> [[0, 0, 0, 0, 0, 1], [1, 0, 0, 0, 0, 0], [0, 0, 0, 1, 0, 0]]

    Returns a float array with one row per label.
    """
    # Flatten to one id per sample, then pick the matching identity rows.
    class_ids = y_.reshape(len(y_)).astype(np.int32)
    identity = np.eye(n_classes)
    return identity[class_ids]

## Train LSTM Model

In [None]:
# The model is built as a TF1-style static graph, so eager execution is
# switched off and any previously built graph is discarded.
tf.compat.v1.disable_eager_execution()
tf.compat.v1.reset_default_graph()

# Graph input/output
# x: batches of series, y: one-hot labels; the batch dimension is left open.
x = tf.compat.v1.placeholder(tf.float32, [None, n_steps, n_input])
y = tf.compat.v1.placeholder(tf.float32, [None, n_classes])

# Graph weights
weights = {
    'hidden': tf.Variable(tf.random.normal([n_input, n_hidden])), # Hidden layer weights
    # Output layer weights, drawn around mean 1.0 instead of the default 0
    'out': tf.Variable(tf.random.normal([n_hidden, n_classes], mean=1.0))
}
biases = {
    'hidden': tf.Variable(tf.random.normal([n_hidden])),
    'out': tf.Variable(tf.random.normal([n_classes]))
}

# Logits of the stacked LSTM for the fed batch
pred = LSTM_RNN(x, weights, biases)

# Loss, optimizer and evaluation
# L2 penalty over every trainable variable, scaled by lambda_loss_amount
l2 = lambda_loss_amount * sum(
    tf.nn.l2_loss(tf_var) for tf_var in tf.compat.v1.trainable_variables()
) # L2 loss prevents this overkill neural network to overfit the data
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=y, logits=pred)) + l2 # Softmax loss
optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=learning_rate).minimize(cost) # Adam Optimizer

# A sample counts as correct when the arg-max of the logits matches the label
correct_pred = tf.equal(tf.argmax(pred,1), tf.argmax(y,1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))

In [None]:
# To keep track of training's performance
test_losses = []
test_accuracies = []
train_losses = []
train_accuracies = []

start = time.time()

# Launch the graph
sess = tf.compat.v1.InteractiveSession(config=tf.compat.v1.ConfigProto(log_device_placement=True))
init = tf.compat.v1.global_variables_initializer()
sess.run(init)

# Perform Training steps with "batch_size" amount of example data at each loop
step = 1
while step * batch_size <= training_iters:
    batch_xs =         extract_batch_size(X_train, step, batch_size)
    batch_ys = one_hot(extract_batch_size(y_train, step, batch_size))

    # Fit training using batch data
    _, loss, acc = sess.run(
        [optimizer, cost, accuracy],
        feed_dict={
            x: batch_xs,
            y: batch_ys
        }
    )
    train_losses.append(loss)
    train_accuracies.append(acc)

    # Evaluate network only at some steps for faster training: on the first
    # batch and whenever the sample counter hits a multiple of display_iter.
    # (The loop condition already guarantees step * batch_size <= training_iters,
    # so no "past the end" check is needed; the final evaluation happens after
    # the loop.)
    if (step*batch_size % display_iter == 0) or (step == 1):

        # To not spam console, show training accuracy/loss in this "if"
        print("Training iter #" + str(step*batch_size) + \
              ":   Batch Loss = " + "{:.6f}".format(loss) + \
              ", Accuracy = {}".format(acc))

        # Evaluation on the test set (no learning made here - just evaluation for diagnosis)
        loss, acc = sess.run(
            [cost, accuracy],
            feed_dict={
                x: X_test,
                y: one_hot(y_test)
            }
        )
        test_losses.append(loss)
        test_accuracies.append(acc)
        print("PERFORMANCE ON TEST SET: " + \
              "Batch Loss = {}".format(loss) + \
              ", Accuracy = {}".format(acc))

    step += 1

end = time.time()
print("Time Taken")
print(end - start)

print("Optimization Finished!")

# Accuracy for test data

# NOTE: this rebinds `accuracy` from the graph tensor to the evaluated number
# (the metrics cell reads that number). Re-running this cell therefore
# requires re-running the graph-building cell first.
one_hot_predictions, accuracy, final_loss = sess.run(
    [pred, accuracy, cost],
    feed_dict={
        x: X_test,
        y: one_hot(y_test)
    }
)

test_losses.append(final_loss)
test_accuracies.append(accuracy)

print("FINAL RESULT: " + \
      "Batch Loss = {}".format(final_loss) + \
      ", Accuracy = {}".format(accuracy))

## Visualize the Results

In [None]:
# savefig does not create missing folders, so make sure the target exists.
os.makedirs(DATASET_PATH + "images", exist_ok=True)

# One point per training batch. A dedicated name is used for the axis so the
# `x` input placeholder of the TensorFlow graph is not overwritten.
train_axis = np.linspace(1, len(train_losses), len(train_losses))

plt.figure()
plt.plot(train_axis, train_losses, label="Train loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Train and Test Loss")
plt.savefig(DATASET_PATH + "images/lstm_train_loss.png")
plt.show()

# One point per test-set evaluation (these are sparser than the train points,
# which is why the two curves are drawn on separate figures).
test_axis = np.linspace(1, len(test_losses), len(test_losses))

plt.figure()
plt.plot(test_axis, test_losses, label="Test loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Train and Test Loss")
plt.savefig(DATASET_PATH + "images/lstm_test_loss.png")
plt.show()


plt.figure()
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.title("Test Accuracy")
plt.plot(test_axis, test_accuracies)
plt.savefig(DATASET_PATH + "images/lstm_test_acc.png")
plt.show()

In [None]:
# Predicted class id per test series (arg-max over the logits).
predictions = one_hot_predictions.argmax(1)
# y_test is a column vector; the sklearn metrics expect a flat label array.
true_labels = y_test.ravel()

print("Testing Accuracy: {}%".format(100*accuracy))

print("")
print("Precision: {}%".format(100*metrics.precision_score(true_labels, predictions, average="weighted")))
print("Recall: {}%".format(100*metrics.recall_score(true_labels, predictions, average="weighted")))
print("f1_score: {}%".format(100*metrics.f1_score(true_labels, predictions, average="weighted")))

print("")
print("Confusion Matrix:")
print(y_test.shape)
print(y_test[0])
print(predictions.shape)
print(predictions[0])
# Passing every class id keeps the matrix n_classes x n_classes even when a
# class never occurs in the test data, so it lines up with the LABELS ticks.
confusion_matrix = metrics.confusion_matrix(true_labels, predictions, labels=np.arange(n_classes))
print(confusion_matrix)
normalised_confusion_matrix = np.array(confusion_matrix, dtype=np.float32)/np.sum(confusion_matrix)*100

print("")
print("Confusion matrix (normalised to % of total test data):")
print(normalised_confusion_matrix)
print("Note: training and testing data is not equally distributed amongst classes, ")
print("so it is normal that more than a 6th of the data is correctly classifier in the last category.")

# Plot Results:
# savefig does not create missing folders, so make sure the target exists.
os.makedirs(DATASET_PATH + "images", exist_ok=True)
width = 12
height = 12
plt.figure(figsize=(width, height))
plt.imshow(
    normalised_confusion_matrix,
    interpolation='nearest',
    cmap=plt.cm.rainbow
)
plt.title("Confusion matrix \n(normalised to % of total test data)")
plt.colorbar()
tick_marks = np.arange(n_classes)
plt.xticks(tick_marks, LABELS, rotation=90)
plt.yticks(tick_marks, LABELS)
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.savefig(DATASET_PATH + "images/LSTM_cm.png")
plt.show()

# CNN Section

## Format the data

Reuse `X_train`, `X_test`, `y_train`, and `y_test` from above. Each `X` array has the shape:

[num series, num sensor samples per series, num sensor streams we are pulling from] (for example, `(7352, 128, 9)` for `X_train`)

## Create Dataset

In [9]:
# Mini-batch size of the CNN data loaders and number of output classes
# (n_classes repeats the value used in the LSTM section).
BATCH_SIZE, n_classes = 8, 7
class HARDataset(torch.utils.data.Dataset):
    """Dataset wrapper that serves HAR series in the layout used by the CNN.

    Each input series of shape (samples, channels) is stored as
    (channels, 1, samples); labels are passed through untouched.
    """

    def __init__(self, data_array, labels_array, sequence_length, batch_size, num_channels):
        super().__init__()

        # Kept for reference only; none of the methods below read them.
        self.sequence_length = sequence_length
        self.batch_size = batch_size
        self.num_channels = num_channels

        # (series, samples, channels) -> (series, channels, samples)
        channels_first = np.array([np.transpose(series) for series in data_array])
        target_shape = (
            channels_first.shape[0],
            channels_first.shape[1],
            1,  # singleton "height" axis expected by the 2-D convolutions
            channels_first.shape[2],
        )
        self.final_data = np.resize(channels_first, target_shape)
        self.final_labels = labels_array

    def __len__(self):
        # Number of series in the dataset.
        return self.final_data.shape[0]

    def __getitem__(self, idx):
        sample = self.final_data[idx]
        target = self.final_labels[idx]
        return torch.from_numpy(sample), torch.from_numpy(target)

# Wrap both splits; 128 samples per series and 9 sensor channels.
train_dataset = HARDataset(
    X_train, y_train, sequence_length=128, batch_size=BATCH_SIZE, num_channels=9)
test_dataset = HARDataset(
    X_test, y_test, sequence_length=128, batch_size=BATCH_SIZE, num_channels=9)

## Create the CNN

In [42]:
class MyNet1(nn.Module):
    """Small grouped-convolution classifier for windowed sensor data.

    The final linear layer expects 4860 flattened features, which is what an
    input batch of shape (N, 9, 1, 128) produces (81 channels x 60 steps).
    The number of outputs is read from the module-level `n_classes`.
    """

    def __init__(self):
        super(MyNet1, self).__init__()
        # groups == in_channels: every input channel gets its own filters.
        self.conv1 = nn.Conv2d(in_channels=9, out_channels=3*9, kernel_size=(1,8), groups=9, padding=(0,1))
        self.maxpool = nn.MaxPool2d(kernel_size=(1,3), stride=2)
        self.conv2 = nn.Conv2d(in_channels=27, out_channels=3*27, kernel_size=(1, 4), groups=27, padding=(0,1))
        self.fc2 = nn.Linear(4860, n_classes)
        self.accuracy = None

    def forward(self, x):
        """Return the un-normalised class scores for a batch `x`."""
        x = self.conv1(x)
        x = F.relu(x)
        x = self.maxpool(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = torch.flatten(x, 1)
        x = self.fc2(x)
        return x

    def loss(self, prediction, label, reduction='mean'):
        """Cross-entropy between `prediction` logits and integer `label`s.

        Args:
            prediction: logits of shape (batch, classes).
            label: class ids of shape (batch, 1) or (batch,).
            reduction: forwarded to `F.cross_entropy`.
        """
        # Flatten to one id per sample. reshape(-1) keeps the batch axis even
        # for a batch of a single sample, where squeeze() would collapse the
        # target to a 0-d tensor that cross_entropy rejects.
        target = label.long().reshape(-1)
        loss_val = F.cross_entropy(prediction, target, reduction=reduction)
        return loss_val

## Train the CNN

In [11]:
import time
def train(model, device, train_loader, optimizer, epoch, log_interval):
    """Run one training epoch and return the mean batch loss.

    The model is cast to double precision and must provide a
    `loss(prediction, label)` method. Progress is printed every
    `log_interval` batches.
    """
    model.train()
    model = model.double()
    batch_losses = []
    for batch_idx, (inputs, targets) in enumerate(train_loader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()
        batch_loss = model.loss(model(inputs.double()), targets)
        batch_losses.append(batch_loss.item())
        batch_loss.backward()
        optimizer.step()

        if batch_idx % log_interval != 0:
            continue
        # Progress line: samples seen so far, dataset size, percentage, loss.
        percent_done = 100. * batch_idx / len(train_loader)
        print('{} Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
            time.ctime(time.time()),
            epoch, batch_idx * len(inputs), len(train_loader.dataset),
            percent_done, batch_loss.item()))
    return np.mean(batch_losses)

def test(model, device, test_loader, log_interval=None):
    model.eval()
    test_loss = 0
    correct = 0
    test_predictions, test_labels = [], []

    with torch.no_grad():
        for batch_idx, (data, label) in enumerate(test_loader):
            data, label = data.to(device), label.to(device)
            output = model(data.double())
            test_loss_on = model.loss(output, label, reduction='sum').item()
            test_loss += test_loss_on
            pred = output.max(1)[1]
            for i in range(0, len(label)):
                test_predictions.append(pred[i])
                test_labels.append(label[i])
            correct_mask = pred.eq(label.view_as(pred))
            num_correct = correct_mask.sum().item()
            correct += num_correct
            if log_interval is not None and batch_idx % log_interval == 0:
                print('{} Test: [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    time.ctime(time.time()),
                    batch_idx * len(data), len(test_loader.dataset),
                    100. * batch_idx / len(test_loader), test_loss_on))

    test_loss /= len(test_loader.dataset)
    test_accuracy = 100. * correct / len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset), test_accuracy))
    return test_loss, test_accuracy, test_predictions, test_labels

In [12]:
# Now the actual training code
#torch.manual_seed(SEED)
# Train on the GPU when one is available, otherwise fall back to the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda") if use_cuda else torch.device("cpu")

In [13]:
def training_loop(model, rate=0.001, epochs=20, momentum=0.9, weight=0.0005, train_dataset=None, test_dataset=None):
    """Train `model` with SGD and evaluate it on the test set after every epoch.

    Relies on the module-level `device`, `use_cuda`, `BATCH_SIZE`, `train`
    and `test`.

    Args:
        model: network to optimise (already moved to `device`).
        rate: learning rate.
        epochs: last epoch index; see the note on the loop below.
        momentum: SGD momentum.
        weight: weight decay.
        train_dataset: dataset used for optimisation (required).
        test_dataset: dataset used for evaluation (required).

    Returns:
        (train_losses, test_losses, test_accuracies, test_predictions,
        test_labels). The first three are lists of (epoch, value) tuples; the
        last two come from the most recent evaluation. If training is
        interrupted or fails, whatever was collected so far is returned.

    Raises:
        ValueError: if either dataset is missing.
    """
    import multiprocessing
    import traceback

    if train_dataset is None or test_dataset is None:
        raise ValueError("train_dataset and test_dataset are both required")

    # Apply the format string (passing the values as extra print() arguments
    # would print the raw template followed by the numbers).
    print("Parameters are rate: %f, epochs %d, momentum %f, weight %f\n" % (rate, epochs, momentum, weight))
    # Play around with these constants, you may find a better setting.
    EPOCHS = epochs
    LEARNING_RATE = rate
    MOMENTUM = momentum
    PRINT_INTERVAL = 100
    WEIGHT_DECAY = weight

    print('Using device', device)
    print('num cpus:', multiprocessing.cpu_count())

    # Worker processes and pinned memory are only requested for GPU training.
    kwargs = {'num_workers': multiprocessing.cpu_count(),
              'pin_memory': True} if use_cuda else {}

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE,
                                               shuffle=True, **kwargs)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=BATCH_SIZE,
                                              shuffle=True, **kwargs)

    optimizer = optim.SGD(model.parameters(), lr=LEARNING_RATE, momentum=MOMENTUM, weight_decay=WEIGHT_DECAY)

    train_losses, test_losses, test_accuracies = [], [], []
    test_predictions, test_labels = [], []

    try:
        # NOTE(review): epochs 0..EPOCHS inclusive, i.e. EPOCHS + 1 passes over
        # the training data. Kept as is; confirm whether that is intended.
        for epoch in range(0, EPOCHS + 1):
            train_loss = train(model, device, train_loader, optimizer, epoch, PRINT_INTERVAL)
            print("Train loss is " + str(train_loss))
            test_loss, test_accuracy, test_predictions, test_labels = test(model, device, test_loader)
            print("Test loss is " + str(test_loss))
            train_losses.append((epoch, train_loss))
            print("Test accuracy is " + str(test_accuracy))
            test_losses.append((epoch, test_loss))
            test_accuracies.append((epoch, test_accuracy))

    except KeyboardInterrupt:
        print('Interrupted')
    except Exception:
        # Best effort: report the failure but still return the partial history.
        # (A bare `except:` would also swallow SystemExit and friends.)
        traceback.print_exc()

    return train_losses, test_losses, test_accuracies, test_predictions, test_labels

In [None]:
# Train a fresh CNN on the HAR data and report the wall-clock time.
start = time.time()
model = MyNet1().to(device)
(train_losses1, test_losses1, test_accuracies1,
 test_predictions1, test_labels1) = training_loop(
    model,
    rate=.001,
    weight=.0005,
    momentum=.95,
    epochs=20,
    train_dataset=train_dataset,
    test_dataset=test_dataset,
)
end = time.time()
print("Time Taken2")
print(end - start)
# for wd in [.01, .005, .001, .0005, .0001]:
# for lr in [.005, .001, .0005, .0001, .0001]:
#     model = MyNet1().to(device)
#     print("Lr is " + str(lr))
#     train_losses, test_losses, test_accuracies, test_predictions, test_labels = training_loop(model, rate=lr, epochs=num_epochs, train_dataset=train_dataset, test_dataset=test_dataset)
# for wd in [.01, .005, .001, .0005, .0001]:
#     model = MyNet1().to(device)
#     print("wd is " + str(wd))
#     train_losses, test_losses, test_accuracies, test_predictions, test_labels = training_loop(model, weight=wd, epochs=num_epochs, train_dataset=train_dataset, test_dataset=test_dataset)
# for m in [.8, .85, .9, .95]:
#     model = MyNet1().to(device)
#     print("mom is " + str(m))
#     train_losses, test_losses, test_accuracies, test_predictions, test_labels = training_loop(model, momentum=m, epochs=num_epochs, train_dataset=train_dataset, test_dataset=test_dataset)
# for e in [15, 20, 25]:
#     model = MyNet1().to(device)
#     print("e is " + str(e))
#     train_losses, test_losses, test_accuracies, test_predictions, test_labels = training_loop(model, epochs=e, train_dataset=train_dataset, test_dataset=test_dataset)

## Visualize the Results

In [None]:
# savefig does not create missing folders, so make sure the target exists.
os.makedirs(DATASET_PATH + "images", exist_ok=True)

# One point per epoch of the CNN run; every history entry is an
# (epoch, value) tuple, so only the value is plotted.
epoch_axis = np.linspace(1, len(train_losses1), len(train_losses1))
train_y = [second for (first,second) in train_losses1]
test_loss_y = [second for (first,second) in test_losses1]
test_acc_y = [second for (first,second) in test_accuracies1]

plt.figure()
plt.plot(epoch_axis, test_loss_y, label="Test loss")
plt.plot(epoch_axis, train_y, label="Train loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Train and Test Loss")
plt.legend()
plt.savefig(DATASET_PATH + "images/my_orig_loss_better.png")
plt.show()

plt.figure()
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.title("Test Accuracy")
plt.plot(epoch_axis, test_acc_y)
plt.savefig(DATASET_PATH + "images/my_orig_acc_better.png")
plt.show()


In [None]:
print("")
print("Confusion Matrix:")

# The evaluation results may be tensors (possibly on the GPU) or plain ints;
# int() turns either into a value scikit-learn can consume.
true_labels = [int(label) for label in test_labels1]
predicted_labels = [int(prediction) for prediction in test_predictions1]

# Passing every class id keeps the matrix n_classes x n_classes even when a
# class never occurs in the test data, so it lines up with the LABELS ticks.
confusion_matrix = metrics.confusion_matrix(true_labels, predicted_labels, labels=np.arange(n_classes))
print(confusion_matrix)
normalised_confusion_matrix = np.array(confusion_matrix, dtype=np.float32)/np.sum(confusion_matrix)*100

print("")
print("Confusion matrix (normalised to % of total test data):")
print(normalised_confusion_matrix)

# Plot Results:
# savefig does not create missing folders, so make sure the target exists.
os.makedirs(DATASET_PATH + "images", exist_ok=True)
width = 12
height = 12
plt.figure(figsize=(width, height))
plt.imshow(
    normalised_confusion_matrix,
    interpolation='nearest',
    cmap=plt.cm.rainbow
)
plt.title("Confusion matrix \n(normalised to % of total test data)")
plt.colorbar()
tick_marks = np.arange(n_classes)
plt.xticks(tick_marks, LABELS, rotation=90)
plt.yticks(tick_marks, LABELS)
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.savefig(DATASET_PATH + "images/my_orig_better_cm.png")
plt.show()

# Transfer Learning

## Prep new data

In [None]:
# Signal files recorded for the new activity, in the channel order of the CNN input.
files = ['acc_x.txt', 'acc_y.txt', 'acc_z.txt', 'gyro_x.txt', 'gyro_y.txt', 'gyro_z.txt', 'total_x.txt', 'total_y.txt', 'total_z.txt']

WINDOW_SIZE = 128     # samples per series, same length as the HAR series
WINDOW_STEP = 64      # consecutive series overlap by half a window
CANE_LABEL = 6        # position of "CANE" in LABELS
TRAIN_FRACTION = 0.7  # share of the new series used for training


def _read_signal(path):
    """Read one float per non-blank line of `path` into a 1-D array."""
    with open(path, 'r') as signal_file:
        return np.array([float(row) for row in signal_file if row.strip()])


def _windowed(signal):
    """L2-normalise `signal` and cut it into overlapping WINDOW_SIZE-sample rows."""
    normalized = signal / np.linalg.norm(signal)

    # Drop the tail that does not fill a whole window. Computing the usable
    # length explicitly (instead of slicing with a negative remainder) keeps
    # the signal intact when its length is an exact multiple of WINDOW_SIZE.
    usable = len(normalized) - len(normalized) % WINDOW_SIZE
    trimmed = normalized[:usable]

    # Windows start every WINDOW_STEP samples. The window ending exactly at the
    # end of the trimmed signal is left out so the series match the ones this
    # preprocessing has always produced.
    # NOTE(review): confirm whether that last window should be included.
    n_windows = max((usable - WINDOW_SIZE) // WINDOW_STEP, 0)
    if n_windows == 0:
        return np.empty((0, WINDOW_SIZE))
    return np.stack([
        trimmed[begin:begin + WINDOW_SIZE]
        for begin in range(0, n_windows * WINDOW_STEP, WINDOW_STEP)
    ])


def _stack_and_split(channels):
    """Combine per-file windows into CNN samples and split them into train/test.

    `channels` holds one (n_series, WINDOW_SIZE) array per signal file. The
    result is a pair of arrays shaped (n, len(channels), 1, WINDOW_SIZE).
    """
    if len({len(windows) for windows in channels}) != 1:
        raise ValueError("all signal files of a recording must yield the same number of series")
    samples = np.stack(channels, axis=1)[:, :, np.newaxis, :]
    split_idx = int(len(samples) * TRAIN_FRACTION)
    return samples[:split_idx], samples[split_idx:]


def _cane_labels(count):
    """Column vector with one CANE_LABEL per series (exactly one label per sample)."""
    return np.full((count, 1), CANE_LABEL)


def _to_cnn_layout(series):
    """(series, samples, channels) -> (series, channels, 1, samples)."""
    return np.ascontiguousarray(np.transpose(series, (0, 2, 1)))[:, :, np.newaxis, :]


cane_channels = []
waist_channels = []
for filename in files:
    print("Processing file " + filename)
    cane_channels.append(_windowed(_read_signal(DATASET_PATH + 'on_cane/' + filename)))
    waist_channels.append(_windowed(_read_signal(DATASET_PATH + 'in_pocket/' + filename)))

# HAR data in the same layout as the new samples.
X_test_reshaped = _to_cnn_layout(X_test)
X_train_reshaped = _to_cnn_layout(X_train)

cane_x_train, cane_new_x_test = _stack_and_split(cane_channels)
waist_x_train, waist_new_x_test = _stack_and_split(waist_channels)

# Both recordings are labelled with the CANE class id.
cane_y_train = _cane_labels(len(cane_x_train))
waist_y_train = _cane_labels(len(waist_x_train))

# The test sets are the held-out new series followed by the whole HAR test
# set; samples and labels are concatenated in the same order so they stay aligned.
cane_x_test = np.concatenate((cane_new_x_test, X_test_reshaped))
cane_y_test = np.concatenate((_cane_labels(len(cane_new_x_test)), y_test))
waist_x_test = np.concatenate((waist_new_x_test, X_test_reshaped))
waist_y_test = np.concatenate((_cane_labels(len(waist_new_x_test)), y_test))


In [16]:
class CaneDataset(torch.utils.data.Dataset):
    """Dataset over pre-shaped sample and label arrays.

    The arrays are stored as given; items are returned as tensors sharing
    memory with the corresponding array rows.
    """

    def __init__(self, data_array, labels_array, sequence_length, batch_size, num_channels):
        super().__init__()

        self.final_data, self.final_labels = data_array, labels_array

        # Kept for reference only; none of the methods below read them.
        self.sequence_length = sequence_length
        self.batch_size = batch_size
        self.num_channels = num_channels

    def __len__(self):
        # Number of samples (first axis of the data array).
        return self.final_data.shape[0]

    def __getitem__(self, idx):
        sample = torch.from_numpy(self.final_data[idx])
        target = torch.from_numpy(self.final_labels[idx])
        return sample, target


In [17]:
class WaistDataset(torch.utils.data.Dataset):
    """Map-style dataset over pre-built waist sample and label arrays.

    Args:
        data_array: samples indexed along the first axis; must expose
            ``.shape`` and yield a NumPy array for each index.
        labels_array: labels indexed the same way.
        sequence_length: stored on the instance, not used by this class.
        batch_size: stored on the instance, not used by this class.
        num_channels: stored on the instance, not used by this class.

    NOTE(review): labels are returned as int32 here, whereas CaneDataset
    returns them with their NumPy dtype via ``from_numpy`` — confirm that
    training_loop handles both.
    """

    def __init__(self, data_array, labels_array, sequence_length, batch_size, num_channels):
        super(WaistDataset, self).__init__()

        self.sequence_length = sequence_length
        self.batch_size = batch_size
        self.num_channels = num_channels

        self.final_data = data_array
        self.final_labels = labels_array

    def __len__(self):
        """Return the number of samples (size of the first axis of the data)."""
        return self.final_data.shape[0]

    def __getitem__(self, idx):
        """Return ``(data, labels)`` for ``idx``; labels are an int32 copy."""
        data = torch.from_numpy(self.final_data[idx])
        # torch.tensor with an explicit dtype replaces the legacy
        # `torch.torch.IntTensor(...)` call: same int32 result, still a copy,
        # without the doubled module path or the ambiguous legacy constructor.
        labels = torch.tensor(self.final_labels[idx], dtype=torch.int32)
        return data, labels


## Cane Dataset

In [None]:
# Wrap the cane arrays so training_loop can consume them.
train_cane_dataset = CaneDataset(cane_x_train, cane_y_train, 128, BATCH_SIZE, 9)
test_cane_dataset = CaneDataset(cane_x_test, cane_y_test, 128, BATCH_SIZE, 9)
print(cane_y_train)
model = MyNet1().to(device)
num_epochs = 20
# Stage 1: train a fresh model on train_dataset / test_dataset.
# NOTE(review): train_dataset and test_dataset are not created in this cell;
# they must already exist in the kernel (the Waist cell further down builds
# them with HARDataset) — confirm an earlier cell defines them, otherwise
# Restart & Run All fails here.
train_loss, test_loss, test_accuracy, test_predictions, test_labels = training_loop(model, epochs=num_epochs, train_dataset=train_dataset, test_dataset=test_dataset)

# Stage 2: take the same model and freeze the n-1 first layers' weights.
print(model.parameters)
# Assigning `requires_grad` on an nn.Module only creates a plain attribute and
# freezes nothing; the flag has to be cleared on every parameter tensor.
for frozen_layer in (model.conv1, model.maxpool, model.conv2):
    # Layers without parameters (presumably maxpool) make this a no-op.
    for param in frozen_layer.parameters():
        param.requires_grad = False
        # Drop the gradient left over from stage 1 so an optimizer has nothing
        # to apply to the frozen weights.
        param.grad = None

# Fine-tune the remaining trainable layers on the cane data.
train_loss1, test_loss1, test_accuracy1, test_predictions1, test_labels1 = training_loop(model, rate=.0005, epochs=num_epochs, train_dataset=train_cane_dataset, test_dataset=test_cane_dataset)

In [None]:
# Per-epoch histories from the cane fine-tuning run. Every history entry is a
# 2-item pair and only the second item is plotted.
# NOTE(review): presumably the pairs are (epoch, value) — confirm in training_loop.
x = np.linspace(1, len(train_loss1), num=len(train_loss1))
train_y = [value for _, value in train_loss1]
test_loss_y = [value for _, value in test_loss1]
test_acc_y = [value for _, value in test_accuracy1]

# Figure 1: train and test loss on shared axes.
fig, ax = plt.subplots()
ax.plot(x, test_loss_y, label="Test loss")
ax.plot(x, train_y, label="Train loss")
ax.set(xlabel="Epoch", ylabel="Loss", title="Train and Test Loss")
ax.legend()
fig.savefig(DATASET_PATH + "images/cane_loss_lr0005.png")
plt.show()

# Figure 2: test accuracy.
fig, ax = plt.subplots()
ax.set(xlabel="Epoch", ylabel="Accuracy", title="Test Accuracy")
ax.plot(x, test_acc_y)
fig.savefig(DATASET_PATH + "images/cane_acc_lr0005.png")
plt.show()


In [None]:
print("")
print("Confusion Matrix:")

# One index per entry in LABELS. It is used both as the class list for the
# confusion matrix and as the tick positions, so row/column i always
# corresponds to LABELS[i], even when a class is absent from the test labels
# and predictions.
tick_marks = np.arange(len(LABELS))

confusion_matrix = metrics.confusion_matrix(test_labels1, test_predictions1, labels=tick_marks)
print(confusion_matrix)
# Express every cell as a percentage of all test samples.
normalised_confusion_matrix = np.array(confusion_matrix, dtype=np.float32)/np.sum(confusion_matrix)*100

print("")
print("Confusion matrix (normalised to % of total test data):")
print(normalised_confusion_matrix)

# Plot Results:
width = 12
height = 12
plt.figure(figsize=(width, height))
plt.imshow(
    normalised_confusion_matrix,
    interpolation='nearest',
    cmap=plt.cm.rainbow
)
plt.title("Confusion matrix \n(normalised to % of total test data)")
plt.colorbar()
plt.xticks(tick_marks, LABELS, rotation=90)
plt.yticks(tick_marks, LABELS)
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.savefig(DATASET_PATH + 'images/cane_rate_0005.png')
plt.show()

## Waist Dataset

In [None]:
# Base HAR datasets for stage 1 and waist datasets for stage 2.
train_dataset = HARDataset(X_train, y_train, 128, BATCH_SIZE, 9)
test_dataset = HARDataset(X_test, y_test, 128, BATCH_SIZE, 9)
train_waist_datset = WaistDataset(waist_x_train, waist_y_train, 128, BATCH_SIZE, 9)
test_waist_dataset = WaistDataset(waist_x_test, waist_y_test, 128, BATCH_SIZE, 9)
model2 = MyNet1().to(device)
num_epochs = 20
# Stage 1: train a fresh model on the HAR data.
train_loss2, test_loss2, test_accuracy2, test_predictions2, test_labels2  = training_loop(model2, epochs=num_epochs, train_dataset=train_dataset, test_dataset=test_dataset)

# Stage 2: take the same model and freeze the n-1 first layers' weights.
# Assigning `requires_grad` on an nn.Module only creates a plain attribute and
# freezes nothing; the flag has to be cleared on every parameter tensor.
for frozen_layer in (model2.conv1, model2.maxpool, model2.conv2):
    # Layers without parameters (presumably maxpool) make this a no-op.
    for param in frozen_layer.parameters():
        param.requires_grad = False
        # Drop the gradient left over from stage 1 so an optimizer has nothing
        # to apply to the frozen weights.
        param.grad = None

# Fine-tune the remaining trainable layers on the waist data.
# NOTE(review): `weight=.01` is forwarded to training_loop; presumably a
# weight-decay setting — confirm.
train_loss3, test_loss3, test_accuracy3, test_predictions3, test_labels3  = training_loop(model2, weight=.01, epochs=num_epochs, train_dataset=train_waist_datset, test_dataset=test_waist_dataset)

In [None]:
# Per-epoch histories from the waist fine-tuning run. Every history entry is a
# 2-item pair and only the second item is plotted.
# NOTE(review): presumably the pairs are (epoch, value) — confirm in training_loop.
x = np.linspace(1, len(train_loss3), num=len(train_loss3))
train_y = [value for _, value in train_loss3]
test_loss_y = [value for _, value in test_loss3]
test_acc_y = [value for _, value in test_accuracy3]

# Figure 1: train and test loss on shared axes.
fig, ax = plt.subplots()
ax.plot(x, test_loss_y, label="Test loss")
ax.plot(x, train_y, label="Train loss")
ax.set(xlabel="Epoch", ylabel="Loss", title="Train and Test Loss")
ax.legend()
fig.savefig(DATASET_PATH + "images/waist_loss_fc2.png")
plt.show()

# Figure 2: test accuracy.
fig, ax = plt.subplots()
ax.set(xlabel="Epoch", ylabel="Accuracy", title="Test Accuracy")
ax.plot(x, test_acc_y)
fig.savefig(DATASET_PATH + "images/waist_acc_fc2.png")
plt.show()


In [None]:
    print("")
print("Confusion Matrix:")
print("I'm here")

confusion_matrix = metrics.confusion_matrix(test_labels3, test_predictions3)
print(confusion_matrix)
normalised_confusion_matrix = np.array(confusion_matrix, dtype=np.float32)/np.sum(confusion_matrix)*100

print("")
print("Confusion matrix (normalised to % of total test data):")
print(normalised_confusion_matrix)

# Plot Results:
width = 12
height = 12
plt.figure(figsize=(width, height))
plt.imshow(
    normalised_confusion_matrix,
    interpolation='nearest',
    cmap=plt.cm.rainbow
)
plt.title("Confusion matrix \n(normalised to % of total test data)")
plt.colorbar()
tick_marks = np.arange(n_classes)
plt.xticks(tick_marks, LABELS, rotation=90)
plt.yticks(tick_marks, LABELS)
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.savefig(DATASET_PATH + 'images/waist_wd01.png')
plt.show()

# Visualize the datasets

Plot signals from the original HAR data alongside the corresponding signals from our cane and waist data.

## Cane Dataset vs walking

In [None]:
# Sample positions 1..128 shared by every plot in this cell.
x = np.linspace(1, 128, num=128)
# NOTE(review): this prints the label of sample 0, while sample 83 is the one
# plotted below — confirm which was intended.
print(y_train[0])
# Transpose every training sample so that indexing by i below selects one signal.
final_data = [np.transpose(series) for series in X_train]

to_plot = final_data[83]    # this is a walking sample

# Lay one cane and one waist sample out as 9 rows of 128 values.
my_to_plot = np.resize(cane_x_train[50], (9, 128))
my_to_plot_waist = np.resize(waist_x_train[20], (9, 128))

titles = ["acc_x", "acc_y", "acc_z", "gyro_x", "gyro_y", "gyro_z", "total_acc_x", "total_acc_y", "total_acc_z"]
for i, title in enumerate(titles):
    y = to_plot[i]
    my_y = my_to_plot[i]
    my_y_waist = my_to_plot_waist[i]

    # Three figures per signal, in this order: HAR sample, cane sample, waist
    # sample. Each one is saved under DATASET_PATH + 'images/' and then shown.
    for signal, heading, image_name in (
        (y, title, title + '.png'),
        (my_y, "Cane dataset " + title, 'cane_data_' + title + '.png'),
        (my_y_waist, "Waist dataset " + title, 'waist_data_' + title + '.png'),
    ):
        plt.figure()
        plt.plot(x, signal)
        plt.xlabel("Sample number")
        plt.ylabel("Signal magnitude")
        plt.title(heading)
        plt.savefig(DATASET_PATH + 'images/' + image_name)
        plt.show()


## Waist Dataset vs walking

In [None]:
# Sample positions 1..128 shared by every plot in this cell.
x = np.linspace(1, 128, num=128)
# NOTE(review): this prints the label of sample 0, while sample 83 is the one
# plotted below — confirm which was intended.
print(y_train[0])
# Transpose every training sample so that indexing by i below selects one signal.
final_data = [np.transpose(series) for series in X_train]

to_plot = final_data[83]    # this is a walking sample

# Lay one waist sample out as 9 rows of 128 values and dump it for inspection.
my_to_plot_waist = np.resize(waist_x_train[20], (9, 128))
print(my_to_plot_waist)
titles = ["acc_x", "acc_y", "acc_z", "gyro_x", "gyro_y", "gyro_z", "total_acc_x", "total_acc_y", "total_acc_z"]
for i, title in enumerate(titles):
    y = to_plot[i]
    my_y_waist = my_to_plot_waist[i]

    # Two figures per signal: the HAR sample first, then the waist sample.
    # Nothing is written to disk in this cell.
    for signal, heading in (
        (y, title),
        (my_y_waist, "Waist dataset " + title),
    ):
        plt.figure()
        plt.plot(x, signal)
        plt.xlabel("Sample number")
        plt.ylabel("Signal magnitude")
        plt.title(heading)
        plt.show()
