<a href="https://colab.research.google.com/github/olivieromassi/Hysteresis-Modelling-with-Neural-Networks/blob/main/ModelTraining.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Necessary imports to configure the environment.

In [None]:
import tensorflow as tf

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import os

Getting the dataset from the GitHub repository:

In [None]:
# Download the zipped measurements and extract them into the P2_measurements directory
!wget https://github.com/olivieromassi/Hysteresis-Modelling-with-Neural-Networks/raw/main/Dataset/P2_measurements.zip
!unzip P2_measurements.zip -d P2_measurements

The hysteresis dataset downloaded from the GitHub repository is loaded into a dictionary, with one entry per measurement file.

In [None]:
# Directory the measurement archive was extracted into
cwd = "/content/P2_measurements"

# Column names applied to every file (the files are read without a header row).
# t: timestamp, B: Magnetic Flux Density, H: Magnetic Field
# NOTE(review): "-H" is presumably the negated field H; confirm against the data.
columns = ["t", "B", "H", "-H"]

# One DataFrame per file found in the directory, keyed by the file name
# (each file corresponds to one measurement frequency).
dataset = {
    name: pd.read_csv(os.path.join(cwd, name), sep=';', header=None, names=columns)
    for name in os.listdir(cwd)
}

# dRNN Model Training:

This notebook contains preliminary tests: different strategies for training the dRNN are explored here, in order to understand how to proceed.

First, the model is trained on a single hysteresis loop measured for an input at a given frequency.

In [None]:
# Select the H (input) and B (target) columns of the loop measured with a
# triangular input at 1Hz
train_df = dataset['P2_1Hz.CSV'][['H', 'B']]

# Per-column statistics of the training loop, kept so the scaling can be
# reused or inverted later
train_mean = train_df.mean()
train_std = train_df.std()

# Standardize each column: subtract its mean, then divide by its standard deviation
train_df = train_df.sub(train_mean).div(train_std)

The WindowGenerator Class can:

1. Handle the indexes and offsets.
2. Split windows of features into (features, labels) pairs.
3. Efficiently generate batches of these windows from the training, evaluation, and test data, using `tf.data.Datasets`.

The following code is adapted from https://www.tensorflow.org/tutorials/structured_data/time_series

