# Run pyCIAM

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import distributed as dd
import numpy as np
import pandas as pd
import xarray as xr
from dask_gateway import Gateway
from pyCIAM.constants import CASE_DICT, CASES, COSTTYPES
from pyCIAM.io import (
    check_finished_zarr_workflow,
    create_template_dataarray,
    load_ciam_inputs,
    prep_sliiders,
    save_to_zarr_region,
)
from pyCIAM.run import calc_costs_all_adapt
from shared import (
    AUTHOR,
    CONTACT,
    FS,
    HISTORY,
    PATH_PARAMS,
    PATH_QUANTILE_RES,
    PATH_REFA,
    PATH_SLIIDERS_ECON,
    PATH_SLIIDERS_SLR_QUANTILES,
    PATH_SURGE_LOOKUP,
    QUANTILES,
    upload_pkg,
)

  from distributed.utils import LoopRunner, format_bytes


In [3]:
TMPPATH = FS.get_mapper("rhg-data-scratch/pyCIAM_results_quantiles_prechunked.zarr")

N_WORKERS = 500
SEG_CHUNKSIZE = 3

DESCRIPTION = "Projected coastal damages from pyCIAM, using quantiles of SLR scenarios."

In [4]:
gateway = Gateway()
cluster = gateway.new_cluster(
    idle_timeout=3600,
    profile="micro",
    env_items={
        "DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "10",
        "DASK_DISTRIBUTED__WORKER__MEMORY__TARGET": "0.95",
        "DASK_DISTRIBUTED__WORKER__MEMORY__SPILL": "0.95",
        "DASK_DISTRIBUTED__WORKER__MEMORY__PAUSE": "0.95",
        "DASK_DISTRIBUTED__WORKER__MEMORY__TERMINATE": "0.99",
    },
)
client = cluster.get_client()
cluster.scale(N_WORKERS)

client.upload_file("shared.py")
upload_pkg(client, "../pyCIAM")

cluster

