In [1]:
import os
import pandas as pd
import numpy as np

In [2]:
from autokeras.keras_layers import ExpandLastDim
from autokeras.keras_layers import CastToFloat32
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, GlobalMaxPooling2D, GlobalAveragePooling2D, LSTM, SimpleRNN
from tensorflow.keras.layers import InputLayer, Dense, Flatten, Dropout
from tensorflow.keras.callbacks import EarlyStopping

from tensorflow.keras.layers.experimental.preprocessing import RandomTranslation, RandomFlip
from tensorflow.keras.layers.experimental.preprocessing import Resizing
from tensorflow.keras.layers import Concatenate
from tensorflow.keras.applications import EfficientNetB7

import warnings
warnings.filterwarnings(action='ignore')

import tensorflow as tf
tf.get_logger().setLevel('ERROR')
gpu_devices = tf.config.experimental.list_physical_devices("GPU")
for device in gpu_devices:
    tf.config.experimental.set_memory_growth(device, True)

# Make Input

In [37]:
def calculateWeight(tlx):
    tlx_weight = {'Mental':[0], 
                  'Physical':[0], 
                  'Temporal':[0], 
                  'Effort':[0],
                  'Performance':[0],
                  'Frustration':[0],
                  'Sum':[0]}
    tlx_weight = pd.DataFrame(tlx_weight)
    for i in range(len(tlx)):
        score = [0,0,0,0,0,0,0]
        for col1 in range(1,len(tlx.columns)):
            for col2 in range(col1+1, len(tlx.columns)):
                if tlx[tlx.columns[col1]][i] > tlx[tlx.columns[col2]][i]:
                    score[col1-1]+=1
                elif tlx[tlx.columns[col1]][i] < tlx[tlx.columns[col2]][i]:
                    score[col2-1]+=1
                else :
                    score[col1-1]+=0.5
                    score[col2-1]+=0.5
                    
        score[6] = score[0]+score[1]+score[2]+score[3]+score[4]+score[5]
        tlx_weight.loc[i]=score
    #print(tlx_weight.loc[0])
    return tlx_weight

def calculate_tlxLevel(tlx, tlx_weight):
    result = {'Mental':[0], 
                  'Physical':[0], 
                  'Temporal':[0], 
                  'Effort':[0],
                  'Performance':[0],
                  'Frustration':[0],
                  'Score':[0]}
    result = pd.DataFrame(result)
    for i in range(len(tlx)):
        score = [0,0,0,0,0,0,0]
        for col in range(len(tlx_weight.columns)-1):
            score[col] = int(tlx[tlx.columns[col+1]].loc[i] * tlx_weight[tlx_weight.columns[col]].loc[i] )
        score[6] =int((score[0]+score[1]+score[2]+score[3]+score[4]+score[5] )/ tlx_weight[tlx_weight.columns[6]].loc[i]/10)
        if score[6]>10: score[6]=10
        if score[6]<0: score[6]=0
        result.loc[i]=score
    return result['Score']


In [38]:
def txtToDataframe_STEW(filename, flag1, flag2):
    file = open(filename, 'r')
    lines = file.readlines()
    datas = []
    for line in lines:
        txt = line.replace('   ', ' ').lstrip().rstrip().replace(' ', ',')
        data = txt.split(',')
        datas.append(data)
    df = pd.DataFrame(datas)
    df.columns = ['AF3', 'F7', 'F3', 'FC5', 'T7', 'O1', 'O2', 'P8', 'T8', 'FC6', 'F4', 'F8', 'F8', 'AF4']
    df['label1']=flag1
    df['label2']=flag2
    return df


def getSTEWData(src) :
    file_list = os.listdir(src)
    
    rating = pd.DataFrame(pd.read_csv(src+'ratings.txt'))
    file_list.remove('ratings.txt')
    
    dataList=[]
    highList=[]
    lowList=[]
    
    print(rating.columns)
    j=0
    for i in rating['subject']:
        if i<10:
            num = str(0)+str(i)
        else:
            num = str(i)
        dataList.append(txtToDataframe_STEW(src+'sub'+num+'_hi.txt', 1,rating['test'][j]))
        dataList.append(txtToDataframe_STEW(src+'sub'+num+'_lo.txt', 0,rating['rest'][j]))
        highList.append(txtToDataframe_STEW(src+'sub'+num+'_hi.txt', 1,rating['test'][j]))
        lowList.append(txtToDataframe_STEW(src+'sub'+num+'_lo.txt', 0,rating['rest'][j]))
        j+=1
    return dataList, highList, lowList


