# Parallelisation Benchmark (Strong Scaling)

A benchmarking workflow to measure **parallel scaling efficiency** of the OpenFOAM
CFD solver on the target HPC cluster.

The experiment fixes **one terrain**, **one wind direction**, and **one mesh** and
submits the same simulation with a range of core counts.  The resulting wall-clock
times are used to compute:

- **Speedup** = T(baseline) / T(N)
- **Efficiency** = Speedup / (N / N_baseline)

where `T(baseline)` is the wall time at the smallest core count tested, `N_baseline`.

The parallel decomposition is controlled by `system/decomposeParDict` inside each
case directory.  All variants share the **same mesh** — only the decomposition
and the SLURM resource allocation differ.

**Node arithmetic:** the cluster has **128 cores per node**, so:
```
nodes             = ceil(n_cores / 128)
ntasks_per_node   = min(n_cores, 128)
```

**Resume-safe:** Close and reopen at any time.  
All decisions are derived from `benchmark_metadata.json` files written into each
variant directory.

## 1. Configuration

Edit these settings before running the notebook.

In [None]:
import math
import os
import sys

# ── Paths ────────────────────────────────────────────────────────────────────
# Root of the CFD-dataset repository.  A notebook kernel does not define
# ``__file__``, so the root is taken to be the kernel's current working
# directory (expected to be the directory containing this notebook).  Every
# other path below is derived from it, so start Jupyter from the repository
# root or override REPO_ROOT here.
REPO_ROOT = os.getcwd()

# Path to the single terrain directory to use for this benchmark
# (e.g. a folder produced by generateInputs.py under Data/downloads/)
FIXED_TERRAIN_DIR = os.path.join(REPO_ROOT, "Data", "downloads", "terrain_0001_example")

# Single wind direction to test (degrees, 0 = North)
FIXED_ROTATION_DEG = 270

# terrain_config.yaml — used as-is (no mesh modifications for this benchmark)
TERRAIN_CONFIG_PATH = os.path.join(REPO_ROOT, "terrain_config.yaml")

# Root output directory: one sub-folder will be created per core-count variant
CASES_OUTPUT_DIR = os.path.join(REPO_ROOT, "parallelisation_benchmark")

# Path to the taskManager submodule
TASK_MANAGER_DIR = os.path.join(REPO_ROOT, "taskManager")

# Remote HPC path on Deucalion (used by taskManager for rsync/sbatch)
DEUCALION_PATH = "/projects/EEHPC-BEN-2026B02-011/cfd_data"

# Number of parallel workers for local meshing
N_PARALLEL_WORKERS = 4

# ── Core-count configuration ──────────────────────────────────────────────────
# Number of physical cores per node on the cluster — do not change
CORES_PER_NODE = 128

# List of total core counts to benchmark (strong scaling series)
CORES_TO_TEST = [16, 32, 64, 128, 256, 512]

# SLURM settings common to all jobs
SLURM_PARTITION = "hpc"
SLURM_TIME      = "04:00:00"   # wall-clock time limit per job


def compute_slurm_resources(n_cores: int, cores_per_node: int = CORES_PER_NODE) -> tuple[int, int]:
    """
    Compute the SLURM --nodes and --ntasks-per-node values for a given
    total core count, assuming a fixed number of cores per node.

    The default value of *cores_per_node* is taken from the cell-level constant
    ``CORES_PER_NODE`` (defined above in this same cell), so the function must
    be defined after that constant is set.  Pass an explicit value to override.

    Note that ``nodes * ntasks_per_node`` equals *n_cores* only when *n_cores*
    fits on one node or is a multiple of *cores_per_node*; e.g. 192 cores on
    128-core nodes yields ``(2, 128)``, i.e. 256 task slots.

    Parameters
    ----------
    n_cores : total number of cores (MPI ranks) requested; must be >= 1.
    cores_per_node : cores available on one node; must be >= 1.

    Returns
    -------
    (nodes, ntasks_per_node)

    Raises
    ------
    ValueError
        If *n_cores* or *cores_per_node* is smaller than 1.
    """
    # Reject values that would otherwise produce a nonsensical allocation
    # (0 nodes, negative task counts) or a ZeroDivisionError.
    if n_cores < 1:
        raise ValueError(f"n_cores must be a positive integer, got {n_cores!r}")
    if cores_per_node < 1:
        raise ValueError(f"cores_per_node must be a positive integer, got {cores_per_node!r}")

    nodes            = math.ceil(n_cores / cores_per_node)
    ntasks_per_node  = min(n_cores, cores_per_node)
    return nodes, ntasks_per_node


