## Load/import packages

In [1]:
import json
import scipy
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import tensorflow.keras.backend as K

from tensorflow.keras import Sequential, layers
from tensorflow.keras.losses import CategoricalCrossentropy
from sklearn.utils import class_weight

# Import modules to run custom FW-RNN cell
from tensorflow.python.keras.layers.recurrent import (
    _generate_zero_filled_state_for_cell,
    _generate_zero_filled_state,
    ops,
    tensor_shape,
    activations,
    initializers,
    regularizers,
    nest,
    array_ops,
)

# Import variables and functions from my own scripts
from functions import plot_history, arr_replacevalue
from load_features import (
    train_features_AW2,
    val_features_AW2,
    train_labels_AW2,
    val_labels_AW2,
    labels_reshaper,
    features_reshaper,
)

%matplotlib inline

# Limit GPU memory usage: switch on memory growth for every GPU TensorFlow can see
gpus = tf.config.experimental.list_physical_devices("GPU")
for device in gpus:
    tf.config.experimental.set_memory_growth(device, True)

# Prepare data

In [2]:
# Sequence length that features and labels are reshaped to
length = 60  # original author's note: divisible 13, 39, 197

# Training split: features and labels cut to the same sequence length
seq_train_features = features_reshaper(train_features_AW2, length)
seq_train_labels = labels_reshaper(train_labels_AW2, length)

# Validation split, reshaped the same way
seq_val_features = features_reshaper(val_features_AW2, length)
seq_val_labels = labels_reshaper(val_labels_AW2, length)

In [3]:
def comp_sampleweights(labels):
    """Compute a per-timestep sample weight for every label in ``labels``.

    Args:
        labels: One-hot encoded labels; the class axis is axis 2 (the labels
            are decoded with ``np.argmax(labels, axis=2)``).

    Returns:
        Whatever ``arr_replacevalue`` returns for the integer label array and
        the ``{class label: balanced weight}`` mapping.
        NOTE(review): presumably an array shaped like the integer labels with
        each label replaced by its weight — confirm against ``functions.py``.
    """
    # Convert one-hot encoded labels back to label integers
    label_ints = np.argmax(labels, axis=2)

    # Only the classes that actually occur get a weight
    classes = np.unique(label_ints)

    # `classes` and `y` are keyword-only in current scikit-learn releases;
    # passing them by keyword also works on older releases.
    class_weights = class_weight.compute_class_weight(
        class_weight="balanced", classes=classes, y=label_ints.flatten()
    )

    # Key the weights by the real class label, not by position: when a class
    # is missing from the labels, position and label no longer coincide.
    d_class_weights = {
        int(label): weight for label, weight in zip(classes, class_weights)
    }

    # One weight per (sample, timestep) entry of the integer label array
    return arr_replacevalue(label_ints, d_class_weights)
    
# Sample weights for the training sequences; passed to `fit` as `sample_weight` in the training cell
train_samples_weights = comp_sampleweights(seq_train_labels) 



