# TEAM: BUDGET NEURALINK

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pickle
from scipy.stats import pearsonr
from scipy import signal
from scipy.io import loadmat
from sklearn.model_selection import train_test_split
from sklearn.svm import SVR
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error
from scipy.stats import entropy
from scipy.signal import welch
from scipy.integrate import simps
import lightgbm as lgb
import joblib
from scipy.io import savemat
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout
from tensorflow.keras.optimizers import Adam

# Data Prep

In [None]:
raw_training_data = loadmat('raw_training_data.mat')
testing_data = loadmat('leaderboard_data.mat')
testing_data = testing_data['leaderboard_ecog']
train_dg = raw_training_data['train_dg']
train_ecog = raw_training_data['train_ecog']

In [None]:
def filter_data(raw_eeg, lowcut=1, highcut=150, filter_order=5, fs=1000):
    """
    Write a filter function to clean underlying data.
    Filter type and parameters are up to you. Points will be awarded for reasonable filter type, parameters and application.
    Please note there are many acceptable answers, but make sure you aren't throwing out crucial data or adversly
    distorting the underlying data!

    Input:
      raw_eeg (samples x channels): the raw signal
      fs: the sampling rate (1000 for this dataset)
    Output:
      clean_data (samples x channels): the filtered signal
    """

    # Butter function to create a bandpass filter
    b, a = signal.butter(filter_order, [lowcut, highcut], btype='band', fs=fs)

    # Filtfilt to apply the filter to the data: avoids phase distortion
    clean_data = signal.filtfilt(b, a, raw_eeg, axis=0)

    return clean_data

def calculate_number_of_windows(num_samples, fs, window_length_ms, window_overlap_ms):
    # Convert milliseconds to seconds
    window_length = window_length_ms / 1000.0
    window_overlap = window_overlap_ms / 1000.0

    # Calculate the step size in seconds
    step_size = window_length - window_overlap

    # Calculate the total duration of the data in seconds
    total_duration = num_samples / fs

    # Calculate the number of windows
    M = 1 + np.ceil((total_duration - window_length) / step_size)

    return M

In [None]:
#line length
line_length = lambda x : np.sum(np.absolute(np.ediff1d(x)))

#area
area = lambda x : np.sum(np.abs(x))

#energy
energy = lambda x : np.sum(np.square(x))

#dero crossings
zero_crossings = lambda x: np.sum((x[:-1] - x.mean() > 0) & (x[1:] - x.mean() < 0)) + np.sum((x[:-1] - x.mean() < 0) & (x[1:] - x.mean() > 0))

#mean voltage
def mean_voltage(channel):
    return np.mean(channel)

#signal entropy
def signal_entropy(channel):
    """ Calculate the entropy of the signal """
    hist, bin_edges = np.histogram(channel, bins=64, density=True)
    return entropy(hist)

#hjorth activity
def hjorth_activity(channel):
    """ Calculate Hjorth Activity """
    return np.var(channel)

#hjorth mobility
def hjorth_mobility(channel):
    """ Calculate Hjorth Mobility """
    return np.sqrt(np.var(np.diff(channel)) / np.var(channel))

#hjorth complexity
def hjorth_complexity(channel):
    """ Calculate Hjorth Complexity """
    return hjorth_mobility(np.diff(channel)) / hjorth_mobility(channel)

def spectral_power(channel, fs, band):
    """
    Calculate the spectral power within a specific frequency band using Welch's method.

    Args:
        channel (array): EEG signal.
        fs (int): Sampling frequency.
        band (tuple): Frequency band (low_freq, high_freq).

    Returns:
        float: Spectral power in the given frequency band.
    """
    f, Pxx = welch(channel, fs=fs, nperseg=len(channel))
    band_mask = (f >= band[0]) & (f <= band[1])
    return np.sum(Pxx[band_mask])


