# climval + Earthmover Arraylake Integration

**Validation in 15 lines, directly from cloud-native array storage**

**Author:** Northflow Technologies (northflow.no)  
**Library:** [climval](https://github.com/northflowlabs/climval) + [Arraylake](https://docs.earthmover.io/)  
**License:** Apache 2.0

---

## What this notebook shows

How to run climate validation with **climval** on data stored in an **Earthmover Arraylake** repository.

This notebook demonstrates:
- Loading ERA5 + CMIP6-style data from Arraylake as xarray DataArrays
- Validating **3 CMIP6 models** against ERA5 with `BenchmarkSuite`
- Producing a **metric scorecard** and **Taylor diagram**
- Exporting an HTML/JSON/Markdown validation report
- Tagging a validated dataset version in Arraylake

It includes a **credential-free synthetic fallback**, so it runs anywhere (Binder / local / CI).

> Arraylake Python syntax in this notebook follows Earthmover docs (Client, get_repo, readonly_session, open_dataset/open_zarr, create_tag).


## 1. Install & Import

In [None]:
# Uncomment on first run
# !pip install climval arraylake xarray numpy pandas matplotlib netcdf4

import warnings
from datetime import datetime
from pathlib import Path

warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
import xarray as xr
import matplotlib.pyplot as plt
from IPython.display import display

import climval
from climval import BenchmarkSuite, load_model

try:
    from arraylake import Client
    ARRAYLAKE_AVAILABLE = True
except Exception:
    ARRAYLAKE_AVAILABLE = False

print(f"climval   : v{climval.__version__}")
print(f"xarray    : v{xr.__version__}")
print(f"numpy     : v{np.__version__}")
print(f"pandas    : v{pd.__version__}")
print(f"arraylake : {'available' if ARRAYLAKE_AVAILABLE else 'not installed'}")

## 2. Load Data from Arraylake (with Synthetic Fallback)

This section is split into two modes:
1. **Real Arraylake mode** (set `USE_REAL_ARRAYLAKE = True`)
2. **Synthetic fallback mode** (default)

### Real data swap-in (exact pattern)

Replace the group paths below with your repository layout, then set `USE_REAL_ARRAYLAKE = True`:

- `ARRAYLAKE_REPO` (e.g. `myorg/cmip6-era5-validation`)
- `ERA5_GROUP` (e.g. `era5/monthly/europe`)
- `CMIP6_MODEL_GROUPS` (e.g. `cmip6/MPI-ESM1-2-HR/monthly/europe`)

In [None]:
USE_REAL_ARRAYLAKE = False  # <-- set True when credentials + repo paths are ready

ARRAYLAKE_REPO = "earthmover-public/era5-surface-aws"
ARRAYLAKE_BRANCH = "main"
ARRAYLAKE_GROUP = "spatial"

# Example placeholders for your own CMIP6 repository layout
ERA5_GROUP = "spatial"
CMIP6_MODEL_GROUPS = {
    "MPI-ESM1-2-HR": "cmip6/MPI-ESM1-2-HR",
    "EC-Earth3": "cmip6/EC-Earth3",
    "CNRM-CM6-1": "cmip6/CNRM-CM6-1",
}

def _to_monthly_series(da: xr.DataArray, name: str) -> xr.DataArray:
    if "time" not in da.dims:
        raise ValueError(f"{name}: expected a 'time' dimension")

    spatial_dims = [d for d in da.dims if d != "time"]
    if spatial_dims:
        da = da.mean(dim=spatial_dims, skipna=True)

    da = da.resample(time="MS").mean()
    da = da.dropna(dim="time")
    da.name = "tas"
    return da.astype("float64")

def _synthetic_series() -> tuple[xr.DataArray, dict[str, xr.DataArray]]:
    np.random.seed(42)
    n = 360
    t = np.arange(n)

    era = 10.0 * np.sin(2 * np.pi * t / 12 - np.pi / 2) + 0.02 * t / 12 + np.random.normal(0, 0.8, n)
    era_std = np.std(era)

    def make_model(corr, rel_std, bias):
        signal = corr * era / era_std
        ortho = np.random.normal(0, 1, n)
        ortho -= np.dot(ortho, era) / np.dot(era, era) * era
        noise = np.sqrt(max(0, 1 - corr**2)) * ortho / (np.std(ortho) + 1e-8)
        raw = signal + noise
        raw = raw / np.std(raw) * rel_std * era_std
        return raw + bias

    cfg = {
        "MPI-ESM1-2-HR": dict(corr=0.97, rel_std=0.98, bias=-0.4),
        "EC-Earth3": dict(corr=0.95, rel_std=1.05, bias=0.7),
        "CNRM-CM6-1": dict(corr=0.93, rel_std=1.10, bias=1.2),
    }

    time = pd.date_range("1985-01-01", periods=n, freq="MS")
    ref = xr.DataArray(era, dims=["time"], coords={"time": time}, name="tas", attrs={"units": "K"})
    models = {
        k: xr.DataArray(make_model(**v), dims=["time"], coords={"time": time}, name="tas", attrs={"units": "K"})
        for k, v in cfg.items()
    }
    return ref, models

arraylake_repo = None
arraylake_snapshot_id = None
used_real_arraylake = False

if USE_REAL_ARRAYLAKE and ARRAYLAKE_AVAILABLE:
    try:
        client = Client()
        # client.login()  # Uncomment if your environment needs explicit auth
        arraylake_repo = client.get_repo(ARRAYLAKE_REPO)
        session = arraylake_repo.readonly_session(branch=ARRAYLAKE_BRANCH)

        # ERA5 from Arraylake (documented sample repo uses variable 't2')
        ds_era5 = xr.open_dataset(
            session.store,
            engine="zarr",
            consolidated=False,
            zarr_format=3,
            chunks=None,
            group=ERA5_GROUP,
        )
        ref_da = _to_monthly_series(ds_era5["t2"], "ERA5/t2")

        # CMIP6 model groups (expected to expose a 'tas' variable in your repo)
        model_das = {}
        for model_name, group_path in CMIP6_MODEL_GROUPS.items():
            ds_model = xr.open_dataset(
                session.store,
                engine="zarr",
                consolidated=False,
                zarr_format=3,
                chunks=None,
                group=group_path,
            )
            model_das[model_name] = _to_monthly_series(ds_model["tas"], f"{model_name}/tas")

        used_real_arraylake = True
        arraylake_snapshot_id = arraylake_repo.lookup_branch(ARRAYLAKE_BRANCH)
    except Exception as exc:
        print(f"Real Arraylake load failed: {exc}")
        print("Falling back to synthetic data.")
        ref_da, model_das = _synthetic_series()
else:
    ref_da, model_das = _synthetic_series()

print(f"Mode            : {'REAL ARRAYLAKE' if used_real_arraylake else 'SYNTHETIC FALLBACK'}")
print(f"Reference points: {ref_da.sizes['time']}")
print(f"Models loaded   : {list(model_das.keys())}")

## 3. Run Validation with climval BenchmarkSuite

`BenchmarkSuite` currently validates from registered model descriptors.
To keep this workflow data-driven, we materialize each loaded DataArray to temporary NetCDF, then run climval on those files.

This means:
- Real mode: metrics come from real Arraylake-backed data
- Fallback mode: metrics come from synthetic DataArrays

In [None]:
tmp_dir = Path("_tmp_arraylake_validation")
tmp_dir.mkdir(parents=True, exist_ok=True)

def write_series_to_netcdf(da: xr.DataArray, path: Path) -> None:
    ds = da.rename("tas").to_dataset()
    ds.to_netcdf(path)

ref_path = tmp_dir / "era5_tas.nc"
write_series_to_netcdf(ref_da, ref_path)

model_paths = {}
for model_name, da in model_das.items():
    safe_name = model_name.lower().replace("-", "_")
    p = tmp_dir / f"{safe_name}_tas.nc"
    write_series_to_netcdf(da, p)
    model_paths[model_name] = p

suite = BenchmarkSuite(name="Arraylake-CMIP6-vs-ERA5")

reference = load_model(
    preset="era5",
    name="ERA5",
    path=str(ref_path),
    variables=["tas"],
    lat_range=(35.0, 72.0),
    lon_range=(-15.0, 45.0),
    time_start=datetime(1985, 1, 1),
    time_end=datetime(2014, 12, 31),
)
suite.register(reference, role="reference")

for model_name, model_path in model_paths.items():
    candidate = load_model(
        name=model_name,
        path=str(model_path),
        variables=["tas"],
        lat_range=(35.0, 72.0),
        lon_range=(-15.0, 45.0),
        time_start=datetime(1985, 1, 1),
        time_end=datetime(2014, 12, 31),
    )
    suite.register(candidate)

report = suite.run(variables=["tas"], seed=42)
results_by_model = {r.candidate: r.score_summary() for r in report.results}

print("Benchmark complete.")
print(f"Models evaluated : {len(results_by_model)}")
print(f"Metrics computed : {len(next(iter(results_by_model.values())))}")

## 4. Metric Scorecard

In [None]:
score_rows = []
for model_name, metrics_dict in results_by_model.items():
    row = {"Model": model_name}
    row.update({k: round(v, 4) for k, v in metrics_dict.items()})
    score_rows.append(row)

df = pd.DataFrame(score_rows).set_index("Model")
lower_is_better = {"rmse", "mae", "mean_bias", "nrmse", "percentile_bias_p95", "percentile_bias_p5"}

def highlight_best(col):
    if col.name in lower_is_better:
        idx = col.abs().idxmin() if col.name == "mean_bias" else col.idxmin()
    else:
        idx = col.idxmax()
    return ["background-color: #d4edda; font-weight: bold" if i == idx else "" for i in col.index]

display(
    df.style
      .apply(highlight_best)
      .format("{:.4f}")
      .set_caption("climval Scorecard — Arraylake CMIP6 vs ERA5 | Europe | 1985-2014 | tas")
)
print("Green highlight = best performer per metric")

## 5. Taylor Diagram

In [None]:
fig = plt.figure(figsize=(10, 8))
ax = fig.add_subplot(111, polar=True)

ax.set_thetamin(0)
ax.set_thetamax(90)

colors = ["#2196F3", "#4CAF50", "#FF9800"]
markers = ["o", "s", "^"]

ref_std = float(np.std(ref_da.values))
ax.plot(0, 1.0, marker="*", markersize=15, color="black", label="ERA5 (Reference)", zorder=5)

for corr in [0.7, 0.8, 0.9, 0.95, 0.99]:
    theta = np.arccos(corr)
    ax.plot([theta, theta], [0, 1.4], color="#cccccc", linewidth=0.8, linestyle="--")

for i, model_name in enumerate(results_by_model):
    r = results_by_model[model_name]["pearson_r"]
    std_ratio = float(np.std(model_das[model_name].values) / ref_std)
    theta = np.arccos(np.clip(r, -1, 1))
    ax.plot(theta, std_ratio, marker=markers[i], markersize=11, color=colors[i], label=model_name)

ax.set_rlabel_position(90)
ax.set_thetagrids(
    [0, 15, 30, 45, 60, 75, 90],
    [f"r={np.cos(np.deg2rad(a)):.2f}" for a in [0, 15, 30, 45, 60, 75, 90]],
    fontsize=8,
)
ax.set_rlim(0, 1.4)
ax.set_rticks([0.4, 0.6, 0.8, 1.0, 1.2, 1.4])
ax.set_title("Taylor Diagram — Arraylake CMIP6 vs ERA5\nEurope | 1985-2014", fontsize=13, fontweight="bold", pad=24)
ax.legend(loc="upper left", bbox_to_anchor=(1.05, 1.0), fontsize=10, framealpha=0.9)

# Reserve space for title and right-side legend so title remains visible in notebooks/Binder
fig.subplots_adjust(top=0.86, right=0.78)

plt.savefig("arraylake_taylor_diagram.png", dpi=150, bbox_inches="tight", facecolor="white")
plt.show()
print("Saved: arraylake_taylor_diagram.png")

## 6. Model Ranking Visualisation

In [None]:
fig, axes = plt.subplots(1, 3, figsize=(14, 5))
fig.suptitle("Arraylake CMIP6 Model Skill — Europe | 1985-2014", fontsize=13, fontweight="bold", y=0.98)

model_names = list(results_by_model.keys())
rmse_vals = [results_by_model[m]["rmse"] for m in model_names]
bias_vals = [results_by_model[m]["mean_bias"] for m in model_names]
tss_vals = [results_by_model[m]["taylor_skill_score"] for m in model_names]
colors = ["#2196F3", "#4CAF50", "#FF9800"]

bars0 = axes[0].barh(model_names, rmse_vals, color=colors)
axes[0].set_xlabel("RMSE (K)")
axes[0].set_title("RMSE (lower is better)")
for bar, val in zip(bars0, rmse_vals):
    axes[0].text(bar.get_width() + 0.01, bar.get_y() + bar.get_height() / 2, f"{val:.3f}", va="center", fontsize=9)

bias_colors = ["#F44336" if v > 0 else "#2196F3" for v in bias_vals]
bars1 = axes[1].barh(model_names, bias_vals, color=bias_colors)
axes[1].set_xlabel("Mean Bias (K)")
axes[1].set_title("Mean Bias (red=warm, blue=cold)")
axes[1].axvline(0, color="black", linewidth=1.2)
for bar, val in zip(bars1, bias_vals):
    x = bar.get_width() + 0.02 if val >= 0 else bar.get_width() - 0.16
    axes[1].text(x, bar.get_y() + bar.get_height() / 2, f"{val:+.3f}", va="center", fontsize=9)

bars2 = axes[2].barh(model_names, tss_vals, color=colors)
axes[2].set_xlabel("Taylor Skill Score")
axes[2].set_title("Taylor Skill Score (higher is better)")
axes[2].set_xlim(0, 1.05)
for bar, val in zip(bars2, tss_vals):
    axes[2].text(min(bar.get_width() + 0.01, 1.02), bar.get_y() + bar.get_height() / 2, f"{val:.3f}", va="center", fontsize=9)

for ax in axes:
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)

