# Important notice: any use of generative AI for completing the assignment is strictly prohibited.

### Note: if working in Colab, don't forget to select runtime type in Colab: GPU

In [None]:
# Run this cell only when working in Colab.
# It lets Colab access your Google Drive; select your Princeton account when prompted.

from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Folder with the DiabDeep .npy files, addressed through the Drive mount point /content/drive.
diab_deep_path = "/content/drive/Shared drives/ECE477 datasets/Assignment9/diabdeep/"

In [None]:
# only for local work on Della when you have unarchived data

# diab_deep_path = "diabdeep/"

## Warning: to ensure the reproducibility of your results and to achieve the full grade, do not change or remove RANDOM_STATE variables and setting random seed statements. If you remove or change them, you may not get the full grade.

In [None]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence

import random

# Set a random seed for reproducibility.
# The same seed is applied to PyTorch, NumPy and Python's `random` module.
RANDOM_STATE = 0
torch.manual_seed(RANDOM_STATE)
np.random.seed(RANDOM_STATE)
random.seed(RANDOM_STATE)

# Ask cuDNN for deterministic kernels and turn off its benchmark-based algorithm selection.
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False


# Make PyTorch raise an error on operations that have no deterministic implementation.
torch.use_deterministic_algorithms(True)
# cuBLAS workspace setting that goes together with deterministic algorithms on CUDA.
# NOTE(review): it is set here before any CUDA work is issued in this cell — keep it that way.
os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':4096:8'


# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Using device:", device)

### DiabDeep data format:

Processed data with a 15-second window for data instances, and 45-second shift between the data instances (used in the DiabDeep paper).

Each file has shape (T, F), where T is the number of time steps for that patient (each row is one 15s window), F - feature dimension (2355).

Label 1: healthy

Label 2: unhealthy (has diabetes)

We’ll parse each filename to read the patient data and label, load it into memory, and produce (timeseries, label) pairs.

Note: we will remap labels to traditional 0/1 by subtracting 1.

In [None]:
class DiabDeepDataset(Dataset):
    """Window-level dataset over a folder of ``.npy`` files.

    Every ``.npy`` file in ``folder_path`` is expected to hold a 2-D array of
    shape (T, F); each of its T rows becomes one dataset item. Items are
    min-max scaled per feature and returned as ``(window, label)`` where
    ``window`` is a float32 tensor of shape (1, F) and ``label`` is a long
    scalar tensor parsed from the filename (file label minus one).

    Args:
        folder_path: Directory containing the ``.npy`` files.
        subset_sensors: Optional column index (list/array/slice) selecting a
            subset of the F features; ``None`` keeps all features.
        min_val, max_val: Optional per-feature scaling statistics. When either
            is ``None`` both are computed from all files in ``folder_path``.
    """

    def __init__(self, folder_path, subset_sensors=None, min_val=None, max_val=None):
        self.folder_path = folder_path
        # Load all filenames ending with .npy from the folder.
        self.filenames = sorted([f for f in os.listdir(folder_path) if f.endswith(".npy")])
        self.subset_sensors = subset_sensors

        # Build an index mapping: for each file and each row in the file,
        # we record (filename, window_index).
        # Only the row count is needed here, so the file is memory-mapped
        # instead of being read into memory in full.
        self.index_mapping = []
        for f in self.filenames:
            T = self._open_mmap(f).shape[0]
            self.index_mapping.extend((f, t) for t in range(T))

        # Compute min and max if not provided.
        if min_val is None or max_val is None:
            self.min_val, self.max_val = self.compute_min_max()
        else:
            self.min_val, self.max_val = min_val, max_val

    def _open_mmap(self, filename):
        """Open ``filename`` (relative to the dataset folder) as a read-only memory map."""
        return np.load(os.path.join(self.folder_path, filename), mmap_mode="r")

    @staticmethod
    def _label_from_filename(filename):
        """Parse the class label from a filename and remap it to start at 0.

        Expected format: diab_w15_s45_label_Z_patient_XX_YYY.npy, where Z is
        the fifth '_'-separated field.
        """
        return int(filename.split("_")[4]) - 1

    def compute_min_max(self):
        """Return per-feature ``(min, max)`` over all rows of all files.

        The statistics are accumulated file by file, so only one file is held
        in memory at a time. Features whose min equals their max get
        ``max = min + 1`` so that later scaling never divides by zero.

        Raises:
            ValueError: If the folder yields no rows at all.
        """
        running_min = None
        running_max = None
        for f in self.filenames:
            data = np.load(os.path.join(self.folder_path, f))  # shape (T, F)
            if self.subset_sensors is not None:
                data = data[:, self.subset_sensors]
            if data.shape[0] == 0:
                # A file without rows contributes nothing to the statistics.
                continue
            file_min = data.min(axis=0)
            file_max = data.max(axis=0)
            if running_min is None:
                running_min, running_max = file_min, file_max
            else:
                running_min = np.minimum(running_min, file_min)
                running_max = np.maximum(running_max, file_max)

        if running_min is None:
            raise ValueError(
                f"Cannot compute min/max: no data rows found in {self.folder_path!r}"
            )

        constant = running_max == running_min
        running_max[constant] = running_min[constant] + 1
        return running_min, running_max

    def __getitem__(self, idx):
        """Return ``(window_tensor, label_tensor)`` for the item at ``idx``."""
        filename, window_idx = self.index_mapping[idx]
        # Memory-map the file and copy out just the requested row; reading the
        # whole (T, F) array for a single window is needlessly slow.
        data = self._open_mmap(filename)
        window = np.array(data[window_idx, :])  # shape (F,)
        if self.subset_sensors is not None:
            window = window[self.subset_sensors]
        # Apply min-max scaling: scale each feature to [0, 1]
        window = (window - self.min_val) / (self.max_val - self.min_val + 1e-8)
        # Convert to tensor and add a time-step dimension: (1, F)
        window_tensor = torch.tensor(window, dtype=torch.float32).unsqueeze(0)

        label_tensor = torch.tensor(self._label_from_filename(filename), dtype=torch.long)

        return window_tensor, label_tensor

    def __len__(self):
        """Return the total number of windows across all files."""
        return len(self.index_mapping)