# Build FW-RNN model
-  Build a custom FW-RNN cell and wrap it in a Keras RNN layer (https://www.tensorflow.org/api_docs/python/tf/keras/layers/RNN), like this: `layers.RNN(FW_RNNCell(...))`
    -  "The cell abstraction, together with the generic keras.layers.RNN class, make it very easy to implement custom RNN architectures for your research."

Created by following this guide: https://www.tensorflow.org/guide/keras/custom_layers_and_models

In [13]:
# Build model with sequential api
def build_FWRNN(batch, units, activation_function):
    """Build and compile a sequence classifier around a custom fast-weights RNN cell.

    The model is ``Input -> RNN(FW_RNNCell) -> Dense(7, softmax)``. The input
    shape is read from the module-level ``seq_train_features`` array.

    Args:
        batch: Fixed batch size. The cell allocates its fast-weights matrix
            with this leading dimension and reshapes activations to it, so
            every batch fed to the model must hold exactly ``batch`` samples.
        units: Number of hidden units of the recurrent cell.
        activation_function: Activation identifier (e.g. ``"relu"``), resolved
            with ``activations.get``.

    Returns:
        The compiled ``Sequential`` model.
    """

    class FW_RNNCell(layers.Layer):
        """Recurrent cell combining slow weights with a fast-weights matrix ``A``.

        Per timestep the cell
          1. computes a preliminary hidden state from the slow weights
             ``W_h`` (hidden-to-hidden) and ``C`` (input-to-hidden),
          2. updates the fast weights: ``A(t) = decay_rate * A(t-1)
             + learning_rate * h(t)^T h(t)``,
          3. refines the hidden state for ``step`` inner iterations using ``A``,
             followed by the activation and layer normalization.

        NOTE(review): ``A`` is stored as a layer variable and nothing in this
        cell resets it, so its content carries over between sequences and
        batches — confirm this is intended.
        """

        def __init__(
            self,
            units,
            use_bias,
            batch_size,
            decay_rate,
            learning_rate,
            activation,
            step,
            LN=None,
            **kwargs
        ):
            super(FW_RNNCell, self).__init__(**kwargs)
            self.units = units
            self.step = step  # number of inner-loop iterations per timestep
            self.use_bias = use_bias
            self.activation = activations.get(activation)
            self.l = decay_rate  # lambda in the fast-weights update rule
            self.e = learning_rate  # eta in the fast-weights update rule

            # Create the normalization layer per instance; a layer object used
            # as a default argument would be created once at definition time
            # and shared by every cell relying on the default.
            self.LN = layers.LayerNormalization() if LN is None else LN

            self.batch = batch_size
            self.state_size = self.units

            # Initializer for the slow input-to-hidden weights matrix
            self.C_initializer = initializers.get("glorot_uniform")

            # Initializer for the slow hidden-to-hidden weights matrix:
            # identity scaled by 0.05 (gain = 0.05). The scaling lives in the
            # initializer so that `W_h` stays a trainable variable.
            self.W_h_initializer = tf.keras.initializers.Identity(gain=0.05)

            # Initializer for the fast weights matrix
            self.A_initializer = initializers.get("zeros")

            # Initializer for the bias vector
            self.b_x_initializer = initializers.get("zeros")

        def build(self, input_shape):
            """Create the cell's weights; called once before the first `call`."""
            # C = slow input-to-hidden weights [shape (input_dim, units)]
            self.C = self.add_weight(
                shape=(input_shape[-1], self.units),
                name="inputweights",
                initializer=self.C_initializer,
            )

            # W_h = slow hidden-to-hidden weights [shape (units, units)].
            # Do not rebind `self.W_h` to a scaled tensor here: `call` would
            # then read the derived tensor instead of the variable itself.
            self.W_h = self.add_weight(
                shape=(self.units, self.units),
                name="hiddenweights",
                initializer=self.W_h_initializer,
            )

            # A = fast weights [shape (batch_size, units, units)]. They are
            # driven by the update rule in `call`, not by the optimizer, hence
            # non-trainable.
            self.A = self.add_weight(
                shape=(self.batch, self.units, self.units),
                name="fastweights",
                initializer=self.A_initializer,
                trainable=False,
            )

            if self.use_bias:
                self.bias = self.add_weight(
                    shape=(self.units,), name="bias", initializer=self.b_x_initializer,
                )
            else:
                self.bias = None
            self.built = True

        def call(self, inputs, states, training=None):
            """Run one timestep and return ``(output, new_state)``."""
            prev_output = states[0] if nest.is_sequence(states) else states

            # Slow-weights contribution W_h ⋅ h(t) + C ⋅ x(t); computed once and
            # reused both for the preliminary state and inside the inner loop.
            slow_term = K.dot(prev_output, self.W_h) + K.dot(inputs, self.C)

            # Step 1: preliminary hidden state h_0(t+1) = f(slow_term + bias)
            h = slow_term
            if self.bias is not None:
                h = h + self.bias
            if self.activation is not None:
                h = self.activation(h)

            # Add a length-1 axis so each sample's state is a (1, units) row
            # vector that can be batch-multiplied with its (units, units) A.
            h_s = tf.reshape(h, [self.batch, 1, self.units])
            prelim = tf.reshape(slow_term, [self.batch, 1, self.units])

            # Fast weights update rule: A(t) = λ*A(t-1) + η*h(t)^T ⋅ h(t)
            self.A.assign(
                tf.math.add(
                    tf.scalar_mul(self.l, self.A),
                    tf.scalar_mul(
                        self.e, tf.linalg.matmul(tf.transpose(h_s, [0, 2, 1]), h_s)
                    ),
                )
            )

            # Step 2: inner loop of `step` iterations that refines the state:
            # h_{s+1}(t+1) = LN(f([W_h ⋅ h(t) + C ⋅ x(t)] + h_s(t+1) ⋅ A(t)))
            # The bias is not added inside the inner loop.
            for _ in range(self.step):
                h_s = tf.math.add(prelim, tf.linalg.matmul(h_s, self.A))
                if self.activation is not None:
                    h_s = self.activation(h_s)

                # Apply layer normalization on hidden state
                h_s = self.LN(h_s)

            # Drop the helper axis again: back to (batch, units)
            h = tf.reshape(h_s, [self.batch, self.units])

            output = h
            new_state = [output] if nest.is_sequence(states) else output
            return output, new_state

        def get_initial_state(self, inputs=None, batch_size=None, dtype=None):
            """Return the zero-filled initial state for this cell."""
            return _generate_zero_filled_state_for_cell(self, inputs, batch_size, dtype)

    # Define model
    model = Sequential(name="FW-RNN")
    model.add(tf.keras.Input(shape=(seq_train_features.shape[1], seq_train_features.shape[2])))
    model.add(
        layers.RNN(
            FW_RNNCell(
                units=units,
                use_bias=True,
                activation=activation_function,
                step=1,
                decay_rate=0.95,
                learning_rate=0.5,
                batch_size=batch,
            ),
            return_sequences=True,
            name="FW-RNN",
        )
    )
    model.add(layers.Dense(7, activation="softmax", name="Dense_Output"))
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
        loss=CategoricalCrossentropy(label_smoothing=0.1),
        metrics=["accuracy", "AUC"],
        run_eagerly=False,
    )
    return model

