author: Piotr Karaś

# AGH Modelling of Physical Systems Lab 6 (2025 April 8th)
Sylwester Arabas (sylwester.arabas@agh.edu.pl) & Emma Ware (ecware@ucdavis.edu) 

**1. particle-resolved random-sampled state representation for an advection problem**

In [None]:
"""Lab06"""
from types import SimpleNamespace

import numpy as np
import scipy
from attr import dataclass
from matplotlib import pyplot
from open_atmos_jupyter_utils import show_plot

In [None]:
PARAMS_P = SimpleNamespace(
    n_part=1000,
    norm=1e10,
    dist=scipy.stats.norm(loc=250, scale=50),
)

PARAMS_X = SimpleNamespace(
    span=1000,
    n_cell=50,
)
PARAMS_X.step = PARAMS_X.span / PARAMS_X.n_cell

RNG = np.random.default_rng(seed=44)

In [None]:
def sample(*, params_p, params_x, rng):
    """ samples a particle population specified onto a grid and returns
    a collection indexed by sampling type, with values composed of `cell` and `mult`
    integer-valued arrays with cell IDs and multiplicities, respectively """
    u01 = rng.uniform(0, 1, size=params_p.n_part)
    return {
        k: {
            'cell': (v['x'] / params_x.step).astype(int),
            'mult': np.round(v['y'] * params_p.norm).astype(int),
        }
        for k, v in
        {
            'sampling: uniform random in x': {
                'x': u01 * params_x.span,
                'y': params_p.dist.pdf(u01 * params_x.span) * params_x.span / params_p.n_part,
            },
            'sampling: constant multiplicity': {
                'x': params_p.dist.ppf(u01),
                'y': np.full(shape=params_p.n_part, fill_value=1 / params_p.n_part),
            }
        }.items()
    }


PARTICLES = sample(params_p=PARAMS_P, params_x=PARAMS_X, rng=RNG)

In [None]:
@dataclass
class PlotRequest:
    """Dataclass representing a request for plot function."""
    particles: np.ndarray
    params_p: SimpleNamespace
    params_x: SimpleNamespace
    rng: np.random.Generator
    title: str = ""
    shift: int = 0


def plot(*, plot_request: PlotRequest) -> None:
    """ plots the particle state as both a histogram as well as population scatter plot
    (with random coordinates shuffled for the purpose of plotting) """
    particles = plot_request.particles
    params_p = plot_request.params_p
    params_x = plot_request.params_x

    _, axs = pyplot.subplot_mosaic(
        [['hist'], ['part']],
        figsize=(11, 6),
        sharex=True,
        tight_layout=True,
    )
    u01 = plot_request.rng.uniform(0, 1, params_p.n_part)
    scale = params_p.norm / params_p.n_part
    for k in particles:
        axs['hist'].hist(
            x=particles[k]['cell'],
            weights=particles[k]['mult'] / params_p.norm,
            bins=params_x.n_cell,
            range=(0, params_x.n_cell),
            label=f'{k}',
            alpha=.666
        )
        axs['part'].scatter(
            particles[k]['cell'] + u01,
            tuple(reversed(u01)),
            s=.25 + 2 * particles[k]['mult'] / scale
        )
    pdf_x = np.linspace(0, params_x.span, 256)
    pdf_y = params_p.dist.pdf(pdf_x - plot_request.shift)
    axs['hist'].plot(pdf_x / params_x.step, pdf_y * params_x.step, color='black', label='analytic')
    axs['hist'].set_xlim(0, params_x.n_cell)
    axs['hist'].legend()
    axs['hist'].set_ylabel('pdf(x) ⋅ Δx [1]')
    axs['hist'].set_title(plot_request.title or f'{params_p.n_part=}   {params_x.n_cell=}')
    axs['part'].set_xticks(np.arange(params_x.n_cell + 1))
    axs['part'].xaxis.set_tick_params(rotation=75)
    axs['part'].set_xlabel('x / Δx [1]')
    axs['part'].set_yticks([])
    axs['part'].set_ylim(0, 1)
    for axes in axs.values():
        axes.grid()
    show_plot()


plot(
    plot_request=PlotRequest(
        particles=PARTICLES,
        params_p=PARAMS_P,
        params_x=PARAMS_X,
        rng=RNG,
    ),
)

**2. Monte-Carlo representation of advection dynamics**

In [None]:
PARAMS_T = SimpleNamespace(
    span=300,
    n_step=30,
    wind=1.5
)
PARAMS_T.step = PARAMS_T.span / PARAMS_T.n_step
PARAMS_T.courant_number = PARAMS_T.wind / PARAMS_X.step * PARAMS_T.step

