In [1]:
# For readability: disable warnings from libraries like matplotlib, etc.
import warnings
warnings.filterwarnings('ignore')

import os
# Make sure torch is imported somewhere above this cell:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
import scipy.io
from scipy.interpolate import griddata
import time
from itertools import product, combinations
from mpl_toolkits.mplot3d import Axes3D
from mpl_toolkits.mplot3d.art3d import Poly3DCollection
from mpl_toolkits.axes_grid1 import make_axes_locatable
import matplotlib.gridspec as gridspec
import scipy.sparse as sp
import scipy.sparse.linalg as la
from pyDOE import lhs
from matplotlib.colors import LogNorm
from matplotlib.ticker import LogLocator, FuncFormatter
from matplotlib.ticker import FormatStrFormatter
import copy
import pandas as pd

# --- Device Setup ---
print("CUDA available?", torch.cuda.is_available())
if torch.cuda.is_available():
    print("Device:", torch.cuda.get_device_name(0))

# Select the most performant device available (CUDA > MPS > CPU)
device = (
    torch.device('cuda') if torch.cuda.is_available()
    else torch.device('mps') if hasattr(torch.backends, "mps") and torch.backends.mps.is_available()
    else torch.device('cpu')
)
print("Using device:", device)

# -----------------------------------------------------------------------------
# 1) Build dataset for arbitrary (N_i, N_b, N_k, N_f)
# -----------------------------------------------------------------------------
def build_dataset(N_i, N_b, N_k, N_f, N_val,
                  x_min=0.0, x_max=1.0,
                  t_min=0.0, t_max=0.25,
                  k_min=0.2, k_max=2.0,
                  seed=123):
    """
    Build training and collocation sets for the parametric heat equation.

    Returns:
        X_u_train : (N_u, 3) array of data points (x, t, log10(k)) for IC + BC.
        u_train   : (N_u, 1) analytic solution at X_u_train.
        X_f_train : (N_f, 3) collocation points in (x, t, log10(k)).
        X_val     : (N_val, 3) validation points in (x, t, log10(k)).
        lb, ub    : lower/upper bounds for normalisation in the NN.
    """

    # --- k and logk ---
    # We sample k in [k_min, k_max] but represent it via log10(k).
    logk_min = np.log10(k_min)
    logk_max = np.log10(k_max)
    logk_vec = np.linspace(logk_min, logk_max, N_k)  # equally spaced in log10(k)
    k_vec    = 10.0**logk_vec                        # corresponding physical k

    # --- Initial condition: u(x,0;k) = sin(pi x) ---
    # x in [x_min, x_max], t=0, and all logk samples
    x_ic = np.linspace(x_min, x_max, N_i)
    t_ic = [0.0]
    x_ic_g, t_ic_g, logk_ic_g = np.meshgrid(x_ic, t_ic, logk_vec, indexing='ij')

    x_u_ic    = x_ic_g.ravel()[:, None]
    t_u_ic    = t_ic_g.ravel()[:, None]
    logk_u_ic = logk_ic_g.ravel()[:, None]
    X_u_train_ic = np.hstack([x_u_ic, t_u_ic, logk_u_ic])

    # --- Boundary conditions: u(0,t;k)=0, u(1,t;k)=0 ---
    # We discretise t with 2*N_b points between t_min and t_max.
    t_line = np.linspace(t_min, t_max, 2 * N_b)
    x_bc_left  = [0.0]
    x_bc_right = [1.0]
    x_bc = np.concatenate([x_bc_left, x_bc_right], axis=0)  # [0, 1]

    x_bc_g, t_bc_g, logk_bc_g = np.meshgrid(x_bc, t_line, logk_vec, indexing='ij')
    x_u_bc    = x_bc_g.ravel()[:, None]
    t_u_bc    = t_bc_g.ravel()[:, None]
    logk_u_bc = logk_bc_g.ravel()[:, None]
    X_u_train_bc = np.hstack([x_u_bc, t_u_bc, logk_u_bc])

    # --- Combine IC and BC into one "data" set ---
    X_u_train = np.vstack([X_u_train_ic, X_u_train_bc]).astype(np.float32)

    # --- Analytic solution for those IC/BC points ---
    x_cal    = X_u_train[:, 0]
    t_cal    = X_u_train[:, 1]
    logk_cal = X_u_train[:, 2]
    k_cal    = 10.0**logk_cal

    # Closed-form solution: u(x,t;k) = exp(-k π² t) sin(π x)
    u_train = np.exp(-k_cal * (np.pi**2) * t_cal) * np.sin(np.pi * x_cal)
    u_train = u_train[:, None].astype(np.float32)

    # --- Collocation + validation via LHS in (x, t, logk) ---
    lb = np.array([x_min, t_min, logk_min], dtype=np.float32)
    ub = np.array([x_max, t_max, logk_max], dtype=np.float32)

    np.random.seed(seed)
    U_all = lhs(3, samples=N_f + N_val)   # Latin Hypercube in [0,1]^3
    X_all = lb + (ub - lb) * U_all        # map to [lb, ub] in (x, t, logk)
    X_f_train = X_all[:N_f]
    X_val     = X_all[N_f:]

    return X_u_train, u_train, X_f_train, X_val, lb, ub


