# FingerFlex Model with Original Preprocessing Pipeline

In [48]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import scipy.io as sio
from scipy import signal as sig
from scipy.stats import pearsonr

In [2]:
def filter_data(raw_eeg, fs=1000):
  def notch_filter(data, freq, fs, Q=30):
      """Apply a notch filter at a specific frequency."""
      b, a = sig.iirnotch(w0=freq/(fs/2), Q=Q)
      return sig.filtfilt(b, a, data, axis=0)

  def bandpass_filter(data, lowcut, highcut, fs, order=4):
      """Apply a Butterworth bandpass filter."""
      nyq = 0.5 * fs
      low = lowcut / nyq
      high = highcut / nyq
      b, a = sig.butter(order, [low, high], btype='band')
      return sig.filtfilt(b, a, data, axis=0)

    # Apply notch filters at 60 Hz harmonics (up to 300 Hz)
  filtered = raw_eeg.copy()
  for freq in [60, 120, 180, 240, 300]:
      filtered = notch_filter(filtered, freq, fs)

    # Apply bandpass filter (default 1–200 Hz)
  clean_data = bandpass_filter(filtered, lowcut=1, highcut=200, fs=fs)

  return clean_data

In [3]:
def create_windows(ecog_data, glove_data, win_len=256, step=128, delay=2):
    X, Y = [], []
    for i in range(0, len(ecog_data) - win_len - delay, step):
        window = ecog_data[i:i+win_len]
        label = glove_data[i+delay:i+delay+win_len]
        X.append(window)
        Y.append(label)
    return np.stack(X), np.stack(Y)

In [4]:
class FingerFlexDataset(Dataset):
    def __init__(self, ecog, glove):
        self.X = torch.tensor(ecog, dtype=torch.float32).permute(0, 2, 1)
        self.Y = torch.tensor(glove, dtype=torch.float32)
    def __len__(self):
        return len(self.X)
    def __getitem__(self, idx):
        return self.X[idx], self.Y[idx]

