In [None]:
import os
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)
google_drive_path = ''
os.chdir(google_drive_path)
print("Current working directory:", os.getcwd())
!ls


In [None]:
import os
import sys
import time
import math
from pathlib import Path
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.ticker import FormatStrFormatter
from scipy.interpolate import griddata
from tqdm import trange
from torch.autograd import grad
from torch.utils.data import Dataset, DataLoader, TensorDataset
from tabulate import tabulate
from torchsummary import summary

# Put the parent of the current working directory on sys.path so that
# imports rooted in that directory can be resolved.
sys.path.append(
    os.path.abspath(os.path.join(os.getcwd(), os.pardir))
)

In [None]:
from Functions.Point_Sampling.point_sampler import Point_Sampler
from File_Paths.file_paths import data_path
from Test_Cases.Bridge_around_object.BRIDGE_Master_object import BRIDGE_Master_Object
from Models.GINN_Models.GINN import GINN
from Functions.Plotting_functions.smooth_FEM_plot import plot_FEM_results_smooth 
from Functions.Computations.L2_error import compute_L2_errors_2d 
from Functions.Training.Properties import Properties 
from Functions.Plotting_functions.BRIDGE_PINN_results import * 
from Functions.Plotting_functions.training_curves.BRIDGE_PINN_curves import * 
from Functions.logging.BRIDGE_PINN_logging import save_metrics_csv_2d  
from Functions.utils import set_random_seed 
from File_Paths.file_paths import (
    mesh_path,
    point_cloud_path,
    FEM_path,
    data_path,
    twoD_PINN_trained_model_dir, 
)

# BRIDGE test case: built once here and read as a module-level global by the
# model, loss, dataset and training code further down.
BRIDGE = BRIDGE_Master_Object(Normalize=True, Symmetry=False)
# Called before any use of BRIDGE.interfaces below (SDF evaluation, Neumann sampling);
# presumably this is what populates that attribute — confirm in BRIDGE_Master_Object.
BRIDGE.create_interfaces()
# Properties object for this test case; later passed to the L2-error and
# comparison-plot helpers.
material_properties = Properties(test_case=BRIDGE)


# Choose the compute device in order of preference: first CUDA GPU, then
# Apple MPS, and finally the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

In [None]:

# Excel export of the FEM reference solution, located under data_path.
filename = "BRIDGE_validation_FEM_mine.xlsx"
file_path = Path(data_path) / filename

df = pd.read_excel(file_path)

# Node coordinates (column headers state millimetres).
coord_cols = ["X Location (mm)", "Y Location (mm)"]
coords_mm = df[coord_cols].to_numpy(dtype=float)

# Result fields; the column order here fixes the unpacking order below.
result_cols = [
    "Equivalent (von-Mises) Stress (MPa)",
    "X Displacement (mm)",
    "Y Displacement (mm)",
]
results = df[result_cols].to_numpy(dtype=float)
von_mises_FEM, x_disp_FEM, y_disp_FEM = results.T

# Shift by the BRIDGE domain centre and multiply by its scaling factor so the
# FEM nodes use the same coordinates as the BRIDGE object.
center = BRIDGE.domain_center
scale = BRIDGE.domain_scaling_factor
coords_scaled = (coords_mm - center) * scale
x_scaled, y_scaled = coords_scaled[:, 0], coords_scaled[:, 1]

# Visual check of the reference fields.
plot_FEM_results_smooth(coords_scaled, von_mises_FEM, x_disp_FEM, y_disp_FEM, BRIDGE)


