In [14]:
import datetime
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Recall, Precision, AUC
from sklearn.metrics import confusion_matrix, recall_score, f1_score, precision_score, roc_auc_score
import seaborn as sns
from matplotlib import pyplot as plt
from hyperopt import hp, fmin, tpe, Trials
from tensorflow.keras import losses
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import TimeSeriesSplit
from sklearn.utils.class_weight import compute_class_weight
np.random.seed(78)

In [2]:
from importlib import reload
import EuroTruck.ProcessEuroTruckData.process_ets_data
reload(EuroTruck.ProcessEuroTruckData.process_ets_data)
from EuroTruck.ProcessEuroTruckData.process_ets_data import data_rq_cc
from EuroTruck.ProcessEuroTruckData.process_ets_data import data_michele_cc
from EuroTruck.ProcessEuroTruckData.process_ets_data import data_sara_cc

In [7]:
def seperate_ground_truth(data, size, step):
    awake_window = []
    light_drowsy_window = []
    drowsy_window = []
    general_window = []
    time = next(iter(data.table))
    while time + datetime.timedelta(seconds=size) <= next(reversed(data.table)):
        awake_count = 0
        light_drowsy_count = 0
        drowsy_count = 0
        for i in range(size):
            check_time = time + datetime.timedelta(seconds=i)
            if check_time not in data.table:
                break
            if data.table[check_time]["groud_truth"] == [1]:
                awake_count += 1
            if data.table[check_time]["groud_truth"] == [2] or data.table[check_time]["groud_truth"] == [3]:
                light_drowsy_count += 1
            if data.table[check_time]["groud_truth"] == [4]:
                drowsy_count += 1

        if awake_count == size:
            general_window.append([time + datetime.timedelta(seconds=i) for i in range(size)])
            awake_window.append([time + datetime.timedelta(seconds=i) for i in range(size)])
        if light_drowsy_count == size:
            general_window.append([time + datetime.timedelta(seconds=i) for i in range(size)])
            light_drowsy_window.append([time + datetime.timedelta(seconds=i) for i in range(size)])
        if drowsy_count == size:
            general_window.append([time + datetime.timedelta(seconds=i) for i in range(size)])
            drowsy_window.append([time + datetime.timedelta(seconds=i) for i in range(size)])

        step_seconds = datetime.timedelta(seconds=step)
        time += step_seconds
    
    # 因为这里awake——window只会用来决定某个窗口的ground truth，所以转换成set可以方便查找    
    awake_window=set([tuple(window) for window in awake_window])
    light_drowsy_window=set([tuple(window) for window in light_drowsy_window])
    drowsy_window=set([tuple(window) for window in drowsy_window])
    
    # drowsy window is a list of list of datetime objects which is the key
    return awake_window, light_drowsy_window, drowsy_window, general_window


def define_feature_matrix(data, awake_window, light_drowsy_window, drowsy_window, general_window,sampling_rate,size):
    if general_window == []:
        return np.empty((0, sampling_rate*size, 4))
    feature_matrix = []
    label = []
    for window in general_window:
        feature_matrix_per_window = []

        SWA_colunm = []
        for time in window:
            SWA_colunm.extend(data.table[time]["SWA_data"])
        feature_matrix_per_window.append(SWA_colunm)

        SWV_column = []
        for time in window:
            SWV_column.extend(data.table[time]["SWV_data"])
        feature_matrix_per_window.append(SWV_column)

        LD_column = []
        for time in window:
            LD_column.extend(data.table[time]["lateral_displacement_data"])
        feature_matrix_per_window.append(LD_column)

        LA_column = []
        for time in window:
            LA_column.extend(data.table[time]["lateral_acceleration_data"])
        feature_matrix_per_window.append(LA_column)

        feature_matrix_per_window = np.transpose(np.array(feature_matrix_per_window))
        feature_matrix.append(feature_matrix_per_window)
        if tuple(window) in awake_window:
            label.append(0)
        if tuple(window) in light_drowsy_window:
            label.append(1)
        if tuple(window) in drowsy_window:
            label.append(2)
        
    feature_matrix = np.array(feature_matrix)
    label = np.array(label)
    return feature_matrix, label


