In [None]:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt

from tensorflow import keras
from tensorflow.keras import applications

from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score

In [None]:
# Load the dataset from a CSV file located next to the notebook and preview it.
filename = 'eeg-eye-state_csv.csv'
df = pd.read_csv(filename)
print(df.shape)
df.head()

In [None]:
# Plot the Class column against the row index to see where the label switches.
plt.plot(df.Class)

In [None]:
# Estimate the sampling rate as rows per second of recording.
# NOTE(review): the 117 s duration is hard-coded here — confirm against the dataset description.
total_duration = 117 #s
sampling_rate = df.shape[0] / total_duration
sampling_rate

In [None]:
# Number of rows per Class value (class balance check).
df.Class.value_counts()

In [None]:
def window_distribute(_df, window_size=10):
    """Trim each run of identical ``Class`` values to a multiple of ``window_size`` rows.

    Consecutive rows sharing the same ``Class`` value form a run. From every run
    only the first ``(run_length // window_size) * window_size`` rows are kept, so
    the result can be cut into consecutive blocks of ``window_size`` rows that
    never mix two labels.

    Parameters
    ----------
    _df : pandas.DataFrame
        Frame with a ``Class`` column; row order is treated as the time order.
    window_size : int, default 10
        Number of rows per window. Must be at least 1.

    Returns
    -------
    pandas.DataFrame
        The retained rows of ``_df`` in their original order, keeping their
        original index labels.

    Raises
    ------
    ValueError
        If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError('window_size must be a positive integer')

    labels = _df['Class'].to_numpy()
    n_rows = len(labels)
    if n_rows == 0:
        return _df.iloc[0:0]

    # A run starts at row 0 and wherever the label differs from the previous row.
    # Everything is computed on row *positions*, so the result is correct for any
    # index (not only the default RangeIndex).
    run_starts = np.concatenate(([0], np.flatnonzero(labels[1:] != labels[:-1]) + 1))
    run_ends = np.append(run_starts[1:], n_rows)

    # For each run keep only the leading rows that fill whole windows.
    keep = np.concatenate([
        np.arange(start, start + (end - start) // window_size * window_size)
        for start, end in zip(run_starts, run_ends)
    ])
    return _df.iloc[keep]

In [None]:
def generate_sequence(_df, window_size=None):
    """Cut ``_df`` into consecutive windows and build the model inputs and labels.

    Parameters
    ----------
    _df : pandas.DataFrame
        Frame with a ``Class`` column plus feature columns. Rows are consumed in
        order, ``window_size`` rows per window.
    window_size : int, optional
        Rows per window. When omitted, the notebook-level ``window_size``
        variable is used, which keeps the one-argument call working.

    Returns
    -------
    X : numpy.ndarray
        Array of shape ``(n_windows, window_size, n_features)`` holding every
        column except ``Class``.
    y : numpy.ndarray
        One label per window: the ``Class`` value of the window's first row
        minus 1.

    Raises
    ------
    ValueError
        If no window size is available or it is smaller than 1.

    Notes
    -----
    Trailing rows that do not fill a complete window are dropped so that ``X``
    is always a regular array.
    """
    if window_size is None:
        # Fall back to the notebook-level setting instead of silently ignoring
        # the argument, as the previous implementation did with ``_df``.
        window_size = globals().get('window_size')
        if window_size is None:
            raise ValueError('window_size was not given and no notebook-level window_size is defined')
    if window_size < 1:
        raise ValueError('window_size must be a positive integer')

    n_windows = len(_df) // window_size
    n_rows = n_windows * window_size

    features = _df.drop(columns=['Class']).values[:n_rows]
    X = features.reshape(n_windows, window_size, features.shape[1])

    # Label of a window = Class of its first row.
    y = _df['Class'].values[:n_rows:window_size] - 1 # To use 0 and 1 classes

    return X, y

In [None]:
# Choose the window length (in rows) and trim the data so that every run of equal
# Class values splits evenly into windows of that length.
window_size = 14 # Use 1 for simpler models
df_w = window_distribute(df, window_size)
df_w

In [None]:
# Turn the trimmed frame into per-window feature arrays X and per-window labels y.
X, y = generate_sequence(df_w)
X.shape, y.shape

In [None]:
# Standardize the windows, pad them to a larger square "image" and replicate the
# result over 3 channels.
scaler = preprocessing.StandardScaler() # MinMaxScaler gave terrible results

# Scale and reshape: each window is flattened to one row, so every
# (time step, feature) position is standardized as its own column; the original
# shape is restored afterwards.
# NOTE(review): the scaler is fitted on all windows before the train/test split
# below, so test windows influence the scaling statistics — consider fitting on
# the training windows only.
shape = X.shape
X_res = X.reshape(X.shape[0], -1)
X_sc = scaler.fit_transform(X_res)
X_sc = X_sc.reshape(shape)

# Pad the edges of axis 1 and axis 2 symmetrically with a constant.
# NOTE(review): the constant passed below is 1, not 0 — confirm which value is intended.
# NOTE(review): the same pad_size is used for both axes, so both reach 32 only
# when the size of axis 2 equals window_size.
pad_size = (32 - window_size) // 2 # Use 0 padding for simpler models
pad_sizes = (pad_size, pad_size)
X_pad = np.pad(X_sc, ((0, 0), pad_sizes, pad_sizes), constant_values=1)

# Repeat the last dimension for 3 channels
X_rep = np.repeat(X_pad[..., np.newaxis], 3, axis=-1)
X_rep.shape

In [None]:
# Hold out 25% of the windows for testing. The fixed random_state makes the
# shuffled split, and therefore every metric reported below, reproducible
# across kernel restarts.
X_train, X_test, y_train, y_test = train_test_split(
    X_rep, y, shuffle=True, test_size=0.25, random_state=32)
print(X_train.shape, X_test.shape)

In [None]:
def show_samples(X, y, label):
    """Display up to five samples of ``X`` whose entry in ``y`` equals ``label``.

    The samples are drawn side by side with ``imshow`` in a single row of five
    axes; axes without a matching sample stay empty.
    """
    _, panels = plt.subplots(1, 5, figsize=(24, 24))

    matching = np.where(y == label)[0]
    for panel, sample_idx in zip(panels, matching[:5]):
        panel.imshow(X[sample_idx])

    plt.show()

In [None]:
# Scaled (unpadded) windows labelled 0.
show_samples(X_sc, y, 0)

In [None]:
# Scaled (unpadded) windows labelled 1.
show_samples(X_sc, y, 1)

In [None]:
# Padded 3-channel training inputs labelled 0.
show_samples(X_train, y_train, 0)

In [None]:
# Padded 3-channel training inputs labelled 1.
show_samples(X_train, y_train, 1)

In [None]:
def report_metrics(model, X_, y_):
    """Print accuracy, F1 and ROC AUC of ``model`` on ``(X_, y_)``.

    Models exposing ``predict_proba`` are scored with ``predict`` for the hard
    labels and ``predict_proba`` for the scores. For any other model the output
    of ``predict`` is used as the score and thresholded at 0.5 to obtain the
    labels. ROC AUC always uses the last column of the 2-D score array.
    """
    predictions = model.predict(X_)

    if not hasattr(model, 'predict_proba'):
        # predict() already returned scores; derive hard labels from them.
        scores = predictions
        predictions = predictions >= 0.5
    else:
        scores = model.predict_proba(X_)

    # Compute all metrics first so nothing is printed if one of them fails.
    results = (
        ('Accuracy:', accuracy_score(y_, predictions)),
        ('F1:', f1_score(y_, predictions)),
        ('ROC AUC:', roc_auc_score(y_, scores[:, -1])),
    )
    for caption, value in results:
        print(caption, value)

In [None]:
# Network
def build_model(input_shape, freeze_weight=False, weights='imagenet'):
    """Build a binary classifier on top of a MobileNetV3Large backbone.

    Parameters
    ----------
    input_shape : tuple
        Shape of one input sample, passed to ``keras.layers.Input``.
    freeze_weight : bool, default False
        When true and ``weights`` is not ``None``, every backbone layer is
        marked non-trainable.
    weights : str or None, default 'imagenet'
        Forwarded to ``applications.MobileNetV3Large``.

    Returns
    -------
    keras.Model
        Backbone (without its top) followed by global average pooling and a
        single sigmoid unit.
    """
    inputs = keras.layers.Input(input_shape)
    backbone = applications.MobileNetV3Large(
        weights=weights,
        include_top=False,
        input_tensor=inputs,
    )

    # Freezing is only applied when pre-trained weights were requested.
    should_freeze = freeze_weight and (weights is not None)
    if should_freeze:
        print('Pre-trained will be frozen!')
        for backbone_layer in backbone.layers:
            backbone_layer.trainable = False

    pooled = keras.layers.GlobalAveragePooling2D()(backbone.output)
    probability = keras.layers.Dense(1, activation='sigmoid')(pooled)

    return keras.Model(inputs=backbone.input, outputs=probability)

In [None]:
# Build the MobileNetV3-based classifier with the backbone frozen and print its layers.
model = build_model(input_shape=X_train.shape[1:], freeze_weight=True)
model.summary()

# Binary classification setup: Adam (lr=1e-4), binary cross-entropy, accuracy metric.
opt = keras.optimizers.Adam(learning_rate=1e-4)
model.compile(optimizer=opt, loss=['binary_crossentropy'], metrics=['accuracy'])

In [None]:
# Train the classifier; the history object `h` is plotted in the following cells.
# NOTE(review): 5000 epochs with no early stopping or checkpointing — confirm this is intended.
# NOTE(review): validation_split presumably holds out part of X_train for validation;
# verify how Keras selects that part.
epochs = 5000
h = model.fit(X_train, y_train, epochs=epochs, 
            validation_split=0.3)

In [None]:
# Learning curves of the most recent fit stored in `h`. The legend is required
# for the `label=` values to be shown on the figure.
plt.plot(h.history['accuracy'], label='Train Acc')
plt.plot(h.history['val_accuracy'], label='Val Acc')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend();

In [None]:
# Learning curves of the most recent fit stored in `h`. The legend is required
# for the `label=` values to be shown on the figure.
# NOTE(review): this plotting cell appears three times in a row; one copy is enough.
plt.plot(h.history['accuracy'], label='Train Acc')
plt.plot(h.history['val_accuracy'], label='Val Acc')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend();

In [None]:
# Learning curves of the most recent fit stored in `h`. The legend is required
# for the `label=` values to be shown on the figure.
# NOTE(review): this plotting cell appears three times in a row; one copy is enough.
plt.plot(h.history['accuracy'], label='Train Acc')
plt.plot(h.history['val_accuracy'], label='Val Acc')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend();

In [None]:
# Metrics of the MobileNetV3-based model on the training data.
report_metrics(model, X_train, y_train)

In [None]:
# Metrics of the MobileNetV3-based model on the held-out test data.
report_metrics(model, X_test, y_test)

### Simpler Models

In [None]:
# Small fully connected baseline: 14 inputs -> 64 ReLU units -> 1 sigmoid output.
model_ = keras.Sequential()
model_.add( keras.layers.Dense(64, input_shape=(14, ), activation='relu') )
# model_.add( keras.layers.Dense(32, activation='relu') )
model_.add( keras.layers.Dense(1, activation='sigmoid') )

model_.summary()

In [None]:
# Same loss and metric as the larger model, with a higher learning rate (1e-3).
opt = keras.optimizers.Adam(learning_rate=1e-3)
model_.compile(optimizer=opt, loss=['binary_crossentropy'], metrics=['accuracy'])

In [None]:
# Shape of the 2-D input used by the simpler models: index 0 on axis 1, all of axis 2, channel 0.
# NOTE(review): with window_size = 14 and pad_size = (32 - 14) // 2 = 9 as set earlier,
# index 0 on axis 1 is a padding row and axis 2 has 32 entries; this slice only yields
# the 14 raw features when the notebook is run with window_size = 1 and pad_size = 0.
X_train[:, 0, :, 0].shape

In [None]:
# Train the dense baseline on the 2-D slice of the training inputs; `h` is overwritten
# with this run's history.
# NOTE(review): the slice matches the model's 14-feature input only when the notebook
# is run with window_size = 1 and pad_size = 0.
h = model_.fit(X_train[:, 0, :, 0], y_train, epochs=100, validation_split=0.3, shuffle=True)

In [None]:
# Learning curves of the most recent fit stored in `h`. The legend is required
# for the `label=` values to be shown on the figure.
plt.plot(h.history['accuracy'], label='Train Acc')
plt.plot(h.history['val_accuracy'], label='Val Acc')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend();

In [None]:
# Metrics of the dense baseline on the training data (same 2-D slice as used for fitting).
report_metrics(model_, X_train[:, 0, :, 0], y_train)

In [None]:
# Metrics of the dense baseline on the held-out test data.
report_metrics(model_, X_test[:, 0, :, 0], y_test)

In [None]:
from sklearn.ensemble import RandomForestClassifier

In [None]:
# Random forest baseline with default hyperparameters and a fixed seed.
clf = RandomForestClassifier(random_state=32)

In [None]:
# Fit the random forest on the same 2-D slice of the training inputs as the dense baseline.
# NOTE(review): that slice holds the raw features only when the notebook is run with
# window_size = 1 and pad_size = 0.
clf.fit(X_train[:, 0, :, 0], y_train)

In [None]:
# Metrics of the random forest on the training data.
report_metrics(clf, X_train[:, 0, :, 0], y_train)

In [None]:
# Metrics of the random forest on the held-out test data.
report_metrics(clf, X_test[:, 0, :, 0], y_test)