In [1]:
import copy
import importlib
import numpy as np
from numpy import convolve
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
import pickle
from scipy.cluster.hierarchy import linkage
from scipy.stats import sem
from scipy import signal
from scipy.integrate import trapz
from sklearn.decomposition import PCA
from sklearn.model_selection import StratifiedKFold
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import StandardScaler
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.svm import SVC
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.model_selection import cross_validate
from sklearn.model_selection import train_test_split
import random
#from RT import savedData, taskData
#from gimutil.signal_analysis import erps
#from gimutil.configuration import config
#from gimutil.visualization import plotting_tools
import os
import importlib
import torch
import torch.nn as nn
import torch.optim as optim
from preprocessing import extract_data, preprocess, shuffle_data
from default_model import CnnRnn
from training import *
from evaluation import *
from plotting import *

In [2]:
from sklearn.metrics import accuracy_score

# Load data

In [3]:
folder_name = 'data'
file_name = 'alphabet_with_fingerflex_data.pkl'

# Define the full path
full_path = os.path.join(folder_name, file_name)

with open(full_path, 'rb') as file:
    data = pickle.load(file)
print("Data successfully loaded.")

Data successfully loaded.


# Preprocess data

In [4]:
X, y, y_num = extract_data(data)

X shape: (4224, 800, 253)
y shape: (4224,)


In [5]:
X, y, y_num = preprocess(X, y, y_num, downsampling_factor = 6)

Original data shape (trials x time x channels):  (4224, 800, 253)

Decimating the time series dimension...
Downsampled data shape:  (4224, 134, 253)

Normalizing data across channels at each time point...

Shuffling trial order...

Finished preprocessing!


In [6]:
class_labels = np.unique(y)

# Note:
- Run this notebook to execute all four analyses included in my final project. 
  - **Note:** `main.ipynb` sets `test_splits = 3` for computational speed. 
  - The final plots in the paper were averaged across at least 10 test splits.

In [7]:
device = "cuda:2"
test_splits = 3

# Analysis 1: Reward Model Outperforms Default Model

In [None]:
# Initialize parameters

n_targ = len(np.unique(y)) # Number of unique targets
class_names = np.unique(y) # Names of unique targets

# Initialize dictionaries and lists to store metrics
all_test_splits_metrics = {} # Dictionary to store fold metrics for each test split

# Storing metrics across test splits
default_confmatrix_all = np.zeros((n_targ, n_targ, test_splits)) # all confusion matrices generated from the default model on test set, before averaging across test folds
reward_confmatrix_all = np.zeros((n_targ, n_targ, test_splits)) # all confusion matrices generated from the default model on test set, before averaging across test folds

default_test_accuracy_history = []
default_val_accuracy_history = []

reward_test_accuracy_history = []
reward_val_accuracy_history = []

for i in range(test_splits):
    print(f"\nTest Split {i + 1}/{test_splits}:")
    
    # Set seeds for reproducibility
    np.random.seed(i)
    torch.manual_seed(i) # for CPU
    torch.cuda.manual_seed_all(i) # for GPU
    
    # Shuffle indices
    indices = np.arange(len(X))
    np.random.shuffle(indices)
    
    # Split indices into training and test sets
    train_indices, test_indices = train_test_split(indices, test_size=224)
    
    # Further split train_indices into two training sets
    train_default_indices, train_reward_indices = train_test_split(train_indices, test_size=2000)

    # Use these indices to create data splits
    X_default, y_default = X[train_default_indices], y_num[train_default_indices]
    X_reward, y_reward = X[train_reward_indices], y_num[train_reward_indices]
    X_test, y_test = X[test_indices], y_num[test_indices]
    
    # Train default model. Select highest performing model across validation sets
    print('Training default model...')
    default_model, default_validation_acc, fold_metrics_default = train_default(X_default, y_default, device = torch.device(device))
    default_val_accuracy_history.append(default_validation_acc)
    
    # Evaluate performance of default on held out test set
    default_test_accuracy, default_confmatrix = evaluate_default(X_test, y_test, default_model, device = torch.device(device))
    default_test_accuracy_history.append(default_test_accuracy)
    default_confmatrix_all[:,:,i] = default_confmatrix
    
    # Initialize reward model with default model weights. Train reward model. Select highest performing model across validation sets
    print('\nTraining reward model...')
    reward_model, reward_validation_acc, fold_metrics_reward = train_reward(X_reward, y_reward, default_model, device = torch.device(device)) 
    reward_val_accuracy_history.append(reward_validation_acc)
    
    _, _, reward_test_accuracy, reward_confmatrix = evaluate_reward_model(X_test, y_test, reward_model, eval_mode = True, device_eval=torch.device(device))
    print(f'Reward test accuracy: {reward_test_accuracy*100:.2f}%')
    reward_test_accuracy_history.append(reward_test_accuracy)
    reward_confmatrix_all[:,:,i] = reward_confmatrix
    
    # Store fold metrics for this test split
    all_test_splits_metrics[f'Test Split {i+1}'] = {
        'Default': fold_metrics_default,
        'Reward': fold_metrics_reward
    }
        