# ── Submodule path setup ──────────────────────────────────────────────────────
# Put each submodule directory at the front of sys.path (once).  Entries given
# relative to the repository are resolved against REPO_ROOT; absolute entries
# are used unchanged.
for _submod in ("terrain_following_mesh_generator", "ABL_BC_generator", TASK_MANAGER_DIR):
    if os.path.isabs(_submod):
        _p = _submod
    else:
        _p = os.path.join(REPO_ROOT, _submod)
    if _p in sys.path:
        continue
    sys.path.insert(0, _p)

# ── Echo the effective configuration ──────────────────────────────────────────
print(
    f"REPO_ROOT           : {REPO_ROOT}",
    f"FIXED_TERRAIN_DIR   : {FIXED_TERRAIN_DIR}",
    f"FIXED_ROTATION_DEG  : {FIXED_ROTATION_DEG}",
    f"TERRAIN_CONFIG_PATH : {TERRAIN_CONFIG_PATH}",
    f"CASES_OUTPUT_DIR    : {CASES_OUTPUT_DIR}",
    f"CORES_PER_NODE      : {CORES_PER_NODE}",
    f"CORES_TO_TEST       : {CORES_TO_TEST}",
    sep="\n",
)
print()

# One line per benchmark variant showing the SLURM layout it will request.
print("SLURM resource allocation per variant:")
for n in CORES_TO_TEST:
    nodes, ntpn = compute_slurm_resources(n)
    print(f"  {n:>4} cores → {nodes} node(s), {ntpn} tasks/node")

## 2. Imports

In [None]:
import json
from pathlib import Path
from datetime import datetime

import yaml
import pandas as pd

# ── terrain_following_mesh_generator (submodule) ──────────────────────────────
# The import is optional: on failure the flag is cleared and ``tm`` is bound to
# None so that the name always exists (same convention as for taskManager below).
try:
    from terrain_following_mesh_generator import terrain_mesh as tm
    _MESH_OK = True
    print("✓ terrain_following_mesh_generator imported")
except ImportError as _e:
    _MESH_OK = False
    tm = None
    print(f"✗ terrain_following_mesh_generator not available: {_e}")
    print("  Run: git submodule update --init --recursive")

# ── taskManager (submodule) ───────────────────────────────────────────────────
# Later cells check ``_TM_OK`` before touching OpenFOAMCaseGenerator.
try:
    from taskManager import OpenFOAMCaseGenerator
    _TM_OK = True
    print("✓ taskManager imported")
except ImportError as _e:
    _TM_OK = False
    OpenFOAMCaseGenerator = None
    print(f"✗ taskManager not available: {_e}")
    print("  Run: git submodule update --init --recursive")

## 3. Generate Parallel Cases

For each core count in `CORES_TO_TEST`:
1. Compute `nodes` and `ntasks_per_node` via `compute_slurm_resources()`.
2. Create a case directory `parallel_bench_{n_cores}cores_{rotation:03d}deg/`.
3. Write a `benchmark_metadata.json` recording the variant parameters.

**All variants use the same terrain mesh** (no mesh configuration changes).
The only difference between variants is the parallel decomposition and the
SLURM resource allocation.

Variants that already have a `benchmark_metadata.json` are **skipped** (resume-safe).

In [None]:
def _parallel_bench_dir(cases_output_dir: str, n_cores: int, rotation: int) -> Path:
    """Return the variant directory for *n_cores* cores at *rotation* degrees.

    The directory name is ``parallel_bench_{n_cores}cores_{rotation:03d}deg``
    and it is located directly below *cases_output_dir*.  Nothing is created.
    """
    variant_name = f"parallel_bench_{n_cores}cores_{rotation:03d}deg"
    return Path(cases_output_dir).joinpath(variant_name)


# ── Ensure output root exists ─────────────────────────────────────────────────
Path(CASES_OUTPUT_DIR).mkdir(parents=True, exist_ok=True)