In [None]:
def advect(*, params_p, params_t, particles, rng):
    """ performs Monte-Carlo advection of the particles """
    probability_of_shift = abs(params_t.courant_number)
    assert probability_of_shift < 1

    sign = int(abs(params_t.courant_number) / params_t.courant_number)
    for _ in range(params_t.n_step):
        u01 = rng.uniform(0, 1, params_p.n_part)
        for part in particles.values():
            part['cell'] += (probability_of_shift > u01) * sign


advect(particles=PARTICLES, params_t=PARAMS_T, params_p=PARAMS_P, rng=RNG)

In [None]:
plot(
    plot_request=PlotRequest(
        particles=PARTICLES,
        params_x=PARAMS_X,
        params_p=PARAMS_P,
        rng=RNG,
        shift=PARAMS_T.wind * PARAMS_T.span,
        title=f'{PARAMS_T.courant_number=:.3g}  {PARAMS_T.span=}',
    ),
)

**3. Assignment**

In [None]:
PARAMS_P_2D = SimpleNamespace(
    n_part=8000,
    norm=1e10,
    dist=scipy.stats.multivariate_normal(
        mean=[500, 500],
        cov=[[2500, 0], [0, 2500]],
    ),
)

PARAMS_X_2D = SimpleNamespace(
    span=1000,
    n_cell_dim=32,
)
PARAMS_X_2D.step = PARAMS_X_2D.span / PARAMS_X_2D.n_cell_dim

PARAMS_T_2D = SimpleNamespace(
    span = 50,
    n_step = 100,
)

COURANT_NUMBER = 0.8

In [None]:
def plot_particles_2d(particle_dict, params_p, params_2d, rng):
    """
    Plot particle distribution in 2D with a histogram background and scatter overlay.
    """
    edges = np.linspace(0, params_2d.span, params_2d.n_cell_dim + 1)
    norm_factor = params_p.norm / params_p.n_part
    ticks = edges[::5] / params_2d.step

    for label, dataset in particle_dict.items():
        fig, ax = pyplot.subplots()
        ax.set_title(f"n_part={params_p.n_part} n_cell_dim={params_2d.n_cell_dim}")
        ax.set_xticks(ticks)
        ax.set_yticks(ticks)
        ax.grid(True)

        cells = dataset["cell"]
        multipliers = dataset["mult"]

        displacements = rng.uniform([0, 0], [1, 1], (params_p.n_part, 2))

        sizes = multipliers / norm_factor * 0.125 + 0.01

        ax.scatter(
            cells[:, 0] + displacements[:, 0],
            cells[:, 1] + displacements[:, 1],
            s=sizes,
            alpha=0.5,
            label=label,
        )

        ax.hist2d(
            cells[:, 0] + 0.5,
            cells[:, 1] + 0.5,
            bins=params_2d.n_cell_dim,
            range=[[0, params_2d.n_cell_dim], [0, params_2d.n_cell_dim]],
            weights=multipliers / params_p.norm,
            alpha=0.5
        )

        fig.tight_layout()
        pyplot.show()

In [None]:
def plot_analytic_solution(density_values, params_d) -> None:
    """Function for plotting the analytic solution."""
    pyplot.figure(figsize=(8, 6))
    pyplot.imshow(density_values, extent=(0, params_d.span, 0, params_d.span))
    pyplot.colorbar(label="density")
    show_plot()

In [None]:
def sample_2d(params_p, params_2d, rng):
    """
    Function that samples a particle population specified onto a grid and returns
    a sampled particle population.
    """
    u01 = rng.uniform([0.0, 0.0], [1.0, 1.0], size=(params_p.n_part, 2))
    return {
        k: {
            'cell': (v['cell'] / params_2d.step).astype(int),
            'mult': np.round(v['mult'] * params_p.norm).astype(int),
        }
        for k,v in
        {
            'sampling: uniform random in x-y': {
                'cell': u01 * params_2d.span,
                'mult': params_p.dist.pdf(u01 * params_2d.span) \
                    * params_2d.span ** 2 / params_p.n_part
            },
            'sampling: constant multiplicity': {
                'cell': params_p.dist.rvs(params_p.n_part),
                'mult': np.ones(params_p.n_part) / params_p.n_part
            }
        }.items()
    }

In [None]:
PARTICLES_2D = sample_2d(params_p=PARAMS_P_2D, params_2d=PARAMS_X_2D, rng=RNG)

In [None]:
plot_particles_2d(
    particle_dict=PARTICLES_2D,
    params_2d=PARAMS_X_2D,
    params_p=PARAMS_P_2D,
    rng=RNG,
)