In [5]:
class FingerFlexModel(nn.Module):
    def __init__(self, input_channels, output_channels=5):
        super(FingerFlexModel, self).__init__()
        
        self.enc1 = self.block(input_channels, 32)
        print(f"enc1 initialized: input={input_channels}, output=32")
        
        self.enc2 = self.block(32, 32)
        print(f"enc2 initialized: input=32, output=32")
        
        self.enc3 = self.block(32, 64)
        print(f"enc3 initialized: input=32, output=64")
        
        self.enc4 = self.block(64, 64)
        print(f"enc4 initialized: input=64, output=64")
        
        self.enc5 = self.block(64, 128)
        print(f"enc5 initialized: input=64, output=128")
        
        self.enc6 = self.block(128, 128)
        print(f"enc6 initialized: input=128, output=128")
        
        self.dec1 = self.up(128, 128)
        print(f"dec1 initialized: input=128, output=128")
        
        self.dec2 = self.up(256, 64)  # 128 + 128 = 256 input
        print(f"dec2 initialized: input=256, output=64")
        
        self.dec3 = self.up(128, 64)  # 64 + 64 = 128 input
        print(f"dec3 initialized: input=128, output=64")
        
        self.dec4 = self.up(128, 32)  # 64 + 64 = 128 i

    def block(self, in_c, out_c):
        return nn.Sequential(
            nn.Conv1d(in_c, out_c, kernel_size=3, padding=1),
            nn.LayerNorm(out_c),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.MaxPool1d(kernel_size=2, stride=2)
        )
    
    def forward(self, x):
        print(f"\nForward pass:")
        print(f"Input shape: {x.shape}")  # [1, 62, 240000]
        s = []
        
        # First encoder block
        print("\nEncoder Block 1:")
        x1 = self.enc1(x)
        print(f"After Conv1d: {x1.shape}")  # [1, 32, 240000]
        print(f"After LayerNorm: {x1.shape}")  # [1, 32, 240000]
        print(f"After GELU: {x1.shape}")  # [1, 32, 240000]
        print(f"After Dropout: {x1.shape}")  # [1, 32, 240000]
        print(f"After MaxPool: {x1.shape}")  # [1, 32, 120000]
        s.append(x1)
        
        # Second encoder block
        print("\nEncoder Block 2:")
        x2 = self.enc2(x1)
        print(f"After Conv1d: {x2.shape}")  # [1, 32, 120000]
        print(f"After LayerNorm: {x2.shape}")  # [1, 32, 120000]
        print(f"After GELU: {x2.shape}")  # [1, 32, 120000]
        print(f"After Dropout: {x2.shape}")  # [1, 32, 120000]
        print(f"After MaxPool: {x2.shape}")  # [1, 32, 60000]
        s.append(x2)
        
        # Continue with other blocks...
        x3 = self.enc3(x2); s.append(x3)
        x4 = self.enc4(x3); s.append(x4)
        x5 = self.enc5(x4); s.append(x5)
        x6 = self.enc6(x5)
        
        # Decoder blocks
        d1 = self.dec1(x6)
        d2 = self.dec2(torch.cat([d1, s[4]], dim=1))
        d3 = self.dec3(torch.cat([d2, s[3]], dim=1))
        d4 = self.dec4(torch.cat([d3, s[2]], dim=1))
        d5 = self.dec5(torch.cat([d4, s[1]], dim=1))
        
        out = self.final(torch.cat([d5, s[0]], dim=1))
        return out.permute(0, 2, 1)
    
    # def block(self, in_c, out_c):
    #     return nn.Sequential(
    #         nn.Conv1d(in_c, out_c, kernel_size=3, padding=1),
    #         nn.LayerNorm(out_c),
    #         nn.GELU(),
    #         nn.Dropout(0.1),
    #         nn.MaxPool1d(kernel_size=2, stride=2)
    #     )
    
    def up(self, in_c, out_c):
        return nn.Sequential(
            nn.Conv1d(in_c, out_c, kernel_size=3, padding=1),
            nn.GELU(),
            nn.Dropout(0.1),
            nn.Upsample(scale_factor=2, mode='nearest')
        )
    # def forward(self, x):
    #     print(f"\nInput shape: {x.shape}")
    #     s = []
        
    #     x1 = self.enc1(x); s.append(x1)
    #     print(f"After enc1: {x1.shape}")
        
    #     x2 = self.enc2(x1); s.append(x2)
    #     print(f"After enc2: {x2.shape}")
        
    #     x3 = self.enc3(x2); s.append(x3)
    #     print(f"After enc3: {x3.shape}")
        
    #     x4 = self.enc4(x3); s.append(x4)
    #     print(f"After enc4: {x4.shape}")
        
    #     x5 = self.enc5(x4); s.append(x5)
    #     print(f"After enc5: {x5.shape}")
        
    #     x6 = self.enc6(x5)
    #     print(f"After enc6: {x6.shape}")
        
    #     d1 = self.dec1(x6)
    #     print(f"After dec1: {d1.shape}")
        
    #     d2 = self.dec2(torch.cat([d1, s[4]], dim=1))
    #     print(f"After dec2: {d2.shape}")
        
    #     d3 = self.dec3(torch.cat([d2, s[3]], dim=1))
    #     print(f"After dec3: {d3.shape}")
        
    #     d4 = self.dec4(torch.cat([d3, s[2]], dim=1))
    #     print(f"After dec4: {d4.shape}")
        
    #     d5 = self.dec5(torch.cat([d4, s[1]], dim=1))
    #     print(f"After dec5: {d5.shape}")
        
    #     out = self.final(torch.cat([d5, s[0]], dim=1))
    #     print(f"After final: {out.shape}")
        
    #     return out.permute(0, 2, 1)