# Keep suptitle visible in interactive windows as well as saved output
fig.tight_layout(rect=[0, 0, 1, 0.93])

plt.savefig("arraylake_model_ranking.png", dpi=150, bbox_inches="tight", facecolor="white")
plt.show()
print("Saved: arraylake_model_ranking.png")

## 7. Export Report + Tag Validated Arraylake Snapshot

In [None]:
report.export("climval_arraylake_report.html")
report.export("climval_arraylake_report.json")
report.export("climval_arraylake_report.md")

print("Reports exported:")
print("  climval_arraylake_report.html")
print("  climval_arraylake_report.json")
print("  climval_arraylake_report.md")

# Example validation gate
mean_rmse = float(np.mean([results_by_model[m]["rmse"] for m in results_by_model]))
min_corr = float(np.min([results_by_model[m]["pearson_r"] for m in results_by_model]))
validation_passed = (mean_rmse < 3.0) and (min_corr > 0.90)

print(f"Validation gate passed: {validation_passed} (mean_rmse={mean_rmse:.3f}, min_r={min_corr:.3f})")

if used_real_arraylake and arraylake_repo is not None and validation_passed:
    snapshot_id = arraylake_snapshot_id or arraylake_repo.lookup_branch(ARRAYLAKE_BRANCH)
    tag_name = f"validated-climval-{pd.Timestamp.utcnow():%Y%m%d}"
    arraylake_repo.create_tag(tag_name, snapshot_id=snapshot_id)
    print(f"Created Arraylake tag: {tag_name} -> {snapshot_id}")