In [None]:
def get_features(filtered_window, fs=1000):
    """
      Write a function that calculates features for a given filtered window.
      Feel free to use features you have seen before in this class, features that
      have been used in the literature, or design your own!

      Input:
        filtered_window (window_samples x channels): the window of the filtered ecog signal
        fs: sampling rate
      Output:
        features (channels x num_features): the features calculated on each channel for the window
    """
    features = []
    bands = [(8, 12), (18, 24), (75, 115), (125, 159), (160, 175)]

    for channel in filtered_window.T:
      channel_features = [
          line_length(channel),
          area(channel),
          energy(channel),
          zero_crossings(channel),
          mean_voltage(channel),
          signal_entropy(channel),
          hjorth_activity(channel),
          hjorth_mobility(channel),
          hjorth_complexity(channel)
      ]

      spectral_features = [spectral_power(channel, fs, band) for band in bands]
      channel_features.extend(spectral_features)

      features.append(channel_features)

    return np.array(features)

def get_windowed_feats(raw_ecog, fs, window_length, window_overlap):
    """
      Write a function which processes data through the steps of filtering and
      feature calculation and returns features. Points will be awarded for completing
      each step appropriately (note that if one of the functions you call within this script
      returns a bad output, you won't be double penalized). Note that you will need
      to run the filter_data and get_features functions within this function.

      Inputs:
        raw_eeg (samples x channels): the raw signal
        fs: the sampling rate (1000 for this dataset)
        window_length: the window's length
        window_overlap: the window's overlap
      Output:
        all_feats (num_windows x (channels x features)): the features for each channel for each time window
          note that this is a 2D array.
    """
    filtered_ecog_data = filter_data(raw_ecog)

    #number of samples in each window
    window_length_samples = int(window_length * fs)

    #number of samples that should overlap
    window_overlap_samples = int(window_overlap * fs)

    #step size:  number of samples by which we move the window forward to create the next window.
    step_size = window_length_samples - window_overlap_samples

    total_samples = raw_ecog.shape[0]
    #feature windows

    all_features = []

    for start in range(0, total_samples - window_length_samples + 1, step_size):
      window_start = filtered_ecog_data[start : (start + window_length_samples), :]

      #features for each channel within a window, which results in a two-dimensional array (or matrix)
      #each row corresponds to a different channel and each column corresponds to a different feature extracted from that channel
      window_features = get_features(window_start)

      all_features.append(window_features)

    return all_features

def create_R_matrix(features, N_wind):
    #pad the beginning of the features matrix with copies of the first N_wind-1 rows
    padding = features[:N_wind-1]
    padded_features = np.vstack((padding, features))

    #initialize an empty list to hold the rows of the R matrix
    R_list = []

    #iterate over the padded features matrix to construct the R matrix rows
    for i in range(N_wind-1, len(padded_features)):
        #extract the current window and N_wind-1 preceding windows
        window_features = padded_features[i-N_wind+1:i+1].flatten()
        #prepend a 1 to the feature vector to account for the bias term
        window_features_with_bias = np.insert(window_features, 0, 1)
        #append to the R_list
        R_list.append(window_features_with_bias)

    #convert the R_list to a numpy array to form the R matrix
    R = np.array(R_list)

    return R

def downsample_data(data, num_windows):
    #number of finger features
    num_features = data.shape[1]
    #store downsampled data
    downsampled_data = np.zeros((num_windows, num_features))

    #iterate through each feature
    for i in range(num_features):
        #data for each finger
        feature_data = data[:, i]

        #create a new time axis for interpolation, matching the target number of windows
        new_time_axis = np.linspace(0, len(feature_data) - 1, num=num_windows)

        downsampled_data[:, i] = np.interp(new_time_axis, np.arange(len(feature_data)), feature_data)

    return downsampled_data

In [None]:
# setting up data
N_wind = 10
fs = 1000
window_length = 0.1
window_overlap = 0.05
test_sz = 0.2

ecog_train1, ecog_test1, glove_train1, glove_test1 = train_test_split(train_ecog[0][0], train_dg[0][0], test_size=test_sz, shuffle=False)
ecog_train2, ecog_test2, glove_train2, glove_test2 = train_test_split(train_ecog[1][0], train_dg[1][0], test_size=test_sz, shuffle=False)
ecog_train3, ecog_test3, glove_train3, glove_test3 = train_test_split(train_ecog[2][0], train_dg[2][0], test_size=test_sz, shuffle=False)

ecog_trains = [ecog_train1, ecog_train2, ecog_train3]
ecog_tests = [ecog_test1, ecog_test2, ecog_test3]

