# Imports + device setup

In [3]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from torch.utils.data import Dataset, DataLoader, Subset
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix, classification_report

import matplotlib.pyplot as plt
import seaborn as sns

import os

In [5]:
if torch.backends.mps.is_available():
    device = torch.device("mps")        # Apple Silicon GPU
    print("Using MPS (Apple GPU)")
else:
    device = torch.device("cpu")
    print("Using CPU")

Using MPS (Apple GPU)


# Load preprocessed data

In [6]:
data_dir = "../preprocessed_data"

X = np.load(os.path.join(data_dir, "X.npy"), mmap_mode='r')
y = np.load(os.path.join(data_dir, "y.npy"))

print("X shape:", X.shape)  # (N, 2, 3000)
print("y shape:", y.shape)

X shape: (414961, 2, 3000)
y shape: (414961,)


# Dataset class

In [None]:
class SleepDataset(Dataset):
    def __init__(self, X, y, indices):
        self.X = X
        self.y = y
        self.indices = indices

    def __len__(self):
        return len(self.indices)

    def __getitem__(self, idx):
        real_idx = self.indices[idx]
        x = torch.from_numpy(self.X[real_idx]).float()
        label = torch.tensor(self.y[real_idx]).long()
        return x, label

# CNN-LSTM architecture

In [None]:
class CNNLSTM(nn.Module):
    def __init__(self, n_channels=2, n_classes=5):
        super().__init__()

        self.cnn = nn.Sequential(
            nn.Conv1d(n_channels, 32, kernel_size=7, stride=2, padding=3),
            nn.ReLU(),
            nn.MaxPool1d(2),

            nn.Conv1d(32, 64, kernel_size=7, stride=2, padding=3),
            nn.ReLU(),
            nn.MaxPool1d(2),

            nn.Conv1d(64, 128, kernel_size=5, padding=2),
            nn.ReLU()
        )

        self.lstm_hidden = 128
        self.lstm = nn.LSTM(
            input_size=128,
            hidden_size=self.lstm_hidden,
            batch_first=True,
            bidirectional=True
        )

        self.fc = nn.Sequential(
            nn.Linear(self.lstm_hidden * 2, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, n_classes)
        )

    def forward(self, x):
        x = self.cnn(x)            # (batch, 128, T')
        x = x.permute(0, 2, 1)     # (batch, T', 128)
        _, (h_n, _) = self.lstm(x)
        h = torch.cat((h_n[-2], h_n[-1]), dim=1)
        return self.fc(h)

# Training + eval functions

In [None]:
def train_one_epoch(model, loader, criterion, optimizer):
    model.train()
    losses = []
    all_preds, all_targets = [], []

    for X_batch, y_batch in loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()

        losses.append(loss.item())
        preds = outputs.argmax(1)
        all_preds.extend(preds.cpu().numpy())
        all_targets.extend(y_batch.cpu().numpy())

    acc = accuracy_score(all_targets, all_preds)
    f1 = f1_score(all_targets, all_preds, average='macro')

    return np.mean(losses), acc, f1


def evaluate(model, loader, criterion):
    model.eval()
    losses = []
    all_preds, all_targets = [], []

    with torch.no_grad():
        for X_batch, y_batch in loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)

            losses.append(loss.item())
            preds = outputs.argmax(1)
            all_preds.extend(preds.cpu().numpy())
            all_targets.extend(y_batch.cpu().numpy())

    acc = accuracy_score(all_targets, all_preds)
    f1 = f1_score(all_targets, all_preds, average='macro')

    return np.mean(losses), acc, f1, all_preds, all_targets

# Perform k-fold cross validation

In [None]:
kf = KFold(n_splits=5, shuffle=True, random_state=42)

results = []

for fold, (train_idx, val_idx) in enumerate(kf.split(X)):
    print(f"\n===== Fold {fold+1} =====")

    train_ds = SleepDataset(X, y, train_idx)
    val_ds   = SleepDataset(X, y, val_idx)

    train_loader = DataLoader(train_ds, batch_size=128, shuffle=True)
    val_loader   = DataLoader(val_ds, batch_size=128, shuffle=False)

    model = CNNLSTM(n_channels=X.shape[1], n_classes=5).to(device)
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(1, 6):
        tr_loss, tr_acc, tr_f1 = train_one_epoch(model, train_loader, criterion, optimizer)
        val_loss, val_acc, val_f1, _, _ = evaluate(model, val_loader, criterion)

        print(f"Epoch {epoch}: "
              f"Train Acc={tr_acc:.3f}, Val Acc={val_acc:.3f}, "
              f"Train F1={tr_f1:.3f}, Val F1={val_f1:.3f}")

    results.append((val_acc, val_f1))

# Model performance report

In [None]:
val_loss, val_acc, val_f1, preds, targets = evaluate(model, val_loader, criterion)

print("\nClassification Report:")
print(classification_report(targets, preds))

cm = confusion_matrix(targets, preds)

plt.figure(figsize=(8,6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Confusion Matrix")
plt.show()