# Probability distribution analysis - Routine(Frequency-Testing)

This notebook provides a **clean, reproducible routine** for **frequency analysis** and **probability distribution testing** using annual hydroclimate time series derived from gridded NetCDF data (e.g., runoff, precipitation).

**Typical outputs (for Supplementary Information):**
- Annual aggregation (calendar year or water year)
- Distribution fitting (**Normal**, **Gumbel (EV1)**, **GEV**, **Log-Pearson Type III**)
- Goodness-of-fit (GOF) table (KS statistic/p-value + AIC/BIC)
- Return levels for selected return periods (e.g., 2–100 years)
- Optional: pixel-wise return level maps + NetCDF export


## Data source note (CMIP6)

**CMIP6 hydroclimate projections used in this workflow can be obtained from ORNL IMPACT:**
https://impact.ornl.gov/en/datasets/cmip6-based-multi-model-hydroclimate-projection-over-the-contermi/

(Kept here so readers can trace dataset provenance.)


## How to use

1. Set paths in **Configuration** (NetCDF file(s), variable name hint, region shapefile).
2. Run **Example A** for a 1D (region-mean) frequency analysis.
3. (Optional) Run **Example B** for a pixel-wise return level map.

Notes:
- For frequency analysis, **annual maxima** (block maxima) is common, but this notebook also supports **annual totals**.
- Log-Pearson III requires **strictly positive** samples (values ≤ 0 are dropped for that fit).


In [None]:
# Optional (uncomment if needed in a fresh environment):
# !pip install -q numpy pandas xarray netCDF4 h5netcdf cftime scipy matplotlib geopandas rioxarray shapely


In [None]:
import os
import warnings

import numpy as np
import pandas as pd
import xarray as xr

import matplotlib.pyplot as plt

from scipy.stats import norm, gumbel_r, genextreme as gev, pearson3
from scipy.stats import kstest

# Optional geospatial dependencies (needed only for clipping/mapping)
import geopandas as gpd
import rioxarray  # activates the `.rio` accessor
from shapely.geometry import mapping

warnings.filterwarnings("ignore", category=FutureWarning)


In [None]:
# =========================
# Configuration
# =========================

# CMIP6 hydroclimate projection data source (ORNL IMPACT):
# https://impact.ornl.gov/en/datasets/cmip6-based-multi-model-hydroclimate-projection-over-the-contermi/

# --- Input NetCDF(s) ---
# Provide a single file OR a list of files. If you have separate historical/future files, set both.
RUNOFF_HIST_FILE = r"/path/to/runoff_monthly_1980_2019.nc"
RUNOFF_FUT_FILE  = r"/path/to/runoff_monthly_2020_2059.nc"   # optional

# If you only have one file with the full period, put it in RUNOFF_HIST_FILE and set RUNOFF_FUT_FILE=None
# RUNOFF_FUT_FILE = None

# --- Variable name ---
# The code will try to auto-detect, but this hint helps (common names: "runoff", "mrro", "RAIN", "qtot")
VAR_HINT = "runoff"

# --- Region of interest ---
REGION_SHP = r"/path/to/region_boundary.shp"  # e.g., Iowa state boundary
REGION_ID_FIELD = None  # set to a field name if you want to select one polygon; None = use all geometries

# --- Annual aggregation ---
USE_WATER_YEAR = False  # False = calendar year (Jan-Dec); True = water year (Oct-Sep)

# Choose one:
ANNUAL_STATISTIC = "total"  # "total" or "max"
# "total": sum monthly values within each year
# "max"  : maximum monthly value within each year (block maxima)

# --- Periods (used for slicing the annual series) ---
HIST_Y0, HIST_Y1 = 1980, 2022
FUT_Y0,  FUT_Y1  = 2023, 2059

# --- Return periods (years) ---
RETURN_PERIODS = [2, 5, 10, 25, 50, 100]

# --- Output directory ---
OUT_DIR = r"/path/to/output_folder"
os.makedirs(OUT_DIR, exist_ok=True)


In [None]:
# =========================
# Utilities
# =========================

def _find_data_var(ds: xr.Dataset, var_hint: str | None = None) -> str:
    # Pick a reasonable data variable from a Dataset
    if var_hint and var_hint in ds.data_vars:
        return var_hint
    for candidate in ["runoff", "mrro", "RAIN", "qtot", "Q", "discharge"]:
        if candidate in ds.data_vars:
            return candidate
    if len(ds.data_vars) == 1:
        return list(ds.data_vars)[0]
    raise ValueError(f"Could not determine variable. Available: {list(ds.data_vars)}")