In [None]:
class WindowGenerator:
    """Cut a time series into fixed-size windows and split them into (inputs, labels).

    A window spans ``input_width + shift`` consecutive rows. The first
    ``input_width`` rows form the inputs and the last ``label_width`` rows
    form the labels.

    Args:
        input_width: Number of time steps in the input part of a window.
        label_width: Number of time steps in the label part of a window.
        shift: Number of rows added after the inputs to obtain the total
            window size.
        train_df: Training DataFrame; its column order defines
            ``column_indices``. The default is the module-level ``train_df``
            as bound when this class is defined.
        val_df: Optional validation DataFrame (only stored).
        test_df: Optional test DataFrame (only stored).
        label_columns: Names of the columns kept in the labels, or ``None``
            to keep every column.
        feature_columns: Names of the columns kept in the inputs, or ``None``
            to keep every column.
    """

    def __init__(self, input_width, label_width, shift,
               train_df=train_df, val_df=None, test_df=None,
               label_columns=None, feature_columns=None):
        # Store the raw data.
        self.train_df = train_df
        self.val_df = val_df
        self.test_df = test_df

        # Work out the label column indices.
        self.label_columns = label_columns
        if label_columns is not None:
            self.label_columns_indices = {name: i for i, name in
                                        enumerate(label_columns)}
        self.feature_columns = feature_columns
        # Map every column name to its position along the last axis of a window.
        self.column_indices = {name: i for i, name in
                           enumerate(train_df.columns)}

        # Work out the window parameters.
        self.input_width = input_width
        self.label_width = label_width
        self.shift = shift

        self.total_window_size = input_width + shift

        self.input_slice = slice(0, input_width)
        self.input_indices = np.arange(self.total_window_size)[self.input_slice]

        # Labels are the trailing `label_width` steps of the window.
        self.label_start = self.total_window_size - self.label_width
        self.labels_slice = slice(self.label_start, None)
        self.label_indices = np.arange(self.total_window_size)[self.labels_slice]

    def __repr__(self):
        """Return a multi-line summary of the window layout."""
        return '\n'.join([
            f'Total window size: {self.total_window_size}',
            f'Input indices: {self.input_indices}',
            f'Label indices: {self.label_indices}',
            f'Label column name(s): {self.label_columns}'])

    def split_window(self, features):
        """Split a batch of windows into an ``(inputs, labels)`` pair.

        Args:
            features: Batch of windows indexed as (batch, time, column).

        Returns:
            Tuple ``(inputs, labels)``. ``inputs`` holds the first
            ``input_width`` time steps, restricted to ``feature_columns`` when
            given. ``labels`` holds the last ``label_width`` time steps,
            restricted to ``label_columns`` when given.
        """
        inputs = features[:, self.input_slice, :]
        labels = features[:, self.labels_slice, :]
        if self.label_columns is not None:
            labels = tf.stack(
                [labels[:, :, self.column_indices[name]] for name in self.label_columns],
                axis=-1)
        # This customization allows to select the features to use as input, by properly setting
        # the feature_columns argument. The guard must test feature_columns itself:
        # testing label_columns would iterate over None when only labels are selected
        # and would skip the selection when only features are selected.
        if self.feature_columns is not None:
            inputs = tf.stack(
                [inputs[:, :, self.column_indices[name]] for name in self.feature_columns],
                axis=-1)
        # Slicing doesn't preserve static shape information, so set the shapes
        # manually. This way the `tf.data.Datasets` are easier to inspect.
        inputs.set_shape([None, self.input_width, None])
        labels.set_shape([None, self.label_width, None])

        return inputs, labels

    def make_dataset(self, data, batch_size=20):
        """Build a batched dataset of ``(inputs, labels)`` windows from ``data``.

        Args:
            data: Array-like time series; it is converted to a float32 array.
            batch_size: Number of windows per batch (defaults to 20).

        Returns:
            Dataset of consecutive (stride 1), unshuffled windows of
            ``total_window_size`` rows, each batch mapped through
            ``split_window``.
        """
        data = np.array(data, dtype=np.float32)
        ds = tf.keras.preprocessing.timeseries_dataset_from_array(
                data=data,
                targets=None,
                sequence_length=self.total_window_size,
                sequence_stride=1,
                shuffle=False,
                batch_size=batch_size,)

        ds = ds.map(self.split_window)

        return ds

In [None]:
# Window of 32 time steps of H as input; with shift=0 and label_width=1
# the label is B at the last time step of the same window.
wide_window = WindowGenerator(
    input_width=32,
    label_width=1,
    shift=0,
    label_columns=['B'],
    feature_columns=['H'],
)

# Display the window layout
wide_window

In [None]:
# Building the tf.data.Dataset: batched (inputs, labels) windows
# generated from the normalized training loop
train_data = wide_window.make_dataset(train_df)

In [None]:
# Instantiating an iterator over the training dataset,
# used to pull individual batches for inspection
iterator = iter(train_data)

In [None]:
# Pull the next (inputs, labels) batch and print both tensors
features, labels = next(iterator)
print(f"Features batch:{features!s}")
print(f"Labels batch:{labels!s}")