In [9]:
# Build baseline model (RNN or LSTM) with sequential api
def build_base(model, units):
    """Build and compile a baseline sequence classifier (SimpleRNN or LSTM).

    The model is ``Input -> SimpleRNN | LSTM -> LayerNormalization ->
    Dense(7, softmax)``. The input shape is read from the module-level
    ``seq_train_features`` array.

    Args:
        model: Which baseline to build, ``"RNN"`` or ``"LSTM"``; also used as
            the name of the returned model.
        units: Number of units of the recurrent layer.

    Returns:
        The compiled ``Sequential`` model.

    Raises:
        ValueError: If ``model`` is neither ``"RNN"`` nor ``"LSTM"``.
    """
    if model not in ("RNN", "LSTM"):
        raise ValueError(f"Unknown baseline model {model!r}; expected 'RNN' or 'LSTM'")

    # Keep the network in its own variable: rebinding `model` to the
    # Sequential instance would make the comparisons below always false, and
    # the recurrent layer would never be added.
    net = Sequential(name=model)
    net.add(tf.keras.Input(shape=(seq_train_features.shape[1], seq_train_features.shape[2])))

    # Layer names use underscores (like "Dense_Output"); a space is not a
    # valid character in a TensorFlow graph scope name.
    if model == "RNN":
        net.add(layers.SimpleRNN(units, return_sequences=True, name="RNN_layer"))
    else:
        net.add(layers.LSTM(units, return_sequences=True, name="LSTM_layer"))

    net.add(layers.LayerNormalization())
    net.add(layers.Dense(7, activation="softmax", name="Dense_Output"))
    net.compile(
        optimizer="adagrad",
        loss=CategoricalCrossentropy(label_smoothing=0.1),
        metrics=["accuracy", "AUC"],
        run_eagerly=False,
    )
    return net


In [None]:
batchsize = 32
# The FW-RNN cell is built for a fixed batch size, so cut the train and
# validation sets down to a length divisible by `batchsize`.
train_div = (seq_train_features.shape[0] // batchsize) * batchsize
val_div = (seq_val_features.shape[0] // batchsize) * batchsize


# Train every architecture at every layer width; each run gets its own
# TensorBoard log, CSV log, architecture plot and saved model.
for num_units in [8, 16, 32, 64, 128, 256]:
    for model in ["RNN", "LSTM", "FW-RNN"]:
        # Access tensorboard in cmd of the main repo folder with following code:
        # tensorboard --logdir='logs/'
        name = f"{model}_{num_units}units"
        tb_callback = tf.keras.callbacks.TensorBoard(
            log_dir=f"logs/models_with_extractedfeatures_vgg19block5_adapthist_grayscale/{name}"
        )

        # Set callbacks for model training
        csvlog = tf.keras.callbacks.CSVLogger(
            f"data/models/{name}.csv", separator=",", append=False,
        )

        # The branch keys must match the names iterated above; with a
        # mismatching key no new model is built and the previous `NN` would be
        # trained again and saved under the wrong name.
        if model in ("RNN", "LSTM"):
            NN = build_base(model, num_units)
        elif model == "FW-RNN":
            NN = build_FWRNN(batchsize, num_units, "relu")
        else:
            raise ValueError(f"Unknown model type {model!r}")

        NN.summary()
        history = NN.fit(
            seq_train_features[:train_div],
            seq_train_labels[:train_div],
            batch_size=batchsize,
            sample_weight=train_samples_weights[:train_div],
            validation_data=(seq_val_features[:val_div], seq_val_labels[:val_div],),
            callbacks=[csvlog, tb_callback],
            epochs=150,
            verbose=2,
            shuffle=True,
        )

        # Plot model
        tf.keras.utils.plot_model(
            NN,
            to_file=f"data/model_architectures/models_with_extractedfeatures_vgg19block5_adapthist_grayscale/{name}_architecture.png",
            show_shapes=True,
            show_dtype=True,
            show_layer_names=True,
            rankdir="LR",
            expand_nested=False,
            dpi=96,
        )

        # Save model
        NN.save(
            filepath=f"data/models/models_with_extractedfeatures_vgg19block5_adapthist_grayscale/{name}.h5",
        )