In [None]:
import mne
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torch.utils.tensorboard import SummaryWriter

In [None]:
path1 = [] # preprocessed data
path2 = [] # annotations

num_data_max = 5 # data의 개수 (SN001~SN005일 경우 : 5)

for i in range (1,num_data_max+1):
    idx = str(i).zfill(3)
    path1.append("./pre_SN"+idx+".edf")
    
for i in range (1,num_data_max+1):
    idx = str(i).zfill(3)
    path2.append("./SN"+idx+"_sleepscoring.edf")

pre = []
for idx in path1:
    pre.append(mne.io.read_raw_edf(idx, preload=True))
#     편의에 따라
#     pre.append(os.path.join("./", "SN" + idx + ".edf"))

In [None]:
# 채널 타입을 설정하기 위한 사전 정의
channel_types = {
    'EEG F4-M1': 'eeg',  # 인덱스 0
    'EEG C4-M1': 'eeg',  # 인덱스 1
    'EEG O2-M1': 'eeg',  # 인덱스 2
    'EEG C3-M2': 'eeg',  # 인덱스 3
    'EMG chin': 'emg',  # 인덱스 4, EMG chin
    'EOG E1-M2': 'eog',  # 인덱스 5, EOG
    'EOG E2-M2': 'eog',  # 인덱스 6, EOG
    'ECG': 'ecg'   # 인덱스 7, ECG
}

# 채널 타입을 설정
for i in pre:
    i.set_channel_types(channel_types)

# 수면단계와 매핑

In [None]:
# 어노테이션 데이터 로드
annotations = []
for idx in path2:
    annotations.append(mne.read_annotations(idx))

# pre 데이터 객체에 어노테이션 설정
for i in range(0,len(annotations)):
    pre[i].set_annotations(annotations[i])

# 정규화

In [None]:
def normalize_raw_data(raw, channel_types={'eeg': True, 'eog': True, 'emg': True, 'ecg': True}):

    # Ensure the data is preloaded
    if not raw.preload:
        raw.load_data()

    # Pick specified channel types
    channel_indices = mne.pick_types(raw.info, **channel_types)

    # Initialize the standard scaler
    scaler = StandardScaler()

    # Retrieve the data for selected channels
    data = raw.get_data(picks=channel_indices)

    # Reshape data for scaling
    data = data.reshape(data.shape[0], -1).T  # Transpose to have features along rows as expected by StandardScaler

    # Fit and transform the data
    scaled_data = scaler.fit_transform(data)

    # Replace original data with scaled data
    raw._data[channel_indices, :] = scaled_data.T  # Transpose back to original shape

    return raw

# Normalize all_pre directly
for i in pre:
    i = normalize_raw_data(i)

# CNN Model

## Annotation setting

- Lights off, Lights on 제거해서 기존 856개 -> 854개 annotations(labels) : 1번째 data(856), 2번째 data(858), 3번째 data(956)...

In [None]:
def annotation_remove(annotations):
    # Find Lights off & Lights on
    indices_to_remove = [idx for idx, desc in enumerate(annotations.description) if desc.startswith('Lights off') or desc.startswith('Lights on')]

    # Create new annotations
    new_annotations = mne.Annotations(onset=[annotations.onset[i] for i in range(len(annotations.onset)) if i not in indices_to_remove],
                                      duration=[annotations.duration[i] for i in range(len(annotations.duration)) if i not in indices_to_remove],
                                      description=[annotations.description[i] for i in range(len(annotations.description)) if i not in indices_to_remove],
                                      orig_time=annotations.orig_time)
    return new_annotations

for i in range(len(annotations)):
    annotations[i] = annotation_remove(annotations[i])

# check removal of two annotations
# labels = np.array(annotations[2].description)
# print("Label shape:", labels.shape)
# print("Unique label values:", np.unique(labels))

## Label Encoding

In [None]:
all_possible_labels = ['Sleep stage W','Sleep stage R','Sleep stage N1','Sleep stage N2','Sleep stage N3',]

label_encoder = LabelEncoder().fit(all_possible_labels)

# initialize list for encoded labels
encoded_labels = []

for idx in annotations:
    encoded_labels.append(label_encoder.transform(np.array(idx.description)))

for idx in encoded_labels:
    print(idx.shape)
print(label_encoder.classes_)

In [None]:
epoch_duration =30

# Create events every 30 seconds
events = []
for idx in pre:
    events.append(mne.make_fixed_length_events(idx, duration=epoch_duration))

# Create the epochs
epochs = []
for i in range(0,len(pre)):
    epochs.append(mne.Epochs(pre[i], events[i], tmin=0.0, tmax=epoch_duration - 1 / 100, baseline=None, preload=True))

In [None]:
data_list = []
num_epochs = 800  # 각 데이터의 epoch 개수

for i in range(0,len(epochs)):
    data_list.append(epochs[i].get_data())  #3D array (n epochs, 8 channels, 3000 timepoints)
    data_list[i]= np.float32(data_list[i][:-1]) #이건 마지막 labeling 안된 에포크 제거하는 줄
    #data_list[i]= np.float32(data_list[i][:num_epochs+i]) #kernel 자꾸 죽어서 크기 작게 slicing 해봄..

labels_list = []
for i in range(0,len(encoded_labels)):
    labels_list.append(encoded_labels[i])
    #labels_list.append(encoded_labels[i][:num_epochs+i])

print("data shape : ", len(data_list), "*", data_list[0].shape)
print("label shape : ", len(labels_list), "*",labels_list[0].shape)

