## General notes on model implementation:

Data Preprocessing:

Divide the data into:

- Temporal data: Measurements from two sensors recorded at each timestamp
- Static data: Patient-specific information such as age, height, and gender
- Target: The class label to predict for each patient (classification)
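
As a minimal sketch of this split, the snippet below divides one patient record into the three parts. The record layout and field names (`sensor_readings`, `age`, `height`, `gender`, `label`) are assumptions made for illustration, not the real dataset schema.

```python
import torch

# A hypothetical patient record; the field names are illustrative assumptions
record = {
    "sensor_readings": [[0.1, 0.4], [0.2, 0.5], [0.3, 0.6]],  # 3 timestamps x 2 sensors
    "age": 54.0,
    "height": 1.72,
    "gender": 1.0,
    "label": 1,
}

# Temporal data: shape (timestamps, sensors)
temporal = torch.tensor(record["sensor_readings"])
# Static data: shape (static features,)
static = torch.tensor([record["age"], record["height"], record["gender"]])
# Target: scalar integer class index, as expected by nn.CrossEntropyLoss
target = torch.tensor(record["label"])

print(temporal.shape, static.shape, target)
```

Keeping the three parts as separate tensors lets the temporal part go through a recurrent layer while the static part bypasses it.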


Model Architecture:

- Recurrent Layer: An LSTM for the time-dependent sensor data
- Feed-forward Layer: Linear layers for the static features and the final output
- Combination Layer: Combines the output of the LSTM and the static features into a unified representation
- Output Layer: Generates the final class prediction for each patient from the combined representation
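
The layer list above can be traced shape by shape. This is a rough sketch with small, made-up sizes (`batch`, `steps`, `hidden`, and so on are assumptions chosen for readability), intended only to show how the tensors flow between the four layers.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

# Hypothetical sizes, chosen only for illustration
batch, steps, temporal_features, static_features, hidden, classes = 4, 10, 6, 3, 16, 2

lstm = nn.LSTM(temporal_features, hidden, batch_first=True)  # Recurrent layer
static_fc = nn.Linear(static_features, hidden)               # Feed-forward layer for static features
combine = nn.Linear(hidden * 2, hidden)                      # Combination layer
output_layer = nn.Linear(hidden, classes)                    # Output layer

temporal = torch.randn(batch, steps, temporal_features)
static = torch.randn(batch, static_features)

lstm_out, _ = lstm(temporal)        # (batch, steps, hidden)
last_step = lstm_out[:, -1, :]      # (batch, hidden): last time step only
static_out = static_fc(static)      # (batch, hidden)
combined = torch.relu(combine(torch.cat((last_step, static_out), dim=1)))  # (batch, hidden)
logits = output_layer(combined)     # (batch, classes): one row of logits per patient

print(lstm_out.shape, combined.shape, logits.shape)
```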





## Classification problem using random data with our data shape

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np

# Set random seed for reproducibility
torch.manual_seed(0)

# Parameters for synthetic data
num_patients = 1000             # Number of samples (patients)
sequence_length = 10            # Length of time series per patient
num_sensors = 2                 # Number of sensors (columns for timestamp measurements)
num_measurements_per_sensor = 37 # Measurements per sensor per timestamp
num_static_features = 8         # Static features (e.g., age, height, gender)
num_classes = 2                 # Number of classes for classification

# New temporal input size
input_size = num_sensors * num_measurements_per_sensor  # 2 * 37 = 74

# Hyperparameters
hidden_size = 64
num_layers = 1
num_epochs = 10
batch_size = 32
learning_rate = 0.001

# Generate random synthetic data
temporal_data = torch.randn(num_patients, sequence_length, input_size)   # Shape: (num_patients, sequence_length, input_size)
static_data = torch.randn(num_patients, num_static_features)             # Shape: (num_patients, num_static_features)
targets = torch.randint(0, num_classes, (num_patients,))                 # Shape: (num_patients,) with random class labels

# Create a DataLoader for batching
dataset = TensorDataset(temporal_data, static_data, targets)
train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Define the Deep State Space Model for Classification
class DeepStateSpaceModel(nn.Module):
    def __init__(self, input_size, hidden_size, static_input_size, num_classes, num_layers=1):
        super(DeepStateSpaceModel, self).__init__()

        # LSTM for temporal data
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)

        # Linear layer for static data
        self.static_fc = nn.Linear(static_input_size, hidden_size)

        # Combining LSTM output and static features
        self.fc_combined = nn.Linear(hidden_size * 2, hidden_size)

        # Output layer (for classification, matches num_classes)
        self.output_layer = nn.Linear(hidden_size, num_classes)

    def forward(self, temporal_data, static_data):
        # Pass temporal data through LSTM
        lstm_out, _ = self.lstm(temporal_data)

        # Take only the last hidden state (many-to-one structure)
        lstm_out = lstm_out[:, -1, :]

        # Pass static data through linear layer
        static_out = self.static_fc(static_data)

        # Concatenate LSTM output with static output
        combined = torch.cat((lstm_out, static_out), dim=1)

        # Pass through a fully connected layer to combine features
        combined_out = torch.relu(self.fc_combined(combined))

        # Final output layer with no activation (use CrossEntropyLoss which applies softmax)
        output = self.output_layer(combined_out)

        return output

# Initialize model, loss function, and optimizer
model = DeepStateSpaceModel(input_size=input_size, hidden_size=hidden_size,
                            static_input_size=num_static_features, num_classes=num_classes, num_layers=num_layers)

criterion = nn.CrossEntropyLoss()  # For classification
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(num_epochs):
    for batch in train_loader:
        temporal_batch, static_batch, target_batch = batch

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        output = model(temporal_batch, static_batch)
        loss = criterion(output, target_batch)  # CrossEntropyLoss expects raw logits

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

# Testing on a random sample
with torch.no_grad():
    sample_temporal = torch.randn(1, sequence_length, input_size)
    sample_static = torch.randn(1, num_static_features)
    prediction = model(sample_temporal, sample_static)

    # Convert logits to probabilities
    probabilities = torch.softmax(prediction, dim=1)
    predicted_class = torch.argmax(probabilities, dim=1)

    print("Sample prediction probabilities:", probabilities)
    print("Predicted class:", predicted_class.item())


Epoch [1/10], Loss: 0.6942
Epoch [2/10], Loss: 0.6717
Epoch [3/10], Loss: 0.6647
Epoch [4/10], Loss: 0.6608
Epoch [5/10], Loss: 0.4475
Epoch [6/10], Loss: 0.2163
Epoch [7/10], Loss: 0.0688
Epoch [8/10], Loss: 0.0069
Epoch [9/10], Loss: 0.0053
Epoch [10/10], Loss: 0.0030
Sample prediction probabilities: tensor([[0.9896, 0.0104]])
Predicted class: 0


## Implementation on bigger dataset with accuracy metric

Code with Train, Validation, and Test Splits

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split
from sklearn.metrics import accuracy_score

# Set random seed for reproducibility
torch.manual_seed(0)

# Parameters for synthetic data
num_patients = 5000             # Total number of samples (patients)
sequence_length = 10            # Length of time series per patient
num_sensors = 2                 # Number of sensors
num_measurements_per_sensor = 5 # Measurements per sensor per timestamp
num_static_features = 3         # Static features (e.g., age, height, gender)
num_classes = 3                 # Number of classes for classification

# New temporal input size
input_size = num_sensors * num_measurements_per_sensor  # 2 * 5 = 10

# Hyperparameters
hidden_size = 64
num_layers = 1
num_epochs = 10
batch_size = 32
learning_rate = 0.001

# Generate random synthetic data
temporal_data = torch.randn(num_patients, sequence_length, input_size)   # Shape: (num_patients, sequence_length, input_size)
static_data = torch.randn(num_patients, num_static_features)             # Shape: (num_patients, num_static_features)
targets = torch.randint(0, num_classes, (num_patients,))                 # Shape: (num_patients,) with random class labels

# Split dataset into train, validation, and test sets (70%, 15%, 15%)
train_size = int(0.7 * num_patients)
val_size = int(0.15 * num_patients)
test_size = num_patients - train_size - val_size
train_data, val_data, test_data = random_split(TensorDataset(temporal_data, static_data, targets), [train_size, val_size, test_size])

# Create DataLoaders for batching
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

# Define the Deep State Space Model for Classification
class DeepStateSpaceModel(nn.Module):
    def __init__(self, input_size, hidden_size, static_input_size, num_classes, num_layers=1):
        super(DeepStateSpaceModel, self).__init__()

        # LSTM for temporal data
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)

        # Linear layer for static data
        self.static_fc = nn.Linear(static_input_size, hidden_size)

        # Combining LSTM output and static features
        self.fc_combined = nn.Linear(hidden_size * 2, hidden_size)

        # Output layer (for classification, matches num_classes)
        self.output_layer = nn.Linear(hidden_size, num_classes)

    def forward(self, temporal_data, static_data):
        # Pass temporal data through LSTM
        lstm_out, _ = self.lstm(temporal_data)

        # Take only the last hidden state (many-to-one structure)
        lstm_out = lstm_out[:, -1, :]

        # Pass static data through linear layer
        static_out = self.static_fc(static_data)

        # Concatenate LSTM output with static output
        combined = torch.cat((lstm_out, static_out), dim=1)

        # Pass through a fully connected layer to combine features
        combined_out = torch.relu(self.fc_combined(combined))

        # Final output layer with no activation (use CrossEntropyLoss which applies softmax)
        output = self.output_layer(combined_out)

        return output

# Initialize model, loss function, and optimizer
model = DeepStateSpaceModel(input_size=input_size, hidden_size=hidden_size,
                            static_input_size=num_static_features, num_classes=num_classes, num_layers=num_layers)

criterion = nn.CrossEntropyLoss()  # For classification
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop with validation
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    for batch in train_loader:
        temporal_batch, static_batch, target_batch = batch

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        output = model(temporal_batch, static_batch)
        loss = criterion(output, target_batch)  # CrossEntropyLoss expects raw logits

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

    # Validation loop to calculate accuracy on validation data
    model.eval()  # Set the model to evaluation mode
    val_preds = []
    val_labels = []
    with torch.no_grad():
        for batch in val_loader:
            temporal_batch, static_batch, target_batch = batch
            output = model(temporal_batch, static_batch)

            # Get predictions
            _, preds = torch.max(output, 1)
            val_preds.extend(preds.cpu().numpy())
            val_labels.extend(target_batch.cpu().numpy())

    # Calculate validation accuracy
    val_accuracy = accuracy_score(val_labels, val_preds)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}, Validation Accuracy: {val_accuracy * 100:.2f}%")

# Final test accuracy calculation
model.eval()  # Set the model to evaluation mode
test_preds = []
test_labels = []
with torch.no_grad():
    for batch in test_loader:
        temporal_batch, static_batch, target_batch = batch
        output = model(temporal_batch, static_batch)

        # Get predictions
        _, preds = torch.max(output, 1)
        test_preds.extend(preds.cpu().numpy())
        test_labels.extend(target_batch.cpu().numpy())

# Calculate test accuracy
test_accuracy = accuracy_score(test_labels, test_preds)
print(f"Test Accuracy: {test_accuracy * 100:.2f}%")


Epoch [1/10], Loss: 1.0997, Validation Accuracy: 34.00%
Epoch [2/10], Loss: 1.1052, Validation Accuracy: 34.93%
Epoch [3/10], Loss: 1.0651, Validation Accuracy: 36.00%
Epoch [4/10], Loss: 1.1088, Validation Accuracy: 34.93%
Epoch [5/10], Loss: 1.1247, Validation Accuracy: 36.27%
Epoch [6/10], Loss: 1.0670, Validation Accuracy: 33.20%
Epoch [7/10], Loss: 1.0449, Validation Accuracy: 34.93%
Epoch [8/10], Loss: 1.0578, Validation Accuracy: 36.67%
Epoch [9/10], Loss: 1.0921, Validation Accuracy: 34.13%
Epoch [10/10], Loss: 1.0797, Validation Accuracy: 34.13%
Test Accuracy: 40.67%


## Implementation with making timestamp number a dynamic variable depending on each patient

- Dynamic Padding: Pad each batch to the length of its longest sequence with torch.nn.utils.rnn.pad_sequence, because the LSTM needs a rectangular batch tensor.
- Pack Padded Sequences: Wrap the padded batch with torch.nn.utils.rnn.pack_padded_sequence before the LSTM so that the padded steps are skipped. The model in this section reads the final hidden state directly, so pad_packed_sequence is only needed when the per-step outputs are required.
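
The two steps can be sketched on a toy batch. The three sequences and the layer sizes here are assumptions chosen for illustration; the calls themselves (`pad_sequence`, `pack_padded_sequence`, `nn.LSTM`) are the standard PyTorch ones.

```python
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence

torch.manual_seed(0)

# Three hypothetical patients with 4, 2, and 3 time steps of 10 features each
sequences = [torch.randn(4, 10), torch.randn(2, 10), torch.randn(3, 10)]
lengths = torch.tensor([seq.size(0) for seq in sequences])

# Pad with zeros up to the longest sequence in the batch -> shape (3, 4, 10)
padded = pad_sequence(sequences, batch_first=True)

# Pack so the LSTM does not process the zero-padded steps
packed = pack_padded_sequence(padded, lengths, batch_first=True, enforce_sorted=False)

lstm = nn.LSTM(input_size=10, hidden_size=8, batch_first=True)
_, (h_n, _) = lstm(packed)

# h_n[-1] is the hidden state at each sequence's true last step,
# returned in the original batch order
last_hidden = h_n[-1]
print(padded.shape, last_hidden.shape)
```

With packing, the hidden state returned for the two-step patient should match what the LSTM produces when that sequence is run on its own, which would not hold if the zero padding were fed through the network.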

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
from sklearn.metrics import accuracy_score
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
import numpy as np

# Set random seeds for reproducibility (sequence lengths are drawn with NumPy)
torch.manual_seed(0)
np.random.seed(0)

# Parameters for synthetic data
num_patients = 5000             # Total number of samples (patients)
max_sequence_length = 150       # Maximum length of time series per patient
num_sensors = 2                 # Number of sensors
num_measurements_per_sensor = 5 # Measurements per sensor per timestamp
num_static_features = 3         # Static features (e.g., age, height, gender)
num_classes = 3                 # Number of classes for classification

# New temporal input size
input_size = num_sensors * num_measurements_per_sensor  # 2 * 5 = 10

# Hyperparameters
hidden_size = 64
num_layers = 1
num_epochs = 10
batch_size = 32
learning_rate = 0.001

# Generate random synthetic data with varying sequence lengths
class VariableLengthDataset(Dataset):
    def __init__(self, num_patients, max_sequence_length, input_size, num_static_features, num_classes):
        self.num_patients = num_patients
        self.input_size = input_size
        self.num_static_features = num_static_features
        self.num_classes = num_classes

        # Generate synthetic data
        self.temporal_data = []
        self.sequence_lengths = []
        self.static_data = torch.randn(num_patients, num_static_features)  # Static features
        self.targets = torch.randint(0, num_classes, (num_patients,))      # Class labels

        for _ in range(num_patients):
            seq_len = np.random.randint(1, max_sequence_length + 1)
            self.sequence_lengths.append(seq_len)
            temporal_seq = torch.randn(seq_len, input_size)
            self.temporal_data.append(temporal_seq)

    def __len__(self):
        return self.num_patients

    def __getitem__(self, idx):
        return self.temporal_data[idx], self.static_data[idx], self.targets[idx], self.sequence_lengths[idx]

# Instantiate dataset and split it
dataset = VariableLengthDataset(num_patients, max_sequence_length, input_size, num_static_features, num_classes)
train_size = int(0.7 * num_patients)
val_size = int(0.15 * num_patients)
test_size = num_patients - train_size - val_size
train_data, val_data, test_data = random_split(dataset, [train_size, val_size, test_size])

# Custom collate function for padding sequences
def collate_fn(batch):
    temporal_data, static_data, targets, seq_lengths = zip(*batch)
    seq_lengths = torch.tensor(seq_lengths)
    static_data = torch.stack(static_data)
    targets = torch.stack(targets)  # Stack the per-sample 0-d label tensors into shape (batch_size,)

    # Pad temporal data sequences to match the longest sequence in the batch
    temporal_data_padded = pad_sequence(temporal_data, batch_first=True)

    return temporal_data_padded, static_data, targets, seq_lengths

# Create DataLoaders for batching
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

# Define the Deep State Space Model for Classification with variable sequence lengths
class DeepStateSpaceModel(nn.Module):
    def __init__(self, input_size, hidden_size, static_input_size, num_classes, num_layers=1):
        super(DeepStateSpaceModel, self).__init__()

        # LSTM for temporal data
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)

        # Linear layer for static data
        self.static_fc = nn.Linear(static_input_size, hidden_size)

        # Combining LSTM output and static features
        self.fc_combined = nn.Linear(hidden_size * 2, hidden_size)

        # Output layer (for classification, matches num_classes)
        self.output_layer = nn.Linear(hidden_size, num_classes)

    def forward(self, temporal_data, static_data, seq_lengths):
        # Pack and pass temporal data through LSTM
        packed_input = pack_padded_sequence(temporal_data, seq_lengths.cpu(), batch_first=True, enforce_sorted=False)
        packed_output, (hn, _) = self.lstm(packed_input)

        # Get the final hidden state for the last element in each sequence
        lstm_out = hn[-1, :, :]

        # Pass static data through a linear layer
        static_out = self.static_fc(static_data)

        # Concatenate LSTM output with static output
        combined = torch.cat((lstm_out, static_out), dim=1)

        # Pass through a fully connected layer to combine features
        combined_out = torch.relu(self.fc_combined(combined))

        # Final output layer with no activation (use CrossEntropyLoss which applies softmax)
        output = self.output_layer(combined_out)

        return output

# Initialize model, loss function, and optimizer
model = DeepStateSpaceModel(input_size=input_size, hidden_size=hidden_size,
                            static_input_size=num_static_features, num_classes=num_classes, num_layers=num_layers)

criterion = nn.CrossEntropyLoss()  # For classification
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop with validation
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    for batch in train_loader:
        temporal_batch, static_batch, target_batch, seq_lengths = batch

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        output = model(temporal_batch, static_batch, seq_lengths)
        loss = criterion(output, target_batch)  # CrossEntropyLoss expects raw logits

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

    # Validation loop to calculate accuracy on validation data
    model.eval()  # Set the model to evaluation mode
    val_preds = []
    val_labels = []
    with torch.no_grad():
        for batch in val_loader:
            temporal_batch, static_batch, target_batch, seq_lengths = batch
            output = model(temporal_batch, static_batch, seq_lengths)

            # Get predictions
            _, preds = torch.max(output, 1)
            val_preds.extend(preds.cpu().numpy())
            val_labels.extend(target_batch.cpu().numpy())

    # Calculate validation accuracy
    val_accuracy = accuracy_score(val_labels, val_preds)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}, Validation Accuracy: {val_accuracy * 100:.2f}%")

# Final test accuracy calculation
model.eval()  # Set the model to evaluation mode
test_preds = []
test_labels = []
with torch.no_grad():
    for batch in test_loader:
        temporal_batch, static_batch, target_batch, seq_lengths = batch
        output = model(temporal_batch, static_batch, seq_lengths)

        # Get predictions
        _, preds = torch.max(output, 1)
        test_preds.extend(preds.cpu().numpy())
        test_labels.extend(target_batch.cpu().numpy())

# Calculate test accuracy
test_accuracy = accuracy_score(test_labels, test_preds)
print(f"Test Accuracy: {test_accuracy * 100:.2f}%")


Epoch [1/10], Loss: 1.1072, Validation Accuracy: 31.47%
Epoch [2/10], Loss: 1.0921, Validation Accuracy: 31.73%
Epoch [3/10], Loss: 1.0622, Validation Accuracy: 32.27%
Epoch [4/10], Loss: 1.1488, Validation Accuracy: 32.40%
Epoch [5/10], Loss: 1.0908, Validation Accuracy: 31.20%
Epoch [6/10], Loss: 0.9737, Validation Accuracy: 32.93%
Epoch [7/10], Loss: 0.9888, Validation Accuracy: 31.33%
Epoch [8/10], Loss: 0.9579, Validation Accuracy: 33.47%
Epoch [9/10], Loss: 0.9435, Validation Accuracy: 32.67%
Epoch [10/10], Loss: 1.1682, Validation Accuracy: 33.87%
Test Accuracy: 33.47%


## Deep State Model combining above

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
from sklearn.metrics import accuracy_score
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
import numpy as np

# Set random seeds for reproducibility (sequence lengths are drawn with NumPy)
torch.manual_seed(0)
np.random.seed(0)

# Parameters for synthetic data
num_patients = 5000             # Total number of samples (patients)
max_sequence_length = 150       # Maximum length of time series per patient
num_sensors = 2                 # Number of sensors
num_measurements_per_sensor = 5 # Measurements per sensor per timestamp
num_static_features = 3         # Static features (e.g., age, height, gender)
num_classes = 3                 # Number of classes for classification

# New temporal input size
input_size = num_sensors * num_measurements_per_sensor  # 2 * 5 = 10

# Hyperparameters
hidden_size = 64
num_layers = 1
num_epochs = 10
batch_size = 32
learning_rate = 0.001

# Generate random synthetic data with varying sequence lengths
class VariableLengthDataset(Dataset):
    def __init__(self, num_patients, max_sequence_length, input_size, num_static_features, num_classes):
        self.num_patients = num_patients
        self.input_size = input_size
        self.num_static_features = num_static_features
        self.num_classes = num_classes

        # Generate synthetic data
        self.temporal_data = []
        self.sequence_lengths = []
        self.static_data = torch.randn(num_patients, num_static_features)  # Static features
        self.targets = torch.randint(0, num_classes, (num_patients,))      # Class labels

        for _ in range(num_patients):
            seq_len = np.random.randint(1, max_sequence_length + 1)
            self.sequence_lengths.append(seq_len)
            temporal_seq = torch.randn(seq_len, input_size)
            self.temporal_data.append(temporal_seq)

    def __len__(self):
        return self.num_patients

    def __getitem__(self, idx):
        return self.temporal_data[idx], self.static_data[idx], self.targets[idx], self.sequence_lengths[idx]

# Instantiate dataset and split it
dataset = VariableLengthDataset(num_patients, max_sequence_length, input_size, num_static_features, num_classes)
train_size = int(0.7 * num_patients)
val_size = int(0.15 * num_patients)
test_size = num_patients - train_size - val_size
train_data, val_data, test_data = random_split(dataset, [train_size, val_size, test_size])

# Custom collate function for padding sequences
def collate_fn(batch):
    temporal_data, static_data, targets, seq_lengths = zip(*batch)
    seq_lengths = torch.tensor(seq_lengths)
    static_data = torch.stack(static_data)
    targets = torch.stack(targets)  # Stack the per-sample 0-d label tensors into shape (batch_size,)

    # Pad temporal data sequences to match the longest sequence in the batch
    temporal_data_padded = pad_sequence(temporal_data, batch_first=True)

    return temporal_data_padded, static_data, targets, seq_lengths

# Create DataLoaders for batching
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

# Define a Deep State Space Model (DSSM) for classification
class DeepStateSpaceModel(nn.Module):
    def __init__(self, input_size, hidden_size, static_input_size, num_classes, num_layers=1):
        super(DeepStateSpaceModel, self).__init__()

        # LSTM for temporal data (this is the "observation model")
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)

        # Linear layer for static data (this is the "static feature processing")
        self.static_fc = nn.Linear(static_input_size, hidden_size)

        # State transition model (latent state evolution)
        self.transition_matrix = nn.Parameter(torch.randn(hidden_size, hidden_size))  # Linear state transition matrix

        # Output layer (for classification)
        self.output_layer = nn.Linear(hidden_size * 2, num_classes)  # Input is the latent state concatenated with the static features

    def forward(self, temporal_data, static_data, seq_lengths):
        # Pack and pass temporal data through LSTM
        packed_input = pack_padded_sequence(temporal_data, seq_lengths.cpu(), batch_first=True, enforce_sorted=False)
        packed_output, (hn, _) = self.lstm(packed_input)

        # Get the final hidden state (latent state at the last time step)
        lstm_out = hn[-1, :, :]  # shape: [batch_size, hidden_size]

        # Apply transition model (latent state dynamics)
        lstm_out = torch.matmul(lstm_out, self.transition_matrix)  # shape: [batch_size, hidden_size]

        # Pass static data through a linear layer
        static_out = self.static_fc(static_data)  # shape: [batch_size, hidden_size]

        # Combine the LSTM output with static features
        combined = torch.cat((lstm_out, static_out), dim=1)  # shape: [batch_size, hidden_size * 2]

        # Output classification
        output = self.output_layer(combined)  # shape: [batch_size, num_classes]

        return output

# Initialize model, loss function, and optimizer
model = DeepStateSpaceModel(input_size=input_size, hidden_size=hidden_size,
                            static_input_size=num_static_features, num_classes=num_classes, num_layers=num_layers)

criterion = nn.CrossEntropyLoss()  # For classification
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop with validation
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    for batch in train_loader:
        temporal_batch, static_batch, target_batch, seq_lengths = batch

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        output = model(temporal_batch, static_batch, seq_lengths)
        loss = criterion(output, target_batch)  # CrossEntropyLoss expects raw logits

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

    # Validation loop to calculate accuracy on validation data
    model.eval()  # Set the model to evaluation mode
    val_preds = []
    val_labels = []
    with torch.no_grad():
        for batch in val_loader:
            temporal_batch, static_batch, target_batch, seq_lengths = batch
            output = model(temporal_batch, static_batch, seq_lengths)

            # Get predictions
            _, preds = torch.max(output, 1)
            val_preds.extend(preds.cpu().numpy())
            val_labels.extend(target_batch.cpu().numpy())

    # Calculate validation accuracy
    val_accuracy = accuracy_score(val_labels, val_preds)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}, Validation Accuracy: {val_accuracy * 100:.2f}%")

# Final test accuracy calculation
model.eval()  # Set the model to evaluation mode
test_preds = []
test_labels = []
with torch.no_grad():
    for batch in test_loader:
        temporal_batch, static_batch, target_batch, seq_lengths = batch
        output = model(temporal_batch, static_batch, seq_lengths)

        # Get predictions
        _, preds = torch.max(output, 1)
        test_preds.extend(preds.cpu().numpy())
        test_labels.extend(target_batch.cpu().numpy())

# Calculate test accuracy
test_accuracy = accuracy_score(test_labels, test_preds)
print(f"Test Accuracy: {test_accuracy * 100:.2f}%")


Epoch [1/10], Loss: 1.1392, Validation Accuracy: 34.93%
Epoch [2/10], Loss: 1.0977, Validation Accuracy: 34.67%
Epoch [3/10], Loss: 1.1168, Validation Accuracy: 33.73%
Epoch [4/10], Loss: 1.1242, Validation Accuracy: 32.00%
Epoch [5/10], Loss: 1.0711, Validation Accuracy: 32.27%
Epoch [6/10], Loss: 1.1099, Validation Accuracy: 33.07%
Epoch [7/10], Loss: 1.0059, Validation Accuracy: 33.60%
Epoch [8/10], Loss: 1.1149, Validation Accuracy: 32.67%
Epoch [9/10], Loss: 1.0510, Validation Accuracy: 32.13%
Epoch [10/10], Loss: 0.8642, Validation Accuracy: 31.60%
Test Accuracy: 32.93%


## Final DSSM: Adding State Space Evolution (representing the evolution of the hidden state over time)
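
The cell below evolves the latent state by multiplying it repeatedly with a learned transition matrix, using a scaled initialization and gradient clipping. As a rough illustration of why the scale matters, the sketch here applies a fixed random matrix several times and compares the resulting norms. The batch of 4 states and the 20 steps are assumptions chosen for the demonstration, not values from the model.

```python
import torch

torch.manual_seed(0)

hidden_size = 64
num_steps = 20  # Hypothetical number of transition steps, for illustration only

# A batch of 4 hypothetical latent states
z0 = torch.randn(4, hidden_size)


def evolve(z, transition_matrix, steps):
    """Apply z <- z @ A repeatedly (a purely linear state transition)."""
    for _ in range(steps):
        z = torch.matmul(z, transition_matrix)
    return z


unscaled = torch.randn(hidden_size, hidden_size)      # Entries with standard deviation 1.0
scaled = torch.randn(hidden_size, hidden_size) * 0.1  # Entries with standard deviation 0.1

norm_start = z0.norm().item()
norm_unscaled = evolve(z0, unscaled, num_steps).norm().item()
norm_scaled = evolve(z0, scaled, num_steps).norm().item()

print(f"start: {norm_start:.3e}, unscaled: {norm_unscaled:.3e}, scaled: {norm_scaled:.3e}")
```

For a matrix with independent Gaussian entries of standard deviation s, each step tends to multiply the state norm by roughly s * sqrt(hidden_size), so the unscaled matrix makes the state grow quickly while the 0.1 scaling makes it shrink. Either extreme can make training unstable over long sequences.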

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
from sklearn.metrics import accuracy_score
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
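import numpy as np

# Set random seeds for reproducibility (sequence lengths are drawn with NumPy)
torch.manual_seed(0)
np.random.seed(0)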

# Parameters for synthetic data
num_patients = 5000             # Total number of samples (patients)
max_sequence_length = 150       # Maximum length of time series per patient
num_sensors = 2                 # Number of sensors
num_measurements_per_sensor = 5 # Measurements per sensor per timestamp
num_static_features = 3         # Static features (e.g., age, height, gender)
num_classes = 3                 # Number of classes for classification

# New temporal input size
input_size = num_sensors * num_measurements_per_sensor  # 2 * 5 = 10

# Hyperparameters
hidden_size = 64
num_layers = 1
num_epochs = 10
batch_size = 32
learning_rate = 0.001

# Generate random synthetic data with varying sequence lengths
class VariableLengthDataset(Dataset):
    def __init__(self, num_patients, max_sequence_length, input_size, num_static_features, num_classes):
        self.num_patients = num_patients
        self.input_size = input_size
        self.num_static_features = num_static_features
        self.num_classes = num_classes

        # Generate synthetic data
        self.temporal_data = []
        self.sequence_lengths = []
        self.static_data = torch.randn(num_patients, num_static_features)  # Static features
        self.targets = torch.randint(0, num_classes, (num_patients,))      # Class labels

        for _ in range(num_patients):
            seq_len = np.random.randint(1, max_sequence_length + 1)
            self.sequence_lengths.append(seq_len)
            temporal_seq = torch.randn(seq_len, input_size)
            self.temporal_data.append(temporal_seq)

    def __len__(self):
        return self.num_patients

    def __getitem__(self, idx):
        return self.temporal_data[idx], self.static_data[idx], self.targets[idx], self.sequence_lengths[idx]

# Instantiate dataset and split it
dataset = VariableLengthDataset(num_patients, max_sequence_length, input_size, num_static_features, num_classes)
train_size = int(0.7 * num_patients)
val_size = int(0.15 * num_patients)
test_size = num_patients - train_size - val_size
train_data, val_data, test_data = random_split(dataset, [train_size, val_size, test_size])

# Custom collate function for padding sequences
def collate_fn(batch):
    temporal_data, static_data, targets, seq_lengths = zip(*batch)
    seq_lengths = torch.tensor(seq_lengths)
    static_data = torch.stack(static_data)
    targets = torch.stack(targets)  # Stack the per-sample 0-d label tensors into shape (batch_size,)

    # Pad temporal data sequences to match the longest sequence in the batch
    temporal_data_padded = pad_sequence(temporal_data, batch_first=True)

    return temporal_data_padded, static_data, targets, seq_lengths

# Create DataLoaders for batching
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

# Define a Deep State Space Model (DSSM) for classification
class DeepStateSpaceModel(nn.Module):
    def __init__(self, input_size, hidden_size, static_input_size, num_classes, num_layers=1):
        super(DeepStateSpaceModel, self).__init__()

        # LSTM for temporal data (this is the "observation model")
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)

        # Linear layer for static data (this is the "static feature processing")
        self.static_fc = nn.Linear(static_input_size, hidden_size)

        # Transition matrix for the state space model (latent state evolution)
        self.transition_matrix = nn.Parameter(torch.randn(hidden_size, hidden_size) * 0.1)  # Scaled initialization

        # Output layer (for classification)
        self.output_layer = nn.Linear(hidden_size * 2, num_classes)  # Input is the latent state concatenated with the static features

    def forward(self, temporal_data, static_data, seq_lengths):
        # Pack and pass temporal data through LSTM
        packed_input = pack_padded_sequence(temporal_data, seq_lengths.cpu(), batch_first=True, enforce_sorted=False)
        packed_output, (hn, _) = self.lstm(packed_input)

        # Initial latent state (from LSTM's final hidden state)
        latent_state = hn[-1, :, :]  # shape: [batch_size, hidden_size]

        # State space evolution: repeatedly apply the transition matrix to the final hidden state.
        # Note: the step count is the padded batch length minus one, so it depends on the
        # longest sequence in the batch rather than on each sample's own length.
        for _ in range(1, temporal_data.size(1)):
            latent_state = torch.matmul(latent_state, self.transition_matrix)  # Transition from previous state to the next

        # Pass static data through a linear layer
        static_out = self.static_fc(static_data)  # shape: [batch_size, hidden_size]

        # Combine the evolved latent state with the projected static features
        combined = torch.cat((latent_state, static_out), dim=1)  # shape: [batch_size, hidden_size * 2]

        # Output classification
        output = self.output_layer(combined)  # shape: [batch_size, num_classes]

        return output

# Initialize model, loss function, and optimizer
model = DeepStateSpaceModel(input_size=input_size, hidden_size=hidden_size,
                            static_input_size=num_static_features, num_classes=num_classes, num_layers=num_layers)

criterion = nn.CrossEntropyLoss()  # For classification
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Gradient clipping to avoid exploding gradients
max_grad_norm = 1.0  # Clip gradients to a maximum norm of 1.0

# Training loop with validation
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    for batch in train_loader:
        temporal_batch, static_batch, target_batch, seq_lengths = batch

        # Zero the gradients
        optimizer.zero_grad()

        # Forward pass
        output = model(temporal_batch, static_batch, seq_lengths)
        loss = criterion(output, target_batch)  # CrossEntropyLoss expects raw logits

        # Backward pass and optimization
        loss.backward()

        # Clip gradients to prevent exploding gradients
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)

        optimizer.step()

    # Validation loop to calculate accuracy on validation data
    model.eval()  # Set the model to evaluation mode
    val_preds = []
    val_labels = []
    with torch.no_grad():
        for batch in val_loader:
            temporal_batch, static_batch, target_batch, seq_lengths = batch
            output = model(temporal_batch, static_batch, seq_lengths)

            # Get predictions
            _, preds = torch.max(output, 1)
            val_preds.extend(preds.cpu().numpy())
            val_labels.extend(target_batch.cpu().numpy())

    # Calculate validation accuracy (the loss printed below is from the last training batch only, not an epoch average)
    val_accuracy = accuracy_score(val_labels, val_preds)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}, Validation Accuracy: {val_accuracy * 100:.2f}%")

# Final test accuracy calculation
model.eval()  # Set the model to evaluation mode
test_preds = []
test_labels = []
with torch.no_grad():
    for batch in test_loader:
        temporal_batch, static_batch, target_batch, seq_lengths = batch
        output = model(temporal_batch, static_batch, seq_lengths)

        # Get predictions
        _, preds = torch.max(output, 1)
        test_preds.extend(preds.cpu().numpy())
        test_labels.extend(target_batch.cpu().numpy())

# Calculate test accuracy
test_accuracy = accuracy_score(test_labels, test_preds)
print(f"Test Accuracy: {test_accuracy * 100:.2f}%")


Epoch [1/10], Loss: 1.0879, Validation Accuracy: 34.27%
Epoch [2/10], Loss: 1.1007, Validation Accuracy: 32.13%
Epoch [3/10], Loss: 1.1178, Validation Accuracy: 31.20%
Epoch [4/10], Loss: 1.0971, Validation Accuracy: 33.60%
Epoch [5/10], Loss: 1.0784, Validation Accuracy: 33.33%
Epoch [6/10], Loss: 1.1164, Validation Accuracy: 36.00%
Epoch [7/10], Loss: 1.1089, Validation Accuracy: 35.60%
Epoch [8/10], Loss: 1.1082, Validation Accuracy: 33.87%
Epoch [9/10], Loss: 1.0577, Validation Accuracy: 34.93%
Epoch [10/10], Loss: 1.1136, Validation Accuracy: 32.80%
Test Accuracy: 33.07%
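
The losses near 1.10 and accuracies near 33% are what a uniform guess over three classes would produce, which is the expected outcome when labels are random. A minimal sketch of that baseline follows. The helper name `chance_level` is hypothetical, and the assumption that this run uses three classes is inferred from the numbers rather than stated in the cell.

```python
import math

def chance_level(num_classes):
    """Return (expected accuracy, cross-entropy loss) of a uniform random guess."""
    return 1.0 / num_classes, math.log(num_classes)

for k in (2, 3):
    acc, loss = chance_level(k)
    print(f"{k} classes: accuracy {acc:.2%}, loss {loss:.4f}")
# 2 classes: accuracy 50.00%, loss 0.6931
# 3 classes: accuracy 33.33%, loss 1.0986
```

A model that has learned something from real data should move away from these values on the validation set.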


Components of the Model:

LSTM Layer: This processes the variable-length time-series data and extracts features over time. The LSTM's final hidden state (hn[-1]) serves as the learned latent summary of the sequence. Sequences in a batch are padded to a common length, so the input is wrapped with pack_padded_sequence before it reaches the LSTM. The LSTM then skips the padded steps, and hn[-1] reflects each sequence's last real time step rather than padding.
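
As an illustration of the packing step, the sketch below checks that hn[-1] matches the LSTM output at each sequence's own last valid step. The toy sizes (4 input features, 3 hidden units, lengths 5 and 2) are assumptions chosen for readability, not the notebook's settings.

```python
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

torch.manual_seed(0)
lstm = nn.LSTM(input_size=4, hidden_size=3, batch_first=True)

# Two toy sequences of different lengths (5 and 2 time steps)
seqs = [torch.randn(5, 4), torch.randn(2, 4)]
lengths = torch.tensor([5, 2])
padded = pad_sequence(seqs, batch_first=True)  # shape: [2, 5, 4]

# Pack so the LSTM never processes the zero padding
packed = pack_padded_sequence(padded, lengths, batch_first=True, enforce_sorted=False)
packed_out, (hn, _) = lstm(packed)
out, _ = pad_packed_sequence(packed_out, batch_first=True)  # shape: [2, 5, 3]

# Gather the output at each sequence's last valid time step
last_valid = out[torch.arange(len(seqs)), lengths - 1]
matches = torch.allclose(hn[-1], last_valid)
print(matches)
```

Without packing, the hidden state of the shorter sequence would keep being updated on padded zeros before it is read out.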

Transition Matrix: After obtaining the final LSTM hidden state, the model applies a learned transition matrix to it, in the style of a linear state-space model. The latent state is multiplied by this matrix once per loop iteration, for a total of (padded sequence length - 1) multiplications. No new observations enter during this loop, so the result is the final hidden state multiplied by a power of the transition matrix. The intent is for the model to learn how the latent state evolves over time, which may capture dynamics in the patient's time-series data.
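
Because nothing new enters the state during the evolution loop, the repeated multiplication collapses into a single matrix power. A small sketch under assumed toy sizes (4 hidden units, padded length 6; `A` and `h` are stand-ins for transition_matrix and hn[-1]):

```python
import torch

torch.manual_seed(0)
hidden_size, padded_len = 4, 6
A = torch.randn(hidden_size, hidden_size, dtype=torch.float64) * 0.1  # same scaled init as in the model
h = torch.randn(2, hidden_size, dtype=torch.float64)                  # stand-in for hn[-1]

# Loop form, as in forward(): padded_len - 1 multiplications
state = h.clone()
for _ in range(1, padded_len):
    state = state @ A

# Closed form: one multiplication by A^(padded_len - 1)
closed = h @ torch.linalg.matrix_power(A, padded_len - 1)

print(torch.allclose(state, closed))
print(f"norm before: {h.norm().item():.4f}, after: {state.norm().item():.6f}")
```

With the small 0.1-scaled initialization, the state norm shrinks sharply after a few steps, so early in training the classifier input from this branch is close to zero. The shrink factor also varies with the padded batch length.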

Static Feature Processing: The static features (e.g., age, gender) are passed through a fully connected layer (static_fc), which projects them to a vector of size hidden_size, the same dimension as the LSTM hidden state. The static and temporal branches therefore contribute vectors of equal size to the combined representation.

Combining Temporal and Static Features: The final representation is obtained by concatenating the evolved latent state (from the state-space step) with the projected static features along the feature dimension, giving a vector of size hidden_size * 2 per patient.
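
The shapes involved in the projection and concatenation can be checked in isolation. This sketch uses the hidden size (64) and static feature count (8) from the notebook's parameters; the batch size of 5 and the random tensors are placeholders.

```python
import torch
import torch.nn as nn

batch_size, hidden_size, num_static_features = 5, 64, 8
static_fc = nn.Linear(num_static_features, hidden_size)

latent_state = torch.randn(batch_size, hidden_size)        # stand-in for the evolved latent state
static_data = torch.randn(batch_size, num_static_features)

static_out = static_fc(static_data)                         # shape: [5, 64]
combined = torch.cat((latent_state, static_out), dim=1)     # shape: [5, 128]
print(static_out.shape, combined.shape)
```

This is why the output layer is declared with hidden_size * 2 input features.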

Output Layer: A fully connected output layer (output_layer) produces the final prediction, which is a vector of class logits (size = number of classes). The model returns these raw logits because nn.CrossEntropyLoss applies log-softmax internally during training. Apply softmax explicitly only when class probabilities are needed, for example at inference time.
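
A short check of the logits-versus-probabilities point, using a made-up batch of 4 samples and 3 classes (both sizes are assumptions for illustration):

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
logits = torch.randn(4, 3)             # toy batch of raw model outputs
targets = torch.tensor([0, 2, 1, 2])

# CrossEntropyLoss on logits equals NLL loss on log-softmax of the logits
loss_ce = nn.CrossEntropyLoss()(logits, targets)
loss_manual = F.nll_loss(F.log_softmax(logits, dim=1), targets)
print(torch.allclose(loss_ce, loss_manual))

# Probabilities are only needed for reporting; argmax is unchanged by softmax
probs = F.softmax(logits, dim=1)
same_prediction = torch.equal(probs.argmax(dim=1), logits.argmax(dim=1))
print(same_prediction)
```

Adding a softmax layer inside the model before nn.CrossEntropyLoss would apply the normalization twice and distort the loss.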