# `precip-dot` data test `03`:
## Ability to independently re-produce pipeline

Verify that data produced at each step throughout the pipeline match what is expected with independent computation. 

### inputs

The path to the directory containing the output directories for all steps of the pipeline needs to be saved in the `PIPE_DIR`. A set of sample locations will be used, represented as latitude and longitude values. Those values are:

In [1]:
wgs84_coords = [
    (-147.7164, 64.8378), # Fairbanks
    (-149.9003, 61.2181) # Anchorage
]
print("Test coordinates that will be used:", wgs84_coords)

Test coordinates that will be used: [(-147.7164, 64.8378), (-149.9003, 61.2181)]


### tests

Each of the tests is enclosed in a function in the next cell, as are some helper functions. Generally, the flow of  tests go as: compute manually for sample locations, compare to pipeline results.

In [2]:
def timeit(func, args):
    tic = time.perf_counter()
    out = func(*args)
    elapsed = time.perf_counter() - tic
    d,u = 1,"s"
    if elapsed > 60:
        d,u = 60,"m"
    print(f"Elapsed time: {round(elapsed / d, 1)} {u}\n")
    return out


def transform_wgs84(coords_lst, wrf=True):
    """
    Transform list of lon, lat coordinates to desired projection
    """
    if wrf:
        wrf_proj = "+units=m +proj=stere +lat_ts=64.0 +lon_0=-152.0 +lat_0=90.0 +x_0=0 +y_0=0 +a=6370000 +b=6370000"
        transformer = Transformer.from_proj("EPSG:4326", wrf_proj, always_xy=True)
    else:
        transformer = Transformer.from_crs(4326, 3338, always_xy=True)
    
    return [transformer.transform(*coords) for coords in coords_lst]


def read_wrf(args):
    """
    Read and return xarray.DataArrays from a WRF grid-based netCDF file
    Set up for using Pool
    """
    fp, sel_xy = args[0], args[1]
    with xr.open_dataset(fp) as ds:
        varnames = [key for key in ds.data_vars.keys()]
        if len(varnames) == 1:
            das = [
                ds.sel(xc=xy[0], yc=xy[1], method="nearest")[varnames[0]]
                for xy in sel_xy
            ]
        else:
            # return dataset if intervals step or later
            das = [
                ds.sel(xc=xy[0], yc=xy[1], method="nearest")   
                for xy in sel_xy
            ]
            
    return das


def read_a14(args):
    """
    Read and return single values by location from Atlas 14 
    GeoTIFFs
    """
    fp, windows = args[0], args[1]
    with rio.open(fp) as src:
        return [src.read(1, window=window)[0][0]/1000 for window in windows]

    
def run_durations_test(wrf_dir, dur_dir, wrf_xy):
    """
    Verify that durations series are reproducible for sample indices
    Return results and durations data for use in next test
    """    
    # test for all 4 data groups
    results = {}
    for group in DATA_GROUPS:
        # read and compute sums manually from WRF data
        print(f"Working on {group}...", end="")
        # read target comparison year and previous year in case of overlap of binning periods
        year = DATA_GROUPS[group]["dur"]
        wrf_fp = sorted(glob.glob(os.path.join(wrf_dir, f"*{group}*{year}.nc")))[0]
        wrf_args = (wrf_fp, wrf_xy)
        wrf_das = read_wrf(wrf_args)
        man_arrs = np.array([
            np.array([
                da.resample(time=duration).sum().values[:5] 
                for duration in DURATIONS_PANDAS
            ])
            for da in wrf_das
        ])
                
        # read durations sums from pipeline step
        dur_fps = []
        # construct filepaths
        for duration in DURATIONS:
            if duration in ["60m", "2h", "3h", "6h"]:
                year = year
            else:
                year = ""
            fp = sorted(glob.glob(os.path.join(dur_dir, f"*_{duration}*{group}*{year}.nc")))[0]
            dur_fps.append(fp)        
        dur_args = [(fp, wrf_xy) for fp in dur_fps]
        p = Pool(16)
        dur_das = p.map(read_wrf, dur_args)
        p.close()
        p.join()
        # make arrays that are comparable to manually computed results
        dur_arrs = np.array([
            np.array([da_lst[i].values[:5] for da_lst in dur_das])
            for i in np.arange(2)
        ])
        
        result = np.all(np.isclose(dur_arrs, man_arrs))
        
        results[group] = {"result": result, "dur_das": dur_das}
        if result:
            print(f"Result: PASS")
        else:
            print(f"Result: FAIL")
            
    return results
        
    
    
