In [None]:
!pip install pyEDFlib
import pyedflib
import numpy as np
import warnings 
import os
import collections
from PIL import Image
from numpy.random import seed 
import tensorflow
warnings.filterwarnings("ignore")

In [None]:
# Recursively fetch the CHB-MIT 1.0.0 files from PhysioNet. The later cells read
# them from 'physionet.org/files/chbmit/1.0.0/' relative to the working directory.
!wget -r -N -c -np https://physionet.org/files/chbmit/1.0.0/

In [97]:
# Root folder of the downloaded dataset (relative to the working directory).
pathDataSet = 'physionet.org/files/chbmit/1.0.0/'
# Two-digit ids ("01" .. "10") of the patient records used for training.
patients = ['{:02d}'.format(patient_no) for patient_no in range(1, 11)]

In [27]:
def loadSummaryPatient(index):
    """Open the summary text file of the patient at position ``index``.

    Args:
        index: position in the module-level ``patients`` list.

    Returns:
        A tuple ``(f, parent)``: the summary file opened for reading (the
        caller is responsible for closing it) and the patient's folder name
        with a trailing slash, e.g. ``'chb01/'``.
    """
    patient_tag = 'chb' + patients[index]
    summary_file = open(pathDataSet + patient_tag + '/' + patient_tag + '-summary.txt', 'r')
    return summary_file, patient_tag + '/'