def _open_netcdf(paths):
    # Open one or many NetCDF files robustly
    if paths is None:
        return None
    if isinstance(paths, (list, tuple)):
        ds = xr.open_mfdataset(list(paths), combine="by_coords", parallel=True, use_cftime=True)
    else:
        ds = xr.open_dataset(paths, use_cftime=True)
    return ds

def _ensure_latlon(da: xr.DataArray) -> xr.DataArray:
    # Ensure the DataArray is recognized as having spatial dimensions that rioxarray can work with
    dims = list(da.dims)
    lat_dim = next((d for d in dims if d.lower() in ("lat","latitude","y")), None)
    lon_dim = next((d for d in dims if d.lower() in ("lon","longitude","x")), None)
    if lat_dim is None or lon_dim is None:
        raise ValueError(f"Could not infer lat/lon dims from {dims}")
    da = da.rio.set_spatial_dims(x_dim=lon_dim, y_dim=lat_dim, inplace=False)
    if da.rio.crs is None:
        da = da.rio.write_crs("EPSG:4326", inplace=False)
    return da

def clip_to_region(da: xr.DataArray, shp_path: str, id_field: str | None = None, id_value=None) -> xr.DataArray:
    # Clip a gridded DataArray to a polygon region (shapefile)
    gdf = gpd.read_file(shp_path)
    if id_field is not None and id_value is not None:
        gdf = gdf[gdf[id_field] == id_value]
    if gdf.empty:
        raise ValueError("Region shapefile selection returned no geometries.")
    gdf = gdf.to_crs("EPSG:4326")
    da = _ensure_latlon(da)
    return da.rio.clip(gdf.geometry.apply(mapping), gdf.crs, drop=True)

def annual_aggregate(da_monthly: xr.DataArray, statistic: str = "total", water_year: bool = False) -> xr.DataArray:
    # Convert monthly data to annual series (total or max)
    if "time" not in da_monthly.dims:
        raise ValueError("Expected a 'time' dimension.")
    t = da_monthly["time"]

    if water_year:
        year = xr.where(t.dt.month >= 10, t.dt.year + 1, t.dt.year)  # label by ending year
    else:
        year = t.dt.year

    if statistic == "total":
        ann = da_monthly.groupby(year).sum("time", skipna=True)
    elif statistic == "max":
        ann = da_monthly.groupby(year).max("time", skipna=True)
    else:
        raise ValueError("statistic must be 'total' or 'max'")

    ann = ann.rename({"group": "year"} if "group" in ann.dims else {})
    ann.attrs.update(da_monthly.attrs)
    ann.attrs["annual_statistic"] = statistic
    ann.attrs["water_year"] = str(bool(water_year))
    return ann


In [None]:
# =========================
# Distribution fitting + return levels
# =========================

def _clean_sample(x):
    x = np.asarray(x, float)
    x = x[np.isfinite(x)]
    return x

def fit_normal(x):
    x = _clean_sample(x)
    mu, sigma = norm.fit(x)
    return (mu, sigma)

def fit_gumbel(x):
    x = _clean_sample(x)
    loc, scale = gumbel_r.fit(x)
    return (loc, scale)

def fit_gev(x):
    x = _clean_sample(x)
    c, loc, scale = gev.fit(x)
    return (c, loc, scale)

def fit_lp3(x):
    # Log-Pearson Type III fit on base-10 log(x); requires x>0
    x = _clean_sample(x)
    x = x[x > 0]
    if x.size < 10:
        raise ValueError("LP3 needs at least 10 positive samples.")
    y = np.log10(x)
    skew, loc, scale = pearson3.fit(y)
    return (skew, loc, scale)

def return_level(dist_name: str, params: tuple, T: float):
    # Return level for return period T using non-exceedance probability p = 1 - 1/T
    p = 1.0 - 1.0/float(T)

    if dist_name == "Normal":
        mu, sigma = params
        return float(norm.ppf(p, loc=mu, scale=sigma))

    if dist_name == "Gumbel":
        loc, scale = params
        return float(gumbel_r.ppf(p, loc=loc, scale=scale))

    if dist_name == "GEV":
        c, loc, scale = params
        return float(gev.ppf(p, c, loc=loc, scale=scale))

    if dist_name == "LP3":
        skew, loc, scale = params
        yq = pearson3.ppf(p, skew, loc=loc, scale=scale)
        return float(10**yq)

    raise ValueError(f"Unknown dist_name: {dist_name}")