Test Split 1/3:
Training default model...
Using device: cuda:2
     
Validation Fold 1/10:
     Early stopping! No improvement in validation loss for 5 epochs.
     Fold 1 - Train Loss: 0.5839, Train Accuracy: 0.8006
     Fold 1 - Val Loss: 1.3696, Val Accuracy: 0.6050
     
Validation Fold 2/10:
     Early stopping! No improvement in validation loss for 5 epochs.
     Fold 2 - Train Loss: 0.1376, Train Accuracy: 0.9617
     Fold 2 - Val Loss: 1.0763, Val Accuracy: 0.6950
     
Validation Fold 3/10:
     Early stopping! No improvement in validation loss for 5 epochs.
     Fold 3 - Train Loss: 0.3026, Train Accuracy: 0.9072
     Fold 3 - Val Loss: 1.0686, Val Accuracy: 0.6750
     
Validation Fold 4/10:
     Early stopping! No improvement in validation loss for 5 epochs.
     Fold 4 - Train Loss: 0.3339, Train Accuracy: 0.8989
     Fold 4 - Val Loss: 1.2655, Val Accuracy: 0.6500
     
Validation Fold 5/10:


In [None]:
plot_avg_performance_bar_chart(default_test_accuracy_history, reward_test_accuracy_history)

# Class-wise bar chart
plot_class_wise_bar_chart(default_confmatrix_all, reward_confmatrix_all, class_labels)

In [None]:
# Plot default clustered confusion matrix
test_acc_mean = np.mean(default_test_accuracy_history)
val_acc_mean = np.mean(default_val_accuracy_history)
plot_average_clustered_conf_matrix(default_confmatrix_all, test_acc_mean, val_acc_mean, class_labels)

# Plot reward clustered confusion matrix
test_acc_mean = np.mean(reward_test_accuracy_history)
val_acc_mean = np.mean(reward_val_accuracy_history)
plot_average_clustered_conf_matrix(reward_confmatrix_all, test_acc_mean, val_acc_mean, class_labels)

# Analysis 2: Informative Prior Policy Accelerates Training of Reward Model

In [None]:
# Initialize parameters
n_targ = len(np.unique(y)) # Number of unique targets
class_names = np.unique(y) # Names of unique targets

# Initialize dictionaries and lists to store metrics
all_test_splits_metrics_no_prior = {} # Dictionary to store fold metrics for each test split

# Storing metrics across test splits
reward_no_prior_confmatrix_all = np.zeros((n_targ, n_targ, test_splits)) # all confusion matrices generated from the default model on test set, before averaging across test folds

reward_test_accuracy_history_no_prior = []
reward_val_accuracy_history_no_prior = []

