In [1]:
#  _____ __  __ _   _           _   _ _   _
# |  ___|  \/  | | | |         | \ | | \ | |
# | |_  | |\/| | | | |  _____  |  \| |  \| |
# |  _| | |  | | |_| | |_____| | |\  | |\  |
# |_|   |_|  |_|\___/          |_| \_|_| \_|
#
"""
 @authors: Matteo Larcher
"""


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# import libraries
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

import numpy as np
import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping
from plotly import graph_objects as go
from plotly.subplots import make_subplots
import os

# custom libraries
from FMU_layer import *

# Load the TensorBoard notebook extension
%load_ext tensorboard

2024-11-02 23:48:24.043532: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Configs
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

# set keras float precision
tf.keras.backend.set_floatx('float64')

# set eager execution
tf.config.run_functions_eagerly(False)

# set random seeds
tf.keras.utils.set_random_seed(123)

# set the path of the FMU model
dirname = os.getcwd()
fmu_path = os.path.join(dirname, "fmu_model", "xy_model_om_dd_par.fmu")

# simulation parameters
start_time = 0.0
stop_time = 20.0
step_size = 0.1
t_vect = np.arange(start_time, stop_time, step_size)

In [3]:
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Test FMU Layer
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

start_values = {"x":0.0, "y":0.0}
parameters = {"const.k": 1.0, "custom_parameter1.p": 1.0}
learnable_parameters = {}
input_x = np.cos(t_vect).astype(tf.keras.backend.floatx()).reshape(1, -1)
input_y = np.sin(t_vect).astype(tf.keras.backend.floatx()).reshape(1, -1)
input_data = tf.Variable(tf.stack([input_x, input_y], axis=2), dtype=tf.keras.backend.floatx(), trainable=True)

fmu_layer = FMULayer(
    fmu_path,
    start_time,
    start_values,
    parameters,
    learnable_parameters,
    step_size,
    return_sequences=True,
    return_state=False,
    stateful=True,
    name="test_fmu_layer",
    do_step_in_gradient=True,
)

output = fmu_layer(input_data)

# Plot the results
fig = make_subplots(rows=2, cols=1)
fig.add_trace(go.Scatter(x=t_vect, y=input_x[0], name="Input x", mode="lines"), row=1, col=1)
fig.add_trace(go.Scatter(x=t_vect, y=input_y[0], name="Input y", mode="lines"), row=1, col=1)
fig.add_trace(go.Scatter(x=t_vect, y=output[0][:,0], name="z", mode="lines"), row=2, col=1)
fig.update_layout(title="FMU Layer Test", xaxis_title="Time [s]", yaxis_title="Value")
fig.show()


In [4]:
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Test the layer gradient
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

with tf.GradientTape() as tape:
    tape.watch(input_data)
    outputs = fmu_layer(input_data)
    # gather gradients
    gradients = tape.gradient(outputs, input_data)

# Plot the gradients
fig = make_subplots(rows=1, cols=1)
fig.add_trace(go.Scatter(x=t_vect, y=outputs[0][:,0], name="z", mode="lines"), row=1, col=1)
fig.add_trace(go.Scatter(x=t_vect, y=input_x[0], name="x", mode="lines"), row=1, col=1)
fig.add_trace(go.Scatter(x=t_vect, y=input_y[0], name="y", mode="lines"), row=1, col=1)
fig.add_trace(go.Scatter(x=t_vect, y=gradients[0][:,0], name="dz/dx", mode="lines", line=dict(dash='dot')), row=1, col=1)
fig.add_trace(go.Scatter(x=t_vect, y=gradients[0][:,1], name="dz/dy", mode="lines", line=dict(dash='dot')), row=1, col=1)

fig.update_layout(title="FMU Layer Gradient Test", xaxis_title="Time [s]", yaxis_title="Value")
fig.show()


In [5]:
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Model definition
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

def get_model(fmu_path, start_time, start_values, parameters, step_size):
    x_in = tf.keras.layers.Input(shape=(None, 1), batch_size=1, name="x_in")
    y_in = tf.keras.layers.Input(shape=(None, 1), batch_size=1, name="y_in")
    inputs = tf.keras.layers.Concatenate(axis=2, name="inputs")([x_in, y_in])

    # Add a dense layer before the FMULayer
    dense_1 = tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(2), name="dense_before")(inputs)

    z_out = FMULayer(
        fmu_path,
        start_time,
        start_values,
        parameters,
        learnable_parameters,
        step_size,
        do_step_in_gradient=False,
        return_sequences=True,
        return_state=False,
        stateful=True,
        name="xy_fmu",
    )(dense_1)

    # custom layer that performs (in1*in2)+parameters[0]
    # z_out = tf.keras.layers.Lambda(lambda x: x[:,:,0]*x[:,:,1]+1.0, name="custom_layer", output_shape=(None,1))(dense_1)

    # custom layer that extracts the first output of the FMULayer
    z_out_1 = tf.keras.layers.Lambda(lambda x: x[:,:,0], name="z_out_1", output_shape=(None,1))(z_out)

    # Add a dense layer after the FMULayer
    dense_2 = tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(1), name="dense_after")(z_out_1)

    model = tf.keras.Model(inputs=[x_in, y_in], outputs=dense_2)
    return model