In [None]:
# FEM loading
def load_bridge_FEM_reference(filename: str = "BRIDGE_validation_FEM_mine.xlsx"):
    """
    Load a BRIDGE FEM reference from Excel and return scaled coords and fields.

    Args:
        filename: Name of the Excel file inside ``data_path``. Defaults to the
            validation export used throughout this notebook, so existing
            zero-argument calls behave exactly as before.

    Returns:
        ``None`` (after printing a warning) when the file does not exist.
        Otherwise a dict with the keys

        * ``"coords_scaled"``: ``(coords_mm - BRIDGE.domain_center) *
          BRIDGE.domain_scaling_factor``, built from the two location columns,
        * ``"x_disp"`` / ``"y_disp"``: the X / Y displacement columns,
        * ``"sigma_vm"``: the von-Mises stress column.

    Raises:
        KeyError: propagated from pandas if an expected column is missing.
    """
    file_path = Path(data_path) / filename

    # A missing reference is not fatal: callers treat None as "no FEM data".
    if not file_path.exists():
        print(f"[WARN] FEM file not found: {file_path}")
        return None

    df = pd.read_excel(file_path)

    coord_cols = ["X Location (mm)", "Y Location (mm)"]
    coords_mm = df[coord_cols].to_numpy(dtype=float)

    # Column order here defines the column indices used just below.
    result_cols = [
        "Equivalent (von-Mises) Stress (MPa)",
        "X Displacement (mm)",
        "Y Displacement (mm)",
    ]
    results = df[result_cols].to_numpy(dtype=float)

    von_mises_FEM = results[:, 0]
    x_disp_FEM = results[:, 1]
    y_disp_FEM = results[:, 2]

    # Bring the FEM nodes into the coordinates used by the BRIDGE object.
    scale = BRIDGE.domain_scaling_factor
    center = BRIDGE.domain_center
    coords_scaled = (coords_mm - center) * scale

    fem_ref = {
        "coords_scaled": coords_scaled,
        "x_disp": x_disp_FEM,
        "y_disp": y_disp_FEM,
        "sigma_vm": von_mises_FEM,
    }
    print("Loaded FEM reference:", file_path)
    return fem_ref


In [None]:
class PINN(torch.nn.Module):
    """GINN network whose output is multiplied by a boundary mollifier.

    The mollifier vanishes at two x-positions read from ``BRIDGE.edge_vertices``
    (rows 0 and 2, column 0), so the returned field is exactly zero there.
    """

    def __init__(self, hparams_model, hparams_feature_expansion, mollifier_alpha):
        super().__init__()
        # Steepness of the tanh mollifier applied in forward().
        self.mollifier_alpha = mollifier_alpha
        self.model = GINN(
            BRIDGE,
            feature_expansion=hparams_feature_expansion,
            Model_hyperparameters=hparams_model,
        )

    @staticmethod
    def enforce_dirichlet_BC(alpha, u, coords):
        """Scale ``u`` by ``tanh(alpha*|x - x_left|) * tanh(alpha*|x - x_right|)``.

        ``x`` is the first column of ``coords``; the product is moved to
        ``u``'s device and broadcast over ``u``'s second dimension.
        """
        vertices = BRIDGE.edge_vertices
        x_left, x_right = vertices[0, 0], vertices[2, 0]
        if isinstance(vertices, torch.Tensor):
            # Work with plain Python scalars when the vertices are stored as a tensor.
            x_left, x_right = x_left.item(), x_right.item()

        x = coords[:, 0]
        left_factor = torch.tanh(alpha * torch.abs(x - x_left))
        right_factor = torch.tanh(alpha * torch.abs(x - x_right))
        mollifier = (left_factor * right_factor).to(u.device)

        return u * mollifier.unsqueeze(1)

    def forward(self, coords):
        raw_output = self.model(coords)
        return self.enforce_dirichlet_BC(self.mollifier_alpha, raw_output, coords)



class density_Model(torch.nn.Module):
    """Density field derived from the BRIDGE signed distance function.

    Used for plotting/masking etc.; evaluates ``sigmoid(-density_alpha * SDF)``.
    """

    def __init__(self, density_alpha: float):
        super().__init__()
        self.density_alpha = density_alpha
        # Frozen placeholder parameter; presumably kept so the module owns at
        # least one parameter (e.g. for device handling) — TODO confirm.
        self.dummy = nn.Parameter(torch.zeros(1), requires_grad=False)

    def forward(self, coords: torch.Tensor):
        signed_distance = BRIDGE.interfaces.calculate_SDF(coords)
        return torch.sigmoid(-self.density_alpha * signed_distance)


In [None]:

