<a href="https://colab.research.google.com/github/rotom303/Final_Project/blob/main/PreProcessingFinal.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Finger Flexion Final Project
Developed by

Alexander Byrd, Aakash Jajoo, Chaoyi Cheng



# Project Setup

In [58]:
#Set up the notebook environment
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pickle
from scipy.stats import pearsonr
from scipy import signal as sig
from scipy.io import loadmat, savemat
from scipy.fft import fft, fftfreq
import sklearn
from numpy.linalg import inv

import random
from google.colab import drive

from google.colab import auth
import gspread
from google.auth import default

!pip install deepdiff
from deepdiff import DeepDiff


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


**File Directory:**


In [59]:
drive.mount('/content/drive')
%cd /content/drive/MyDrive/Brain_Computer_Interfaces/Final_Project/

# auth.authenticate_user()
# creds, _ = default()
# gc = gspread.authorize(creds)

proj_data = loadmat('raw_training_data.mat')
leaderboard_data = loadmat('leaderboard_data.mat')
fs = 1000 # Sampling frequency of the signals
numPatients = 3 # This is just to increase reusability.

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/MyDrive/Brain_Computer_Interfaces/Final_Project


# Filter Design
This is where the filter applied to the raw ECoG data is defined.

In [226]:
"""
Filter parameters:

fc_passband: a list in form [f1,f2]. f1 and f2 are the corner frequencies of
  a bandpass filter (-3dB attenuation at f1 & f2). Frequencies in between f1 and 
  f2 are kept (called the pass band) while others outside that range are 
  attenuated (called the rejection band). Units = Hz 

order: The order of the bandpass filter used. Increasing the order of a filter
  makes the transition between the pass band and the rejection band sharper. In
  the case of a Butterworth filter, increasing order means the pass band stays
  flat closer to f1 and f2. Increasing the order too much will reduce filter 
  stability and can result in ripples in the pass band or other unpredictable
  behaviour. 4th to 6th Order filters tend to be the best compromise.  

applyNotch: a Boolean for whether or not to apply a notch filter. A Notch filter
  Removes noise at precise frequencies. It is useful for removing artifacts
  from power supplies, which are usually at harmonics of 60Hz.   

f_notch: Selects the frequencies which are removed with a Notch Filter. It must
  be a list, even if it just has 1 element. Units = Hz.

Q: is the quality factor of the notch filter. A low Q will also attenuate
  frequencies near f_notch. A high Q will make the notch filter more precise, 
  but it will not attenuate f_notch as much. 
"""
fc_passband = [75,200]
order = 4
applyNotch = True 
f_notch = [60,120]; 
Q = 50

def apply_filter(raw_signal):
  """
  Input: 
    raw_signal (samples x channels): the raw signal
  Output: 
    clean_data (samples x channels): the filtered signal
  """
  number_of_channels = np.shape(raw_signal)[1] #number of channels
  filteredData = np.zeros(np.shape(raw_signal)); #filtered data output

  # Bandpass Butterworth filter 
  sos = sig.butter(order, fc_passband, 'bandpass', analog=False, fs=fs, output='sos'); # returns filter coefficients
  b_notch = []; a_notch = []
  for f_remove in f_notch:
    b, a = sig.iirnotch(f_remove,Q,fs=fs)
    b_notch.append(b); a_notch.append(a)
  #for each channel
  for chanInd in np.arange(number_of_channels):
    # subtract mean from each datapoint
    currFilt = raw_signal[:, chanInd] - np.mean(raw_signal[:, chanInd]);
    if(applyNotch): 
      for i in range(len(b_notch)):
        currFilt = sig.filtfilt(b_notch[i],a_notch[i],currFilt)
    currFilt = sig.sosfiltfilt(sos, currFilt) # forward-backward digital filter using cascaded second-order sections                                        
    filteredData[:, chanInd] = currFilt
  return filteredData

# Feature Extraction
Here are the functions that generate the window that features are extracted from. The Features and their normalization (standardization in this case) are defined. The downsampling of the glove data to match the windows is also defined here. 

## Feature Class
Every feature is a child of this class. This class streamlines the process of adding features and how we take extract them from the ECoG data. We can easily change whether a feature is normalized (using standardization method) and whether it uses the raw or filtered ECoG data. 
This class will save the mean and standard deviation of the training data so it can easily standardize both training and testing data.

We can define every feature we consider here, but the only features that are extracted and used in the models will be the ones we instantiate as objects. Every object we instantiate is automatically added to the global list *featFns*, which keeps track of the features we are analyzing. 

