<a href="https://colab.research.google.com/github/liz-lewis-manchester/CNM_2025_group_09/blob/solver/C%26NM_CW_Group_9.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Computing & Numerical Methods Coursework Group 9

By: Ching Yau Chan, Hassan Alhamdani, Jiongjie Chen, Lucas So and Oyinmiebi Youdeowei

## Main Code (Solver)




In [None]:
"""
Advection Solver Module
=======================
Solves the 1D advection equation: ∂θ/∂t = -U * ∂θ/∂x
Using implicit finite difference scheme.

See README.md for usage guide and examples.
"""

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from IPython.display import HTML


def advection_solver(end_time, dt, length, dx, U, C0=0,
                     initial_conditions=None,
                     boundary_condition=None):
    """
    Solve 1D advection equation using implicit finite difference method.

    Parameters:
        end_time : float - simulation duration
        dt : float - time step
        length : float - domain length
        dx : float - spatial resolution
        U : float or array - velocity (constant or spatially varying)
        C0 : float - default inlet concentration
        initial_conditions : array, optional - custom initial profile
        boundary_condition : float or callable, optional - inlet condition

    Returns:
        dict with 'distance', 'time', 'concentrations', 'parameters'
    """
    # Create spatial grid
    num_points = int(length / dx) + 1
    distance = np.linspace(0, length, num_points)

    # Set initial conditions (default: zeros with C0 at inlet)
    if initial_conditions is None:
        initial_conditions = np.zeros(num_points)
        initial_conditions[0] = C0

    # Set boundary condition function
    if boundary_condition is None:
        bc_func = lambda t: C0
    elif callable(boundary_condition):
        bc_func = boundary_condition
    else:
        bc_func = lambda t: boundary_condition

    # Finite difference coefficients from implicit discretisation:
    # (θ_new - θ_old)/dt = -U * (θ_new[i] - θ_new[i-1])/dx
    # Rearranged: θ_new[i] = (θ_old[i]/dt + B*θ_new[i-1]) / A
    A = 1/dt + U/dx
    B = U/dx

    # Convert to arrays if scalar (for variable velocity support)
    if np.isscalar(A):
        A = np.full(num_points, A)
        B = np.full(num_points, B)

    # Time stepping setup
    num_frames = int(end_time / dt) + 1
    time_array = np.linspace(0, end_time, num_frames)

    # Storage for results
    all_concentrations = [initial_conditions.copy()]
    current = initial_conditions.copy()

    # Main time-stepping loop
    for k in range(1, num_frames):
        new = np.zeros(num_points)
        new[0] = bc_func(k * dt)  # Apply boundary condition

        # Sweep left to right (implicit scheme)
        for i in range(1, num_points):
            new[i] = (current[i]/dt + B[i]*new[i-1]) / A[i]

        current = new.copy()
        all_concentrations.append(new.copy())

    return {
        'distance': distance,
        'time': time_array,
        'concentrations': all_concentrations,
        'parameters': {'end_time': end_time, 'dt': dt, 'dx': dx,
                      'length': length, 'num_points': num_points}
    }