# ── Create variant directories and metadata ───────────────────────────────────
# One directory per core count.  The presence of benchmark_metadata.json marks
# a variant as already set up, which is what makes re-running this cell safe.
for n_cores in CORES_TO_TEST:
    nodes, ntasks_per_node = compute_slurm_resources(n_cores)
    var_dir   = _parallel_bench_dir(CASES_OUTPUT_DIR, n_cores, FIXED_ROTATION_DEG)
    meta_file = var_dir / "benchmark_metadata.json"

    if not meta_file.exists():
        var_dir.mkdir(parents=True, exist_ok=True)

        # Parameters of this variant; "status" starts out as "pending".
        meta = {
            "n_cores"         : n_cores,
            "nodes"           : nodes,
            "ntasks_per_node" : ntasks_per_node,
            "rotation"        : FIXED_ROTATION_DEG,
            "terrain_dir"     : str(FIXED_TERRAIN_DIR),
            "status"          : "pending",
            "created_at"      : datetime.now().isoformat(),
        }
        meta_file.write_text(json.dumps(meta, indent=2))
        print(f"  ✓ {n_cores} cores: created {var_dir.name}")
    else:
        # Resume case — leave the existing metadata untouched.
        print(f"  ↷ {n_cores} cores: metadata already exists, skipping")

print("\nVariant directory setup complete.")

## 4. Status Scanner

Reads every `benchmark_metadata.json` found under `CASES_OUTPUT_DIR` and assembles a
pandas DataFrame.  **Re-run this cell at any time to refresh the view.**

In [None]:
def scan_parallel_bench_status(cases_output_dir: str) -> pd.DataFrame:
    """Scan variant directories and return a status DataFrame."""
    output_path = Path(cases_output_dir)
    records = []

    if not output_path.exists():
        print(f"⚠  Output directory does not exist yet: {cases_output_dir}")
        print("   Run Section 3 first to create the variant directories.")
    else:
        for meta_file in sorted(output_path.glob("parallel_bench_*/benchmark_metadata.json")):
            try:
                with open(meta_file) as fh:
                    meta = json.load(fh)
            except (json.JSONDecodeError, OSError) as exc:
                print(f"⚠  Could not read {meta_file}: {exc}")
                continue

            # Merge in case_status.json from taskManager if present
            case_status_file = meta_file.parent / "case_status.json"
            case_status      = {}
            if case_status_file.exists():
                try:
                    with open(case_status_file) as fh:
                        case_status = json.load(fh)
                except (json.JSONDecodeError, OSError):
                    pass

            records.append({
                "n_cores"         : meta.get("n_cores"),
                "nodes"           : meta.get("nodes"),
                "ntasks_per_node" : meta.get("ntasks_per_node"),
                "pipeline_status" : meta.get("status"),
                "mesh_status"     : case_status.get("mesh_status"),
                "job_id"          : case_status.get("job_id"),
                "job_status"      : case_status.get("job_status"),
                "wall_time"       : case_status.get("wall_time"),
                "last_checked"    : case_status.get("last_checked"),
                "variant_dir"     : str(meta_file.parent),
            })

    columns = [
        "n_cores", "nodes", "ntasks_per_node", "pipeline_status",
        "mesh_status", "job_id", "job_status", "wall_time", "last_checked", "variant_dir",
    ]
    df = pd.DataFrame(records, columns=columns) if records else pd.DataFrame(columns=columns)
    # Sort by core count for readability
    if not df.empty and "n_cores" in df.columns:
        df = df.sort_values("n_cores").reset_index(drop=True)
    return df


# Take a fresh snapshot of all variants for the dashboard cells below and
# report how many were found and when the scan was taken.
df_pbench = scan_parallel_bench_status(cases_output_dir=CASES_OUTPUT_DIR)
print(
    f"Scanned {len(df_pbench)} variant(s) at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
)

## 5. Summary Dashboard

In [None]:
# Symbol shown next to each pipeline status in the summary; statuses without
# an entry are shown with "?".
STATUS_ICONS = dict(
    complete="★",
    pending="○",
    failed="✗",
    running="▶",
    meshed="✓",
    meshing="⟳",
)

total = len(df_pbench)
_rule = "=" * 55   # horizontal rule framing the summary