subject_train_feats = []
subject_test_feats = []
for subject in range(3):
    print(subject)
    subject_train_feats.append(get_windowed_feats(ecog_trains[subject], fs, window_length, window_overlap))
    print("Train done")
    subject_test_feats.append(get_windowed_feats(ecog_tests[subject], fs, window_length, window_overlap))
    print("Test done")
    print("---")

subjects_data = [
    (subject_train_feats[0], glove_train1, subject_test_feats[0], glove_test1),
    (subject_train_feats[1], glove_train2, subject_test_feats[1], glove_test2),
    (subject_train_feats[2], glove_train3, subject_test_feats[2], glove_test3)
]

0
Train done
Test done
---
1
Train done
Test done
---
2
Train done
Test done
---


In [None]:
import pickle

def save_data(data, filename):
    """Save data to a file using pickle."""
    with open(filename, 'wb') as f:
        pickle.dump(data, f)

# Save the subjects_data to a file
save_data(subjects_data, 'subjects_data.pkl')

def load_data(filename):
    """Load data from a file using pickle."""
    with open(filename, 'rb') as f:
        return pickle.load(f)

# Load the subjects_data from a file
subjects_data = load_data('subjects_data.pkl')

# ML

In [None]:
def mean_squared_error(actual, predicted):
    """
    Calculate the mean squared error between actual and predicted values

    Parameters:
    actual (numpy array): The actual values
    predicted (numpy array): The predicted values by the model

    Returns:
    float: The mean squared error
    """
    #calculate the difference between actual and predicted values
    difference = actual - predicted

    #square the differences
    squared_difference = difference ** 2

    #calculate the mean/average of the squared differences
    mse = squared_difference.mean()

    return mse

def calculate_correlations(Y_test, Y_pred):
    """
    Calculate the Pearson correlation coefficient between actual and predicted
    finger angles for each finger.

    Parameters:
    Y_test (numpy array): Actual test finger angles, shape (num_samples, num_fingers)
    Y_pred (numpy array): Predicted finger angles, shape (num_samples, num_fingers)

    Returns:
    List of Pearson correlation coefficients for each finger.
    """
    correlations = []
    #iterate through each finger
    for i in range(Y_test.shape[1]):
        corr, _ = pearsonr(Y_test[:, i], Y_pred[:, i])
        correlations.append(corr)
    return correlations

## Model: LGBM

In [None]:
def train_and_evaluate_lgbm(features_train, targets_train, features_test):
    # Initialize a list to hold models for each dimension
    models = []
    # Initialize an array to hold predictions, corrected to match the test features shape
    predictions = np.zeros((features_test.shape[0], targets_train.shape[1]))

    # Train a separate model for each dimension of the target
    for dim in range(targets_train.shape[1]):
        model = lgb.LGBMRegressor(objective='regression', n_estimators=80, learning_rate=0.05, num_leaves=10, random_state=42)
        model.fit(features_train, targets_train[:, dim])
        predictions[:, dim] = model.predict(features_test)
        models.append(model)

    return models, predictions



```
# This code was used for hyperparam tuning of the number of leaves

num_leaves_values = [10, 20, 30, 40, 50]
average_correlations = []

for num_leaves in num_leaves_values:
    correlations_across_subjects = []
    for i, (windowed_feats, glove_train, windowed_feats_test, glove_test) in enumerate(full_subjects_data, start=1):
        R_train = create_R_matrix(windowed_feats, N_wind)
        downsampled_finger_flexion = downsample_data(glove_train, R_train.shape[0])
        R_test = create_R_matrix(windowed_feats_test, N_wind)

        lgbm_model, Y_pred = train_and_evaluate_lgbm(R_train, downsampled_finger_flexion, R_test, num_leaves)
        Y_test = downsample_data(glove_test, len(windowed_feats_test))

        # Evaluation
        correlations = calculate_correlations(Y_test, Y_pred)
        print("avg corr for sub {i} for this val of num leveas = ", np.mean(correlations))
        correlations_across_subjects.append(np.mean(correlations))

    # Calculate the average correlation for this num_leaves setting across all subjects
    avg_correlation = np.mean(correlations_across_subjects)
    average_correlations.append(avg_correlation)
    print(f"Average Correlation for num_leaves={num_leaves}: {avg_correlation}")

# Plotting the number of leaves against the average correlation
plt.figure(figsize=(10, 6))
plt.plot(num_leaves_values, average_correlations, marker='o')
plt.title('Number of Leaves vs. Average Correlation')
plt.xlabel('Number of Leaves')
plt.ylabel('Average Correlation')
plt.grid(True)
plt.show()```



