<a href="https://colab.research.google.com/github/yajuna/tensorflow_pde/blob/master/Heat_equation_with_tensorflow.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This notebook solves the polar heat equation in one dimension

$\rho c \frac{\partial T}{\partial t}=\frac{\partial k}{\partial r}\frac{\partial T}{\partial r}+\frac{k}{r}\frac{\partial T}{\partial r}+k\frac{\partial^2 T}{\partial r^2}+\text{source terms}$

With the assumption that $\frac{\partial k}{\partial r} = 0$, the equation simplifies to

$\rho c \frac{\partial T}{\partial t}=\frac{k}{r}\frac{\partial T}{\partial r}+k\frac{\partial^2 T}{\partial r^2}+\text{source terms}$

The initial condition is linearly interpolated from the measured core and bark temperatures at $t = 0$. Two boundary conditions are from the measured core and bark temperatures.

We aim to solve the inverse problem for determining the parameters in the original continuous PDE.

The inverse problem is formulated to be

$\frac{\partial T}{\partial t}=\lambda_1\frac{\partial T}{\partial r}+\lambda_2\frac{\partial^2 T}{\partial r^2}+\lambda_3$

and the Physics information is

$\text{residual} = \frac{\partial T}{\partial t}-\lambda_1\frac{\partial T}{\partial r}-\lambda_2\frac{\partial^2 T}{\partial r^2}-\lambda_3$


In [2]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import pandas
from time import time

In [3]:
# set data type and hyperparameters
# import data

DTYPE = 'float32'
tf.keras.backend.set_floatx(DTYPE)

EPOCH = 5000

##### measured tree temperature for initial and boundary conditions. Need fixing
# colnames_tree_temp = ['datetime', 's45_1', 'e9_1', 'n135_1','e45_2', 'n9_2', 'w135_2', 'n45_3', 'w9_3','s135_3', 'w_ext_35']
# url1 = "https://raw.githubusercontent.com/yajuna/linearRegression/master/Tree_Temp_Values_OCT21_to_OCT28_2022.xlsx"
# dataTemp = pandas.read_excel(url1,names=colnames_tree_temp)

# train_tree_temp_index = 416 - 2

# train_interp_temp_size = train_tree_temp_index

# ### Initial and boundary conditions
# # core temp is west, at 13.5cm, at 2m high
# test_coreTemp = np.array(dataTemp.s135_3[train_tree_temp_index: test_tree_temp_index])+ 273.15
# # West, at 9cm, at 3m high
# test_midTemp1 = np.array(dataTemp.w9_3[train_tree_temp_index: test_tree_temp_index])+ 273.15
# # North, at 4.5cm, at 3m high
# test_midTemp2 = np.array(dataTemp.n45_3[train_tree_temp_index: test_tree_temp_index])+ 273.15
# # bark temp is West, at bark, at 3.5m high
# test_barkTemp = np.array(dataTemp.w_ext_35[train_tree_temp_index: test_tree_temp_index])+ 273.15

# # linear interpolate the measured temperature
# test_coreTemp = np. interp(time, np.linspace(0,24,test_interp_temp_size),test_coreTemp)
# test_midTemp1 = np. interp(time, np.linspace(0,24,test_interp_temp_size),test_midTemp1)
# test_midTemp2 = np. interp(time, np.linspace(0,24,test_interp_temp_size),test_midTemp2)
# test_barkTemp = np. interp(time, np.linspace(0,24,test_interp_temp_size),test_barkTemp)


# initTemp = np.array([test_coreTemp[0], test_midTemp1[0], test_midTemp2[0], test_barkTemp[0]])
# init_temp = np.interp(np.linspace(0,radius,n_x), np.linspace(0,radius,initTemp.size),initTemp)

Linearly interpolate initial and boundary data according to the generated time and space collocation points.

In [None]:
# Set number of data points
N_0 = 500 # number of points in space
N_b = 500 # number of points in time
N_r = 100000

# Set boundary
tmin = 0.
tmax = 24.
xmin = 0.
xmax = 0.135

# Lower bounds in time and space
lb = tf.constant([tmin, xmin], dtype=DTYPE)
# Upper bounds in time and space
ub = tf.constant([tmax, xmax], dtype=DTYPE)

# Set random seed for reproducible results
tf.random.set_seed(0)