print(_rule)
print("  PARALLELISATION BENCHMARK — STATUS SUMMARY")
print(f"  Terrain  : {Path(FIXED_TERRAIN_DIR).name}")
print(f"  Rotation : {FIXED_ROTATION_DEG}°")
print(f"  {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(_rule)
print(f"  Total variants : {total}")
print()
if total > 0:
    # Number of variants per pipeline status, most frequent first.
    counts = df_pbench["pipeline_status"].value_counts()
    for status, n in counts.items():
        _status_label = str(status)
        icon = STATUS_ICONS.get(_status_label, "?")
        print(f"  {icon}  {_status_label:<16} : {n}")
print(_rule)

In [None]:
# ── Full variant table ────────────────────────────────────────────────────────
# Show every status column except the (long) variant directory path.
if df_pbench.empty:
    print("No variants found. Run Section 3 first.")
else:
    _table_columns = [
        "n_cores", "nodes", "ntasks_per_node", "pipeline_status",
        "mesh_status", "job_id", "job_status", "wall_time", "last_checked",
    ]
    _variant_table = df_pbench[_table_columns].reset_index(drop=True)
    display(_variant_table)

## 6. Generate & Mesh Cases

All parallelisation benchmark variants share **the same mesh** — the mesh is
built once (for the first variant or the shared case) and then the decomposition
is varied in Section 7.

This section uses `taskManager` to:
1. Generate OpenFOAM case directories from the fixed terrain inputs.
2. Run `blockMesh` / `snappyHexMesh` for cases that have not yet been meshed.

**Resume-safe:** cases that already have `mesh_status == DONE` are skipped.

In [None]:
if not _TM_OK:
    print("✗ taskManager not available — cannot generate or mesh cases.")
    print("  Run: git submodule update --init --recursive")
else:
    generator = OpenFOAMCaseGenerator(
        template_path=os.path.join(TASK_MANAGER_DIR, "template"),
        input_dir=CASES_OUTPUT_DIR,
        output_dir=CASES_OUTPUT_DIR,
        deucalion_path=DEUCALION_PATH,
    )

    df_pbench = scan_parallel_bench_status(CASES_OUTPUT_DIR)

    # Variants whose case_status.json exists but cannot be interpreted.  Their
    # real state is unknown, so they are neither regenerated nor meshed here.
    unreadable_variants = set()

    # ── Generate OpenFOAM cases for variants that do not have one yet ─────────
    for _, row in df_pbench.iterrows():
        n_cores  = row["n_cores"]
        var_dir  = Path(row["variant_dir"])
        cs_file  = var_dir / "case_status.json"

        if cs_file.exists():
            try:
                with open(cs_file) as fh:
                    cs = json.load(fh)
                if not isinstance(cs, dict):
                    raise ValueError(f"expected a JSON object, got {type(cs).__name__}")
            except (ValueError, OSError) as exc:
                # One corrupt or half-written status file must not abort the
                # whole cell; report it and carry on with the other variants.
                print(f"  ⚠ {n_cores} cores: cannot read {cs_file.name} ({exc}), skipping variant")
                unreadable_variants.add(str(var_dir))
                continue
            if str(cs.get("mesh_status") or "").upper() in ("DONE", "IN_PROGRESS"):
                print(f"  ↷ {n_cores} cores: already meshed / meshing, skipping case generation")
                continue

        try:
            output_path = generator.setup_case({"variant_dir": str(var_dir), "n_cores": n_cores})
            print(f"  ✓ {n_cores} cores: case created at {output_path}")
        except Exception as exc:
            print(f"  ✗ {n_cores} cores: case setup failed: {exc}")

    # ── Mesh cases that are ready ──────────────────────────────────────────────
    # Since all variants share the same mesh parameters, we mesh every case that
    # has not yet been meshed.  In practice only the first run will do real work.
    df_pbench = scan_parallel_bench_status(CASES_OUTPUT_DIR)
    ready_cases = [
        str(Path(row["variant_dir"]))
        for _, row in df_pbench.iterrows()
        if row["mesh_status"] in (None, "NOT_RUN", "")
        and str(Path(row["variant_dir"])) not in unreadable_variants
    ]

    if not ready_cases:
        print("\nNo cases ready for meshing.")
    else:
        print(f"\nMeshing {len(ready_cases)} case(s) with {N_PARALLEL_WORKERS} worker(s)…")
        results = generator.mesh_cases_parallel(ready_cases, n_workers=N_PARALLEL_WORKERS)
        # NOTE(review): assumes mesh_cases_parallel returns one truthy/falsy
        # flag per case — confirm against taskManager.
        succeeded = sum(results)
        failed    = len(results) - succeeded
        print(f"Meshing complete: {succeeded} succeeded, {failed} failed.")
        print("Re-run Section 4 to refresh the dashboard.")

## 7. Update decomposeParDict

Write or overwrite `system/decomposeParDict` for each variant case so that
OpenFOAM uses the correct number of subdomains when the solver is started in
parallel.

The `scotch` method is used by default because it requires no geometric input
from the user and typically produces well-balanced partitions.

In [None]:
DECOMPOSE_METHOD = "scotch"   # alternative: "simple" (requires coefficients)


def write_decompose_par_dict(case_dir: Path, n_subdomains: int, method: str = "scotch") -> Path:
    """
    Write (or overwrite) system/decomposeParDict for *case_dir*.

    The ``system`` directory is created if it does not exist yet.  The file
    contains only the FoamFile header, ``numberOfSubdomains`` and ``method``;
    no method-specific coefficient block is written.

    Parameters
    ----------
    case_dir : case directory that receives ``system/decomposeParDict``.
    n_subdomains : value written as ``numberOfSubdomains``; must be >= 1.
    method : value written as ``method``.

    Returns
    -------
    The path of the written file.

    Raises
    ------
    ValueError
        If *n_subdomains* is smaller than 1 (nothing is written in that case).
    """
    # Validate before touching the filesystem so that a bad call leaves no
    # directory or dictionary behind.
    if n_subdomains < 1:
        raise ValueError(f"n_subdomains must be a positive integer, got {n_subdomains!r}")

    system_dir = case_dir / "system"
    system_dir.mkdir(parents=True, exist_ok=True)

    # Doubled backslashes and braces are escapes of the (non-raw) f-string;
    # the file on disk contains single ones.
    content = f"""/*--------------------------------*- C++ -*----------------------------------*\\
  =========                 |
  \\\\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
   \\\\    /   O peration     |
    \\\\  /    A nd           |
     \\\\/     M anipulation  |
\\*---------------------------------------------------------------------------*/
FoamFile
{{
    version     2.0;
    format      ascii;
    class       dictionary;
    location    "system";
    object      decomposeParDict;
}}
// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //

numberOfSubdomains  {n_subdomains};

method              {method};

// ************************************************************************* //
"""
    out_path = system_dir / "decomposeParDict"
    out_path.write_text(content)
    return out_path


# ── Apply to every variant ────────────────────────────────────────────────────
# Each variant gets a decomposeParDict whose subdomain count equals its core
# count; existing dictionaries are overwritten.
df_pbench = scan_parallel_bench_status(CASES_OUTPUT_DIR)

for _, row in df_pbench.iterrows():
    n_cores = int(row["n_cores"])
    var_dir = Path(row["variant_dir"])

    out_path = write_decompose_par_dict(
        var_dir,
        n_subdomains=n_cores,
        method=DECOMPOSE_METHOD,
    )
    # Report the path relative to the benchmark root to keep the line short.
    print(f"  ✓ {n_cores:>4} cores → {out_path.relative_to(Path(CASES_OUTPUT_DIR))}")

# ── Show the generated file for the first variant as a sanity check ───────────
if not df_pbench.empty:
    _first_variant = Path(df_pbench.iloc[0]["variant_dir"])
    sample_path = _first_variant.joinpath("system", "decomposeParDict")
    if sample_path.exists():
        print(f"\nSample decomposeParDict ({df_pbench.iloc[0]['n_cores']} cores):")
        print(sample_path.read_text())

## 8. Submit to HPC

Submits each meshed variant to SLURM.  The `--nodes` and `--ntasks-per-node`
SLURM options are set **per variant** according to `compute_slurm_resources()`.

In [None]:
if not _TM_OK:
    print("✗ taskManager not available — cannot submit jobs.")
else:
    # `generator` is normally created by the Section 6 cell.  After a kernel
    # restart that cell may not have been re-run; without this guard every
    # submission below would fail with a caught NameError.
    try:
        generator
    except NameError:
        generator = OpenFOAMCaseGenerator(
            template_path=os.path.join(TASK_MANAGER_DIR, "template"),
            input_dir=CASES_OUTPUT_DIR,
            output_dir=CASES_OUTPUT_DIR,
            deucalion_path=DEUCALION_PATH,
        )

    df_pbench = scan_parallel_bench_status(CASES_OUTPUT_DIR)

    # Select meshed variants that have not yet been submitted
    ready_to_submit = df_pbench[
        (df_pbench["mesh_status"] == "DONE") &
        (df_pbench["job_id"].isna())
    ]

    if ready_to_submit.empty:
        print("No meshed cases ready for submission.")
        print("Possible reasons:")
        print("  • Meshing has not finished yet — re-run Section 6.")
        print("  • All variants have already been submitted.")
    else:
        print(f"Submitting {len(ready_to_submit)} variant(s) to SLURM…")
        for _, row in ready_to_submit.iterrows():
            # Per-variant resources as recorded in benchmark_metadata.json.
            n_cores         = int(row["n_cores"])
            nodes           = int(row["nodes"])
            ntasks_per_node = int(row["ntasks_per_node"])
            var_path        = row["variant_dir"]
            try:
                job_id = generator.submit_case(
                    case_path=var_path,
                    partition=SLURM_PARTITION,
                    time=SLURM_TIME,
                    nodes=nodes,
                    ntasks_per_node=ntasks_per_node,
                )
                print(f"  ✓ {n_cores:>4} cores ({nodes}×{ntasks_per_node}): job_id={job_id}")
            except Exception as exc:
                # A failed submission must not stop the remaining variants.
                print(f"  ✗ {n_cores:>4} cores: submission failed: {exc}")

        print("\nSubmission complete. Re-run Section 4 to refresh the dashboard.")

## 9. Refresh Job Statuses

Polls the SLURM scheduler for every submitted variant and writes the updated status
back to each `case_status.json`.  Requires SSH access to the `deucalion` host.

In [None]:
if not _TM_OK:
    print("✗ taskManager not available.")
else:
    # `generator` is normally created by the Section 6 cell.  After a kernel
    # restart that cell may not have been re-run; without this guard every
    # refresh below would fail with a caught NameError.
    try:
        generator
    except NameError:
        generator = OpenFOAMCaseGenerator(
            template_path=os.path.join(TASK_MANAGER_DIR, "template"),
            input_dir=CASES_OUTPUT_DIR,
            output_dir=CASES_OUTPUT_DIR,
            deucalion_path=DEUCALION_PATH,
        )

    df_pbench = scan_parallel_bench_status(CASES_OUTPUT_DIR)
    # Only variants that already carry a job id have something to poll.
    submitted = df_pbench[df_pbench["job_id"].notna()]

    if submitted.empty:
        print("No submitted jobs to refresh.")
    else:
        print(f"Refreshing {len(submitted)} job status(es)…")
        for _, row in submitted.iterrows():
            try:
                new_status = generator.update_job_status(row["variant_dir"])
                print(f"  {int(row['n_cores']):>4} cores: {new_status}")
            except Exception as exc:
                # Keep polling the remaining variants if one lookup fails.
                print(f"  ✗ {int(row['n_cores']):>4} cores: {exc}")

        print("\nJob statuses updated. Re-run Section 4 to refresh the dashboard.")

## 10. Results: Scaling Analysis

Read wall-clock times from each variant's `case_status.json` (falling back to the
last `ExecutionTime` entry in `log.simpleFoam`) and compute the classic
strong-scaling metrics:

| Metric | Formula |
|---|---|
| Speedup | T(N_baseline) / T(N) |
| Efficiency | Speedup / (N / N_baseline) × 100 % |

Re-run this cell after all jobs have reached `COMPLETED` status.

In [None]:
import re

try:
    import matplotlib.pyplot as plt
    _PLT_OK = True
except ImportError:
    _PLT_OK = False
    print("⚠  matplotlib not available — plots will be skipped.")


def _parse_solver_time(case_path: Path) -> float | None:
    """
    Extract total execution time (seconds) from log.simpleFoam.
    Looks for the final 'ExecutionTime = X s' line.
    """
    log_file = case_path / "log.simpleFoam"
    if not log_file.exists():
        return None
    text = log_file.read_text(errors="replace")
    matches = re.findall(r"ExecutionTime\s*=\s*([0-9.]+)\s*s", text)
    if matches:
        try:
            return float(matches[-1])
        except ValueError:
            return None
    return None


# ── Build results table ───────────────────────────────────────────────────────
df_pbench = scan_parallel_bench_status(CASES_OUTPUT_DIR)

results = []
for _, row in df_pbench.iterrows():
    n_cores   = int(row["n_cores"])
    var_path  = Path(row["variant_dir"])
    # Prefer wall_time from case_status.json; fall back to solver log.
    # A missing wall_time can arrive here as NaN (pandas stores None as NaN in
    # a numeric column) and NaN is truthy, so it has to be tested explicitly —
    # a plain `or` would keep the NaN and never consult the log.
    wall_time = row.get("wall_time")
    if pd.isna(wall_time) or not wall_time:
        wall_time = _parse_solver_time(var_path)
    results.append({
        "n_cores"    : n_cores,
        "nodes"      : row["nodes"],
        "job_status" : row["job_status"],
        "wall_time_s": wall_time,
    })