In [6]:
class ConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, 
                 stride=1, dilation=1, p_conv_drop=0.1):
        super(ConvBlock, self).__init__()
        
        self.conv1d = nn.Conv1d(in_channels, out_channels, 
                               kernel_size=kernel_size, 
                               bias=False, 
                               padding='same')
        
        self.norm = nn.LayerNorm(out_channels)
        self.activation = nn.GELU()
        self.drop = nn.Dropout(p=p_conv_drop)
        self.downsample = nn.MaxPool1d(kernel_size=stride, stride=stride)

        self.stride = stride
        self.in_channels = in_channels
        self.out_channels = out_channels
        
    def forward(self, x):
        x = self.conv1d(x)
        
        # norm by last axis
        x = torch.transpose(x, -2, -1) 
        x = self.norm(x)
        x = torch.transpose(x, -2, -1)
        
        x = self.activation(x)
        x = self.drop(x)
        x = self.downsample(x)
        return x

class UpConvBlock(nn.Module):
    def __init__(self, scale, **args):
        super(UpConvBlock, self).__init__()
        self.conv_block = ConvBlock(**args)
        self.upsample = nn.Upsample(scale_factor=scale, mode='linear', align_corners=False)
            
    def forward(self, x):
        x = self.conv_block(x)
        x = self.upsample(x)
        return x    

class AutoEncoder1D(nn.Module):
    def __init__(self,
                 n_electrodes=62,   # Number of channels (ECoG electrodes)
                 n_freqs=1,         # Number of frequency bands (1 for raw data)
                 n_channels_out=5,  # Number of fingers to predict
                 channels=[32, 64, 128, 128],  # Number of features on each encoder layer
                 kernel_sizes=[3, 3, 3],
                 strides=[2, 2, 2],  # Reduced stride to handle long sequences
                 dilation=[1, 1, 1]
                 ):
        
        super(AutoEncoder1D, self).__init__()
        
        self.n_electrodes = n_electrodes
        self.n_freqs = n_freqs
        self.n_inp_features = n_freqs * n_electrodes
        self.n_channels_out = n_channels_out
        
        self.model_depth = len(channels)-1
        
        # Initial dimensionality reduction
        self.spatial_reduce = ConvBlock(self.n_inp_features, channels[0], kernel_size=3)
        
        # Encoder part
        self.downsample_blocks = nn.ModuleList([
            ConvBlock(channels[i], channels[i+1], 
                     kernel_sizes[i],
                     stride=strides[i], 
                     dilation=dilation[i]) 
            for i in range(self.model_depth)
        ])

        # Prepare channels for decoder
        channels = [ch for ch in channels[:-1]] + channels[-1:]

        # Decoder part with skip connections
        self.upsample_blocks = nn.ModuleList([
            UpConvBlock(scale=strides[i],
                       in_channels=channels[i+1] if i == self.model_depth-1 else channels[i+1]*2,
                       out_channels=channels[i],
                       kernel_size=kernel_sizes[i]) 
            for i in range(self.model_depth-1, -1, -1)
        ])
        
        # Final 1x1 convolution
        self.conv1x1_one = nn.Conv1d(channels[0]*2, self.n_channels_out, kernel_size=1, padding='same')
      
    def forward(self, x):
        # Input shape: [batch, electrodes, time]
        batch, elec, time = x.shape
        
        # Add frequency dimension if needed
        if len(x.shape) == 3:
            x = x.unsqueeze(2)  # [batch, electrodes, 1, time]
            
        # Reshape and process
        x = x.reshape(batch, -1, time)  # flatten the input
        x = self.spatial_reduce(x)
        
        # Encoder path with skip connections
        skip_connections = []
        for i in range(self.model_depth):
            skip_connections.append(x)
            x = self.downsample_blocks[i](x)

        # Decoder path with skip connections
        for i in range(self.model_depth):
            x = self.upsample_blocks[i](x)
            x = torch.cat((x, skip_connections[-1 - i]), dim=1)
        
        # Final prediction
        x = self.conv1x1_one(x)
        return x.permute(0, 2, 1)  # [batch, time, fingers]

In [29]:
class CombinedLoss(nn.Module):
    def __init__(self):
        super().__init__()
        self.mse = nn.MSELoss()
    def forward(self, pred, target):
        mse = self.mse(pred, target)
        cos = F.cosine_similarity(pred, target, dim=-1).mean()
        return 0.5 * (mse + (1 - cos))