def _loglik(dist_name: str, x: np.ndarray, params: tuple):
    x = _clean_sample(x)
    if dist_name == "Normal":
        mu, sigma = params
        return np.sum(norm.logpdf(x, loc=mu, scale=sigma)), 2
    if dist_name == "Gumbel":
        loc, scale = params
        return np.sum(gumbel_r.logpdf(x, loc=loc, scale=scale)), 2
    if dist_name == "GEV":
        c, loc, scale = params
        return np.sum(gev.logpdf(x, c, loc=loc, scale=scale)), 3
    if dist_name == "LP3":
        x = x[x > 0]
        y = np.log10(x)
        skew, loc, scale = params
        ll_y = np.sum(pearson3.logpdf(y, skew, loc=loc, scale=scale))
        ll_jac = -np.sum(np.log(x * np.log(10.0)))
        return ll_y + ll_jac, 3
    raise ValueError(dist_name)

def gof_summary(x, dist_name: str, params: tuple):
    x = _clean_sample(x)
    n = x.size

    if dist_name == "LP3":
        # KS in log space (common practice)
        x_pos = x[x > 0]
        y = np.log10(x_pos)
        skew, loc, scale = params
        D, p = kstest(y, "pearson3", args=(skew, loc, scale))
    elif dist_name == "Normal":
        mu, sigma = params
        D, p = kstest(x, "norm", args=(mu, sigma))
    elif dist_name == "Gumbel":
        loc, scale = params
        D, p = kstest(x, "gumbel_r", args=(loc, scale))
    elif dist_name == "GEV":
        c, loc, scale = params
        D, p = kstest(x, "genextreme", args=(c, loc, scale))
    else:
        raise ValueError(dist_name)

    ll, k = _loglik(dist_name, x, params)
    aic = 2*k - 2*ll
    bic = k*np.log(n) - 2*ll

    return {
        "n": int(n),
        "KS_D": float(D),
        "KS_p": float(p),
        "logLik": float(ll),
        "AIC": float(aic),
        "BIC": float(bic),
    }

def fit_and_compare(x):
    x = _clean_sample(x)

    fits = {
        "Normal": fit_normal(x),
        "Gumbel": fit_gumbel(x),
        "GEV": fit_gev(x),
    }

    # LP3 is optional
    try:
        fits["LP3"] = fit_lp3(x)
    except Exception:
        fits["LP3"] = None

    rows = []
    for name, params in fits.items():
        if params is None:
            rows.append({"Distribution": name, "status": "skipped"})
            continue
        s = gof_summary(x, name, params)
        s["Distribution"] = name
        s["status"] = "ok"
        rows.append(s)

    df = pd.DataFrame(rows).sort_values(["status", "AIC"], ascending=[True, True])
    return df, fits

def return_level_table(fits: dict, return_periods):
    rows = []
    for dist_name, params in fits.items():
        if params is None:
            continue
        row = {"Distribution": dist_name}
        for T in return_periods:
            row[f"RL_T{int(T)}"] = return_level(dist_name, params, T)
        rows.append(row)
    return pd.DataFrame(rows)


In [None]:
# =========================
# Plots: PDF/CDF + QQ/PP
# =========================