def run_ams_test(dur_results, dur_dir, ams_dir, wrf_xy):
    """
    Verify that annual maximum series are reproducible for sample locations
    Takes output dict from durations test
    """
    # test for all 4 data groups
    results = {}
    for group in DATA_GROUPS:
        # compute AMS manually from durations test output
        print(f"Working on {group}...", end="")
        dur_das = dur_results[group]["dur_das"]
        # iterate through short durations to replace loaded 
        #  durations data with first year of ams series depending on group
        year = DATA_GROUPS[group]["ams"]
        short_dur_fps = [
            sorted(glob.glob(os.path.join(dur_dir, f"*_{duration}*{group}*{year}.nc")))[0]
            for duration in ["60m", "2h", "3h", "6h"]
        ]
        short_dur_args = [(fp, wrf_xy) for fp in short_dur_fps]
        p = Pool(4)
        short_dur_das = p.map(read_wrf, short_dur_args)
        p.close()
        p.join()
        dur_das[:4] = short_dur_das
        man_arrs = np.array([
            np.array([
                da_lst[i].sel(time=DATA_GROUPS[group]["ams"]).max().values 
                for da_lst in dur_das
            ])
            for i in np.arange(2)
        ])
        
        # read AMS from pipeline step
        ams_fps = [
            sorted(glob.glob(os.path.join(ams_dir, f"*{group}*_{duration}*.nc")))[0]
            for duration in DURATIONS
        ]     
        # spread reading over multiple cores
        ams_args = [(fp, wrf_xy) for fp in ams_fps]
        p = Pool(15)
        ams_das = p.map(read_wrf, ams_args)
        p.close()
        p.join()
        # make arrays that are comparable to manually computed results
        ams_arrs = np.array([
            np.array([da_lst[i].values[0] for da_lst in ams_das])
            for i in np.arange(2)
        ])
        
        result = np.all(np.isclose(ams_arrs, man_arrs))
        results[group] = {"result": result, "ams_das": ams_das}
        if result:
            print(f"Result: PASS")
        else:
            print(f"Result: FAIL")
            
    return results


def run_estimates(data):
    """
    take data as numpy array, return estimates and CIs
    """
    def run_bootstrap(data, lmom_fitted, intervals, method='pi'):
        ''' 
        Calculate confidence intervals using parametric bootstrap and the
        percentile interval method
        '''
        # function to bootstrap
        def sample_return_intervals(data, intervals=intervals):
            sample = lmom_fitted.rvs(len(data))
            paras = distr.gev.lmom_fit(sample)
            samplefit = [ paras[i] for i in ['loc', 'scale', 'c']]
            sample_fitted = distr.gev(**paras)
            sample_intervals = sample_fitted.ppf(1.0-1./intervals)
            res = samplefit
            res.extend(sample_intervals.tolist())
            return tuple(res)

        # the calculation
        out = boot.ci(
            data, statfunction=sample_return_intervals,
            alpha=0.05, n_samples=5000, 
            method=method, output='lowhigh'
        )
        ci_Td = out[0, 3:]
        ci_Tu = out[1, 3:]
        params_ci ={}

        return {'ci_Td':ci_Td, 'ci_Tu':ci_Tu}
    
    data = data * 0.0393701
    paras = distr.gev.lmom_fit(data)
    fitted_gev = distr.gev(**paras)
    avi = np.array([2,5,10,25,50,100,200,500,1000]).astype(np.float)    
    estimates = fitted_gev.ppf(1.-1./avi)
    boot_out = run_bootstrap(data, fitted_gev, avi)
    
    # return array with shape (3, 9): lower diff, estimate, upper diff
    lower = boot_out["ci_Td"] - estimates
    upper = boot_out["ci_Tu"] - estimates
    return np.array([lower, estimates, upper])


