# AK spruce beetle outbreak risk pipeline

This notebook is used for creating a climate-based dataset of spruce beetle outbreak risk.

It is currently under development.

Define a function that returns survival based on univoltinism, expressed as a proportion (percent survival divided by 100), from supplied sequences of daily minimum and maximum temperatures for a given pixel:

In [1]:
import numpy as np


def univoltine(tmin, tmax):
    """Estimate "survival" due to univoltinism for a single pixel / year.

    The first day on which ``tmax`` reaches 16C is located, and the 50 day
    window starting 40 days after it is examined. The number of hours
    above 17C within that window is estimated and mapped to a percent
    survival:

        * fewer than 40 hours: 50%
        * 40 to 225 hours: linear increase from 50% to 62.5%
        * 225 to 412 hours: linear increase of 1% per 5 hours from 62.5%
        * 412 hours or more: 100%

    Args:
        tmin (array-like): 1-D sequence of daily minimum temperatures
        tmax (array-like): 1-D sequence of daily maximum temperatures,
            aligned day-for-day with tmin

    Returns:
        float: survival as a proportion (percent / 100) rounded to two
            decimals, or 0.0 if tmax never reaches 16C
    """
    tmin = np.asarray(tmin)
    tmax = np.asarray(tmax)

    warm_days = np.flatnonzero(tmax >= 16)
    if warm_days.size == 0:
        return 0.0
    idx = warm_days[0]

    # window of interest; slicing silently truncates (possibly to empty)
    #  if the series ends before day idx + 90
    tmin = tmin[idx + 40:idx + 90]
    tmax = tmax[idx + 40:idx + 90]

    # running total of estimated hours above 17C
    hours = 0

    # days whose minimum is above 17C count for a full 24 hours
    hot_idx = tmin > 17
    hours += 24 * hot_idx.sum()
    # discard those days so they are excluded from the hourly estimator
    tmax = tmax[~hot_idx]
    tmin = tmin[~hot_idx]

    # tmin == 17 needs special treatment as well, as it would require
    #  division by zero in the estimator below. Just assume such a day is
    #  above 17C for 75% of the time, or 18 hrs
    equal_idx = tmin == 17
    hours += 18 * equal_idx.sum()
    tmax = tmax[~equal_idx]
    tmin = tmin[~equal_idx]

    # estimate the hours above 17C for each remaining day (tmin < 17) and
    #  add to the running total; days with tmax < 17 yield negative
    #  estimates and contribute nothing
    # NOTE(review): this estimate is not capped at 24 hours per day and
    #  grows without bound as tmin approaches 17 from below - confirm the
    #  formula against the intended method
    h_est = ((tmax - 17) / (17 - tmin)) / 2 * 24
    h_est[h_est < 0] = 0
    hours += h_est.sum()

    # map the hours to percent "survival" due to univoltinism
    if hours < 40:
        pct = 50
    elif 40 <= hours < 225:
        pct = 50 + (hours - 40) / 14.8
    elif 225 <= hours < 412:
        pct = 62.5 + (hours - 225) / 5
    else:
        # NaN totals (from NaN temperatures) also land here because every
        #  comparison above is False for NaN
        pct = 100

    return float(round(pct / 100, 2))

Define a function that returns fall survival, expressed as a proportion between 0 and 1, resulting from mortality caused by rapid cooling, using daily minimum temperatures for a given pixel / year. It first computes the number of degree days below a threshold that decreases linearly at a rate of 0.5 C per day, within a 21-day window starting with the first day a minimum temperature of -12 C or lower is reached. The resulting "degree days below the curve" value is then mapped to a percentage survival, which is returned divided by 100.

