# PINN for generating thermography data

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim

In [2]:
from thermography_preprocessing import ThermoDataPreparation
from utils import get_meshgrids3D_flattend, select_indices

## PINN(a): simple heat equation (ignore the heat transfer caused by geometrical structre)

### Simplified heat eq.
$$
  \frac{\partial T}{\partial t} - \alpha \nabla^2 T = 0
$$
where **T** is temperature, **α** is thermal diffusivity, and **∇²T** is the Laplacian (spatial diffusion).

### Ideas of PINN
* Loss function: $L = L_{data} + L_{phy}$
* Data loss $L_{data}$: boundary conditions, checked for a set of sample points (< all sample points)
* Physics Loss $L_{phy}$: physic loss, checked for **ALL** sample points

In [30]:
# Define the physics-informed neural network (PINN)
class HeatPINNSimple(nn.Module):
    def __init__(self, layers=[3, 50, 50, 50, 1]):
        super(HeatPINNSimple, self).__init__()
        self.model = nn.Sequential(*[nn.Sequential(nn.Linear(layers[i], layers[i+1]), nn.Tanh()) for i in range(len(layers)-2)], nn.Linear(layers[-2], layers[-1]))
        # Trainable parameters (unknowns in the equation)
        self.alpha = nn.Parameter(torch.tensor(1.0))  # Thermal conductivity, according to wiki, it is apprently sufficient to set alpha=1
    
    def forward(self, x):
        return self.model(x)
        

class TrainingPINNSimple():
    """
    Note
    ----
        According to Moseley_20, training sample points for selected anew for each epoch
        "For each update step a random set of 
            * discretised points are sampled from the initial wavefield to compute the boundary loss 
            * a random set of continuous points over the full input space ... to compute the physics loss."

    Considerations (v250228)
    --------------
        * I need the info of the intervals, dx, dy and dt
        * Otherwise, these intervals will be treated equally via fixed alpha
        * idea; should I make alpha anisotropic?? i.e. alph = [alpha_y, alph_x, alph_t]?? What should I change in the heat eq for that? 
    """
    def __init__(self):
        self.model = HeatPINNSimple()

    @property
    def optimizer(self):
        return self._optimizer
    @optimizer.setter
    def optimizer(self, _lr):
        self._optimizer = optim.Adam(self.model.parameters(), lr=_lr)

    #==== (a) Physics loss
    def physics_loss(self, Nx, Ny, Nt, size=0.3):
        # (1) Model the thermo. values: modeling points need to be selected first
        xyt_pde = torch.from_numpy(self.select_pde_points(Nx, Ny, Nt, size)) # = modeling positions
        xyt_pde.requires_grad = True
        T_pred = self.model(xyt_pde) #self.model.forward(xyt_pde)
        # (2) Partial derivatives
        grads = torch.autograd.grad(T_pred, xyt_pde, grad_outputs=torch.ones_like(T_pred), create_graph=True)[0]
        T_t = grads[:, 2:3]  # dT/dt
        T_xx = torch.autograd.grad(grads[:, 0:1], _data_xyt, grad_outputs=torch.ones_like(grads[:, 0:1]), create_graph=True)[0][:, 0:1]  # d²T/dx²
        T_yy = torch.autograd.grad(grads[:, 1:2], _data_xyt, grad_outputs=torch.ones_like(grads[:, 1:2]), create_graph=True)[0][:, 1:2]  # d²T/dy²
        # (3) Loss
        residual = T_t - self.model.alpha * (T_xx + T_yy)
        return torch.mean(residual**2)

    def select_pde_points(self, Nx, Ny, Nt, size):
        # Output = numpy array
        rng = np.random.default_rng(seed=None)
        n = int(size*Nx* Ny* Nt) # Number of sampling pints for computing the PDE
        return np.array([rng.uniform(size=n)*Ny, rng.uniform(size=n)*Nx, rng.uniform(size=n)*Nt])

    #==== (b) Data fidelity loss    
    def data_fidelity_loss(self, _data_T):
        # Randomly select the sampling positions (as indices)
        idx_bp, bp = self.select_boundary_points(_data_T)
        # Prediction
        T_pred = self.model(torch.from_numpy(bp)) #self.model.forward(torch.from_numpy(bp))
        return torch.mean((T_pred - torch.from_numpy(_data_T.flatten('F')[idx_bp])) ** 2)

    def select_boundary_points(self, _data_T, size=0.1):
        # Output = numpy array
        Ny, Nx, Nt = _data_T.shape
        xx, yy, tt = get_meshgrids3D_flattend(Nx, Ny, Nt) # each is in vector-form
        idx_bp = select_indices(low=0, high=int(Nx*Ny*Nt), size=int(size*(Nx*Ny*Nt)), seed=None)
        bp = np.array([yy[idx_bp], xx[idx_bp], tt[idx_bp]]).astype(float)
        return (idx_bp, bp)
        
    #==== Training
    def train(self, _data_T, n_epochs=50, epoch_phy=9, lambda_data=1.0, lambda_physics=1.0):
        Ny, Nx, Nt = _data_T.shape
        for epoch in range(n_epochs):
            self.optimizer.zero_grad()
            # (a) Data fidelity loss
            data_loss_value = self.data_fidelity_loss(_data_T)
            # (b) Physics loss: activated after few epochs
            if epoch > epoch_phy: 
                physics_loss_value = self.physics_loss(self, Nx, Ny, Nt)
            else:
                physics_loss_value = 0.0
            total_loss =  lambda_data * data_loss_value + lambda_physics * physics_loss_value
            total_loss.backward()
            optimizer.step()
            if epoch % 100 == 0:
                print(f"Epoch {epoch}, Total Loss: {total_loss.item():.6f}, Physics Loss: {physics_loss_value.item():.6f}, Data Loss: {data_loss_value.item():.6f}")

        return self.model