def run_intervals_test(ams_results, itl_dir, wrf_xy):
    """
    Verify that intervals are reproducible for sample locations
    """    
    results = {}
    for group in DATA_GROUPS:
        # compute intervals manually from AMS test output
        print(f"  Working on {group}...", end="")
        ams_das = ams_results[group]["ams_das"]
        # lists of arrays of data
        data_arrs = [[da_lst[i].values for da_lst in ams_das] for i in np.arange(2)]
        # iterate through test locations and Pool the estimation
        man_arrs = []
        for arr_lst in data_arrs:
            p = Pool(15)
            man_arrs.append(np.array(p.map(run_estimates, arr_lst)))
            p.close()
            p.join()
        man_arrs = np.array(man_arrs)
        
        # read the "diff'd" estimates from pipeline step
        itl_fps = [
            sorted(glob.glob(os.path.join(itl_dir, f"*{group}*_{duration}*.nc")))[0]
            for duration in DURATIONS
        ]
        # spread reading over multiple cores
        itl_args = [(fp, wrf_xy) for fp in itl_fps]
        p = Pool(15)
        itl_das = p.map(read_wrf, itl_args)
        p.close()
        p.join()
        # make arrays that are comparable to manually computed results
        varnames = ["pf-lower", "pf", "pf-upper"]
        itl_arrs = np.array([
            np.array([
                np.array([da_lst[i][varname].values for varname in varnames])
                for da_lst in itl_das
            ])
            for i in np.arange(2)
        ])
        
        # since the resulting confidence bounds are not 
        # determined (randomness of bootstrap), compare
        # the pf estimate results much more closely
        # pf estimate arrays
        man_pf_arr = np.array([
            [est_arr[1] for est_arr in dur_arr] 
            for dur_arr in man_arrs
        ])
        itl_pf_arr = np.array([
            [est_arr[1] for est_arr in dur_arr] 
            for dur_arr in itl_arrs
        ])
        # conf bound diffs arrays
        man_ci_arr = np.array([
            [np.array([est_arr[0], est_arr[2]]) for est_arr in dur_arr] 
            for dur_arr in man_arrs
        ])
        itl_ci_arr = np.array([
            [np.array([est_arr[0], est_arr[2]]) for est_arr in dur_arr] 
            for dur_arr in itl_arrs
        ])

        pf_result = np.all(np.isclose(itl_pf_arr, man_pf_arr))
        ci_result = np.all(
            np.isclose(
                np.around(itl_pf_arr, 2), np.round(man_pf_arr, 2)
            )
        )

        result = np.all([pf_result, ci_result])
        results[group] = {"result": result, "itl_das": itl_das}
        if result:
            print(f"Result: PASS")
        else:
            print(f"Result: FAIL")
            
    return results
        
    