# Draw uniform sample points for initial boundary data; need N_0 == N_b for concat
t_0 = tf.ones((N_0,1), dtype=DTYPE)*lb[0]
x_0 = tf.random.uniform((N_0,1), lb[1], ub[1], dtype=DTYPE)
X_0 = tf.concat([t_0, x_0], axis=1)

# initial condition
u_0 = TBD

# Boundary data- we compare left and right boundary at give time steps.
t_b = tf.random.uniform((N_b,1), lb[0], ub[0], dtype=DTYPE)
x_lb = tf.ones((N_b,1), dtype=DTYPE)*lb[1]
x_ub = tf.ones((N_b,1), dtype=DTYPE)*ub[1]
X_lb = tf.concat([t_b, x_lb], axis=1)
X_ub = tf.concat([t_b, x_ub], axis=1)

# boundary conditions are core and bark
u_lb = TBD
u_ub = TBD

# Draw uniformly sampled collocation points
t_r = tf.random.uniform((N_r,1), lb[0], ub[0], dtype=DTYPE)
x_r = tf.random.uniform((N_r,1), lb[1], ub[1], dtype=DTYPE)
X_r = tf.concat([t_r, x_r], axis=1)

# # Collect boundary and inital data in lists
# X_data = [X_0, X_b]
# u_data = [u_0, u_b]

Plot initial and boundary data; plot collocation data

In [None]:
fig = plt.figure(figsize=(9,6))
plt.scatter(t_0, x_0, c=u_0, marker='X', vmin=-1, vmax=1)
plt.scatter(t_b, x_lb, c=u_lb, marker='X', vmin=-1, vmax=1)
plt.scatter(t_b, x_ub, c=u_ub, marker='X', vmin=-1, vmax=1)
plt.scatter(t_r, x_r, c='r', marker='.', alpha=0.1)
plt.xlabel('$t$')
plt.ylabel('$x$')

plt.title('Positions of collocation points and boundary data');
#plt.savefig('Xdata_heat.pdf', bbox_inches='tight', dpi=300)

Class implementation of PINN. Derive `PINN_NeuralNet` from `tf.keras.Model`.

Required arguments are the lower bound `lb` and upper bound `ub`. This is a general neural net that is equation independent.

In [4]:
# Define model architecture

class PINN_NeuralNet(tf.keras.Model):
  """Basic architecture of the PINN model
  """

  def __init__(self, lb, ub,
               output_dim = 1,
               num_hidden_layers = 8,
               num_neurons_per_layer = 20,
               activation = 'tanh',
               kernel_initializer = 'glorot_normal',
               **kwargs):
    super().__init__(**kwargs)

    self.num_hidden_layers = num_hidden_layers
    self.output_dim = output_dim
    self.lb = lb
    self.ub = ub

    # Define NN architecture
    self.scale = tf.keras.layers.Lambda(
            lambda x: 2.0*(x - lb)/(ub - lb) - 1.0)
    self.hidden = [tf.keras.layers.Dense(num_neurons_per_layer,
                             activation=tf.keras.activations.get(activation),
                             kernel_initializer=kernel_initializer)
                           for _ in range(self.num_hidden_layers)]
    self.out = tf.keras.layers.Dense(output_dim)

  def call(self, X):
    """
    Forward-pass thru NN
    """
    Z = self.scale(X)
    for i in range(self.num_hidden_layers):
      Z = self.hidden[i](Z)
    return self.out(Z)

Define base class `PINNSolver`. This is equation dependent. Components to customize are

1. get_r <---- modify what derivatives are computed
2. loss_fn <--- modify the boundary conditions
3. fun_r <--- residual of the PDE
4. should check all functions if boundary data were changed