In [28]:
def seizureImageGenerate(secSt, secEn, name_F, parent):
    """Cut one EDF recording into 256-sample windows and save each as a JPEG.

    Window ``i`` covers samples ``[i*256, (i+1)*256)`` of every signal and is
    written to ``chbfig/<parent>`` as ``<recording>_seizure_<i>.jpg`` when
    ``secSt <= i + 1 <= secEn`` and as ``<recording>_nonseizure_<i>.jpg``
    otherwise.

    The function can be called several times for the same recording (once per
    annotated seizure). To keep exactly one label per window across such
    calls, a window already saved as seizure is never re-saved as non-seizure,
    and saving a seizure window removes a non-seizure image of the same index.

    Args:
        secSt: seizure start, in the same unit as the window index (+1).
        secEn: seizure end, same unit as ``secSt``.
        name_F: EDF file name inside the patient folder.
        parent: patient folder name with trailing slash, e.g. ``'chb01/'``.
    """
    edf = pyedflib.EdfReader(pathDataSet + parent + name_F)
    try:
        n = edf.signals_in_file
        rate = edf.getSignalHeaders()[0]['sample_rate']
        dur = edf.getFileDuration()
        x = np.zeros((n, edf.getNSamples()[0]))
        for channel in range(n):
            x[channel, :] = edf.readSignal(channel)
    finally:
        # Release the file handle even when reading a signal fails.
        edf.close()

    path = 'chbfig/' + parent
    os.makedirs(path, exist_ok=True)
    stem = path + name_F.split('.')[0]

    # Never ask for more full windows than the samples actually read.
    picnum = min(int(dur * rate / 256), x.shape[1] // 256)
    for i in range(picnum):
        img = x[:, i * 256:(i + 1) * 256]
        # NOTE(review): samples are cast straight to uint8 with no scaling, so
        # values outside 0..255 are not preserved -- confirm this is intended.
        Img = Image.fromarray(np.uint8(img))
        seizure_file = stem + '_seizure_' + str(i) + '.jpg'
        nonseizure_file = stem + '_nonseizure_' + str(i) + '.jpg'
        if secSt <= i + 1 <= secEn:
            Img.save(seizure_file)
            # A previous call for another seizure of this recording may have
            # labelled this window non-seizure; drop that conflicting copy.
            if os.path.exists(nonseizure_file):
                os.remove(nonseizure_file)
        elif not os.path.exists(seizure_file):
            # Only label as non-seizure when no earlier call marked it seizure.
            Img.save(nonseizure_file)

In [29]:
def createDataset():
    """Generate window images for every recording listed in the patient summaries.

    For each patient in ``patients`` the summary file is scanned for
    ``File Name`` entries. The seizure count is read from the third line after
    the file name, and for every seizure the following two lines give its
    start and end. ``seizureImageGenerate`` is called once per seizure, so
    recordings whose seizure count is 0 produce no images.
    """
    print("START \n")
    for indexPatient in range(len(patients)):
        f, parent = loadSummaryPatient(indexPatient)
        # ``with`` closes the summary file even if parsing or image
        # generation raises part-way through.
        with f:
            line = f.readline()
            while line:
                data = line.split(':')
                if data[0] == "File Name":
                    name_F = data[1].strip()
                    # Skip ahead to the "number of seizures" line, expected
                    # three lines below the file name.
                    for _ in range(3):
                        line = f.readline()
                    seizure_count = int(line.split(': ')[1])
                    for _ in range(seizure_count):
                        # Each seizure is described by a start line and an end
                        # line of the form "<text>: <number> <unit>".
                        secSt = int(f.readline().split(': ')[1].split(' ')[0])
                        secEn = int(f.readline().split(': ')[1].split(' ')[0])
                        seizureImageGenerate(secSt, secEn, name_F, parent)

                line = f.readline()

    print("END \n")

In [None]:
# Parse every patient summary and write the window images under 'chbfig/'.
createDataset()

In [32]:
def testlabelGenerate():
    """Load all window images of the test patients into memory.

    Reads every file under ``chbfig/chb<id>/`` whose name contains
    ``_seizure_`` (label 1) or ``_nonseizure_`` (label 0) and keeps the first
    23 rows of each image. Other directory entries are ignored, so they no
    longer show up as all-zero samples labelled non-seizure.

    NOTE(review): relies on a module-level ``test_patients`` list that is not
    defined in the visible cells -- confirm where it is set.

    Returns:
        ``(X, Y)``: features of shape ``(num_images, 23, 256)`` and integer
        labels of shape ``(num_images,)``. Both are empty when there are no
        test patients.
    """
    parent_path = 'chbfig/'
    featureList = []
    labelList = []
    for patient_id in test_patients:
        sub_path = 'chb' + patient_id + '/'
        print(sub_path)
        directory_name = parent_path + sub_path

        # List the directory once and keep only the labelled window images.
        labelled = []
        for filename in os.listdir(directory_name):
            if '_seizure_' in filename:
                labelled.append((filename, 1))
            elif '_nonseizure_' in filename:
                labelled.append((filename, 0))

        features = np.zeros((len(labelled), 23, 256))
        labels = np.zeros(len(labelled), dtype=int)
        for i, (filename, label) in enumerate(labelled):
            # Context manager closes the image file once the pixels are copied.
            with Image.open(directory_name + filename) as im:
                features[i] = np.array(im)[0:23, :]
            labels[i] = label

        featureList.append(features)
        labelList.append(labels)

    if not featureList:
        return np.zeros((0, 23, 256)), np.zeros(0, dtype=int)

    # Single concatenation instead of re-stacking the growing array per patient.
    X = np.concatenate(featureList, axis=0)
    Y = np.concatenate(labelList, axis=0)
    return X, Y

In [70]:
import numpy as np 
from keras.utils import Sequence
import random

def generatePathList(patients, test_size, seed=None):
    """List every window image of the given patients and split it at random.

    Args:
        patients: iterable of two-digit patient ids (``'01'``, ``'02'``, ...);
            images are read from ``chbfig/chb<id>/``.
        test_size: fraction (0..1) of the images assigned to the test split.
        seed: optional seed for the shuffle. With the default ``None`` the
            global ``random`` state is used, exactly as before; passing a
            value makes the split reproducible.

    Returns:
        ``(test_index, train_index, pathList)`` where the two index lists are
        disjoint positions into ``pathList`` and together cover all of it.
    """
    parent_path = 'chbfig/'
    pathList = []
    for patient_id in patients:
        directory_name = parent_path + 'chb' + patient_id + '/'
        # os.listdir order is arbitrary; sorting makes a seeded split map to
        # the same files on every run and machine.
        for filename in sorted(os.listdir(directory_name)):
            pathList.append(directory_name + filename)

    L = len(pathList)
    test_index = int(L * test_size)
    shuffler = random if seed is None else random.Random(seed)
    # Sampling all L positions without replacement yields a random permutation.
    index = shuffler.sample(range(L), L)
    return index[:test_index], index[test_index:], pathList


class DataGenerator(Sequence):
    """Keras ``Sequence`` that loads window images from disk batch by batch.

    ``index`` selects which entries of ``pathList`` belong to this generator.
    Batch ``idx`` serves positions ``[idx*batch_size, (idx+1)*batch_size)`` of
    ``index``, so batches do not overlap and every sample is served exactly
    once per pass; the final batch may be shorter than ``batch_size``.
    """

    def __init__(self, index, pathList, parent_path = 'chbfig/', batch_size=32):
        """Store the split definition.

        Args:
            index: positions into ``pathList`` that make up this split.
            pathList: paths of all window images.
            parent_path: stored on the instance; not read by this class.
            batch_size: number of images per batch.
        """
        self.batch_size = batch_size
        self.parent_path = parent_path
        self.pathList = pathList
        self.index = index
        self.L = len(self.index)

    def __len__(self):
        """Number of batches per pass: ceil(number of samples / batch_size)."""
        return (self.L + self.batch_size - 1) // self.batch_size

    def __getitem__(self, idx):
        """Return ``(features, labels)`` for batch number ``idx``."""
        # Step by whole batches; stepping by one sample made consecutive
        # batches share all but one image.
        start = idx * self.batch_size
        batch_indexs = self.index[start:start + self.batch_size]
        image_path = [self.pathList[k] for k in batch_indexs]

        return self._load_image(image_path)

    def _load_image(self, image_path):
        """Read the given image files into a feature tensor and a label vector.

        Returns:
            ``features`` of shape ``(len(image_path), 23, 256, 1)`` and integer
            ``labels`` of shape ``(len(image_path),)``: 1 for paths containing
            ``_seizure_``, 0 for ``_nonseizure_``. A path matching neither
            keeps an all-zero feature row with label 0.
        """
        features = np.zeros((len(image_path), 23, 256))
        labels = np.zeros(len(image_path), dtype=int)
        for i, name in enumerate(image_path):
            if '_seizure_' in name:
                labels[i] = 1
            elif '_nonseizure_' in name:
                labels[i] = 0
            else:
                continue
            # Context manager closes the file once the pixels are copied.
            with Image.open(name) as im:
                features[i] = np.array(im)[0:23, :]
        # Append a trailing channel axis of size 1.
        return np.expand_dims(features, axis=3), labels


In [77]:
import keras
from keras.models import Sequential
from keras.layers import Dropout, Dense, Activation, Flatten, LSTM, Bidirectional, ConvLSTM2D
from keras.layers import Conv2D, MaxPooling2D, BatchNormalization, TimeDistributed, SimpleRNN, GRU
from keras.optimizers import SGD

def _conv_block(filters, **conv_kwargs):
    """Layers of one stage: Conv2D(2x4) -> BatchNorm -> ReLU -> MaxPool(1x4) -> Dropout(0.15)."""
    return [
        Conv2D(filters, (2, 4), **conv_kwargs),
        BatchNormalization(),
        Activation('relu'),
        MaxPooling2D(pool_size=(1, 4)),
        Dropout(0.15),
    ]


# Three convolutional stages over the (23, 256, 1) input, then a bidirectional
# GRU over the flattened feature maps and a sigmoid output unit.
model = Sequential(
    _conv_block(64, input_shape=(23, 256, 1))
    + _conv_block(32)
    + _conv_block(32)
    + [
        TimeDistributed(Flatten()),
        Bidirectional(GRU(32)),
        Dropout(0.15),
        Dense(32, activation='relu'),
        Dense(1, activation='sigmoid'),
    ]
)

# Binary cross-entropy with Nesterov-momentum SGD; recall is reported as 'sen'.
sgd = SGD(lr=0.0001, decay=1e-6, momentum=0.9, nesterov=True)
model.compile(
    optimizer=sgd,
    loss='binary_crossentropy',
    metrics=['accuracy', keras.metrics.Recall(name='sen')],
)


In [94]:
# Seed NumPy and Python's ``random`` module: generatePathList draws the
# train/test split from ``random``, which numpy's seed() does not affect.
seed(1)
random.seed(1)

In [98]:
# Checkpoints are written to ./cks; the file name embeds the epoch number and
# the validation accuracy and 'sen' metric of that epoch.
save_dir = os.path.join(os.getcwd(), 'cks')
os.makedirs(save_dir, exist_ok=True)
filepath = "model_{epoch:03d}-{val_accuracy:.2f}-{val_sen:.2f}.hdf5"
checkpoint = keras.callbacks.ModelCheckpoint(
    os.path.join(save_dir, filepath),
    verbose=0,
    save_best_only=False,
    save_weights_only=False,
)

In [99]:
# Hold out 30% of the window images as the test split; the rest is for training.
test_id, train_id, path_list = generatePathList(patients=patients, test_size=0.3)
train_data = DataGenerator(index=train_id, pathList=path_list)
test_data = DataGenerator(index=test_id, pathList=path_list)

In [None]:
# Train for 50 epochs, validating on the held-out generator and saving a
# checkpoint through the callback. Label 1 is weighted 50x relative to label 0.
model.fit_generator(
    generator=train_data,
    validation_data=test_data,
    epochs=50,
    initial_epoch=0,
    steps_per_epoch=None,
    class_weight={0:1, 1:50},
    callbacks=[checkpoint],
    workers=4,
    use_multiprocessing=True,
    shuffle=False,
    verbose=1,
)