In [11]:
import numpy as np
import torch
from torch import nn
import sklearn.metrics as metrics
import torch.nn.functional as F

In [7]:
n_classes = 6

# Define function to generate batches of a particular size

def extract_batch_size(_train, step, batch_size):
    """Return the `step`-th batch (1-based) of `batch_size` rows of `_train`.

    Row indices wrap around modulo ``len(_train)``, so a batch that runs past
    the end of the data continues from the first row. The result is a freshly
    allocated ``np.empty`` array (NumPy's default dtype) whose first dimension
    is `batch_size` and whose remaining dimensions are those of `_train`.
    """
    n_rows = len(_train)
    start = (step - 1) * batch_size

    out_shape = [batch_size] + list(_train.shape)[1:]
    out = np.empty(out_shape)

    for offset in range(batch_size):
        out[offset] = _train[(start + offset) % n_rows]

    return out


# %%

# Define a function to create a one-hot encoding of the output labels

def one_hot_vector(y_, n_classes=n_classes):
    """One-hot encode an array of integer class labels.

    `y_` is reshaped to one dimension of length ``len(y_)`` and cast to int32;
    each label then selects the matching row of an identity matrix.

    Example:
        one_hot_vector(y_=np.array([[2], [0], [5]]), n_classes=6) returns
        [[0, 0, 1, 0, 0, 0], [1, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 1]]

    The returned array holds floats because its rows come from ``np.eye``.
    """
    flat_labels = y_.reshape(len(y_))
    identity = np.eye(n_classes)
    class_ids = np.array(flat_labels, dtype=np.int32)
    return identity[class_ids]


In [8]:
# Load "X" (the neural network's training and testing inputs)

def load_X(X_signals_paths):
    """Load the input signals from whitespace-separated text files.

    Each file holds one series per line and one numeric value per column.

    Args:
        X_signals_paths: iterable of file paths, one file per signal type.

    Returns:
        A float32 array in which the per-file matrices are stacked and then
        transposed with axes (1, 2, 0), i.e. indexed as
        [series, position-in-row, signal type].
    """
    X_signals = []

    for signal_type_path in X_signals_paths:
        # `with` closes the handle even when a row fails to parse.
        with open(signal_type_path, 'r') as file:
            X_signals.append([
                # split() without arguments collapses any run of whitespace,
                # so leading blanks and multi-space column gaps are handled.
                np.array(row.split(), dtype=np.float32)
                for row in file
                if row.strip()  # ignore blank lines (e.g. a trailing newline)
            ])

    return np.transpose(np.array(X_signals), (1, 2, 0))

# Load "y" (the neural network's training and testing outputs)

def load_y(y_path):
    """Load class labels from a text file and shift them to 0-based indices.

    Args:
        y_path: path to a text file with whitespace-separated integer labels,
            one row per sample.

    Returns:
        An int32 array with one row per non-blank line of the file, each
        label decremented by one.
    """
    # `with` closes the handle even when a row fails to parse.
    with open(y_path, 'r') as file:
        y_ = np.array(
            # split() tolerates leading blanks and repeated spaces; blank
            # lines are skipped instead of producing an unparsable '' token.
            [row.split() for row in file if row.strip()],
            dtype=np.int32
        )

    # Subtract 1 from each output class for friendly 0-based indexing
    return y_ - 1

