In [1]:
import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import pickle
from torch.utils.data import Dataset, SubsetRandomSampler, DataLoader

In [2]:
if torch.cuda.is_available():  
  dev = "cuda:0" 
else:  
  dev = "cpu"  
device = torch.device(dev)  
a = torch.zeros(4,3)    
a = a.to(device)

In [3]:
labels = {}
#W, R, 1, 2, 3, 4, M 
labels['W']=5
labels['R']=6
labels['M']=0
labels['1']=1
labels['2']=2
labels['3']=3
labels['4']=4

In [4]:
def load_data(dataset):
    batch_size=32
    validation_split = .2
    shuffle_dataset = True
    random_seed= 42
    dataset_size = len(dataset)
    indices = list(range(dataset_size))
    split = int(np.floor(validation_split * dataset_size))
    if shuffle_dataset :
        np.random.seed(random_seed)
        np.random.shuffle(indices)
    train_indices, val_indices = indices[split:], indices[:split]

    # Creating PT data samplers and loaders:
    train_sampler = SubsetRandomSampler(train_indices)
    valid_sampler = SubsetRandomSampler(val_indices)
    train_loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, 
                                               sampler=train_sampler)
    val_loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size,
                                                    sampler=valid_sampler)


    return train_loader, val_loader

In [8]:

class CustomSleepDataset(Dataset):
    
    def __init__(self, file_list):
        self.x = []
        self.y = []
        for i in file_list.keys():
            self.parse(file_list[i]["PSG.edf"], file_list[i]["Hypnogram.edf"])
        self.x = torch.FloatTensor(self.x)
        self.y = torch.LongTensor(self.y)
    def parse(self, sleep, hypnogram):
        _,_, header = highlevel.read_edf(hypnogram)
        signals, _, _ = highlevel.read_edf(sleep)
     
        for annotation in header['annotations']:
            start = int(annotation[0])
            end = int(annotation[1])
            sleep_stage = annotation[2][-1]
            if sleep_stage != '?':
                for i in range(start, end, 30):
                # pick i to i +30 for target range end
                    self.x.append([ signals[0][i*100:(i+30)*100],signals[1][i*100:(i+30)*100], signals[2][i*100:(i+30)*100]])
                    self.y.append(labels[sleep_stage])
    
    def __len__(self):

        return len(self.y)

    
    def __getitem__(self, index):
        
        return (self.x[index], self.y[index])
class CustomSleepSpectrogramDataset(CustomSleepDataset):
    
    def __init__(self, file_list):
        super().__init__(file_list)
    def parse(self, sleep, hypnogram):
        _,_, header = highlevel.read_edf(hypnogram)
        signals, _, _ = highlevel.read_edf(sleep)
        for annotation in header['annotations']:
            start = int(annotation[0])
            end = int(annotation[1])
            sleep_stage = annotation[2][-1]
            if (sleep_stage != '?') and (sleep_stage !='W') and (sleep_stage != 'R') and (sleep_stage != 'e'):
                for i in range(start, start+end, 30):
                # pick i to i +30 for target range end
                    x = np.asarray([ signals[0][i*100:(i+30)*100],signals[1][i*100:(i+30)*100], signals[2][i*100:(i+30)*100]])
                    _,_,x = spectrogram(x, fs=100)
                    self.x.append(x)
                    self.y.append(labels[sleep_stage])



In [9]:
filename = '2d_sleeper_dataset.pkl'
infile = open(filename,'rb')
dataset = pickle.load(infile)
infile.close()
train_loader, val_loader = load_data(dataset)

In [98]:
##train
n_epochs = 5

def train_model(model, train_dataloader, n_epoch=n_epochs, optimizer=optimizer, criterion=criterion):
    import torch.optim as optim
    model.train()# prep model for training
    model.to(device)
    for epoch in range(n_epoch):
        curr_epoch_loss = []
        for data, target in train_dataloader:
            data = data.cuda()
            target = target.cuda()
            optimizer.zero_grad()

            with torch.set_grad_enabled(True):
                outputs = model(data)
                _, preds = torch.max(outputs, 1)
                loss = criterion(outputs, target)
                # backward + optimize only if in training phase
                loss.backward()
                optimizer.step()
           
            curr_epoch_loss.append(loss.cpu().data.numpy())
        print(f"Epoch {epoch}: curr_epoch_loss={np.mean(curr_epoch_loss)}")
    return model


In [116]:
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=256, kernel_size=7,stride=1,padding=10)
        #kernel =2, stride=2
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(in_channels=256, out_channels=128, kernel_size=5,stride=2,padding=6)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv3 = nn.Conv2d(in_channels=128, out_channels=64, kernel_size=3,stride=2,padding=6)
        
        self.fc1 = nn.Linear(64*7*4, 5)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
#         print(x.shape)
        x = self.pool(F.relu(self.conv2(x)))
#         print(x.shape)
        x = self.pool(F.relu(self.conv3(x)))
#         print(x.shape)
#         x = x.squeeze(3)
        x = x.view(-1,64 * 7*4)

        
#         print(x.shape)
        x = self.fc1(x)
        return x
    

In [117]:
##load model
model = SimpleCNN()
criterion = torch.nn.modules.loss.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

In [118]:

train_model(model, train_loader,n_epochs, optimizer, criterion)

Epoch 0: curr_epoch_loss=nan
Epoch 1: curr_epoch_loss=nan
Epoch 2: curr_epoch_loss=nan
Epoch 3: curr_epoch_loss=nan
Epoch 4: curr_epoch_loss=nan


SimpleCNN(
  (conv1): Conv2d(3, 256, kernel_size=(7, 7), stride=(1, 1), padding=(10, 10))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(256, 128, kernel_size=(5, 5), stride=(2, 2), padding=(6, 6))
  (conv3): Conv2d(128, 64, kernel_size=(3, 3), stride=(2, 2), padding=(6, 6))
  (fc1): Linear(in_features=1792, out_features=5, bias=True)
)

In [119]:
def eval_model(model, dataloader):

    model.eval()
    Y_pred = []
    Y_test = []
    for data, target in dataloader:
        data = data.cuda()
        target = target.cuda()
        outputs = model(data)
        _,outputs = torch.max(outputs, dim = 1)
        Y_pred.append(outputs.cpu())

        Y_test.append(target.cpu().detach().numpy())
    Y_pred = np.concatenate(Y_pred, axis=0)
    Y_test = np.concatenate(Y_test, axis=0)

    return Y_pred, Y_test

In [120]:
from sklearn.metrics import accuracy_score

y_pred, y_true = eval_model(model, val_loader)

acc = accuracy_score(y_true, y_pred)

In [121]:
acc

0.0

In [122]:
y_pred

array([0, 0, 0, ..., 0, 0, 0])