# NCAR BCSD AK indicators processing

In [69]:
# indicators.py
# TO-DO: put this in a separate script
"""This script includes functions that define the various extreme variables we will by deriving"""

import numpy as np
import xclim.indices as xci
from xclim.core.calendar import percentile_doy
from xclim.core.units import convert_units_to, to_agg_units
from xclim.indices.generic import threshold_count


def take_sorted(arr, axis, idx):
    """Sort an array along one axis and return the slice at a single position of that axis.

    Helper for the 'hot day' and 'cold day' indices; used instead of fixed, []-based indexing
    so that the axis to operate on can be supplied at call time.

    Args:
        arr (numpy.ndarray): array to sort
        axis (int): axis to sort along and then index into
        idx (int): position along the sorted axis to extract (negative values count from the end)

    Returns:
        array of the values found at position idx after sorting arr (ascending) along axis
    """
    ordered = np.sort(arr, axis=axis)
    return np.take(ordered, indices=idx, axis=axis)


def hd(tasmax):
    """'Hot Day' - the 6th hottest day of the year

    Args:
        tasmax (xarray.DataArray): daily maximum temperature values. When a "units"
            attribute is present it is used to convert the data to degrees Celsius;
            when it is absent the values are assumed to be in Kelvin.

    Returns:
        Hot Day values for each year, in degrees Celsius
    """
    def func(year_da):
        # take_sorted sorts ascending (np.sort default), so position -6 along time
        #   is simply the "6th hottest" day
        return year_da.reduce(take_sorted, dim="time", idx=-6)

    if "units" in tasmax.attrs:
        # honor the declared units: subtracting 273.15 unconditionally shifts data
        #   that is already in degC (as the datasets in this project are labelled)
        out = convert_units_to(tasmax, "degC").resample(time="1Y").map(func)
    else:
        # no units metadata: keep the previous assumption that values are in Kelvin
        out = tasmax.resample(time="1Y").map(func) - 273.15
    out.attrs["units"] = "C"
    out.attrs["comment"] = "'hot day': 6th hottest day of the year"

    return out
    

def cd(tasmin):
    """'Cold Day' - the 6th coldest day of the year

    Args:
        tasmin (xarray.DataArray): daily minimum temperature values. When a "units"
            attribute is present it is used to convert the data to degrees Celsius;
            when it is absent the values are assumed to be in Kelvin.

    Returns:
        Cold Day values for each year, in degrees Celsius
    """
    def func(year_da):
        # take_sorted sorts ascending (np.sort default), so position 5 along time
        #   is simply the "6th coldest" day
        return year_da.reduce(take_sorted, dim="time", idx=5)

    if "units" in tasmin.attrs:
        # honor the declared units: subtracting 273.15 unconditionally shifts data
        #   that is already in degC (as the datasets in this project are labelled)
        out = convert_units_to(tasmin, "degC").resample(time="1Y").map(func)
    else:
        # no units metadata: keep the previous assumption that values are in Kelvin
        out = tasmin.resample(time="1Y").map(func) - 273.15
    out.attrs["units"] = "C"
    out.attrs["comment"] = "'cold day': 6th coldest day of the year"

    return out
    

def rx1day(pr):
    """'Max 1-day precip' - the largest daily precip value recorded in each year.

    Args:
        pr (xarray.DataArray): daily total precip values

    Returns:
        Max 1-day precip for each year, with the units attribute set to "mm"
    """
    annual_max = xci.max_n_day_precipitation_amount(pr, window=1, freq="YS")
    # the units attribute is overwritten, not converted
    annual_max.attrs.update(units="mm")
    return annual_max


def rx5day(pr):
    """'Max 5-day precip' - the largest 5-day precip total recorded in each year.

    Args:
        pr (xarray.DataArray): daily total precip values

    Returns:
        Max 5-day precip for each year, with the units attribute set to "mm"
    """
    annual_max = xci.max_n_day_precipitation_amount(pr, window=5, freq="YS")
    # the units attribute is overwritten, not converted
    annual_max.attrs.update(units="mm")
    return annual_max