def plot_diagnostics(x, fits, title="", units=""):
    x = _clean_sample(x)
    x = np.sort(x)
    n = x.size
    p_emp = (np.arange(1, n+1) - 0.44) / (n + 0.12)  # Blom plotting position

    fig, axes = plt.subplots(2, 2, figsize=(12, 9))

    # 1) Histogram + fitted PDFs
    ax = axes[0, 0]
    ax.hist(x, bins=15, density=True, alpha=0.35)
    xx = np.linspace(x.min()*0.95, x.max()*1.05, 400)

    for name, params in fits.items():
        if params is None:
            continue
        if name == "Normal":
            mu, sigma = params
            ax.plot(xx, norm.pdf(xx, mu, sigma), label=name)
        elif name == "Gumbel":
            loc, scale = params
            ax.plot(xx, gumbel_r.pdf(xx, loc=loc, scale=scale), label=name)
        elif name == "GEV":
            c, loc, scale = params
            ax.plot(xx, gev.pdf(xx, c, loc=loc, scale=scale), label=name)
        elif name == "LP3":
            skew, loc, scale = params
            xx_pos = xx[xx > 0]
            y = np.log10(xx_pos)
            pdf_y = pearson3.pdf(y, skew, loc=loc, scale=scale)
            pdf_x = pdf_y / (xx_pos * np.log(10.0))
            ax.plot(xx_pos, pdf_x, label=name)

    ax.set_title("PDF / histogram")
    ax.set_xlabel(f"Value {units}".strip())
    ax.set_ylabel("Density")
    ax.grid(alpha=0.25, linestyle=":")

    # 2) CDF comparison
    ax = axes[0, 1]
    ax.plot(x, p_emp, "k.", label="Empirical")
    for name, params in fits.items():
        if params is None:
            continue
        if name == "Normal":
            mu, sigma = params
            ax.plot(x, norm.cdf(x, mu, sigma), label=name)
        elif name == "Gumbel":
            loc, scale = params
            ax.plot(x, gumbel_r.cdf(x, loc=loc, scale=scale), label=name)
        elif name == "GEV":
            c, loc, scale = params
            ax.plot(x, gev.cdf(x, c, loc=loc, scale=scale), label=name)
        elif name == "LP3":
            x_pos = x[x > 0]
            skew, loc, scale = params
            ax.plot(x_pos, pearson3.cdf(np.log10(x_pos), skew, loc=loc, scale=scale), label=name)
    ax.set_title("CDF")
    ax.set_xlabel(f"Value {units}".strip())
    ax.set_ylabel("Non-exceedance probability")
    ax.grid(alpha=0.25, linestyle=":")

    # 3) QQ plot
    ax = axes[1, 0]
    ax.plot(x, x, "k--", lw=1)
    for name, params in fits.items():
        if params is None:
            continue
        if name == "Normal":
            mu, sigma = params
            q = norm.ppf(p_emp, mu, sigma)
            ax.plot(q, x, ".", label=name)
        elif name == "Gumbel":
            loc, scale = params
            q = gumbel_r.ppf(p_emp, loc=loc, scale=scale)
            ax.plot(q, x, ".", label=name)
        elif name == "GEV":
            c, loc, scale = params
            q = gev.ppf(p_emp, c, loc=loc, scale=scale)
            ax.plot(q, x, ".", label=name)
        elif name == "LP3":
            x_pos = x[x > 0]
            p_pos = p_emp[-x_pos.size:]
            skew, loc, scale = params
            qy = pearson3.ppf(p_pos, skew, loc=loc, scale=scale)
            ax.plot(10**qy, x_pos, ".", label=name)
    ax.set_title("Q-Q")
    ax.set_xlabel("Theoretical quantiles")
    ax.set_ylabel("Empirical quantiles")
    ax.grid(alpha=0.25, linestyle=":")

    # 4) PP plot
    ax = axes[1, 1]
    ax.plot([0,1],[0,1],"k--", lw=1)
    for name, params in fits.items():
        if params is None:
            continue
        if name == "Normal":
            mu, sigma = params
            p_mod = norm.cdf(x, mu, sigma)
            ax.plot(p_mod, p_emp, ".", label=name)
        elif name == "Gumbel":
            loc, scale = params
            p_mod = gumbel_r.cdf(x, loc=loc, scale=scale)
            ax.plot(p_mod, p_emp, ".", label=name)
        elif name == "GEV":
            c, loc, scale = params
            p_mod = gev.cdf(x, c, loc=loc, scale=scale)
            ax.plot(p_mod, p_emp, ".", label=name)
        elif name == "LP3":
            x_pos = x[x > 0]
            p_pos = p_emp[-x_pos.size:]
            skew, loc, scale = params
            p_mod = pearson3.cdf(np.log10(x_pos), skew, loc=loc, scale=scale)
            ax.plot(p_mod, p_pos, ".", label=name)

    ax.set_title("P-P")
    ax.set_xlabel("Model CDF")
    ax.set_ylabel("Empirical CDF")
    ax.grid(alpha=0.25, linestyle=":")

    fig.suptitle(title, fontsize=14)
    handles, labels = axes[0,0].get_legend_handles_labels()
    fig.legend(handles, labels, loc="lower center", ncol=4, frameon=False)
    fig.tight_layout(rect=[0, 0.06, 1, 0.95])
    return fig


## Example A — 1D frequency analysis (region-mean annual series)

This example:
1. Opens NetCDF runoff data (monthly)
2. Clips to your region polygon (shapefile)
3. Aggregates to annual series (total or max)
4. Creates a **region-mean** 1D sample (one value per year)
5. Fits distributions, prints GOF table, and computes return levels


In [None]:
# =========================
# Load, clip, and build annual region-mean series
# =========================

ds_hist = _open_netcdf(RUNOFF_HIST_FILE)
var = _find_data_var(ds_hist, VAR_HINT)
da_hist = ds_hist[var]

