In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
import json
from scipy.stats import qmc

import GPy
import torch

from ModelSetting_new import Model
from UtilityFunctions import U_SMOCU
from OptimizeMethod import Multi_start_SGD

from safir_runner import SafirRunner
from al_plotting import *
from al_helpers import DesignVar, normalise_point, denormalise_point, df_to_training_data, save_model_state, load_model_state, relative_sequential_difference

In [None]:
design_vars = [
    DesignVar("t_h", 1.0, 60.0, "min"),   # heating duration
    DesignVar("B",   0.250, 0.500, "m"),  # column width
    DesignVar("F", 300000, 1800000, "N"),  # Applied force
    DesignVar("l", 3.00, 5.00, "m"),       # column length
    DesignVar("T_max", 600, 1200, "째C"),   # Maximum temperature 
    DesignVar("r_cool", 6, 12, "째C/min"),  # cooling rate
    # DesignVar("rho", 360, 490,"kg/m^3"),
    # DesignVar("E", 9.44e9, 17.37e9, "N/m^2"),
    # DesignVar("f_c", 35.7e6, 45.8e6,"N/m^2"),
    # DesignVar("e0", 0.013275, 0.026865, "m"),
]
d = len(design_vars)

In [None]:
base_path = Path(r"C:\Users\justu\OneDrive - Delft University of Technology\Documents\Master Thesis\SAFIR\Files\Active_Learning\6D_Good")

base_path.mkdir(parents=True, exist_ok=True)

data_path   = base_path / "Data_AL.csv"
kernel_path = base_path / "Kernel_AL.json"


column_names = [
    "Analysis", "e0", "rho", "E", "mu", "f_c", "f_t", "w", "h_ch", "h_cc", "eps",
    "B", "l", "F", "t_h", "T_max", "r_cool", "H", "t_end", "t_end_guess",
    "fine_size", "n_elements", "failure", "failure_time",
    "time_thermo", "time_mech", "time_tot", "stiffness",
]

runner = SafirRunner(base_dir=base_path, column_names=column_names)

In [None]:
# Bernoulli likelihood for classification
lik = GPy.likelihoods.Bernoulli()

# ARD RBF kernel with one lengthscale per input dimension
kernel = GPy.kern.RBF(input_dim=d, variance=3.0, lengthscale=0.4, ARD=True)

x_bounds = [0.0, 1.0]          # Surrogate model design space interval

xl = np.full(d, x_bounds[0])
xu = np.full(d, x_bounds[1])
xinterval = (xl, xu)

# Uniform prior over the interval
volume = np.prod(xu - xl)
px = 1.0 / volume
px_log = -np.log(volume)

c_num = 2  # binary classification
parameters = [d, c_num, kernel, lik]

vinterval = None
linterval = None
prior_mean = -2

In [None]:
use_existing_data = False
if data_path.exists():
    df = pd.read_csv(data_path)
    if len(df) > 0:
        use_existing_data = True
else:
    df = pd.DataFrame(columns=column_names)

if use_existing_data:
    print(f"Warm start: using existing dataset with {len(df)} rows from {data_path}")
    X_init, Y_init = df_to_training_data(df, design_vars, y_col="failure")

    if kernel_path.exists():
        print(f"Found saved model state at {kernel_path}. Reloading.")
        kernel, saved_prior_mean = load_model_state(
            kernel_path=kernel_path,
            d=d,
            expected_design_vars=[dv.name for dv in design_vars],
            expected_xinterval=xinterval,
        )
        if saved_prior_mean is not None:
            prior_mean = float(saved_prior_mean)

    parameters = [d, c_num, kernel, lik]

else:
    print("Cold start: no existing data found. Running initial SAFIR simulation.")
    # Initial simulation point
    phys0 = {
        "t_h": 50.0,   # [min]
        "B":   0.260,  # [m]
        "F":   500000, # [N]
        "l":    3.2,   # [m]
        "T_max": 1150, # [째C]
        "r_cool": 11.5, # [째C/min]
    }
    y0, df = runner.run(phys0, df)
    print("Initial failure label:", y0)

    X_init = normalise_point(phys0, design_vars)[None, :]
    Y_init = np.array([[y0]], float)

# Loading in the model. 
model = Model(
    X=X_init,
    Y=Y_init,
    parameters=parameters,
    xinterval=xinterval,
    optimize=True,                                # Set optimisation of both hyperparameters to True
    kern_variance_fix=False,
    kern_lengthscale_fix=False,
    mean_fix=True,
    vinterval=vinterval,
    linterval=linterval,
    prior_mean=prior_mean,
    px_log=px_log,
)