def create_animation(results, max_conc=None):
    """
    Create animated plot of concentration over time.

    Parameters:
        results : dict - output from advection_solver()
        max_conc : float, optional - y-axis maximum (auto-calculated if None)

    Returns:
        HTML animation object for display in Jupyter/Colab
    """
    # Extract data from results dictionary
    dist = results['distance']              # x-axis values (spatial grid)
    concs = results['concentrations']       # List of concentration arrays at each time step
    dt = results['parameters']['dt']        # Time step (for title display)
    length = results['parameters']['length'] # Domain length (for x-axis limit)

    # Auto-calculate y-axis maximum if not specified
    # Finds the maximum concentration across all time steps
    if max_conc is None:
        max_conc = max(np.max(c) for c in concs)

    # Create figure and axes for the animation
    fig, ax = plt.subplots(figsize=(10, 6))  # 10x6 inch figure

    # Animation function - called once for each frame
    # Parameter 'f' is the frame number (0, 1, 2, ...)
    def animate(f):
        ax.cla()  # Clear axes to remove previous frame

        # Plot concentration profile for this time step
        ax.plot(dist, concs[f], 'b-', linewidth=2)  # 'b-' = blue solid line

        # Set axis limits (keep consistent across all frames)
        ax.set_xlim(0, length)
        ax.set_ylim(0, max_conc * 1.1)  # 10% padding above max

        # Labels and title
        ax.set_xlabel("Distance (m)")
        ax.set_ylabel("Concentration (μg/m³)")
        ax.set_title(f"Time = {f * dt:.1f} s")  # Show current time
        ax.grid(True)

    # Create the animation object
    # - fig: figure to animate
    # - animate: function to call for each frame
    # - frames: total number of frames (one per time step)
    # - interval: milliseconds between frames (100ms = 10 fps)
    anim = animation.FuncAnimation(fig, animate, frames=len(concs), interval=100)

    plt.close()  # Prevent static figure from displaying

    # Convert to HTML for display in Jupyter/Colab notebooks
    return HTML(anim.to_jshtml())


def plot_snapshots(results, num_snapshots=5, title=""):
    """
    Create static plot with concentration profiles at multiple times.

    Parameters:
        results : dict - output from advection_solver()
        num_snapshots : int - number of time points to show (default 5)
        title : str - plot title

    Returns:
        None (displays plot directly)
    """
    # Extract data from results dictionary
    dist = results['distance']        # x-axis values (spatial grid)
    concs = results['concentrations'] # List of concentration arrays at each time step
    dt = results['parameters']['dt']  # Time step (for legend labels)

    # Create figure and axes
    fig, ax = plt.subplots(figsize=(10, 6))

    # Select which time steps to plot (evenly spaced)
    # Example: if 31 frames and 5 snapshots → indices [0, 7, 15, 22, 30]
    indices = np.linspace(0, len(concs)-1, num_snapshots, dtype=int)

    # Create colour gradient for different time points
    # viridis goes from purple (early times) to yellow (late times)
    colors = plt.cm.viridis(np.linspace(0, 1, num_snapshots))

    # Plot each snapshot
    for idx, frame in enumerate(indices):
        ax.plot(dist, concs[frame],           # x and y data
                color=colors[idx],             # Colour from gradient
                linewidth=2,                   # Line thickness
                label=f't = {frame*dt:.0f} s') # Legend label showing time

    # Labels, title, and formatting
    ax.set_xlabel("Distance (m)")
    ax.set_ylabel("Concentration (μg/m³)")
    ax.set_title(title)
    ax.legend()   # Show legend with time labels
    ax.grid(True) # Add grid lines
    plt.show()    # Display the plot


# Confirmation message when module is loaded
print("✓ Solver loaded successfully!")

## Test Case 1
Description:

Test the case where the 1D model domain extends to 20m downstream (with a 20cm spatial resolution) of the point that the pollutant enters the river and model how the pollutant moves over the 5 minutes after it enters the river (with a temporal resolution of 10s). Assume that the initial concentration of the pollutant is 250 µg/m³ at x=0 and 0 elsewhere. Assume that U = 0.1ms-1

In [None]:
# Run solver and store output in 'results1' for plotting
results1 = advection_solver(
    end_time=300,    # Simulation duration (s)
    dt=10,           # Time step (s)
    length=20,       # Domain length (m)
    dx=0.2,          # Spatial resolution (m)
    U=0.1,           # Flow velocity (m/s)
    C0=250           # Inlet concentration (µg/m³)
)

plot_snapshots(results1, title="Test Case 1: Basic Simulation")
create_animation(results1, max_conc=250)

## Test Case 2
Description:

Test the case where, for the same model domain, the initial conditions are read in from a csv file ‘initial_conditions.csv’. Note that the provided measurements are not aligned with the model grid. Your code should be written so that it can read in any initial conditions provided and to interpolate them onto the model grid. (Interpolation between values is not something that we have covered in class, this task is a test of your ability to discover new python functionality. Explore the Pandas documentation)

In [None]:
# Load measurement data and interpolate to model grid

import pandas as pd

# Read CSV file
df = pd.read_csv('initial_conditions.csv', encoding='cp1252')
data_dist = df.iloc[:, 0].values  # Distance column
data_conc = df.iloc[:, 1].values  # Concentration column

# Create model grid and interpolate
length, dx = 20, 0.2
num_points = int(length / dx) + 1
model_distance = np.linspace(0, length, num_points)
initial_cond = np.interp(model_distance, data_dist, data_conc)

# Run solver and store output in 'results2' for plotting
results2 = advection_solver(
    end_time=300, dt=10, length=20, dx=0.2, U=0.1,
    initial_conditions=initial_cond,
    boundary_condition=initial_cond[0]
)

plot_snapshots(results2, title="Test Case 2: CSV Initial Conditions")
create_animation(results2)


## Test Case 3
Description:

Test to see how sensitive your model results are to its parameters (U, spatial and temporal resolution)

In [None]:
# Vary parameters to understand model behaviour

# --- Static Plots (one for each parameter) ---

# Sensitivity to Velocity (U)
fig, ax = plt.subplots(figsize=(10, 6))
for U in [0.001, 0.01, 0.1, 1, 10]:
    results = advection_solver(end_time=300, dt=10, length=20, dx=0.2, U=U, C0=250)
    ax.plot(results['distance'], results['concentrations'][-1],
            label=f'U = {U} m/s', linewidth=2)
ax.set_xlabel("Distance (m)")
ax.set_ylabel("Concentration (μg/m³)")
ax.set_title("Sensitivity to Velocity U (Final State)")
ax.legend()
ax.grid(True)
plt.show()

# Sensitivity to Spatial Resolution (dx)
fig, ax = plt.subplots(figsize=(10, 6))
for dx in [0.1, 0.2, 0.5, 1.0, 2.0]:
    results = advection_solver(end_time=300, dt=10, length=20, dx=dx, U=0.1, C0=250)
    ax.plot(results['distance'], results['concentrations'][-1],
            label=f'Δx = {dx} m', linewidth=2)
ax.set_xlabel("Distance (m)")
ax.set_ylabel("Concentration (μg/m³)")
ax.set_title("Sensitivity to Spatial Resolution Δx (Final State)")
ax.legend()
ax.grid(True)
plt.show()

# Sensitivity to Time Step (dt)
fig, ax = plt.subplots(figsize=(10, 6))
for dt in [1, 5, 10, 30, 60]:
    results = advection_solver(end_time=300, dt=dt, length=20, dx=0.2, U=0.1, C0=250)
    ax.plot(results['distance'], results['concentrations'][-1],
            label=f'Δt = {dt} s', linewidth=2)
ax.set_xlabel("Distance (m)")
ax.set_ylabel("Concentration (μg/m³)")
ax.set_title("Sensitivity to Time Step Δt (Final State)")
ax.legend()
ax.grid(True)
plt.show()


# --- Animated Subplots (all three in one animation) ---

# Parameters to animate (change these to explore)
U_animate = 0.01   # Change this value
dx_animate = 0.01  # Change this value
dt_animate = 100   # Change this value

# Run simulations
results_U = advection_solver(end_time=300, dt=10, length=20, dx=0.2, U=U_animate, C0=250)
results_dx = advection_solver(end_time=300, dt=10, length=20, dx=dx_animate, U=0.1, C0=250)
results_dt = advection_solver(end_time=300, dt=dt_animate, length=20, dx=0.2, U=0.1, C0=250)

# Create figure with 3 subplots
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