# Explicit columns keep the table well-formed (and sortable) even when no
# variants exist yet.
df_results = (
    pd.DataFrame(results, columns=["n_cores", "nodes", "job_status", "wall_time_s"])
    .sort_values("n_cores")
    .reset_index(drop=True)
)

# ── Compute speedup and efficiency ────────────────────────────────────────────
# The baseline is the smallest core count for which a wall time is available.
t_baseline = None
n_baseline = None
valid_times = df_results[df_results["wall_time_s"].notna()]
if not valid_times.empty:
    t_baseline = float(valid_times.iloc[0]["wall_time_s"])
    n_baseline = int(valid_times.iloc[0]["n_cores"])

def _speedup(t, t0):
    """Return t0 / t, or None when either time is missing or t is not positive."""
    return t0 / t if (t and t0 and t > 0) else None

def _efficiency(speedup, n, n0):
    """Return the speedup relative to the ideal n / n0, in percent (None if undefined)."""
    return speedup / (n / n0) * 100 if (speedup and n and n0) else None

# Plain lists instead of a row-wise DataFrame.apply: they give the same values
# and also work when the table has no rows.
df_results["speedup"] = [_speedup(t, t_baseline) for t in df_results["wall_time_s"]]
df_results["efficiency"] = [
    _efficiency(s, n, n_baseline)
    for s, n in zip(df_results["speedup"], df_results["n_cores"])
]

