In [1]:
import numpy as np
import pandas as pd
import torch
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import os
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split


In [2]:
# replace all instances of 40 with 30 in a numpy array
def replace_40_with_30(arr):
    arr[arr == 40] = 30
    return arr

In [3]:
# Example LSTM Model
class SleepScoringLSTM(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers, dropout_prob=0.5):
        super(SleepScoringLSTM, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True, dropout=dropout_prob)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(p=dropout_prob)
    
    def forward(self, x):
        h_lstm, _ = self.lstm(x)
        out = self.fc(h_lstm[:, -1, :])  # Use the output from the last time step
        return out

In [4]:
features = []
labels = []
for f in os.listdir('training_data'):
    if f.split('__')[0] == 'features':
        features.append(torch.tensor(np.load('training_data/' + f), dtype=torch.float32))
        stem = f.split('__')[1]
        assert os.path.exists('training_data/labels__' + stem), f"no labels for {f}"
        
        label = np.load('training_data/labels__' + stem)
        label[label == 40] = 30 #makes all 'wake-good' become 'wake'
        
        labels.append(torch.tensor(label, dtype=torch.float32))
    else:
        continue

In [5]:
# Function to combine multiple recordings
def combine_recordings(list_of_X_tensors, list_of_y_tensors):
    X_combined = torch.cat(list_of_X_tensors, dim=0)
    y_combined = torch.cat(list_of_y_tensors, dim=0)
    return X_combined, y_combined

# Combine all recordings
X_combined, y_combined = combine_recordings(features, labels)


# Unique labels and mapping to zero-indexed range
unique_labels = torch.unique(y_combined).tolist()
label_mapping = {label: idx for idx, label in enumerate(unique_labels)}

# Apply the mapping
y_combined = torch.tensor([label_mapping[label.item()] for label in y_combined], dtype=torch.long)

# Debugging: Print unique values in y_combined
print("Unique labels:", torch.unique(y_combined))

# Ensure the labels are within the expected range
num_classes = len(unique_labels) # Set the number of classes based on your dataset
assert y_combined.min() >= 0 and y_combined.max() < num_classes, "Labels out of range"

Unique labels: tensor([0, 1, 2, 3, 4, 5, 6, 7])


In [6]:
# Split the combined data into training and validation sets
dataset = TensorDataset(X_combined, y_combined)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

batch_size = 128
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Hyperparameters
input_dim = 5  # Number of features
hidden_dim = 128  # Number of hidden units in LSTM
output_dim = num_classes  # Number of output classes (e.g., sleep state and confidence)
num_layers = 2 # Number of LSTM layers
dropout_prob = 0.25
learning_rate = 0.002
num_epochs = 40

# Check for GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Create the model
model = SleepScoringLSTM(input_dim, hidden_dim, output_dim, num_layers, dropout_prob).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [7]:
# Training loop
for epoch in range(num_epochs):
    model.train()
    for i, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        
        # Ensure outputs are of type torch.FloatTensor and labels are torch.LongTensor
        outputs = outputs.float()
        labels = labels.long()

        loss = criterion(outputs, labels)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    # if (epoch+1) % 10 == 0:
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
    
    # Evaluation on validation data
    model.eval()
    val_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            
            outputs = outputs.float()
            labels = labels.long()
            
            val_loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    val_loss /= len(val_loader)
    val_accuracy = correct / total
    print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy * 100:.2f}%')

Epoch [1/40], Loss: 0.0556
Validation Loss: 0.2031, Validation Accuracy: 93.05%
Epoch [2/40], Loss: 0.1201
Validation Loss: 0.1854, Validation Accuracy: 93.54%
Epoch [3/40], Loss: 0.1058
Validation Loss: 0.1783, Validation Accuracy: 93.76%
Epoch [4/40], Loss: 0.0455
Validation Loss: 0.1705, Validation Accuracy: 93.95%
Epoch [5/40], Loss: 0.3458
Validation Loss: 0.1654, Validation Accuracy: 94.16%
Epoch [6/40], Loss: 0.2408
Validation Loss: 0.1611, Validation Accuracy: 94.29%
Epoch [7/40], Loss: 0.0166
Validation Loss: 0.1580, Validation Accuracy: 94.37%
Epoch [8/40], Loss: 0.1130
Validation Loss: 0.1540, Validation Accuracy: 94.50%
Epoch [9/40], Loss: 0.0539
Validation Loss: 0.1523, Validation Accuracy: 94.55%
Epoch [10/40], Loss: 0.0101
Validation Loss: 0.1487, Validation Accuracy: 94.66%
Epoch [11/40], Loss: 0.2024
Validation Loss: 0.1490, Validation Accuracy: 94.64%
Epoch [12/40], Loss: 0.1202
Validation Loss: 0.1455, Validation Accuracy: 94.76%
Epoch [13/40], Loss: 0.1224
Validatio

In [9]:
torch.save(model.state_dict(), 'models/ssfm_v1.pth')

# Evaluate

In [None]:
# Load the model
model = SleepScoringLSTM(input_dim, hidden_dim, output_dim, num_layers).to(device)
model.load_state_dict(torch.load('models/ssfm_v1.pth'))  # Replace 'model.pth' with your model file
model.eval()

In [None]:
# Ensure the model is in evaluation mode
model.eval()

new_tensor = torch.tensor(np.load('training_data/features__ACR_33--swisin--NNXr12.npy'), dtype=torch.float32)

# Move data to the device (GPU or CPU)
new_sequences_tensor = new_tensor.to(device)


In [None]:
state_mapping = {0: 'NREM',
                 1: 'REM',
                 2: 'Wake',
                 3: 'Wake-Good',
                 4: 'Transition-to-REM',
                 5: 'Transition-to-NREM',
                 6: 'Transition-to-Wake',
                 7: 'Brief-Aroudal',
                 8: 'Unsure',
                 }

In [None]:

# Get predictions
with torch.no_grad():
    predictions = model(new_sequences_tensor)
    _, predicted_labels = torch.max(predictions, 1)

In [None]:
predicted_labels.shape

In [None]:

# Map predicted labels back to original labels
predicted_labels = [state_mapping[label.item()] for label in predicted_labels]

# Display predicted labels
print(predicted_labels)

In [None]:
t = np.arange(0, len(predicted_labels)*2, 2)

In [None]:
len(predicted_labels)

In [None]:
len(t)

In [None]:
states = pd.DataFrame({'Time': t, 'State': predicted_labels})

In [None]:
states.to_csv('states.csv', index=False)