for i in range(test_splits):
    print(f"\nTest Split {i + 1}/{test_splits}:")
    
    # Set seeds for reproducibility
    np.random.seed(i)
    torch.manual_seed(i) # for CPU
    torch.cuda.manual_seed_all(i) # for GPU
    
    # Shuffle indices
    indices = np.arange(len(X))
    np.random.shuffle(indices)
    
    # Split indices into training and test sets
    train_indices, test_indices = train_test_split(indices, test_size=224)
    
    # Further split train_indices into two training sets
    train_default_indices, train_reward_indices = train_test_split(train_indices, test_size=2000)

    # Use these indices to create data splits
    X_default, y_default = X[train_default_indices], y_num[train_default_indices]
    X_reward, y_reward = X[train_reward_indices], y_num[train_reward_indices]
    X_test, y_test = X[test_indices], y_num[test_indices]
        
     # Initialize reward model without prior policy. Train reward model. Select highest performing model across validation sets
    print('\nTraining reward model without prior policy...')
    reward_model_no_prior, reward_validation_acc_no_prior, fold_metrics_reward_no_prior = train_reward_no_prior(X_reward, y_reward, device = torch.device(device)) 
    reward_val_accuracy_history_no_prior.append(reward_validation_acc_no_prior)
    
    print('Evaluating reward model without prior policy...')
    _, _, reward_test_accuracy_no_prior, reward_no_prior_confmatrix = evaluate_reward_model(X_test, y_test, reward_model_no_prior, eval_mode = True, device_eval=torch.device(device))
    print(f'Reward without prior policy test accuracy: {reward_test_accuracy_no_prior*100:.2f}%')
    reward_test_accuracy_history_no_prior.append(reward_test_accuracy_no_prior)
    reward_no_prior_confmatrix_all[:,:,i] = reward_no_prior_confmatrix
    
    # Store fold metrics for this test split
    all_test_splits_metrics_no_prior[f'Test Split {i+1}'] = {
        'Reward no prior': fold_metrics_reward_no_prior
    }


In [None]:
# bar chart
plot_triple_comparison_bar_chart(default_test_accuracy_history, reward_test_accuracy_history, reward_test_accuracy_history_no_prior, third_label = 'Reward without\nPrior Policy', third_color = 'orange', chance = 3.33)


In [None]:
# average training loss versus epochs
average_values_reward, sem_values_reward, epochs_reward = calculate_average_metric(all_test_splits_metrics, 'Reward', 'Training Loss')
average_values_default, sem_values_default, epochs_default = calculate_average_metric(all_test_splits_metrics, 'Default', 'Training Loss')
average_values_noprior, sem_values_noprior, epochs_noprior = calculate_average_metric(all_test_splits_metrics_no_prior, 'Reward no prior', 'Training Loss')

plt.figure(figsize=(8,5))

plt.plot(epochs_default, average_values_default, color='gray', marker='o', label='Default')
plt.fill_between(epochs_default, average_values_default - sem_values_default, average_values_default + sem_values_default, color='gray', alpha=0.2)

plt.plot(epochs_reward, average_values_reward, color='steelblue', marker='o', label='Reward')
plt.fill_between(epochs_reward, average_values_reward - sem_values_reward, average_values_reward + sem_values_reward, color='steelblue', alpha=0.2)

plt.plot(epochs_noprior, average_values_noprior, color='orange', marker='o', label='Reward without prior policy')
plt.fill_between(epochs_noprior, average_values_noprior - sem_values_noprior, average_values_noprior + sem_values_noprior, color='orange', alpha=0.2)

plt.xlabel('Normalized Epochs', fontsize=18)
plt.ylabel(f'Average Training Loss', fontsize=18)
plt.xticks(fontsize=16)
plt.yticks(fontsize=16)
plt.title(f'Average Training Loss', fontsize=22)
plt.legend(frameon = False, fontsize=12)
plt.show()

In [None]:
# average validation accuracy versus epochs
average_values_reward, sem_values_reward, epochs_reward = calculate_average_metric(all_test_splits_metrics, 'Reward', 'Validation Accuracy')
average_values_default, sem_values_default, epochs_default = calculate_average_metric(all_test_splits_metrics, 'Default', 'Validation Accuracy')
average_values_noprior, sem_values_noprior, epochs_noprior = calculate_average_metric(all_test_splits_metrics_no_prior, 'Reward no prior', 'Validation Accuracy')

