In [1]:
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.optim as optim
from torch.utils.data import DataLoader
from google.colab import drive
drive.mount('/content/drive')

torch.autograd.set_detect_anomaly(True)
torch.manual_seed(128)

<torch._C.Generator at 0x1e8d61d2670>

In [2]:
import torch.nn as nn
import os
torch.manual_seed(42)


class NeuralNet(nn.Module):

    def __init__(self, input_dimension, output_dimension, n_hidden_layers, neurons, regularization_param, regularization_exp, retrain_seed):
        super(NeuralNet, self).__init__()
        # Number of input dimensions n
        self.input_dimension = input_dimension
        # Number of output dimensions m
        self.output_dimension = output_dimension
        # Number of neurons per layer
        self.neurons = neurons
        # Number of hidden layers
        self.n_hidden_layers = n_hidden_layers
        # Activation function
        self.activation = nn.Tanh()
        self.regularization_param = regularization_param
        # Regularization exponent
        self.regularization_exp = regularization_exp
        # Random seed for weight initialization

        self.input_layer = nn.Linear(self.input_dimension, self.neurons)
        self.hidden_layers = nn.ModuleList([nn.Linear(self.neurons, self.neurons) for _ in range(n_hidden_layers - 1)])
        self.output_layer = nn.Linear(self.neurons, self.output_dimension)
        self.retrain_seed = retrain_seed
        # Random Seed for weight initialization
        self.init_xavier()

    def forward(self, x):
        # The forward function performs the set of affine and non-linear transformations defining the network
        # (see equation above)
        x = self.activation(self.input_layer(x))
        for k, l in enumerate(self.hidden_layers):
            x = self.activation(l(x))
        return self.output_layer(x)

    def init_xavier(self):
        torch.manual_seed(self.retrain_seed)

        def init_weights(m):
            if type(m) == nn.Linear and m.weight.requires_grad and m.bias.requires_grad:
                g = nn.init.calculate_gain('tanh')
                torch.nn.init.xavier_uniform_(m.weight, gain=g)
                # torch.nn.init.xavier_normal_(m.weight, gain=g)
                m.bias.data.fill_(0)

        self.apply(init_weights)

    def regularization(self):
        reg_loss = 0
        for name, param in self.named_parameters():
            if 'weight' in name:
                reg_loss = reg_loss + torch.norm(param, self.regularization_exp)
        return self.regularization_param * reg_loss


def fit(model, training_set, num_epochs, optimizer, p, verbose=True):
    history = list()

    # Loop over epochs
    for epoch in range(num_epochs):
        if verbose: print("################################ ", epoch, " ################################")

        running_loss = list([0])

        # Loop over batches
        for j, (x_train_, u_train_) in enumerate(training_set):
            def closure():
                # zero the parameter gradients
                optimizer.zero_grad()
                # forward + backward + optimize
                u_pred_ = model(x_train_)
                # Item 1. below
                loss = torch.mean((u_pred_.reshape(-1, ) - u_train_.reshape(-1, )) ** p) + model.regularization()
                # Item 2. below
                loss.backward()
                # Compute average training loss over batches for the current epoch
                running_loss[0] += loss.item()
                return loss

            # Item 3. below
            optimizer.step(closure=closure)

        if verbose: print('Loss: ', (running_loss[0] / len(training_set)))
        history.append(running_loss[0])

    return history


