In [1]:
# !conda install scipy -y

In [2]:
import numpy as np
import os
from utilities import *
import h5py
from scipy.stats import zscore

# Use os.path.join() to create the correct file path
filepath = get_filepath()
print(f"Base filepath: {filepath}")

Base filepath: /Users/jesseh/Library/Mobile Documents/com~apple~CloudDocs/AA Master AI/Deep Learning


In [10]:
def load_and_normalize_files(directory_path, max_files=None, downsample_factor=None):
    """
    Load and normalize all h5 files in the specified directory
    
    Parameters:
    directory_path (str): Path to the directory containing h5 files
    max_files (int, optional): Maximum number of files to load
    downsample_factor (int, optional): Factor by which to downsample the data
    
    Returns:
    tuple: (data, labels) where data is a list of normalized matrices and labels are the corresponding task types
    """
    data_list = []
    labels = []
    
    # Get all h5 files in the directory
    h5_files = [f for f in os.listdir(directory_path) if f.endswith('.h5')]
    
    # Limit the number of files if specified
    if max_files is not None:
        h5_files = h5_files[:max_files]
    
    for file in h5_files:
       # Extract the task type from the filename
        if file.startswith("task_"):
            parts = file.split('_')
            task_type = '_'.join(parts[:2])  # e.g., task_motor or task_working
        else:
            task_type = file.split('_')[0]  # e.g., rest

        # Load the data
        file_path = os.path.join(directory_path, file)
        matrix = read_h5py_file(file_path)
        
        # Downsample if specified
        if downsample_factor is not None:
            matrix = matrix[:, ::downsample_factor]
        
        # Normalize the data using scipy's zscore
        normalized_matrix = zscore(matrix, axis=1, nan_policy='propagate')
        normalized_matrix = np.nan_to_num(normalized_matrix, nan=0.0)
        
        # Add to lists
        data_list.append(normalized_matrix)
        labels.append(task_type)
    
    return data_list, labels

# Example usage for Intra-subject classification
intra_train_path = os.path.join(filepath, "Intra", "train")
intra_test_path = os.path.join(filepath, "Intra", "test")

# Load a small subset of files to test the function
# Downsample factor is set to 16 to speed up the process, CHANGE LATER!
train_data, train_labels = load_and_normalize_files(intra_train_path, downsample_factor=16)
test_data, test_labels = load_and_normalize_files(intra_test_path, downsample_factor=16)
unique_labels = set(train_labels + test_labels)
print(f"Unique labels: {unique_labels}")

# Print summary
print(f"\nLoaded {len(train_data)} training files and {len(test_data)} test files")
print(f"Training labels: {train_labels}")
print(f"Test labels: {test_labels}")
print(f"Shape of first training sample after downsampling: {train_data[0].shape}")



Unique labels: {'task_motor', 'task_story', 'task_working', 'rest'}

Loaded 32 training files and 8 test files
Training labels: ['rest', 'task_story', 'task_motor', 'task_story', 'task_story', 'task_working', 'task_motor', 'rest', 'rest', 'task_working', 'task_motor', 'rest', 'task_motor', 'task_working', 'task_motor', 'rest', 'rest', 'task_motor', 'task_working', 'task_story', 'task_motor', 'task_working', 'task_motor', 'task_working', 'rest', 'rest', 'task_working', 'task_story', 'task_story', 'task_story', 'task_working', 'task_story']
Test labels: ['task_motor', 'rest', 'task_story', 'task_working', 'task_motor', 'task_story', 'task_working', 'rest']
Shape of first training sample after downsampling: (248, 2227)


In [11]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

# Standard class mapping for MEG tasks
LABEL_MAP = {
    'rest': 0,
    'task_motor': 1, 
    'task_story': 2,
    'task_working': 3
}

class MEGDataset(Dataset):
    """Custom Dataset for MEG data"""
    def __init__(self, data_list, labels):
        self.data_list = data_list
        self.labels = labels
        
    def __len__(self):
        return len(self.data_list)
    
    def __getitem__(self, idx):
        # Get the data matrix (248, time_steps)
        data = self.data_list[idx]
        
        # Add channel dimension: (248, time_steps) -> (1, 248, time_steps)
        data = torch.FloatTensor(data).unsqueeze(0)
        
        # Convert string label to integer
        label = LABEL_MAP[self.labels[idx]]
        label = torch.LongTensor([label]).squeeze()
        
        return data, label