def su(tasmax):
    """'Summer days' - the number of days per year with tasmax above 25 C

    Args:
        tasmax (xarray.DataArray): daily maximum temperature values

    Returns:
        Number of summer days for each year
    """
    summer_days = xci.tx_days_above(tasmax, thresh="25 degC", freq="YS")
    return summer_days


def dw(tasmin):
    """'Deep winter days' - the number of days per year with tasmin below -30 C

    Args:
        tasmin (xarray.DataArray): daily minimum temperature values

    Returns:
        Number of deep winter days for each year
    """
    deep_winter_days = xci.tn_days_below(tasmin, freq="YS", thresh="-30 degC")
    return deep_winter_days


def wsdi(tasmax, hist_da):
    """'Warm spell duration index' - annual warm-spell statistic from xclim's warm_spell_duration_index, where a spell is at least 6 consecutive days (window=6) with daily max T above the 90th percentile of historical values for the date

    Args:
        tasmax (xarray.DataArray): daily maximum temperature values
        hist_da (xarray.DataArray): historical daily maximum temperature values used to derive the day-of-year 90th percentile

    Returns:
        Warm spell duration index for each year
    """
    # day-of-year 90th percentile of the historical record; selecting the single
    #   percentile leaves a scalar "percentiles" coordinate behind
    tasmax_per = percentile_doy(hist_da, per=90).sel(percentiles=90)
    out = xci.warm_spell_duration_index(tasmax, tasmax_per, window=6, freq="YS")
    # drop_vars replaces the deprecated DataArray.drop for removing a coordinate
    return out.drop_vars("percentiles")


def csdi(tasmin, hist_da):
    """'Cold spell duration index' - annual cold-spell statistic from xclim's cold_spell_duration_index, where a spell is at least 6 consecutive days (window=6) with daily min T below the 10th percentile of historical values for the date

    Args:
        tasmin (xarray.DataArray): daily minimum temperature values
        hist_da (xarray.DataArray): historical daily minimum temperature values used to derive the day-of-year 10th percentile

    Returns:
        Cold spell duration index for each year
    """
    # day-of-year 10th percentile of the historical record; selecting the single
    #   percentile leaves a scalar "percentiles" coordinate behind
    tasmin_per = percentile_doy(hist_da, per=10).sel(percentiles=10)
    out = xci.cold_spell_duration_index(tasmin, tasmin_per, window=6, freq="YS")
    # drop_vars replaces the deprecated DataArray.drop for removing a coordinate
    return out.drop_vars("percentiles")


def r10mm(pr):
    """'Heavy precip days' - number of days per year with more than 10 mm of precip

    Args:
        pr (xarray.DataArray): daily total precip values

    Returns:
        Number of heavy precip days for each year
    """
    # modelled on xclim.indices._threshold.tg_days_above:
    #   express the threshold in the units of pr, count exceedances per year,
    #   then give the count aggregation-appropriate units
    heavy_thresh = convert_units_to("10 mm/day", pr)
    n_heavy_days = threshold_count(pr, ">", heavy_thresh, freq="YS")
    return to_agg_units(n_heavy_days, pr, "count")


def cwd(pr):
    """'Consecutive wet days' - length of the longest run of days per year with precip > 1 mm

    Args:
        pr (xarray.DataArray): daily total precip values

    Returns:
        Max number of consecutive wet days for each year
    """
    longest_wet_run = xci.maximum_consecutive_wet_days(pr, thresh="1 mm/day", freq="YS")
    return longest_wet_run


def cdd(pr):
    """'Consecutive dry days' - length of the longest run of days per year with precip < 1 mm

    Args:
        pr (xarray.DataArray): daily total precip values

    Returns:
        Max number of consecutive dry days for each year
    """
    longest_dry_run = xci.maximum_consecutive_dry_days(pr, thresh="1 mm/day", freq="YS")
    return longest_dry_run