plt.figure(figsize=(8,5))

plt.plot(epochs_reward, average_values_reward, color='steelblue', marker='o', label='Reward')
plt.fill_between(epochs_reward, average_values_reward - sem_values_reward, average_values_reward + sem_values_reward, color='steelblue', alpha=0.2)

plt.plot(epochs_default, average_values_default, color='gray', marker='o', label='Default')
plt.fill_between(epochs_default, average_values_default - sem_values_default, average_values_default + sem_values_default, color='gray', alpha=0.2)

plt.plot(epochs_noprior, average_values_noprior, color='orange', marker='o', label='Reward without\nprior policy')
plt.fill_between(epochs_noprior, average_values_noprior - sem_values_noprior, average_values_noprior + sem_values_noprior, color='orange', alpha=0.2)

plt.xlabel('Normalized Epochs', fontsize=18)
plt.ylabel(f'Average Val Accuracy (%)', fontsize=18)
plt.xticks(fontsize=16)
plt.yticks(fontsize=16)
plt.title(f'Average Validation Accuracy', fontsize=22)
plt.legend(frameon = False, fontsize=12)
plt.show()

In [None]:
# train val acc for default, reward, and reward without prior policy

average_values_reward, sem_values_reward, normalized_epochs = calculate_average_metric(all_test_splits_metrics, 'Reward', 'Training Accuracy')
average_values_default, sem_values_default_train, _ = calculate_average_metric(all_test_splits_metrics, 'Default', 'Training Accuracy')
average_values_noprior, sem_values_noprior, epochs_noprior = calculate_average_metric(all_test_splits_metrics_no_prior, 'Reward no prior', 'Training Accuracy')

average_values_reward_val, sem_values_reward_val, normalized_epochs = calculate_average_metric(all_test_splits_metrics, 'Reward', 'Validation Accuracy')
average_values_default_val, sem_values_default_val, _ = calculate_average_metric(all_test_splits_metrics, 'Default', 'Validation Accuracy')
average_values_noprior_val, sem_values_noprior_val, epochs_noprior = calculate_average_metric(all_test_splits_metrics_no_prior, 'Reward no prior', 'Validation Accuracy')

metrics_data = [
    (average_values_default, sem_values_default, average_values_default_val, sem_values_default_val),
    (average_values_reward, sem_values_reward, average_values_reward_val, sem_values_reward_val),
    (average_values_noprior, sem_values_noprior, average_values_noprior_val, sem_values_noprior_val)
]

titles = ['Default Model', 'Reward Model', 'Reward Model without Prior Policy']
colors = ['gray', 'steelblue', 'orange']

plot_ablation_training_validation_accuracy(normalized_epochs, metrics_data, titles, colors)


# Analysis 3: Ensembling Reward Models Enhances Predictive Performance

### Plot confusion matrices

In [None]:
# Initialize parameters
n_targ = len(np.unique(y)) # Number of unique targets
class_names = np.unique(y) # Names of unique targets

# Initialize dictionaries and lists to store metrics
all_test_splits_metrics = {} # Dictionary to store fold metrics for each test split

# Storing metrics across test splits
default_confmatrix_all = np.zeros((n_targ, n_targ, test_splits))
ensemble_reward_confmatrix_all = np.zeros((n_targ, n_targ, test_splits))

default_test_accuracy_history = []
default_val_accuracy_history = []

ensemble_reward_test_accuracy_history = []
ensemble_reward_val_accuracy_history = []

