In [3]:
#Setup

# Install Packages
import sys
!{sys.executable} -m pip install numpy --quiet
!{sys.executable} -m pip install matplotlib --quiet
!{sys.executable} -m pip install seaborn --quiet
!{sys.executable} -m pip install sklearn --quiet
!{sys.executable} -m pip install torch --quiet

from sklearn import preprocessing
import os, requests
from matplotlib import rcParams 
from matplotlib import pyplot as plt
import numpy as np
import seaborn as sns
import pandas as pd
from sklearn.decomposition import PCA
from sklearn import datasets, linear_model
from sklearn.metrics import mean_squared_error, r2_score
import math
from sklearn.linear_model import LinearRegression
import torch
from torch.autograd import Variable
import torch.nn as nn

# Data Retrieval

fname = 'joystick_track.npz'
url = "https://osf.io/6jncm/download"

print('Is cuda?', torch.cuda.is_available())

if not os.path.isfile(fname):
  try:
    r = requests.get(url)
  except requests.ConnectionError:
    print("!!! Failed to download data !!!")
  else:
    if r.status_code != requests.codes.ok:
      print("!!! Failed to download data !!!")
    else:
      with open(fname, "wb") as fid:
        fid.write(r.content)
# Import matplotlib and set styling
rcParams['figure.figsize'] = [20, 4]
rcParams['font.size'] =15
rcParams['axes.spines.top'] = False
rcParams['axes.spines.right'] = False
rcParams['figure.autolayout'] = True

colourmap_diverge = sns.diverging_palette(321, 172, s=100, n=100, center = "light", as_cmap=True)
colourmap = sns.color_palette("rocket", as_cmap=True)
colourmap = sns.light_palette("#30887c", as_cmap=True)
colourmap_diverge.set_bad("black", alpha=0)
colourmap.set_bad("black", alpha=0)

# Data Loading
alldat = np.load(fname, allow_pickle=True)['dat']

# Select just one of the recordings here. This is subject 1, block 1.
dat = alldat[0, 3]

Is cuda? False


In [4]:
# Define normalisation of voltage channels

Vmax =  max((max(x) for x in dat['V']))
Vmin = min((min(x) for x in dat['V']))
xmax = max(dat['cursorX'])
xmin = min(dat['cursorX'])
ymax = max(dat['cursorY'])
ymin = min(dat['cursorY'])

def Vnormalise(_V):
    _V_norm = (_V - Vmin)/(Vmax - Vmin)
    return _V_norm

def normalise(_V):
    _V_norm = (_V - min(_V))/(max(_V) - min(_V))
    return _V_norm

def Xnormalise(_x):
    _x_norm = (_x - xmin)/(xmax - xmin)
    return _x_norm
    
def Xdenormalise(_x):
    _x_denorm = _x*(xmax - xmin) + xmin
    return _x_denorm

def Ynormalise(_y):
    _y_norm = (_y - ymin)/(ymax - ymin)
    return _y_norm
    
def Ydenormalise(_y):
    _y_denorm = _y*(ymax - ymin) + ymin
    return _y_denorm

  
def downsample(signal, factor):
  nbins = math.floor(nt/factor)
  signal_norm = np.zeros((nbins, 1))
  for ibin in range(nbins):
    binstart = ibin * 40
    binend = binstart + 40
    signal_norm[ibin, 0] = np.mean(signal[binstart:binend])

  return signal_norm


In [5]:
ds = True #downsample data?
select_channels = False #Use correlated channels only?


# Load patient 2
dat = alldat[0, 2]
corr_chan = sorted([5, 23, 17, 1, 25])

V = dat['V']
V = Vnormalise(V)
nt, nchan = V.shape

if select_channels:
  V = V[:, corr_chan]

cx = Xnormalise(dat['cursorX'].flatten()).reshape(-1,1)
cy = Ynormalise(dat['cursorY'].flatten()).reshape(-1,1)

print('number of samples', nt, 'number of channels', nchan)
print('V shape', V.shape)
print('cx, cy shape', cx.shape, cy.shape)

factor = 40 #cursor trajectory updates every 40 samples, hardware limitation
if downsample:
  nbins = math.floor(nt/factor)

  V_norm = np.zeros((nbins, nchan))
  for c in range(nchan):
    V_norm[:,c] = downsample(V[:,c], 40).flatten()
  V = V_norm
  cx = downsample(cx, 40)
  cy = downsample(cy, 40)

  print("post normalisation:")
  print('number of samples', nt, 'number of channels', nchan)
  print('V shape', V.shape)
  print('cx, cy shape', cx.shape, cy.shape)



