In [None]:
import mne
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import os

# Define paths to cleaned EEG data
data_dir = "path_to_your_cleaned_data"
eeg_files = {
    "pizza": os.path.join(data_dir, "pizza_clean.fif"),
    "apple": os.path.join(data_dir, "apple_clean.fif"),
    "hammer": os.path.join(data_dir, "hammer_clean.fif")
}

# Model parameters
n_channels = 32
sampling_rate = 500  # Hz
input_length = 1000  # Number of time points per segment
batch_size = 32
epochs = 30
learning_rate = 0.001

# Load EEG Data
def load_eeg_data(file_path, label):
    raw = mne.io.read_raw_fif(file_path, preload=True)
    data = raw.get_data()  # Shape: (n_channels, n_samples)
    
    # Convert into segments of fixed length
    segment_length = input_length  # 1000 time points (~2 sec at 500 Hz)
    num_segments = data.shape[1] // segment_length
    segments = np.array_split(data[:, :num_segments * segment_length], num_segments, axis=1)
    
    # Convert to PyTorch tensor
    X = np.stack(segments)  # Shape: (num_segments, n_channels, segment_length)
    y = np.full((len(X),), label)  # Assign label to all segments
    return X, y

# Load data from all classes
X_data, y_data = [], []
label_map = {"pizza": 0, "apple": 1, "hammer": 2}

for label, file_path in eeg_files.items():
    print(f"Loading {label} data...")
    X, y = load_eeg_data(file_path, label_map[label])
    X_data.append(X)
    y_data.append(y)

# Convert to numpy arrays
X_data = np.vstack(X_data)  # (total_samples, n_channels, input_length)
y_data = np.hstack(y_data)  # (total_samples,)

# Convert to PyTorch tensors
X_data = torch.tensor(X_data, dtype=torch.float32)
y_data = torch.tensor(y_data, dtype=torch.long)

# Define EEG Dataset
class EEGDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# Create train/test split (80% train, 20% test)
split_idx = int(0.8 * len(X_data))
train_X, test_X = X_data[:split_idx], X_data[split_idx:]
train_y, test_y = y_data[:split_idx], y_data[split_idx:]

train_dataset = EEGDataset(train_X, train_y)
test_dataset = EEGDataset(test_X, test_y)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Define CNN Model for EEG Classification
class EEG_CNN(nn.Module):
    def __init__(self, n_channels=32, input_length=1000, num_classes=3):
        super(EEG_CNN, self).__init__()

        self.conv1 = nn.Conv1d(n_channels, 64, kernel_size=5, stride=1, padding=2)
        self.bn1 = nn.BatchNorm1d(64)
        self.conv2 = nn.Conv1d(64, 128, kernel_size=5, stride=1, padding=2)
        self.bn2 = nn.BatchNorm1d(128)
        self.conv3 = nn.Conv1d(128, 256, kernel_size=5, stride=1, padding=2)
        self.bn3 = nn.BatchNorm1d(256)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)

        self.fc1 = nn.Linear(256 * (input_length // 8), 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        x = self.pool(torch.relu(self.bn1(self.conv1(x))))
        x = self.pool(torch.relu(self.bn2(self.conv2(x))))
        x = self.pool(torch.relu(self.bn3(self.conv3(x))))
        x = x.view(x.shape[0], -1)  # Flatten for fully connected layers
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Initialize Model, Loss, and Optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = EEG_CNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training Loop
def train_model():
    model.train()
    for epoch in range(epochs):
        running_loss = 0.0
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        print(f"Epoch {epoch + 1}/{epochs}, Loss: {running_loss / len(train_loader):.4f}")

# Train Model
print("Training EEG Classification Model...")
train_model()

# Evaluate Model
def evaluate_model():
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            outputs = model(X_batch)
            _, predicted = torch.max(outputs, 1)

            total += y_batch.size(0)
            correct += (predicted == y_batch).sum().item()

    accuracy = 100 * correct / total
    print(f"Test Accuracy: {accuracy:.2f}%")

# Run Evaluation
evaluate_model()