Here we only consider the method based on tensorflow optimizer object as input. The [original notebook](https://colab.research.google.com/github/janblechschmidt/PDEsByNNs/blob/main/PINN_Solver.ipynb#scrollTo=wcOkamgfZEks) though, also has method based on SciPy's LBFGS method.

In [None]:
class PINNSolver():
    def __init__(self, X_r):
        self.model = model

        # Store collocation points, separate t and x
        self.t = X_r[:,0:1]
        self.x = X_r[:,1:2]

        # Initialize history of losses and global iteration counter
        self.hist = []
        self.iter = 0

    def get_r(self):

        with tf.GradientTape(persistent=True) as tape:
            # Watch variables representing t and x during this GradientTape
            tape.watch(self.t)
            tape.watch(self.x)

            # Compute current values u(t,x)
            u = self.model(tf.stack([self.t[:,0], self.x[:,0]], axis=1))

            u_x = tape.gradient(u, self.x)

        u_t = tape.gradient(u, self.t)
        u_xx = tape.gradient(u_x, self.x)

        del tape

        return self.fun_r(self.t, self.x, u, u_t, u_x, u_xx)


    def loss_fn(self, X, u):

        # Compute phi_r
        r = self.get_r()
        phi_r = tf.reduce_mean(tf.square(r))

        # Initialize loss
        loss = phi_r

        # Add phi_0 and phi_b to the loss
        for i in range(len(X)):
            u_pred = self.model(X[i])
            loss += tf.reduce_mean(tf.square(u[i] - u_pred))

        return loss


    def get_grad(self, X, u):
        with tf.GradientTape(persistent=True) as tape:
            # This tape is for derivatives with
            # respect to trainable variables
            tape.watch(self.model.trainable_variables)
            loss = self.loss_fn(X, u)

        g = tape.gradient(loss, self.model.trainable_variables)
        del tape

        return loss, g


    def fun_r(self, t, x, u, u_t, u_x, u_xx):
        """Residual of the PDE"""
        return u_t + u * u_x - viscosity * u_xx
        # return u_t - lambd1 * u_x - lambd2 * u_xx - lambd3

    def solve_with_TFoptimizer(self, optimizer, X, u, N=1001):
        """This method performs a gradient descent type optimization."""

        @tf.function
        def train_step():
            loss, grad_theta = self.get_grad(X, u)

            # Perform gradient descent step
            optimizer.apply_gradients(zip(grad_theta, self.model.trainable_variables))
            return loss

        for i in range(N):

            loss = train_step()

            self.current_loss = loss.numpy()
            self.callback()


    def callback(self, xr=None):
        if self.iter % 50 == 0:
            print('It {:05d}: loss = {:10.8e}'.format(self.iter,self.current_loss))
        self.hist.append(self.current_loss)
        self.iter+=1


    def plot_solution(self, **kwargs):
        N = 600
        tspace = np.linspace(self.model.lb[0], self.model.ub[0], N+1)
        xspace = np.linspace(self.model.lb[1], self.model.ub[1], N+1)
        T, X = np.meshgrid(tspace, xspace)
        Xgrid = np.vstack([T.flatten(),X.flatten()]).T
        upred = self.model(tf.cast(Xgrid,DTYPE))
        U = upred.numpy().reshape(N+1,N+1)
        fig = plt.figure(figsize=(9,6))
        ax = fig.add_subplot(111, projection='3d')
        ax.plot_surface(T, X, U, cmap='viridis', **kwargs)
        ax.set_xlabel('$t$')
        ax.set_ylabel('$x$')
        ax.set_zlabel('$u_{\theta(t,x)$}')
        ax.view_init(35,35)
        return ax

    def plot_loss_history(self, ax=None):
        if not ax:
            fig = plt.figure(figsize=(7,5))
            ax = fig.add_subplot(111)
        ax.semilogy(range(len(self.hist)), self.hist,'k-')
        ax.set_xlabel('$n_{epoch}$')
        ax.set_ylabel('$\\phi^{n_{epoch}}$')
        return ax

Derive heat solver class with customized residual, as well as initial and boundary conditions, and gradients.

In [None]:
class HeatPINNSolver(PINNSolver):

Solve the equation with the two classes PINN_NeuralNet and PINNSolver

This is the forward problem

In [None]:
# initialize model
model = PINN_NeuralNet(lb, ub)
model.build(input_shape=(None,2))

# initialize PINN solver
solver = HeatPINNSolver(model, X_r)

Start training

In [None]:
lr = tf.keras.optimizers.schedules.PiecewiseConstantDecay([1000,3000],[1e-2,1e-3,5e-4])
optim = tf.keras.optimizers.Adam(learning_rate=lr)

# start timer
t0 = time()
solver.solve_with_TFoptimizer(optim, X_data, u_data, N=4001)

# Print computation time
print('\nComputation time: {} seconds'.format(time()-t0))

Plot solution and loss history

In [None]:
solver.plot_solution();
solver.plot_loss_history();