In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import gzip
import os
import h5py

In [None]:
!pip install -q -U keras-tuner

[?25l[K     |███▍                            | 10 kB 20.6 MB/s eta 0:00:01[K     |██████▊                         | 20 kB 10.8 MB/s eta 0:00:01[K     |██████████                      | 30 kB 8.2 MB/s eta 0:00:01[K     |█████████████▍                  | 40 kB 7.5 MB/s eta 0:00:01[K     |████████████████▊               | 51 kB 4.7 MB/s eta 0:00:01[K     |████████████████████            | 61 kB 5.5 MB/s eta 0:00:01[K     |███████████████████████▍        | 71 kB 5.6 MB/s eta 0:00:01[K     |██████████████████████████▊     | 81 kB 4.5 MB/s eta 0:00:01[K     |██████████████████████████████  | 92 kB 5.0 MB/s eta 0:00:01[K     |████████████████████████████████| 98 kB 2.1 MB/s 
[?25h

In [None]:
import keras_tuner as kt

In [None]:
from google.colab import drive 
# Mount Google Drive at /content/drive so the project folder is reachable from this runtime.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Change into the shared project folder on the mounted Drive; later cells derive their paths from os.getcwd().
%cd /content/drive/My\ Drive/Group 4/

/content/drive/.shortcut-targets-by-id/1-A7MSZ6V8eP_xZaDJ_j7yN9ddVL8-jgm/Group 4


In [None]:
def model_builder(hp):
    """Build and compile the CNN whose layer widths Keras Tuner searches over.

    Architecture: Conv2D (5x20 kernel, ReLU) -> MaxPooling2D (3x12) -> Flatten
    -> Dense (ReLU) -> Dense(1, sigmoid), declared for inputs of shape
    (64, 256, 1). The Conv2D layer and the hidden Dense layer each carry L1
    bias, activity and kernel regularisation with strength 1e-5.

    Parameters
    ----------
    hp
        Hyperparameter object supplied by the tuner; only ``hp.Int`` is used.
        It defines ``units_1`` (number of Conv2D filters, 20 to 30) and
        ``units_2`` (number of hidden Dense units, 10 to 20), both in steps
        of 1.

    Returns
    -------
    keras.models.Sequential
        Model compiled with SGD (learning rate 1e-4), binary cross-entropy
        loss and a binary-accuracy metric.
    """

    def l1_penalties(strength=1e-5):
        # Fresh regularizer instances per layer, ready to splat as kwargs.
        return dict(bias_regularizer=regularizers.l1(strength),
                    activity_regularizer=regularizers.l1(strength),
                    kernel_regularizer=regularizers.l1(strength))

    model = keras.models.Sequential()

    hp_units_1 = hp.Int('units_1', min_value=20, max_value=30, step=1)

    model.add(layers.Conv2D(hp_units_1, kernel_size=(5, 20),
                            activation='relu',
                            input_shape=(64, 256, 1),
                            **l1_penalties()))
    model.add(layers.MaxPooling2D(pool_size=(3, 12)))

    hp_units_2 = hp.Int('units_2', min_value=10, max_value=20, step=1)

    model.add(layers.Flatten())
    model.add(layers.Dense(hp_units_2, activation='relu', **l1_penalties()))
    model.add(layers.Dense(1, activation='sigmoid'))

    # `metrics` is given as a list, the documented form for compile(). The
    # metric keeps its default name, so validation results are still logged
    # under 'val_binary_accuracy'.
    model.compile(optimizer=keras.optimizers.SGD(learning_rate=1e-4),
                  loss=keras.losses.BinaryCrossentropy(),
                  metrics=[keras.metrics.BinaryAccuracy()])

    return model

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import regularizers

In [None]:
# Anchor all project paths at the current working directory.
root = os.getcwd()
# Folder handed to the tuner for its trial checkpoints.
save_path = '%s/Models/S2_nonmatch/cnn_cv_checkpoints' % root

In [None]:
# Hyperband search over the hyperparameters declared in model_builder, ranked
# by validation binary accuracy. Tuner state is kept under
# <save_path>/cnn_cv.
hyperband_options = dict(
    objective='val_binary_accuracy',
    max_epochs=5,
    factor=3,
    directory=save_path,
    project_name='cnn_cv',
)
tuner = kt.Hyperband(model_builder, **hyperband_options)

INFO:tensorflow:Reloading Oracle from existing project /content/drive/.shortcut-targets-by-id/1-A7MSZ6V8eP_xZaDJ_j7yN9ddVL8-jgm/Group 4/Models/S2_nonmatch/cnn_cv_checkpoints/cnn_cv/oracle.json
INFO:tensorflow:Reloading Tuner from /content/drive/.shortcut-targets-by-id/1-A7MSZ6V8eP_xZaDJ_j7yN9ddVL8-jgm/Group 4/Models/S2_nonmatch/cnn_cv_checkpoints/cnn_cv/tuner0.json


In [None]:
# Stop a fit early once the validation loss stops improving. Keras logs the
# validation loss under the key 'val_loss'; the previous key
# 'val_binary_loss' is never produced, so the callback could not act on it.
# NOTE(review): with patience=5 and at most 5 epochs per trial this callback
# still looks unable to trigger. Confirm whether a smaller patience is intended.
stop_early = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=5)

Function to list patient names

In [None]:
def list_patient_names(root):
    """Return the names of the sub-directories of ``root``.

    Plain files are ignored. The names are ordered by everything after their
    first four characters.
    """
    subfolders = []
    for entry in os.listdir(root):
        if os.path.isdir(os.path.join(root, entry)):
            subfolders.append(entry)

    # Sort on the part of the name that follows the four-character prefix.
    subfolders.sort(key=lambda name: name[4:])

    return subfolders

Function to list trial files

In [None]:
def list_trial_files(root, patient_name):
    """Return the sorted names of the regular files in one patient's folder.

    Parameters
    ----------
    root : str
        Directory that contains the patient folders, with or without a
        trailing path separator.
    patient_name : str
        Name of the patient sub-folder inside ``root``.

    Returns
    -------
    list of str
        File names (not full paths) in lexicographic order; sub-directories
        are excluded.
    """
    # os.path.join instead of string concatenation, so ``root`` does not
    # have to end with '/'.
    patient_folder = os.path.join(root, patient_name)

    trial_names = sorted(f for f in os.listdir(patient_folder)
                         if os.path.isfile(os.path.join(patient_folder, f)))

    return trial_names

Function to read in the patient name, patient type, trial number, stimulus type, sensor names, and the matrix of sensor readings from one trial file

In [None]:
def parse_dat(root, patient_name, trial_file, n_sensors=64, n_samples=256):
    """Parse one gzip-compressed trial file into metadata and a readings matrix.

    Parameters
    ----------
    root : str
        Directory that contains the patient folders, with or without a
        trailing path separator.
    patient_name : str
        Name of the patient sub-folder inside ``root``.
    trial_file : str
        Name of the gzip text file inside the patient folder. Characters
        ``[-6:-3]`` of this name are returned as the trial number.
    n_sensors, n_samples : int, optional
        Shape of the readings matrix (defaults: 64 sensors x 256 samples).

    Returns
    -------
    tuple
        ``(patient_name, patient_type, trial_num, stim_type, sensor_names,
        data_mat)``. ``patient_name`` is read from the first line of the
        file, ``patient_type`` is its fourth character, ``stim_type`` is the
        second and third space-separated tokens of the fourth line,
        ``sensor_names`` come from the ``#`` header lines after line four,
        and ``data_mat`` is an ``(n_sensors, n_samples)`` float array built
        from the fourth space-separated field of the non-header lines, in
        file order.

    Raises
    ------
    IndexError
        If the file holds fewer than ``n_sensors * n_samples`` reading lines.
    """
    # os.path.join instead of string concatenation, so ``root`` does not
    # have to end with '/'.
    txtfile = os.path.join(root, patient_name, trial_file)

    # Read in the entire data file
    with gzip.open(txtfile, 'rt') as f:
        lines = f.readlines()

    # Get the patient name: second token of the first line, minus its last
    # four characters
    patient_name = lines[0].split(' ')[1][:-4]

    # Get patient type (alcoholic or control)
    patient_type = patient_name[3]

    # Get trial number
    trial_num = trial_file[-6:-3]

    # Get stimulus type
    stim_tokens = lines[3].split(' ')
    stim_type = stim_tokens[1] + ' ' + stim_tokens[2]

    # Get sensor names from the '#' header lines that follow the preamble
    readings = lines[4:]
    sensor_names = [r.split(' ')[1] for r in readings if r.startswith('#')]

    # Get the sensor readings for all sensors and all timestamps
    data = [r for r in readings if not r.startswith('#')]

    n_expected = n_sensors * n_samples
    if len(data) < n_expected:
        raise IndexError('%s holds %d reading lines, expected at least %d'
                         % (txtfile, len(data), n_expected))

    # float() ignores surrounding whitespace, so the trailing newline needs
    # no manual slicing. Slicing with [:-1] would drop the last digit of a
    # final line that has no newline.
    values = [float(r.split(' ')[3]) for r in data[:n_expected]]
    data_mat = np.array(values, dtype=float).reshape(n_sensors, n_samples)

    return patient_name, patient_type, trial_num, stim_type, sensor_names, data_mat

Load in all the data

In [None]:
# Folder (with trailing slash) that holds the EEG summary CSV and HDF5 file.
data_path = '%s/eeg_full/' % root

In [None]:
# Per-trial summary table; later cells read its 'Patient', 'Trial' and 'Type' columns.
df = pd.read_csv(data_path+'S2_nonmatch_patient_summary.csv')
# HDF5 file opened read-only; the loading loop looks datasets up in it by name.
# NOTE(review): the handle is not closed in the cells visible here.
hf = h5py.File(data_path+'S2_nonmatch.h5', 'r')

In [None]:
# Encode the patient type as a numeric label: 'c' -> 0, 'a' -> 1.
label_codes = {'c': 0, 'a': 1}
df['Type'] = df['Type'].replace(label_codes)

In [None]:
# Pre-allocate one 64 x 256 array of readings per row of the summary table.
dat_tensor = np.zeros((df.shape[0], 64, 256))

In [None]:
# Pre-allocate one label per row of the summary table.
dat_targets = np.zeros(df.shape[0])

In [None]:
# Fill the pre-allocated arrays row by row: each summary row names the HDF5
# dataset '<Patient>.rd.<Trial zero-padded to 3 digits>' and carries its label.
for i in range(len(df)):
    row = df.iloc[i]
    dataset_key = row['Patient'] + '.rd.' + '%03d' % row['Trial']

    dat_tensor[i] = np.array(hf.get(dataset_key))
    dat_targets[i] = row['Type']

Split training and testing data

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
# Hold out 20% of the trials for testing; random_state fixes the shuffle so
# the split is the same on every run.
split_arrays = train_test_split(dat_tensor, dat_targets,
                                test_size=0.2, random_state=0)
train_tensor, test_tensor, train_target, test_target = split_arrays

Tune the neural network: run the Hyperband search over the CNN's layer sizes

In [None]:
# Run the hyperparameter search on the training split only; 20% of it is
# held out as validation data for every trial.
tuner.search(
    train_tensor,
    train_target,
    epochs=5,
    validation_split=0.2,
    callbacks=[stop_early],
)

INFO:tensorflow:Oracle triggered exit


In [None]:
# Keep only the top-ranked hyperparameter set from the search.
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]

In [None]:
# Collect the two tuned layer sizes, in the order [units_1, units_2].
hyperparams = np.array([best_hps.get(hp_name)
                        for hp_name in ('units_1', 'units_2')])

In [None]:
# Display the selected values as [units_1, units_2].
hyperparams

array([29, 13])

In [None]:
# Save the tuned layer sizes as a .npy file under the project's Models folder.
hyper_path = '%s/Models/S2_nonmatch/cnn_hyperparams.npy' % root
np.save(file=hyper_path, arr=hyperparams)