def run_deltas_warp_test(itl_results, deltas_dir, warp_dir, wrf_xy, a14_xy):
    """
    Verify that deltas and warping are reproducible
    """ 
    results = {}
    # need to iterate over only models now
    for group in ["GFDL-CM3", "NCAR-CCSM4"]:
        print(f"Working on {group}...", end="")
        itl_das = itl_results[group + "_historical"]["itl_das"]
        # deltas only calculated from pf
        itl_arrs = np.array([
            np.array([
                da_lst[i]["pf"].values
                for da_lst in itl_das
            ])
            for i in np.arange(2)
        ])
        # read in future period intervals data
        fut_itl_fps = [
            sorted(glob.glob(os.path.join(itl_dir, f"*{group}_rcp85*_{duration}*.nc")))[0]
            for duration in DURATIONS
        ]
        # spread reading over multiple cores
        fut_itl_args = [(fp, wrf_xy) for fp in fut_itl_fps]
        p = Pool(15)
        fut_itl_das = p.map(read_wrf, fut_itl_args)
        p.close()
        p.join()
        # stuff into arrays
        fut_itl_arrs = np.array([
            np.array([
                da_lst[i]["pf"].values
                for da_lst in fut_itl_das
            ])
            for i in np.arange(2)
        ])
        # take deltas
        man_arrs = fut_itl_arrs / itl_arrs

        # read the deltas from pipeline step
        deltas_fps = [
            sorted(glob.glob(os.path.join(deltas_dir, f"*{group}*_{duration}*.nc")))[0]
            for duration in DURATIONS
        ]
        # spread reading over multiple cores
        deltas_args = [(fp, wrf_xy) for fp in deltas_fps]
        p = Pool(15)
        deltas_das = p.map(read_wrf, deltas_args)
        p.close()
        p.join()
        # make arrays that are comparable to manually computed results
        deltas_arrs = np.array([
            np.array([
                da_lst[i]["pf"].values
                for da_lst in deltas_das
            ])
            for i in np.arange(2)
        ])

        deltas_result = np.all(np.isclose(deltas_arrs, man_arrs))

        # read the warped estimates from pipeline step
        warp_fps = [
            sorted(glob.glob(os.path.join(warp_dir, f"*{group}*_{duration}*.nc")))[0]
            for duration in DURATIONS
        ]
        # spread reading over multiple cores
        warp_args = [(fp, a14_xy) for fp in warp_fps]
        p = Pool(15)
        warp_das = p.map(read_wrf, warp_args)
        p.close()
        p.join()
        # make arrays that are comparable to manually computed results
        warp_arrs = np.array([
            np.array([
                da_lst[i]["pf"].values
                for da_lst in warp_das
            ])
            for i in np.arange(2)
        ])
        # Now, ensure that the euclidean distance between corresponding arrays
        # are the smallest in comparison to all other combinations
        # probably not the best way to check this, but it should be the case that
        # distances between warped deltas and deltas, queried at the same relative location,
        # should be smaller than randomly chosen locations (represented by the locations
        # of other points)
        check_dists = [np.linalg.norm(x-y) for x,y in zip(warp_arrs, deltas_arrs)]
        rand_dists = [
            np.linalg.norm(x-y) for x,y in zip(warp_arrs, [
                np.roll(deltas_arrs, i, 0) 
                for i in np.arange(deltas_arrs.shape[0])[1:]
            ])
        ]
        warp_result = np.all([[d1 < d2 for d1 in check_dists] for d2 in rand_dists])
        
        result = np.all([deltas_result, warp_result])
        results[group] = {"result": result, "warp_das": warp_das}
        if result:
            print(f"Result: PASS")
        else:
            print(f"Result: FAIL")
            
    return results


def run_multiply_test(warp_results, mul_dir, a14_xy):
    """
    Verify that results of multiplication step are reproducible
    """
    results = {}
    for group in ["GFDL-CM3", "NCAR-CCSM4"]:
        print(f"working on {group}...", end="")
        # make warp arrs for multiplying with a14 data
        warp_das = warp_results[group]["warp_das"]
        warp_arrs = np.array([
            np.array([
                da_lst[i]["pf"].values
                for da_lst in warp_das
            ])
            for i in np.arange(2)
        ])
        # read atlas 14 data
        a14_durations = [duration.replace("2d", "48h") for duration in DURATIONS]
        a14_fps = [
            os.path.join(a14_dir, f"ak{interval}yr{duration.zfill(3)}a_ams.tif")
            for duration in a14_durations
            for interval in intervals
        ]
        a14_args = [(fp, windows) for fp in a14_fps]
        with rio.open(a14_fps[0]) as src:
            rc_idx = [src.index(*xy) for xy in a14_xy]
        windows = [Window(idx[1], idx[0], 1, 1) for idx in rc_idx]

        # pool reading of atlas14 data
        p = Pool(20)
        a14_data = p.map(read_a14, a14_args)
        p.close()
        p.join()
        a14_arrs = np.array([
            np.array([data[i] for data in a14_data]).reshape(14, 9)
            for i in np.arange(len(wgs84_coords))
        ])
        # manually computed multiplied
        man_arrs = a14_arrs * warp_arrs
        
        # read multiplied data from pipelines
        mul_fps = [
            sorted(glob.glob(os.path.join(mul_dir, f"*{group}*_{duration}*")))[0]
            for duration in DURATIONS
        ]
        mul_args = [(fp, a14_xy) for fp in mul_fps]
        p = Pool(15)
        mul_das = p.map(read_wrf, mul_args)
        p.close()
        p.join()
        # stuff into arrays
        mul_arrs = np.array([
            np.array([
                da_lst[i]["pf"].values / 1000
                for da_lst in mul_das
            ])
            for i in np.arange(2)
        ])
    
        result = np.all(np.isclose(mul_arrs, man_arrs))
        if result:
            print(f"Result: PASS")
        else:
            print(f"Result: FAIL")
            
    return None
    