VBox(children=(HTML(value='<h2>GatewayCluster</h2>'), HBox(children=(HTML(value='\n<div>\n<style scoped>\n    …

## Save Template Zarr

In [5]:
# get coord values from input data
params = pd.read_json(PATH_PARAMS)["values"]
ciam_in = xr.open_zarr(PATH_SLIIDERS_ECON, chunks=None)

# set up coords and lengths
coords = {
    "scenario": np.concatenate(
        (["ncc"], xr.open_zarr(PATH_SLIIDERS_SLR_QUANTILES).scenario.values)
    ),
    "quantile": QUANTILES,
    "iam": ciam_in.iam.values,
    "ssp": ciam_in.ssp.values,
    "year": ciam_in.year.values,
    "seg_adm": ciam_in.seg_adm.values,
    "costtype": COSTTYPES,
    "case": CASES,
}

lens = {k: len(v) for k, v in coords.items()}

chunks = {"seg_adm": 1, "case": len(coords["case"]) - 1}
chunks = {k: -1 if k not in chunks else chunks[k] for k in coords}

# create arrays
cost_dims = [
    "case",
    "costtype",
    "seg_adm",
    "scenario",
    "quantile",
    "year",
    "ssp",
    "iam",
]
refA_dims = ["case", "seg_adm", "quantile"]
npv_dims = ["ssp", "iam", "case", "seg_adm", "quantile"]

out_ds = create_template_dataarray(cost_dims, coords, chunks).to_dataset(name="costs")
out_ds["npv"] = out_ds.costs.isel(year=0, costtype=0, drop=True).astype("float64")
out_ds["optimal_case"] = out_ds.npv.isel(case=0, drop=True).astype("uint8")

# add attrs
out_ds.attrs.update(
    {
        "description": DESCRIPTION,
        "author": AUTHOR,
        "contact": CONTACT,
        "history": HISTORY,
        "updated": pd.Timestamp.now(tz="US/Pacific").strftime("%c"),
        "planning_period_start_years": params.at_start,
    }
)
out_ds.costs.attrs.update(
    {
        "long_name": "Costs",
        "description": "Cost (by cost type) incurred in a given time period",
        "units": "2019 USD",
    }
)
out_ds.npv.attrs.update(
    {
        "long_name": "Net Present Value",
        "description": (
            "Calculated using 4% discount and inclusive of initial adaptation."
        ),
    }
)
out_ds.optimal_case.attrs.update(
    {
        "long_name": "Lowest Cost Adaptation Approach",
        "description": "Adaptation approach chosen by each segment",
        "data_dictionary": str(CASE_DICT),
    }
)

In [6]:
# save
out_ds.to_zarr(TMPPATH, compute=False, mode="w")

Delayed('_finalize_store-f0224880-2cd0-4934-8ca0-46900db87ed7')

## Define wrapper functions

In [7]:
def calc_all_cases(
    seg_adms,
    check=True,
):
    if check_finished_zarr_workflow(
        finalstore=TMPPATH,
        varname="costs",
        final_selector={"seg_adm": seg_adms, "case": CASES[:-1]},
        check_temp=False,
        check_final=check,
    ):
        return None

    segs = ["_".join(seg_adm.split("_")[:2]) for seg_adm in seg_adms]

    inputs, slr, surge = load_ciam_inputs(
        PATH_SLIIDERS_ECON,
        PATH_SLIIDERS_SLR_QUANTILES,
        params,
        seg_adms,
        seg_var="seg_adm",
        surge_lookup_store=PATH_SURGE_LOOKUP,
    )

    slr = slr.unstack().rename(mc_sample_id="quantile")

    # get initial adaptation height
    refA = (
        xr.open_zarr(PATH_REFA, chunks=None)
        .refA.sel(seg=segs, movefactor=inputs.movefactor)
        .drop_vars(["case", "movefactor"])
    )
    refA["seg"] = seg_adms

    out = (
        calc_costs_all_adapt(
            inputs, slr, surge_lookup=surge, elev_chunksize=None, min_R_noadapt=refA
        )
        .to_dataset(name="costs")
        .rename(seg="seg_adm")
    )

    out["npv"] = (
        (out.costs.sum("costtype") * inputs.dfact)
        .sel(year=slice(inputs.npv_start, None))
        .sum("year")
    )

    save_to_zarr_region(out, TMPPATH)
    return None


def optimize_case(seg_adm, *wait_futs, check=True):
    # use last fpath to check if this task has already been run
    if check and check_finished_zarr_workflow(
        finalstore=TMPPATH,
        varname="costs",
        final_selector={"seg_adm": seg_adm, "case": CASES[-1]},
        check_temp=False,
        check_final=check,
    ):
        return None

    seg = "_".join(seg_adm.split("_")[:2])
    with xr.open_zarr(PATH_SLIIDERS_ECON, chunks=None) as ds:
        all_segs = ds.seg.load()

    this_seg_adms = all_segs.seg_adm.isel(seg_adm=all_segs.seg == seg).values

    opt_case = (
        xr.open_zarr(TMPPATH, chunks=None)
        .npv.sel(seg_adm=this_seg_adms)
        .drop_sel(case="optimalfixed")
        .sum("seg_adm")
        .idxmin("case")
    )
    opt_case_ser = opt_case.to_series()
    opt_val = (
        pd.Series(
            pd.Series(CASE_DICT).loc[opt_case_ser].values,
            index=opt_case_ser.index,
        )
        .to_xarray()
        .astype("uint8")
    )

    dfact = prep_sliiders(
        PATH_SLIIDERS_ECON, seg_adm, constants=params, seg_var="seg_adm"
    ).dfact

    out = (
        xr.open_zarr(TMPPATH, chunks=None)[["costs"]]
        .sel(seg_adm=[seg_adm])
        .sel(case=opt_case, drop=True)
        .expand_dims(case=["optimalfixed"])
    )
    out["optimal_case"] = opt_val.expand_dims(seg_adm=[seg_adm])
    out["npv"] = (
        (out.costs.sum("costtype") * dfact)
        .sel(year=slice(params.npv_start, None))
        .sum("year")
    )

    save_to_zarr_region(out, TMPPATH)
    return None

## Calculate initial adaptation

In [8]:
# get groups for running pyCIAM
groups = [
    ciam_in.seg_adm.isel(seg_adm=slice(i, i + SEG_CHUNKSIZE)).values
    for i in np.arange(0, len(ciam_in.seg_adm), SEG_CHUNKSIZE)
]

# get groups for aggregating seg-adms up to segs
most_segadm = ciam_in.seg.groupby(ciam_in.seg).count().max().item()
i = 0
agg_groups = []
while i < len(ciam_in.seg):
    this_group = ciam_in.isel(seg_adm=slice(i, i + most_segadm))
    if len(np.unique(this_group.seg)) == 1:
        i += most_segadm
    else:
        this_group = this_group.isel(
            seg_adm=this_group.seg != this_group.seg.isel(seg_adm=-1, drop=True)
        )
        i += len(this_group.seg_adm)

    agg_groups.append(this_group.seg_adm.values)

groups_ser = (
    pd.Series(groups)
    .explode()
    .reset_index()
    .rename(columns={"index": "group_id", 0: "seg_adm"})
    .set_index("seg_adm")
    .group_id
)

## Run 1st stage (estimate costs for each adaptation type)

In [9]:
futs = np.array(client.map(calc_all_cases, groups, check=True))

## Add optimal case data

In [10]:
seg_adms = ciam_in.seg_adm.values
seg_adm_ser = pd.Series(seg_adms)
seg_adm_ser.index = seg_adm_ser.str.split("_").apply(lambda x: "_".join(x[:2]))
seg_grps = seg_adm_ser.groupby(seg_adm_ser.index).apply(list)
precurser_futs = (
    seg_adm_ser.to_frame("seg_adm")
    .join(seg_grps.rename("seg_group"))
    .set_index("seg_adm")
    .seg_group.explode()
    .to_frame()
    .join(groups_ser, on="seg_group")
    .groupby("seg_adm")
    .group_id.apply(set)
    .apply(list)
    .apply(lambda x: futs[x])
)

In [11]:
futs = precurser_futs.reset_index(drop=False).apply(
    lambda row: client.submit(optimize_case, row.seg_adm, *row.group_id), axis=1
)

## Rechunk and save

In [12]:
_ = dd.wait(futs.tolist())

In [13]:
out = (
    xr.open_zarr(TMPPATH, chunks={"case": -1, "seg_adm": 10})
    .chunk({"year": 10})
    .persist()
)
for v in out.data_vars:
    out[v].encoding.clear()

out["npv"] = out.npv.astype("float32")

for k, v in out.coords.items():
    if v.dtype == object:
        out[k] = v.astype("unicode")

In [14]:
out.to_zarr(PATH_QUANTILE_RES, mode="w")

<xarray.backends.zarr.ZarrStore at 0x7ff324d1ec80>

### Shut down cluster

In [15]:
cluster.close(), client.close()

(None, None)

In [16]:
TMPPATH.clear()