In [None]:
def compute_accuracy(model, dataloader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for epochs, labels in dataloader:
            epochs = epochs.squeeze(0).float()  # Remove batch dimension and convert to float
            labels = labels.squeeze(0).long()  # Remove batch dimension and convert to long
            outputs = model(epochs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return 100 * correct / total

In [None]:
# Custom dataset for PSG data
class PSGDataset(Dataset):
    def __init__(self, data_list, labels_list):
        self.data_list = data_list
        self.labels_list = labels_list

    def __len__(self):
        return len(self.data_list)

    def __getitem__(self, idx):
        data = self.data_list[idx]  # shape: (n_epochs, 8, 3000)
        labels = self.labels_list[idx]  # shape: (n_epochs,)
        return torch.tensor(data, dtype=torch.float32), torch.tensor(labels, dtype=torch.long)

train_split = 11
train_data_list = data_list[:train_split]
train_labels_list = labels_list[:train_split]
test_data_list = data_list[train_split:]
test_labels_list = labels_list[train_split:]

train_dataset = PSGDataset(train_data_list, train_labels_list)
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=False)

test_dataset = PSGDataset(test_data_list, test_labels_list)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [None]:
def predict(model, data_loader):
    model.eval()
    all_predictions = []
    with torch.no_grad():
        for epochs, _ in data_loader:
            epochs = epochs.squeeze(0).float()  # Remove batch dimension and convert to float
            outputs = model(epochs)
            _, predicted = torch.max(outputs, 1)
            all_predictions.extend(predicted.cpu().numpy())
    return all_predictions

# Example usage:
predictions = predict(model, data_loader)

In [None]:
# Check GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Sample input lists (assuming you have these prepared)
data_list = [torch.randn(80, 8, 3000) for _ in range(10)]
labels_list = [torch.randint(0, 5, (80,)) for _ in range(10)]

# Split the data into training and test sets
train_data_list = data_list[:7]
train_labels_list = labels_list[:7]
test_data_list = data_list[7:]
test_labels_list = labels_list[7:]

# Dataset, DataLoader definition
class PSGDataset(Dataset):
    def __init__(self, data_list, labels_list):
        self.data_list = data_list
        self.labels_list = labels_list

    def __len__(self):
        return sum(len(data) for data in self.data_list)

    def __getitem__(self, idx):
        cumulative_idx = 0
        for i, data in enumerate(self.data_list):
            if cumulative_idx + len(data) > idx:
                item_idx = idx - cumulative_idx
                return self.data_list[i][item_idx], self.labels_list[i][item_idx]
            cumulative_idx += len(data)
        raise IndexError("Index out of range")

train_dataset = PSGDataset(train_data_list, train_labels_list)
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)

test_dataset = PSGDataset(test_data_list, test_labels_list)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Model definition
class CNN_LSTM(nn.Module):
    def __init__(self, num_channels, seq_length, num_classes):
        super(CNN_LSTM, self).__init__()
        self.conv1 = nn.Conv1d(in_channels=num_channels, out_channels=16, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(16, 32, kernel_size=3, padding=1)
        self.pool = nn.MaxPool1d(2)
        self.dropout = nn.Dropout(p=0.5)
        self.lstm = nn.LSTM(input_size=32 * (seq_length // 4), hidden_size=128, batch_first=True)
        self.fc = nn.Linear(128, num_classes)

    def forward(self, x):
        batch_size, num_channels, seq_length = x.size()
        print("Initial shape:", x.shape)

        # Pass through first convolutional layer and pooling
        x = self.pool(F.relu(self.conv1(x)))  # Shape: (batch_size, 16, seq_length/2)
        print("After conv1 and pool:", x.shape)

        # Pass through second convolutional layer and pooling
        x = self.pool(F.relu(self.conv2(x)))  # Shape: (batch_size, 32, seq_length/4)
        print("After conv2 and pool:", x.shape)

        # Reshape for LSTM layer
        x = x.permute(0, 2, 1)  # Shape: (batch_size, seq_length/4, 32)
        print("After permute:", x.shape)

        # LSTM layer
        x, _ = self.lstm(x)  # Shape: (batch_size, seq_length/4, 128)
        print("After LSTM:", x.shape)

        # Apply dropout
        x = self.dropout(x)
        x = self.fc(x[:, -1, :])  # Shape: (batch_size, num_classes)
        print("After FC:", x.shape)

        return x

model = CNN_LSTM(num_channels=8, seq_length=3000, num_classes=5).to(device)  # Move model to GPU

# Initialize weights
def weights_init(m):
    if isinstance(m, nn.Conv1d) or isinstance(m, nn.Linear):
        nn.init.kaiming_normal_(m.weight.data)

model.apply(weights_init)

# Loss function and optimizer definition
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001, weight_decay=1e-4)

# Function to calculate accuracy
def calculate_accuracy(dataloader, model):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += targets.size(0)
            correct += (predicted == targets).sum().item()
    return correct / total

writer = SummaryWriter('runs/experiment_1')

# Model training
num_epochs = 15  # Number of epochs

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, targets in train_dataloader:
        inputs, targets = inputs.to(device), targets.to(device)  # Move data to GPU

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()

        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()

        running_loss += loss.item()

    # Calculate training loss
    training_loss = running_loss / len(train_dataloader)

    # Calculate training accuracy
    train_accuracy = calculate_accuracy(train_dataloader, model)

    # Calculate test accuracy
    test_accuracy = calculate_accuracy(test_dataloader, model)

    # Log to TensorBoard
    writer.add_scalar('Loss/train', training_loss, epoch)
    writer.add_scalar('Accuracy/train', train_accuracy, epoch)
    writer.add_scalar('Accuracy/test', test_accuracy, epoch)

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {training_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, Test Accuracy: {test_accuracy:.4f}")

print("Training finished.")
writer.close()


In [None]:
%load_ext tensorboard
%tensorboard --logdir runs