In [1]:
from pymatreader import read_mat
import os
os.environ["KERAS_BACKEND"] = "torch"
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:512"
import numpy as np
import librosa
import scipy
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split
import keras
from keras.layers import Conv2D, BatchNormalization, AveragePooling2D, Input, Flatten, Dense, Reshape, SpatialDropout2D, MultiHeadAttention, Conv1D, Dropout
from keras.layers import LayerNormalization, AveragePooling1D, RepeatVector, Multiply, Concatenate,GlobalAveragePooling2D
import torch
from keras.losses import CosineSimilarity
from torch.utils.data import Dataset
import tensorflow as tf
torch.set_default_device('cuda')
from sklearn.preprocessing import scale
from torch.utils.data import Dataset, DataLoader
import more_itertools


#torch.backends.cudnn.benchmark = True

In [2]:
# Load the response array and the two stimulus arrays from disk.
# (presumably sub1 = one subject's recordings, stim/mask = the two audio streams — TODO confirm)
X, stim1, stim2 = (np.load(path) for path in ('sub1.npy', 'stim.npy', 'mask.npy'))

# Take the first four entries (along axis 0) of the 'label' variable, reduce them to
# their parity (0/1), flatten to a column and tile every value across 4224 columns.
# 4224 is presumably the number of samples per trial — TODO confirm.
label = (read_mat('ExJobb/y.mat')['label'][0:4] % 2).reshape(-1, 1)
label = np.repeat(label, 4224, axis=1)


In [3]:
def unison_shuffled_copies(a, b, c, d):
    """Reorder four arrays along their first axis with one shared permutation.

    A single random permutation of ``range(len(a))`` is drawn from NumPy's
    global RNG and applied to every argument, so entries that were aligned
    before the call stay aligned afterwards.

    Returns:
        tuple: ``(a, b, c, d)`` indexed by the same permutation.
    """
    order = np.random.permutation(len(a))
    return tuple(arr[order] for arr in (a, b, c, d))

# Seed NumPy's global RNG first: unison_shuffled_copies draws its permutation from it,
# so without a seed the trial order (and hence the index-based train/val/test split)
# would differ on every kernel run.
# Note: this cell overwrites its own inputs, so run it once per kernel session.
np.random.seed(42)
X,stim1,stim2,label = unison_shuffled_copies(X,stim1,stim2,label)


In [4]:
def create_data(data,train,val,test):
    """Split ``data`` along its first axis into three parts and join each part.

    The bounds are cumulative end indices: entries ``[0, train)`` form the
    training part, ``[train, val)`` the validation part and ``[val, test)``
    the test part. The entries of each part are joined with ``np.hstack``
    and the joined array is transposed.

    Returns:
        list: ``[dtrain, dval, dtest]``.
    """
    bounds = ((0, train), (train, val), (val, test))
    return [
        np.hstack([data[idx] for idx in range(lo, hi)]).T
        for lo, hi in bounds
    ]

# Same split for every array: entries 0-64 -> train, 65-69 -> validation, 70-79 -> test.
stimtr, stimval, stimts = create_data(stim1, 65, 70, 80)
mastr, masval, masts = create_data(stim2, 65, 70, 80)
resptr, respval, respts = create_data(X, 65, 70, 80)
l_tr, l_val, l_ts = create_data(label, 65, 70, 80)

In [5]:
# Turn every stimulus and mask split into a single column of shape (n, 1).
stimtr, stimval, stimts = (arr.reshape(-1, 1) for arr in (stimtr, stimval, stimts))
mastr, masval, masts = (arr.reshape(-1, 1) for arr in (mastr, masval, masts))

# The label columns get new names (ltr / lval / lts); l_tr / l_val / l_ts stay as they were.
ltr, lval, lts = (arr.reshape(-1, 1) for arr in (l_tr, l_val, l_ts))

In [6]:
# Standardise responses, stimulus and mask.
# Each scaler is fitted on the training split ONLY and then applied unchanged to the
# validation and test splits. Calling fit_transform on the test split would refit the
# scaler on test data (leaking test statistics, scaling test differently from
# train/val, and overwriting the training statistics stored in the scaler).

