In [7]:
import os
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import accuracy_score
import tensorflow as tf
from keras.models import Sequential
from keras.layers import LSTM, Dense, Flatten
from keras.callbacks import EarlyStopping

In [8]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [9]:
# Set the path to your data directory
data_directory = '/content/drive/My Drive/data'
# Function to calculate SDNN from heart rate data
def calculate_sdnn(heart_rate_values, window_size=5):
    # Convert heart rate to RR intervals (assuming constant heart rate for simplicity)
    rr_intervals = 60 / heart_rate_values  # since values are in bpm

    # Calculate SDNN over a moving window
    sdnn_values = []
    for i in range(len(rr_intervals) - window_size + 1):
        sdnn_values.append(np.std(rr_intervals[i:i+window_size]))

    return np.array(sdnn_values)

# Modified function to load and preprocess heart rate data
def load_and_preprocess_heartrate(subject_id):
    file_path = os.path.join(data_directory, 'heart_rate', f'{subject_id}_heartrate.txt')
    data = pd.read_csv(file_path, header=None, names=['date', 'heartrate'])
    heartrate_values = data['heartrate']

    # Calculate SDNN from heart rate data
    sdnn_values = calculate_sdnn(heartrate_values)

    return sdnn_values

def load_labeled_sleep(subject_id):
    file_path = os.path.join(data_directory, 'labels', f'{subject_id}_labeled_sleep.txt')
    data = pd.read_csv(file_path, header=None, delim_whitespace=True, names=['date', 'stage'])
    # Encode sleep labels as 0 for awake, 1 for light sleep, and 2 for deep sleep
    data['stage'] = data['stage'].map({0: 0, 1: 1, 2: 1, 3: 2, 5: 2})  # N1 and N2 as light sleep, N3 as deep sleep
    return data['stage'].values

subject_ids = [file.split('_')[0] for file in os.listdir(os.path.join(data_directory, 'labels')) if '_labeled_sleep.txt' in file]

X_heartrate = []
y_sleep = []

sequence_length = 50

for subject_id in subject_ids:
    heartrate_values = load_and_preprocess_heartrate(subject_id)
    sleep_labels = load_labeled_sleep(subject_id)

    for i in range(0, len(heartrate_values) - sequence_length):
        X_heartrate.append(heartrate_values[i:i+sequence_length])
        y_sleep.append(sleep_labels[min(i+sequence_length-1, len(sleep_labels)-1)])

In [12]:
X_heartrate = np.array(X_heartrate)
y_sleep = np.array(y_sleep)

label_encoder = LabelEncoder()
y_sleep = label_encoder.fit_transform(y_sleep)

scaler = StandardScaler()
X_heartrate_scaled = scaler.fit_transform(X_heartrate)

X_train, X_test, y_train, y_test = train_test_split(
    X_heartrate_scaled, y_sleep, test_size=0.1, random_state=42
)
print("Unique labels in y_train:", np.unique(y_train))
print("Unique labels in y_test:", np.unique(y_test))



Unique labels in y_train: [0 1 2 3]
Unique labels in y_test: [0 1 2 3]


In [13]:
# Aggregate labels 2 and 3 into a single class
y_train[y_train == 3] = 2
y_test[y_test == 3] = 2
# Adjusting the number of output classes
num_classes = 3

# Model architecture
model = Sequential([
    LSTM(units=100, input_shape=(sequence_length, 1), return_sequences=True),
    LSTM(units=50, return_sequences=True),
    Flatten(),
    Dense(64, activation='relu'),
    Dense(num_classes, activation='softmax')  # Softmax activation for multi-class classification
])

# Early stopping callback
early_stopping = EarlyStopping(patience=3, restore_best_weights=True)

# Model compilation
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Model training
history = model.fit(
    X_train[..., np.newaxis],  # Expand dimension for LSTM input shape
    y_train,
    epochs=20,
    batch_size=32,
    validation_split=0.1,
    callbacks=[early_stopping]
)


# Model evaluation
y_pred = model.predict(X_test[..., np.newaxis])
y_pred_classes = np.argmax(y_pred, axis=1)  # Convert probabilities to class labels
accuracy = accuracy_score(y_test, y_pred_classes)
print("Test Accuracy:", accuracy)


Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Test Accuracy: 0.9567969615445482