In [2]:
def fall_survival(arr):
    """Execute the fall survival algorithm for an
    array of temperature minimums for a single year.

    Degree days below a cooling threshold are accumulated over a window of
    up to 21 days beginning on the first day with a minimum of -12 or
    lower. The threshold starts at -12 and drops by 0.5 per day. Each
    degree day reduces percent survival by 4.76.

    Args:
        arr (array-like): 1-D sequence of daily minimum temperatures

    Returns:
        float: survival as a proportion (percent / 100) rounded to two
            decimals. 1.0 if the minimum never reaches -12, and 0.0 if it
            reaches -12 and also drops below -30 at any point in the
            series.
    """
    arr = np.asarray(arr)

    cold_days = np.flatnonzero(arr <= -12)
    if cold_days.size == 0:
        return 1.0

    # no survival if tmin is ever less than -30. Return a float (not int 0)
    #  so that np.apply_along_axis, which takes its output dtype from the
    #  first result, never builds an integer array and truncates fractions
    if arr.min() < -30:
        return 0.0

    start = cold_days[0]
    window = arr[start:start + 21]
    # cooling cutoff values, sized to the window so that a window truncated
    #  by the end of the series does not cause a shape mismatch
    thr_arr = -12 - 0.5 * np.arange(window.size)
    dd = thr_arr - window
    # count only positive values and sum
    dd = dd[dd > 0].sum()
    # ensure value is between 0 and 100
    pct = np.clip(100 - (dd * 4.76), 0, 100)

    return float(round(pct / 100, 2))

Define a function that maps winter minimum temperature to percent survival. The survival rate will be based on snow cover, which will be one of three categories: minimal/no snow, moderate snowpack, deep snowpack.

In [3]:
def winter_survival(tmin, snow):
    """Convert a winter minimum temperature to a survival proportion
    for the given snowpack level.

    Percent survival is a linear ramp of 5 percent per degree, clipped to
    the range 0 to 100:
        "low":  100% at -20 down to 0% at -40
        "med":  100% at -30 down to 0% at -50
        "high": 100% at -40 down to 0% at -60

    Args:
        tmin (scalar or numpy array): minimum temperature(s)
        snow (str): snowpack level, one of "low", "med", or "high"

    Returns:
        survival as a proportion (percent / 100) rounded to two decimals

    Raises:
        ValueError: if snow is not one of the three recognized levels
    """
    # (snowpack level, ramp intercept) pairs; the slope is 5 for all levels
    ramps = (("low", 200), ("med", 250), ("high", 300))
    for level, intercept in ramps:
        if snow == level:
            pct = intercept + 5*tmin
            break
    else:
        raise ValueError("snow parameter must be one of low, med, or high")

    # keep percent within 0 - 100 before converting to a proportion
    return np.round(np.clip(pct, 0, 100) / 100, 2)

Define a function to compute the overall risk array for a given era, model, and scenario, using the above functions:

In [4]:
import xarray as xr
from itertools import product


