##### Mount Google Drive (Optional)

In [None]:
# Mounting Drive only works inside Google Colab. The step is optional, so skip
# it (instead of crashing) when the notebook runs on any other kernel.
try:
    from google.colab import drive
except ImportError:
    IN_COLAB = False
    print('google.colab is not available - skipping Google Drive mount')
else:
    IN_COLAB = True
    drive.mount('/content/drive')

##### Import Libraries

In [None]:
from tensorflow.keras import layers, optimizers, losses, metrics, regularizers, callbacks
from keras.models import Model
import numpy as np

##### Import Data

In [None]:
# Load the pre-split arrays. `path` is concatenated directly with the file
# names, so it must end with a path separator.
path = 'data path'
x_train = np.load(path + 'x_train.npy')
y_train = np.load(path + 'y_train.npy')
x_test  = np.load(path + 'x_test.npy')
y_test  = np.load(path + 'y_test.npy')

# Swap the last two axes so each sample becomes (12, 1000).
# NOTE(review): assumes the files store samples as (n, 1000, 12) - confirm.
x_train = x_train.transpose(0, 2, 1)
x_test  = x_test.transpose(0, 2, 1)

# Append the single channel axis expected by Conv2D. The sample count is read
# from the array itself instead of being hard-coded, so other splits also load;
# the per-sample shape (12, 1000) is still enforced by the reshape.
x_train = x_train.reshape(x_train.shape[0], 12, 1000, 1)
x_test  = x_test.reshape(x_test.shape[0], 12, 1000, 1)

print("x_train :", x_train.shape)
print("y_train :", y_train.shape)
print("x_test  :", x_test.shape)
print("y_test  :", y_test.shape)
print('Data loaded')

In [None]:
# Model 1: residual-style branch. Temporal (1 x k) convolutions with two
# shortcut paths, followed by one spatial (12 x 1) convolution and a dense head.


def _temporal_conv(tensor, n_filters, width):
    """Apply a (1 x width) convolution with `n_filters` output channels."""
    return layers.Conv2D(filters=n_filters, kernel_size=(1, width))(tensor)


def _norm_act_pool(tensor, window, shortcut=None):
    """BatchNorm -> (optional Add with `shortcut`) -> ReLU -> stride-1 max pool."""
    tensor = layers.BatchNormalization()(tensor)
    if shortcut is not None:
        tensor = layers.Add()([shortcut, tensor])             # skip Connection
    tensor = layers.ReLU()(tensor)
    return layers.MaxPooling2D(pool_size=(1, window), strides=1)(tensor)


X_in = layers.Input(shape=(12, 1000, 1))

# Stage 1
X = _norm_act_pool(_temporal_conv(X_in, 32, 5), 2)

# First shortcut path: two plain convolutions (no norm / activation).
# Their kernel widths (7, then 6) trim the same number of time steps as
# stages 2-3 of the main path, so both tensors match at the Add
# (assuming Keras' default 'valid' padding).
convC1 = _temporal_conv(X, 64, 7)

# Stage 2
X = _norm_act_pool(_temporal_conv(X, 32, 5), 4)

convC2 = _temporal_conv(convC1, 64, 6)

# Stage 3 (merges the first shortcut)
X = _norm_act_pool(_temporal_conv(X, 64, 5), 2, shortcut=convC2)

# Second shortcut path, same idea with kernel widths 4 and 5.
convE1 = _temporal_conv(X, 32, 4)

# Stage 4
X = _norm_act_pool(_temporal_conv(X, 64, 3), 4)

convE2 = _temporal_conv(convE1, 64, 5)

# Stage 5 (merges the second shortcut)
X = _norm_act_pool(_temporal_conv(X, 64, 3), 2, shortcut=convE2)
print('Added 5 layers for temporal analysis')

# Spatial stage: the (12 x 1) kernel spans the whole first input axis.
X = layers.Conv2D(filters=64, kernel_size=(12, 1))(X)
X = layers.BatchNormalization()(X)
X = layers.ReLU()(X)
X = layers.GlobalAveragePooling2D()(X)
print('Added 1 layer for spatial analysis')

X = layers.Flatten()(X)

# Dense head with L2 weight penalty; `X` is the branch output used by the merge cells.
X = layers.Dense(units=128, kernel_regularizer=regularizers.L2(0.005))(X)
X = layers.BatchNormalization()(X)
X = layers.ReLU()(X)

Added 5 layers for temporal analysis
Added 1 layer for spatial analysis


In [None]:
# Model 2: plain (non-residual) branch driven by the hyper-parameter lists below.
# The lists hold 8 entries, but the loop only consumes the first 5.
# `in_shape`, `unit` and `drop` are not referenced anywhere in this cell.
in_shape = (19634, 12, 1000, 1)                   # x_train.shape
filters  = [32, 32, 64, 64, 64, 64, 128, 128]     # number of filters
k_size   = [7,  5,  5,  5,  5,  3,  3,  3]        # kernel size
stride   = [1,  1,  1,  1,  1,  1,  1,  1]        # stride size
pool     = [2,  4,  2,  4,  2,  4,  2,  2]        # pool size
unit     = [64, 32]
drop     = [0.05, 0.05]

Y_in = layers.Input(shape=(12, 1000, 1))

# Seed the running tensor with the input so every iteration has the same shape.
Y = Y_in
for i in range(5):
  Y = layers.Conv2D(
      filters=filters[i],
      kernel_size=(1, k_size[i]),
      strides=stride[i],
  )(Y)
  Y = layers.BatchNormalization()(Y)
  Y = layers.LeakyReLU()(Y)
  # The pooling layer reuses the convolution's stride value.
  Y = layers.MaxPooling2D(pool_size=(1, pool[i]), strides=stride[i])(Y)
print('Added 5 layers for Temporal Analysis')

# Spatial stage: the (12 x 1) kernel spans the whole first input axis.
Y = layers.Conv2D(filters=128, kernel_size=(12, 1), strides=1)(Y)
Y = layers.BatchNormalization()(Y)
Y = layers.LeakyReLU()(Y)
Y = layers.MaxPooling2D(pool_size=(1, 2), strides=1)(Y)
print('Added a layer for Spatial Analysis')

Y = layers.Flatten()(Y)

# Dense head with L2 weight penalty; `Y` is the branch output used by the merge cells.
Y = layers.Dense(units=128, kernel_regularizer=regularizers.l2(0.005))(Y)
Y = layers.BatchNormalization()(Y)
Y = layers.LeakyReLU()(Y)

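##### Bilinear CNN: Concatenate/Multiply

Merge the two branches by concatenation (swap in `Multiply()` for an element-wise product). This cell and the outer-product cell below both assign `model`, so run only the variant you want to train.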

In [None]:
# Source: https://stackoverflow.com/questions/60355085/how-to-merge-two-cnn-models
# Source: https://stackoverflow.com/questions/54959929/concatenate-multiple-cnn-models-in-keras

# Combine the outputs of the two branches (X from Model 1, Y from Model 2)
# into one classifier with two inputs.
print('X:', X.shape)
print('Y:', Y.shape)

merged = layers.Concatenate()([X, Y])           # Concatenate() / Multiply()
print('Merge:', merged.shape)

# Shared classification head.
merged = layers.Dense(units=64, kernel_regularizer=regularizers.L2(0.009))(merged)
merged = layers.BatchNormalization()(merged)
merged = layers.ReLU()(merged)
merged = layers.Dropout(rate=0.15)(merged)

# Five independent sigmoid outputs.
output = layers.Dense(5, activation='sigmoid')(merged)
model = Model(inputs=[X_in, Y_in], outputs=output)

# summary() prints the table itself and returns None, so it must not be
# wrapped in print() (that would emit a stray "None" line).
model.summary()



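##### Bilinear CNN: Outer Product

Alternative merge: combine the two branch outputs through their per-sample outer product. Running this cell reassigns `model`, replacing the one built by the concatenate/multiply cell above.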

In [None]:
# Outer product
# Combine the outputs of the two branches (X from Model 1, Y from Model 2)
# through their per-sample outer product.
print('X:', X.shape)
print('Y:', Y.shape)

# Insert singleton axes so the element-wise product broadcasts to one value
# per (X feature, Y feature) pair. `np.newaxis` comes from the notebook's
# existing numpy import, so no extra mid-notebook import is needed.
merged = X[:, :, np.newaxis] * Y[:, np.newaxis, :]
print('Merge:', merged.shape)
merged = layers.Flatten()(merged)
print('Merge:', merged.shape)

# Shared classification head.
merged = layers.Dense(units=64, kernel_regularizer=regularizers.L2(0.009))(merged)
merged = layers.BatchNormalization()(merged)
merged = layers.ReLU()(merged)
merged = layers.Dropout(rate=0.15)(merged)

# Five independent sigmoid outputs.
output = layers.Dense(5, activation='sigmoid')(merged)
model = Model(inputs=[X_in, Y_in], outputs=output)

# summary() prints the table itself and returns None, so it must not be
# wrapped in print() (that would emit a stray "None" line).
model.summary()

##### Train Model

In [None]:
# Binary cross-entropy over the sigmoid outputs, tracked with binary accuracy
# and a multi-label ROC AUC.
model.compile(
    optimizer=optimizers.Adam(learning_rate=0.0005),
    loss=losses.BinaryCrossentropy(),
    metrics=[
        metrics.BinaryAccuracy(),
        metrics.AUC(curve='ROC', multi_label=True),
    ],
)

# Both callbacks watch the validation loss: early stopping (patience 6,
# restoring the best weights) and learning-rate reduction (patience 3).
early = callbacks.EarlyStopping(
    monitor="val_loss",
    patience=6,
    restore_best_weights=True,
)
reducelr = callbacks.ReduceLROnPlateau(monitor="val_loss", patience=3)
callback = [early, reducelr]

# The same training array is fed to both model inputs.
history = model.fit(
    [x_train, x_train],
    y_train,
    validation_split=0.12,
    epochs=20,
    batch_size=64,
    callbacks=callback,
)

##### Save Model

In [None]:
# NOTE(review): 'save path' and '<model_name>' look like placeholders - fill
# them in before running. The directory is concatenated directly with the
# file name, so it must end with a path separator.
save_path = 'save path'
model_file = save_path + "Bilinear-<model_name>.h5"
model.save(model_file)

##### Evaluate Model

In [None]:
# Score the test split (same array fed to both inputs) and print the result.
test_scores = model.evaluate([x_test, x_test], y_test)
print(test_scores)

In [None]:
from sklearn.metrics import precision_recall_curve, f1_score, roc_auc_score, accuracy_score, auc 


def sklearn_metrics(y_true, y_pred, threshold=0.5):
    """Print multi-label classification metrics computed with scikit-learn.

    Args:
        y_true: 2-D array-like of ground-truth labels, one column per class.
        y_pred: 2-D array-like of predicted scores with the same layout.
        threshold: scores greater than or equal to this value count as a
            positive prediction (default 0.5).

    Prints accuracy over all individual labels, macro-averaged ROC AUC, the
    mean per-class area under the precision-recall curve, and micro F1, each
    as a percentage. Returns None.
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    # Hard 0/1 decisions for the threshold-based metrics.
    y_bin = (y_pred >= threshold).astype(int)

    # Compute area under precision-Recall curve, one class (column) at a time.
    # The class count comes from the labels rather than a hard-coded constant.
    n_classes = y_true.shape[1]
    auc_sum = 0
    for i in range(n_classes):
        precision, recall, _ = precision_recall_curve(y_true[:, i], y_pred[:, i])
        auc_sum += auc(recall, precision)

    print("Accuracy        : {:.2f}".format(accuracy_score(y_true.flatten(), y_bin.flatten())* 100))
    print("Macro AUC score : {:.2f}".format(roc_auc_score(y_true, y_pred, average='macro') * 100))
    print('AUPRC           : {:.2f}'.format((auc_sum / n_classes) * 100))
    print("Micro F1 score  : {:.2f}".format(f1_score(y_true, y_bin, average='micro') * 100))

In [None]:
# Predicted scores for both splits (train first, then test); each array is
# passed twice because the model has two inputs.
y_pred_train, y_pred_test = (
    model.predict([features, features]) for features in (x_train, x_test)
)

In [None]:
# Print the scikit-learn metric report for each split under its own heading.
for heading, labels, scores in (
    ("Train", y_train, y_pred_train),
    ("\nTest", y_test, y_pred_test),
):
    print(heading)
    sklearn_metrics(labels, scores)