else:
    print("Tagging skipped (requires real Arraylake mode + passed gate).")
    print("Tag command used in real mode:")
    print("  repo.create_tag('validated-climval-YYYYMMDD', snapshot_id=repo.lookup_branch('main'))")

## 8. The 15-Line Version

Cloud-native validation, condensed for sharing.

In [None]:
from datetime import datetime

try:
    from arraylake import Client
except Exception:
    Client = None

from climval import BenchmarkSuite, load_model

if Client is None:
    print("Install arraylake to run the real 15-line cloud workflow: pip install arraylake")
else:
    client = Client(); repo = client.get_repo("myorg/cmip6-era5-validation")
    session = repo.readonly_session(branch="main")
    # xr.open_dataset(session.store, engine='zarr', group='era5/monthly/europe', consolidated=False, zarr_format=3)
    # xr.open_dataset(session.store, engine='zarr', group='cmip6/MPI-ESM1-2-HR/monthly/europe', consolidated=False, zarr_format=3)

suite = BenchmarkSuite(name="Arraylake-CMIP6-15-lines")
suite.register(load_model("era5", name="ERA5", path="_tmp_arraylake_validation/era5_tas.nc", variables=["tas"],
                         lat_range=(35,72), lon_range=(-15,45), time_start=datetime(1985,1,1), time_end=datetime(2014,12,31)), role="reference")