# create the model with the FMU layer
model = get_model(fmu_path, start_time, start_values, parameters, step_size)

model.summary()

# compile the model
model.compile(optimizer=tf.keras.optimizers.Nadam(learning_rate=0.1), loss='mse')

In [6]:
# create the input data
input_x = np.cos(t_vect).reshape(1, -1, 1)
input_y = np.sin(t_vect).reshape(1, -1, 1)
input_data = [input_x, input_y]

In [7]:
# predict
z = model.predict(input_data)
# target
target = 4*((2*input_x+4*input_y+1) * (3*input_y-2*input_x-3)) + 2

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 11s/step


In [8]:
# Plot the results
fig = go.Figure()

# Add traces for z and target
fig.add_trace(go.Scatter(x=t_vect, y=z.reshape(-1), mode='lines', name='z'))
fig.add_trace(go.Scatter(x=t_vect, y=target[0, :, 0], mode='lines', name='target', line=dict(dash='dash')))

# Update layout
fig.update_layout(
    title="Model Prediction vs Target",
    xaxis_title="Time [s]",
    yaxis_title="z",
    legend=dict(x=0, y=1, traceorder="normal"),
)

# Show the figure
fig.show()

In [9]:
# TensorBoard callback for live plotting
log_dir = os.path.join(dirname, "logs", "fit", "xy_model")
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=10)

# custom TensorBoard callback
class CustomTensorBoard(tf.keras.callbacks.TensorBoard):
    def __init__(self, *args, **kwargs):
        super(CustomTensorBoard, self).__init__(*args, **kwargs)

    def on_epoch_end(self, epoch, logs=None):
        if epoch % 5 == 0:  # Generate images every 5 epochs
            # plot model inputs, outputs and target
            fig = go.Figure()
            fig.add_trace(go.Scatter(x=t_vect, y=model.predict(input_data).reshape(-1), mode='lines', name='z'))
            fig.add_trace(go.Scatter(x=t_vect, y=target[0, :, 0], mode='lines', name='target', line=dict(dash='dash')))
            fig.update_layout(title="Model Prediction vs Target", xaxis_title="Time [s]", yaxis_title="z")

            # add the plot to TensorBoard
            writer = tf.summary.create_file_writer(log_dir)
            with writer.as_default():
                img = tf.image.decode_image(fig.to_image(format="png"), channels=4)
                img = tf.expand_dims(img, 0)  # add batch dimension
                tf.summary.image(name='model', data=img, step=epoch)

        # call the original on_epoch_end method
        super(CustomTensorBoard, self).on_epoch_end(epoch, logs)

# Set the TensorBoard port
os.environ['TENSORBOARD_PORT'] = '6006'

In [10]:
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# fit the model
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

early_stopping = EarlyStopping(monitor='loss', patience=10, restore_best_weights=True)
history = model.fit(input_data, target.reshape(1, -1, 1), epochs=100, batch_size=1, shuffle=False, callbacks=[early_stopping, tensorboard_callback, CustomTensorBoard()])

Epoch 1/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 147ms/step loss: 1769.38
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m35s[0m 35s/step - loss: 1769.3878
Epoch 2/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 388ms/step - loss: 1752.2965
Epoch 3/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 379ms/step - loss: 1734.0905
Epoch 4/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 376ms/step - loss: 1712.3171
Epoch 5/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 377ms/step - loss: 1685.6411
Epoch 6/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 151ms/step - loss: 1652.97
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 704ms/step - loss: 1652.9773
Epoch 7/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 379ms/step - loss: 1613.3927
Epoch 8/100
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 448ms/step - loss: 1566.1265
E

In [11]:
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Plot the training history
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

fig = go.Figure()
fig.add_trace(go.Scatter(x=history.epoch, y=history.history['loss'], mode='lines', name='loss'))
fig.update_layout(title="Training History", xaxis_title="Epoch", yaxis_title="Loss")
fig.show()

In [12]:
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Predict and plot the results of the trained model
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

z = model.predict(input_data)

# Create a figure
fig = go.Figure()

# Add traces for z and target
fig.add_trace(go.Scatter(x=t_vect, y=z.reshape(-1), mode='lines', name='z'))
fig.add_trace(go.Scatter(x=t_vect, y=target[0, :, 0], mode='lines', name='target', line=dict(dash='dash')))

# Update layout
fig.update_layout(
    title="Model Prediction vs Target",
    xaxis_title="Time [s]",
    yaxis_title="z",
    legend=dict(x=0, y=1, traceorder="normal"),
)

# Show the figure
fig.show()


[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 148ms/step


In [13]:
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Export the model to ONNX
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
