In [None]:
import os
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import math
import matplotlib.pyplot as plt
from matplotlib.axis import Axis
import torch.nn.init as init
import time
import random
import pandas as pd
import logging
from google.colab import drive
from torch.cuda.amp import autocast, GradScaler
import torch.utils.checkpoint as checkpoint

In [None]:
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
log_file = "/content/drive/MyDrive/NNOutputs/PINN.log"

# Create a custom logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Clear previous handlers if re-running in Colab
if logger.hasHandlers():
    logger.handlers.clear()

# Create file handler
file_handler = logging.FileHandler(log_file, mode='w')
file_handler.setFormatter(
    logging.Formatter('[%(asctime)s] %(levelname)s: %(message)s',
                      datefmt='%Y-%m-%d %H:%M:%S')
    )

# Add handler to logger
logger.addHandler(file_handler)

# Example usage
logger.info("Logging setup complete.")

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("CUDA available:", torch.cuda.is_available())

CUDA available: True


In [None]:
def np_to_torch(arr):

    arr = torch.FloatTensor(arr)
    arr = arr.unsqueeze(-1)
    arr = arr.clone().detach().requires_grad_(True)

    return arr

def xyt_train_data(N_x, x_l, x_r, N_y, y_b, y_t, N_t, t_i, t_f, N_bc, x_c, y_c, r):

    start_PCM = [0]
    start_HS = [0]

    ####  PCM ###
    x_train = np.linspace(x_l,x_r,N_x)
    x_train = np.tile(x_train,N_y)
    y_train = np.linspace(y_b,y_t,N_y)
    y_train = np.repeat(y_train,N_x)

    n = x_train.shape[0]
    dist = np.sqrt( ( x_train - x_c )**2 + ( y_train - y_c )**2 )
    phi = (dist>r)
    n_PCM = int(np.sum(phi))
    phi = np.tile(phi, N_t)

    x_train = np.tile(x_train, N_t)
    y_train = np.tile(y_train, N_t)

    t_train = np.linspace(t_i, t_f, N_t)
    t_train = np.repeat(t_train, n)

    x_PCM = x_train[phi]
    y_PCM = y_train[phi]
    t_PCM = t_train[phi]
    start_PCM.append( start_PCM[-1] + t_PCM.shape[0])

    #### HS ####
    x_train = np.linspace(x_l,x_r, int(N_x))
    x_train = np.tile(x_train,int(N_y))
    y_train = np.linspace(y_b,y_t,int(N_y))
    y_train = np.repeat(y_train,int(N_x))

    n = x_train.shape[0]
    dist = np.sqrt(( x_train - x_c )**2 + ( y_train - y_c )**2)
    phi = (dist<=r)
    n_HS = int(np.sum(phi))
    phi = np.tile(phi, N_t)

    x_train = np.tile(x_train, N_t)
    y_train = np.tile(y_train, N_t)
    dist = np.tile(dist, N_t)

    t_train = np.linspace(t_i, t_f, N_t)
    t_train = np.repeat(t_train, n)

    x_HS = x_train[phi]
    y_HS = y_train[phi]
    t_HS = t_train[phi]
    dist = dist[phi]
    start_HS.append( start_HS[-1] + t_HS.shape[0])

    x_PCM = np_to_torch(x_PCM)
    y_PCM = np_to_torch(y_PCM)
    t_PCM = np_to_torch(t_PCM)
    x_HS = np_to_torch(x_HS)
    y_HS = np_to_torch(y_HS)
    t_HS = np_to_torch(t_HS)
    # dist = np_to_torch(dist)
    dist = torch.FloatTensor(dist)
    dist = dist.unsqueeze(-1)

    return x_PCM, y_PCM, t_PCM, x_HS, y_HS, t_HS, start_PCM, start_HS, n_PCM, n_HS, dist