In [34]:
src = './STEW Dataset/'
originalData, highData, lowData = getSTEWData(src)

mergedData = pd.concat([highData[0],highData[1]],ignore_index=True)
for i in range(2,len(highData)):
    mergedData = pd.concat([mergedData,highData[i]],ignore_index=True)
mergedData = mergedData.apply(pd.to_numeric)

label2=mergedData['label2']
data2=mergedData.drop(['label1','label2'],axis=1)

Index(['subject', 'rest', 'test'], dtype='object')


In [35]:
src = 'EEG data/User/'
files = os.listdir(src)

tlx=[]
for f in files:
    print(src+f)
    tlx.append(pd.read_csv(src+f))

EEG data/User/김석연.csv
EEG data/User/김예진.csv
EEG data/User/문예완.csv
EEG data/User/박기웅.csv
EEG data/User/박혜연.csv
EEG data/User/유상봉.csv
EEG data/User/윤찬영.csv
EEG data/User/임수빈.csv
EEG data/User/정찬영.csv
EEG data/User/홍혜인.csv


In [39]:
workloadLevel = []
for t in tlx:
    workloadLevel.append(calculate_tlxLevel(t, calculateWeight(t)))

In [None]:
src = 'EEG data/prepross/'
datas = os.listdir(src)

eegData=[]
for d in datas:
    print(src+d)
    eegData.append(pd.read_csv(src+d))

EEG data/prepross/김석연.csv
EEG data/prepross/김예진.csv
EEG data/prepross/문예완.csv
EEG data/prepross/박기웅.csv
EEG data/prepross/박혜연.csv
EEG data/prepross/유상봉.csv


In [None]:
def split_data(df, label, end):
    col = ['EEG.AF3', 'EEG.F7', 'EEG.F3', 'EEG.FC5', 'EEG.T7',
           'EEG.P7', 'EEG.O1', 'EEG.O2', 'EEG.P8', 'EEG.T8', 'EEG.FC6', 'EEG.F4',
           'EEG.F8', 'EEG.AF4', 'MarkerValueInt']
    col_rename = ['AF3', 'F7', 'F3', 'FC5', 'T7',
           'P7', 'O1', 'O2', 'P8', 'T8', 'FC6', 'F4',
           'F8', 'AF4', 'vis_name']
    data_extraction = df[col]
    data_extraction.columns = col_rename
    rest = data_extraction[data_extraction.vis_name == 0]
    survey = data_extraction[data_extraction.vis_name == 100]
    
    vis = data_extraction[data_extraction.vis_name == 1].reset_index(drop=True)
    vis['label'] = label[0]
    vis.drop(['vis_name'], axis=1, inplace=True)
    
    for i in range(2,end):
        df = data_extraction[data_extraction.vis_name == i].reset_index(drop=True)
        df['label'] = label[i-1]
        df.drop(['vis_name'], axis=1, inplace=True)
        vis = pd.concat([vis,df], ignore_index=True, axis=0)
    return rest, survey, vis

cnt = 0
for eeg, label in zip (eegData, workloadLevel):
    print(cnt,datas[cnt])
    if cnt == 0 :
        rest, survey, vis = split_data(eeg, label,21)
        cnt+=1
        continue
    elif cnt == 6:
        r, s, v = split_data(eeg, label,21)
    else:
        r, s, v = split_data(eeg, label, 22)
    
    rest = pd.concat([rest,r], ignore_index=True, axis=0)
    survey = pd.concat([survey,s], ignore_index=True, axis=0)
    vis = pd.concat([vis,v], ignore_index=True, axis=0)
    cnt+=1

In [None]:
label=vis['label']
data=vis.drop(['label'],axis=1)

In [None]:
from sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler()
scaled = scaler.fit_transform(data)
data = pd.DataFrame(scaled, columns = data.columns, index=data.index)
data

In [None]:
scaler = MinMaxScaler()
scaled = scaler.fit_transform(data2)
data2 = pd.DataFrame(scaled, columns = data2.columns, index=data2.index)
data2

In [None]:
def windowing_dataset(data, label, window_size):
    data_list = []
    label_list = []
    
    for i in range(0,len(data)//window_size,window_size):
        data_list.append(np.array(data.iloc[i:i+window_size]))
        label_list.append(np.array(label.iloc[i]))
    return np.array(data_list), np.array(label_list)

X_train, y_train = windowing_dataset(data,label,14)
X_test, y_test = windowing_dataset(data2,label2,14)
print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)

In [None]:
label

In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

label = to_categorical(label,10)
label2 = to_categorical(label2,10)