In [227]:
global featFns;
featFns = [] # A list that stores all the Feature Objects used
featDict = dict() # a dict that stores each Feature Object. 
  # The dict keys are from its get_name method.

class Feature():
  def __init__(self,isFiltered=True, doNormalize=True):
    self.isFiltered = isFiltered # Whether or not the feature is extracted from the raw or filtered data.
    self.doNormalize = doNormalize # Whether or not to normalize this feature
    self.mean = 0 # mean
    self.std = 1 # standard deviation
    featFns.append(self)

  def __call__(self, signal_data):
    return signal_data

  def get_name(self,nameAddon=None):
    name = type(self).__name__
    if nameAddon is not None:
      name += nameAddon
    name_filter = 'Filtered'
    name_norm = 'Normalized'
    if not self.isFiltered: name_filter = 'not' + name_filter
    if not self.doNormalize: name_norm = 'not' + name_norm
    fullname = name + '_' + name_filter + '_' + name_norm
    featDict[fullname] = self
    return fullname

  def standardize_training(self,training_feat):
    if(self.doNormalize):
      self.mean = np.mean(training_feat)
      self.std = np.std(training_feat)
    return (training_feat - self.mean)/self.std

  def standardize_testing(self,testing_feat):
    return (testing_feat-self.mean) / self.std
  


## Feature Definitions
This is where the functions for the different features are defined. The functions themselves should be written in the __ call__(self, window) method.

Note that the functions are applied to one window in one channel at a time. 

In [228]:
class LineLength(Feature):
  def __call__(self,x):
    return np.sum(np.absolute(np.ediff1d(x)))

class Area(Feature):
  def __call__(self,x):
    return np.sum(np.absolute(x))

class Energy(Feature):
  def __call__(self,x):
    return np.sum(np.square(x))

class ZeroCrossings(Feature):
  def __call__(self,x):
    return np.size(np.nonzero(np.ediff1d(np.sign(x-np.mean(x)))))

class Mean(Feature):
  def __call__(self,x):
    return np.mean(x)

class FreqBand(Feature):
  def __init__(self,f_low,f_high,isFiltered=False,doNormalize=True):
    Feature.__init__(self,isFiltered,doNormalize)
    self.f_low = f_low
    self.f_high = f_high
  
  def __call__(self,signal):
    freq_response = fft(signal)
    N = len(freq_response)
    n = np.arange(N)
    T = N/fs #sampling rate=1000
    freq = n/T 
    power_spectrum = np.abs(freq_response)
    # Find values in frequency vector corresponding to input band
    index_band = np.logical_and(freq >= self.f_low, freq <= self.f_high)
    #average frequency domain magnitude
    avg_mag = np.mean(power_spectrum[index_band])
    return avg_mag

  def get_name(self):
    nameAddon = str(self.f_low) + 'to' + str(self.f_high)
    return Feature.get_name(self,nameAddon)

## Getting Windowed Feats

In [229]:
def NumWins(x,winLen,winDisp,fs=1000):
  """
    Calculates the number of possible full windows that can fit in x
    Inputs:
      x is the signal in the time domain. 
      fs is the sampling frequency of x. Hz.
      winLen is the length of windows. sec
      winDisp is the displacement between the start of each window. sec
  """
  x_duration = len(x)/fs # seconds.
  windows_fit = (x_duration - winLen + winDisp) / (winDisp)
  # default behaviour of int() is to floor float, so using round()
  return round(windows_fit)

def get_features(raw_window,filtered_window):
  """
    Input: 
      raw_window (window_samples x channels): the window of the unfiltered ecog signal 
      filtered_window (window_samples x channels): the window of the filtered ecog signal 
      

    Global Inputs: must be defined outside of the function
      featFns: a list containing the methods to apply as feats. 
    
    Output:s
      features (channels x num_features): the features calculated on each channel for the window
  """
  [window_samples,num_channels]=np.shape(raw_window)
  features = np.empty(num_channels*len(featFns))
  i = 0
  for feat in featFns:
    if feat.isFiltered: window = filtered_window
    else: window = raw_window
    for chn in range(num_channels):
      current_window = window[:,chn]
      features[num_channels*i+chn] = feat(current_window)
    i+=1
  return features

