In [None]:
!pip install spkit
!pip install scikeras

In [None]:
import os
import sys

import scipy.io
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import math
from scipy.interpolate import griddata

from numpy import genfromtxt
import csv
import spkit as sp

import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras.layers import Reshape, LSTM
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import EarlyStopping

from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, classification_report, \
    confusion_matrix, balanced_accuracy_score, precision_recall_curve, matthews_corrcoef, roc_curve, jaccard_score, \
    hamming_loss, fbeta_score, precision_recall_fscore_support, zero_one_loss, average_precision_score
from sklearn.metrics import cohen_kappa_score, roc_auc_score, mean_squared_error, auc
import seaborn as sns

from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import KFold

# from keras.wrappers.scikit_learn import KerasClassifier
from scikeras.wrappers import KerasClassifier

from keras.optimizers import Adam

# VISUALIZE DATA SAM40

Tập dữ liệu này trình bày một tập hợp dữ liệu điện não đồ (EEG) được ghi lại từ 40 đối tượng (nữ: 14, nam: 26, tuổi trung bình: 21,5 tuổi). Thí nghiệm chủ yếu được thực hiện để theo dõi mức độ căng thẳng ngắn hạn gây ra ở một cá nhân trong khi thực hiện các nhiệm vụ khác nhau như kiểm tra từ màu Stroop, giải các câu hỏi số học, xác định hình ảnh phản chiếu đối xứng và trạng thái thư giãn. Các nhiệm vụ riêng lẻ được thực hiện trong 25 giây và ba thử nghiệm được ghi lại cho từng nhiệm vụ riêng lẻ. Điện não đồ được ghi lại từ 32 kênh (cộng với tham chiếu CMS/DRL) với dải động +/- 4,12 mV, với độ phân giải 0,51 µV/bit và phạm vi 14 bit. Điện não đồ được lấy mẫu ở 128 SPS (1024 Hz)

In [None]:
# Load data from .mat file
mat_data_sam40 = scipy.io.loadmat('/content/drive/MyDrive/Colab_Notebooks/Thesis/Data/SAM_40/raw_data/Arithmetic_sub_10_trial1.mat')
print('mat_data: ', mat_data_sam40, '\nlen mat_data: ',  len(mat_data_sam40))
print(mat_data_sam40['Data'])
print(len(mat_data_sam40['Data']))
print(mat_data_sam40['Data'].shape)

In [None]:
# Convert data to pandas DataFrame
df = pd.DataFrame(mat_data_sam40['Data'])
df.info()
df.head()

In [None]:
# Load data from .mat file
mat_data_sam40 = scipy.io.loadmat('/content/drive/MyDrive/Colab_Notebooks/Thesis/Data/SAM_40/raw_data/Arithmetic_sub_10_trial1.mat')

# Convert data to pandas DataFrame and transpose it
df = pd.DataFrame(mat_data_sam40['Data']).T

# Create time stamps based on the sampling frequency (128 Hz)
time_stamps = pd.date_range(start='1/1/2000', periods=df.shape[0], freq='7.8125ms')

# Plot each feature as a separate subplot
fig, axes = plt.subplots(nrows=df.shape[1], ncols=1, figsize=(15, 50))
for i, ax in enumerate(axes):
    ax.plot(time_stamps, df.iloc[:,i])
    ax.set_ylabel('Feature {}'.format(i+1))

# Set x-axis label to time and adjust the layout
axes[-1].set_xlabel('Time')
plt.tight_layout()
plt.show()

# AZIMUTHAL EQUIDISTANT PROJECTION 

In [None]:

alpha = (8,13)
beta = (12,30)
gamma = (30, 45)

def get_fft(snippet):
    Fs = 128;  # sampling rate
    #Ts = len(snippet)/Fs/Fs; # sampling interval
    snippet_time = len(snippet)/Fs
    Ts = 1.0/Fs; # sampling interval
    t = np.arange(0,snippet_time,Ts) # time vector

    # ff = 5;   # frequency of the signal
    # y = np.sin(2*np.pi*ff*t)
    y = snippet