X_train, y_train = windowing_dataset(data,label,14)
x, y = windowing_dataset(data2,label2,14)
X_valid, X_test, y_valid, y_test = train_test_split(x,y, train_size=0.5, 
                                                    random_state=True,
                                                    stratify = y)

print(X_train.shape, X_test.shape, X_valid.shape,y_valid.shape,y_train.shape, y_test.shape)

In [None]:
def Model2_2_1():
    model = Sequential()
    model.add(InputLayer(input_shape=(14, 14)))
    model.add(ExpandLastDim())
    model.add(CastToFloat32())
    
    model.add(Conv2D(512, kernel_size=(4,4), strides=(1,1), activation='relu', padding='valid'))
    model.add(Conv2D(512, kernel_size=(4,4), strides=(1,1), activation='relu', padding='valid'))
    model.add(MaxPooling2D(2,2))
    model.add(Dropout(0.25))
    
    model.add(Flatten())
    model.add(Dropout(0.25))
    
    model.add(Dense(10, activation='sigmoid'))
    
    #model.summary()
    return model

def Model2_2_2():
    model = Sequential()
    model.add(InputLayer(input_shape=(14, 14)))
    model.add(ExpandLastDim())
    model.add(CastToFloat32())
    
    model.add(Conv2D(512, kernel_size=(2,2), strides=(1,1), activation='relu', padding='valid'))
    model.add(Conv2D(512, kernel_size=(2,2), strides=(1,1), activation='relu', padding='valid'))
    model.add(MaxPooling2D(2,2))
    
    model.add(Conv2D(128, kernel_size=(2,2), strides=(1,1), activation='relu', padding='same'))
    model.add(Conv2D(128, kernel_size=(2,2), strides=(1,1), activation='relu', padding='same'))
    model.add(MaxPooling2D(2,2))
    
    model.add(Conv2D(32, kernel_size=(2,2), strides=(1,1), activation='relu', padding='same'))
    model.add(Conv2D(32, kernel_size=(2,2), strides=(1,1), activation='relu', padding='same'))
    model.add(Dropout(0.25))
    
    model.add(Flatten())
    model.add(Dropout(0.25))
    
    model.add(Dense(50, activation='relu'))
    model.add(Dense(10, activation='sigmoid'))
    
    model.summary()
    return model

def Model2_2_3():
    model = Sequential()
    model.add(InputLayer(input_shape=(14, 14)))
    model.add(ExpandLastDim())
    model.add(CastToFloat32())
    
    model.add(Conv2D(512, kernel_size=(4,4), strides=(1,1), activation='relu', padding='valid'))
    model.add(Conv2D(512, kernel_size=(4,4), strides=(1,1), activation='relu', padding='valid'))
    model.add(MaxPooling2D(2,2))
    
    model.add(Conv2D(128, kernel_size=(4,4), strides=(1,1), activation='relu', padding='same'))
    model.add(Conv2D(128, kernel_size=(4,4), strides=(1,1), activation='relu', padding='same'))
    model.add(MaxPooling2D(2,2))
    
    model.add(Conv2D(32, kernel_size=(4,4), strides=(1,1), activation='relu', padding='same'))
    model.add(Conv2D(32, kernel_size=(4,4), strides=(1,1), activation='relu', padding='same'))
    
    model.add(Flatten())
    model.add(Dense(50, activation='relu'))
    model.add(Dropout(0.25))
    model.add(Dense(10, activation='sigmoid'))
    
    #model.summary()
    return model

In [None]:
from sklearn.metrics import f1_score

def runModel(model, crossentropy, x_train, y_train, x_valid, y_valid):
    early_stop = EarlyStopping(monitor='val_loss', patience=10)
    model.compile(optimizer='adam', loss= crossentropy, metrics='accuracy')

    history = model.fit(x_train, y_train, 
                        epochs = 200, 
                        validation_data = (x_valid, y_valid), 
                        callbacks=[early_stop], verbose=0)
    drawResult(history)
    return model

def eval_model(model, x_test, y_test):
    y_pred = model.predict(x_test)
    return f1_score(y_test.argmax(axis=1), y_pred.argmax(axis=1), average='macro')

In [None]:
crossentropy = 'categorical_crossentropy'

print(eval_model(runModel(Model2_2_1(),crossentropy,
                          X_train, y_train, X_valid, y_valid), X_test, y_test))
print(eval_model(runModel(Model2_2_2(),crossentropy,
                          X_train, y_train, X_valid, y_valid), X_test, y_test))
print(eval_model(runModel(Model2_2_3(),crossentropy,
                          X_train, y_train, X_valid, y_valid), X_test, y_test))