class RBF(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        y = 0.5*torch.exp(-10*torch.square(x)) - 0.01
        return y

class Sigm(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        y = 1/( 1 + torch.exp(-80*x) )
        return y

def xavier_init(m):
    if isinstance(m, nn.Linear):
        init.xavier_normal_(m.weight)
        # init.xavier_normal_(m.bias)

class ANN(nn.Module):
    def __init__(self, layer_size_1):
        super(ANN, self).__init__()

        rbf = RBF()
        self.tanh = nn.Tanh()
        sigmoid = nn.Sigmoid()
        self.act = rbf
        mean_init = 0
        std_init = 0.707

        ########### Fully conected model-1 ###########

        modules_11 = []
        modules_11.append(nn.Linear(layer_size_1[0], layer_size_1[1]))
        modules_11.append(self.act)
        modules_11.append(nn.Linear( layer_size_1[1], layer_size_1[2]))
        self.fc_11 = nn.Sequential( *modules_11 )

        modules_12 = []
        modules_12.append(nn.Linear( layer_size_1[1], layer_size_1[2]))
        modules_12.append(self.act)
        modules_12.append(nn.Linear( layer_size_1[1], layer_size_1[2]))
        self.fc_12 = nn.Sequential( *modules_12 )

        modules_13 = []
        modules_13.append(self.act)
        modules_13.append(nn.Linear(layer_size_1[2], layer_size_1[3]))
        modules_13.append(self.act)
        self.fc_13 = nn.Sequential( *modules_13 )


        for module in [self.fc_11, self.fc_12, self.fc_13]:
            for layer in module:
                if isinstance(layer, nn.Linear):
                    init.xavier_normal_(layer.weight)

    def forward(self, x_PCM, y_PCM, t_PCM, x_HS, y_HS, t_HS, mode):

      ########### Fully conected model-1 ###########

        # For PCM branch
        out_1 = self.fc_11(torch.cat((x_PCM, y_PCM, t_PCM), 1))
        out_2 = self.fc_12(out_1)
        T_PCM = self.fc_13(out_1 + out_2)

        # For HS branch
        out_1 = self.fc_11(torch.cat((x_HS, y_HS, t_HS), 1))
        out_2 = self.fc_12(out_1)
        T_HS = self.fc_13(out_1 + out_2)

        ################ Derivatives ################
        if mode == "train":
            dTdx_PCM = torch.autograd.grad(T_PCM, x_PCM, grad_outputs=torch.ones_like(T_PCM), create_graph=True)[0]
            d2Tdx2_PCM = torch.autograd.grad(dTdx_PCM, x_PCM, grad_outputs=torch.ones_like(dTdx_PCM), create_graph=True)[0]
            dTdy_PCM = torch.autograd.grad(T_PCM, y_PCM, grad_outputs=torch.ones_like(T_PCM), create_graph=True)[0]
            d2Tdy2_PCM = torch.autograd.grad(dTdy_PCM, y_PCM, grad_outputs=torch.ones_like(dTdy_PCM), create_graph=True)[0]
            dTdt_PCM = torch.autograd.grad(T_PCM, t_PCM, grad_outputs=torch.ones_like(T_PCM), create_graph=True)[0]

            dTdx_HS = torch.autograd.grad(T_HS, x_HS, grad_outputs=torch.ones_like(T_HS), create_graph=True)[0]
            d2Tdx2_HS = torch.autograd.grad(dTdx_HS, x_HS, grad_outputs=torch.ones_like(dTdx_HS), create_graph=True)[0]
            dTdy_HS = torch.autograd.grad(T_HS, y_HS, grad_outputs=torch.ones_like(T_HS), create_graph=True)[0]
            d2Tdy2_HS = torch.autograd.grad(dTdy_HS, y_HS, grad_outputs=torch.ones_like(dTdy_HS), create_graph=True)[0]
            dTdt_HS = torch.autograd.grad(T_HS, t_HS, grad_outputs=torch.ones_like(T_HS), create_graph=True)[0]

            return  T_PCM, dTdx_PCM, dTdy_PCM, d2Tdx2_PCM, d2Tdy2_PCM, dTdt_PCM, T_HS, dTdx_HS, dTdy_HS, d2Tdx2_HS, d2Tdy2_HS, dTdt_HS

        else:

            return  T_PCM, 0, 0, 0, 0, 0, T_HS, 0, 0, 0, 0, 0

def get_loss(x_PCM, y_PCM, t_PCM, x_HS, y_HS, t_HS, start_PCM, start_HS, k1, k2, k3, w1, w2, w3, w4, w5, w6,
             w7, w8, s_norm, n_PCM, n_HS, T_change, N_t, model, source_term):

    # with autocast():


    T_PCM, dTdx_PCM, dTdy_PCM, d2Tdx2_PCM, d2Tdy2_PCM, dTdt_PCM, T_HS, dTdx_HS, dTdy_HS, d2Tdx2_HS, d2Tdy2_HS, dTdt_HS = model(x_PCM, y_PCM, t_PCM, x_HS, y_HS, t_HS, "train")

    eq_PCM = w1*( torch.sum( torch.mul( torch.square( dTdt_PCM - k1*(d2Tdx2_PCM + d2Tdy2_PCM) - (6500-2000*x_PCM)*delta_t/(rho*Cp*T_sc).detach()), torch.where((T_PCM >= T_change), 1, 0) ) )
    + torch.sum( torch.mul( torch.square( dTdt_PCM - k2*(d2Tdx2_PCM + d2Tdy2_PCM) - (6500-2000*x_PCM)*delta_t/(rho*Cp*T_sc).detach()), torch.where((T_PCM < T_change) & (t_PCM > 0) , 1, 0) ) )
    )/(start_HS[-1] + start_PCM[-1])

    eq_HS = w2*(torch.sum( torch.mul( torch.square( dTdt_HS - k1*(d2Tdx2_HS + d2Tdy2_HS) - (6500-2000*x_HS)*delta_t/(rho*Cp*T_sc).detach()), torch.where((T_HS >= T_change) & (t_HS > 0), 1, 0) ) )
    + torch.sum( torch.mul( torch.square( dTdt_HS - k2*(d2Tdx2_HS + d2Tdy2_HS) - (6500-2000*x_HS)*delta_t/(rho*Cp*T_sc).detach()), torch.where((T_HS < T_change), 1, 0) ) )
    )/(start_HS[-1] + start_PCM[-1])

    bc1 = w3*torch.sum( torch.mul(torch.square( dTdx_PCM ),
                                  torch.where((x_PCM == x_l) & (t_PCM > 0), 1, 0)) )/(N_y*N_t) #left boundary
    bc2 = w4*torch.sum( torch.mul(torch.square( dTdy_PCM ),
                                  torch.where((y_PCM == y_b) & (t_PCM > 0), 1, 0)) )/(N_x*N_t) #bottom boundary
    bc3 = w4*torch.sum( torch.mul(torch.square( dTdy_PCM ),
                                  torch.where((y_PCM == y_t) & (t_PCM > 0), 1, 0)) )/(N_x*N_t) #top boundary
    bc4 = w6*torch.sum( torch.mul(torch.square( dTdx_PCM ),
                              torch.where((x_PCM == x_r) & (t_PCM > 0), 1, 0)) )/(N_y*N_t) #right boundary


    ic_PCM = w7*torch.sum( torch.square( T_PCM[0:n_PCM] )  )/(n_PCM)
    ic_HS = w8*torch.sum( torch.square( T_HS[0:n_HS] )  )/(n_HS)

    loss = eq_PCM + eq_HS + bc1 + bc2 + bc3 + bc4 + ic_PCM + ic_HS

    return loss, eq_PCM, eq_HS, bc1, bc2, bc3, bc4, ic_PCM, ic_HS

def save_model(model, path):
    torch.save(model.state_dict(), path)

def load_model(model, path):
    model.load_state_dict(torch.load(path))
    model.eval()

def freeze(model):
    for i, param in enumerate(model.parameters()):
        print("#######", i, " #######" )
        if i not in [4, 5, 10, 11]:
            param.requires_grad = False
        print(param)


def plotcheck(x_PCM, y_PCM, t_PCM, x_HS, y_HS, t_HS, start_PCM, start_HS, n_PCM, n_HS, N_t_test, model, T_change, current_dir):

    ######################### Compute Results ###########################
    model_cpu = model.to('cpu')
    model_cpu.eval()
    with torch.no_grad():
      T_PCM, dTdx_PCM, dTdy_PCM, d2Tdx2_PCM, d2Tdy2_PCM, dTdt_PCM, T_HS, dTdx_HS, dTdy_HS, d2Tdx2_HS, d2Tdy2_HS, dTdt_HS = model_cpu(x_PCM, y_PCM, t_PCM, x_HS, y_HS, t_HS, "eval")

    x_PCM = x_PCM.detach().cpu().numpy()
    y_PCM = y_PCM.detach().cpu().numpy()
    t_PCM = t_PCM.detach().cpu().numpy()
    T_PCM = T_PCM.detach().cpu().numpy()

    x_HS = x_HS.detach().cpu().numpy()
    y_HS = y_HS.detach().cpu().numpy()
    t_HS = t_HS.detach().cpu().numpy()
    T_HS = T_HS.detach().cpu().numpy()

    T_disp = []
    X_disp = []
    Y_disp = []
    cnt_PCM = 0
    cnt_HS = 0

    for i in range(N_t_test):
        x_tp = []
        y_tp = []
        T_tp = []
        F_tp = []

        for j in range(n_PCM):  # n = number of spatial points per time step (assumed same for PCM and HS)
            # PCM Points
            x_tp.append(x_PCM[cnt_PCM])
            y_tp.append(y_PCM[cnt_PCM])
            T_tp.append(T_PCM[cnt_PCM])
            cnt_PCM = cnt_PCM + 1

        for j in range(n_HS):
            x_tp.append(x_HS[cnt_HS])
            y_tp.append(y_HS[cnt_HS])
            T_tp.append(T_HS[cnt_HS])
            cnt_HS = cnt_HS + 1
        T_disp.append(T_tp)
        X_disp.append(x_tp)
        Y_disp.append(y_tp)

    # Time step interval
    del_t = (t_f - t_i) / (N_t_test - 1)
    c_max = 0.3

    #### Plot temperature contours #####
    for k in range(N_t_test):
      if k % 1 == 0:
        plt.figure(figsize=(5, 4))
        sc = plt.scatter(X_disp[k], Y_disp[k], c=T_disp[k], cmap=plt.cm.jet)
        plt.xlim(x_l - 0.001, x_r + 0.001)
        plt.ylim(y_b - 0.001, y_t + 0.001)
        plt.colorbar(sc)
        plt.title('t = ' + str(round(k * del_t, 3)) + ', Temperature plot')
        filename = os.path.join(current_dir, f"plot_{k*del_t:.2f}.png")
        plt.savefig(filename)
        plt.close()


In [None]:
#### material params:- RT-35, aluminum ####
k_therm = 0.2
L = 160000
Cp = 2000
rho = 800

T_solidus = 305
T_liquidus = 311

#### Heat source ####
s = 5000

#### Normalising coeffs/ actual domain and temporal simulation params ####
T_change = 0.2
delta_x = 0.03
delta_y = 0.03
delta_t = 4000
T_sc = (T_liquidus - T_solidus)/T_change

#### Normalised Coefficients  ####
k1 = k_therm/(rho*Cp*delta_x**2)*delta_t
k2 = k_therm/(rho*(Cp+L/(T_liquidus - T_solidus))*delta_x**2)*delta_t
s_norm = s*delta_t/(rho*Cp*T_sc)

#### FVM Domain Parameters ####
N_x = 66
N_y = 66
N_bc = 66
x_l = 0
y_b = 0
x_r = 1
y_t = 1
t_i = 0
t_f = 1.25
N_t = int(t_f*100)+1

#### Circular heat source ####
x_c = 0.5
y_c = 0.5
r = 0.3

print('k1 = ',k1,' k2 = ', k2, ' s_norm = ', s_norm)


# Training data and initial data
layer_size_1 = [3, 25, 25, 1]
model = ANN(layer_size_1).to(device)
# model = ANN(layer_size_1)
print(model)
total_trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print("Total trainable parameters in the model:", total_trainable_params)

k1 =  0.5555555555555556  k2 =  0.03875968992248062  s_norm =  0.4166666666666667
ANN(
  (tanh): Tanh()
  (act): RBF()
  (fc_11): Sequential(
    (0): Linear(in_features=3, out_features=25, bias=True)
    (1): RBF()
    (2): Linear(in_features=25, out_features=25, bias=True)
    (3): RBF()
    (4): Linear(in_features=25, out_features=25, bias=True)
  )
  (fc_12): Sequential(
    (0): Linear(in_features=25, out_features=25, bias=True)
    (1): RBF()
    (2): Linear(in_features=25, out_features=25, bias=True)
    (3): RBF()
    (4): Linear(in_features=25, out_features=25, bias=True)
  )
  (fc_14): Sequential(
    (0): Linear(in_features=25, out_features=25, bias=True)
    (1): RBF()
    (2): Linear(in_features=25, out_features=25, bias=True)
    (3): RBF()
    (4): Linear(in_features=25, out_features=25, bias=True)
  )
  (fc_13): Sequential(
    (0): RBF()
    (1): Linear(in_features=25, out_features=1, bias=True)
    (2): RBF()
  )
)
Total trainable parameters in the model: 5326


In [None]:

# Lists for storing
loss_store = []
lr1 = 1e-5
optimiser1 = torch.optim.Rprop(model.parameters(), lr=lr1)
epochs = 10000
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimiser1, T_max=epochs)

# Load model
model.train()
fileName = 'radialsource2.pth'
save_path = "/content/drive/MyDrive/NNOutputs/" + fileName
load_model(model, save_path)
# freeze(model)
current_dir = "/content/drive/MyDrive/NNOutputs"

# Loss function weights
# w1 = 1
# w2 = 1
# w3 = 20
# w4 = 20
# w5 = 20
# w6 = 20
# w7 = 1
# w8 = 1

w1 = 1
w2 = 1
w3 = 1
w4 = 1
w5 = 1
w6 = 1
w7 = 50
w8 = 50

print('N_t = ', N_t)
print('t_f = ', t_f)

# Initial conditions
x_PCM, y_PCM, t_PCM, x_HS, y_HS, t_HS, start_PCM, start_HS, n_PCM, n_HS, dist= xyt_train_data(N_x, x_l, x_r, N_y, y_b, y_t, N_t, t_i, t_f, N_bc, x_c, y_c, r)
N_t_test = int(t_f*4 + 1)
x_PCM_test, y_PCM_test, t_PCM_test, x_HS_test, y_HS_test, t_HS_test, start_PCM_test, start_HS_test, n_PCM_test, n_HS_test, _ = xyt_train_data(N_x, x_l, x_r, N_y, y_b, y_t, N_t_test, t_i, t_f, N_bc, x_c, y_c, r)

x_PCM = x_PCM.to(device).requires_grad_()
y_PCM = y_PCM.to(device).requires_grad_()
t_PCM = t_PCM.to(device).requires_grad_()
x_HS = x_HS.to(device).requires_grad_()
y_HS = y_HS.to(device).requires_grad_()
t_HS = t_HS.to(device).requires_grad_()

x_PCM_test = x_PCM_test.cpu()
y_PCM_test = y_PCM_test.cpu()
t_PCM_test = t_PCM_test.cpu()
x_HS_test = x_HS_test.cpu()
y_HS_test = y_HS_test.cpu()
t_HS_test = t_HS_test.cpu()

for epoch in range(epochs):

    loss, eq_PCM, eq_HS, bc1, bc2, bc3, bc4, ic_PCM, ic_HS = get_loss(x_PCM, y_PCM, t_PCM, x_HS, y_HS, t_HS, start_PCM,
                                                                                     start_HS, k1, k2, 1, w1, w2, w3, w4, w5, w6, w7,
                                                                                     w8, s_norm, n_PCM, n_HS, T_change, N_t, model, 1
                                                                                     )
    optimiser1.zero_grad()
    loss.backward()
    optimiser1.step()
    scheduler.step()

    if epoch%1==0:
        logging.info(f"Epoch = {epoch}, Total Loss = {loss.detach().cpu().numpy():.6f}, eq_PCM = {eq_PCM.detach().cpu().numpy():.6f}, eq_HS = {eq_HS.detach().cpu().numpy():.6f}, BC1 (left) = {bc1.detach().cpu().numpy():.6f}, BC2 (bottom) = {bc2.detach().cpu().numpy():.6f}, BC3 (top) = {bc3.detach().cpu().numpy():.6f}, BC4 (right) = {bc4.detach().cpu().numpy():.6f}, ic_PCM = {ic_PCM.detach().cpu().numpy():.6f}, ic_HS = {ic_HS.detach().cpu().numpy():.6f}")
        del loss, eq_PCM, eq_HS, bc1, bc2, bc3, bc4  # delete unnecessary tensors
        torch.cuda.empty_cache()

    if epoch%20==0:
        torch.save(model.state_dict(), save_path)
        logging.info(f"epoch = {epoch}, saved")
        plotcheck(x_PCM_test, y_PCM_test, t_PCM_test, x_HS_test, y_HS_test, t_HS_test, start_PCM_test, start_HS_test, n_PCM_test, n_HS_test, N_t_test, model, T_change, current_dir)
        model = model.to('cuda')

N_t =  126
t_f =  1.25


KeyboardInterrupt: 