In [2]:
# siamese_lfp_model.py
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
import numpy as np
from sklearn.model_selection import train_test_split
import os
from scipy.io import loadmat
import h5py

In [4]:
# ----------- Config ------------
SEGMENT_LENGTH = 24000  # 1 second @ 24kHz
BATCH_SIZE = 256
EMBEDDING_DIM = 128
LEARNING_RATE = 1e-4
EPOCHS = 200
LOG_DIR = "runs/siamese_lfp3"

In [5]:
# ----------- Data Preparation ------------
def segment_data(data, segment_length):
    num_segments = len(data) // segment_length
    segments = np.array(np.split(data[:num_segments * segment_length], num_segments))
    np.random.shuffle(segments)
    return segments


# def create_pairs(gpi_segments, stn_segments):
#     pairs = []
#     labels = []

#     # GPi-GPi (label 0)
#     for i in range(len(gpi_segments) - 1):
#         pairs.append((gpi_segments[i], gpi_segments[i+1]))
#         labels.append(0)

#     # STN-STN (label 0)
#     for i in range(len(stn_segments) - 1):
#         pairs.append((stn_segments[i], stn_segments[i+1]))
#         labels.append(0)

#     # GPi-STN (label 1)
#     for i in range(min(len(gpi_segments), len(stn_segments))):
#         pairs.append((gpi_segments[i], stn_segments[i]))
#         labels.append(1)

#     ratio_similar = labels.count(0) / len(labels)
#     print(f"Similarity label ratio: {ratio_similar:.2f} similar (label 0), {1 - ratio_similar:.2f} dissimilar (label 1)")

#     return pairs, labels

def create_pairs(gpi_segments, stn_segments):
    # GPi-GPi (label 0)
    gpi_gpi_pairs = [(gpi_segments[i], gpi_segments[i+1]) for i in range(len(gpi_segments) - 1)]
    gpi_gpi_labels = [0] * len(gpi_gpi_pairs)

    # STN-STN (label 0)
    stn_stn_pairs = [(stn_segments[i], stn_segments[i+1]) for i in range(len(stn_segments) - 1)]
    stn_stn_labels = [0] * len(stn_stn_pairs)

    # GPi-STN (label 1)
    gpi_stn_pairs = [(gpi_segments[i], stn_segments[i]) for i in range(min(len(gpi_segments), len(stn_segments)))]
    gpi_stn_labels = [1] * len(gpi_stn_pairs)

    # Combine similar pairs
    similar_pairs = gpi_gpi_pairs + stn_stn_pairs
    similar_labels = gpi_gpi_labels + stn_stn_labels

    # Balance both classes
    min_len = min(len(similar_pairs), len(gpi_stn_pairs))
    balanced_pairs = similar_pairs[:min_len] + gpi_stn_pairs[:min_len]
    balanced_labels = similar_labels[:min_len] + gpi_stn_labels[:min_len]

    ratio_similar = balanced_labels.count(0) / len(balanced_labels)
    print(f"Balanced similarity label ratio: {ratio_similar:.2f} similar (label 0), {1 - ratio_similar:.2f} dissimilar (label 1)")

    return balanced_pairs, balanced_labels


class LFPDataset(Dataset):
    def __init__(self, pairs, labels):
        self.pairs = pairs
        self.labels = labels

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        x1, x2 = self.pairs[idx]
        x1 = torch.tensor(x1, dtype=torch.float32).unsqueeze(0)
        x2 = torch.tensor(x2, dtype=torch.float32).unsqueeze(0)
        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        return x1, x2, label