In [None]:
class DiagonalRNNCell(tf.keras.layers.SimpleRNNCell):
    """SimpleRNNCell whose recurrent kernel is a vector applied element-wise.

    The recurrent weights have shape ``(units,)`` and multiply the previous
    output element by element, instead of the ``(units, units)`` matrix
    product used by the parent cell.

    Args:
        **kwargs: Forwarded to ``tf.keras.layers.SimpleRNNCell``.
            ``recurrent_initializer`` defaults to ``'glorot_uniform'`` when
            not supplied.
    """

    def __init__(self, **kwargs):
        # Use setdefault rather than a hard-coded keyword: an explicit
        # `recurrent_initializer` (e.g. the one passed back by from_config)
        # would otherwise raise "got multiple values for keyword argument".
        kwargs.setdefault('recurrent_initializer', 'glorot_uniform')
        super(DiagonalRNNCell, self).__init__(**kwargs)

    # Overriding the build method
    def build(self, input_shape):
        """Create the input kernel, the vector recurrent kernel and the bias.

        The parent ``build`` is deliberately not called: it would also
        register a dense ``(units, units)`` recurrent kernel that stays
        tracked as a trainable weight even though this cell never uses it.
        """
        input_dim = tf.TensorShape(input_shape)[-1]

        self.kernel = self.add_weight(
            shape=(input_dim, self.units),
            name='kernel',
            initializer=self.kernel_initializer,
            regularizer=self.kernel_regularizer,
            constraint=self.kernel_constraint)

        # One recurrent weight per unit
        self.recurrent_kernel = self.add_weight(
            shape=(self.units,),
            name='recurrent_kernel',
            initializer=self.recurrent_initializer,
            regularizer=self.recurrent_regularizer,
            constraint=self.recurrent_constraint)

        if self.use_bias:
            self.bias = self.add_weight(
                shape=(self.units,),
                name='bias',
                initializer=self.bias_initializer,
                regularizer=self.bias_regularizer,
                constraint=self.bias_constraint)
        else:
            self.bias = None

        self.built = True

    # Overriding the call method
    def call(self, inputs, states, training=None):
        """Compute one time step of the cell.

        Args:
            inputs: Input tensor for the current time step.
            states: Previous output, either as a tensor or nested in a
                sequence whose first element is the previous output.
            training: Forwarded to the dropout-mask helpers.

        Returns:
            Tuple ``(output, new_state)``; ``new_state`` is ``[output]`` when
            ``states`` is nested, otherwise ``output`` itself.
        """
        prev_output = states[0] if tf.nest.is_nested(states) else states
        dp_mask = self.get_dropout_mask_for_cell(inputs, training)
        rec_dp_mask = self.get_recurrent_dropout_mask_for_cell(
            prev_output, training)

        if dp_mask is not None:
            h = tf.keras.backend.dot(inputs * dp_mask, self.kernel)
        else:
            h = tf.keras.backend.dot(inputs, self.kernel)
        if self.bias is not None:
            h = tf.keras.backend.bias_add(h, self.bias)

        if rec_dp_mask is not None:
            prev_output = prev_output * rec_dp_mask
        # The Tensor product becomes a vector element-wise product
        output = h + prev_output * self.recurrent_kernel
        if self.activation is not None:
            output = self.activation(output)

        new_state = [output] if tf.nest.is_nested(states) else output
        return output, new_state

In [None]:
# Example: building a dRNN using the custom cell

cell = DiagonalRNNCell(units=256)

# The time dimension is taken from `wide_window` so that the model accepts the
# windows it is trained on (a hard-coded 20 does not match the 32-step windows).
# A fixed batch size is declared because the RNN layer is stateful.
inputs = tf.keras.layers.Input(shape=(wide_window.input_width, 1,), batch_size=20)
layer = tf.keras.layers.RNN(cell, stateful=True)(inputs) # To have cross-batch statefulness
# Single linear output unit on top of the recurrent layer
outputs = tf.keras.layers.Dense(1, activation='linear')(layer)


model = tf.keras.Model(inputs=inputs, outputs=outputs, name='dRNN')
model.summary()

tf.keras.utils.plot_model(model, show_shapes=True)

In [None]:
# Mean squared error as loss, Adam with its default settings as optimizer,
# mean absolute error tracked as an additional metric
model.compile(
    loss=tf.keras.losses.MSE,
    optimizer=tf.keras.optimizers.Adam(),
    metrics=[tf.metrics.MeanAbsoluteError()],
)

In [None]:
# Training the model for 10 epochs, resetting the states after each epoch.
# fit is called once per epoch so that the stateful RNN states can be reset in between.
for _ in range(10):
    # `train_data` is an already-batched tf.data.Dataset, so `batch_size` is not passed to fit
    model.fit(train_data, epochs=1, verbose=2)
    model.reset_states()