def train_model(model, train_loader, epochs=10, lr=8.4e-5):
    model.train()
    loss_fn = CombinedLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    for epoch in range(epochs):
        total_loss = 0
        for x, y in train_loader:
            optimizer.zero_grad()
            pred = model(x)
            loss = loss_fn(pred, y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}: Loss = {total_loss / len(train_loader):.4f}")
    return model

In [30]:
train_file = 'raw_training_data.mat' # file path
test_file = 'leaderboard_data.mat'

# load training data
train_data = sio.loadmat(train_file) # returns dict with keys and values as numpy arrays
ecog = train_data['train_ecog']
data_glove = train_data['train_dg']

# leaderboard testing data
test_data = sio.loadmat(test_file)
leaderboard_ecog = test_data['leaderboard_ecog']

In [31]:
train_ecog = []
test_ecog = []
train_glove = []
test_glove = []

train_len = int(0.8*len(ecog[0][0]))

for subject_idx in range(3):
    ecog_data = ecog[subject_idx]
    glove_data = data_glove[subject_idx]
    train_ecog.append(ecog_data[0][:train_len])
    test_ecog.append(ecog_data[0][train_len:])
    train_glove.append(glove_data[0][:train_len])
    test_glove.append(glove_data[0][train_len:])

In [32]:
fs = 1000
xLen = len(train_ecog[0])
winLen= 0.1
winDisp = 0.05

