# Process extreme weather dataset

This notebook is used to derive a dataset of extreme events using the bias-corrected CORDEX data provided to Two Bears Environmental Consulting (via KR at SNAP) by Stantec, Inc.

This dataset will consist of four "extreme" variables (to start), derived at the annual scale for all available models and scenarios on the same grid as the original bias-corrected data.

The dataset will be a datacube with the following dimensions:

* variable
* model
* scenario
* year
* Y
* X
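
As a rough illustration of that layout, the sketch below builds an empty NumPy array with the six dimensions in the order listed. The year range and grid size (`years`, `ny`, `nx`) are placeholder values, not taken from the CORDEX files, and only two of the eleven models are included to keep it small.

```python
import numpy as np

# Dimension labels; the variable and scenario names match this notebook,
# the remaining values are placeholders chosen for illustration only.
variables = ["hd", "cd", "rx1say", "hsd"]
models = ["CCCma-CanESM2_CCCma-CanRCM4", "NCC-NorESM1-M_SMHI-RCA4"]
scenarios = ["hist", "rcp45", "rcp85"]
years = list(range(2001, 2004))  # hypothetical year range
ny, nx = 4, 5                    # hypothetical grid size

# One cell per (variable, model, scenario, year, Y, X); NaN marks "no data",
# e.g. years that a given scenario does not cover.
cube = np.full(
    (len(variables), len(models), len(scenarios), len(years), ny, nx),
    np.nan,
)


def grid_for(variable, model, scenario, year):
    """Return the (Y, X) grid for one combination of labels."""
    return cube[
        variables.index(variable),
        models.index(model),
        scenarios.index(scenario),
        years.index(year),
    ]


print(cube.shape)  # (4, 2, 3, 3, 4, 5)
print(grid_for("hd", models[0], "rcp85", 2002).shape)  # (4, 5)
```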

## Variables

The following variables are to be derived, at the annual scale:

* `hd`: “Hot day” threshold -- the highest observed daily $T_{max}$ such that there are 5 other observations equal to or greater than this value.
* `cd`: “Cold day” threshold -- the lowest observed daily $T_{min}$ such that there are 5 other observations equal to or less than this value.
* `rx1say`: Max 1-day precipitation -- the largest single-day precipitation total in the year
* `hsd`: Heavy snow days -- number of days with snowfall > 10 cm
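
To make these definitions concrete, here is a small sketch on made-up daily values for a single grid cell and a very short "year" (plain Python, no model data). The series `tmax`, `tmin`, `pr`, and `snow` are invented for illustration. The units (°C and cm) follow the conversions used later in this notebook, and `cd` is taken from daily minimum temperature, matching the `aggr_varname_lu` mapping in the processing code.

```python
# Invented daily values for one grid cell and one (very short) "year".
tmax = [12.0, 25.5, 30.1, 28.4, 31.0, 29.9, 27.3, 26.8, 15.2, 18.0]  # deg C
tmin = [-20.5, -3.0, -18.2, -25.1, -1.4, -22.0, -19.9, -30.3, -5.6, -21.7]  # deg C
pr = [0.0, 0.4, 2.1, 0.0, 1.3, 0.2, 0.0, 3.6, 0.1, 0.0]  # cm/day
snow = [0.0, 12.5, 3.0, 10.0, 0.0, 15.2, 0.0, 0.0, 11.1, 9.9]  # cm/day

# hd: sort descending and take the 6th value, so 5 values are >= it.
hd = sorted(tmax, reverse=True)[5]
# cd: sort ascending and take the 6th value, so 5 values are <= it.
cd = sorted(tmin)[5]
# rx1say: the largest single-day precipitation total.
rx1say = max(pr)
# hsd: count of days with strictly more than 10 cm of snowfall.
hsd = sum(1 for s in snow if s > 10)

print(hd, cd, rx1say, hsd)  # 26.8 -19.9 3.6 3
```

Note that the day with exactly 10.0 cm of snowfall is not counted, since the threshold is strict.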

## Models

The CORDEX data are produced by running a regional climate model (RCM) driven by a global circulation model (GCM), and several of each are represented in this dataset. The combinations are far from exhaustive, so for our purposes it is sufficient to treat each unique GCM x RCM combination as its own "model", of which there are 11:

* CCCma-CanESM2 x CCCma-CanRCM4
* CCCma-CanESM2 x SMHI-RCA4
* CCCma-CanESM2 x UQAM-CRCM5
* ICHEC-EC-EARTH x DMI-HIRHAM5
* ICHEC-EC-EARTH x SMHI-RCA4
* ICHEC-EC-EARTH x SMHI-RCA4-SN
* MPI-M-MPI-ESM-LR x MGO-RRCM
* MPI-M-MPI-ESM-LR x SMHI-RCA4
* MPI-M-MPI-ESM-LR x SMHI-RCA4-SN
* MPI-M-MPI-ESM-MR x UQAM-CRCM5
* NCC-NorESM1-M x SMHI-RCA4

## Processing

We now derive the dataset. The strategy is to spread the individual read-and-summarize jobs over the available cores. Start by defining directories:

In [1]:
import os
from pathlib import Path


# base directory for the project
base_dir = Path(os.getenv("BASE_DIR") or "/workspace/Shared/Tech_Projects/TBEC_CMIP5_Processing/project_data/")
# actual subdirectory of base_dir that contains CORDEX data
src_dir = base_dir.joinpath("arc_cordex/bias_corrected")
# the outputs from here will be used for reporting, so write them under final_products
out_dir = Path(os.getenv("OUTPUT_DIR") or "/workspace/Shared/Tech_Projects/TBEC_CMIP5_Processing/final_products/")
out_ds_fp = out_dir.joinpath("annual_extremes.nc")

Models, scenarios, and variable names for iterating over file paths:

In [2]:
models = [
    "CCCma-CanESM2_CCCma-CanRCM4",
    "CCCma-CanESM2_SMHI-RCA4",
    "CCCma-CanESM2_UQAM-CRCM5",
    "ICHEC-EC-EARTH_DMI-HIRHAM5",
    "ICHEC-EC-EARTH_SMHI-RCA4",
    "ICHEC-EC-EARTH_SMHI-RCA4-SN",
    "MPI-M-MPI-ESM-LR_MGO-RRCM",
    "MPI-M-MPI-ESM-LR_SMHI-RCA4",
    "MPI-M-MPI-ESM-LR_SMHI-RCA4-SN",
    "MPI-M-MPI-ESM-MR_UQAM-CRCM5",
    "NCC-NorESM1-M_SMHI-RCA4",
]

scenarios = ["hist", "rcp45", "rcp85"]

# not using all available model variables yet
# varnames = ["pr", "prsn", "sfcWind", "tas", "tasmax", "tasmin"]
varnames = ["pr", "prsn", "tasmax", "tasmin"]


# map from varnames to desired summary variables
aggr_varname_lu = {
    "pr": ["rx1say"],
    "prsn": ["hsd"],
    "tasmax": ["hd"],
    "tasmin": ["cd"],
}

# template filename
temp_fn = "ARC44_{}_{}_{}_ERA5bc.nc"

Create a list of arguments for filenames and requested summaries to supply to the pool of workers:

In [3]:
args = []

for scenario in scenarios:
    for varname in varnames:
        for model in models:
            fp = src_dir.joinpath(scenario, varname, temp_fn.format(scenario, varname, model))
            
            # aggregate variable names for this particular file
            aggr_varnames = aggr_varname_lu[varname]
            
            if fp.exists():
                args.append((fp, aggr_varnames, varname, scenario, model))
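
As a quick sanity check on this step, the sketch below enumerates every scenario/variable/model combination with `itertools.product` and reports how many of the expected files are missing. The directory `example_src_dir` and the shortened `example_models` list are placeholders for illustration, not the project's actual paths.

```python
from itertools import product
from pathlib import Path

# Placeholder inputs for illustration; substitute the notebook's own
# src_dir, models, scenarios, and varnames when running for real.
example_src_dir = Path("/tmp/does-not-exist/bias_corrected")
example_models = ["CCCma-CanESM2_SMHI-RCA4", "NCC-NorESM1-M_SMHI-RCA4"]
example_scenarios = ["hist", "rcp45", "rcp85"]
example_varnames = ["pr", "prsn", "tasmax", "tasmin"]
template = "ARC44_{}_{}_{}_ERA5bc.nc"

# Build every expected path, in the same nesting order as the loops above
expected = [
    example_src_dir.joinpath(scenario, varname, template.format(scenario, varname, model))
    for scenario, varname, model in product(
        example_scenarios, example_varnames, example_models
    )
]
missing = [fp for fp in expected if not fp.exists()]

print(f"{len(expected)} expected, {len(missing)} missing")
print(expected[0].name)  # ARC44_hist_pr_CCCma-CanESM2_SMHI-RCA4_ERA5bc.nc
```

Printing the `missing` list makes it easy to see which model and scenario combinations were silently skipped by the `fp.exists()` check.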

Define functions for each of the new extreme aggregate variables:

In [4]:
import numpy as np


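def hd(arr, axis):
    # "hot day" threshold: the 6th hottest value of the year.
    # np.sort is ascending, so that is index -6 along the time axis
    # (the indexing assumes time is the first axis, i.e. axis=0)
    return np.sort(arr, axis)[-6,:,:]
    

def cd(arr, axis):
    # "cold day" threshold: the 6th coldest value of the year.
    # np.sort is ascending, so that is index 5 along the time axis
    # (the indexing assumes time is the first axis, i.e. axis=0)
    return np.sort(arr, axis)[5,:,:]


def rx1say(arr, axis):
    # thin wrapper around np.max, kept here for consistency with the others
    return np.max(arr, axis)


def hsd(arr, axis):
    # number of days with snowfall greater than 10 cm
    # assumes input is in cm
    return (arr > 10).sum(axis)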
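
The slicing in `hd` and `cd` assumes the time axis comes first. A hedged alternative, not what this notebook runs, is to pick the k-th value with `np.partition` and `np.take`, which works for any integer `axis` and avoids a full sort. The function names `kth_smallest` and `kth_largest` are made up for this sketch.

```python
import numpy as np


def kth_smallest(arr, k, axis=0):
    """Return the k-th smallest value (1-based) along an integer axis."""
    idx = k - 1
    # np.partition puts the value a full sort would place at idx in that slot
    return np.take(np.partition(arr, idx, axis=axis), idx, axis=axis)


def kth_largest(arr, k, axis=0):
    """Return the k-th largest value (1-based) along an integer axis."""
    idx = arr.shape[axis] - k
    return np.take(np.partition(arr, idx, axis=axis), idx, axis=axis)


# Synthetic (time, lat, lon) data, for illustration only
rng = np.random.default_rng(0)
arr = rng.normal(size=(365, 3, 4))

# Both agree with the sort-and-slice approach used for hd and cd
assert np.array_equal(kth_largest(arr, 6), np.sort(arr, axis=0)[-6, :, :])
assert np.array_equal(kth_smallest(arr, 6), np.sort(arr, axis=0)[5, :, :])
print(kth_largest(arr, 6).shape)  # (3, 4)
```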

A function for summarizing a DataArray of model output for a particular variable into a new DataArray of an aggregate variable:

In [5]:
def summarize(da, aggr, model, scenario):
    """Summarize a DataArray according to a specified
    aggregation function
    
    Args:
        da (xarray.DataArray): the DataArray object containing the base variable
            data to be summarized according to aggr
        aggr (str): String corresponding to the type of aggregation to do, 
            based on the new aggregation functions
        model (str): model being run (for new coordinate dimension)
        scenario (str): scenario being run (for new coordinate dimension)
            
    Returns:
        A new DataArray containing the summarized information, with dimensions
            model, scenario, year, lat, lon, in that order
    """
    aggr_lu = {
        "hd": hd,
        "cd": cd,
        "rx1say": rx1say,
        "hsd": hsd,
    }
    new_da = (
        da.transpose("time", "lat", "lon")
        .groupby("time.year")
        .reduce(aggr_lu[aggr])
        .reset_coords(["longitude", "latitude", "height"], drop=True)
    )
    new_da.name = aggr
    
    # add model and scenario coordinate dimensions to the data array
    coords_di = {
        "model": model,
        "scenario": scenario,
    }

    new_dims = list(coords_di.keys())
    new_da = new_da.assign_coords(coords_di).expand_dims(new_dims)
    
    return new_da
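
For readers less familiar with `groupby("time.year").reduce(...)`, the following NumPy-only sketch mimics the idea: split a daily `(time, lat, lon)` array by calendar year and apply a reducer to each chunk. It is an illustration on synthetic data, not the xarray implementation, and `reduce_by_year` is a hypothetical helper name.

```python
import numpy as np


def reduce_by_year(arr, years, func):
    """Apply func(chunk, axis=0) to each year's slice of a (time, lat, lon) array.

    Returns the sorted unique years and an array of shape (n_years, lat, lon).
    """
    years = np.asarray(years)
    unique_years = np.unique(years)
    out = np.stack([func(arr[years == yr], axis=0) for yr in unique_years])
    return unique_years, out


# Synthetic example: two 365-day "years" on a 2 x 3 grid
years = np.repeat([2001, 2002], 365)
arr = np.arange(730 * 6, dtype=float).reshape(730, 2, 3)

yrs, annual_max = reduce_by_year(arr, years, np.max)
print(yrs)               # [2001 2002]
print(annual_max.shape)  # (2, 2, 3)
```

Any reducer with an `(arr, axis)` signature can be passed as `func`, which is the same contract the aggregate functions in this notebook follow.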

And a wrapper function that will read in the data and run as many extreme variable summaries as requested for that particular model variable:

In [6]:
import xarray as xr


def run_summary(fp, aggr_varnames, varname, scenario, model):
    """Read in data and run all requested aggregations for
    a particular model variable, scenario, and model.
    
    Args:
        fp (path-like): path to the file for the variable
            required for creating the aggregate variables
            in aggr_varnames
        aggr_varnames (list): aggregate variables to derive
            using data in provided filepath
        varname (str): model variable being used for aggr_varnames
        scenario (str): scenario being run
        model (str): model being run
        
    Returns:
        out (list): list of DataArrays, one per entry in aggr_varnames,
            each as returned by summarize
    """
    # model and scenario are passed through to summarize so that
    #  each returned DataArray carries them as coordinates, which
    #  lets the results be merged into a single Dataset later
    
    ds = xr.load_dataset(fp)
    da = ds[varname]
        
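    # convert snow and precip to cm/day
    #  (assumes source units of kg m-2 s-1: x 86400 s/day gives mm/day, x 0.1 gives cm/day)
    if varname in ["pr", "prsn"]:
        da = da * 8640
        
    # convert temperatures to degrees C (assumes source units of K)
    if varname in ["tasmax", "tasmin"]:
        da = da - 273.15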

    out = [summarize(da, aggr, model, scenario) for aggr in aggr_varnames]
    
    return out
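
The two conversions in `run_summary` can be checked by hand. The sketch below assumes the usual units for these variables, which this notebook does not state explicitly: kg m-2 s-1 for `pr` and `prsn`, and kelvin for `tasmax` and `tasmin`. It also assumes that 1 kg m-2 of water corresponds to a 1 mm layer, so the resulting depths are water-equivalent depths. The helper names are hypothetical.

```python
SECONDS_PER_DAY = 86_400
CM_PER_MM = 0.1


def flux_to_cm_per_day(flux_kg_m2_s):
    """Convert a water flux in kg m-2 s-1 to a depth in cm/day.

    Assumes 1 kg m-2 of liquid water is a 1 mm layer.
    """
    return flux_kg_m2_s * SECONDS_PER_DAY * CM_PER_MM


def kelvin_to_celsius(temp_k):
    """Convert a temperature from kelvin to degrees Celsius."""
    return temp_k - 273.15


# The combined factor matches the 8640 used above
print(round(SECONDS_PER_DAY * CM_PER_MM))  # 8640
print(round(flux_to_cm_per_day(1e-4), 3))  # 0.864
print(round(kelvin_to_celsius(300.0), 2))  # 26.85
```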

Pool it!!

In [7]:
import time
from multiprocessing import Pool


tic = time.perf_counter()

with Pool(32) as pool:
    # results = pool.starmap(run_summary, args[50:60])
    # test on the first 10 argument tuples; .get() blocks until all jobs finish
    results = pool.starmap_async(run_summary, args[:10]).get()
    
print(f"{round((time.perf_counter() - tic) / 60, 1)}m")

Process ForkPoolWorker-14:
Process ForkPoolWorker-20:
Process ForkPoolWorker-19:
Process ForkPoolWorker-25:
Process ForkPoolWorker-22:
Process ForkPoolWorker-29:
Process ForkPoolWorker-13:
Process ForkPoolWorker-23:
Process ForkPoolWorker-21:
Process ForkPoolWorker-24:
Process ForkPoolWorker-26:
Process ForkPoolWorker-28:
Process ForkPoolWorker-27:
Process ForkPoolWorker-16:
Process ForkPoolWorker-18:
Process ForkPoolWorker-32:
Process ForkPoolWorker-17:
Process ForkPoolWorker-12:
Process ForkPoolWorker-11:
Process ForkPoolWorker-15:
Process ForkPoolWorker-8:
Process ForkPoolWorker-9:
Process ForkPoolWorker-30:
Process ForkPoolWorker-31:
Traceback (most recent call last):
Process ForkPoolWorker-7:
Process ForkPoolWorker-10:
Process ForkPoolWorker-4:
Process ForkPoolWorker-5:
Process ForkPoolWorker-3:
Process ForkPoolWorker-6:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/UA/kmredilla/miniconda3/envs/py38/lib/python3.8/multiprocessing/process.py", l

Okay, Pool is not working well for some reason. Success has been hit or miss, so we will try doing this in serial.

In [23]:
from tqdm.notebook import tqdm


results = []
for arg in tqdm(args):
    results.extend(run_summary(*arg))

  0%|          | 0/105 [00:00<?, ?it/s]
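
Since success with the pool was hit or miss, one option when running serially is to record failures instead of letting the first exception stop the loop. The sketch below is a generic pattern, not what was run here; `run_all` and `flaky` are hypothetical names, and `flaky` stands in for `run_summary`.

```python
def run_all(func, arg_list):
    """Call func(*args) for each args tuple, collecting results and failures.

    func is expected to return a list; results are flattened with extend.
    """
    results, failures = [], []
    for args in arg_list:
        try:
            results.extend(func(*args))
        except Exception as exc:  # record the failure and keep going
            failures.append((args, repr(exc)))
    return results, failures


def flaky(name, n):
    """Stand-in for run_summary: fails for negative n."""
    if n < 0:
        raise ValueError(f"bad input for {name}")
    return [f"{name}-{i}" for i in range(n)]


results, failures = run_all(flaky, [("a", 2), ("b", -1), ("c", 1)])
print(results)        # ['a-0', 'a-1', 'c-0']
print(len(failures))  # 1
```

The `failures` list keeps the offending argument tuples, so only those jobs need to be rerun.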

In [24]:
ds = xr.merge(results)

Round to reasonable precision:

In [33]:
ds["hsd"] = np.round(ds["hsd"], 1)
ds["hd"] = np.round(ds["hd"], 1)
ds["cd"] = np.round(ds["cd"], 1)
ds["rx1say"] = np.round(ds["rx1say"], 1)

Write to disk:

In [38]:
ds.to_netcdf(out_ds_fp)