def get_windowed_feats(ecog_data, window_length, window_overlap):
  """
    Inputs:
      raw_eeg (samples x channels): the raw signal
      window_length: the window's length
      window_overlap: the window's overlap
    Output: 
      all_feats (num_windows x (channels x features)): the features for each channel for each time window
        note that this is a 2D array. 
  """
  [num_samples,num_channels]=np.shape(ecog_data)
  num_windows = NumWins(ecog_data, window_length,window_overlap, fs) 
  filtered_ecog = apply_filter(ecog_data)
  #convert everything to units of samples
  wLen=round(window_length*fs) #window length in samples
  wDisp=round(window_overlap*fs) #window displacement in samples
  data_feats = np.zeros((num_windows,num_channels*len(featFns))); # stores the features of the window
   
  rightmost = num_samples
  for i in range(num_windows):
    raw_window = ecog_data[rightmost-wLen:rightmost,:]
    filtered_window = filtered_ecog[rightmost-wLen:rightmost,:]
    data_feats[-1-i,:] = (get_features(raw_window,filtered_window).flatten())
    rightmost = rightmost - wDisp
  return data_feats


## Feature Normalization
We are using standardization to normalize every feature.
:

In [230]:
def standardize_training(feature_matrix):
  [windows_trn, feats_trn] = np.shape(feature_matrix)
  numChns = int(feats_trn/len(featFns))
  normFeats = np.empty((windows_trn,feats_trn))
  for i in range(len(featFns)):
    for j in range(numChns):
      column = i*numChns + j
      normFeats[:,column] = featFns[i].standardize_training(feature_matrix[:,column])
  return normFeats

def standardize_testing(feature_matrix):
  [windows_trn,feats_trn] = np.shape(feature_matrix)
  numChns = int(feats_trn/len(featFns))
  normFeats = np.empty((windows_trn,feats_trn))
  for i in range(len(featFns)):
    for j in range(numChns):
      column = i*numChns + j
      normFeats[:,column] = featFns[i].standardize_testing(feature_matrix[:,column])
  return normFeats

def standardize_both(train_feats, test_feats):
  normed_trn = standardize_training(train_feats)
  normed_tst = standardize_testing(test_feats)
  return normed_trn, normed_tst

## Response Matrix
We will be using a response matrix as the input to each of our learning algorithms. This is because it allows us to associate one window with the *N_wind* windows before it instead of treating each window as an independent case.   

In [231]:
def create_R_matrix(features, N_wind):
  """ 
  Input:
    features (samples (number of windows in the signal) x channels x features): 
      the features you calculated using get_windowed_feats
    N_wind: number of windows to use in the R matrix

  Output:
    R (samples x (N_wind*channels*features))
  """
  features_appended = np.copy(features)
  for i in list(range(N_wind-2, -1, -1)):
      a = features[i]
      features_appended = np.vstack([a, features_appended])
  samples = len(features)   # number of rows = number of windows

  R = np.zeros((samples, 1+(N_wind*len(features[0,:]))))  # len(features[0,:]) = (num of features)*(num of channels)
  lst = np.array(list(range(1, 1+N_wind)))
  R[:, 0] = 1
  
  
  for i in range(len(features[0,:])):   # goes thru each column of the features matrix
    for j in range(len(lst)):
        x = lst[j]
        R[:, x] = features_appended[j : (len(features_appended)-(N_wind-1-j)), i]
    lst = lst + N_wind
  return R


## Downsampling The Glove Data
We find the features for each window in the ECoG data, and this is how we decide the value for the glove data in the same windows. The downsample methods defined below are analagous to feats, but applied to the windows in glove data instead of ECoG. 

Below are several functions used to downsample the data. Simple downsampling would just be taking the glove data at the startpoint or endpoint of the window. However, changing our downsampling method may increase our correlation coefficient. Several methods are defined below and the one that we use is defined later in the Define Paramters section.

In [232]:
def startpoint_downsample(finger_window):
  return finger_window[0]

def endpoint_downsample(finger_window):
  return finger_window[-1]

def max_downsample(finger_window):
  return np.max(finger_window)

def area_downsample(finger_window):
  return np.sum(np.absolute(finger_window))

In [233]:
def downsample_window(glove_window, downsample_method):
  """
    This function applies the chosen downsampling method to a window of the 
    glove. Returns a downsampled value for each channel (each finger). 
  """
  [num_samples,num_channels] = np.shape(glove_window)
  downsampled_glove = np.empty(num_channels)
  for chn in range(num_channels):
    current_window = glove_window[:,chn]
    downsampled_glove[chn] = downsample_method(current_window)
  return downsampled_glove