number of samples 134360 number of channels 64
V shape (134360, 64)
cx, cy shape (134360, 1) (134360, 1)
post normalisation:
number of samples 134360 number of channels 64
V shape (3359, 64)
cx, cy shape (3359, 1) (3359, 1)


In [6]:
def split_sequences(input_sequences, output_sequence, n_steps_in, n_steps_out):
    X, y = list(), list() # instantiate X and y
    for i in range(len(input_sequences)):
        # find the end of the input, output sequence
        end_ix = i + n_steps_in
        out_end_ix = end_ix + n_steps_out - 1
        # check if we are beyond the dataset
        if out_end_ix > len(input_sequences): break
        # gather input and output of the pattern
        seq_x, seq_y = input_sequences[i:end_ix], output_sequence[end_ix-1:out_end_ix, -1]
        X.append(seq_x), y.append(seq_y)
    return np.array(X), np.array(y)

In [9]:
lookback = 200
lookahead = 25
X_ss, y_ss = split_sequences(V, cx, lookback, lookahead)
print(X_ss.shape, y_ss.shape)

total_samples = len(V)
train_test_cutoff = round(0.80 * total_samples)

X_train = X_ss[:train_test_cutoff]
X_test = X_ss[train_test_cutoff:]

y_train = y_ss[:train_test_cutoff]
y_test = y_ss[train_test_cutoff:] 

print("Training Shape:", X_train.shape, y_train.shape)
print("Testing Shape:", X_test.shape, y_test.shape) 
print("Test/Train cutoff", train_test_cutoff)
print("Total data points", total_samples)


(3136, 200, 64) (3136, 25)
Training Shape: (2687, 200, 64) (2687, 25)
Testing Shape: (449, 200, 64) (449, 25)
Test/Train cutoff 2687
Total data points 3359


In [10]:
# define model architecture
class LSTM(nn.Module):
    
    def __init__(self, num_classes, input_size, hidden_size, num_layers):
        super().__init__()
        self.num_classes = num_classes # output size
        self.num_layers = num_layers # number of recurrent layers in the lstm
        self.input_size = input_size # input size
        self.hidden_size = hidden_size # neurons in each lstm layer
        # LSTM model
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size,
                            num_layers=num_layers, batch_first=True, dropout=0.2) # lstm
        self.fc_1 =  nn.Linear(hidden_size, 100) # fully connected 
        self.fc_2 = nn.Linear(100, num_classes) # fully connected last layer
        self.relu = nn.ReLU()
        
    def forward(self,x):
        # hidden state
        h_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size))
        # cell state
        c_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size))
        # propagate input through LSTM
        output, (hn, cn) = self.lstm(x, (h_0, c_0)) # (input, hidden, and internal state)
        hn = hn.view(-1, self.hidden_size) # reshaping the data for Dense layer next
        out = self.relu(hn)
        out = self.fc_1(out) # first dense
        out = self.relu(out) # relu
        out = self.fc_2(out) # final output
        return out

In [11]:
# define training
def training_loop(n_epochs, lstm, optimiser, loss_fn, X_train, y_train,
                  X_test, y_test):
    test_losses = []
    train_losses = []
    for epoch in range(n_epochs):
        lstm.train()
        outputs = lstm.forward(X_train) # forward pass
        optimiser.zero_grad() # calculate the gradient, manually setting to 0
        # obtain the loss function
        loss = loss_fn(outputs, y_train)
        loss.backward() # calculates the loss of the loss function
        optimiser.step() # improve from loss, i.e backprop
        # test loss
        lstm.eval()
        test_preds = lstm(X_test)
        test_loss = loss_fn(test_preds, y_test)
        train_losses.append(loss.item())
        test_losses.append(test_loss.item())
        if epoch % 10 == 0:
            print("Epoch: %d, train loss: %1.5f, test loss: %1.5f" % (epoch, 
                                                                      loss.item(), 
                                                                      test_loss.item())) 
    return train_losses, test_losses

In [12]:
# Instantiate model, set architecture
import warnings
warnings.filterwarnings('ignore')

n_epochs = 1000 # 2000 epochs
learning_rate = 0.001 # 0.001 lr

input_size = nchan # number of features
hidden_size = 10 # number of features in hidden state
num_layers = 1 # number of stacked lstm layers

num_classes = lookahead # number of output classes 

lstm_x = LSTM(num_classes, 
              input_size, 
              hidden_size, 
              num_layers)

In [None]:
# Train model
loss_fn = torch.nn.MSELoss()    # mean-squared error for regression
optimiser = torch.optim.Adam(lstm_x.parameters(), lr=learning_rate) 