print("Initial kernel:", model.gpc.kern)
print("Initial lengthscales:", model.gpc.kern.lengthscale)
print("Initial mean function:", model.gpc.mean_function)
print("Initial training points:", model.gpc.X.shape[0])

In [None]:
# gpc_loaded = GPy.core.GP.load_model(str(base_path / "Final_GPC.zip"))

In [None]:
# gpc_loaded.kern.lengthscale

In [None]:
import numpy as np

def phi(tstar: np.ndarray):
    return 1.0 - 0.324*np.exp(-0.2*tstar) - 0.204*np.exp(-1.7*tstar) - 0.472*np.exp(-19.0*tstar)

def Tmax_from_th(t_h_min: np.ndarray, T0: float = 20.0):
    """
    t_h_min: heating duration in minutes
    Returns T_max in Celsius (array), rounded to 2 decimals.
    """
    tstar = np.asarray(t_h_min, dtype=float) / 60.0
    return np.round(T0 + 1325.0 * phi(tstar), 2)

In [None]:
savepath = Path(r"C:\Users\justu\OneDrive - Delft University of Technology\Documents\Master Thesis\Images\Results\GPC\6D_Design\6D\T_max_Projection\Band_Fractiles")
fixed_phys = {
    "t_h": 15,
    "B": 0.28,
    "F": 322000,
    "l": 3.65,
    "T_max": 1000,
    "r_cool": 10.40,
    # "rho": 420,
    # "E": 12.8e9, 
    # "f_c": 40.4e6, 
    # "e0": 0.02,
}

dependent = {
    "T_max": lambda Xphys, idx: Tmax_from_th(Xphys[:, idx["t_h"]], T0=20.0)
}

plot_gpc_slice_2d(
    model=model,
    design_vars=design_vars,
    dims=("t_h", "B"),
    fixed_values=fixed_phys,
    dependent_vars = dependent,
    plot_points=False,
    points_mode="all",
    near_tol_norm=0.03,
    # title="2D Slice",
    title=None,
    x_bounds = (6, 60),
    # save_path= savepath / f"2D_th_B_NO_TITLE_BOUNDED_75fractile.pdf",
    show=True,
)

In [None]:
savepath = Path(r"  ")
fixed_phys = {
    "t_h": 15,
    "B": 0.28,
    "F": 322000,
    "l": 3.65,
    "T_max": 1000,       # Is overwritten if made dependent on t_h
    "r_cool": 10.4,
    # "rho": 420,
    # "E": 12.8e9, 
    # "f_c": 40.4e6, 
    # "e0": 0.02,
}
dependent = {
    "T_max": lambda Xphys, idx: Tmax_from_th(Xphys[:, idx["t_h"]], T0=20.0)
}

for i in fixed_phys:
    for j in fixed_phys:
        x_bounds = None
        y_bounds = None
        if i == j:
            continue
        if i == "t_h":
            x_bounds = (6, 60)
        if j == "t_h":
            y_bounds = (6, 60)
        plot_gpc_slice_2d(
            model=model,
            design_vars=design_vars,
            dims=(str(i), str(j)),
            fixed_values=fixed_phys,
            dependent_vars = dependent,
            plot_points=False,
            points_mode="all",
            near_tol_norm=0.03,
            title=None,
            x_bounds = x_bounds,
            y_bounds = y_bounds, 
            # save_path= savepath / f"MEANSLICE_{i}_{j}.pdf",
            show=False,
        )
print("Done")

In [None]:
savepath = Path(r"  ")
fixed_phys = {
    "t_h": 15,
    # "B": 0.28,
    # "F": 322000,
    # "l": 3.65,
    # "T_max": 1000,
    # "r_cool": 10.4,
    "rho": 420,
    "E": 12.8e9, 
    # "f_c": 40.4e6, 
    # "e0": 0.02,
}

# dependent = {
#     "T_max": lambda Xphys, idx: Tmax_from_th(Xphys[:, idx["t_h"]], T0=20.0)
# }


plot_gpc_slice_3d(
    model=model,
    design_vars=design_vars,
    dims=("t_h", "rho", "E"),
    fixed_values=fixed_phys,
    dependent_vars=None,
    grid_res=120,
    eps=0.05,
    plot_points=False,
    # points_mode="near",
    # near_tol_norm=0.03,
    # x1_bounds=(6,60),
    # title="3D Slice",
    title=None,
    elev=25,
    azim=-120,     #245,
    save_path=savepath / f"3D_SLICE_th_rho_E_NO_TITLE.pdf",
)

In [None]:
# Create the material distributions and sample strategies. 

def sample_lognormal_from_ln_params(lam, zeta, size, rng):
    return rng.lognormal(mean=lam, sigma=zeta, size=size)