if RUNOFF_FUT_FILE:
    ds_fut = _open_netcdf(RUNOFF_FUT_FILE)
    var2 = _find_data_var(ds_fut, VAR_HINT)
    da_fut = ds_fut[var2]
    da = xr.concat([da_hist, da_fut], dim="time")
else:
    da = da_hist

# Clip to region (optional but typical for state/basin analysis)
da_reg = clip_to_region(da, REGION_SHP, id_field=REGION_ID_FIELD, id_value=None)

# Annual aggregation
ann = annual_aggregate(da_reg, statistic=ANNUAL_STATISTIC, water_year=USE_WATER_YEAR)

# Slice to a period and compute region mean time series
ann_hist = ann.sel(year=slice(HIST_Y0, HIST_Y1))
spatial_dims = [d for d in ann_hist.dims if d != "year"]
x_hist = ann_hist.mean(dim=spatial_dims, skipna=True).values

print(f"Sample size (historical): {np.isfinite(x_hist).sum()} years")

# Fit & compare
gof_df, fits = fit_and_compare(x_hist)
display(gof_df)

# Return levels
rl_df = return_level_table(fits, RETURN_PERIODS)
display(rl_df)

# Diagnostics plots
fig = plot_diagnostics(
    x_hist,
    fits,
    title=f"Region-mean annual {ANNUAL_STATISTIC} runoff ({HIST_Y0}–{HIST_Y1})",
    units="(native units)",
)
plt.show()


## Example B — Pixel-wise return level map (optional)

This section demonstrates how to compute a return level map using `xarray.apply_ufunc` (parallel-friendly with Dask).

Warning: Pixel-wise fits for GEV/Gumbel/LP3 can be computationally expensive for large grids.


In [None]:
# =========================
# Pixel-wise RL map (choose distribution)
# =========================

DIST_FOR_MAP = "GEV"     # "Normal", "Gumbel", "GEV", "LP3"
T_MAP = 100

ann_map = ann.sel(year=slice(HIST_Y0, HIST_Y1))

# Chunk for Dask (adjust to your machine)
chunks = {d: -1 if d == "year" else min(200, ann_map.sizes[d]) for d in ann_map.dims}
ann_map = ann_map.chunk(chunks)

p = 1.0 - 1.0 / T_MAP

def _rl_pixel(x):
    x = np.asarray(x, float)
    x = x[np.isfinite(x)]
    if x.size < 8:
        return np.nan
    try:
        if DIST_FOR_MAP == "Normal":
            mu, sigma = norm.fit(x)
            return float(norm.ppf(p, loc=mu, scale=sigma))
        if DIST_FOR_MAP == "Gumbel":
            loc, scale = gumbel_r.fit(x)
            return float(gumbel_r.ppf(p, loc=loc, scale=scale))
        if DIST_FOR_MAP == "GEV":
            c, loc, scale = gev.fit(x)
            return float(gev.ppf(p, c, loc=loc, scale=scale))
        if DIST_FOR_MAP == "LP3":
            x = x[x > 0]
            if x.size < 10:
                return np.nan
            y = np.log10(x)
            skew, loc, scale = pearson3.fit(y)
            yq = pearson3.ppf(p, skew, loc=loc, scale=scale)
            return float(10**yq)
        raise ValueError(DIST_FOR_MAP)
    except Exception:
        return np.nan

rl_map = xr.apply_ufunc(
    _rl_pixel, ann_map,
    input_core_dims=[["year"]],
    output_core_dims=[[]],
    vectorize=True,
    dask="parallelized",
    output_dtypes=[float],
    dask_gufunc_kwargs={"allow_rechunk": True},
).rename(f"RL{T_MAP}_{DIST_FOR_MAP}")

rl_map.attrs.update({
    "long_name": f"{T_MAP}-year return level from annual {ANNUAL_STATISTIC} ({DIST_FOR_MAP})",
    "period": f"{HIST_Y0}-{HIST_Y1}",
    "units": "native",
    "p_non_exceed": float(p),
})

# Compute and export
out_nc = os.path.join(OUT_DIR, f"RL{T_MAP}_{DIST_FOR_MAP}_{HIST_Y0}_{HIST_Y1}.nc")
rl_map.compute().to_netcdf(out_nc)
print("Saved:", out_nc)

# Quick plot (works best if in lat/lon)
plt.figure(figsize=(8,5))
rl_map.compute().plot()
plt.title(f"RL{T_MAP} ({DIST_FOR_MAP}) — {HIST_Y0}–{HIST_Y1}")
plt.show()