In [None]:
results = []
models = []

for i, (windowed_feats, glove_train, windowed_feats_test, glove_test) in enumerate(subjects_data, start=1):

    R_train = create_R_matrix(windowed_feats, N_wind)
    downsampled_finger_flexion = downsample_data(glove_train, R_train.shape[0])

    R_test = create_R_matrix(windowed_feats_test, N_wind)

    # Train using LightGBM
    lgbm_model, Y_pred = train_and_evaluate_lgbm(R_train, downsampled_finger_flexion, R_test)
    Y_test = downsample_data(glove_test, len(windowed_feats_test))

    # Evaluation
    mse = mean_squared_error(Y_test, Y_pred)
    correlations = calculate_correlations(Y_test, Y_pred)
    results.append((mse, correlations))

    # Save the trained model
    joblib.dump(lgbm_model, f'lgbm_model_subject_{i}.joblib')
    models.append(lgbm_model)

    print(f"Mean Squared Error (LightGBM) for Subject {i}: {mse}")
    print(f"Correlations for Subject {i}: {correlations}")
    print(f"Average Correlation for Subject {i}: {np.mean(correlations)}\n")


[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 1.925186 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 2064250
[LightGBM] [Info] Number of data points in the train set: 4799, number of used features: 8680
[LightGBM] [Info] Start training from score -0.091412
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 2.310442 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 2064250
[LightGBM] [Info] Number of data points in the train set: 4799, number of used features: 8680
[LightGBM] [Info] Start training from score -0.026955
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 2.231193 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 2064250
[LightGBM] [Info] Number of data points in the train set: 4799, number of used features: 8680
[LightGBM] [I

# Submit Leaderboard.mat

In [None]:
# this method works well with lgbm
def predict_with_pretrained_models(models, features_test):
    # Assuming models is a list of pre-trained LightGBM models, one for each target dimension
    num_dimensions = len(models)  # Number of target dimensions, e.g., number of fingers
    num_samples = features_test.shape[0]

    # Initialize an array to hold predictions for each dimension
    predictions = np.zeros((num_samples, num_dimensions))

    # Generate predictions for each dimension using the corresponding pre-trained model
    for dim, model in enumerate(models):
        predictions[:, dim] = model.predict(features_test)

    return predictions

def interpolate_predictions(predictions, target_length):
    # Current length of the predictions
    current_length = predictions.shape[0]
    # Target length - 147,500
    # Create an array representing the indices of the current predictions
    current_indices = np.linspace(0, current_length - 1, num=current_length)
    # Create an array representing the indices for the interpolated output
    target_indices = np.linspace(0, current_length - 1, num=target_length)
    # Interpolate predictions to target length for each dimension
    interpolated_predictions = np.zeros((target_length, predictions.shape[1]))
    for i in range(predictions.shape[1]):  # Interpolate for each feature/dimension
        interpolated_predictions[:, i] = np.interp(target_indices, current_indices, predictions[:, i])
    return interpolated_predictions

In [None]:
# currently this submits our lgbm model
predicted_dg = np.empty((3, 1), dtype=object)

for i in range(3):
    print(i)
    models = joblib.load(f'lgbm_model_subject_{i+1}.joblib')
    ecog_test = testing_data[i][0]
    windowed_feats_test = get_windowed_feats(ecog_test, fs, window_length, window_overlap)
    R_test = create_R_matrix(windowed_feats_test, N_wind)

    # Fetch predictions
    predictions = np.zeros((R_test.shape[0], 5))
    for dim, single_model in enumerate(models):
        predictions[:, dim] = single_model.predict(R_test)

    # Interpolate predictions to match the required 147,500 length
    interpolated_predictions = interpolate_predictions(predictions, 147500)
    predicted_dg[i, 0] = interpolated_predictions

savemat('submission.mat', {'predicted_dg': predicted_dg})