def collate_fn_windows(batch):
    """Collate ``(window_tensor, label_tensor)`` pairs into batch tensors.

    Returns a tuple ``(windows, labels, lengths)``: the windows joined along
    the first dimension with a new axis inserted at position 1, the labels
    stacked into one tensor, and a long tensor of ones with one entry per
    label (each item is a single time step).
    """
    window_seq, label_seq = zip(*batch)

    # Items are (1, F) each: joining gives (batch, F), the new axis restores
    # the time-step dimension -> (batch, 1, F).
    joined = torch.cat(window_seq, dim=0)
    windows = joined[:, None]

    labels = torch.stack(label_seq)

    # Every sample is exactly one time step long.
    lengths = torch.ones(labels.shape[0], dtype=torch.long)
    return windows, labels, lengths


# H-LSTM: LSTM with additional hidden layers added to its control gates

### Task 1: Implement H-LSTM cell as described in the paper (10 points)
In H_LSTMCell class define a hidden layer and gates (take a look at figure 3 in the paper and the equations).

Fill in 3 gaps.

In [None]:
class H_LSTMCell(nn.Module):
    """
    A custom LSTM cell with a hidden layer added to each gate.
    This follows the idea of a "hidden-layer LSTM" (H-LSTM) as in the paper.

    Args:
        input_size: size of the input vector x at one time step.
        hidden_size: size of the hidden state h and the cell state c.
        gate_hidden_size: size of the hidden layer placed in front of the gates.

    NOTE: this class is an assignment scaffold. The gaps marked
    "your code is here" are intentionally left empty and must be filled in
    by the student; the class does not run until they are.
    """
    def __init__(self, input_size, hidden_size, gate_hidden_size=64):
        super(H_LSTMCell, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.gate_hidden_size = gate_hidden_size
        self.relu = nn.ReLU()

        # Hidden layer for gate computations: processes concatenated [x, h_prev]
        # x: (batch, input_size), h_prev: (batch, hidden_size)
        # your code is here
        self.fc_hidden = ...  # placeholder (Ellipsis) until the student defines the layer

        # For each gate, we apply a linear layer from the hidden layer output to the gate's output.
        # Implement self.fc_f, self.fc_i, self.fc_o, self.fc_g: forget, input, output gates and candidate cell state
        # according to the DiabDeep paper
        # your code is here

    def forward(self, x, h_prev, c_prev):
        """Run one time step and return the new ``(h, c)`` pair."""
        # x: (batch, input_size)
        # h_prev, c_prev: (batch, hidden_size)
        combined = torch.cat((x, h_prev), dim=1)  # (batch, input_size + hidden_size)
        # Implement according to the DiabDeep paper (sigma refers to the sigmoid activation function):
        # hidden layer for gate computations (outputs (batch, gate_hidden_size));
        # forget, input, output and candidate cell gates calculations
        # your code is here

        # `o` and `c` are not defined above; the code filled into the gap must produce them.
        h = o * torch.tanh(c)
        return h, c

class H_LSTMLayer(nn.Module):
    """
    Single H-LSTM layer: unrolls one H_LSTMCell over every time step of a sequence.
    """
    def __init__(self, input_size, hidden_size, gate_hidden_size=64):
        super().__init__()
        self.cell = H_LSTMCell(input_size, hidden_size, gate_hidden_size)

    def forward(self, x):
        """Process ``x`` of shape (batch, seq_len, input_size).

        Returns ``(outputs, (h, c))`` where ``outputs`` collects the hidden
        state of every step as (batch, seq_len, hidden_size) and ``(h, c)``
        are the states after the last step.
        """
        n_samples, _n_steps, _ = x.size()

        # Both recurrent states start at zero on the same device as the input.
        state_shape = (n_samples, self.cell.hidden_size)
        hidden = torch.zeros(state_shape, device=x.device)
        cell_state = torch.zeros(state_shape, device=x.device)

        per_step_hidden = []
        # Walk the time axis one (batch, input_size) slice at a time.
        for step_input in x.unbind(dim=1):
            hidden, cell_state = self.cell(step_input, hidden, cell_state)
            per_step_hidden.append(hidden)

        # Stack along a new time axis -> (batch, seq_len, hidden_size)
        stacked = torch.stack(per_step_hidden, dim=1)
        return stacked, (hidden, cell_state)

## Classifier Model Using H-LSTM

### Task 2: Implement the forward pass in H_LSTMClassifier (5 points)

1. Define dropout and classification layers
2. Add concatenation, calculate dropout and output logits in the forward pass.

(total 3 gaps)

In [None]:
class H_LSTMClassifier(nn.Module):
    # NOTE: assignment scaffold. The gaps marked "your code is here" are
    # intentionally left empty for the student; the class does not run until
    # they are filled in.
    def __init__(self, input_size, hidden_size, num_classes, gate_hidden_size=64, dropout=0.2):
        """
        input_size: dimension of input features
        hidden_size: hidden state dimension of the H-LSTM (set to 96 as in the paper)
        num_classes: number of classes (e.g. two for diab-healthy vs. has diabetes)
        gate_hidden_size: hidden layer dimension within each gate
        dropout: dropout setting for the layer defined in the gap below
        """
        super(H_LSTMClassifier, self).__init__()
        # define H_LSTMLayer layer
        self.h_lstm = ...  # placeholder (Ellipsis) until the student defines the layer

        # define dropout and classification layers
        # your code is here


    def forward(self, x, lengths):
        """
        x: padded tensor of shape (batch, max_seq_len, input_size)
        lengths: tensor of original sequence lengths (batch,)

        Returns `logits`, which the code filled into the gap below must define.
        """
        outputs, (h, c) = self.h_lstm(x)  # outputs: (batch, seq_len, hidden_size)
        # For each sample in the batch, get the output at the last valid time step.
        batch_size = x.size(0)
        last_outputs = []
        for i in range(batch_size):
            # lengths[i]-1 is the index of the last real (non-padded) step of sample i;
            # unsqueeze(0) gives each entry shape (1, hidden_size).
            last_outputs.append(outputs[i, lengths[i]-1, :].unsqueeze(0))
        # concat all last_outputs tensors to get tensor of (batch, hidden_size)
        # apply dropout before classification, then calculate the prediction logits
        # your code is here

        return logits

#  Training and Evaluation Functions

### Task 3: complete training and evaluation loops (5 points)

Fill in 3 gaps

In [None]:
def train_model(model, train_loader, test_loader, optimizer, scheduler, criterion, epochs=30):
    """Train `model` for `epochs` epochs, evaluating on `test_loader` after each one.

    Each batch from `train_loader` is a `(data, labels, lengths)` triple that is
    moved to the module-level `device` before the forward pass. After every
    epoch the mean per-batch loss and the test accuracy are printed, and the
    scheduler is stepped with the test accuracy.

    NOTE: assignment scaffold — the running-loss accumulation is an
    intentionally empty gap, so `running_loss` stays 0.0 until it is filled in.
    """
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for data, labels, lengths in train_loader:
            data = data.to(device)
            labels = labels.to(device)
            lengths = lengths.to(device)

            optimizer.zero_grad()
            outputs = model(data, lengths)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # calc running loss
            # your code is here

        # Average over the number of batches, not the number of samples.
        epoch_loss = running_loss / len(train_loader)
        test_acc = evaluate_model(model, test_loader)
        # NOTE(review): the scheduler is driven by accuracy on the test loader, so
        # the learning-rate schedule depends on the data used for the final score.
        scheduler.step(test_acc)
        print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}, Test Acc: {test_acc:.2f}%")

