In [1]:
import torch
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
import torch.nn as nn
from tqdm import tqdm
from sklearn.cluster import KMeans
from torch.utils.data import DataLoader, TensorDataset
from scipy.stats import pearsonr
import torch.nn.functional as F
import matplotlib.pyplot as plt
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np


In [2]:
# Load the three pre-split datasets in train / val / test order.
# NOTE(review): torch.load relies on pickle-based deserialization, so these
# files should only ever come from a trusted source.
train_data, val_data, test_data = (
    torch.load(path) for path in ('train.pt', 'val.pt', 'test.pt')
)

In [3]:
def prepare_dataset(data):
    """Stack per-sample (EEG, stimulus) pairs into two batched float tensors.

    Args:
        data: Iterable yielding ``(eeg, stim)`` tensor pairs. The shape comments
            below assume each ``eeg`` is (320, 64) and each ``stim`` is (320,);
            nothing in this function enforces that — TODO confirm against the
            saved files.

    Returns:
        ``(eeg_tensor, stim_tensor)``: every ``eeg`` / ``stim`` cast with
        ``.float()`` and stacked along a new leading dimension.
    """
    # Consume ``data`` exactly once so that one-shot iterables work as well.
    converted = [(eeg.float(), stim.float()) for eeg, stim in data]
    eeg_tensor = torch.stack([eeg for eeg, _ in converted])      # (N, 320, 64)
    stim_tensor = torch.stack([stim for _, stim in converted])   # (N, 320)
    return eeg_tensor, stim_tensor

In [4]:
# Convert each split into an (inputs, targets) pair of stacked tensors,
# processed in train / val / test order.
(X_train, y_train), (X_val, y_val), (X_test, y_test) = (
    prepare_dataset(split) for split in (train_data, val_data, test_data)
)

In [5]:
class PearsonCorrelationLoss(nn.Module):
    """Loss equal to the negated mean Pearson correlation taken along dim 1."""

    def __init__(self):
        super().__init__()

    def forward(self, pred, target):
        """Return ``-mean(r)`` where ``r`` is the per-row Pearson correlation.

        Args:
            pred: Predicted values; the correlation is computed over dim 1.
            target: Reference values with a shape compatible with ``pred``.

        Returns:
            Scalar tensor: the negative of the correlation averaged over rows,
            so a perfect positive correlation gives a loss close to -1.
        """
        # Remove the per-row mean from both signals.
        pred_centered = pred - pred.mean(dim=1, keepdim=True)
        target_centered = target - target.mean(dim=1, keepdim=True)

        covariance = torch.sum(pred_centered * target_centered, dim=1)
        pred_energy = torch.sum(pred_centered ** 2, dim=1)
        target_energy = torch.sum(target_centered ** 2, dim=1)
        # The small constant inside the square root keeps the denominator
        # non-zero when a row is constant.
        scale = torch.sqrt(pred_energy * target_energy + 1e-8)

        per_row_corr = covariance / scale
        return -per_row_corr.mean()

In [6]:
# --- Cosine similarity metric ---
def cosine_similarity(pred, target):
    """Mean cosine similarity between matching rows of two tensors.

    Both inputs are L2-normalised along dim 1 with ``F.normalize``; the
    row-wise dot products of the normalised tensors are then averaged.

    NOTE(review): this definition rebinds the name ``cosine_similarity`` that
    the import cell also pulls in from ``sklearn.metrics.pairwise``. Once this
    cell has run, the name refers to this tensor-based function.

    Args:
        pred: Tensor of predictions; similarity is computed along dim 1.
        target: Tensor of references with a shape compatible with ``pred``.

    Returns:
        Scalar tensor holding the mean row-wise cosine similarity.
    """
    unit_pred = F.normalize(pred, dim=1)
    unit_target = F.normalize(target, dim=1)
    row_similarity = torch.sum(unit_pred * unit_target, dim=1)
    return row_similarity.mean()