def downsample(glove_data, winLen, winDisp, downsample_method):
  """
    Creates an array of the downsampled glove data for each channel (finger).
    Inputs:
      glove_data = the glove data for finger flexion. Should have 5 channels.

    Returns: an array of size (windows x glove channels) that represents the 
      downsampled glove data for each window. 
  """
  [num_samples,num_channels] = np.shape(glove_data)
  num_windows = NumWins(glove_data, winLen,winDisp, fs)
  wLen = round(winLen*fs) #window length in samples
  wDisp = round(winDisp*fs) #window displacement in samples
  windowed_fingers = np.zeros((num_windows,num_channels)); # stores the features of the window
  rightmost = num_samples
  for i in range(num_windows):
    window = glove_data[rightmost-wLen:rightmost,:]
    windowed_fingers[-1-i,:] = (downsample_window(window, downsample_method).flatten())
    rightmost = rightmost - wDisp
  return windowed_fingers


# Define Parameters
*This 

## User-Defined Parameters
Below are the user defined parameters. All the parameters are defined here, except for the ones that have already been defined for the filter in the Filter Design section.

In [234]:
# Name of the file to write or read the data from for parameters and R matrices
filename = 'previousRun.pkl'
use_file_params = False 
# If use_file_params is True, then it will overwrite the parameters defined here

# Window Parameters
window_length = 100e-3  # seconds
window_displacement = 50e-3 #seconds
N_winds = 3 # Number of previous windows considered in Response matrix. 

# These Booleans are to make code clearer
uses_raw = False; uses_filtered = True 
not_normalized = False; normalized = True; 
# Different Features analyzed
featFns=[] # Empty list of featFns before defining feats for this runtime.
# line_length = LineLength(uses_filtered,normalized)
# area = Area(uses_filtered,normalized)
# zero_crossings = ZeroCrossings(uses_filtered,normalized)
energy = Energy(uses_filtered,normalized)
# freq_band_5_to_15 = FreqBand(5,15,uses_raw,normalized)
# freq_band_20_to_25 = FreqBand(20,25,uses_raw,normalized)
# freq_band_75_to_115 = FreqBand(75,115,uses_raw,normalized)
# freq_band_125_to_160 = FreqBand(125,160,uses_raw,normalized)
# freq_band_160_to_175 = FreqBand(160,175,uses_raw,normalized)
freq_band_8_to_12 = FreqBand(8,12,uses_filtered,normalized)
freq_band_75_to_95 = FreqBand(75,95,uses_filtered,normalized)
freq_band_96_to_115 = FreqBand(96,115,uses_filtered,normalized)
mean = Mean(uses_filtered,normalized)

# Training / Testing Split
training_fraction = 0.7 # What fraction of the samples

# Downsampling method for finger flexion in glove data.
downsample_method = startpoint_downsample


## Parameter Dictionary
Here we put all the parameters into a dictionary. This makes it easy to export them and compare them between trials. This dictionary will be compared to parameter dictionaries from previous trials, and if they use the same parameters, we will just load the previous feature and R matrices instead of calculating them all again. The saving and loading code is towards the end of the Preparing Data section.

In [235]:
global param_dict;
param_dict = dict()
# Filter Params
param_dict['fs'] = fs
param_dict['fc_passband'] = fc_passband 
param_dict['order'] = order
param_dict['applyNotch'] = applyNotch
param_dict['f_notch'] = f_notch
param_dict['Q'] = Q
# Window Parameters
param_dict['winLen'] = window_length
param_dict['winDisp'] = window_displacement
param_dict['N_winds'] = N_winds
# Feature Parameters
param_dict['featFns'] = []
for feat in featFns:
  param_dict['featFns'].append(feat.get_name())
# Training / Testing Split Parameter
param_dict['training_fraction'] = training_fraction

# Downsampling method for finger flexion in glove data.
param_dict['downsampling'] = downsample_method.__name__


# Data Preprocessing
Putting the data in a form that can be easily used for different learning algorithms. 

## Preprocessing Functions
These are functions for splitting the data into training and testing for validation purposes. Additionally, functions to save, load, and compare data files are defined here.

In [236]:
def split_data(data, train_fraction = training_fraction):
  """
    Inputs:
    data = a samples x channels array of data for one patient
    training_fraction = a number between 0 and 1 that represents the fraction of
      data that will be put into the training split. The remaining will be put
      into the testing split. 
    Returns:
      training data, testing data
  """
  m = len(data[:,0]) # Number of samples per channel
  m_training = round(train_fraction*m)
  training_data = data[0:m_training,:]
  testing_data = data[m_training:m,:]
  return training_data, testing_data