class MEGNet(nn.Module):
    """Smaller 2D CNN for MEG data classification"""
    def __init__(self, num_classes=4, input_channels=248, input_time_steps=2227):
        super(MEGNet, self).__init__()
        
        # Smaller conv blocks
        self.conv1 = nn.Conv2d(1, 16, kernel_size=(8, 16), padding=(4, 8))
        self.bn1 = nn.BatchNorm2d(16)
        self.pool1 = nn.MaxPool2d(kernel_size=(4, 8))  # More aggressive pooling
        
        self.conv2 = nn.Conv2d(16, 32, kernel_size=(4, 8), padding=(2, 4))
        self.bn2 = nn.BatchNorm2d(32)
        self.pool2 = nn.MaxPool2d(kernel_size=(4, 8))  # More aggressive pooling
        
        # Calculate the size after convolutions
        self.flatten_size = self._calculate_flatten_size(input_channels, input_time_steps)
        
        # Smaller fully connected layers
        self.dropout = nn.Dropout(0.5)
        self.fc1 = nn.Linear(self.flatten_size, 64)
        self.fc2 = nn.Linear(64, num_classes)
        
    def _calculate_flatten_size(self, channels, time_steps):
        """Calculate the size after all conv+pool operations"""
        # Simulate forward pass to get dimensions
        x = torch.zeros(1, 1, channels, time_steps)
        x = self.pool1(F.relu(self.bn1(self.conv1(x))))
        x = self.pool2(F.relu(self.bn2(self.conv2(x))))
        return x.numel()
    
    def forward(self, x):
        # Conv blocks with ReLU and pooling
        x = self.pool1(F.relu(self.bn1(self.conv1(x))))
        x = self.pool2(F.relu(self.bn2(self.conv2(x))))
        
        # Flatten for fully connected layers
        x = torch.flatten(x, 1)
        
        # Fully connected layers with dropout
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        
        return x

In [12]:
net = MEGNet(num_classes=len(LABEL_MAP), 
             input_channels=248, 
             input_time_steps=train_data[0].shape[1])  # Use actual time steps after downsampling

print(f"Model created. Total parameters: {sum(p.numel() for p in net.parameters()):,}")
print(f"Model input shape expected: (batch, 1, 248, {train_data[0].shape[1]})")

Model created. Total parameters: 1,063,380
Model input shape expected: (batch, 1, 248, 2227)


In [13]:
def create_dataloaders(train_data, train_labels, test_data, test_labels, batch_size=4):
    """Create PyTorch DataLoaders from your loaded data"""
    
    # Create datasets
    train_dataset = MEGDataset(train_data, train_labels)
    test_dataset = MEGDataset(test_data, test_labels)
    
    # Create dataloaders with num_workers=0 to avoid multiprocessing issues
    trainloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
    testloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=0)
    
    return trainloader, testloader

In [14]:
# Create dataloaders
print(f"Creating DataLoaders...")
trainloader, testloader = create_dataloaders(train_data, train_labels, test_data, test_labels, batch_size=4)

Creating DataLoaders...


In [15]:
# Defining a loss function and optimizer
import torch.optim as optim

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

# Alternative: You might also consider Adam optimizer (optim.Adam()) which adapts learning rates 
# automatically and often works well for neural networks without needing to tune momentum manually.

In [16]:
# Training the network
for epoch in range(2):  # loop over the dataset multiple times

    running_loss = 0.0
    for i, data in enumerate(trainloader, 0):
        # get the inputs; data is a list of [inputs, labels]
        inputs, labels = data

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # print statistics
        running_loss += loss.item()
        if i % 2000 == 1999:    # print every 2000 mini-batches
            print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
            running_loss = 0.0

print('Finished Training')

Finished Training


In [17]:
PATH = './cifar_net.pth'
torch.save(net.state_dict(), PATH)