# -----------------------------------------------------------------------------
# 2) Compute global relative L2 error for a trained model at a fixed k
# -----------------------------------------------------------------------------
def compute_rel_L2(model,
                   k_val=1.0,
                   x_min=0.0, x_max=1.0,
                   t_min=0.0, t_max=0.25,
                   Nx=100, Nt=100,
                   device=device):
    """
    Compute global relative L2 error of the model solution against the analytic
    solution on a regular (x,t) grid at a fixed physical k = k_val.

    rel_L2 = ||u_pred - u_true||_2 / ||u_true||_2
    """

    # Build regular grid in x, t
    x_test = np.linspace(x_min, x_max, Nx)
    t_test = np.linspace(t_min, t_max, Nt)
    logk_val = np.log10(k_val)  # convert to log10(k) for NN input

    T, X = np.meshgrid(t_test, x_test, indexing='ij')  # shape (Nt, Nx)
    LOGK = np.full_like(T, logk_val)                   # broadcast log10(k_val)

    # Flatten to (Nt*Nx, 1) column vectors
    x_flat    = X.ravel()[:, None]
    t_flat    = T.ravel()[:, None]
    logk_flat = LOGK.ravel()[:, None]

    # Stack into (Nt*Nx, 3) array and convert to torch tensor
    X_star    = np.hstack([x_flat, t_flat, logk_flat]).astype(np.float32)
    X_star_tf = torch.from_numpy(X_star).to(device)

    # NN prediction over the grid
    model.eval()
    with torch.no_grad():
        u_pred = model(X_star_tf).squeeze(1).cpu().numpy().reshape(T.shape)

    # Analytic solution on the same grid
    u_true = u_true_numpy(X, T, LOGK)

    # Global relative L2 error
    num = np.linalg.norm(u_pred - u_true)
    den = np.linalg.norm(u_true)
    rel_L2 = num / den if den > 0 else num

    return rel_L2