for i in range(test_splits):
    print(f"\nTest Split {i + 1}/{test_splits}:")
    
    # Set seeds for reproducibility
    np.random.seed(i)
    torch.manual_seed(i) # for CPU
    torch.cuda.manual_seed_all(i) # for GPU
    
    # Shuffle indices
    indices = np.arange(len(X))
    np.random.shuffle(indices)
    
    # Split indices into training and test sets
    train_indices, test_indices = train_test_split(indices, test_size=224)
    
    # Further split train_indices into two training sets
    train_default_indices, train_reward_indices = train_test_split(train_indices, test_size=2000)

    # Use these indices to create data splits
    X_default, y_default = X[train_default_indices], y_num[train_default_indices]
    X_reward, y_reward = X[train_reward_indices], y_num[train_reward_indices]
    X_test, y_test = X[test_indices], y_num[test_indices]
    
    # Train default model. Select highest performing model across validation sets
    print('Training default model...')
    default_model, default_validation_acc, fold_metrics_default = train_default(X_default, y_default, device = torch.device(device))
    default_val_accuracy_history.append(default_validation_acc)
    
    # Evaluate performance of default on held out test set
    print('Evaluating default model...')
    default_test_accuracy, default_confmatrix = evaluate_default(X_test, y_test, default_model, device = torch.device(device))
    default_test_accuracy_history.append(default_test_accuracy)
    default_confmatrix_all[:,:,i] = default_confmatrix
    
    # Initialize ensemble reward model with default model weights. Train reward model. Collect an ensemble of 10 models trained on each validation set.
    print('\nTraining reward models...')
    reward_models, fold_metrics_ensemble_reward = train_reward_ensemble(X_reward, y_reward, default_model, device = torch.device(device)) # this needs to use cpu
    print('reward_models keys', reward_models.keys())
    
    print('Evaluating ensemble of reward models')
    _, ensemble_reward_test_accuracy, ensemble_reward_confmatrix = evaluate_reward_model_ensemble(X_test, y_test, reward_models, device_eval=torch.device(device))
    print(f'Reward test accuracy: {ensemble_reward_test_accuracy*100:.2f}%')
    ensemble_reward_test_accuracy_history.append(ensemble_reward_test_accuracy)
    ensemble_reward_confmatrix_all[:,:,i] = ensemble_reward_confmatrix
    
    # Store fold metrics for this test split
    all_test_splits_metrics[f'Test Split {i+1}'] = {
        'Default': fold_metrics_default,
        'Ensemble': fold_metrics_ensemble_reward
    }
        

In [None]:
plot_triple_comparison_bar_chart(default_test_accuracy_history, reward_test_accuracy_history, ensemble_reward_test_accuracy_history, third_label = 'Ensemble', third_color = 'darkseagreen', chance = 3.33)


In [None]:
plot_sorted_confusion_matrix(ensemble_reward_confmatrix_all, np.mean(ensemble_reward_test_accuracy_history), np.mean(ensemble_reward_val_accuracy_history), class_labels)
plot_sorted_confusion_matrix(reward_confmatrix_all, np.mean(reward_test_accuracy_history), np.mean(reward_val_accuracy_history), class_labels)
plot_sorted_confusion_matrix(default_confmatrix_all, np.mean(default_test_accuracy_history), np.mean(default_val_accuracy_history), class_labels)

### Accuracy versus reward mislabeling rate

In [None]:
# Initialize parameters
test_splits = 1 # This experiment takes a particularly long time
n_targ = len(np.unique(y)) # Number of unique targets
class_names = np.unique(y) # Names of unique targets

default_test_accuracy_history = []
default_val_accuracy_history = []

reward_error = [0, 0.05, 0.1, 0.15, 0.2, 0.25]
reward_test_accuracy_dict = {val: [] for val in reward_error}
reward_val_accuracy_dict = {val: [] for val in reward_error}

reward_ensemble_test_accuracy_dict = {val: [] for val in reward_error}
reward_ensemble_val_accuracy_dict = {val: [] for val in reward_error}