def compute_risk_arrays(args):
    """Compute the risk arrays from the NCAR BCSD data
    for a given model, scenario, and era. Takes a single argument
    for multiprocessing purposes.

    For each year, fall, summer, and winter "survival" arrays are computed
    and multiplied together, once for each snowpack level.

    Args:
        args (tuple): tuple of arguments of the form (met_dir, tmp_fn, era, model, scenario)
        where arguments are defined as:
            met_dir (pathlib.PosixPath): path to the directory containing met data organized as
                folders named by model
            tmp_fn (str): template filename string
            era (str): era to be processed, of the form <start year>-<end year>
            model (str): model to be processed
            scenario (str): scenario to be processed

    Returns:
        risk_da (xarray.DataArray): DataArray of risk with dimensions model, scenario,
            snow load level, year, y index, x index

    Raises:
        ValueError: if the era does not contain any years to process
    """
    met_dir, tmp_fn, era, model, scenario = args
    start_year, end_year = era.split("-")
    start_year = int(start_year)
    end_year = int(end_year)
    # NOTE(review): np.arange excludes end_year, so the final year named in
    #  the era string is not processed - confirm this is intended
    years = np.arange(start_year, end_year)
    if years.size == 0:
        raise ValueError(f"era {era!r} contains no years to process")

    snow_values = ["low", "med", "high"]
    yearly_risk_arrs = []
    for year in years:
        yr1_fp = met_dir.joinpath(model, scenario, tmp_fn.format(model, scenario, year - 1))
        yr2_fp = met_dir.joinpath(model, scenario, tmp_fn.format(model, scenario, year))
        with xr.open_mfdataset([yr1_fp, yr2_fp]) as ds:
            # minimums spanning July of the previous year through June of
            #  the current year, used for fall and winter survival
            winter_tmin = ds["tmin"].sel(
                time=slice(f"{year - 1}-07-01", f"{year}-06-30")
            ).values
            tmax = ds["tmax"].sel(
                time=slice(f"{year}-01-01", f"{year}-12-31")
            ).values
            tmin = ds["tmin"].sel(
                time=slice(f"{year}-01-01", f"{year}-12-31")
            ).values
            # read the coordinate grids while the files are still open;
            #  the dataset is closed when this context exits
            longitude = ds["longitude"].values
            latitude = ds["latitude"].values

        # force float results: np.apply_along_axis takes its output dtype
        #  from the first call, so an integer first result would truncate
        #  every later fractional value
        fall = np.apply_along_axis(
            lambda pixel: float(fall_survival(pixel)), 0, winter_tmin
        )
        # need to iterate over axes indices for summer "survival" because
        #  both tmin and tmax arrays are needed
        summer = np.empty(tmin.shape[1:])
        for i, j in product(range(tmin.shape[1]), range(tmin.shape[2])):
            summer[i, j] = univoltine(tmin[:, i, j], tmax[:, i, j])

        # each year will have three risk arrays, one for each level of
        #  snowpack. Just taking the raw product of all three "survival"
        #  estimates for a yearly risk metric for now
        coldest = winter_tmin.min(axis=0)
        year_risk_arr = [
            fall * summer * winter_survival(coldest, snow)
            for snow in snow_values
        ]
        yearly_risk_arrs.append(np.array(year_risk_arr))

    # flip along y axis because it's inverted in ingest data
    yearly_risk = np.flip(np.array(yearly_risk_arrs), axis=2)
    # swap the year and snow axes for more intuitive structure
    #  to (snow, year, y, x) from (year, snow, y, x)
    yearly_risk = np.swapaxes(yearly_risk, 0, 1)

    # create a DataArray for easier construction of full DataArray with all results
    risk_da = xr.DataArray(
        # need to expand dims to add an extra for each of model, scenario
        data=np.expand_dims(yearly_risk, (0, 1)),
        dims=["model", "scenario", "snow", "year", "y", "x"],
        coords={
            "year": (["year"], years),
            "model": (["model"], [model]),
            "scenario": (["scenario"], [scenario]),
            # need to flip lat/lon arrays as well, since the values are flipped above
            "longitude": (["y", "x"], np.flipud(longitude)),
            "latitude": (["y", "x"], np.flipud(latitude)),
            "snow": (["snow"], snow_values),
        },
        attrs=dict(
            description="Climate-based beetle risk",
        ),
    )

    return risk_da

Process the datasets into risk arrays using `multiprocessing.Pool`. Set up all combinations of desired models / scenarios / eras as tasks.

Set up path variables:

In [5]:
from pathlib import Path


# directory containing the met data; passed to compute_risk_arrays, which
#  looks for files under <met_dir>/<model>/<scenario>/
met_dir = Path("/workspace/Shared/Tech_Projects/NCAR_AK/met")
# filename template, filled in with (model, scenario, year)
tmp_fn = "{}_{}_BCSD_met_{}.nc4"

Make arguments for `compute_risk_arrays` function:

In [6]:
import luts


# one task per (model, scenario, era) combination. The historical scenario
#  and the historical era ("1950-2009") are simply left out for now, so only
#  future scenarios / eras are processed
args_list = [
    (met_dir, tmp_fn, era, model, scenario)
    for model in luts.models
    for scenario in luts.scenarios
    for era in luts.eras
    if not (scenario == "hist" or era == "1950-2009")
]

Run it:

In [7]:
import time
from multiprocessing import Pool
import tqdm


# The feasible number of workers seems to be limited here, perhaps by
#  memory. Using 10 on Atlas 17, this completed in 37 minutes (only the
#  future scenarios being tested here)
with Pool(10) as pool:
    # results arrive in completion order, not task order
    risk_das = list(
        tqdm.tqdm(
            pool.imap_unordered(compute_risk_arrays, args_list),
            total=len(args_list),
        )
    )

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 40/40 [37:22<00:00, 56.05s/it]


In [8]:
# combine the per-task results (one per model / scenario / era) into a
#  single object, aligned on their coordinate values
risk_da = xr.combine_by_coords(risk_das)

In [None]:
# destination for the combined yearly risk data
out_fp = "/workspace/Shared/Tech_Projects/beetles/final_products/yearly_risk.nc"

# write the combined risk data to a netCDF file
risk_da.to_netcdf(out_fp)