class PINN_Loss(Properties):
    """Energy (Ritz) loss for the pair of displacement networks on BRIDGE.

    The loss is the internal strain energy, with both Lamé parameters scaled
    by ``rho**density_exponent``, minus the work of the prescribed traction on
    the sampled Neumann boundary points.
    """

    def __init__(self, u_model, v_model, training_hparams):
        super().__init__(test_case=BRIDGE)
        # self.device is not set in this class; it is expected from Properties.
        self.u_model = u_model.to(self.device)
        self.v_model = v_model.to(self.device)

        # Only these two hyperparameters are read by the loss.
        self.num_neumann_pts = training_hparams["num_neumann_points"]
        self.density_exponent = training_hparams["density_exponent"]

    def ritz_loss(self, x, density_field):
        """Return ``internal_energy - external_energy`` for one batch.

        ``x`` holds the collocation points (columns 0 and 1 are differentiated
        against) and ``density_field`` the matching densities, flattened here.
        """

        def spatial_gradient(field, points):
            # d(field)/d(points), kept in the graph so the loss can be backpropagated.
            return torch.autograd.grad(
                field,
                points,
                grad_outputs=torch.ones_like(field),
                create_graph=True,
                retain_graph=True,
            )[0]

        # ---------- External work on the Neumann boundary ----------
        boundary_pts = self.interfaces.sample_points_on_neumann_boundary(
            self.num_neumann_pts, "vertical", "torch_tensor"
        ).to(self.device)
        boundary_pts.requires_grad_(True)

        # Loaded length is taken as pi * obstacle radius; each sample gets an equal share.
        arc_length = math.pi * BRIDGE.interfaces.obstacle_radius
        ds = arc_length / boundary_pts.shape[0]

        # Traction acts only in y: second force component spread over the arc length.
        prescribed_traction = torch.zeros_like(boundary_pts)
        prescribed_traction[:, 1] = self.force_vector[1] / arc_length

        boundary_u = self.u_model(boundary_pts).squeeze(-1)
        boundary_v = self.v_model(boundary_pts).squeeze(-1)
        boundary_disp = torch.stack([boundary_u, boundary_v], dim=1)

        work_per_point = (prescribed_traction * boundary_disp).sum(dim=1)
        external_energy = (work_per_point * ds).sum()

        # ---------- Internal strain energy (SIMP) ----------
        coords = x.detach().clone().requires_grad_(True).to(self.device)

        u = self.u_model(coords)
        v = self.v_model(coords)
        grad_u = spatial_gradient(u, coords)
        grad_v = spatial_gradient(v, coords)

        # Small-strain components.
        eps_xx = grad_u[:, 0]
        eps_yy = grad_v[:, 1]
        eps_xy = 0.5 * (grad_u[:, 1] + grad_v[:, 0])
        strain_trace = eps_xx + eps_yy

        # Density clamped to [0, 1] and raised to the exponent scales both Lamé parameters.
        rho = density_field.to(self.device).view(-1)
        stiffness_scale = rho.clamp(0.0, 1.0).pow(float(self.density_exponent))
        lam = stiffness_scale * (self.lame_lambda * torch.ones_like(eps_xx))
        mu = stiffness_scale * (self.lame_mu * torch.ones_like(eps_xx))

        sigma_xx = 2.0 * mu * eps_xx + lam * strain_trace
        sigma_yy = 2.0 * mu * eps_yy + lam * strain_trace
        sigma_xy = 2.0 * mu * eps_xy

        strain_energy_density = 0.5 * (
            sigma_xx * eps_xx + 2.0 * sigma_xy * eps_xy + sigma_yy * eps_yy
        )

        # Batch mean times the domain volume approximates the domain integral.
        internal_energy = BRIDGE.domain_volume * strain_energy_density.mean()

        return internal_energy - external_energy


In [None]:

class FixedGeometryDataset2D(Dataset):
    """
    Point/density pairs for the fixed BRIDGE geometry.

    Points are drawn from BRIDGE.domain with Point_Sampler and each point gets
    a density rho = sigmoid(-density_alpha * SDF) from BRIDGE.interfaces.
    """

    def __init__(self, num_points: int, density_alpha: float):
        super().__init__()
        self.device = device

        point_source = Point_Sampler(
            BRIDGE.domain,
            BRIDGE.interfaces,
            num_points_domain=num_points,
            num_points_interface=0,
        )
        # One draw from the sampler provides the whole point set.
        self.points = next(point_source).to(self.device)

        signed_distance = BRIDGE.interfaces.calculate_SDF(self.points)
        # Flattened so that indexing returns one density value per point.
        self.density = torch.sigmoid(-density_alpha * signed_distance).view(-1)

    def __len__(self):
        return self.points.size(0)

    def __getitem__(self, idx):
        point, rho = self.points[idx], self.density[idx]
        return point, rho