def compute_indicator(da, indicator, scenario, model, kwargs=None):
    """Summarize a DataArray according to a specified index / aggregation function

    Args:
        da (xarray.DataArray): the DataArray object containing the base variable data to be summarized
        indicator (str): name of the indicator to compute; must equal the name of a function defined at module level
        scenario (str): scenario being run (for new coordinate dimension)
        model (str): model being run (for new coordinate dimension)
        kwargs (dict, optional): additional keyword arguments for the indicator function being called

    Returns:
        A new data array named after the indicator, with leading "model" and "scenario"
        dimensions added and the "time" dimension renamed to "year" (integer years)

    Raises:
        KeyError: if no module-level object named `indicator` exists
    """
    # avoid a shared mutable default argument
    if kwargs is None:
        kwargs = {}

    # indicator functions are looked up by name in this module's namespace
    try:
        indicator_func = globals()[indicator]
    except KeyError:
        raise KeyError(f"no indicator function named {indicator!r} is defined") from None

    new_da = indicator_func(da, **kwargs)
    new_da = new_da.compute()
    new_da.name = indicator

    # add model and scenario coordinate dimensions to the data array
    coords_di = {
        "model": model,
        "scenario": scenario,
    }
    new_da = new_da.assign_coords(coords_di).expand_dims(list(coords_di))

    # convert the time dimension to integer years instead of CF time objects
    years = new_da.time.dt.year.values
    new_da = new_da.rename({"time": "year"}).assign_coords({"year": years})

    return new_da


In [5]:
from multiprocessing import Pool
import numpy as np
import tqdm
import xarray as xr
# project

# import indices
# ignore all-nan slice warnings
import warnings
warnings.filterwarnings('ignore', r'All-NaN (slice|axis) encountered')
import time
# tic = time.perf_counter()

In [164]:
# config.py

"""Config file for setting shared paths, imports, etc across the project"""

import os
from pathlib import Path


# --- paths ---------------------------------------------------------------
# directory the combined indicators file is written to
out_dir = Path("/atlas_scratch/kmredilla/")
# directory containing the downscaled model data (the *_BCSD_met_*.nc files)
ncar_dir = Path("/atlas_scratch/cparr4/ncar_replacement_data")

# dataset of extreme variables calculated on an annual basis
#  for the entire domain of the input data
indicators_fp = out_dir / "ncar12km_indicators.nc"

# --- models, scenarios, and base variable names as found in the input data ---
# Only two models are enabled. Disabled entries from the original list:
#   ACCESS1-3, CanESM2, CSIRO-Mk3-6-0, GFDL-ESM2M, HadGEM2-ES, inmcm4, MIROC5
models = ["CCSM4", "MRI-CGCM3"]

scenarios = ["rcp45", "rcp85"]

varnames = ["pcp", "tmin", "tmax"]

# lookup from indicator name to the model variable it is computed from
indicator_varname_lu = {
    # precipitation-based
    "rx1day": "pcp",
    "rx5day": "pcp",
    "r10mm": "pcp",
    "cwd": "pcp",
    "cdd": "pcp",
    # daily-maximum-temperature-based
    "hd": "tmax",
    "su": "tmax",
    "wsdi": "tmax",
    # daily-minimum-temperature-based
    "cd": "tmin",
    "dw": "tmin",
    "csdi": "tmin",
}

# indicator names, in the insertion order of the lookup above
indicators = list(indicator_varname_lu)

# template filename
temp_fn = "{}_{}_BCSD_met_{}.nc"


We will use functions from the `indicators.py` script to compute the indicators. Define a wrapper around the `compute_indicator` function that opens a dataset (a collection of yearly files from 2006-2100 for a particular model and scenario), loads it into memory, and computes all indicators for that dataset.

