In [1]:

from tensorflow import keras as keras

from drowsiness_detection import config
from drowsiness_detection.data import load_experiment_data
from drowsiness_detection.models import ThreeDStandardScaler
import matplotlib.pyplot as plt
import numpy as np
from contextlib import redirect_stdout
from sklearn.metrics import classification_report
from drowsiness_detection.data import session_type_mapping, label_names_dict

In [None]:
# Load data.
# Point the project config at the data variant used in this experiment
# (the meaning of the two numbers is defined by config.set_paths — TODO confirm).
config.set_paths(30, 60)

# All options for building the train/test split, collected in one place.
load_options = dict(
    model_name="mvts_transformer",
    nn_experiment=True,
    use_dummy_data=False,
    num_targets=2,
    feature_col_indices=(5, 8, 9, 14, 15, 16, 19),
    exclude_by="a",
    split_by_subjects=True,
    test_size=0.2,
    seed=45,
)
X_train, X_test, y_train, y_test, cv_splits = load_experiment_data(**load_options)

In [None]:
# Optional cap on how many leading samples of each split are kept.
# None keeps everything; set a small integer for quick experiments.
num_samples = None
keep = slice(num_samples)  # equivalent to [:num_samples]

X_train, y_train = X_train[keep], y_train[keep]
X_test, y_test = X_test[keep], y_test[keep]



In [None]:
# Scaler for the 3-D input; feature_axis=-1 selects the last axis as the feature axis.
# NOTE(review): presumably standardises each feature to zero mean / unit variance —
# confirm against ThreeDStandardScaler's implementation.
scaler = ThreeDStandardScaler(feature_axis=-1)

# Fit on the training split only, then reuse the fitted scaler for the test split
# so that no statistics of the test data are used during fitting.
X_train_scaled = scaler.fit_transform(X_train, y_train)
X_test_scaled = scaler.transform(X_test)

In [None]:
# Nominal input shape; only input_shape[1:] (the per-sample part) is given to
# the network, the leading entry is not used when building the model.
# NOTE(review): 7 matches the number of feature_col_indices selected when
# loading the data; 1800 is presumably the window length — confirm.
input_shape = (100, 1800, 7)

# Per-block hyper-parameters of the three convolutional blocks.
num_filters = 64, 16, 4
kernel_sizes = 5, 3, 3
strides = 3, 1, 1


def _conv_block(inputs, filters, kernel_size, stride):
    """Apply Conv1D -> BatchNormalization -> ReLU to `inputs` and return the output tensor."""
    x = keras.layers.Conv1D(filters=filters, kernel_size=kernel_size,
                            strides=stride,
                            padding="same")(inputs)
    x = keras.layers.BatchNormalization()(x)
    return keras.layers.ReLU()(x)


input_layer = keras.layers.Input(input_shape[1:])

# Three stacked convolutional blocks; the layers are created in the same order
# as before, only the repeated code has been factored into _conv_block.
conv_layer1 = _conv_block(input_layer, num_filters[0], kernel_sizes[0], strides[0])
conv_layer2 = _conv_block(conv_layer1, num_filters[1], kernel_sizes[1], strides[1])
conv_layer3 = _conv_block(conv_layer2, num_filters[2], kernel_sizes[2], strides[2])

# Collapse the time axis by taking the maximum of every channel
# (keras.layers.GlobalAvgPool1D would be the averaging alternative).
pool_layer = keras.layers.GlobalMaxPool1D()(conv_layer3)

# Classification head: one hidden layer with heavy dropout, then a single
# sigmoid unit for the binary target.
dense_layer = keras.layers.Dense(64, activation='relu')(pool_layer)
dropout_layer = keras.layers.Dropout(0.8)(dense_layer)
output_layer = keras.layers.Dense(1, activation="sigmoid")(dropout_layer)

model = keras.models.Model(inputs=input_layer, outputs=output_layer)

optimizer = keras.optimizers.Adam(learning_rate=0.001)
model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

# summary() prints the table itself and returns None, so it must not be wrapped
# in print() (that would emit an extra "None" line).
model.summary()



In [None]:
# Training settings. Class 1 is weighted more heavily than class 0, and the
# test split doubles as validation data, so the val_* metrics in `history`
# are computed on the test split.
fit_options = dict(
    epochs=3,
    batch_size=16,
    class_weight={0: 0.84, 1: 1.14},
    validation_data=(X_test_scaled, y_test),
)
history = model.fit(X_train_scaled, y_train, **fit_options)

In [None]:
def plot_history(history):
    """Plot the per-epoch accuracy and loss curves of a training run.

    One figure is shown per metric. The validation curve (labelled 'test') is
    drawn only when the corresponding ``val_<metric>`` entry exists, so the
    function also works for runs that were fitted without validation data.

    Parameters
    ----------
    history
        Object with a ``history`` attribute mapping metric names to sequences
        of per-epoch values, e.g. the return value of ``model.fit``.

    Raises
    ------
    KeyError
        If 'accuracy' or 'loss' is missing from ``history.history``.
    """
    for metric in ("accuracy", "loss"):
        # Explicit figure/axes instead of the implicit pyplot state machine.
        fig, ax = plt.subplots()

        ax.plot(history.history[metric])
        legend_labels = ["train"]

        val_key = "val_" + metric
        if val_key in history.history:
            ax.plot(history.history[val_key])
            legend_labels.append("test")

        ax.set_title("model " + metric)
        ax.set_ylabel(metric)
        ax.set_xlabel("epoch")
        ax.legend(legend_labels, loc="upper left")
        plt.show()

def save_summary(model):
    """Evaluate `model` on the test split and log the result.

    Uses the notebook globals ``X_test_scaled`` and ``y_test``. The test
    accuracy followed by the model summary is appended to ``summaries.csv``;
    the mean predicted label and a classification report are printed.

    Parameters
    ----------
    model
        Fitted model whose ``predict`` returns one probability per sample and
        whose ``summary`` prints to stdout.
    """
    # The single-unit sigmoid head built in this notebook yields predictions of
    # shape (n_samples, 1). Flatten predictions and labels so the comparison
    # below is element-wise; comparing (n, 1) with (n,) would broadcast to an
    # (n, n) matrix and give a meaningless accuracy.
    y_prob = np.asarray(model.predict(X_test_scaled)).reshape(-1)
    y_hat = (y_prob > .5).astype(int)
    y_true = np.asarray(y_test).reshape(-1)

    acc = float(np.mean(y_hat == y_true))

    print("mean-prediction", np.mean(y_hat))
    with open("summaries.csv", "a") as fp:
        fp.write(str(round(acc, 4)) + "\n")
        # model.summary() prints to stdout; redirect that output into the file.
        with redirect_stdout(fp):
            model.summary()
        fp.write("\n")

    report = classification_report(y_true=y_true, y_pred=y_hat,
                                   target_names=label_names_dict[2])
    print(report)



# Append accuracy and model summary to summaries.csv and print the classification report.
save_summary(model)
# Show the accuracy and loss curves recorded during training.
plot_history(history)

In [None]:
from sklearn.metrics import RocCurveDisplay

# Raw (un-thresholded) model outputs on the test split are used as ROC scores.
y_hat = model.predict(X_test_scaled)

# Labels and scores are passed positionally: the keyword for the score argument
# differs between scikit-learn releases (y_pred vs. y_score), the position does not.
# The scores are flattened to 1-D because the model emits one column per sample.
RocCurveDisplay.from_predictions(y_test, np.ravel(y_hat), pos_label=1)