In [None]:
# ----------- Model Definition ------------
class CNNEncoder(nn.Module):
    def __init__(self, embedding_dim):
        super(CNNEncoder, self).__init__()
        # self.layers = nn.ModuleList([
        #     nn.Dropout(0.3),
        #     nn.Conv1d(1, 4, kernel_size=16, stride=2, padding=8),
        #     nn.BatchNorm1d(4),
        #     nn.ReLU(),
        #     nn.Dropout(0.3),
        #     nn.Conv1d(4, 8, kernel_size=32, stride=2, padding=16),
        #     nn.BatchNorm1d(8),
        #     nn.ReLU(),
        #     nn.Dropout(0.3),
        #     nn.Conv1d(8, 16, kernel_size=64, stride=2, padding=32),
        #     nn.BatchNorm1d(16),
        #     nn.ReLU(),
        #     # nn.Dropout(0.1),
        #     nn.Conv1d(16, 32, kernel_size=128, stride=2, padding=64),
        #     nn.BatchNorm1d(32),
        #     nn.ReLU(),
        #     # nn.Dropout(0.1),
        #     nn.Conv1d(32, 64, kernel_size=256, stride=2, padding=128),
        #     nn.BatchNorm1d(64),
        #     nn.ReLU(),
        #     # nn.Dropout(0.1),
        #     nn.Conv1d(64, 64, kernel_size=512, stride=2, padding=256),
        #     nn.BatchNorm1d(64),
        #     nn.ReLU(),
        #     nn.Dropout(0.3),
        #     nn.AdaptiveAvgPool1d(1)
        # ])

        self.layers = nn.ModuleList([
            nn.Dropout(0.3),
            nn.Conv1d(1, 32, kernel_size=32, stride=8, padding=0),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Conv1d(32, 64, kernel_size=32, stride=8, padding=0),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Conv1d(64, 64, kernel_size=16, stride=4, padding=0),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Conv1d(64, 64, kernel_size=8, stride=2, padding=0),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Conv1d(64, 64, kernel_size=4, stride=2, padding=0),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Conv1d(64, 64, kernel_size=4, stride=2, padding=0),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.AdaptiveAvgPool1d(1)
        ])
        self.fc = nn.Linear(64, embedding_dim)

    def forward(self, x):
        for i, layer in enumerate(self.layers):
            x = layer(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x


def count_parameters(model):
    total = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Total trainable parameters: {total:,}\n")
    print("Trainable parameters by layer:")
    for name, param in model.named_parameters():
        if param.requires_grad:
            print(f"{name:50} {param.numel():,}")


def print_model_summary(model, train_loader, device):
    print("===== Sample Forward Pass Shape Info =====")
    x_sample = next(iter(train_loader))[0].to(device)
    model.encoder(x_sample)
    print("\nModel Summary:\n")
    print(model)
    count_parameters(model)


class SiameseNet(nn.Module):
    def __init__(self, embedding_dim):
        super(SiameseNet, self).__init__()
        self.encoder = CNNEncoder(embedding_dim)

    def forward(self, x1, x2):
        embed1 = self.encoder(x1)
        embed2 = self.encoder(x2)
        distance = F.pairwise_distance(embed1, embed2)
        return distance 


In [7]:

# ----------- Training Loop ------------
def train(model, dataloader, optimizer, criterion, device, writer, epoch):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for i, (x1, x2, label) in enumerate(dataloader):
        x1, x2, label = x1.to(device), x2.to(device), label.to(device)
        optimizer.zero_grad()
        output = model(x1, x2)
        loss = criterion(output, label)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

        preds = (output > 0.5).float()
        correct += (preds == label).sum().item()
        total += label.size(0)
        if i == 0:
            print("Preds:", preds[:10])
            print("Labels:", label[:10])

        writer.add_scalar('Train/Batch_Loss', loss.item(), epoch * len(dataloader) + i)
    avg_loss = running_loss / len(dataloader)
    accuracy = correct / total
    print(f"Epoch {epoch+1} Train Accuracy: {accuracy:.4f}")
    writer.add_scalar('Train/Epoch_Loss', avg_loss, epoch)
    writer.add_scalar('Train/Accuracy', accuracy, epoch)
    return avg_loss


def evaluate(model, dataloader, criterion, device, writer, epoch):
    model.eval()
    total_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for x1, x2, label in dataloader:
            x1, x2, label = x1.to(device), x2.to(device), label.to(device)
            output = model(x1, x2)
            loss = criterion(output, label)
            total_loss += loss.item()
            preds = (output > 0.5).float()  # Same for both train and test
            correct += (preds == label).sum().item()
            total += label.size(0)
    
    avg_loss = total_loss / len(dataloader)
    accuracy = correct / total
    print(f"Epoch {epoch+1} Test Accuracy: {accuracy:.4f}")
    writer.add_scalar('Test/Epoch_Loss', avg_loss, epoch)
    writer.add_scalar('Test/Accuracy', accuracy, epoch)
    return avg_loss

In [None]:
# ----------- Run Pipeline ------------
def main():
    # Print dataset shape and sizes
    print("===== Dataset Information =====")
    # TensorBoard setup
    os.makedirs(LOG_DIR, exist_ok=True)
    writer = SummaryWriter(LOG_DIR)
    # # Simulate random LFP-like signals
    # GPiData = np.random.randn(24000 * 60 * 2)  # 2 minutes of data
    # STNData = np.random.randn(24000 * 60 * 2)
    # Simulate random LFP-like signals
    # Generate synthetic LFP-like signals with sinusoids + noise

    # fs = 24000
    # duration_sec = 60 * 2
    # t = np.linspace(0, duration_sec, fs * duration_sec, endpoint=False)

    # gpi_signal = np.sin(2 * np.pi * 10 * t)  # 10 Hz sinusoid
    # stn_signal = np.sin(2 * np.pi * 10 * t)  # 30 Hz sinusoid
#ashaks
    # GPiData = gpi_signal + 0.5 * np.random.randn(len(t))
    # STNData = stn_signal + 0.5 * np.random.randn(len(t))

    with h5py.File("F:\Python Projects\data\period9\microGPi1_L_1_CommonFiltered.mat", "r") as f:
        GPiData = np.array(f["data"]).squeeze()
        fs = int(np.array(f["fs"]).squeeze())
        # rtesting

    # for Sina's PC: F:\Python Projects\data\period9
    # for shared PC: D:\Sina\Data\period9\microSTN_L_3_CommonFiltered.mat
    with h5py.File("F:\Python Projects\data\period9\microSTN_L_3_CommonFiltered.mat", "r") as f:
        STNData = np.array(f["data"]).squeeze()  # replace with your actual filename

    # Segment
    gpi_segments = segment_data(GPiData, SEGMENT_LENGTH)
    stn_segments = segment_data(STNData, SEGMENT_LENGTH)
    pairs, labels = create_pairs(gpi_segments, stn_segments)

    # Split data
    # train_pairs, test_pairs, train_labels, test_labels = train_test_split(pairs, labels, test_size=0.2, stratify=labels)
    train_pairs, test_pairs, train_labels, test_labels = train_test_split( pairs, labels, test_size=0.2, stratify=labels, shuffle=True, random_state=42)

    # Dataloaders
    train_ds = LFPDataset(train_pairs, train_labels)
    test_ds = LFPDataset(test_pairs, test_labels)
    print(f"Train set: {len(train_ds)} pairs")
    print(f"Test set: {len(test_ds)} pairs")
    print(f"Batch size: {BATCH_SIZE}")

    train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
    test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE)

    # Model setup
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(device)
    model = SiameseNet(embedding_dim=EMBEDDING_DIM).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
    criterion = nn.BCEWithLogitsLoss()

    print_model_summary(model, train_loader, device)
    print("===== Training Input Shape Info =====")
    for x1, x2, _ in train_loader:
        print(f"Train input shapes: x1: {x1.shape}, x2: {x2.shape}")
        break
    print("===== Testing Input Shape Info =====")

    def contrastive_loss(distances, labels, margin=1.0):
        labels = labels.float()
        loss_similar = (1 - labels) * distances.pow(2)
        loss_dissimilar = labels * F.relu(margin - distances).pow(2)
        return torch.mean(loss_similar + loss_dissimilar)

    criterion = contrastive_loss
    for x1, x2, _ in test_loader:
        print(f"Test input shapes: x1: {x1.shape}, x2: {x2.shape}")
        break

    for epoch in range(EPOCHS):

    
        train_loss = train(model, train_loader, optimizer, criterion, device, writer, epoch)
        test_loss = evaluate(model, test_loader, criterion, device, writer, epoch)
        print(f"Epoch {epoch+1}/{EPOCHS}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}")

    writer.close()
    return model