In [None]:
def calculate_advection_2d_analytic(params_d, params_p, t):
    """
    Compute analytic advection in 2D over a grid of cell corners.

    Parameters:
    - params_d: object with attributes n_cell_dim (int) and step (float)
    - params_p: object with attributes dist (scipy.stats distribution) and norm (float)
    - t: time scalar for advection

    Returns:
    - 2D numpy array of density values scaled by cell area and normalization
    """
    coords = np.arange(params_d.n_cell_dim + 1) * params_d.step
    x_vals, y_vals = np.meshgrid(coords, coords, indexing='xy')

    shift = COURANT_NUMBER * t
    x_adv = x_vals + shift
    y_adv = y_vals + shift

    positions = np.stack((x_adv, y_adv), axis=-1)
    pdf_vals = params_p.dist.pdf(positions)

    area = params_d.step ** 2
    return pdf_vals * area * params_p.norm


In [None]:
def rmse(a, b) -> float:
    """Function for calculating the root mean square error."""
    return np.sqrt(np.mean((a - b) ** 2))

In [None]:
def calculate_advection_2d_monte_carlo(
    params_d,
    params_p,
    params_t,
    particles,
    rng,
) -> dict[str, list[float]]:
    """
    Perform Monte Carlo advection in 2D and compute RMSE against analytic solution.

    Parameters:
    - params_d: object with n_cell_dim (int) and step (float)
    - params_p: object with norm (float) and n_part (int)
    - params_t: object with n_step (int)
    - particles: dict of sampling strategies (each having "cell" and "mult")
    - rng: numpy-compatible random generator

    Returns:
    - Dictionary mapping strategy names to lists of RMSE values over time steps
    """
    def _move_cells(cells, rand_x, rand_y):
        x_coords, y_coords = cells[:, 0], cells[:, 1]
        escaped_x = (x_coords < 0) | (x_coords >= params_d.n_cell_dim)
        escaped_y = (y_coords < 0) | (y_coords >= params_d.n_cell_dim)
        move_x = (rand_x < abs(COURANT_NUMBER)).astype(int) * int(np.sign(COURANT_NUMBER))
        move_y = (rand_y < abs(COURANT_NUMBER)).astype(int) * int(np.sign(COURANT_NUMBER))
        cells[:, 0] += move_x * (~escaped_x).astype(int)
        cells[:, 1] += move_y * (~escaped_y).astype(int)

    def _compute_rmse(analytic_sol, all_parts, label):
        cell_positions = all_parts[label]["cell"]
        counts = np.histogram2d(
            cell_positions[:, 0] + 0.5,
            cell_positions[:, 1] + 0.5,
            bins=(params_d.n_cell_dim + 1,) * 2,
            range=((0, params_d.n_cell_dim),) * 2,
            weights=all_parts[label]["mult"],
        )[0]
        return rmse(analytic_sol / params_p.norm, counts / params_p.norm)

    rmse_history_: dict[str, list[float]] = {}
    for step_idx in range(params_t.n_step):
        time = step_idx * params_t.n_step
        analytic_solution = calculate_advection_2d_analytic(params_d, params_p, time)
        rand_u = rng.uniform(0, 1, params_p.n_part)
        rand_v = rng.uniform(0, 1, params_p.n_part)

        for strategy, sample_ in particles.items():
            _move_cells(sample_["cell"], rand_u, rand_v)
            rmse_history_.setdefault(strategy, []).append(
                _compute_rmse(analytic_solution, particles, strategy)
            )
        break

    return rmse_history_



rmse_history = calculate_advection_2d_monte_carlo(
    params_d=PARAMS_X_2D,
    particles=PARTICLES_2D,
    params_t=PARAMS_T_2D,
    params_p=PARAMS_P_2D,
    rng=RNG,
)

In [None]:
plot_analytic_solution(
    density_values=calculate_advection_2d_analytic(
        params_d=PARAMS_X_2D,
        params_p=PARAMS_P_2D,
        t=0,
    ),
    params_d=PARAMS_X_2D,
)

plot_analytic_solution(
    density_values=calculate_advection_2d_analytic(
        params_d=PARAMS_X_2D,
        params_p=PARAMS_P_2D,
        t=PARAMS_T_2D.span,
    ),
    params_d=PARAMS_X_2D,
)

In [None]:
plot_particles_2d(
    particle_dict=PARTICLES_2D,
    params_2d=PARAMS_X_2D,
    params_p=PARAMS_P_2D,
    rng=RNG,
)

In [None]:
!nbqa pylint stochasticadv.ipynb