print("Scaling analysis:")
display(df_results.reset_index(drop=True))

if t_baseline is None:
    print("\n⚠  No completed jobs with wall times yet.")
    print("   Wait for jobs to finish and re-run this cell.")

In [None]:
# ── Scaling plots ─────────────────────────────────────────────────────────────
# Two panels side by side: measured speedup against the ideal linear curve,
# and parallel efficiency against the ideal 100 % line.
if not _PLT_OK:
    print("matplotlib not available — install with: pip install matplotlib")
elif valid_times.empty:
    print("No completed results to plot yet.")
else:
    # Only variants with a wall time can be plotted.
    plot_df = df_results.dropna(subset=["wall_time_s"])
    cores   = plot_df["n_cores"].astype(int).tolist()
    speedup = plot_df["speedup"].tolist()
    effic   = plot_df["efficiency"].tolist()

    fig, axes = plt.subplots(1, 2, figsize=(12, 5))
    ax_speedup, ax_effic = axes

    # ── Speedup plot ──────────────────────────────────────────────────────────
    ax_speedup.plot(cores, speedup, "o-", color="steelblue", linewidth=2, label="Measured")
    # Ideal scaling: speedup grows in proportion to the core count, starting
    # from 1 at the smallest plotted core count.
    smallest, largest = cores[0], cores[-1]
    ax_speedup.plot(
        [smallest, largest],
        [1, largest / smallest],
        "--", color="gray", linewidth=1, label="Ideal (linear)"
    )
    ax_speedup.set_ylabel("Speedup")
    ax_speedup.set_title("Strong Scaling — Speedup")

    # ── Efficiency plot ───────────────────────────────────────────────────────
    ax_effic.plot(cores, effic, "s-", color="darkorange", linewidth=2)
    ax_effic.axhline(y=100, color="gray", linestyle="--", linewidth=1, label="Ideal (100%)")
    ax_effic.set_ylabel("Parallel efficiency (%)")
    ax_effic.set_title("Strong Scaling — Efficiency")

    # ── Styling shared by both panels ─────────────────────────────────────────
    for _ax in (ax_speedup, ax_effic):
        _ax.set_xlabel("Number of cores")
        _ax.legend()
        _ax.grid(True, linestyle="--", alpha=0.5)

    plt.tight_layout()
    plt.show()