In [94]:
def run_compute_indicators(fps, scenario, model):
    """Read in data and compute all requested indicators for a particular scenario and model.

    Relies on the module-level names `indicators`, `indicator_varname_lu` and, for the
    percentile-based indicators (wsdi, csdi), the historical `daymet_ds` dataset.

    Args:
        fps (list): list of paths to the yearly met files for computing indicators from
        scenario (str): scenario being run
        model (str): model being run

    Returns:
        summary_das (list): list of DataArrays containing indicator values (one for each indicator)
    """
    # Should be ~6 GB to load ~100 years of data
    # since each subdataset will need to be operated on multiple times, just load this into memory;
    # the context manager closes the underlying files once the data is in memory
    with xr.open_mfdataset(fps) as opened_ds:
        ds = opened_ds.load()
    print(f"data for {scenario}, {model} loaded into memory")
    # need to remove underscore in units for xclim :/
    ds["tmin"].attrs["units"] = "degC"
    ds["tmax"].attrs["units"] = "degC"

    summary_das = []
    for indicator in indicators:
        varname = indicator_varname_lu[indicator]
        # only the percentile-based indicators need the historical baseline
        if indicator in ("wsdi", "csdi"):
            kwargs = {"hist_da": daymet_ds[varname]}
        else:
            kwargs = {}
        summary_das.append(compute_indicator(ds[varname], indicator, scenario, model, kwargs))

        print(indicator, "done", end=", ")
    print()
    return summary_das

Create global daymet dataset for use in computing WSDI and CSDI indicators:

In [93]:
%%time
# open the yearly Daymet met files for 1980 through 2009 and read them fully into memory
daymet_ds = xr.open_mfdataset(
    [
        f"/atlas_scratch/cparr4/ncar_replacement_data/daymet/daymet_met_{year}.nc"
        for year in range(1980, 2010)
    ]
).load()
# xclim needs the tmin/tmax units without the underscore
daymet_ds["tmin"].attrs.update(units="degC")
daymet_ds["tmax"].attrs.update(units="degC")

CPU times: user 39.6 s, sys: 7.99 s, total: 47.6 s
Wall time: 40.2 s


Iterate over the projected models and scenarios and compute the indicators:

In [95]:
%%time
# one entry per (scenario, model) pair, each a list of indicator DataArrays
results = []
for scenario in scenarios:
    for model in models:
        # yearly input files; range(2006, 2100) covers 2006 through 2099
        fps = [
            ncar_dir / f"{model}_{scenario}_BCSD_met_{year}.nc"
            for year in range(2006, 2100)
        ]
        results.append(run_compute_indicators(fps, scenario, model))

data for rcp45, CCSM4 loaded into memory
rx1day  done, rx5day  done, r10mm  done, cwd  done, cdd  done, hd  done, su  done, wsdi  done, cd  done, dw  done, csdi  done, 
data for rcp45, MRI-CGCM3 loaded into memory
rx1day  done, rx5day  done, r10mm  done, cwd  done, cdd  done, hd  done, su  done, wsdi  done, cd  done, dw  done, csdi  done, 
data for rcp85, CCSM4 loaded into memory
rx1day  done, rx5day  done, r10mm  done, cwd  done, cdd  done, hd  done, su  done, wsdi  done, cd  done, dw  done, csdi  done, 
data for rcp85, MRI-CGCM3 loaded into memory
rx1day  done, rx5day  done, r10mm  done, cwd  done, cdd  done, hd  done, su  done, wsdi  done, cd  done, dw  done, csdi  done, 
CPU times: user 1h 5min 40s, sys: 15min 33s, total: 1h 21min 13s
Wall time: 1h 19min 44s


Merge the individual DataArrays into a single dataset:

In [141]:
# Flatten the per-(scenario, model) lists of indicator DataArrays in `results`
# and merge them into a single Dataset (one data variable per indicator name).
%time proj_indicators_ds = xr.merge([da for da_list in results for da in da_list])

CPU times: user 18.1 s, sys: 5.23 s, total: 23.4 s
Wall time: 23.4 s


Process applicable indicators for the historical era (using Daymet dataset):

