<a href="https://colab.research.google.com/github/mkelb99/517-Project/blob/main/517Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Part 1) 

In [None]:
#mount the drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
#Solutions code by Matt Mender, W-2021 used in this project

#begin with library imports
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import scipy.io as sio #allows for importing of .mat files 
import numpy
#add optimizer library to given imports
import torch.optim as optm

from torch.utils.data import DataLoader, sampler, TensorDataset 
import torch.nn.functional as F

from torch.autograd import Variable

In [None]:
# Import your data here
rootDir = '/content/drive/Shareddrives/517_Project/'
fn = 'contdata95.mat'


dtype = torch.float
conv_size = 3 # size of time history

# load the mat file
mat = sio.loadmat(rootDir+fn)

# Get each variable from the mat file
X = torch.tensor(mat['Y']) # This is switching Y in the mat file to X in our code - we programmed this with X as the neural data but contdata95.mat has Y as neural data.
y = torch.tensor(mat['X'])[:,0:4]

nsamp = X.shape[0]
ntrain = int(numpy.round(nsamp*0.8)) # using 80% of data for training

X_train = X[0:ntrain,:].to(dtype)
X_test = X[ntrain+1:,:].to(dtype)
y_train = y[0:ntrain,:].to(dtype)
y_test = y[ntrain+1:,:].to(dtype)


# Initialize tensor with conv_size*nfeatures features
X_ctrain = torch.zeros((int(X_train.shape[0]), int(X_train.shape[1]*conv_size)), dtype=dtype)
X_ctest = torch.zeros((int(X_test.shape[0]), int(X_test.shape[1]*conv_size)), dtype=dtype)
X_ctrain[:,0:X_train.shape[1]] = X_train
X_ctest[:,0:X_test.shape[1]] = X_test

# Add the previous 3 time bins features as a feature in the current time bin
for k1 in range(conv_size-1):
    k = k1+1
    X_ctrain[k:, int(X_train.shape[1]*k):int(X_train.shape[1]*(k+1))] = X_train[0:-k, :]
    X_ctest[k:, int(X_test.shape[1]*k):int(X_test.shape[1]*(k+1))] = X_test[0:-k, :]

# Create Dataset and dataloader
test_ds= TensorDataset(X_ctest, y_test)
train_ds = TensorDataset(X_ctrain, y_train)

#If a batch in BatchNorm only has 1 sample it wont work, so dropping the last in case that happens
train_dl = DataLoader(train_ds, batch_size=64, shuffle=True, drop_last=True)
test_dl = DataLoader(test_ds, batch_size=len(test_ds), shuffle=False, drop_last=True)

Part 2) Create the LSTM Architecture

In [None]:
class LSTM(nn.Module):

  def __init__(self, num_classes, input_size, hidden_size, num_layers):
    super(LSTM, self).__init__()

    self.num_classes = num_classes
    self.num_layers = num_layers
    self.input_size = input_size
    self.hidden_size = hidden_size

    self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size,
                        num_layers=num_layers, batch_first=True)
    
    self.fc = nn.Linear(hidden_size, num_classes)
  
  def forward(self, x):
    #h_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size))
    
    #c_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size))
    h_0 = torch.zeros(num_layers, 64, hidden_size)
    c_0 = torch.zeros(num_layers, 64, hidden_size)

    # Propagate input through LSTM
    #ula, (h_out, _) = self.lstm(x, (h_0, c_0))
    
    #h_out = h_out.view(-1, self.hidden_size)
    
    #out = self.fc(h_out)

    output, (h_0, c_0) = self.lstm(x, (h_0, c_0))
    #output = self.fc(h_0)
    
    return output

In [None]:
#try creating architecture a different way here

Part 3) Train the LSTM

In [None]:
n_epochs = 2000
learning_rate = 1e-5
weight_decay = 1e-2

input_size = 285
hidden_size = 256
num_layers = 1

num_classes = 4 

lstm = LSTM(num_classes, input_size, hidden_size, num_layers)
loss_fn=nn.MSELoss()
opt=optm.Adam(lstm.parameters(), lr=learning_rate, weight_decay=weight_decay)

In [None]:
def myfit(epochs, nntofit, loss_fn, opt, train_dl, val_dl, print_every=1):
    train_loss = torch.zeros(epochs * len(train_dl) , dtype=torch.float) # train error for every iteration
    validation_loss = torch.zeros(epochs , dtype=torch.float) # validation is only once per epoch
    i = -1 # iteration number
    for epoch in range(n_epochs):
        for x,y in train_dl: # batch of training points
          i += 1
          # Set model in train mode (for batch normalization and dropout)
          nntofit.train()
          #predicted outputs from model
          yh = lstm(x)
          # obtain the loss function
          loss = loss_fn(yh,y)
          loss.backward()
          train_loss[i] = loss.item()
          opt.step()
          opt.zero_grad()
          
        for xval,yval in val_dl:
          with torch.no_grad(): # disable gradient calculation
            nntofit.eval() # set model to evaluation mode (matters for batch normalization and dropout)
            loss2 = loss_fn(nntofit(xval),yval)
            validation_loss[epoch] = loss2.item()
    return train_loss, validation_loss

Part 4) Evaluate the Network

In [None]:
train_loss, validation_loss = myfit(n_epochs, lstm, loss_fn, opt, train_dl, test_dl)
# Plot training and validation losses
plot_epochs = n_epochs

val_iters = numpy.arange(0,n_epochs)*len(train_dl)
train_iters = numpy.arange(0,len(train_dl)*n_epochs)
n_iter = len(train_dl) * n_epochs # number of batches per epoch * number of epochs 

plt.plot(train_iters[0:plot_epochs*len(train_dl)], train_loss[0:plot_epochs*len(train_dl)], 'b')
plt.plot(val_iters[0:plot_epochs], validation_loss[0:plot_epochs], 'r')
plt.xlabel('Number of iterations')
plt.ylabel('MSE')
plt.show()

RuntimeError: ignored

In [None]:
# Plot some example decodes
for x,y in test_dl:
    with torch.no_grad():
        yh = neuralnet(x)
        #looking at select channel
    th = numpy.arange(0, x.shape[0])*50e-3
    
    plt.subplot(2,1,1)
    plt.plot(th[1000:1500], y[1000:1500,0],'b')
    plt.plot(th[1000:1500],yh[1000:1500,0].detach().numpy(), 'r')
    #plt.xlabel('sec')
    plt.ylabel('X Position')
    
    plt.subplot(2,1,2)
    plt.plot(th[1000:1500], y[1000:1500,1], 'b')
    plt.plot(th[1000:1500],yh[1000:1500,1].detach().numpy(), 'r')
    plt.xlabel('Time (seconds)')
    plt.ylabel('Y Position')
    
    plt.show()
    
    r = numpy.corrcoef(yh.detach().numpy().T,y.T)
    r = numpy.diag(r[4:,0:4])
    print('Correlation for X position is %g' % r[0])
    print('Correlation for Y position is %g' % r[1])
    print('Correlation for X velocity is %g' % r[2])
    print('Correlation for Y velocity is %g' % r[3])
    print('Average Correlation: %g' % numpy.mean(r))