train_loss, test_loss = training_loop(n_epochs=n_epochs,
              lstm=lstm_x,
              optimiser=optimiser,
              loss_fn=loss_fn,
              X_train=X_train_tensors_final,
              y_train=y_train_tensors,
              X_test=X_test_tensors_final,
              y_test=y_test_tensors)



In [None]:
# Define plotting functions

def plot_prediction(model, inputs, targets):

  df_X_ss = inputs
  df_y_mm = targets
  # split the sequence
  X_ss, y_ss = split_sequences(inputs, targets, lookback, lookahead)

  # converting to tensors
  df_X_ss = Variable(torch.Tensor(X_ss))
  df_y_mm = Variable(torch.Tensor(y_ss))
  # reshaping the dataset
  df_X_ss = torch.reshape(df_X_ss, (df_X_ss.shape[0], lookback, df_X_ss.shape[2]))

  train_predict = model(df_X_ss) # forward pass
  data_predict = train_predict.data.numpy() # numpy conversion
  dataY_plot = df_y_mm.data.numpy()


  true, preds = [], []
  for i in range(len(dataY_plot)):
      true.append(dataY_plot[i][0])
  for i in range(len(data_predict)):
      preds.append(data_predict[i][0])
  plt.figure(figsize=(10,6)) #plotting
  plt.axvline(x=train_test_cutoff, c='r', linestyle='--') # size of the training set

  plt.plot(true, label='Actual Data') # actual plot
  plt.plot(preds, label='Predicted Data') # predicted plot
  plt.title('Time-Series Prediction')
  plt.legend()
  plt.savefig("whole_plot.png", dpi=300)
  plt.show() 

  plt.figure()
  plt.plot(true, label='Actual Data') # actual plot
  plt.plot(preds, label='Predicted Data') # predicted plot
  plt.title('Time-Series Prediction')
  plt.legend()
  plt.savefig("whole_plot.png", dpi=300)
  plt.xlim(left = train_test_cutoff - train_test_cutoff*0/1)
  plt.axvline(x=train_test_cutoff, c='r', linestyle='--') # size of the training set
  plt.show() 

  print("test/train boundary:", train_test_cutoff)
  print("test/train proportion:", train_test_cutoff/nt)
  rms_test = rms(np.array(true), np.array(preds))
  print(rms_test)


def plot_losses(train_loss, test_loss):
  plt.plot(train_loss, label="Train Losses")
  plt.plot(test_loss, label="Test Losses")
  plt.legend

In [13]:
# Global metric to compare models

#Define common metric between models
def rms(truth, prediction):
    _diff = truth - prediction
    _diff_flat = _diff.flatten()
    _rms = np.sqrt(np.mean(_diff_flat**2))
    return _rms

In [14]:
# Plot model predictions

plot_prediction(lstm_x, V, cx)
plot_losses(train_loss, test_loss)

In [None]:
# Repeat above for cursorY

X_ss, y_ss = split_sequences(V, cy, lookback, lookahead)
print(X_ss.shape, y_ss.shape)

X_train = X_ss[:train_test_cutoff]
X_test = X_ss[train_test_cutoff:]

y_train = y_ss[:train_test_cutoff]
y_test = y_ss[train_test_cutoff:] 

X_train_tensors = Variable(torch.Tensor(X_train))
X_test_tensors = Variable(torch.Tensor(X_test))

y_train_tensors = Variable(torch.Tensor(y_train))
y_test_tensors = Variable(torch.Tensor(y_test))

X_train_tensors_final = torch.reshape(X_train_tensors,   
                                      (X_train_tensors.shape[0], lookback, 
                                       X_train_tensors.shape[2]))
X_test_tensors_final = torch.reshape(X_test_tensors,  
                                     (X_test_tensors.shape[0], lookback, 
                                      X_test_tensors.shape[2])) 

#Defined as x model
lstm_y = LSTM(num_classes, 
              input_size, 
              hidden_size, 
              num_layers)

loss_fn = torch.nn.MSELoss()    # mean-squared error for regression
optimiser = torch.optim.Adam(lstm_y.parameters(), lr=learning_rate) 

train_loss, test_loss = training_loop(n_epochs=n_epochs,
              lstm=lstm_y,
              optimiser=optimiser,
              loss_fn=loss_fn,
              X_train=X_train_tensors_final,
              y_train=y_train_tensors,
              X_test=X_test_tensors_final,
              y_test=y_test_tensors)



In [None]:
# Plot y coord 