In [None]:
# Mount Google Drive at /content/drive (Colab only); the .mat files loaded
# later in this notebook are read from paths under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


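Algorithm Overview:
*   Features (per channel, per window): local motor potential (LMP; the mean of the signal low-pass filtered at 3.5 Hz), band power of the signal band-pass filtered into 6 frequency bands, and Hjorth parameters of the low-passed signal
*   Build the R matrix from N consecutive feature windows, then fit one CatBoost regressor per recording
*   Upsample the windowed predictions to the ECoG sample count with cubic-spline interpolation
*   Smooth the upsampled predictions with a moving average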








In [None]:
#Set up the notebook environment
!pip install catboost xgboost lightgbm scikit-learn
import catboost
from catboost import CatBoostRegressor
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pickle
from scipy.stats import pearsonr
from scipy import signal as sig
from scipy.io import loadmat
from scipy.signal import welch, butter, filtfilt
from tqdm import tqdm
from scipy.signal import decimate
from sklearn.model_selection import train_test_split
from scipy.signal import stft
from scipy.interpolate import CubicSpline
from sklearn.preprocessing import StandardScaler
from scipy.io import savemat

In [None]:
# Load the training and leaderboard (test) .mat files from the mounted shared drive.
# loadmat returns a dict keyed by MATLAB variable name; the keys used later are
# 'train_ecog', 'train_dg' and 'leaderboard_ecog'.
train_data = loadmat('/content/drive/Shareddrives/BE 5210 Shared Drive/Xuanbei/V4/raw_training_data.mat')
test_data = loadmat('/content/drive/Shareddrives/BE 5210 Shared Drive/Xuanbei/V4/leaderboard_data.mat')

In [None]:
# Pull the per-recording containers out of the loaded .mat dictionaries
train_ecog, train_dg = train_data['train_ecog'], train_data['train_dg']
test_ecog = test_data['leaderboard_ecog']

# Stack each of the first three entries into its own 2-D array:
# training ECoG, training glove data, then leaderboard (test) ECoG
train_ecog1, train_ecog2, train_ecog3 = [np.vstack(train_ecog[k]) for k in range(3)]
train_dg1, train_dg2, train_dg3 = [np.vstack(train_dg[k]) for k in range(3)]
test_ecog1, test_ecog2, test_ecog3 = [np.vstack(test_ecog[k]) for k in range(3)]

In [None]:
def filter_signal(raw_ecog,fs):
    """
    Low-pass/notch filter the ECoG signal and band-pass it into six bands.

    Every filter is applied forward and backward (`filtfilt`) along the time
    axis (axis 0) of all channels at once. No downsampling is performed, so
    each returned array has the same shape as `raw_ecog`.

    Parameters:
    - raw_ecog: The raw ECoG signal, shape (num_samples, num_channels).
    - fs: The sampling frequency of the ECoG signal in Hz.

    Returns (as a 7-tuple, in this order):
    - down_low_ecog: `raw_ecog` after a 4th-order 3.5 Hz Butterworth low-pass
      followed by a 60 Hz notch (Q = 30).
    - bandpass1 ... bandpass6: `raw_ecog` after a 4th-order Butterworth
      band-pass for 5-8, 8-12, 12-24, 24-34, 34-60 and 100-200 Hz respectively.
    """
    lowcut = 3.5  # Low-pass cut-off frequency in Hz
    order = 4
    notch_freq = 60
    notch_Q = 30
    relevant_freq_bands = [ (5,8), (8,12), (12,24), (24,34), (34,60), (100,200)]

    # butter() takes cut-offs normalised by the Nyquist frequency
    nyquist = fs * 0.5

    # Low-pass followed by the notch, for every channel in one call
    b, a = butter(order, lowcut / nyquist, btype='low', analog=False)
    b_notch, a_notch = sig.iirnotch(notch_freq, notch_Q, fs)
    lowpassed = filtfilt(b, a, raw_ecog, axis=0)
    down_low_ecog = filtfilt(b_notch, a_notch, lowpassed, axis=0)

    # NOTE(review): narrow low-frequency bands in (b, a) form can be numerically
    # fragile; second-order sections (output='sos' + sosfiltfilt) may be more
    # robust but would change the feature values, so the design is kept as-is.
    bandpass_signal = []
    for start, end in relevant_freq_bands:
        b_band, a_band = butter(order, [start / nyquist, end / nyquist], btype='band', analog=False)
        # The band-pass filters run on the raw signal, not on the low-passed one
        bandpass_signal.append(filtfilt(b_band, a_band, raw_ecog, axis=0))

    bandpass1, bandpass2, bandpass3, bandpass4, bandpass5, bandpass6 = bandpass_signal

    return down_low_ecog, bandpass1, bandpass2, bandpass3, bandpass4, bandpass5, bandpass6


In [None]:
def hjorth_params(signal):
    """
    Compute the three Hjorth descriptors of a signal.

    Parameters:
    - signal: The input signal.

    Returns:
    - activity: Variance of the signal.
    - mobility: sqrt(var(first difference) / activity).
    - complexity: sqrt(var(second difference) / var(first difference)) / mobility.
    """
    # First and second discrete differences of the signal
    first_diff = np.diff(signal)
    second_diff = np.diff(first_diff)

    var_signal = np.var(signal)
    var_first = np.var(first_diff)
    var_second = np.var(second_diff)

    mobility = np.sqrt(var_first / var_signal)
    complexity = np.sqrt(var_second / var_first) / mobility

    return var_signal, mobility, complexity

In [None]:
def get_features(lowpass, bandpass1, bandpass2, bandpass3, bandpass4, bandpass5, bandpass6):
    """
    Compute the per-channel feature matrix for one window.

    Parameters:
    - lowpass: The low-passed window, shape (num_samples, num_channels).
    - bandpass 1 - 6: The six band-passed windows, indexed the same way.

    Returns:
    - window_feat_normalized: Array of shape (num_channels, 10). Columns are
      LMP (mean of the low-passed signal), the six band powers (sum of
      squares), and the Hjorth activity, mobility and complexity of the
      low-passed signal. Each column is standardized with a StandardScaler
      fitted on this window's channels only.
    """
    _, num_channels = lowpass.shape
    bands = (bandpass1, bandpass2, bandpass3, bandpass4, bandpass5, bandpass6)
    num_features = 10  # 1 LMP + 6 band powers + 3 Hjorth parameters
    features = np.zeros((num_channels, num_features))

    for chan in range(num_channels):
        low_chan = lowpass[:,chan]

        # Column 0: local motor potential
        features[chan,0] = np.mean(low_chan,axis=0)

        # Columns 1-6: power in each band-passed signal
        for col, band in enumerate(bands, start=1):
            features[chan,col] = np.sum(np.square(band[:,chan]),axis=0)

        # Columns 7-9: Hjorth parameters of the low-passed channel
        features[chan,7:10] = hjorth_params(low_chan)

    return StandardScaler().fit_transform(features)

In [None]:
def NumWins(x,fs,winLen,winDisp):
  """
  Count the full-length windows that fit in a signal.

  Windows start at samples 0, samDisp, 2*samDisp, ... and a window is counted
  only if all of its xLen samples lie inside `x`.

  Parameters:
  - x: The input signal (only len(x) is used).
  - fs: The sampling frequency.
  - winLen: The length of each window in seconds.
  - winDisp: The displacement (stride) between consecutive window starts, in
    seconds. This is the step, not the overlap.

  Returns:
  - num: The number of windows.

  Raises:
  - ValueError: if winDisp * fs truncates to zero samples.
  """
  xLen = int(winLen * fs)
  samDisp = int(winDisp * fs)
  if samDisp == 0:
    raise ValueError("winDisp * fs must be at least one sample")

  # Valid starts satisfy start < len(x) and start + xLen <= len(x); counting
  # them with range() avoids slicing every window just to measure its length.
  start_bound = min(len(x) - xLen + 1, len(x))
  return len(range(0, max(start_bound, 0), samDisp))

In [None]:
def get_windowed_feats(raw_ecog, fs, window_length,window_overlap):
    """
    Filter the ECoG recording and extract one feature row per window.

    Parameters:
    - raw_ecog: The raw ECoG data array
    - fs: The sampling frequency
    - window_length: The length of each window in seconds
    - window_overlap: In seconds; used here as the step between consecutive
      window starts (window i begins at i * window_overlap * fs samples)

    Returns:
    - all_feats: Array with one row per window, each row being the flattened
      feature matrix returned by get_features for that window
    """
    # filter_signal returns (lowpass, bandpass1, ..., bandpass6)
    filtered = filter_signal(raw_ecog, fs)

    win_len = int(window_length * fs)
    win_step = int(window_overlap * fs)
    num_windows = NumWins(raw_ecog,fs,window_length,window_overlap)

    rows = []
    for i in tqdm(range(num_windows)):
        start = i * win_step
        stop = start + win_len
        # Cut the same sample range out of every filtered signal
        window_feat = get_features(*(arr[start:stop] for arr in filtered))
        rows.append(window_feat.flatten())

    return np.array(rows)

In [None]:
# Sampling Tuning
window_length = 0.15  # window length in seconds
window_overlap = 0.05  # seconds; used downstream as the step between consecutive window starts
fs = 1000  # sampling frequency in Hz
N_win = 20  # number of consecutive feature windows stacked into each R-matrix row

# CatBoost Hyperparameters
cat_iter = 2500  # passed to CatBoostRegressor as `iterations`
cat_dept = 5  # passed as `depth`
cat_leaf = 4  # passed as `l2_leaf_reg`
cat_learnr = 0.05  # passed as `learning_rate`

# Smoothing & Noise Reduction
window_size = 210  # moving-average length, in samples of the interpolated prediction

In [None]:
# Windowed ECoG features for each of the three training recordings
train_feat1, train_feat2, train_feat3 = [
    get_windowed_feats(ecog, fs, window_length, window_overlap)
    for ecog in (train_ecog1, train_ecog2, train_ecog3)
]

In [None]:
 def create_R_matrix(features, N):
     """
     Build the lagged feature matrix R that is fed to the model.

     Parameters:
     - features: The windowed features array, one row per window
     - N: Number of consecutive windows combined into each row of R

     Returns:
     - R: Array with N * num_features + 1 columns. Column 0 is the constant 1
       (intercept); the remaining columns are N consecutive feature rows
       flattened window by window.
     """
     lag = N - 1

     # Prepend a copy of the first N-1 windows so that (given at least N-1
     # windows) R ends up with one row per input window
     padded = np.vstack([features[:lag], features])
     num_rows = padded.shape[0] - lag
     num_features = features.shape[1]

     R = np.zeros((num_rows, N * num_features + 1))
     R[:, 0] = 1  # Intercept term

     # Row i gathers padded windows i .. i+N-1
     window_idx = np.arange(num_rows)[:, None] + np.arange(N)[None, :]
     R[:, 1:] = padded[window_idx].reshape(num_rows, N * num_features)

     return R

In [None]:
# Lagged R matrices (N_win consecutive windows per row) for the training features
R1, R2, R3 = [
    create_R_matrix(feats, N = N_win)
    for feats in (train_feat1, train_feat2, train_feat3)
]

In [None]:
# Windowed ECoG features for each of the three leaderboard (test) recordings
test_feat1, test_feat2, test_feat3 = [
    get_windowed_feats(ecog, fs, window_length, window_overlap)
    for ecog in (test_ecog1, test_ecog2, test_ecog3)
]

In [None]:
# Lagged R matrices (N_win consecutive windows per row) for the test features
test_R1, test_R2, test_R3 = [
    create_R_matrix(feats, N=N_win)
    for feats in (test_feat1, test_feat2, test_feat3)
]

In [None]:
def get_windowed_target(glove_data, window_length, window_overlap, fs=1000):
    """
    Reduce the glove data to one target row per window.

    Parameters:
    - glove_data: The glove data array (rows are samples)
    - window_length: The length of each window in seconds
    - window_overlap: In seconds; used here as the step between consecutive
      window starts
    - fs: The sampling frequency in Hz

    Returns:
    - targets: Array with one row per window, holding the per-column mean of
      the glove data over that window
    """
    win_len = int(fs * (window_length))
    win_step = int(fs * (window_overlap))
    num_windows = NumWins(glove_data,fs,window_length,window_overlap)

    # Window k covers samples [k * win_step, k * win_step + win_len)
    starts = (k * win_step for k in range(num_windows))
    return np.array([np.mean(glove_data[s:s + win_len, :], axis=0) for s in starts])

In [None]:
# Window-averaged glove targets for each of the three training recordings
target_1, target_2, target_3 = [
    get_windowed_target(glove, window_length,window_overlap)
    for glove in (train_dg1, train_dg2, train_dg3)
]

In [None]:
# CatBoost Models Training
# One identically configured multi-output regressor per recording. Column 0 of
# each R matrix (the constant intercept column) is dropped before fitting.
_cat_params = dict(
    iterations = cat_iter,
    depth = cat_dept,
    l2_leaf_reg = cat_leaf,
    learning_rate = cat_learnr,
    loss_function = 'MultiRMSE',
    verbose = False,
    per_float_feature_quantization = '0:border_count=1024',
    task_type = 'GPU',
    boosting_type = 'Plain',
)

cat_model1 = CatBoostRegressor(**_cat_params)
cat_model1.fit(R1[:, 1:], target_1)

cat_model2 = CatBoostRegressor(**_cat_params)
cat_model2.fit(R2[:, 1:], target_2)

cat_model3 = CatBoostRegressor(**_cat_params)
cat_model3.fit(R3[:, 1:], target_3)

<catboost.core.CatBoostRegressor at 0x785850a8b550>

In [None]:
# Save Trained Models
# Each file is opened in a `with` block so the handle is flushed and closed
# as soon as the pickle has been written.
# NOTE(review): the '.pk1' extension looks like a typo for '.pkl'; it is kept
# unchanged so existing file names stay the same - confirm before renaming.
filename1 = 'cat_model1.pk1'
with open(filename1, 'wb') as model_file:
    pickle.dump(cat_model1, model_file)

filename2 = 'cat_model2.pk1'
with open(filename2, 'wb') as model_file:
    pickle.dump(cat_model2, model_file)

filename3 = 'cat_model3.pk1'
with open(filename3, 'wb') as model_file:
    pickle.dump(cat_model3, model_file)

In [None]:
# Predict with each recording's own model, dropping the intercept column of R as in training
predicted_1, predicted_2, predicted_3 = [
    model.predict(R_test[:, 1:])
    for model, R_test in ((cat_model1, test_R1), (cat_model2, test_R2), (cat_model3, test_R3))
]

In [None]:
def interpolate(prediction, original_length, desired_length):
    """
    Resample a prediction to a new number of rows with cubic splines.

    Both the original rows and the output rows are placed on evenly spaced
    time points spanning [0, 1], and each column is interpolated on its own.

    Parameters:
    - prediction: The prediction array (rows are time points)
    - original_length: The number of rows in `prediction`
    - desired_length: The number of rows wanted in the output

    Returns:
    - interpolated_prediction: Array of shape (desired_length, prediction.shape[1])
    """
    src_time = np.linspace(0, 1, original_length)
    dst_time = np.linspace(0, 1, desired_length)

    num_outputs = prediction.shape[1]
    interpolated_prediction = np.zeros((desired_length, num_outputs))
    for col in range(num_outputs):
        # Fit a spline through this column and evaluate it on the new grid
        interpolated_prediction[:, col] = CubicSpline(src_time, prediction[:, col])(dst_time)

    return interpolated_prediction

In [None]:
# Get Lengths of Predictions and Testing Sets
# original_length / desired_length hold recording 1's values and are kept only
# for any later cell that reads them.
original_length = predicted_1.shape[0]
desired_length = test_ecog1.shape[0]

# Interpolate Predictions to Match Desired Length (Testing Length)
# Each recording uses its own prediction length and its own test length, so
# recordings of different durations are resampled correctly.
inter_prediction1 = interpolate(predicted_1, predicted_1.shape[0], test_ecog1.shape[0])
inter_prediction2 = interpolate(predicted_2, predicted_2.shape[0], test_ecog2.shape[0])
inter_prediction3 = interpolate(predicted_3, predicted_3.shape[0], test_ecog3.shape[0])


In [None]:
def moving_average(data, window_size):
    """
    Apply a moving average filter to every column of the data.

    Each column is convolved with a uniform kernel of `window_size` samples
    using np.convolve(..., mode='same'). The signal is implicitly zero-padded,
    so roughly the first and last window_size/2 samples are pulled toward
    zero. If window_size exceeds the number of rows, mode='same' returns
    window_size rows rather than data.shape[0].

    Parameters:
    - data: Input data array, shape (num_samples, num_columns)
    - window_size: Size of the moving average window, in samples

    Returns:
    - smoothed_data: Smoothed data array with one column per input column
    """
    num_columns = data.shape[1]
    if num_columns == 0:
        # Nothing to smooth; keep the (num_samples, 0) shape
        return np.zeros(data.shape)

    kernel = np.ones(window_size)/window_size
    # Smooth every column that is present instead of assuming exactly five
    smoothed = [np.convolve(data[:,col], kernel, mode = 'same') for col in range(num_columns)]

    return np.vstack(smoothed).T

In [None]:
# Moving-average smoothing of each interpolated prediction (window_size samples)
moving_pred1, moving_pred2, moving_pred3 = [
    moving_average(pred, window_size)
    for pred in (inter_prediction1, inter_prediction2, inter_prediction3)
]

In [None]:
# Visualize the 3rd Prediction (column 0) against time in seconds.
# The time axis is built from moving_pred3's own length and the configured fs,
# so it always matches the plotted data.
plt.plot(np.arange(moving_pred3.shape[0]) / fs, moving_pred3[:,0])
plt.xlabel('Time (s)')
plt.ylabel('Smoothed prediction (column 0)')

In [None]:
# Display the three smoothed prediction arrays as the cell output
# (a bare tuple expression; nothing is plotted here)
moving_pred1, moving_pred2, moving_pred3

In [None]:
# Pack the three smoothed predictions into a 3x1 object array and write it
# to predictions.mat under the variable name 'predicted_dg'
predictions = np.zeros((3,1), dtype=object)
for row, pred in enumerate((moving_pred1, moving_pred2, moving_pred3)):
    predictions[row,0] = pred

savemat('predictions.mat', {'predicted_dg':predictions})