def animate(f):
    # Clear all subplots
    for ax in axes:
        ax.cla()

    # Subplot 1: Velocity
    axes[0].plot(results_U['distance'], results_U['concentrations'][f], 'b-', linewidth=2)
    axes[0].set_xlim(0, 20)
    axes[0].set_ylim(0, 275)
    axes[0].set_xlabel("Distance (m)")
    axes[0].set_ylabel("Concentration (μg/m³)")
    axes[0].set_title(f"U = {U_animate} m/s")
    axes[0].grid(True)

    # Subplot 2: Spatial resolution
    axes[1].plot(results_dx['distance'], results_dx['concentrations'][f], 'g-', linewidth=2)
    axes[1].set_xlim(0, 20)
    axes[1].set_ylim(0, 275)
    axes[1].set_xlabel("Distance (m)")
    axes[1].set_title(f"Δx = {dx_animate} m")
    axes[1].grid(True)

    # Subplot 3: Time step
    # Use matching frame for dt animation
    dt_frame = min(f, len(results_dt['concentrations']) - 1)
    axes[2].plot(results_dt['distance'], results_dt['concentrations'][dt_frame], 'r-', linewidth=2)
    axes[2].set_xlim(0, 20)
    axes[2].set_ylim(0, 275)
    axes[2].set_xlabel("Distance (m)")
    axes[2].set_title(f"Δt = {dt_animate} s")
    axes[2].grid(True)

    # Overall title with current time
    fig.suptitle(f"Sensitivity Analysis - Time = {f * 10:.0f} s", fontsize=14)

# Create animation
num_frames = len(results_U['concentrations'])
anim = animation.FuncAnimation(fig, animate, frames=num_frames, interval=100)
plt.tight_layout()
plt.close()
HTML(anim.to_jshtml())




## Test Case 4
Description:

Test to see how an exponentially decaying initial concentration of the pollutant in time alters your results.

In [None]:
# Model a source that decreases over time: C(t) = C0 * exp(-λt)

C0 = 250           # Initial concentration (µg/m³)
decay_rate = 0.01  # Decay rate λ (1/s)

# Run solver and store output in 'results4' for plotting
results4 = advection_solver(
    end_time=300, dt=10, length=20, dx=0.2, U=0.1, C0=C0,
    boundary_condition=lambda t: C0 * np.exp(-decay_rate * t)
)

plot_snapshots(results4, title=f"Test Case 4: Exponential Decay (λ={decay_rate})")
create_animation(results4, max_conc=C0)

## Test Case 5

Description:

Test to see how a variable stream velocity profile alters your results (for example, add a 10% random perturbation to the constant velocity profile).

In [None]:
# Spatially varying velocity with random perturbations

length, dx = 20, 0.2
num_points = int(length / dx) + 1
base_U = 0.1           # Base velocity (m/s)
perturbation = 0.4     # Perturbation fraction (±40%)

# Generate random velocity profile
np.random.seed(42)  # For reproducibility
variable_U = base_U * (1 + perturbation * (2*np.random.random(num_points) - 1))

# Plot velocity profile
fig, ax = plt.subplots(figsize=(10, 4))
ax.plot(np.linspace(0, length, num_points), variable_U, 'b-', linewidth=2)
ax.axhline(base_U, color='r', linestyle='--', label='Base U')
ax.set_xlabel("Distance (m)")
ax.set_ylabel("Velocity (m/s)")
ax.set_title(f"Variable Velocity Profile (±{perturbation*100:.0f}%)")
ax.legend()
ax.grid(True)
plt.show()

# Run simulation with variable velocity
results5 = advection_solver(
    end_time=300, dt=10, length=20, dx=0.2,
    U=variable_U,
    C0=250
)

plot_snapshots(results5, title=f"Test Case 5: Variable Velocity (±{perturbation*100:.0f}%)")
create_animation(results5, max_conc=250)