# -----------------------------------------------------------------------------
# 3) Plot training curves (loss, val MSE, max |error|) and save to file
# -----------------------------------------------------------------------------
def plot_training_curves(loss_values, mse_v_hist, max_errors,
                         N_i, N_b, N_k, N_f,
                         out_dir="sweep_results"):
    """
    Make the TrainLoss / ValMSE / Max|Error| vs epoch plot and save to file.

    Args:
        loss_values : list of train loss per epoch
        mse_v_hist  : list of (epoch, val_MSE)
        max_errors  : list of (epoch, max_abs_error) on validation
        N_i, N_b, N_k, N_f : configuration used (for filename)
        out_dir     : directory where the PNG is saved
    """

    os.makedirs(out_dir, exist_ok=True)

    # Training loss epochs
    ep_train = range(len(loss_values))

    # Validation MSE: unpack (epoch, mse)
    ep_val  = [int(i) for i, _ in mse_v_hist]
    mse_val = [
        (m.detach().cpu().item() if torch.is_tensor(m) else float(m))
        for _, m in mse_v_hist
    ]

    # Max absolute error: unpack (epoch, max_err)
    ep_max   = [int(i) for i, _ in max_errors]
    max_errs = [
        (m.detach().cpu().item() if torch.is_tensor(m) else float(m))
        for _, m in max_errors
    ]

    # Plot curves on log scale (since errors/loss typically span many orders)
    plt.figure(figsize=(8, 6))
    plt.plot(ep_train, loss_values, color='black', label='Train Loss')
    plt.plot(ep_val,   mse_val,     color='red',   label='Validation MSE')
    plt.plot(ep_max,   max_errs,    color='blue',  label='Max |Error| (Validation)')

    plt.xlabel('Iteration')
    plt.ylabel('Loss / Error')
    plt.yscale('log')
    plt.title('Training Loss, Validation MSE, and Max Validation Error vs Iterations')
    plt.legend()
    plt.tight_layout()

    # File name tagged with the configuration
    fname = f"train_Ni{N_i}_Nb{N_b}_Nk{N_k}_Nf{N_f}.png"
    fpath = os.path.join(out_dir, fname)
    plt.savefig(fpath, dpi=300, bbox_inches='tight')
    plt.close()

    return fpath


# -----------------------------------------------------------------------------
# 4) Run a single experiment for given (N_i, N_b, N_k, N_f)
# -----------------------------------------------------------------------------
def run_single_experiment(N_i, N_b, N_k, N_f,
                          N_val=100,
                          Train_epochs=100000,
                          learning_rate=5e-4,
                          k_val_eval=1.0,
                          results_dir="sweep_results",
                          seed=123):
    """
    Runs one full experiment:
      - builds dataset for given (N_i, N_b, N_k, N_f)
      - trains a fresh model
      - saves training curve plot
      - computes global rel L2 error at k = k_val_eval
      - returns a dict with all requested info
    """

    # 1) Build dataset (IC+BC data, collocation, validation, bounds)
    X_u_train, u_train, X_f_train, X_val, lb, ub = build_dataset(
        N_i=N_i, N_b=N_b, N_k=N_k, N_f=N_f, N_val=N_val,
        x_min=x_min, x_max=x_max, t_min=t_min, t_max=t_max,
        k_min=k_min, k_max=k_max, seed=seed
    )

    # 2) Initialise a new PINN model for this dataset
    model = NeuralNet(layers, lb, ub).to(device).float()

    # 3) Train and measure total wall-clock time for this configuration
    exp_start = time.time()
    loss_values, mse_v_hist, max_errors, best_ep, best_TL, best_v, best_max_err = train(
        Train_epochs,
        X_u_train,
        u_train,
        X_f_train,
        X_val,
        model,
        learning_rate
    )
    total_elapsed = time.time() - exp_start

    # 4) Compute global relative L2 error at a chosen k (e.g. k = 1.0)
    rel_L2 = compute_rel_L2(
        model,
        k_val=k_val_eval,
        x_min=x_min, x_max=x_max,
        t_min=t_min, t_max=t_max,
        Nx=100, Nt=100,
        device=device
    )

    # 5) Create and save the training curve plot for this run
    curve_path = plot_training_curves(
        loss_values, mse_v_hist, max_errors,
        N_i=N_i, N_b=N_b, N_k=N_k, N_f=N_f,
        out_dir=results_dir
    )

    # 6) Pack all information into a record dictionary
    record = {
        "N_i": N_i,
        "N_b": N_b,
        "N_k": N_k,
        "N_f": N_f,
        "total_elapsed": total_elapsed,
        "best_ep": best_ep,
        "best_TL": best_TL,
        "best_v": best_v,
        "best_max_err": best_max_err,
        "rel_L2": rel_L2,
        "loss_values": loss_values,
        "mse_v_hist": mse_v_hist,
        "max_errors": max_errors,
        "training_curve_path": curve_path,
    }

    return record


# -----------------------------------------------------------------------------
# 5) Sweep settings and loops for N_f, N_i, N_b, N_k
# -----------------------------------------------------------------------------