In [3]:
class Pinns:
    def __init__(self, n_int_, n_sb_, n_tb_):
        self.n_int = n_int_
        self.n_sb = n_sb_
        self.n_tb = n_tb_


        #Set constants
        self.alpha_f = 0.05
        self.alpha_s = 0.08
        self.h_f = 5
        self.h_s = 6
        self.T_hot = 4
        self.T_0 = 1
        self.U_f = 1


        # Extrema of the solution domain (t,x) in [0,8]x[0,1]
        self.domain_extrema = torch.tensor([[0, 1],  # Time dimension
                                            [0, 1]])  # Space dimension

        # Number of space dimensions
        self.space_dimensions = 1


      # Two NNs to Approximate Fluid, Solid Temperatures
        self.approximate_solution_fluid = NeuralNet(input_dimension=self.domain_extrema.shape[0], output_dimension=1,
                                              n_hidden_layers=7,
                                              neurons=40,
                                              regularization_param=0.01,
                                              regularization_exp=2.,
                                              retrain_seed=42)
        self.approximate_solution_solid = NeuralNet(input_dimension=self.domain_extrema.shape[0], output_dimension=1,
                                    n_hidden_layers=7,
                                    neurons=40,
                                    regularization_param=0.01,
                                    regularization_exp=2.,
                                    retrain_seed=42)


      # Generator of Sobol sequences
        self.soboleng = torch.quasirandom.SobolEngine(dimension=self.domain_extrema.shape[0], scramble = True)

      # Training Sets S_tb, S_sb, S_int as Torch Dataloader
        self.training_set_tb, self.training_set_sb, self.training_set_int, = self.assemble_datasets()

    ################################################################################################

    def convert(self, tens):
        assert (tens.shape[1] == self.domain_extrema.shape[0])
        return tens * (self.domain_extrema[:, 1] - self.domain_extrema[:, 0]) + self.domain_extrema[:, 0]


    def apply_T_0(self, x):
        return torch.full_like(x, self.T_0)


    def apply_boundary_condition_fluid(self, t):
        numerator = self.T_hot - self.T_0
        denominator = 1 + torch.exp(-200*(t - 0.25))

        return numerator/denominator + self.T_0

    def create_sb_masks(self, inp_sb):
        sb_0_mask = (inp_sb[:,1] == 0)
        sb_L_mask = (inp_sb[:,1] == 1)

        return sb_0_mask, sb_L_mask


    ################################################################################################
    # Function returning the input-output tensor required to assemble the training set S_tb corresponding to the temporal boundary
    def add_temporal_boundary_points(self):
        t0 = self.domain_extrema[0, 0]
        input_tb = self.convert(self.soboleng.draw(self.n_tb))
        input_tb[:, 0] = torch.full(input_tb[:, 0].shape, t0)
        output_tb = self.apply_T_0(input_tb[:, 1]).reshape(-1, 1)

        return input_tb, output_tb

    # Function returning the input-output tensor required to assemble the training set S_sb corresponding to the spatial boundary
    def add_spatial_boundary_points(self):
        x0 = self.domain_extrema[1, 0]
        xL = self.domain_extrema[1, 1]

        input_sb = self.convert(self.soboleng.draw(self.n_sb))

        input_sb_0 = torch.clone(input_sb)
        input_sb_0[:, 1] = torch.full(input_sb_0[:, 1].shape, x0)

        input_sb_L = torch.clone(input_sb)
        input_sb_L[:, 1] = torch.full(input_sb_L[:, 1].shape, xL)

        output_sb_0 = self.apply_boundary_condition_fluid(input_sb_0[:, 0]).reshape(-1, 1)
        output_sb_L = torch.zeros((input_sb.shape[0], 1))

        return torch.cat([input_sb_0, input_sb_L], 0), torch.cat([output_sb_0, output_sb_L], 0)

    #  Function returning the input-output tensor required to assemble the training set S_int corresponding to the interior domain where the PDE is enforced
    def add_interior_points(self):
        input_int = self.convert(self.soboleng.draw(self.n_int))
        output_int = torch.zeros((input_int.shape[0], 1))
        return input_int, output_int


    # Function returning the training sets S_sb, S_tb, S_int as dataloader
    def assemble_datasets(self):
        input_tb, output_tb = self.add_temporal_boundary_points()  # S_tb
        input_sb, output_sb = self.add_spatial_boundary_points()  # S_sb
        input_int, output_int = self.add_interior_points()  # S_int


        training_set_tb = DataLoader(torch.utils.data.TensorDataset(input_tb, output_tb), batch_size=self.n_tb, shuffle=False)
        training_set_sb = DataLoader(torch.utils.data.TensorDataset(input_sb, output_sb), batch_size=2 * self.space_dimensions * self.n_sb, shuffle=False) #2 * self.space_dimensions * self.n_sb
        training_set_int = DataLoader(torch.utils.data.TensorDataset(input_int, output_int), batch_size=self.n_int, shuffle=False)

        return training_set_tb, training_set_sb, training_set_int