#     print('Ts: ',Ts)
#     print(t)
#     print(y.shape)
    n = len(y) # length of the signal
    k = np.arange(n)
    T = n/Fs
    frq = k/T # two sides frequency range
    frq = frq[range(n//2)] # one side frequency range

    Y = np.fft.fft(y)/n # fft computing and normalization
    Y = Y[range(n//2)]
    #Added in: (To remove bias.)
    #Y[0] = 0
    return frq,abs(Y)
#f,Y = get_fft(np.hanning(len(snippet))*snippet)

In [None]:
# Calculate the average power in the alpha, beta, and gamma bands
def theta_alpha_beta_averages(f,Y):
    gamma_range = (30, 45)
    alpha_range = (8, 12)
    beta_range = (12, 30)


    gamma = Y[(f>gamma_range[0]) & (f<=gamma_range[1])].mean()
    alpha = Y[(f>alpha_range[0]) & (f<=alpha_range[1])].mean()
    beta = Y[(f>beta_range[0]) & (f<=beta_range[1])].mean()
    return gamma, alpha, beta

In [None]:
# Transform Cartesian coordinates to spherical
def cart2sph(x, y, z):
    x2_y2 = x ** 2 + y ** 2
    r = math.sqrt(x2_y2 + z ** 2)  # r
    elev = math.atan2(z, math.sqrt(x2_y2))  # Elevation
    az = math.atan2(y, x)  # Azimuth
    return r, elev, az

# Transform polar coordinates to Cartesian
def pol2cart(theta, rho):
    return rho * math.cos(theta), rho * math.sin(theta)

In [None]:
def make_steps(samples,frame_duration,overlap):
    '''
    in:
    samples - number of samples in the session
    frame_duration - frame duration in seconds
    overlap - float fraction of frame to overlap in range (0,1)

    out: list of tuple ranges
    '''
    #steps = np.arange(0,len(df),frame_length)
    Fs = 128
    i = 0
    intervals = []
    samples_per_frame = Fs * frame_duration
    while i+samples_per_frame <= samples:
        intervals.append((i,i+samples_per_frame))
        i = i + samples_per_frame - int(samples_per_frame*overlap)
    return intervals

In [None]:
def make_frames(df,frame_duration):
    '''
    in: dataframe or array with all channels, frame duration in seconds
    out: array of theta, alpha, beta averages for each probe for each time step
        shape: (n-frames,m-probes,k-brainwave bands)
    '''
    Fs = 128.0
    frame_length = Fs*frame_duration
    frames = []
    steps = make_steps(len(df),frame_duration,overlap)
    for i,_ in enumerate(steps):
        frame = []
        if i == 0:
            continue
        else:
            for channel in df.columns:
                snippet = np.array(df.loc[steps[i][0]:steps[i][1],int(channel)])
                f,Y =  get_fft(snippet)
                gamma, alpha, beta = theta_alpha_beta_averages(f,Y)
                frame.append([gamma, alpha, beta])

        frames.append(frame)
    return np.array(frames)

In [None]:
def azim_proj(pos):
    """
    Computes the Azimuthal Equidistant Projection of input point in 3D Cartesian Coordinates.
    Imagine a plane being placed against (tangent to) a globe. If
    a light source inside the globe projects the graticule onto
    the plane the result would be a planar, or azimuthal, map
    projection.

    :param pos: position in 3D Cartesian coordinates
    :return: projected coordinates using Azimuthal Equidistant Projection
    """
    [r, elev, az] = cart2sph(pos[0], pos[1], pos[2])
    return pol2cart(az, math.pi / 2 - elev)

In [None]:
def gen_images(locs, features, n_gridpoints, normalize=True,
               augment=False, pca=False, std_mult=0.1, n_components=2, edgeless=False):
    """
    Generates EEG images given electrode locations in 2D space and multiple feature values for each electrode

    :param locs: An array with shape [n_electrodes, 2] containing X, Y
                        coordinates for each electrode.
    :param features: Feature matrix as [n_samples, n_features]
                                Features are as columns.
                                Features corresponding to each frequency band are concatenated.
                                (alpha1, alpha2, ..., beta1, beta2,...)
    :param n_gridpoints: Number of pixels in the output images
    :param normalize:   Flag for whether to normalize each band over all samples
    :param augment:     Flag for generating augmented images
    :param pca:         Flag for PCA based data augmentation
    :param std_mult     Multiplier for std of added noise
    :param n_components: Number of components in PCA to retain for augmentation
    :param edgeless:    If True generates edgeless images by adding artificial channels
                        at four corners of the image with value = 0 (default=False).
    :return:            Tensor of size [samples, colors, W, H] containing generated
                        images.
    """
    feat_array_temp = []
    nElectrodes = locs.shape[0]     # Number of electrodes
    print("nElectrodes: ",nElectrodes)
    print("feature.shape: ",features.shape)
    # Test whether the feature vector length is divisible by number of electrodes
    assert features.shape[1] % nElectrodes == 0
    n_colors = features.shape[1] // nElectrodes
    for c in range(int(n_colors)):
        feat_array_temp.append(features[:, c * nElectrodes : nElectrodes * (c+1)])
    if augment:
        if pca:
            for c in range(n_colors):
                feat_array_temp[c] = augment_EEG(feat_array_temp[c], std_mult, pca=True, n_components=n_components)
        else:
            for c in range(n_colors):
                feat_array_temp[c] = augment_EEG(feat_array_temp[c], std_mult, pca=False, n_components=n_components)
    nSamples = features.shape[0]
    # Interpolate the values
    grid_x, grid_y = np.mgrid[
                     min(locs[:, 0]):max(locs[:, 0]):n_gridpoints*1j,
                     min(locs[:, 1]):max(locs[:, 1]):n_gridpoints*1j
                     ]
    temp_interp = []
    for c in range(n_colors):
        temp_interp.append(np.zeros([nSamples, n_gridpoints, n_gridpoints]))
    # Generate edgeless images
    if edgeless:
        min_x, min_y = np.min(locs, axis=0)
        max_x, max_y = np.max(locs, axis=0)
        locs = np.append(locs, np.array([[min_x, min_y], [min_x, max_y],[max_x, min_y],[max_x, max_y]]),axis=0)
        for c in range(n_colors):
            feat_array_temp[c] = np.append(feat_array_temp[c], np.zeros((nSamples, 4)), axis=1)
    # Interpolating
    for i in range(nSamples):
        for c in range(n_colors):
            temp_interp[c][i, :, :] = griddata(locs, feat_array_temp[c][i, :], (grid_x, grid_y),
                                    method='cubic', fill_value=np.nan)
        print('Interpolating {0}/{1}\r'.format(i+1, nSamples), end='\r')
    # Normalizing
    for c in range(n_colors):
        if normalize:
            temp_interp[c][~np.isnan(temp_interp[c])] = scale(temp_interp[c][~np.isnan(temp_interp[c])])
        temp_interp[c] = np.nan_to_num(temp_interp[c])
    return np.swapaxes(np.asarray(temp_interp), 0, 1)     # swap axes to have [samples, colors, W, H]

In [None]:
def make_data_pipeline(file_names,labels,image_size,frame_duration,overlap):
    '''
    IN:
    file_names - list of strings for each input file (one for each subject)
    labels - list of labels for each
    image_size - int size of output images in form (x, x)
    frame_duration - time length of each frame (seconds)
    overlap - float fraction of frame to overlap in range (0,1)

    OUT:
    X: np array of frames (unshuffled)
    y: np array of label for each frame (1 or 0)
    '''
    ##################################
    ###Still need to do the overlap###!!!
    ##################################

    Fs = 128   #sampling rate
    frame_length = Fs * frame_duration

    print('Generating training data...')


    for i, file in enumerate(file_names):
        print ('Processing session: ',file, '. (',i+1,' of ',len(file_names),')')
        data = genfromtxt(file, delimiter=',').T
        # df = pd.DataFrame(data)
        df = pd.DataFrame(data[:, 1:])

        X_0 = make_frames(df,frame_duration)
        #steps = np.arange(0,len(df),frame_length)
        X_1 = X_0.reshape(len(X_0),32*3)

        images = gen_images(np.array(locs_2d),X_1, image_size, normalize=False)
        images = np.swapaxes(images, 1, 3)
        print(len(images), ' frames generated with label ', labels[i], '.')
        print('\n')
        if i == 0:
            X = images
            y = np.ones(len(images))*labels[0]
        else:
            X = np.concatenate((X,images),axis = 0)
            y = np.concatenate((y,np.ones(len(images))*labels[i]),axis = 0)


    return X, np.array(y)