def k_E_h_zeta(h):
    """
    Correction factor of Paper dependent on cross-section geometry.
    """
    h = float(h)
    return (0.60 / h) ** 0.78


def eccentricity_params(L):
    """
    Length dependent parameters from Blass paper. 
    """
    mu_L = 1.4e-5 * float(L)
    sigma_L = 4.53e-4 * float(L)
    return mu_L, sigma_L


def sample_eccentricity(L, size, rng, e0_global_mean=0.02):
    """
    Take a global meand and sample from normal distribution with translation to this normal mean and a given std. 
    """
    mu_L, sigma_L = eccentricity_params(L)
    signs = rng.choice(np.array([-1.0, 1.0]), size=size, replace=True)
    mean_shifted = e0_global_mean + signs * mu_L
    return rng.normal(loc=mean_shifted, scale=sigma_L, size=size)


def sample_material_variables(n, B, L, rng=None, e0_global_mean=0.02):
    """
    Draw samples for: rho, E, f_c, e0.

    Units:
    - rho: kg/m^3
    - E:   kN/mm^2
    - f_c: N/mm^2
    - e0:  m

    """
    rng = np.random.default_rng() if rng is None else rng
    n = int(n)

    rho = sample_lognormal_from_ln_params(lam=6.04, zeta=0.0521, size=n, rng=rng)

    lam_E = 2.55
    zeta_E = 0.0513
    kE = k_E_h_zeta(B)                 
    zeta_E_eff = kE * zeta_E          
    E = sample_lognormal_from_ln_params(lam=lam_E, zeta=zeta_E_eff, size=n, rng=rng)
    E = E * 1e9

    f_c = sample_lognormal_from_ln_params(lam=3.70, zeta=0.0416, size=n, rng=rng)
    f_c = f_c * 1e6
    e0 = sample_eccentricity(L=L, size=n, rng=rng, e0_global_mean=e0_global_mean)

    return {
        "rho": rho,             
        "E": E,                   
        "f_c": f_c,             
        "e0": e0,                
        "kE": kE,                 
        "zeta_E_eff": zeta_E_eff
    }

In [None]:
sample_material_variables(10, 0.28, 3.65)

In [None]:
def normalise_matrix(X_phys: np.ndarray, design_vars):
    """
    X_phys: (N, d) in same order as design_vars.

    Returns:
    --------
    Normalised inputs
    """
    X_phys = np.asarray(X_phys, float)
    lowers = np.array([dv.lower for dv in design_vars], dtype=float)
    uppers = np.array([dv.upper for dv in design_vars], dtype=float)
    X_norm = (X_phys - lowers) / (uppers - lowers)
    return np.clip(X_norm, 0.0, 1.0)

In [None]:
def failure_fraction_binary(
    model,
    design_vars,
    t_h: float,
    B: float,
    L: float,
    n_mc: int = 10_000,
    seed: int | None = None,
    threshold: float = 0.5,
    batch_size: int = 50,
):
    """
    Binary Monte Carlo estimate of failure probability at (t_h, B):

    """
    rng = np.random.default_rng(seed)
    n_mc = int(n_mc)

    mats = sample_material_variables(n=n_mc, B=B, L=L, rng=rng)

    X_phys = np.column_stack([
        np.full(n_mc, float(t_h)),
        np.full(n_mc, float(B)),
        mats["rho"],
        mats["E"],
        mats["f_c"],
        mats["e0"],
    ])

    X_norm = normalise_matrix(X_phys, design_vars)

    fail_count = 0
    for i0 in range(0, n_mc, batch_size):
        i1 = min(i0 + batch_size, n_mc)
        pmat = model.predict_proba(X_norm[i0:i1])
        p_fail = pmat[:, 1]
        fail_count += int(np.sum(p_fail >= threshold))

    return fail_count / n_mc


In [None]:
pf = failure_fraction_binary(
    model=model,
    design_vars=design_vars,
    t_h=20.0,
    B=0.28,
    L=3.65,
    n_mc=10_000,
    seed=123,
    threshold=0.5,
)
print("Binary failure fraction:", pf)