def evaluate_model(model, loader):
    """Return the accuracy of `model` on `loader` as a percentage (0-100).

    The model is switched to eval mode and gradients are disabled for the
    whole pass.

    NOTE: assignment scaffold — the prediction step and the correct-count
    update are intentionally empty gaps. `outputs` must be defined by the first
    gap, and `correct` stays 0 until the second gap is filled in.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data, labels, lengths in loader:
            # predict the output on the data
            # your code is here

            # Index of the largest output along dim 1 is taken as the predicted class.
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            # calculate the number of correct predictions 
            # your code is here

    return 100.0 * correct / total

# Load the dataset

In [None]:
# Reset the generator for reproducible shuffling.
# `g` is used only by train_loader below.
g = torch.Generator()
g.manual_seed(RANDOM_STATE)

dataset_folder = diab_deep_path
# No min_val/max_val are passed, so the dataset computes its scaling statistics
# from every file in the folder.
# NOTE(review): that happens before the train/test split, so the test windows
# contribute to the normalization statistics.
dataset = DiabDeepDataset(folder_path=dataset_folder)
# 80/20 split over individual windows (dataset items), not over files.
# NOTE(review): windows from the same file can therefore land in both subsets.
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
# NOTE(review): no generator is passed here, so the split draws from the global
# torch RNG; re-running this cell without re-seeding can give a different split.
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# pass generator=g to train_loader for reproducibility
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, collate_fn=collate_fn_windows, generator=g)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, collate_fn=collate_fn_windows)

# Putting It All Together

### Warning: training takes a while. For testing purposes, run for only 1-5 epochs, or you may soon run out of Colab credits. Run all epochs only for a few final training runs.

In [None]:
# Re-seed right before building the model so weight initialization is reproducible.
torch.manual_seed(RANDOM_STATE)
torch.cuda.manual_seed_all(RANDOM_STATE)

model = H_LSTMClassifier(input_size=2355, hidden_size=96, num_classes=2, gate_hidden_size=64, dropout=0.2).to(device)

optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
# mode='max' because the scheduler is stepped with an accuracy value.
# `verbose` is a deprecated keyword and recent PyTorch releases reject it,
# so fall back to the same scheduler without it when it is not accepted.
plateau_kwargs = dict(mode='max', factor=0.1, patience=5)
try:
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, verbose=True, **plateau_kwargs)
except TypeError:
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, **plateau_kwargs)
criterion = nn.CrossEntropyLoss()

# Train the model.
# to get the best performance, the network should be trained for 100 epochs with ReduceLROnPlateau(patience=30) scheduler
# but to save Colab gpu we train only for 25 epochs
train_model(model, train_loader, test_loader, optimizer, scheduler, criterion, epochs=25)

# Evaluate on the test set.
test_acc = evaluate_model(model, test_loader)
print(f"Test Accuracy: {test_acc:.2f}%")