# Responses: scaled column-wise, then given a trailing singleton axis.
trsc = StandardScaler()
X_tr = np.expand_dims(trsc.fit_transform(resptr),axis=-1).astype(np.float32)
X_val = np.expand_dims(trsc.transform(respval),axis=-1).astype(np.float32)
X_test = np.expand_dims(trsc.transform(respts),axis=-1).astype(np.float32)

# Stimulus column.
trst = StandardScaler()
y_tr = trst.fit_transform(stimtr).astype(np.float32)
y_val = trst.transform(stimval).astype(np.float32)
y_test = trst.transform(stimts).astype(np.float32)

# Mask column.
trsm = StandardScaler()
m_tr = trsm.fit_transform(mastr).astype(np.float32)
m_val = trsm.transform(masval).astype(np.float32)
m_test = trsm.transform(masts).astype(np.float32)

In [7]:
# Slice the training arrays into windows of 128 consecutive rows, advancing 64 rows
# per window (50 % overlap); the window index becomes the leading axis.
x = np.array([win for win in more_itertools.windowed(X_tr, n=128, step=64)])
y = np.array([win for win in more_itertools.windowed(y_tr, n=128, step=64)])
m = np.array([win for win in more_itertools.windowed(m_tr, n=128, step=64)])

# One label per window: the mean of the per-row labels inside that window.
# NOTE(review): a window that spans rows with different labels gets a fractional
# target — confirm this is intended.
labels = np.array(list(more_itertools.windowed(ltr, 128, step=64))).mean(axis=1)

In [8]:
# Same windowing as for training (128 rows per window, step 64), applied to the
# validation arrays.
xv = np.array([win for win in more_itertools.windowed(X_val, n=128, step=64)])
yv = np.array([win for win in more_itertools.windowed(y_val, n=128, step=64)])
mv = np.array([win for win in more_itertools.windowed(m_val, n=128, step=64)])

# One label per validation window: the mean of the per-row labels inside it.
lv = np.array(list(more_itertools.windowed(lval, 128, step=64))).mean(axis=1)

In [9]:
# Sanity check: one averaged label per validation window.
lv.shape

(329, 1)

In [10]:
# Each dataset element is ((response_window, stim_window, mask_window), window_label).
train_dataset = tf.data.Dataset.from_tensor_slices(
    ((x, y, m), labels)
)
val_dataset = tf.data.Dataset.from_tensor_slices(
    ((xv, yv, mv), lv)
)

In [11]:
# Shuffle individual windows BEFORE batching. Batching first and shuffling afterwards
# only reorders whole batches, so every batch would always contain the same four
# consecutive (and 50 % overlapping) windows. The buffer covers the whole dataset so
# the shuffle is uniform.
train_dataset = train_dataset.shuffle(buffer_size=train_dataset.cardinality()).batch(4)
train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)

In [12]:
# Validation data is only batched: loss and metrics are aggregated over all batches,
# so shuffling adds a buffer without changing the result and makes the batch order
# non-deterministic.
val_dataset = val_dataset.batch(4)
val_dataset = val_dataset.prefetch(tf.data.AUTOTUNE)