size= 10
step = 1
sample_rate = data_rq_cc.get_min_sampling_rate()

X_rq,y_rq = define_feature_matrix(data_rq_cc, *seperate_ground_truth(data_rq_cc, size, step), sample_rate, size)
X_michele,y_michele = define_feature_matrix(data_michele_cc, *seperate_ground_truth(data_michele_cc, size, step), sample_rate, size)
X_sara,y_sara = define_feature_matrix(data_sara_cc, *seperate_ground_truth(data_sara_cc, size, step), sample_rate, size)
X_train = np.concatenate((X_rq,X_sara),axis=0)
y_train = np.concatenate((y_rq,y_sara),axis=0)
X_rest = X_michele
y_rest = y_michele
X_test=X_rest[:int(X_rest.shape[0]/2)]
y_test=y_rest[:int(y_rest.shape[0]/2)]

X_val=X_rest[int(X_rest.shape[0]/2):]
y_val=y_rest[int(y_rest.shape[0]/2):]

classes = np.unique(y_train)
class_weights = compute_class_weight(class_weight='balanced', classes=classes, y=y_train)
class_weight_dict = dict(zip(classes, class_weights))

In [20]:
def make_model():
    model = tf.keras.models.Sequential([
        tf.keras.layers.Input(shape=(X_train.shape[1], X_train.shape[2])),
        # filters, kernel_size, activation, input_shape
        tf.keras.layers.Conv1D(128, 3, activation=None, kernel_initializer='he_normal'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Activation('relu'),
        tf.keras.layers.MaxPooling1D(3),

        tf.keras.layers.Conv1D(32, 3, activation=None, kernel_initializer='he_normal'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Activation('relu'),
        tf.keras.layers.MaxPooling1D(3),

        tf.keras.layers.Conv1D(32, 3, activation=None, kernel_initializer='he_normal'),
        tf.keras.layers.BatchNormalization(),
        tf.keras.layers.Activation('relu'),
        tf.keras.layers.MaxPooling1D(3),

        # tf.keras.layers.Conv1D(32, 3, activation=None),
        # tf.keras.layers.BatchNormalization(),
        # tf.keras.layers.Activation('relu'),
        # tf.keras.layers.MaxPooling1D(2),

        tf.keras.layers.GRU(64),
        # tf.keras.layers.Dropout(0.25),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(2, activation='softmax',kernel_initializer='he_normal')
    ])

    model.compile(optimizer=Adam(learning_rate=0.001), loss=tf.keras.losses.CategoricalFocalCrossentropy(alpha=list(class_weight_dict), gamma=2), metrics=['categorical_accuracy'])
    return model

def confusion_matrix_on_test_data(model,model_name, X_test, y_test):
    labels=[0,1,2]
    y_pred=[]
    for row in model.predict(X_test):
        y_pred.append(np.argmax(row))
    y_pred=np.array(y_pred) 
    cm = confusion_matrix(y_test, y_pred,labels=labels)
    sns.heatmap(cm, annot=True, cmap='Blues', fmt='g', xticklabels=['Class 1', 'Class 2', 'Class 3'], yticklabels=['Class 1', 'Class 2', 'Class 3'])
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.title('Confusion Matrix of '+model_name)
    plt.show()

In [21]:
y_train_one_hot = to_categorical(y_train)
y_val_one_hot = to_categorical(y_val)
model=make_model()
model.fit(X_train, y_train_one_hot, epochs=100, batch_size=128, verbose=0,validation_data=(X_val, y_val_one_hot))

<keras.src.callbacks.history.History at 0x354423650>

In [22]:

auc_roc = roc_auc_score(y_val, model.predict(X_val,verbose=0)[:, 1])
print('AUC-ROC:', auc_roc)
# confusion_matrix_on_test_data(model, 'Test Data', X_test, y_test)

AUC-ROC: 0.5177154492905938
