In [None]:
from tensorflow import keras
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
import os
import time

In [None]:
class BurgersPinn(keras.Model):
  def __init__(self, nu, network, n_inputs=2, n_outputs=1):
    super().__init__()
    self.network = network
    self.nu = nu

  def fit(self, inputs, epochs, optimizer, progress_interval=500):
    """
    Train the model with the given inputs and optimizer.

    Args:
      inputs: A list of tensors, where the first tensor is the equation data,
        the second tensor is the initial condition data, and the third tensor
        is the boundary condition data.
      epochs: The number of epochs to train for.
      optimizer : The optimizer to use for training.
      progress_interval: The number of epochs between each progress report.
    """
    start_time = time.time()
    for epoch in range(epochs):
      with tf.GradientTape() as tape:
        _, losses = self.call(inputs, training=True)
        loss = tf.reduce_sum(losses)

      grads = tape.gradient(loss, self.trainable_weights)
      optimizer.apply_gradients(zip(grads, self.trainable_weights))
      if epoch % progress_interval == 0:
        print(f"Epoch: {epoch} Loss: {loss.numpy():.4f} Total Elapsed Time: {time.time() - start_time:.2f}")

  @tf.function
  def input_gradient(self, x):
    """
    Compute the first and second derivatives of the network output with respect to the input.
    
    Args:
      x: input tensor of shape (n_inputs, 2)

    returns:
      u: network output of shape (n_inputs, 1)
      u_t: first derivative of u with respect to t
      u_x: first derivative of u with respect to x
      u_xx: second derivative of u with respect to x
    """
    with tf.GradientTape() as g2tape: # grad tape for getting second order derivatives
      g2tape.watch(x) # gradients w.r.t. inputs
      with tf.GradientTape() as gtape: # grad tape for first order drivatives
        gtape.watch(x)
        u = self.network(x)
        
      first_order = gtape.batch_jacobian(u, x)
      u_t = first_order[..., 0]
      u_x = first_order[..., 1]
      
    u_xx = g2tape.batch_jacobian(u_x, x)[..., 1]
    return u, u_t, u_x, u_xx
  
  def call(self, inputs, training=True):
    """
    Performs forward pass of the model, computing the loss and returning it. If training is True, the
    loss is also computed for the initial and boundary conditions.

    Args:
      inputs: If training is True, a list of tensors, where the first tensor is the equation data,
        the second tensor is the initial condition data, and the third tensor
        is the boundary condition data. If training is False, a single tensor of shape (n_inputs, 2).
      training: Whether or not to compute the loss for the initial and boundary conditions. Defaults to True.
    returns:
      u: network output of shape (n_inputs, 1)
    """
    if training:
      tx_equation = inputs[0]
    else:
      tx_equation = inputs
    u, u_t, u_x, u_xx = self.input_gradient(tx_equation)
    burgers_eq = u_t + u*u_x - self.nu*u_xx
    
    losses = tf.stack([tf.reduce_mean(tf.square(burgers_eq))])

    if training:
      tx_init = inputs[1]
      tx_bound = inputs[2]
      n_i = tx_init.shape[0]
      u_ib = self.network(tf.concat([tx_init, tx_bound], axis = 0))


      init_loss = tf.reduce_mean(tf.square(u_ib[:n_i] - tf.sin(np.pi*tx_init[..., 1])))
      boundary_loss = tf.reduce_mean(tf.square(u_ib[n_i:]))
      losses = tf.concat([losses, [init_loss, boundary_loss]], axis=0)

      return burgers_eq, losses

    return burgers_eq, losses
  
  @staticmethod
  def build_network(layers, n_inputs=2, n_outputs=1, activation=keras.activations.tanh, initialization=keras.initializers.glorot_normal):
    """
    Builds a fully connected neural network with the specified number of layers and nodes per layer.

    Args:
        layers (list): List of integers specifying the number of nodes in each layer.
        n_inputs (int): Number of inputs to the network.
        n_outputs (int): Number of outputs from the network.
        activation (function): Activation function to use in each layer.
        initialization (function): Initialization function to use in each layer.
    returns:
        keras.Model: A keras model representing the neural network.
    """
    inputs = keras.layers.Input((n_inputs))
    x = inputs
    for i in layers:
      x = keras.layers.Dense(i, activation = activation, kernel_initializer=initialization)(x)
    
    outputs = keras.layers.Dense(n_outputs, kernel_initializer=initialization)(x)
    return keras.Model(inputs=[inputs], outputs = [outputs])

  @staticmethod
  def simulate_burgers(n_samples, training = True, dtype=tf.float32, boundary_samples = None, random_seed = 42):
    """
    Simulate the burgers equation

    Args:
        n_samples (int): number of samples to generate
        training (bool, optional): If true, generates initial and boundary samples as well. Defaults to True.
        dtype (tf.dtype, optional): Data type of the samples. Defaults to tf.float32.
        boundary_samples (int, optional): Number of boundary samples to generate. No effext if training = False. Defaults to None.
        random_seed (int, optional): Random seed for reproducibility. Defaults to 42.
    returns:
        tf.Tensor: Samples of the burgers equation. If training = True, returns a tuple of tensors (equation_samples, initial_samples, boundary_samples).
    """
    r = np.random.RandomState(random_seed)
    tx_samples = r.uniform(0, 1, (n_samples, 2))
    tx_samples[:, 1] = tx_samples[:, 1]*2 - 1

    if training:
      tx_init = np.zeros((boundary_samples, 1))
      tx_init = np.append(tx_init, r.uniform(-1, 1, (boundary_samples, 1)), axis=1)

      tx_boundary = r.uniform(0, 1, (boundary_samples, 1))
      ones = np.ones((boundary_samples//2, 1))
      ones = np.append(ones, -np.ones((boundary_samples - boundary_samples//2, 1)), axis=0)
      tx_boundary = np.append(tx_boundary, ones, axis=1)
      r.shuffle(tx_boundary)
      return tf.convert_to_tensor(tx_samples, dtype=dtype), tf.convert_to_tensor(tx_init, dtype=dtype), tf.convert_to_tensor(tx_boundary, dtype=dtype)
    
    return tx_samples

In [None]:
network = BurgersPinn.build_network([16, 32])

In [None]:
pinn_model = BurgersPinn(0.01*np.pi, network)

In [None]:
tx_samples, tx_init, tx_boundary = BurgersPinn.simulate_burgers(1000, training=True, boundary_samples=1000)

In [None]:
pinn_model.network.build((None, 2))

In [None]:
pinn_model.network.summary()

In [None]:
pinn_model.fit(tf.stack([tx_samples, tx_init, tx_boundary], axis=0), epochs=5000, optimizer=keras.optimizers.Adam(lr=0.01), progress_interval=100)

In [None]:
num_test_samples = 1000
t_flat = np.linspace(0, 1, num_test_samples)
x_flat = np.linspace(-1, 1, num_test_samples)
t, x = np.meshgrid(t_flat, x_flat)
tx = np.stack([t.flatten(), x.flatten()], axis=-1)
u = pinn_model.network.predict(tx, batch_size=num_test_samples)
u = u.reshape(t.shape)

# plot u(t,x) distribution as a color-map
fig = plt.figure(figsize=(7,4))
gs = GridSpec(2, 5)
plt.subplot(gs[0, :])
plt.pcolormesh(t, x, u)
plt.xlabel('t')
plt.ylabel('x')
cbar = plt.colorbar(pad=0.05, aspect=10)
cbar.set_label('u(t,x)')
cbar.mappable.set_clim(-1, 1)
# plot u(t=const, x) cross-sections
t_cross_sections = [0,0.25, 0.5,0.75,1]
for i, t_cs in enumerate(t_cross_sections):
  plt.subplot(gs[1, i])
  tx = np.stack([np.full(t_flat.shape, t_cs), x_flat], axis=-1)
  u = pinn_model.network.predict(tx, batch_size=num_test_samples)
  plt.plot(x_flat, u)
  plt.title('t={}'.format(t_cs))
  plt.xlabel('x')
  plt.ylabel('u(t,x)')
plt.tight_layout()
plt.show()