In [None]:
def compute_binary_fraction_grid(
    model,
    design_vars,
    L: float,
    n_mc: int = 10_000,
    n_th: int = 50,
    n_B: int = 50,
    seed: int = 123,
    threshold: float = 0.5,
):
    th_vals = np.linspace(1.0, 60.0, n_th)
    B_vals  = np.linspace(0.250, 0.500, n_B)
    PF = np.zeros((n_B, n_th), dtype=float)  # rows=B, cols=t_h

    base_rng = np.random.default_rng(seed)
    seeds = base_rng.integers(0, 2**32 - 1, size=(n_B, n_th), dtype=np.uint32)

    for iB, B in enumerate(B_vals):
        for ith, t_h in enumerate(th_vals):
            PF[iB, ith] = failure_fraction_binary(
                model=model,
                design_vars=design_vars,
                t_h=float(t_h),
                B=float(B),
                L=L,
                n_mc=n_mc,
                seed=int(seeds[iB, ith]),
                threshold=threshold,
            )

    return th_vals, B_vals, PF

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap
from matplotlib.lines import Line2D
from pathlib import Path


def plot_failure_fraction_contour(
    th_vals,
    B_vals,
    PF,
    *,
    levels=21,
    color_nonfailure="blue",
    color_mid="white",
    color_failure="red",
    plot_half_line=True,
    half_line_style="--",
    half_line_color="k",
    plot_low_lines=True,
    low_levels=(0.05,),
    low_line_style=":",
    low_line_color="k",
    xlabel=r"Heating duration $t_h$ [min]",
    ylabel=r"Column width $B$ [m]",
    title="Binary failure fraction",
    cbar_label="Binary failure fraction",
    save_path: str | Path | None = None,
    show: bool = True,
    return_fig: bool = False,
):
    """
    Contour plot of failure fraction.

    """

    TH, BB = np.meshgrid(th_vals, B_vals)

    cmap = LinearSegmentedColormap.from_list(
        "failure_fraction_cmap",
        [(0.0, color_nonfailure), (0.5, color_mid), (1.0, color_failure)],
    )

    fig, ax = plt.subplots(figsize=(7.0, 6.2))

    contour_levels = np.linspace(0.0, 1.0, int(levels))
    cs = ax.contourf(
        TH, BB, PF,
        levels=contour_levels,
        cmap=cmap,
        vmin=0.0, vmax=1.0,
        alpha=0.92
    )

    cbar = fig.colorbar(cs, ax=ax)
    cbar.set_label(cbar_label)

    legend_handles = []

    if plot_half_line:
        ax.contour(
            TH, BB, PF,
            levels=[0.5],
            colors=half_line_color,
            linestyles=half_line_style,
            linewidths=1.6,
        )
        legend_handles.append(
            Line2D([0], [0], color=half_line_color, linestyle=half_line_style, linewidth=1.6,
                   label="Failure boundary (0.5)")
        )

    if plot_low_lines and low_levels:
        ax.contour(
            TH, BB, PF,
            levels=list(low_levels),
            colors=low_line_color,
            linestyles=low_line_style,
            linewidths=1.2,
        )
        for lev in low_levels:
            legend_handles.append(
                Line2D([0], [0], color=low_line_color, linestyle=low_line_style, linewidth=1.2,
                       label=f"{lev:.2f} fraction")
            )

    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    if title is not None:
        ax.set_title(title)
    ax.set_xlim(float(np.min(th_vals)), float(np.max(th_vals)))
    ax.set_ylim(float(np.min(B_vals)), float(np.max(B_vals)))
    ax.grid(True)

    if legend_handles:
        ax.legend(
            handles=legend_handles,
            loc="upper center",
            bbox_to_anchor=(0.5, -0.13),
            ncol=min(3, len(legend_handles)),
            frameon=True,
        )

    fig.tight_layout()

    if save_path is not None:
        save_path = Path(save_path)
        save_path.parent.mkdir(parents=True, exist_ok=True)
        fig.savefig(save_path, dpi=300, bbox_inches="tight")

    if show:
        plt.show()
    else:
        plt.close(fig)

    if return_fig:
        return fig, ax

    return None

In [None]:
th_vals1, B_vals1, PF1 = compute_binary_fraction_grid(
    model=model,
    design_vars=design_vars,
    L=3.65,
    n_mc=200, 
    n_th=150,
    n_B=150,
    seed=123,
)

In [None]:
savepath = Path(r"C:\Users\justu\OneDrive - Delft University of Technology\Documents\Master Thesis\Images\Results\GPC\6D_Material\6D")
plot_failure_fraction_contour(
    th_vals1,
    B_vals1,
    PF1,
    levels=21,
    color_nonfailure="#2166ac",             # Changed blue color
    color_mid="white",
    color_failure="#b2182b",               # Changed red color
    plot_half_line=True,
    plot_low_lines=True,
    low_levels=(0.05,0.95),
    xlabel=r"Heating duration $t_h$ [min]",
    ylabel=r"Column width $B$ [m]",
    # title="Failure probability under material uncertainty",
    title = None,
    # save_path= savepath / "Material_Uncertainty_th_B.pdf",
)