# Thermal Neural Networks (Tensorflow example)

This jupyter notebook showcases how to utilize a [thermal neural network (TNN)](https://www.sciencedirect.com/science/article/pii/S0952197622005279) on an exemplary data set with the [Tensorflow](https://www.tensorflow.org/) framework.

This example is concise for the sake of comprehensibility, that is, no cross-validation with a validation set is conducted, e.g., for early stopping, no learning rate scheduling, no repeated experiments with different random number generator seeds, etc.

Feel free to build and expand your training pipeline on top of this example.

The data set can be downloaded from [Kaggle](https://www.kaggle.com/wkirgsn/electric-motor-temperature).
It should be placed in `data/input/`.

In [6]:
from pathlib import Path
import numpy as np
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt

import tensorflow as tf

## Data setup

In [7]:
path_to_csv = Path().cwd() / "data" / "input" / "measures_v2.csv"
data = pd.read_csv(path_to_csv)
target_cols = ["pm", "stator_yoke", "stator_tooth", "stator_winding"]

temperature_cols = target_cols + ["ambient", "coolant"]
test_profiles = [60, 62, 74]
train_profiles = [p for p in data.profile_id.unique() if p not in test_profiles]
profile_sizes = data.groupby("profile_id").agg("size")

# normalize
non_temperature_cols = [c for c in data if c not in temperature_cols + ["profile_id"]]
data.loc[:, temperature_cols] /= 200  # deg C
data.loc[:, non_temperature_cols] /= data.loc[:, non_temperature_cols].abs().max(axis=0)

# extra feats (FE)
if {"i_d", "i_q", "u_d", "u_q"}.issubset(set(data.columns.tolist())):
    extra_feats = {
        "i_s": lambda x: np.sqrt((x["i_d"] ** 2 + x["i_q"] ** 2)),
        "u_s": lambda x: np.sqrt((x["u_d"] ** 2 + x["u_q"] ** 2)),
    }
data = data.assign(**extra_feats)
input_cols = [c for c in data.columns if c not in target_cols]


In [8]:
# Rearrange features
input_cols = [c for c in data.columns if c not in target_cols + ["profile_id"]]
data = data.loc[:, input_cols + ["profile_id"] + target_cols]


def generate_tensor(profiles_list):
    """Returns profiles of the data set in a coherent 3D tensor with
    time-major shape (T, B, F) where
    T : Maximum profile length
    B : Batch size = Amount of profiles
    F : Amount of input features.

    Also returns a likewise-shaped sample_weights tensor, which zeros out post-padded zeros for use
    in the cost function (i.e., it acts as masking tensor)"""

    tensor = np.full(
        (profile_sizes[profiles_list].max(), len(profiles_list), data.shape[1] - 1),
        np.nan,
    )
    for i, (pid, df) in enumerate(
        data.loc[data.profile_id.isin(profiles_list), :].groupby("profile_id")
    ):
        assert pid in profiles_list, f"PID is not in {profiles_list}!"
        tensor[: len(df), i, :] = df.drop(columns="profile_id").to_numpy()
    sample_weights = 1 - np.isnan(tensor[:, :, 0])
    tensor = np.nan_to_num(tensor).astype(np.float32)
    tensor = tf.convert_to_tensor(tensor)
    sample_weights = tf.convert_to_tensor(sample_weights)
    return tensor, sample_weights


train_tensor, train_sample_weights = generate_tensor(train_profiles)
test_tensor, test_sample_weights = generate_tensor(test_profiles)

## Model declaration

In [9]:
# working with RNNs in tensorflow:
#  https://www.tensorflow.org/guide/keras/working_with_rnns


class TNNCell(tf.keras.layers.Layer):
    """The main TNN logic. Here, the sub-NNs are initialized as well as the constant learnable
    thermal capacitances. The forward function houses the LPTN ODE discretized with the explicit Euler method
    """

    def __init__(self):
        super().__init__()
        self.sample_time = 0.5  # in s
        self.state_size = len(target_cols)

    def build(self, input_shape):
        # log inverse capacitances
        #   hand-picked init mean, might be application-dependent
        self.caps = self.add_weight(
            shape=(input_shape[-1], self.state_size),
            initializer=tf.keras.initializers.RandomNormal(mean=-9.2, stddev=0.5),
        )

        n_temps = len(temperature_cols)  # number of temperatures (targets and input)
        n_conds = int(0.5 * n_temps * (n_temps - 1))  # number of thermal conductances
        # conductance net sub-NN
        self.conductance_net = tf.keras.models.Sequential(
            tf.keras.layers.Dense(
                (input_shape[-1] + self.state_size, n_conds), activation="sigmoid"
            )
        )
        # populate adjacency matrix. It is used for indexing the conductance sub-NN output
        self.adj_mat = np.zeros((n_temps, n_temps), dtype=int)
        adj_idx_arr = np.ones_like(self.adj_mat)
        triu_idx = np.triu_indices(n_temps, 1)
        adj_idx_arr = adj_idx_arr[triu_idx].ravel()
        self.adj_mat[triu_idx] = np.cumsum(adj_idx_arr) - 1
        self.adj_mat += self.adj_mat.T
        self.adj_mat = tf.convert_to_tensor(
            self.adj_mat[: self.state_size, :], dtype=tf.int64
        )
        self.n_temps = n_temps

        # power loss sub-NN
        self.ploss = tf.keras.models.Sequential(
            tf.keras.layers.Dense(
                (len(input_cols) + self.state_size, 16), activation="tanh"
            ),
            tf.keras.layers.Dense((16, self.state_size)),
        )

        self.temp_idcs = [i for i, x in enumerate(input_cols) if x in temperature_cols]
        self.nontemp_idcs = [
            i
            for i, x in enumerate(input_cols)
            if x not in temperature_cols + ["profile_id"]
        ]

    def get_initial_state(inputs=None, batch_size=None, dtype=None):
        """This function should be fed with the target tensor instead of the input tensor"""
        return inputs[0, :, :]

    def call(self, input_at_t, states_at_t):
        output_at_t = states_at_t
        temps = tf.concat([output_at_t, input_at_t[:, self.temp_idcs]], axis=1)
        sub_nn_inp = tf.concat([input_at_t, output_at_t], axis=1)
        conducts = tf.abs(self.conductance_net(sub_nn_inp))
        power_loss = tf.abs(self.ploss(sub_nn_inp))
        heat_transfer_from_temp_diffs = tf.math.reduce_sum(
            (tf.expand_dims(temps, axis=1) - tf.expand_dims(output_at_t, axis=-1))
            * conducts[:, self.adj_mat],
            axis=-1,
        )
        states_at_t_plus_1 = output_at_t + self.sample_time * tf.exp(self.caps) * (
            heat_transfer_from_temp_diffs + power_loss
        )
        return output_at_t, tf.clip(states_at_t_plus_1, -1, 5)

## Training

In [13]:
cell = TNNCell()
model = tf.keras.layers.RNN(cell, return_sequences=True, time_major=True)


In [18]:
loss_func = tf.keras.losses.MeanSquaredError()
opt = tf.keras.optimizers.Nadam(learning_rate=1e-3, clipvalue=1.0)
#model.compile(optimizer=opt, loss=loss_func)
n_epochs = 100
tbptt_size = 32  # 512

n_batches = np.ceil(train_tensor.shape[0] / tbptt_size).astype(int)
with tqdm(desc="Training", total=n_epochs) as pbar:
    for epoch in range(n_epochs):
        # first state is ground truth temperature data
        hidden = train_tensor[0, :, -len(target_cols) :]

        # propagate batch-wise through data set
        for i in range(n_batches):
            with tf.GradientTape() as tape:
                output, hidden = model(
                    inputs=train_tensor[
                        i * tbptt_size : (i + 1) * tbptt_size, :, : len(input_cols)
                    ],
                    initial_state=(tf.stop_gradient(hidden), ),
                )
                loss = loss_func(
                    output,
                    train_tensor[
                        i * tbptt_size : (i + 1) * tbptt_size, :, -len(target_cols) :
                    ],
                    sample_weight=train_sample_weights,
                )
            grads = tape.gradient(loss, model.variables)
            opt.apply_gradients(zip(grads, model.variables))

        pbar.update()
        pbar.set_postfix_str(f"loss: {loss.item():.2e}")

Training:   0%|          | 0/100 [00:00<?, ?it/s]


TypeError: int() argument must be a string, a bytes-like object or a real number, not 'tuple'