for m in ["MPI-ESM1-2-HR", "EC-Earth3", "CNRM-CM6-1"]:
    suite.register(load_model(name=m, path=f"_tmp_arraylake_validation/{m.lower().replace('-', '_')}_tas.nc", variables=["tas"],
                            lat_range=(35,72), lon_range=(-15,45), time_start=datetime(1985,1,1), time_end=datetime(2014,12,31)))

report_15 = suite.run(variables=["tas"], seed=42)
print({r.candidate: {k: round(v, 3) for k, v in r.score_summary().items()} for r in report_15.results})

---

## Summary

This notebook demonstrates an end-to-end pattern for:
- Reading cloud-native climate arrays from Arraylake
- Validating CMIP6-style candidates against ERA5 with climval
- Publishing reproducible outputs and governance tags

**Use synthetic fallback for portability. Switch to real Arraylake data by toggling `USE_REAL_ARRAYLAKE`.**

---

### References

- Earthmover Arraylake docs: https://docs.earthmover.io/
- Icechunk version control and tags: https://docs.earthmover.io/guide/version-control
- climval: https://github.com/northflowlabs/climval
- Taylor, K. E. (2001). https://doi.org/10.1029/2000JD900719
- Gleckler, P. J. et al. (2008). https://doi.org/10.1029/2007JD008972