In [9]:
def train(net, X_train, y_train, X_test, y_test, epochs=100, lr=0.0025, weight_decay=0.0015,
          batch_size=64, eval_every=10):
    """Train `net` with Adam and cross-entropy, reporting test metrics periodically.

    Batches are taken in order with `extract_batch_size`; a trailing partial
    batch is skipped, so every batch has exactly `batch_size` rows.

    Args:
        net: model exposing ``init_hidden(batch_size)`` and returning
            ``(output, hidden)`` from ``net(inputs, hidden)``.
        X_train, y_train: NumPy training inputs and integer class labels.
        X_test, y_test: NumPy evaluation inputs and labels, passed to `test`.
        epochs: number of passes over the training data.
        lr: Adam learning rate.
        weight_decay: Adam weight decay.
        batch_size: rows per training batch (previously hard-coded to 64).
        eval_every: evaluate and print test metrics every this many epochs
            (previously evaluated every epoch but printed every 10th).

    Raises:
        ValueError: if `batch_size` or `eval_every` is < 1, or if `X_train`
            holds fewer than `batch_size` rows (no full batch to train on).
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1, got {}".format(batch_size))
    if eval_every < 1:
        raise ValueError("eval_every must be >= 1, got {}".format(eval_every))

    n_batches = len(X_train) // batch_size
    if n_batches == 0:
        raise ValueError(
            "Need at least batch_size={} training rows, got {}".format(batch_size, len(X_train)))

    print("\n\n********** Running training! ************\n\n")
    opt = torch.optim.Adam(net.parameters(), lr=lr, weight_decay=weight_decay)
    criterion = nn.CrossEntropyLoss()

    if torch.cuda.is_available():
        net.cuda()
    # Put every tensor where the model's parameters actually live.
    device = next(net.parameters()).device

    net.train()

    for epoch in range(epochs):
        train_losses = []

        h = tuple(each.to(device) for each in net.init_hidden(batch_size))

        for step in range(1, n_batches + 1):
            batch_xs = extract_batch_size(X_train, step, batch_size)
            batch_ys = extract_batch_size(y_train, step, batch_size)

            inputs = torch.from_numpy(batch_xs).float().to(device)
            # Column-major flatten turns the label column into a 1-D target vector.
            targets = torch.from_numpy(batch_ys.flatten('F')).long().to(device)

            # Detach the carried state so gradients do not flow across batches.
            h = tuple(each.detach() for each in h)
            opt.zero_grad()

            output, h = net(inputs, h)
            train_loss = criterion(output, targets)
            train_losses.append(train_loss.item())

            train_loss.backward()
            opt.step()

        train_loss_avg = np.mean(train_losses)
        print("Epoch: {}/{}...".format(epoch + 1, epochs),
              "Train Loss: {:.4f}".format(train_loss_avg))

        # Only pay for a test-set pass on the epochs whose metrics are printed.
        if (epoch + 1) % eval_every == 0:
            test_loss, test_f1score, test_accuracy = test(net, X_test, y_test, criterion)
            print("Epoch: {}/{}...".format(epoch + 1, epochs),
                  ' ' * 16 + "Test Loss: {:.4f}...".format(test_loss),
                  "Test   accuracy: {:.4f}...".format(test_accuracy),
                  "Test F1: {:.4f}...".format(test_f1score))


In [10]:
def test(net, X_test, y_test, criterion, test_batch=64):
    """Evaluate `net` on the test data and return (loss, macro-F1, accuracy).

    The data is consumed in consecutive batches of `test_batch` rows; a
    trailing partial batch is skipped. The model is switched to eval mode for
    the pass and back to train mode before returning.

    Args:
        net: model exposing ``init_hidden(batch_size)`` and returning
            ``(output, hidden)`` from ``net(inputs, hidden)``.
        X_test, y_test: NumPy evaluation inputs and integer class labels.
        criterion: loss callable applied as ``criterion(output, targets)``.
        test_batch: rows per evaluation batch (previously hard-coded to 64).

    Returns:
        Tuple ``(mean batch loss, macro-averaged F1, accuracy)``. F1 and
        accuracy are computed once over all evaluated samples and are plain
        Python/NumPy floats.

    Raises:
        ValueError: if `test_batch` is < 1 or `X_test` holds fewer than
            `test_batch` rows.
    """
    if test_batch < 1:
        raise ValueError("test_batch must be >= 1, got {}".format(test_batch))

    n_batches = len(X_test) // test_batch
    if n_batches == 0:
        raise ValueError(
            "Need at least test_batch={} test rows, got {}".format(test_batch, len(X_test)))

    net.eval()
    # Put every tensor where the model's parameters actually live.
    device = next(net.parameters()).device

    test_losses = []
    batch_preds = []
    batch_targets = []
    test_h = tuple(each.to(device) for each in net.init_hidden(test_batch))

    # No gradients are needed for evaluation; skipping them saves memory and time.
    with torch.no_grad():
        for step in range(1, n_batches + 1):
            batch_xs = extract_batch_size(X_test, step, test_batch)
            batch_ys = extract_batch_size(y_test, step, test_batch)

            inputs = torch.from_numpy(batch_xs).float().to(device)
            # Column-major flatten turns the label column into a 1-D target vector.
            targets = torch.from_numpy(batch_ys.flatten('F')).long().to(device)

            test_h = tuple(each.detach() for each in test_h)
            output, test_h = net(inputs, test_h)
            test_losses.append(criterion(output, targets).item())

            # Predicted class = index of the largest score in each row.
            _, top_class = output.topk(1, dim=1)
            batch_preds.append(top_class.view(-1).cpu())
            batch_targets.append(targets.cpu())

    all_preds = torch.cat(batch_preds)
    all_targets = torch.cat(batch_targets)

    test_loss_avg = np.mean(test_losses)
    test_accuracy_avg = (all_preds == all_targets).float().mean().item()
    # f1_score takes (y_true, y_pred); scoring all samples at once avoids the
    # bias of averaging per-batch macro-F1 values.
    test_f1_avg = metrics.f1_score(all_targets.numpy(), all_preds.numpy(), average='macro')

    net.train()

    return test_loss_avg, test_f1_avg, test_accuracy_avg


In [17]:
# Notebook-wide constants: file locations, class names and hyperparameters.

# File-name prefixes of the input signals; each is completed with
# "train.txt" / "test.txt" below to form one path per signal.
INPUT_SIGNAL_TYPES = [
    "body_acc_x_", "body_acc_y_", "body_acc_z_",
    "body_gyro_x_", "body_gyro_y_", "body_gyro_z_",
    "total_acc_x_", "total_acc_y_", "total_acc_z_",
]

# Human-readable names of the output classes
LABELS = [
    "WALKING", "WALKING_UPSTAIRS", "WALKING_DOWNSTAIRS",
    "SITTING", "STANDING", "LAYING",
]

# Dataset layout, relative to DATASET_PATH (empty: the working directory)
TRAIN = "train/"
TEST = "test/"
DATASET_PATH = ""

X_train_signals_paths = [
    f"{DATASET_PATH}{TRAIN}Inertial Signals/{prefix}train.txt" for prefix in INPUT_SIGNAL_TYPES
]
X_test_signals_paths = [
    f"{DATASET_PATH}{TEST}Inertial Signals/{prefix}test.txt" for prefix in INPUT_SIGNAL_TYPES
]

y_train_path = f"{DATASET_PATH}{TRAIN}y_train.txt"
y_test_path = f"{DATASET_PATH}{TEST}y_test.txt"

# LSTM network dimensions
n_hidden = 32  # Hidden layer num of features
n_classes = 6  # Total number of output classes

# Training hyperparameters
learning_rate = 0.0025
lambda_loss_amount = 0.0015
BATCH_SIZE = 1500
display_iter = 30000  # To show test set accuracy during training

# Report which device PyTorch can see
print(
    'Training on GPU'
    if torch.cuda.is_available()
    else 'GPU not available! Training on CPU. Try to keep n_epochs very small'
)


GPU not available! Training on CPU. Try to keep n_epochs very small


In [18]:
# Read inputs first, then labels, for the train and test splits.
X_train = load_X(X_train_signals_paths)
X_test = load_X(X_test_signals_paths)

y_train = load_y(y_train_path)
y_test = load_y(y_test_path)

# Dataset dimensions derived from the loaded arrays
training_data_count, test_data_count = len(X_train), len(X_test)  # number of train / test series
n_steps, n_input = len(X_train[0]), len(X_train[0][0])  # timesteps per series, values per timestep

# Quick sanity report on the test split: shapes plus global mean / std
print("Some useful info to get an insight on dataset's shape and normalisation:")
print("(X shape, y shape, every X's mean, every X's standard deviation)")
print(X_test.shape, y_test.shape, np.mean(X_test), np.std(X_test))
print("The dataset is therefore properly normalised, as expected, but not yet one-hot encoded.")

Some useful info to get an insight on dataset's shape and normalisation:
(X shape, y shape, every X's mean, every X's standard deviation)
(2947, 128, 9) (2947, 1) 0.09913992 0.39567086
The dataset is therefore properly normalised, as expected, but not yet one-hot encoded.


In [89]:
import warnings
# Silence all Python warnings from here on (applies process-wide).
warnings.filterwarnings('ignore')

import torch
from torch import nn
import torch.nn.functional as F

# Model dimensions; the model class defined below reads these as the
# defaults of its constructor arguments.
n_classes, n_input, n_hidden = 6, 9, 32


class BiDirResidual_LSTMModel(nn.Module):
    """Bidirectional LSTM classifier built from two residual blocks.

    The input passes through a Linear+ReLU layer, then two residual blocks
    (each: bidirectional LSTM -> Linear+ReLU -> bidirectional LSTM ->
    Linear+ReLU, with the two activations summed), dropout, and a final
    linear layer applied to the last time step.

    `forward` returns raw class scores (logits). ``nn.CrossEntropyLoss``
    applies log-softmax itself, so the scores must not be normalised first;
    use ``F.softmax(out, dim=1)`` on the result when probabilities are needed.
    """

    def __init__(self, n_input=n_input, n_hidden=n_hidden, n_layers=2,
                 n_classes=n_classes, drop_prob=0.5):
        """Build the layers.

        Args:
            n_input: number of features per time step.
            n_hidden: width of the hidden representation; must be even because
                each LSTM direction gets ``n_hidden // 2`` units.
            n_layers: number of stacked layers inside every LSTM.
            n_classes: number of output classes.
            drop_prob: dropout probability (inside the LSTMs and before `fc`).

        Raises:
            ValueError: if `n_hidden` is odd.
        """
        super(BiDirResidual_LSTMModel, self).__init__()

        if n_hidden % 2 != 0:
            # Two directions of n_hidden // 2 units would not add up to the
            # n_hidden features that relu2 and fc expect.
            raise ValueError("n_hidden must be even, got {}".format(n_hidden))

        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.n_classes = n_classes
        self.drop_prob = drop_prob
        self.n_input = n_input

        half_hidden = n_hidden // 2  # units per LSTM direction

        self.relu1 = nn.Sequential(
            nn.Linear(n_input, n_input),
            nn.ReLU()
        )
        self.relu2 = nn.Sequential(
            nn.Linear(n_hidden, n_hidden),
            nn.ReLU()
        )
        # NOTE: `lstm` and `BatchNorm` are not used by forward(); they are kept
        # so the module's parameter set and registration order stay unchanged.
        self.lstm = nn.LSTM(n_input, n_hidden, n_layers, dropout=self.drop_prob)
        self.bidir_lstm1 = nn.LSTM(n_input, half_hidden, n_layers, bidirectional=True, dropout=self.drop_prob)
        self.bidir_lstm2 = nn.LSTM(n_hidden, half_hidden, n_layers, bidirectional=True, dropout=self.drop_prob)
        self.fc = nn.Linear(n_hidden, n_classes)
        self.BatchNorm = nn.BatchNorm1d(64)
        self.dropout = nn.Dropout(drop_prob)

    def add_residual_component(self, layer1, layer2):
        """Return the element-wise sum of two tensors."""
        return torch.add(layer1, layer2)

    def make_residual_layer(self, input_layer, hidden, first=False):
        """Apply one residual block and return its output.

        Args:
            input_layer: tensor fed to the block's first LSTM.
            hidden: ``(h_0, c_0)`` tuple used as the initial state of BOTH
                LSTM calls; the states they return are discarded.
            first: use `bidir_lstm1` (takes `n_input` features) for the first
                LSTM when True, otherwise `bidir_lstm2` (takes `n_hidden`).
        """
        first_lstm = self.bidir_lstm1 if first else self.bidir_lstm2
        mid_layer, _ = first_lstm(input_layer, hidden)
        mid_layer = self.relu2(mid_layer)

        output_layer, _ = self.bidir_lstm2(mid_layer, hidden)
        output_layer = self.relu2(output_layer)

        return self.add_residual_component(mid_layer, output_layer)

    def forward(self, x, hidden):
        """Compute class logits for a batch.

        Args:
            x: 3-D input whose first two axes are swapped before the LSTMs
                (the LSTMs are built with the default ``batch_first=False``).
            hidden: ``(h_0, c_0)`` initial state, as built by `init_hidden`.

        Returns:
            ``(out, hidden)`` where `out` holds one row of `n_classes` raw
            scores per sample and `hidden` is the input state, unchanged.
        """
        x = x.permute(1, 0, 2)

        x = self.relu1(x)
        x = self.make_residual_layer(x, hidden, True)
        x = self.make_residual_layer(x, hidden, False)
        x = self.dropout(x)

        # Classify from the features of the last time step only.
        out = x[-1].contiguous().view(-1, self.n_hidden)
        out = self.fc(out)

        return out, hidden

    def init_hidden(self, batch_size):
        """Return zeroed ``(h_0, c_0)`` states for the bidirectional LSTMs.

        Each tensor has shape ``(2 * n_layers, batch_size, n_hidden // 2)``
        and is created with the dtype and device of the model's first
        parameter, so it always matches wherever the model currently lives.
        """
        weight = next(self.parameters())
        state_shape = (2 * self.n_layers, batch_size, self.n_hidden // 2)

        return (weight.new_zeros(state_shape), weight.new_zeros(state_shape))


def init_weights(m):
    """Initialise one module in place; intended for ``nn.Module.apply``.

    For modules that are exactly ``nn.LSTM``, the input-hidden and
    hidden-hidden weight matrices are made orthogonal and biases are zeroed.
    For modules that are exactly ``nn.Linear``, the weight is made orthogonal
    and the bias is zeroed. Any other module is left untouched.
    """
    module_type = type(m)

    if module_type == nn.LSTM:
        for param_name, tensor in m.named_parameters():
            if 'weight_ih' in param_name or 'weight_hh' in param_name:
                torch.nn.init.orthogonal_(tensor.data)
            elif 'bias' in param_name:
                tensor.data.fill_(0)
        return

    if module_type == nn.Linear:
        torch.nn.init.orthogonal_(m.weight)
        m.bias.data.fill_(0)


In [90]:
# Build the model with its default constructor arguments, then run
# init_weights over the model's modules via nn.Module.apply. As the last
# expression of the cell, the module returned by apply() is displayed.
net = BiDirResidual_LSTMModel()
net.apply(init_weights)

BiDirResidual_LSTMModel(
  (relu1): Sequential(
    (0): Linear(in_features=9, out_features=9, bias=True)
    (1): ReLU()
  )
  (relu2): Sequential(
    (0): Linear(in_features=32, out_features=32, bias=True)
    (1): ReLU()
  )
  (relu3): Sequential(
    (0): Linear(in_features=32, out_features=6, bias=True)
    (1): ReLU()
  )
  (lstm): LSTM(9, 32, num_layers=2, dropout=0.5)
  (bidir_lstm1): LSTM(9, 16, num_layers=2, dropout=0.5, bidirectional=True)
  (bidir_lstm2): LSTM(32, 16, num_layers=2, dropout=0.5, bidirectional=True)
  (fc): Linear(in_features=32, out_features=6, bias=True)
  (BatchNorm): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (dropout): Dropout(p=0.5, inplace=False)
)

In [91]:
# Cast the model's parameters to float32 and train with train()'s default
# hyperparameters, evaluating on the test split.
train(net.float(), X_train, y_train, X_test, y_test)



********** Running training! ************


Shape of input is: torch.Size([64, 128, 9])
Shape of x is: torch.Size([128, 64, 32])
Shape of out is: torch.Size([64, 6])




Shape of input is: torch.Size([64, 128, 9])
Shape of x is: torch.Size([128, 64, 32])
Shape of out is: torch.Size([64, 6])
Shape of input is: torch.Size([64, 128, 9])
Shape of x is: torch.Size([128, 64, 32])
Shape of out is: torch.Size([64, 6])
Shape of input is: torch.Size([64, 128, 9])
Shape of x is: torch.Size([128, 64, 32])
Shape of out is: torch.Size([64, 6])
Shape of input is: torch.Size([64, 128, 9])
Shape of x is: torch.Size([128, 64, 32])
Shape of out is: torch.Size([64, 6])


KeyboardInterrupt: 

In [15]:
def main(model_cls=None):
    """Load the dataset, build a model, initialise it and train it.

    Args:
        model_cls: zero-argument callable returning the model to train.
            Defaults to `BiDirResidual_LSTMModel`, the model class defined in
            this notebook (the previously referenced `LSTMModel` is not
            defined in the visible notebook cells).
    """
    if model_cls is None:
        model_cls = BiDirResidual_LSTMModel

    X_train = load_X(X_train_signals_paths)
    X_test = load_X(X_test_signals_paths)

    y_train = load_y(y_train_path)
    y_test = load_y(y_test_path)

    # Some debugging info

    print("Some useful info to get an insight on dataset's shape and normalisation:")
    print("(X shape, y shape, every X's mean, every X's standard deviation)")
    print(X_test.shape, y_test.shape, np.mean(X_test), np.std(X_test))
    print("The dataset is therefore properly normalised, as expected, but not yet one-hot encoded.")

    net = model_cls()
    net.apply(init_weights)
    train(net.float(), X_train, y_train, X_test, y_test)



********** Running training! ************


Shape of x is: torch.Size([128, 64, 64])
Shape of out is: torch.Size([128, 6])




ValueError: Expected input batch_size (128) to match target batch_size (64).