def save_feature_parameters(filename):  
  with open(filename, 'wb') as f:
    # Note that this will overwrite any data already in filename
    pickle.dump(param_dict,f)
  print(f"The parameters and R matrices have been saved in {filename}")

def load_feature_parameters(filename):
  with open(filename, 'rb') as f:
    ref_dict = pickle.load(f)
  return ref_dict

def compare_dicts(dict1, dict2):
  """
    Input: dict1 and dict2 are borth dictionaries. The expectation is that they
      have the same keys. 
    Returns: the number of differences between the keys in dict1 and the keys in
      dict2. It will also print them out.  
  """
  differences = DeepDiff(dict1,dict2)
  num_differences = len(differences.to_dict())
  if num_differences > 0:
    print(f'{filename} uses different parameters from the ones currently set.')
    print(differences)
  return num_differences

## Load Data
This is where all the R Matrices are either loaded or created depending on whether the filename provided has them already. If it does not, then the newly created R matrices will be saved into it. 

In [237]:
raw_ecog = proj_data['train_ecog'][:,0]
raw_glove = proj_data['train_dg'][:,0]
raw_leaderboard = leaderboard_data['leaderboard_ecog'][:,0]

In [238]:
newDataNeeded = True 
# new Data Needed is True by default, but set to False if data matching the
# parameters described in param_dict are found in filename. 

try: # This Try-Except block is in case filename does not exist
  ref_dict = load_feature_parameters(filename)
  ref_dict_backup = ref_dict.copy() # backup because popping removes from dict.
  # This will extract the preprocessed data from the file
  # The first time running the code, param_dict will not contain these keys, so
  # popping them makes the ref_dict more comparable to param_dict.  
  R_total = ref_dict.pop('R_total',None)
  R_train = ref_dict.pop('R_train',None)
  R_test = ref_dict.pop('R_test',None)
  R_leaderboard = ref_dict.pop('R_leaderboard',None) 
  flexion_total = ref_dict.pop('flexion_total',None)
  flexion_train = ref_dict.pop('flexion_train',None)
  flexion_test = ref_dict.pop('flexion_test',None)
  
  # After running the code for the first time, param_dict might contain keys for
  # R matrices, so this allows code to work without needing to restart runtime. 
  if 'R_leaderboard' in param_dict:
    ref_dict = ref_dict_backup

  # This compares the user defined parameters to the ones from the file. 
  if compare_dicts(ref_dict, param_dict) == 0:
    newDataNeeded = False
    print(f"{filename} uses the same parameters as currently set, so it will be loaded")
  
  # uses parameters from file instead of the user defined ones.
  elif use_file_params:
    newDataNeeded = False
    print(f"{filename} uses different parameters, but use_file_params is True")
except: 
  print(f'{filename} was not found. Creating new a dataset instead')
finally:
  if newDataNeeded:
    flexion_total = []; flexion_train = []; flexion_test = []
    R_total = []; R_leaderboard = []; R_train = []; R_test = []
    for p in range(numPatients):
      # Creates the R matrix for all of the raw ecog data
      feat_matrix_p = get_windowed_feats(raw_ecog[p], window_length, window_displacement)
      normed_feat_matrix_p = standardize_training(feat_matrix_p)
      R_total.append(create_R_matrix(normed_feat_matrix_p,N_winds))
      
      # Calculates the R matrix of the leaderboard ecog data. 
      leaderboard_p = get_windowed_feats(raw_leaderboard[p], window_length, window_displacement)
      normed_leaderboard_p = standardize_testing(leaderboard_p)
      R_leaderboard.append(create_R_matrix(normed_leaderboard_p,N_winds))

      # Creates the R matrix for the training and testing split of the ecog data
      trn_ecog_p, tst_ecog_p = split_data(raw_ecog[p])
      trn_matrix_p = get_windowed_feats(trn_ecog_p, window_length, window_displacement)
      tst_matrix_p = get_windowed_feats(tst_ecog_p, window_length, window_displacement)
      normed_trn_matrix_p, normed_tst_matrix_p = standardize_both(trn_matrix_p, tst_matrix_p)
      R_train.append(create_R_matrix(normed_trn_matrix_p,N_winds))
      R_test.append(create_R_matrix(normed_tst_matrix_p,N_winds))

      # Creates the downsampled glove data for the entire raw ecog dataset, and
      # the training and testing split. 
      flexion_p = downsample(raw_glove[p], window_length, window_displacement, downsample_method)
      flexion_total.append(flexion_p)
      trn_flexion_p, tst_flexion_p = split_data(flexion_p)
      flexion_train.append(trn_flexion_p); flexion_test.append(tst_flexion_p)
      
      # Sometimes flexion has more samples after splitting than the R matrix
      if (len(flexion_total[p][:,0]) != len(R_total[p][:,0])):
        flexion_total[p] = np.delete(flexion_total[p], 0, axis = 0)
      if (len(flexion_train[p][:,0]) != len(R_train[p][:,0])):
        flexion_train[p] = np.delete(flexion_train[p], 0, axis = 0)
      if (len(flexion_test[p][:,0]) != len(R_test[p][:,0])):
        flexion_test[p] = np.delete(flexion_test[p], 0, axis = 0)

    # Store the resulting matrices into param_dict and save it to filename
    param_dict['flexion_total'] = flexion_total
    param_dict['flexion_train'] = flexion_train
    param_dict['flexion_test'] = flexion_test
    param_dict['R_total'] = R_total
    param_dict['R_train'] = R_train
    param_dict['R_test'] = R_test
    param_dict['R_leaderboard'] = R_leaderboard
    save_feature_parameters(filename)


