<a href="https://colab.research.google.com/github/liz-lewis-manchester/CNM_2025_group_03/blob/test-code/task3_testingsens_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path

# Read + interpolate
def load_initial_conditions_csv(csv_path: str):
    csv_path = Path(csv_path)

    # Try a couple encodings (µ sometimes breaks utf-8)
    for enc in ("utf-8", "cp1252", "latin1"):
        try:
            df = pd.read_csv(csv_path, encoding=enc)
            break
        except UnicodeDecodeError:
            df = None
    if df is None:
        raise UnicodeDecodeError("Could not decode the CSV file. Try saving it as UTF-8.")

    if df.shape[1] < 2:
        raise ValueError("CSV must have at least two columns, distance and concentration.")

    # Take first two columns regardless of names
    x_col = df.columns[0]
    c_col = df.columns[1]

    df = df[[x_col, c_col]].copy()
    df.columns = ["x", "c"]

    # Force numeric and drop bad rows
    df["x"] = pd.to_numeric(df["x"], errors="coerce")
    df["c"] = pd.to_numeric(df["c"], errors="coerce")
    df = df.dropna().sort_values("x")

    # If duplicate x values exist, average them
    df = df.groupby("x", as_index=False)["c"].mean()

    x_meas = df["x"].to_numpy(dtype=float)
    c_meas = df["c"].to_numpy(dtype=float)

    return x_meas, c_meas


def interpolate_to_grid(x_grid, x_meas, c_meas):
    return np.interp(x_grid, x_meas, c_meas, left=0.0, right=0.0)

# Advection model
def run_advection_implicit(
    L=20.0, T=300.0, U=0.1, dx=0.2, dt=10.0,
    x_meas=None, c_meas=None,
    inflow_func=None,
    outflow_zero_gradient=True
):

    x = np.arange(0.0, L + dx, dx)
    Nt = int(T / dt) + 1

    if x_meas is None or c_meas is None:
        raise ValueError("Provide x_meas and c_meas from the CSV.")

    # Initial condition from CSV, interpolated
    C = interpolate_to_grid(x, x_meas, c_meas)

    # Coefficients
    a = (1.0 / dt) + (U / dx)
    b = (U / dx)

    # Store final only
    for n in range(1, Nt):
        t_now = n * dt
        C_old = C.copy()

        # Upstream boundary (x=0)
        if inflow_func is None:
            C[0] = 0.0  # clean inflow after t=0
        else:
            C[0] = float(inflow_func(t_now))

        # Forward substitution along x
        for i in range(1, len(x)):
            C[i] = ((1.0 / dt) * C_old[i] + b * C[i - 1]) / a

        # Downstream outflow BC
        if outflow_zero_gradient:
            C[-1] = C[-2]

    return x, C

# Metrics for sensitivity
def compute_metrics(x, C, dx, U, dt):
    Cmax = float(np.max(C))
    i_peak = int(np.argmax(C))
    x_peak = float(x[i_peak])

    # mass proxy
    mass = float(np.sum(C) * dx)

    # centre of mass + width
    if mass > 0:
        x_cm = float(np.sum(x * C) * dx / mass)
        var = float(np.sum(((x - x_cm) ** 2) * C) * dx / mass)
        width = float(np.sqrt(max(var, 0.0)))
    else:
        x_cm, width = 0.0, 0.0

    Cr = float(U * dt / dx)

    return {
        "U": U,
        "dx": dx,
        "dt": dt,
        "Courant(Udt/dx)": Cr,
        "C_max": Cmax,
        "x_peak": x_peak,
        "mass(sumC*dx)": mass,
        "x_centre_of_mass": x_cm,
        "plume_width_std": width
    }


# Sensitivity sweeps + plots
def sensitivity_sweep(csv_path="initial_conditions.csv", out_dir="task3_outputs"):
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # Load measured ICs from CSV
    x_meas, c_meas = load_initial_conditions_csv(csv_path)

    # Baseline
    L, T = 20.0, 300.0
    U0, dx0, dt0 = 0.1, 0.2, 10.0

    U_values = [0.05, 0.1, 0.2]
    dx_values = [0.4, 0.2, 0.1, 0.05]
    dt_values = [20.0, 10.0, 5.0, 2.5]

    results = []

    # Sensitivity to U
    plt.figure()
    for U in U_values:
        x, C_end = run_advection_implicit(L, T, U, dx0, dt0, x_meas, c_meas)
        plt.plot(x, C_end, label=f"U={U:g}")
        results.append(compute_metrics(x, C_end, dx0, U, dt0))
    plt.title("Sensitivity to velocity U (final time T=300s)")
    plt.xlabel("x (m)")
    plt.ylabel("C (µg/m³)")
    plt.legend()
    plt.tight_layout()
    plt.savefig(out_dir / "sensitivity_U.png", dpi=200)

    # Sensitivity to dx
    plt.figure()
    for dx in dx_values:
        x, C_end = run_advection_implicit(L, T, U0, dx, dt0, x_meas, c_meas)
        plt.plot(x, C_end, label=f"dx={dx:g}")
        results.append(compute_metrics(x, C_end, dx, U0, dt0))
    plt.title("Sensitivity to spatial resolution dx (final time T=300s)")
    plt.xlabel("x (m)")
    plt.ylabel("C (µg/m³)")
    plt.legend()
    plt.tight_layout()
    plt.savefig(out_dir / "sensitivity_dx.png", dpi=200)

    # Sensitivity to dt
    plt.figure()
    for dt in dt_values:
        x, C_end = run_advection_implicit(L, T, U0, dx0, dt, x_meas, c_meas)
        plt.plot(x, C_end, label=f"dt={dt:g}s")
        results.append(compute_metrics(x, C_end, dx0, U0, dt))
    plt.title("Sensitivity to temporal resolution dt (final time T=300s)")
    plt.xlabel("x (m)")
    plt.ylabel("C (µg/m³)")
    plt.legend()
    plt.tight_layout()
    plt.savefig(out_dir / "sensitivity_dt.png", dpi=200)

    # Save metrics table
    df = pd.DataFrame(results)
    df.to_csv(out_dir / "task3_sensitivity_metrics.csv", index=False)

    # print a quick summary
    print("\nSaved plots + metrics to:", out_dir.resolve())
    print(df.sort_values(["U", "dx", "dt"]).to_string(index=False))


if __name__ == "__main__":
    sensitivity_sweep("initial_conditions.csv", "task3_outputs")