################################################################################################

    #   Compute Temporal Boundary Residuals
    def compute_temporal_boundary_residual(self, input_tb, output_tb):
        assert(torch.all(input_tb[:,0] == 0))
        T_f = self.approximate_solution_fluid(input_tb).reshape(-1,)
        T_s = self.approximate_solution_fluid(input_tb).reshape(-1,)

        temp_tb_train = output_tb.reshape(-1,)

        assert(T_f.shape == T_s.shape == temp_tb_train.shape)
        residual_tb_fluid = temp_tb_train - T_f
        residual_tb_solid = temp_tb_train - T_s
        residual_tb = torch.cat((residual_tb_fluid, residual_tb_solid), dim=0)

        return residual_tb.reshape(-1,)

    #   Compute Spatial Boundary Residuals
    def compute_spatial_boundary_residual(self, input_sb, output_sb):
        input_sb.requires_grad = True
        T_f = self.approximate_solution_fluid(input_sb).reshape(-1,)
        T_s = self.approximate_solution_solid(input_sb).reshape(-1,)

        grad_T_f = torch.autograd.grad(T_f.sum(), input_sb, create_graph=True)[0]
        grad_T_f_x = grad_T_f[:, 1]
        grad_T_s = torch.autograd.grad(T_s.sum(), input_sb, create_graph=True)[0]
        grad_T_s_x = grad_T_s[:, 1]
        assert(T_f.shape == T_s.shape and T_f.shape == grad_T_f_x.shape and T_s.shape == grad_T_s_x.shape)

        mask_sb_0, mask_sb_L = self.create_sb_masks(input_sb)
        residual_sb_fluid = torch.zeros_like(T_f)
        temp_train_sb = output_sb.squeeze(1)
        residual_sb_fluid[mask_sb_0] = temp_train_sb[mask_sb_0] - T_f[mask_sb_0]
        residual_sb_fluid[mask_sb_L] = grad_T_f_x[mask_sb_L]
        residual_sb_solid = grad_T_s_x

        residual_sb = torch.cat((residual_sb_fluid, residual_sb_solid), dim=0)
        return residual_sb.reshape(-1, )

    #   Compute Interior Residuals
    def compute_interior_residual(self, input_int):
        input_int.requires_grad = True
        T_f = self.approximate_solution_fluid(input_int).reshape(-1,)
        T_s = self.approximate_solution_solid(input_int).reshape(-1,)

        grad_T_f = torch.autograd.grad(T_f.sum(), input_int, create_graph=True)[0]
        grad_T_f_t = grad_T_f[:, 0]
        grad_T_f_x = grad_T_f[:, 1]
        grad_T_f_xx = torch.autograd.grad(grad_T_f_x.sum(), input_int, create_graph=True)[0][:, 1]

        grad_T_s = torch.autograd.grad(T_s.sum(), input_int, create_graph=True)[0]
        grad_T_s_t = grad_T_s[:, 0]
        grad_T_s_x = grad_T_s[:, 1]
        grad_T_s_xx = torch.autograd.grad(grad_T_s_x.sum(), input_int, create_graph=True)[0][:, 1]

        assert(T_f.shape == T_s.shape and T_f.shape == grad_T_f_t.shape and T_f.shape == grad_T_s_x.shape and T_f.shape == grad_T_s_xx.shape )


        residual_1 = grad_T_f_t + self.U_f*grad_T_f_x - self.alpha_f*grad_T_f_xx + self.h_f*(T_f-T_s)
        residual_2 = grad_T_s_t - self.alpha_s*grad_T_s_xx - self.h_s*(T_f - T_s)

        residual_int = torch.cat((residual_1, residual_2), dim=0)

        return residual_int.reshape(-1, )


    #   Function to compute the total loss (weighted sum of spatial boundary loss, temporal boundary loss and interior loss)
    def compute_loss(self, inp_train_tb, T_train_tb, inp_train_sb, T_train_sb, inp_train_int, T_train_int, verbose=True):
      # Temporal Boundary Residuals
        r_tb = self.compute_temporal_boundary_residual(inp_train_tb, T_train_tb)

      # Spatial Boundary Residuals
        r_sb = self.compute_spatial_boundary_residual(inp_train_sb, T_train_sb)

      # Interior Residuals
        r_int = self.compute_interior_residual(inp_train_int)

        loss_tb = torch.mean(abs(r_tb) ** 2)
        loss_sb = torch.mean(abs(r_sb) ** 2)
        loss_int = torch.mean(abs(r_int) ** 2)

        loss_function = loss_tb + loss_sb

        loss = torch.log10(150*(loss_tb + loss_sb) + loss_int)
        if verbose: print("Total loss: ", round(loss.item(), 4), "| PDE Loss: ", round(torch.log10(loss_int).item(), 4), "| Function Loss: ", round(torch.log10(loss_function).item(), 4))

        return loss

    ################################################################################################
    def fit(self, num_epochs, optimizer, verbose=True):
        history = list()
        """
        if os.path.exists('/content/drive/MyDrive/parameters_pinn_task1_f'):
            checkpoint_f = torch.load('/content/drive/MyDrive/parameters_pinn_task1_f')
            self.approximate_solution_fluid.load_state_dict(checkpoint_f['state_dict'])
            optimizer.load_state_dict(checkpoint_f['optimizer'])
            checkpoint_s = torch.load('/content/drive/MyDrive/parameters_pinn_task1_s')
            self.approximate_solution_solid.load_state_dict(checkpoint_s['state_dict'])
        """

        # Loop over epochs
        for epoch in range(num_epochs):
            if verbose: print("################################ ", epoch, " ################################")

            for j, ((inp_train_tb, u_train_tb), (inp_train_sb, u_train_sb), (inp_train_int, u_train_int)) in enumerate(zip(self.training_set_tb, self.training_set_sb, self.training_set_int)):
                def closure():
                    optimizer.zero_grad()
                    loss = self.compute_loss(inp_train_tb, u_train_tb, inp_train_sb, u_train_sb, inp_train_int, u_train_int, verbose=True)
                    loss.backward()

                    history.append(loss.item())
                    return loss

                optimizer.step(closure=closure)
        """
        self.best_parameters_f = {'state_dict': self.approximate_solution_fluid.state_dict(), 'optimizer': optimizer.state_dict(),}
        torch.save(self.best_parameters_f, '/content/drive/MyDrive/parameters_pinn_task1_f')
        self.best_parameters_s = {'state_dict': self.approximate_solution_solid.state_dict(), 'optimizer': optimizer.state_dict(),}
        torch.save(self.best_parameters_s, '/content/drive/MyDrive/parameters_pinn_task1_s')
        """

        print('Final Loss: ', history[-1])

        return history