In [33]:
def NumWins(xLen, fs, winLen, winDisp):
  winLen = winLen * fs
  winDisp = winDisp * fs
  return int((xLen - winLen) // winDisp + 1)

In [34]:
NumWins(xLen, fs, winLen, winDisp)

4799

In [35]:
filtered_ecogs = [filter_data(ecog) for ecog in train_ecog]

In [36]:
# Normalize glove
normalized_gloves = [(train_glove_i - train_glove_i.min()) / (train_glove_i.max() - train_glove_i.min()) for train_glove_i in train_glove]

In [37]:
X, Y = create_windows(filtered_ecogs[0], normalized_gloves[0])

In [38]:
# Train
dataset = FingerFlexDataset(X, Y)


In [39]:
loader = DataLoader(dataset, batch_size=32, shuffle=True)


In [40]:
model = AutoEncoder1D()

In [41]:
# Make a copy of the array to ensure positive strides
filtered_ecog_tensor = torch.from_numpy(filtered_ecogs[0].copy()).float()

# Now you can safely transpose and add batch dimension
if len(filtered_ecog_tensor.shape) == 2:  # If it's (time_steps, channels)
    filtered_ecog_tensor = filtered_ecog_tensor.T  # Transpose to (channels, time_steps)
    filtered_ecog_tensor = filtered_ecog_tensor.unsqueeze(0)  # Add batch dimension

# Verify the shape
print("Filtered ECoG tensor shape:", filtered_ecog_tensor.shape)

Filtered ECoG tensor shape: torch.Size([1, 62, 240000])


In [42]:
# Current shape: [1, 240000, 62]
# We want: [1, 62, 240000]

# First, remove the batch dimension
filtered_ecog_tensor = filtered_ecog_tensor.squeeze(0)  # Now [240000, 62]

# Then transpose to get channels first
filtered_ecog_tensor = filtered_ecog_tensor.T  # Now [62, 240000]

# Finally, add batch dimension back
filtered_ecog_tensor = filtered_ecog_tensor.unsqueeze(0)  # Now [1, 62, 240000]

# Verify the shape
print("Corrected Filtered ECoG tensor shape:", filtered_ecog_tensor.shape)

Corrected Filtered ECoG tensor shape: torch.Size([1, 240000, 62])


In [43]:
model

AutoEncoder1D(
  (spatial_reduce): ConvBlock(
    (conv1d): Conv1d(62, 32, kernel_size=(3,), stride=(1,), padding=same, bias=False)
    (norm): LayerNorm((32,), eps=1e-05, elementwise_affine=True)
    (activation): GELU(approximate='none')
    (drop): Dropout(p=0.1, inplace=False)
    (downsample): MaxPool1d(kernel_size=1, stride=1, padding=0, dilation=1, ceil_mode=False)
  )
  (downsample_blocks): ModuleList(
    (0): ConvBlock(
      (conv1d): Conv1d(32, 64, kernel_size=(3,), stride=(1,), padding=same, bias=False)
      (norm): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
      (activation): GELU(approximate='none')
      (drop): Dropout(p=0.1, inplace=False)
      (downsample): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    )
    (1): ConvBlock(
      (conv1d): Conv1d(64, 128, kernel_size=(3,), stride=(1,), padding=same, bias=False)
      (norm): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
      (activation): GELU(approximate='none')

In [44]:
trained_model = train_model(model, loader, epochs=20)
torch.save(trained_model.state_dict(), 'trained_model.pth')

Epoch 1: Loss = 0.1749
Epoch 2: Loss = 0.1184
Epoch 3: Loss = 0.1041
Epoch 4: Loss = 0.0960
Epoch 5: Loss = 0.0907
Epoch 6: Loss = 0.0866
Epoch 7: Loss = 0.0833
Epoch 8: Loss = 0.0802
Epoch 9: Loss = 0.0776
Epoch 10: Loss = 0.0753
Epoch 11: Loss = 0.0737
Epoch 12: Loss = 0.0718
Epoch 13: Loss = 0.0706
Epoch 14: Loss = 0.0689
Epoch 15: Loss = 0.0676
Epoch 16: Loss = 0.0662
Epoch 17: Loss = 0.0654
Epoch 18: Loss = 0.0641
Epoch 19: Loss = 0.0635
Epoch 20: Loss = 0.0623


In [46]:
# 1. First, load your trained model
model = AutoEncoder1D()
model.load_state_dict(torch.load('trained_model.pth'))
model.eval()  # Set to evaluation mode
# 3. Prepare the input data
# Convert to tensor and add batch dimension
test_ecog_tensor = torch.FloatTensor(test_ecog[0].T)  # [time, channels]
test_ecog_tensor = test_ecog_tensor.unsqueeze(0)  # [1, time, channels]

# 4. Make predictions
with torch.no_grad():  # Disable gradient computation for inference
    predictions = model(test_ecog_tensor)  # [1, time, 5]

# 5. Convert predictions to numpy and remove batch dimension
predictions = predictions.squeeze(0).numpy()  # [time, 5]

In [47]:
predictions

array([[0.32443577, 0.31685024, 0.18314555, 0.13791126, 0.27647138],
       [0.20387895, 0.18787694, 0.22293738, 0.24766283, 0.17975752],
       [0.21603225, 0.22571966, 0.23749822, 0.21542756, 0.17827828],
       ...,
       [0.21963586, 0.20924117, 0.21377352, 0.10472213, 0.16552237],
       [0.20235266, 0.19962719, 0.22381625, 0.0989549 , 0.17417732],
       [0.25801334, 0.18685997, 0.2622264 , 0.22069572, 0.14892665]],
      shape=(60000, 5), dtype=float32)

In [49]:
r, _ = pearsonr(predictions, test_glove[0])

In [50]:
r

array([-0.03528733,  0.03272834, -0.0168465 ,  0.01353211, -0.00617093])

In [24]:
test_ecog[0].shape

(60000, 62)

In [None]:
filtered_ecogs[1]

In [51]:
dummy_labels = np.zeros((test_ecog[0].shape[0], 5))
X_test, Y_test = create_windows(filtered_ecogs[0], dummy_labels)
test_dataset = FingerFlexDataset(test_ecog[0], dummy_labels)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

ValueError: all input arrays must have the same shape

In [None]:
all_preds = []
with torch.no_grad():
    for x, _ in test_loader:
        preds = model(x)
        all_preds.append(preds.numpy())

# Stitch results back together
predicted_fingers = np.concatenate(all_preds, axis=0)  # Shape: [windows, time, 5]
predicted_fingers = predicted_fingers.reshape(-1, 5)[:test_ecog.shape[0]]  # Truncate