# Base values (same as your current defaults)
BASE_N_i = 101
BASE_N_b = 51
BASE_N_k = 51
BASE_N_f = 1000

# Training hyperparameters for all sweeps
Train_epochs = 100000
learning_rate = 0.0005
N_val = 100          # number of validation points from LHS
k_val_eval = 1.0     # k at which global rel L2 is computed

# ==========================
# Sweep 1: Collocation points N_f
# ==========================
Nf_list = [500, 1000, 2000, 4000]  # values to test for N_f

results_Nf = []

for N_f in Nf_list:
    print(f"\n=== Sweep N_f = {N_f} (N_i={BASE_N_i}, N_b={BASE_N_b}, N_k={BASE_N_k}) ===")
    rec = run_single_experiment(
        N_i=BASE_N_i,
        N_b=BASE_N_b,
        N_k=BASE_N_k,
        N_f=N_f,
        N_val=N_val,
        Train_epochs=Train_epochs,
        learning_rate=learning_rate,
        k_val_eval=k_val_eval,
        results_dir="sweep_Nf",
        seed=123
    )
    results_Nf.append(rec)

# Convert list of records to DataFrame for easy inspection and saving
df_Nf = pd.DataFrame([
    {
        "N_i": r["N_i"], "N_b": r["N_b"], "N_k": r["N_k"], "N_f": r["N_f"],
        "total_elapsed": r["total_elapsed"],
        "best_ep": r["best_ep"],
        "best_TL": r["best_TL"],
        "best_v": r["best_v"],
        "best_max_err": r["best_max_err"],
        "rel_L2": r["rel_L2"],
        # Histories stored as objects (lists) – still useful inside Python
        "loss_values": r["loss_values"],
        "mse_v_hist": r["mse_v_hist"],
        "max_errors": r["max_errors"],
        "training_curve_path": r["training_curve_path"],
    }
    for r in results_Nf
])

print("\nN_f sweep summary:")
display(df_Nf)

# Save N_f sweep summary table as CSV
df_Nf.to_csv("sweep_Nf_summary.csv", index=False)

# ==========================
# Sweep 2: IC points N_i
# ==========================
Ni_list = [21, 51, 101, 201]  # values to test for N_i

results_Ni = []

for N_i in Ni_list:
    print(f"\n=== Sweep N_i = {N_i} (N_b={BASE_N_b}, N_k={BASE_N_k}, N_f={BASE_N_f}) ===")
    rec = run_single_experiment(
        N_i=N_i,
        N_b=BASE_N_b,
        N_k=BASE_N_k,
        N_f=BASE_N_f,
        N_val=N_val,
        Train_epochs=Train_epochs,
        learning_rate=learning_rate,
        k_val_eval=k_val_eval,
        results_dir="sweep_Ni",
        seed=123
    )
    results_Ni.append(rec)

df_Ni = pd.DataFrame([
    {
        "N_i": r["N_i"], "N_b": r["N_b"], "N_k": r["N_k"], "N_f": r["N_f"],
        "total_elapsed": r["total_elapsed"],
        "best_ep": r["best_ep"],
        "best_TL": r["best_TL"],
        "best_v": r["best_v"],
        "best_max_err": r["best_max_err"],
        "rel_L2": r["rel_L2"],
        "loss_values": r["loss_values"],
        "mse_v_hist": r["mse_v_hist"],
        "max_errors": r["max_errors"],
        "training_curve_path": r["training_curve_path"],
    }
    for r in results_Ni
])

print("\nN_i sweep summary:")
display(df_Ni)

df_Ni.to_csv("sweep_Ni_summary.csv", index=False)

# ==========================
# Sweep 3: BC points N_b
# ==========================
Nb_list = [11, 21, 51, 101]  # values to test for N_b

results_Nb = []