In [13]:
def EEG_Channel(input_shape):
    """Build the EEG branch: a learned per-channel gate followed by channel pooling.

    Two conv / average-pool stages shrink the time axis, a final convolution
    whose kernel spans the whole remaining time axis produces one ``tanh``
    value per channel, and that value is repeated over time and multiplied
    with the raw input. The gated input is then average-pooled over groups
    of 8 channels.

    All internal sizes are derived from ``input_shape`` instead of being
    hard-coded; for ``(128, 66, 1)`` the resulting graph is unchanged.

    Args:
        input_shape: ``(time, channels, 1)`` shape of one EEG window.

    Returns:
        keras.Model mapping the EEG window to the pooled, gated window.

    Raises:
        ValueError: if the time axis is too short to survive both pooling stages.
    """
    n_time, n_chan = input_shape[0], input_shape[1]
    # Length of the time axis after AveragePooling2D((3,1)) and AveragePooling2D((4,1))
    # (default strides equal the pool size, padding is "valid").
    pooled_time = (n_time // 3) // 4
    if pooled_time < 1:
        raise ValueError(
            f"input_shape[0]={n_time} is too short for the two pooling stages (needs >= 12)"
        )

    eeg_in = Input(input_shape)

    eeg = Conv2D(32,(3,3),padding="same",activation="elu")(eeg_in)
    eeg = AveragePooling2D((3,1))(eeg)
    eeg = Conv2D(8,(3,3),padding="same",activation = "elu")(eeg)
    eeg = AveragePooling2D((4,1))(eeg)

    # Collapse the remaining time axis into a single gate value per channel ...
    gate = Conv2D(1,(pooled_time,1),activation="tanh")(eeg)
    gate = Flatten()(gate)
    # ... and broadcast it back over every time step of the input.
    gate = RepeatVector(n_time)(gate)
    gate = Reshape((n_time,n_chan,1))(gate)
    eeg = Multiply()([eeg_in,gate])

    out = AveragePooling2D((1,8))(eeg)

    return keras.Model(eeg_in,out)

In [14]:
def Envelope_model(input_shape):
    """Build the branch that embeds a one-dimensional stimulus sequence.

    A pointwise ``Conv1D`` with 64 filters is followed by a width-3 ``Conv1D``
    with 8 filters (both ``"same"``-padded, ELU), and the result is reshaped
    to ``(time, 8, 1)``. The time length is taken from ``input_shape`` instead
    of being hard-coded to 128.

    Args:
        input_shape: ``(time, features)`` shape of one stimulus window.

    Returns:
        keras.Model mapping the window to a ``(time, 8, 1)`` tensor.
    """
    env_in = Input(input_shape)

    env = Conv1D(64,1,padding="same",activation="elu")(env_in)
    out = Conv1D(8,3,padding="same",activation = "elu")(env)

    # "same" padding keeps the time length, so only a trailing singleton axis is added.
    out = Reshape((input_shape[0],8,1))(out)

    return keras.Model(env_in,out)

In [15]:
def transform_block(inputs,head_dim,num_heads,dropout,filters):
    """Apply one attention + convolution block with two residual connections.

    Args:
        inputs: tensor fed to the block (used as both query and value).
        head_dim: ``key_dim`` of the ``MultiHeadAttention`` layer.
        num_heads: number of attention heads.
        dropout: rate used inside the attention layer and by both ``Dropout`` layers.
        filters: number of filters of the 3x3 convolution.

    Returns:
        Tensor with the same last dimension as ``inputs``.
    """
    # Self-attention sub-layer: attention -> dropout -> layer norm, then skip connection.
    attn = MultiHeadAttention(key_dim=head_dim,num_heads=num_heads,dropout=dropout)(inputs,inputs)
    attn = Dropout(dropout)(attn)
    attn = LayerNormalization(epsilon=1e-6)(attn)
    skip = attn+inputs

    # Convolutional sub-layer: widen to `filters`, then project back to the input's
    # channel count so the second skip connection is shape-compatible.
    conv = Conv2D(filters,(3,3),padding="same",activation="relu")(skip)
    conv = Dropout(dropout)(conv)
    conv = Conv2D(inputs.shape[-1],(1,1),activation="relu")(conv)
    conv = LayerNormalization(epsilon=1e-6)(conv)
    return conv+skip

In [16]:
def Combined_model(inputsh,head_dim,num_heads,dropout,filters,num_transf):
    """Assemble the three-input binary classifier.

    The EEG window goes through ``EEG_Channel`` and each of the two stimulus
    windows through its own ``Envelope_model``. The three branch outputs are
    concatenated (``Concatenate`` with its default axis), passed through
    ``num_transf`` transform blocks, pooled, and reduced to one sigmoid unit.

    The branch models are built from ``inputsh`` (previously literal shapes
    were used, which silently ignored the argument).

    Args:
        inputsh: sequence holding the EEG, stim and mask input shapes, in that order.
        head_dim: forwarded to ``transform_block``.
        num_heads: forwarded to ``transform_block``.
        dropout: forwarded to ``transform_block`` and used by the ``Dropout``
            layer before the read-out convolution.
        filters: forwarded to ``transform_block``.
        num_transf: number of stacked transform blocks.

    Returns:
        keras.Model taking ``[eeg, stim, mask]`` and returning one sigmoid output.
    """
    eeg = Input(inputsh[0])
    stim = Input(inputsh[1])
    mask = Input(inputsh[2])

    # One sub-model per input; stim and mask get separate (unshared) envelope models.
    eeg_out = EEG_Channel(inputsh[0])(eeg)
    stim_out = Envelope_model(inputsh[1])(stim)
    mask_out = Envelope_model(inputsh[2])(mask)

    x = Concatenate()([eeg_out,stim_out,mask_out])

    for _ in range(num_transf):
        x = transform_block(x,head_dim,num_heads,dropout,filters)

    # Read-out head.
    x = AveragePooling2D((4,4))(x)
    x = Dropout(dropout)(x)
    x = Conv2D(1,(5,2),activation="elu")(x)

    x = Flatten()(x)

    # NOTE(review): Dense(16) has no activation, so together with the next layer it is
    # a single affine map before the sigmoid — confirm this is intended.
    x = Dense(16)(x)

    x = Dense(1,activation="sigmoid")(x)

    return keras.Model([eeg,stim,mask],x)

    

In [17]:
# Shapes of the three model inputs, in the order EEG, stim, mask.
inputsh = [(128,66,1),(128,1),(128,1)]
# key_dim of every MultiHeadAttention layer.
head_dim = 256
# Number of attention heads per transform block.
num_heads = 4
# Rate used by the Dropout layers and by the attention layers' own dropout.
dropout = 0.2
# Filters of the 3x3 convolution inside each transform block.
filters = 8
# Number of stacked transform blocks.
num_transf = 8

In [18]:
# Build the full three-input model from the hyper-parameters defined above.
model = Combined_model(inputsh,head_dim,num_heads,dropout,filters,num_transf)

In [19]:
# Print the layer-by-layer overview of the assembled model.
model.summary()

In [20]:
# Binary-classification setup: cross-entropy loss, tracked with accuracy and the
# false-negative / false-positive counts.
model.compile(
    optimizer="Nadam",
    loss=keras.losses.BinaryCrossentropy(),
    metrics=[
        keras.metrics.BinaryAccuracy(),
        keras.metrics.FalseNegatives(),
        keras.metrics.FalsePositives(),
    ],
)

In [None]:
# Train for 10 epochs and evaluate on the validation dataset after every epoch.
model.fit(train_dataset,epochs=10,validation_data = val_dataset)

Epoch 1/10
[1m1073/1073[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m975s[0m 908ms/step - binary_accuracy: 0.6308 - false_negatives: 406.0242 - false_positives: 342.6164 - loss: 0.6344 - val_binary_accuracy: 0.5289 - val_false_negatives: 43.0000 - val_false_positives: 112.0000 - val_loss: 0.7606
Epoch 2/10
[1m1022/1073[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m46s[0m 912ms/step - binary_accuracy: 0.7018 - false_negatives: 335.8327 - false_positives: 293.8395 - loss: 0.5887

In [24]:
# Score the model on the validation dataset and keep the returned values in `k`.
# NOTE(review): the stored output shows 42 steps and three returned values, which does
# not match the current cells (329 validation windows in batches of 4; loss plus three
# metrics) — the output looks stale; re-run from a fresh kernel to confirm.
k = model.evaluate(val_dataset)

[1m42/42[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 54ms/step - binary_accuracy: 0.7012 - false_negatives: 37.1163 - loss: 0.6247


In [25]:
# Display the values returned by model.evaluate.
k

[0.5546475648880005, 0.7933130860328674, 67.0]