## Run tests

### Setup

In [3]:
import os, glob, time, datetime
import lmoments3 as lmom
import numpy as np
import rasterio as rio
import scikits.bootstrap as boot
import xarray as xr
from lmoments3 import distr
from multiprocessing import Pool
from pyproj import Transformer
from rasterio.windows import Window


# define global variables
# because of the asymmetry in file structure between short/long durations, 
#   simplest way to efficiently compare manual sample with pipeline is to load
#   first years used in each step for each group 
#   (1970, 2006 for durations; 1979, 2020 for AMS)
DATA_GROUPS = {
    "GFDL-CM3_historical": {"dur": "1970", "ams": "1979"},
    "GFDL-CM3_rcp85": {"dur": "2006", "ams": "2020"},
    "NCAR-CCSM4_historical": {"dur": "1970", "ams": "1979"},
    "NCAR-CCSM4_rcp85": {"dur": "2006", "ams": "2020"},
}

DURATIONS = [
    "60m",
    "2h",
    "3h",
    "6h",
    "12h",
    "24h",
    "2d",
    "3d",
    "4d",
    "7d",
    "10d",
    "20d",
    "30d",
    "45d",
    "60d",
]
DURATIONS_PANDAS = [
    "1H",
    "2H",
    "3H",
    "6H",
    "12H",
    "24H",
    "2D",
    "3D",
    "4D",
    "7D",
    "10D",
    "20D",
    "30D",
    "45D",
    "60D",
]

pipe_dir = os.getenv("PIPE_DIR")
wrf_xy = transform_wgs84(wgs84_coords)
a14_xy = transform_wgs84(wgs84_coords, wrf=False)



### durations

In [4]:
wrf_dir = os.path.join(pipe_dir, "pcpt")
dur_dir = os.path.join(pipe_dir, "durations")


args = (wrf_dir, dur_dir, wrf_xy)
dur_results = timeit(run_durations_test, args)

Working on GFDL-CM3_historical...Result: PASS
Working on GFDL-CM3_rcp85...Result: PASS
Working on NCAR-CCSM4_historical...Result: PASS
Working on NCAR-CCSM4_rcp85...Result: PASS
Elapsed time: 16.1 m



### annual maximums

In [5]:
ams_dir = os.path.join(pipe_dir, "ams")

args = (dur_results, dur_dir, ams_dir, wrf_xy)
ams_results = timeit(run_ams_test, args)

Working on GFDL-CM3_historical...Result: PASS
Working on GFDL-CM3_rcp85...Result: PASS
Working on NCAR-CCSM4_historical...Result: PASS
Working on NCAR-CCSM4_rcp85...Result: PASS
Elapsed time: 22.4 s



### intervals

In [6]:
itl_dir = os.path.join(pipe_dir, "diff")

args = (ams_results, itl_dir, wrf_xy)
itl_results = timeit(run_intervals_test, args)

  Working on GFDL-CM3_historical...Result: PASS
  Working on GFDL-CM3_rcp85...Result: PASS
  Working on NCAR-CCSM4_historical...Result: PASS
  Working on NCAR-CCSM4_rcp85...Result: PASS
Elapsed time: 2.0 m



### deltas and warping

In [7]:
deltas_dir = os.path.join(pipe_dir, "deltas")
warp_dir = os.path.join(pipe_dir, "warp")

args = (itl_results, deltas_dir, warp_dir, wrf_xy, a14_xy)
warp_results = timeit(run_deltas_warp_test, args)

Working on GFDL-CM3...Result: PASS
Working on NCAR-CCSM4...Result: PASS
Elapsed time: 11.1 s



### multiply

In [8]:
mul_dir = os.path.join(pipe_dir, "multiply")

args = (itl_results, deltas_dir, warp_dir, wrf_xy, a14_xy)
warp_results = timeit(run_deltas_warp_test, args)

Working on GFDL-CM3...Result: PASS
Working on NCAR-CCSM4...Result: PASS
Elapsed time: 1.9 s



In [9]:
utc_time = datetime.datetime.utcnow()
comp_time = utc_time.strftime("%Y-%m-%d %H:%M:%S")
print(f"Completion time of previous test: {comp_time}")

Completion time of previous test: 2020-11-30 18:04:35