for N_b in Nb_list:
    print(f"\n=== Sweep N_b = {N_b} (N_i={BASE_N_i}, N_k={BASE_N_k}, N_f={BASE_N_f}) ===")
    rec = run_single_experiment(
        N_i=BASE_N_i,
        N_b=N_b,
        N_k=BASE_N_k,
        N_f=BASE_N_f,
        N_val=N_val,
        Train_epochs=Train_epochs,
        learning_rate=learning_rate,
        k_val_eval=k_val_eval,
        results_dir="sweep_Nb",
        seed=123
    )
    results_Nb.append(rec)

df_Nb = pd.DataFrame([
    {
        "N_i": r["N_i"], "N_b": r["N_b"], "N_k": r["N_k"], "N_f": r["N_f"],
        "total_elapsed": r["total_elapsed"],
        "best_ep": r["best_ep"],
        "best_TL": r["best_TL"],
        "best_v": r["best_v"],
        "best_max_err": r["best_max_err"],
        "rel_L2": r["rel_L2"],
        "loss_values": r["loss_values"],
        "mse_v_hist": r["mse_v_hist"],
        "max_errors": r["max_errors"],
        "training_curve_path": r["training_curve_path"],
    }
    for r in results_Nb
])

print("\nN_b sweep summary:")
display(df_Nb)

df_Nb.to_csv("sweep_Nb_summary.csv", index=False)

# ==========================
# Sweep 4: parameter samples N_k
# ==========================
Nk_list = [11, 21, 51, 101]  # values to test for N_k

results_Nk = []

for N_k in Nk_list:
    print(f"\n=== Sweep N_k = {N_k} (N_i={BASE_N_i}, N_b={BASE_N_b}, N_f={BASE_N_f}) ===")
    rec = run_single_experiment(
        N_i=BASE_N_i,
        N_b=BASE_N_b,
        N_k=N_k,
        N_f=BASE_N_f,
        N_val=N_val,
        Train_epochs=Train_epochs,
        learning_rate=learning_rate,
        k_val_eval=k_val_eval,
        results_dir="sweep_Nk",
        seed=123
    )
    results_Nk.append(rec)

df_Nk = pd.DataFrame([
    {
        "N_i": r["N_i"], "N_b": r["N_b"], "N_k": r["N_k"], "N_f": r["N_f"],
        "total_elapsed": r["total_elapsed"],
        "best_ep": r["best_ep"],
        "best_TL": r["best_TL"],
        "best_v": r["best_v"],
        "best_max_err": r["best_max_err"],
        "rel_L2": r["rel_L2"],
        "loss_values": r["loss_values"],
        "mse_v_hist": r["mse_v_hist"],
        "max_errors": r["max_errors"],
        "training_curve_path": r["training_curve_path"],
    }
    for r in results_Nk
])

print("\nN_k sweep summary:")
display(df_Nk)

df_Nk.to_csv("sweep_Nk_summary.csv", index=False)


CUDA available? True
Device: NVIDIA A2
Using device: cuda

=== Sweep N_f = 500 (N_i=101, N_b=51, N_k=51) ===


NameError: name 'x_min' is not defined

In [None]:
# t = data['t'].flatten()[:,None] # read in t and flatten into column vector
# x = data['x'].flatten()[:,None] # read in x and flatten into column vector
#  # Exact represents the exact solution to the problem, from the data provided
# Exact = np.real(data['usol']).T # Exact has structure of nt times nx

# print("usol shape (nt, nx) = ", Exact.shape)

# # We need to find all the x,t coordinate pairs in the domain
# X, T = np.meshgrid(x,t)

# # Flatten the coordinate grid into pairs of x,t coordinates
# X_star = np.hstack((X.flatten()[:,None], T.flatten()[:,None])) # coordinates x,t
# u_star = Exact.flatten()[:,None]   # corresponding solution value with each coordinate


# print("X has shape ", X.shape, ", X_star has shape ", X_star.shape, ", u_star has shape ", u_star.shape)

# # Domain bounds (-1,1)
# lb = X_star.min(axis=0)
# ub = X_star.max(axis=0)

# print("Lower bounds of x,t: ", lb)
# print("Upper bounds of x,t: ", ub)
# print('')
# print('The first few entries of X_star are:')
# print( X_star[0:5, :] )

# print('')
# print('The first few entries of u_star are:')
# print( u_star[0:5, :] )