for i in range(test_splits):
    print(f"\nTest Split {i + 1}/{test_splits}:")
    
    # Set seeds for reproducibility
    np.random.seed(i)
    torch.manual_seed(i) # for CPU
    torch.cuda.manual_seed_all(i) # for GPU
    
    # Shuffle indices
    indices = np.arange(len(X))
    np.random.shuffle(indices)

    # Split indices into training and test sets
    train_indices, test_indices = train_test_split(indices, test_size=224)

    # Further split train_indices into two training sets
    train_default_indices, train_reward_indices = train_test_split(train_indices, test_size=2000)

    # Use these indices to create data splits
    X_default, y_default = X[train_default_indices], y_num[train_default_indices]
    X_reward, y_reward = X[train_reward_indices], y_num[train_reward_indices]
    X_test, y_test = X[test_indices], y_num[test_indices]

    # Train default model. Select highest performing model across validation sets
    print('Training default model...')
    default_model, default_validation_acc = train_default(X_default, y_default, device = torch.device(device))
    default_val_accuracy_history.append(default_validation_acc)
    
    # Evaluate performance of default and reward models on held out test set 
    default_test_accuracy, default_confmatrix = evaluate_default(X_test, y_test, default_model, device = torch.device(device))
    default_test_accuracy_history.append(default_test_accuracy)
    
    for index, p in enumerate(reward_error):    
        for repeats in range(3):
            
            print('Reward error probability: ', p)
            print(f'     Reward prob {repeats+1}/3:')
            # Reward model
            print('          Training reward model...')
            reward_model, reward_validation_acc = train_reward_error(X_reward, y_reward, default_model, p, repeats, device = torch.device(device))
            reward_val_accuracy_dict[p].append(reward_validation_acc)

            _, _, reward_test_accuracy, reward_confmatrix = evaluate_reward_model(X_test, y_test, reward_model, eval_mode = True, device_eval=torch.device(device))
            print(f'          Reward test acc: {reward_test_accuracy*100:.2f}%')
            reward_test_accuracy_dict[p].append(reward_test_accuracy)
            
            # Reward ensemble model
            print('          Training reward ensemble model...')
            reward_ensemle_models, reward_ensemble_validation_acc = train_reward_error_ensemble(X_reward, y_reward, default_model, p, repeats, device = torch.device(device)) 
            reward_ensemble_val_accuracy_dict[p].append(reward_ensemble_validation_acc)

            _, reward_ensemble_test_accuracy, reward_ensemble_confmatrix = evaluate_reward_model_ensemble(X_test, y_test, reward_ensemle_models, device_eval=torch.device(device))
            print(f'          Reward ensemble test acc: {reward_ensemble_test_accuracy*100:.2f}%')
            reward_ensemble_test_accuracy_dict[p].append(reward_ensemble_test_accuracy)


In [None]:
plot_acc_vs_reward_error(default_test_accuracy_history, reward_test_accuracy_dict, reward_ensemble_test_accuracy_dict)

# Analysis 4: Predicting reward signal from neural activity

In [None]:
folder_name = 'data'
file_name = 'feedback_data.pkl'

# Define the full path
full_path = os.path.join(folder_name, file_name)

with open(full_path, 'rb') as file:
    incorrect_feedback, correct_feedback = pickle.load(file)
print("Data successfully loaded.")

### Plot Event related Potentials (ERPs) 
##### (note: this takes a few minutes to run)

In [None]:
plot_erp_b3_gaussiansmooth(incorrect_feedback, 'Incorrect', correct_feedback, 'Correct', 10, 0, 3)

### Plot Relative Electrode Activity
####  This plot will load from a picture since you don't have access to the libraries my lab uses to make plots like this

In [None]:
plot_relative_electrode_strength(incorrect_feedback, correct_feedback)


### Plot hyperparameter tuning curve to select best value of n_components for PCA in Logistic Regression + PCA model
#### Note: Plots in final project paper were created using 50 iterations for each n_components value

In [None]:
average_accuracies, sem_accuracies = n_components_tuning(incorrect_feedback, correct_feedback, n_components_range = np.arange(0.85, 0.99, 0.01), iterations = 1)
plot_n_components_tuning(average_accuracies, sem_accuracies)


### Assess performance of logistic regression + PCA model
#### Note: Plots in final project paper were created using 100 iterations.

In [None]:
cm_pre_avg, accuracy_history = train_logR_reward(incorrect_feedback, correct_feedback, n_targ = 2, iterations = 3, n_components = 0.92)
plot_sorted_confusion_matrix(cm_pre_avg, np.mean(accuracy_history), val_acc_mean = None, class_labels = ['Incorrect', 'Correct'])