In [4]:
n_int = 4096
n_sb = 512
n_tb = 1024

pinn = Pinns(n_int, n_sb, n_tb)


In [5]:
n_epochs = 1
optimizer_LBFGS = optim.LBFGS(list(pinn.approximate_solution_fluid.parameters()) + list(pinn.approximate_solution_solid.parameters()),
                              lr=float(0.5),
                              max_iter=20000,
                              max_eval=50000,
                              history_size=150,
                              line_search_fn="strong_wolfe",
                              tolerance_change=1.0 * np.finfo(float).eps)
optimizer_ADAM = optim.Adam(list(pinn.approximate_solution_fluid.parameters()) + list(pinn.approximate_solution_solid.parameters()),
                              lr=float(0.005))

In [6]:
hist = pinn.fit(num_epochs=n_epochs,
                optimizer=optimizer_LBFGS,
                verbose=True)

plt.figure(dpi=150)
plt.grid(True, which="both", ls=":")
plt.plot(np.arange(1, len(hist) + 1), hist, label="Train Loss")
plt.xscale("log")
plt.legend()

################################  0  ################################
Total loss:  3.6972 | PDE Loss:  0.9728 | Function Loss:  1.5203
Total loss:  3.6386 | PDE Loss:  0.9942 | Function Loss:  1.4615
Total loss:  3.2245 | PDE Loss:  1.275 | Function Loss:  1.0435
Total loss:  2.969 | PDE Loss:  1.7604 | Function Loss:  0.7652
Total loss:  2.7876 | PDE Loss:  1.5638 | Function Loss:  0.5847
Total loss:  2.614 | PDE Loss:  1.3226 | Function Loss:  0.4151
Total loss:  2.4418 | PDE Loss:  1.2735 | Function Loss:  0.2352
Total loss:  3.0991 | PDE Loss:  2.4862 | Function Loss:  0.8016


KeyboardInterrupt: 

In [None]:

    # Plot results with colour


def plotting():
  inputs = pinn.soboleng.draw(100000)
  inputs = pinn.convert(inputs)
  output_fluid = pinn.approximate_solution_fluid(inputs)
  output_solid = pinn.approximate_solution_solid(inputs)

  fig, axs = plt.subplots(1, 2, figsize=(16, 8), dpi=150)
  im1 = axs[0].scatter(inputs[:, 1].detach(), inputs[:, 0].detach(), c=output_fluid.detach(), cmap="jet", s=2)
  axs[0].set_xlabel("x")
  axs[0].set_ylabel("t")
  plt.colorbar(im1, ax=axs[0])
  axs[0].grid(True, which="both", ls=":")
  im2 = axs[1].scatter(inputs[:, 1].detach(), inputs[:, 0].detach(), c=output_solid.detach(), cmap="jet", s=2)
  axs[1].set_xlabel("x")
  axs[1].set_ylabel("t")
  plt.colorbar(im2, ax=axs[1])
  axs[1].grid(True, which="both", ls=":")
  axs[0].set_title("Temp Fluid")
  axs[1].set_title("Temp Solid")


    plt.show()

plotting()