previousRun.pkl uses different parameters from the ones currently set.
{'values_changed': {"root['applyNotch']": {'new_value': True, 'old_value': False}}}
The parameters and R matrices have been saved in previousRun.pkl


# Learning Algorithms

## Learning Helper Classes
Here two classes are defined to streamline the process of adding new learning algorithms. The LearningModel class is the parent class of every type of model we will use. The LearningAlgo class is a helper class that allows us to apply a LearningModel on each finger seperately while keeping track of the data.

The only methods that might need changed to implement subclasses are the train, predict, clone, and maybe __ init __, methods in LearningModel. 

Note that LearningModel is designed so that it can work directly with most sklearn models without needing to create a subclass. 

In [239]:
class LearningModel():
  def __init__(self, model_type = None, smoothing = None):
    """
      model_type is the unfit model being used.
      smoothing is a function that applies smoothing to the prediction that
        right now it does nothing because I don't know how to implement it.
    """
    self.model_type = model_type
    self.smoothing = smoothing

  def train(self, R_trn, y_trn):
    """
      This method trains a single model on all of y_trn. This method will need
      redefined in subcasses if self.model_type.fit(data,labels) does not exist 
      or if self.model_type is not from the sklearn library. 
      Inputs:
        R_trn = Response matrix for the training data set used
        y_trn = finger flexion matrix for the training data set used.
      Returns:
        A model trained on R_trn and y_trn.
    """
    self.model = sklearn.base.clone(self.model_type)
    self.model.fit(R_trn, y_trn)
    return self.model
  
  def predict(self, R_tst):
    """
      This method predicts a y matrix based on the training it has recieved.
      This method will fail if train has not been called yet. This, method will
      need redefined in subcasses if self.model_type.predict(data) does not 
      exist or if self.model_type is not from the sklearn library. 
      Inputs:
        R_tst = Response matrix for the training data set used
      Returns:
        a y matrix predicted to be the labels associated with R_tst
    """
    self.prediction = self.model.predict(R_tst)
    return self.prediction
  
  def score(self, R_tst, Y_tst):
    """
      Predicts the Y for R_tst and then compares it to the actual Y_tst.
      Inputs:
        R_tst = response matrix for testing or validation data set.
        Y_tst = finger flexion matrix for the testing data set used.
    """
    Y_predict = self.predict(R_tst)
    self.correlation = pearsonr(Y_predict,Y_tst).statistic
    return self.correlation

  def clone(self):
    return LearningModel(self.model_type, self.smoothing)