In [31]:
# (0) Load: Thermography data
screwNo = 78 #78, 85, 88
path_rel_1 = f'/Volumes/Sandisk_SD/Work/IZFP/ReMachine/Thermografie/4_InspectedSamples/B{screwNo}/measurements'
pos_y, pos_x = (5, -2) # <- quite clean data (fileNo.59)
prepper = ThermoDataPreparation()
prepper.reader = path_rel_1
data_T = prepper.get_processed_data_time(pos=(pos_y, pos_x), ymin=0)


In [32]:
# # Initialize and train the model
training = TrainingPINNSimple()
training.optimizer = 0.001
model_simple = training.train(data_T)

RuntimeError: mat1 and mat2 must have the same dtype, but got Double and Float

## Next steps:
1. Data prep; select a set of "clean" data (max 5) -> segment it and save it in a folder (both for trainig and testing)
2. Write a data pipline to use these data set  
3. Train
4. Test with a noisy data

## PINN (b): Heat eq. with convection term (account for the heat transfer due to the geometrical structure)

### Modified heat equation
$$\rho c_p \frac{\partial T}{\partial t} = \alpha \nabla^2 T + Q - h A (T - T_{\infty})$$
with some (unknown) constants parameters, and some variables. 

#### Constant parameters
1. **Density ($\rho$)**, known (material parameter)
2. **Specific Heat Capacity ($c_p$)**, known (material parameter)
3. **Thermal Conductivity ($\alpha$)** (for now, I will assume this to be constant), unknown
4. **Convective Heat Transfer Coefficient ($h$)**, unknown
5. **Ambient Temperature ($T_{\infty}$)**, I don't know exactly, but between 20 to 25 Celsius, I guess
6. **Internal Heat Generation ($Q$)**,  unknown

#### Variables
1. **Surface Area per Unit Volume ($A$)**, I want to treat it unknown
2. **Temperature ($T$)**, unknown, values of interest to model with PINN

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