In [None]:
# Network settings. One sub-dict per backbone type; the training function in
# this notebook passes only "SIREN_hparams" to the models.
hparams_model = dict(
    Model_type="SIREN",
    num_hidden_layers=3,
    num_hidden_neurons=180,
    SIREN_hparams=dict(
        Model_type="SIREN",
        layers=[180] * 5,
        dimensionality=2,
        w0_initial=30.0,
        w0=1.0,
        skip_connection=True,
    ),
    WIRE_hparams=dict(
        Model_type="WIRE",
        layers=[180] * 4,
        dimensionality=2,
        w0_initial=15,
        w0=2,
        sigma0=2,
        sigma0_initial=2,
        layer_type="real_gabor",
        trainable=False,
        skip_connection=True,
    ),
    MLP_hparams=dict(
        Model_type="MLP",
        layers=[180] * 4,
        dimensionality=2,
        activation_function="relu",
        use_bias=True,
        use_batch_norm=False,
        use_dropout=False,
        dropout_rate=0.1,
        skip_connection=True,
    ),
)

# Input feature expansion settings (keys contain spaces, hence the literal form).
hparams_feature_expansion = {
    "Feature Type": "None",
    "Num Frequencies": 3,
    "Max Frequency": 100,
}

# Optimisation, sampling and output settings read by the training code.
training_hparams = dict(
    total_points=120_000,       # points sampled per epoch
    batch_size=40_000,          # i.e. three batches per epoch
    num_epochs=5000,
    learning_rate=1e-3,
    weight_decay=1e-4,
    gamma=0.85,                 # StepLR decay factor
    num_neumann_points=10_000,
    mollifier_alpha=1.0,
    density_alpha=1000.0,       # steepness of sigmoid(-alpha * SDF)
    density_exponent=3,         # exponent applied to the density in the loss
    plot_interval=100,
    save_path="./BRIDGE_PINN_val_2",
    seed=43,
)

# Make sure the output directory exists before training writes to it.
os.makedirs(training_hparams["save_path"], exist_ok=True)