class LearningAlgo():
  """
    This helper class will train multiple of the same type of learning channel
    on different parts of the data. In particular, it will train a model on 
    each channel within the label matrix Y_trn for each patient.

    Unlike LearningModel, subclasses of LearningAlgo should not need to be made
    for implementing new learning models. It should work for any model that is
    a subclass of the LearningModel class.

    It stores its models in a nested list self.models. To get the model of 
    finger i for patient p, use self.models[p][i]
  """
  def __init__(self, learning_model):
    """
      Its only input should be a LearningModel (or subclass) object 
    """
    self.learning_model = learning_model
    self.model_type = learning_model.model_type
    self.smoothing = learning_model.smoothing
    self.models = [[]]

  def train(self, R_trn, Y_trn):
      """
        Will train a model on each finger in Y_trn. The total number of models 
        will be equal to channels in Y_trn 
        Inputs:
          R_trn = A list of Response matrices for the training data set used
          Y_trn = a list of finger flexion matrices for the training data set 
            used. Each element should have finger flexion for each window for
            all five fingers. 
          The list of R_trn and Y_trn matrices should be the same length
        Returns:
          The list of models trained. The models at each index, i, corresponds 
          to the model trained on finger i+1
      """
      self.models = []
      for p in range(len(R_trn)):
        models_p = []
        for finger in range(len(Y_trn[p][0,:])):
          model = self.learning_model.clone()
          model.train(R_trn[p], Y_trn[p][:,finger])
          models_p.append(model)
        self.models.append(models_p)
      return self.models

  def predict(self,R_tst):
    """
      This method predicts a Y matrix based on the training it has recieved.
      This method will fail if train has not been called yet. 
      Inputs:
        R_tst = Response matrix for the training data set used
      Returns:
        a Y matrix predicted to be the labels associated with R_tst
    """
    predictions = np.zeros((len(self.models),len(self.models[0])))
    for p in range(len(self.models)):
      for i in range(len(self.models[p])):
        predictions[p,i] = self.models[p][i].predict(R_tst[p])
    return predictions
  
  def score(self, R_tst, Y_tst):
    """
      Predicts the Y for R_tst and then compares it to the actual Y_tst.
      Inputs:
        R_tst = response matrix for testing or validation data set.
        Y_tst = finger flexion matrix for the testing data set used.
    """
    correlations = np.zeros((len(self.models),len(self.models[0])))
    for p in range(len(self.models)):
      for i in range(len(self.models[p])):
        correlations[p,i] = self.models[p][i].score(R_tst[p],Y_tst[p][:,i])
    return correlations
  
  def get_models(self):
    return self.models

  def print_scores(self, exclude_finger_4 = True):
    """
      Prints out the correlation score for each finger and the averages for each
      patient and the overall average. Has an optional Boolean input for whether
      to exclude the ring finger's correlation in the output since it does not 
      have its own correlated ECoG signals. It is more so dependent on the pinky
      and the middle finger's signals.
    """
    cumulative_score = 0
    patient_count = 0
    for p in range(len(self.models)):
      patient_score = 0
      finger_count = 0
      for i in range(len(self.models[p])):
        print(f"Patient {p+1}, Finger {i+1} Score = {self.models[p][i].correlation}")
        if i != 3:
          patient_score += self.models[p][i].correlation
          finger_count += 1
        elif not exclude_finger_4:
          patient_score += self.models[p][i].correlation
          finger_count += 1 
      patient_score = patient_score / finger_count
      cumulative_score += patient_score
      patient_count += 1
      print(f"Average Score of Patient {p+1} = {patient_score}")
    avg_core = cumulative_score/patient_count
    print(f"The Overall Average Score = {avg_core}")
    return avg_core


## Optimized Linear Filter

In [240]:
class OptimizedLinearFilter(LearningModel):
  def train(self, R, Y):
    """
      Builds an optimized linear filter using a training R and Y
      Inputs: 
        R = Response matrix for the training data set used
        Y = label matrix (finger flexion) for the training data set used
      Returns: an optimized linear filter. 
    """
    Rt = np.transpose(R)
    inv_term = inv(np.matmul(Rt,R))
    self.model_type = 'Optimized Linear Filter' 
    self.model = np.matmul(inv_term,np.matmul(Rt,Y))
    return self.model

  def predict(self, R_tst):
    self.prediction = np.matmul(R_tst,self.model)
    return self.prediction

  def clone(self):
    return OptimizedLinearFilter(self.model_type, self.smoothing)

opt_linear_filters_val_algo = LearningAlgo(OptimizedLinearFilter())
opt_linear_filters_val_algo.train(R_train, flexion_train)
optlinfilt_val_score = opt_linear_filters_val_algo.score(R_test,flexion_test)
# opt_linear_filters_val = [] # for validation 
# opt_linear_filters_lead = [] # for the leaderboard
# for p in range(numPatients):
#  opt_linear_filters_val.append(LearningAlgo(OptimizedLinearFilter())) 
#  opt_linear_filters_val[p].train(R_train,flexion_train)
#  opt_linear_filters_lead.append(LearningAlgo(OptimizedLinearFilter()))
print(optlinfilt_val_score[0])
print(optlinfilt_val_score[1])
print(optlinfilt_val_score[2])