In [7]:
class CNNLSTMWithSkip(nn.Module):
    """Two Conv1d stages, two bidirectional LSTMs and a per-time-step linear head.

    NOTE(review): despite the class name, ``forward`` applies no skip/residual
    connection — the layers are chained strictly sequentially.

    Args:
        in_channels: Number of input feature channels (last dim of the input).
        hidden_dim: Number of output channels of the first convolution stage.
        lstm_hidden_dim: Hidden size per direction of both LSTM layers.
    """

    def __init__(self, in_channels=64, hidden_dim=128, lstm_hidden_dim=64):
        super().__init__()

        # Width of the feature sequence handed from the CNN stack to the LSTMs.
        conv_out_channels = 64

        # First CNN layer: in_channels -> hidden_dim. kernel_size=5 with
        # padding=2 keeps the time length unchanged.
        self.cnn1 = nn.Sequential(
            nn.Conv1d(in_channels, hidden_dim, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.BatchNorm1d(hidden_dim)
        )

        # Second CNN layer: hidden_dim -> conv_out_channels, same time length.
        self.cnn2 = nn.Sequential(
            nn.Conv1d(hidden_dim, conv_out_channels, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.BatchNorm1d(conv_out_channels)
        )

        # First LSTM layer (bidirectional, so it emits 2 * lstm_hidden_dim features).
        self.lstm1 = nn.LSTM(
            input_size=conv_out_channels,
            hidden_size=lstm_hidden_dim,
            batch_first=True,
            bidirectional=True,
        )

        # Second LSTM layer consumes the concatenated forward/backward features.
        self.lstm2 = nn.LSTM(
            input_size=2 * lstm_hidden_dim,
            hidden_size=lstm_hidden_dim,
            batch_first=True,
            bidirectional=True,
        )

        # Output regression layer: one scalar per time step.
        self.regressor = nn.Linear(2 * lstm_hidden_dim, 1)

    def forward(self, x):
        """Map a ``(B, T, in_channels)`` input to a ``(B, T)`` output.

        Args:
            x: Input tensor laid out as (batch, time, channels).

        Returns:
            Tensor of shape (batch, time) with one regressed value per step.
        """
        # Conv1d expects (B, C, T), so swap the time and channel axes.
        x_cnn = x.permute(0, 2, 1)    # (B, in_channels, T)
        out1 = self.cnn1(x_cnn)       # (B, hidden_dim, T)
        out2 = self.cnn2(out1)        # (B, 64, T)
        out2 = out2.permute(0, 2, 1)  # (B, T, 64) for the batch_first LSTMs

        # LSTM layers
        out3, _ = self.lstm1(out2)    # (B, T, 2*H)
        out4, _ = self.lstm2(out3)    # (B, T, 2*H)

        out = self.regressor(out4)    # (B, T, 1)
        out = out.squeeze(-1)         # (B, T)

        return out

In [8]:
# Select the GPU when one is available and fall back to the CPU otherwise,
# instead of hard-coding 'cuda' (which raises on CPU-only machines). This is
# the same device-selection expression the evaluation cell uses.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = CNNLSTMWithSkip().to(device)
loss_fn = PearsonCorrelationLoss()  # negated mean Pearson correlation
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

In [9]:
batch_size = 32

# Only the training loader shuffles; validation and test keep dataset order.
train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=batch_size, shuffle=True)
val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=batch_size, shuffle=False)
test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size, shuffle=False)

Epoch 1/10:   0%|                                                                              | 0/2856 [00:00<?, ?it/s]


RuntimeError: mat1 and mat2 shapes cannot be multiplied (10240x128 and 192x1)

In [None]:
# Switch to inference behaviour before predicting.
model.eval()

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Take the first batch of the test loader and place both tensors on the device.
x_test_sample, y_test_sample = (
    batch.to(device) for batch in next(iter(test_loader))
)

# Forward pass without tracking gradients.
with torch.no_grad():
    y_pred_sample = model(x_test_sample)

# First example of the batch, as NumPy arrays on the host.
y_pred_np = y_pred_sample[0].cpu().numpy()
y_true_np = y_test_sample[0].cpu().numpy()

# Overlay prediction and ground truth on a single axis.
fig, ax = plt.subplots(figsize=(12, 5))
ax.plot(y_true_np, label="Original ENV", linewidth=2)
ax.plot(y_pred_np, label="Predicted ENV", linestyle="--")
ax.set_title("Predicted vs Original ENV for a Sample Test Point")
ax.set_xlabel("Time Step")
ax.set_ylabel("Amplitude")
ax.legend()
ax.grid(True)
plt.show()

In [None]:
# Row-vector views of the two signals, shape (1, T).
y_true_vec = y_true_np.reshape(1, -1)
y_pred_vec = y_pred_np.reshape(1, -1)

# Pearson Correlation
pearson_corr, _ = pearsonr(y_true_np, y_pred_np)

# Cosine Similarity
# By this point `cosine_similarity` is the notebook's own tensor-based helper
# (its definition shadows the sklearn import of the same name). That helper
# takes torch tensors and returns a 0-dim tensor, so convert the NumPy arrays
# and read the scalar with .item() rather than indexing [0][0].
cos_sim = cosine_similarity(
    torch.from_numpy(y_true_vec), torch.from_numpy(y_pred_vec)
).item()

# Print the results
print(f"Pearson Correlation Coefficient: {pearson_corr:.4f}")
print(f"Cosine Similarity: {cos_sim:.4f}")