In [None]:
def train_pinn_bridge_2d(fem_ref=None):
    """Train the u/v displacement networks on BRIDGE by minimising the Ritz energy.

    All settings come from the module-level ``training_hparams``,
    ``hparams_model`` and ``hparams_feature_expansion`` dicts.

    Args:
        fem_ref: FEM reference dict as returned by ``load_bridge_FEM_reference``,
            or ``None``. It is always forwarded to ``compute_L2_errors_2d``;
            any ``None`` error returned there is logged as 0.0. The FEM figure
            and the NN/FEM comparison grid are only written when it is not ``None``.

    Returns:
        tuple: ``(best_model_path, history)`` where ``best_model_path`` is the
        checkpoint file holding the lowest mean epoch loss and ``history`` is a
        dict of per-epoch lists with keys ``"epoch"``, ``"loss"``, ``"L2_u"``,
        ``"L2_v"`` and ``"L2_s"``.

    Side effects:
        Writes checkpoints, figures and the metrics CSV into
        ``training_hparams["save_path"]`` and prints progress every
        ``plot_interval`` epochs.
    """
    set_random_seed(training_hparams["seed"])
    save_dir = training_hparams["save_path"]
    os.makedirs(save_dir, exist_ok=True)

    # Loop-invariant settings, looked up once.
    num_epochs = training_hparams["num_epochs"]
    plot_interval = training_hparams["plot_interval"]
    total_points = training_hparams["total_points"]
    batch_size = training_hparams["batch_size"]
    density_alpha = training_hparams["density_alpha"]

    # models (one network per displacement component, both using the SIREN settings)
    u_model = PINN(
        hparams_model["SIREN_hparams"],
        hparams_feature_expansion,
        training_hparams["mollifier_alpha"],
    ).to(device)
    v_model = PINN(
        hparams_model["SIREN_hparams"],
        hparams_feature_expansion,
        training_hparams["mollifier_alpha"],
    ).to(device)
    # Not trained: only passed to the plotting helper below.
    density_model = density_Model(density_alpha).to(device)

    params = list(u_model.parameters()) + list(v_model.parameters())
    optimizer = torch.optim.Adam(
        params=params,
        lr=training_hparams["learning_rate"],
        weight_decay=training_hparams["weight_decay"],
    )
    # Decay the learning rate four times over the run. step_size is clamped to
    # at least 1 because StepLR computes `epoch % step_size`, which would raise
    # ZeroDivisionError for runs shorter than four epochs.
    scheduler = torch.optim.lr_scheduler.StepLR(
        optimizer,
        step_size=max(1, int(num_epochs / 4)),
        gamma=training_hparams["gamma"],
    )

    loss_fn = PINN_Loss(u_model, v_model, training_hparams)

    history = {
        "epoch": [],
        "loss": [],
        "L2_u": [],
        "L2_v": [],
        "L2_s": [],
    }

    best_loss = float("inf")
    best_model_path = os.path.join(save_dir, "best_model_2D_fixed_new.pth")

    torch.autograd.set_detect_anomaly(False)

    for epoch in trange(num_epochs):
        # The dataset is rebuilt inside the loop, so each epoch trains on a
        # newly sampled point set.
        dataset = FixedGeometryDataset2D(
            num_points=total_points,
            density_alpha=density_alpha,
        )
        dataloader = DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=True,
            drop_last=False,
        )

        epoch_loss = 0.0

        for coords_batch, density_batch in dataloader:
            optimizer.zero_grad(set_to_none=True)
            loss_ritz = loss_fn.ritz_loss(coords_batch, density_batch)
            loss_ritz.backward()
            optimizer.step()
            epoch_loss += loss_ritz.item()

        scheduler.step()
        # Mean loss over the batches of this epoch (guarded against an empty loader).
        epoch_loss /= max(1, len(dataloader))

        # L2 errors vs FEM
        L2_u, L2_v, L2_s = compute_L2_errors_2d(
            u_model, v_model, material_properties, fem_ref, BRIDGE
        )

        history["epoch"].append(epoch)
        history["loss"].append(epoch_loss)
        history["L2_u"].append(0.0 if L2_u is None else L2_u)
        history["L2_v"].append(0.0 if L2_v is None else L2_v)
        history["L2_s"].append(0.0 if L2_s is None else L2_s)

        # Checkpoint whenever the mean epoch loss reaches a new minimum.
        if epoch_loss < best_loss:
            best_loss = epoch_loss
            torch.save(
                {
                    "epoch": epoch,
                    "loss": best_loss,
                    "u_model_state_dict": u_model.state_dict(),
                    "v_model_state_dict": v_model.state_dict(),
                    "optimizer_state_dict": optimizer.state_dict(),
                },
                best_model_path,
            )

        if (epoch % plot_interval) == 0:
            print(
                f"Epoch {epoch}, loss={epoch_loss:.6e}, "
                f"lr={optimizer.param_groups[0]['lr']:.3e}, "
                f"best_loss={best_loss:.6e}, "
                f"L2_u={history['L2_u'][-1]:.3e}, "
                f"L2_v={history['L2_v'][-1]:.3e}, "
                f"L2_s={history['L2_s'][-1]:.3e}"
            )

            # NN 1×3
            u_img, v_img, sigma_img, rho_img = predict_uv_sigma_image_2d_binary(
                u_model,
                v_model,
                density_model,
                BRIDGE.domain,
                BRIDGE,
                rho_threshold_plot=0.5,
            )
            fname_nn_1x3 = os.path.join(save_dir, f"uvsigma_NN_{epoch:06d}.png")
            save_uv_sigma_to_file(
                u_img, v_img, sigma_img, rho_img, fname_nn_1x3, BRIDGE.domain, BRIDGE, 0.5
            )

            # --- FEM 1×3 & 3×3 comparison ---
            if fem_ref is not None:
                fname_fem_1x3 = os.path.join(save_dir, f"uvsigma_FEM_{epoch:06d}.png")
                save_FEM_results_smooth(fem_ref, fname_fem_1x3, BRIDGE, grid_resolution=300)

                fname_3x3 = os.path.join(
                    save_dir, f"uvsigma_compare_{epoch:06d}.png"
                )
                save_uv_sigma_comparison_grid_2d(
                    u_model,
                    v_model,
                    material_properties,
                    fem_ref,
                    fname_3x3,
                    BRIDGE,
                    grid_resolution=300,
                    rho_threshold=0.5,
                )

            # --- Training curves (linear + log) ---
            fname_curves_lin = os.path.join(
                save_dir, f"training_curves_{epoch:06d}.png"
            )
            fname_curves_log = os.path.join(
                save_dir, f"training_curves_log_{epoch:06d}.png"
            )

            save_training_curves_2d(history, fname_curves_lin)
            save_training_curves_log_2d(history, fname_curves_log)

            #  CSV logging of metrics
            save_metrics_csv_2d(history, save_dir)

    print("Training finished.")
    print("Best loss:", best_loss)
    print("Best model path:", best_model_path)
    return best_model_path, history


In [None]:
# Load the FEM reference; this is None when the Excel file is missing
# (the loader prints a warning in that case).
fem_ref = load_bridge_FEM_reference()
# Run the full training; returns the best-checkpoint path and the per-epoch metric history.
best_model_path, history = train_pinn_bridge_2d(fem_ref)