[0.43069528 0.57199565 0.09316127 0.45087363 0.08784496]
[0.03402185 0.15760543 0.02750669 0.13485574 0.00272575]
[0.5437972  0.31131737 0.39979914 0.43607092 0.42798062]


## SVR Model

In [241]:
from sklearn.svm import SVR

SVR_model = LearningModel(SVR(kernel = 'rbf'))
SVR_algo = LearningAlgo(SVR_model)
trained_models = SVR_algo.train(R_train,flexion_train)
correlation_scores = SVR_algo.score(R_test,flexion_test)



In [242]:
SVR_algo.print_scores()

Patient 1, Finger 1 Score = 0.4190671629258502
Patient 1, Finger 2 Score = 0.5711689994868724
Patient 1, Finger 3 Score = 0.1797293055235954
Patient 1, Finger 4 Score = 0.5032005765254516
Patient 1, Finger 5 Score = 0.0563524976725487
Average Score of Patient 1 = 0.3065794914022167
Patient 2, Finger 1 Score = -0.05262814090757546
Patient 2, Finger 2 Score = -0.0399604141670056
Patient 2, Finger 3 Score = 0.043862984207269265
Patient 2, Finger 4 Score = 0.02037143247898543
Patient 2, Finger 5 Score = 0.002515349543995831
Average Score of Patient 2 = -0.011552555330828992
Patient 3, Finger 1 Score = 0.552619371909619
Patient 3, Finger 2 Score = 0.4239501821849011
Patient 3, Finger 3 Score = 0.45617696460496876
Patient 3, Finger 4 Score = 0.48060424844919397
Patient 3, Finger 5 Score = 0.45509738890073176
Average Score of Patient 3 = 0.4719609769000551
The Overall Average Score = 0.2556626376571476


0.2556626376571476

# Troubleshooting Stuff

In [243]:
print(inv(np.matmul(np.transpose(R_test[1]),R_test[1])))

[[ 1.64624303e-01  6.31009874e-04  4.36502062e-04 ... -1.86420287e-04
  -6.56533482e-04 -1.97642946e-04]
 [ 6.31009874e-04  1.19837292e-03 -4.87973679e-04 ...  3.36024341e-05
   1.32109743e-05  8.86648498e-07]
 [ 4.36502062e-04 -4.87973679e-04  1.53808110e-03 ... -1.52320530e-05
   5.83738849e-05 -2.18051608e-05]
 ...
 [-1.86420287e-04  3.36024341e-05 -1.52320530e-05 ...  2.21324350e-03
   4.14633359e-05  1.09643875e-03]
 [-6.56533482e-04  1.32109743e-05  5.83738849e-05 ...  4.14633359e-05
   1.65280290e-03  2.74025223e-05]
 [-1.97642946e-04  8.86648498e-07 -2.18051608e-05 ...  1.09643875e-03
   2.74025223e-05  2.16356980e-03]]


In [244]:
R_test[1].shape

(1799, 721)

In [245]:
SVR_model = LearningModel(SVR(kernel = 'rbf'))
SVR_models2 = []
for i in range(5):
  SVR_models2.append(SVR_model.clone())
  SVR_models2[i].train(R_train[1],flexion_train[1][:,i])
  SVR_models2[i].score(R_test[1],flexion_test[1][:,i])
  print(SVR_models2[i].correlation)


-0.05262814090757546
-0.0399604141670056
0.043862984207269265
0.02037143247898543
0.002515349543995831


In [246]:
prediction_list = []
for model in SVR_algo.get_models()[1]:
  prediction_list.append(model.prediction)
  print(model.prediction)

[-0.20736696 -0.20736696 -0.20736696 ... -0.20736473 -0.20736696
 -0.20736696]
[-0.34147124 -0.34147124 -0.34147124 ... -0.34147212 -0.34147124
 -0.34147124]
[-0.17129182 -0.17129182 -0.17129182 ... -0.17129232 -0.17129182
 -0.17129182]
[-0.23804408 -0.23804408 -0.23804408 ... -0.23804439 -0.23804408
 -0.23804408]
[-0.12351883 -0.12351883 -0.12351883 ... -0.12351902 -0.12351883
 -0.12351883]


In [249]:
flexion_test[1].shape

(1799, 5)

In [257]:
np.argmax(np.max(normed_tst_matrix_p, axis=0))

24