In [3]:
import scipy.optimize
import numpy as np
import tensorflow as tf
import keras
from keras import layers
np.random.seed(2727)
"""
Todo: one class to create the nn, one class called Optimizer that recives a function and 
perfome some minimization algorithm on it and one class called HeatModel which controls
the workflow and the interaction of the other classes.     
"""

In [67]:
class Network:
    def __init__(self, input_dim) -> None:
        self.input_dim = input_dim

    def build(self):
        nn = keras.Sequential(
        [
            layers.Input(shape=(self.input_dim,)),
            layers.Dense(32, activation="tanh", kernel_initializer='he_normal'),
            layers.Dense(32, activation="tanh", kernel_initializer='he_normal'),
            layers.Dense(1),
        ])
        return nn


class Optimizer:
    def __init__(self) -> None:
        pass

class HeatModel:

    """
    
    This class performes PINN-type algorithm to numerically solve 1D heat equation.

    """

    def __init__(self, input_dim) -> None:
        # First try: model represent the core nn.
        self.model = Network(input_dim).build()

        #self.opt = opt

        # x represents the weights of the nn.
        #self.x_train = x_train
        # y represents the outputs of each loss associated to the pde.
        #self.y_train = y_train

    def derivatives(self, z):
        """computes the residue of the pinn loss [u_eqn, u_ini, u_up_bd, u_lower_bd]

        Args:
            z (tf.Tensor): shape=(num_data_sample, 2)

        Returns:
            _type_: _description_
        """
        with tf.GradientTape() as tape2:
            tape2.watch(z)
            with tf.GradientTape() as tape1:
                tape1.watch(z)
                u = self.model(z)
            u_z = tape1.batch_jacobian(u, z)
            u_t = u_z[..., 0]
        u_zz = tape2.batch_jacobian(u_z, z)
        u_xx = u_zz[..., 1, 1]

        return u, u_t, u_xx
    
    @tf.function
    def tf_evaluate(self, x, y):
        with tf.GradientTape() as tape:
            loss = tf.reduce_mean(tf.keras.losses.mse(self.residue(x), y))
        grads = tape.gradient(loss, self.model.trainable_variabels)
        return loss, grads
    

    def callback(self, flat_weights):
        """Called after each iteration.

        Args:
            weights (np.array): current weight.
        """
        shapes = self.model.get_weights()
        # Cumulative sum to get the correct indexes.
        split_ids = np.cumsum([np.prod(shape) for shape in [0] + shapes])
        weights = [flat_weights[from_id:to_id].reshape(shape)
            for from_id, to_id, shape in zip(split_ids[:-1], split_ids[1:], shapes) ]
        self.model.set_weights(weights)

    def fit(self):
        # Flattened weights.
        initial_weight = np.concatenate([w.flatten() for w in self.model.get_weights()])
        scipy.optimize.fmin_l_bfgs_b(func=self.tf_evaluate, x0=initial_weight)


In [69]:
pinn = HeatModel(2)
pinn.model(tf.zeros((10,2)))

<tf.Tensor: shape=(10, 1), dtype=float32, numpy=
array([[0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.],
       [0.]], dtype=float32)>

In [63]:
pinn.derivatives(tf.zeros((10,2)))



(<tf.Tensor: shape=(10, 1), dtype=float32, numpy=
 array([[0.],
        [0.],
        [0.],
        [0.],
        [0.],
        [0.],
        [0.],
        [0.],
        [0.],
        [0.]], dtype=float32)>,
 <tf.Tensor: shape=(10, 1), dtype=float32, numpy=
 array([[-0.09977153],
        [-0.09977153],
        [-0.09977153],
        [-0.09977153],
        [-0.09977153],
        [-0.09977153],
        [-0.09977153],
        [-0.09977153],
        [-0.09977153],
        [-0.09977153]], dtype=float32)>,
 <tf.Tensor: shape=(10, 1), dtype=float32, numpy=
 array([[0.],
        [0.],
        [0.],
        [0.],
        [0.],
        [0.],
        [0.],
        [0.],
        [0.],
        [0.]], dtype=float32)>)

In [62]:
x_f = 2
x_ini = 0
t_f = 0.2

# training input
num_train_samples = 100
tx_eqn = np.random.rand(num_train_samples, 2)
tx_eqn[..., 1] = (x_f-x_ini)*tx_eqn[..., 1] + x_ini           
tx_ini = np.random.rand(num_train_samples, 2)
tx_ini[..., 0] = 0                               
tx_ini[..., 1] = (x_f-x_ini)*tx_ini[..., 1] + x_ini           
tx_bnd_up = np.random.rand(num_train_samples, 2)
tx_bnd_up[..., 0] = t_f*tx_bnd_up[..., 0]               
tx_bnd_up[..., 1] = x_f  # x = -1 or +1
tx_bnd_down = np.random.rand(num_train_samples, 2)
tx_bnd_down[..., 0] = t_f*tx_bnd_down[..., 0]              
tx_bnd_down[..., 1] = x_ini  

x_train = [tx_eqn, tx_ini, tx_bnd_up, tx_bnd_down]

In [42]:
aux = np.zeros((num_train_samples, 1))
y_train = [aux, init, aux, aux]

4

In [44]:
np.zeros((2,))

array([0., 0.])