# Define the physics-informed neural network (PINN)
class HeatPINN(nn.Module):
    def __init__(self, layers=[3, 50, 50, 50, 1]):
        super(HeatPINN, self).__init__()
        self.model = nn.Sequential(*[nn.Sequential(nn.Linear(layers[i], layers[i+1]), nn.Tanh()) for i in range(len(layers)-2)], nn.Linear(layers[-2], layers[-1]))
        
        # Trainable parameters (unknowns in the equation)
        self.alpha = nn.Parameter(torch.tensor(0.01))  # Thermal conductivity
        self.h = nn.Parameter(torch.tensor(0.1))  # Convective heat transfer coefficient
        self.Q = nn.Parameter(torch.tensor(0.0))  # Internal heat generation
        self.A = nn.Parameter(torch.tensor(1.0))  # Surface area per unit volume
        self.T_inf = nn.Parameter(torch.tensor(22.5))  # Ambient temperature (initial guess)
    
    def forward(self, x):
        return self.model(x)

# Generate collocation points (x, y, t) for PINN physics loss
def generate_collocation_points(n_points=1000):
    x = torch.rand(n_points, 1) * 10  # Assume screw thread cross-section (0,10)
    y = torch.rand(n_points, 1) * 10
    t = torch.rand(n_points, 1) * 5   # Time range (0,5 sec)
    return torch.cat([x, y, t], dim=1)

# Define the physics loss using the modified heat equation
def physics_loss(model, collocation_points, rho=7850, c_p=500):
    collocation_points.requires_grad = True
    T_pred = model(collocation_points)
    
    grads = torch.autograd.grad(T_pred, collocation_points, grad_outputs=torch.ones_like(T_pred), create_graph=True)[0]
    
    T_t = grads[:, 2:3]  # dT/dt
    T_xx = torch.autograd.grad(grads[:, 0:1], collocation_points, grad_outputs=torch.ones_like(grads[:, 0:1]), create_graph=True)[0][:, 0:1]  # d²T/dx²
    T_yy = torch.autograd.grad(grads[:, 1:2], collocation_points, grad_outputs=torch.ones_like(grads[:, 1:2]), create_graph=True)[0][:, 1:2]  # d²T/dy²
    
    alpha = model.alpha
    h = model.h
    Q = model.Q
    A = model.A
    T_inf = model.T_inf
    
    residual = rho * c_p * T_t - alpha * (T_xx + T_yy) - Q + h * A * (T_pred - T_inf)
    return torch.mean(residual**2)

# Define the data fidelity loss
def data_fidelity_loss(model, data_xyt, data_T):
    T_pred = model(data_xyt)
    return torch.mean((T_pred - data_T) ** 2)

# Training loop
def train_pinn(model, optimizer, data_xyt, data_T, n_epochs=1000, lambda_data=1.0, lambda_physics=1.0):
    collocation_points = generate_collocation_points()
    for epoch in range(n_epochs):
        optimizer.zero_grad()
        physics_loss_value = physics_loss(model, collocation_points)
        data_loss_value = data_fidelity_loss(model, data_xyt, data_T)
        total_loss = lambda_physics * physics_loss_value + lambda_data * data_loss_value
        total_loss.backward()
        optimizer.step()
        if epoch % 100 == 0:
            print(f"Epoch {epoch}, Total Loss: {total_loss.item():.6f}, Physics Loss: {physics_loss_value.item():.6f}, Data Loss: {data_loss_value.item():.6f}")

# Example dataset (replace with real data)
data_xyt = torch.rand(100, 3) * torch.tensor([10, 10, 5])  # Example (x,y,t) points
data_T = torch.rand(100, 1) * 100  # Example temperature values

# Initialize and train the model
pinn_model = HeatPINN()
optimizer = optim.Adam(pinn_model.parameters(), lr=0.001)
train_pinn(pinn_model, optimizer, data_xyt, data_T)


## Considerations
* should I go for the freq. domain directly because we are interested in one frequency anyway? 