In [136]:
%%time
# historical-era indicators from Daymet; the two percentile-based indicators
# (wsdi, csdi) are not in this list
summary_das = []
for indicator in ["rx1day", "rx5day", "r10mm", "cwd", "cdd", "hd", "su", "cd", "dw"]:
    varname = indicator_varname_lu[indicator]
    summary_das += [
        compute_indicator(daymet_ds[varname], indicator, "historical", "daymet")
    ]
    print(indicator, "done", end=", ")

rx1day done, rx5day done, r10mm done, cwd done, cdd done, hd done, su done, cd done, dw done, CPU times: user 1min 48s, sys: 32.6 s, total: 2min 21s
Wall time: 2min 21s


And combine into a Dataset:

In [149]:
# Merge the historical (Daymet) indicator DataArrays into a single Dataset.
%time daymet_indicators_ds = xr.merge(summary_das)

CPU times: user 20.7 ms, sys: 21 µs, total: 20.7 ms
Wall time: 20 ms


Some of the xclim indicators return 0 where the result should be null. Use the NaN mask from `rx1day` to set those cells to -9999 and cast the affected indicators to integers. Do this for projected indicators:

In [143]:
def replace_nan(da, mask=None, fill_value=-9999):
    """Overwrite masked cells of a DataArray in place with a fill value.

    Args:
        da (xarray.DataArray): array to modify; its `.values` are written to in place
        mask (array-like of bool, optional): True where cells should be replaced.
            Defaults to the module-level `nan_mask`, which must then already be defined.
        fill_value (int, optional): value written to the masked cells and recorded
            in the "_FillValue" attribute

    Returns:
        The same `da` object, with masked cells replaced and "_FillValue" set
    """
    if mask is None:
        # backward-compatible behavior: fall back to the global mask
        mask = nan_mask
    da.values[np.asarray(mask)] = fill_value
    da.attrs["_FillValue"] = fill_value
    return da

# Cells where the projected rx1day is NaN serve as the mask for every indicator below.
# replace_nan reads nan_mask as a global, so it must be assigned before the loop runs.
nan_mask = np.isnan(proj_indicators_ds["rx1day"])

# fill the masked cells of these indicators (replace_nan writes -9999), then cast to int32
for indicator in ["r10mm", "wsdi", "csdi", "cwd", "cdd", "su", "dw"]:
    proj_indicators_ds[indicator] = replace_nan(proj_indicators_ds[indicator]).astype(np.int32)

Then daymet indicators:

In [155]:
# setting different nanmask because different data cubes
nan_mask = np.isnan(daymet_indicators_ds["rx1day"])

for indicator in ["r10mm", "cwd", "cdd", "su", "dw"]:
    # um this array isn't writeable? never seen this before
    daymet_indicators_ds[indicator].values.setflags(write=1)
    daymet_indicators_ds[indicator] = replace_nan(daymet_indicators_ds[indicator]).astype(np.int32)

Then combine the projected and Daymet indicators Datasets together:

In [163]:
# Combine the historical (Daymet) and projected indicators into one Dataset.
indicators_ds = xr.merge([daymet_indicators_ds, proj_indicators_ds])
# Remove the dataset-level "units" attribute (raises KeyError if it is absent).
# NOTE(review): presumably a variable-level attribute that leaked into the merged
# Dataset's global attrs — confirm.
del indicators_ds.attrs["units"]

Round certain indicators to reasonable precision:

In [170]:
# round these indicators to one decimal place
for indicator in ["hd", "cd", "rx1day", "rx5day"]:
    indicators_ds[indicator] = indicators_ds[indicator].round(decimals=1)

Add global metadata:

In [171]:
from datetime import datetime, timezone


# Replace (not extend) the Dataset's global attributes with a UTC creation timestamp.
# datetime.utcnow() is deprecated; an aware "now" in UTC yields the same formatted string.
indicators_ds.attrs = {
    "creation_date": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
}

Write to disk (might take a couple of minutes):

In [173]:
# Write the combined indicators Dataset to the NetCDF file at indicators_fp.
%time indicators_ds.to_netcdf(indicators_fp)

CPU times: user 88.6 ms, sys: 4.26 s, total: 4.35 s
Wall time: 12.7 s


done!