In [None]:
# Run
if __name__ == "__main__":
    model = main()

===== Dataset Information =====
Balanced similarity label ratio: 0.50 similar (label 0), 0.50 dissimilar (label 1)
Train set: 6177 pairs
Test set: 1545 pairs
Batch size: 256



A module that was compiled using NumPy 1.x cannot be run in
NumPy 2.2.4 as it may crash. To support both 1.x and 2.x
versions of NumPy, modules must be compiled with NumPy 2.0.
Some module may need to rebuild instead e.g. with 'pybind11>=2.12'.

If you are a user of the module, the easiest solution will be to
downgrade to 'numpy<2' or try to upgrade the affected module.
We expect that some modules will need time to support NumPy 2.

Traceback (most recent call last):  File "C:\Program Files\Python310\lib\runpy.py", line 196, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "C:\Program Files\Python310\lib\runpy.py", line 86, in _run_code
    exec(code, run_globals)
  File "d:\Sina\NIPS2025\NIPS2025\venv\lib\site-packages\ipykernel_launcher.py", line 18, in <module>
    app.launch_new_instance()
  File "d:\Sina\NIPS2025\NIPS2025\venv\lib\site-packages\traitlets\config\application.py", line 1075, in launch_instance
    app.start()
  File "d:\Sina\NIPS2025\NIPS

===== Sample Forward Pass Shape Info =====

Model Summary:

SiameseNet(
  (encoder): CNNEncoder(
    (layers): ModuleList(
      (0): Dropout(p=0.3, inplace=False)
      (1): Conv1d(1, 4, kernel_size=(16,), stride=(2,), padding=(8,))
      (2): BatchNorm1d(4, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (3): ReLU()
      (4): Dropout(p=0.3, inplace=False)
      (5): Conv1d(4, 8, kernel_size=(32,), stride=(2,), padding=(16,))
      (6): BatchNorm1d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (7): ReLU()
      (8): Dropout(p=0.3, inplace=False)
      (9): Conv1d(8, 16, kernel_size=(64,), stride=(2,), padding=(32,))
      (10): BatchNorm1d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (11): ReLU()
      (12): Conv1d(16, 32, kernel_size=(128,), stride=(2,), padding=(64,))
      (13): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (14): ReLU()
      (15): Conv1d(32, 64, k

24414