## IVT Orchestration

In [1]:
import xarray as xr
import numpy as np
from tqdm import tqdm
from pathlib import Path

from compute_ivt import *
from config import era5_fp, ar_params, ard_fp

Let's verify the configuration. The "window" parameter determines the number of days before and after a given day of year (DOY) that are used to compute IVT percentiles. For example, a value of 60 would create a four-month window (60 days before a DOY and 60 days after).

In [2]:
# configured number of days on each side of the target DOY
ar_params["window"]

75

The way that this is configured is a little confusing to me, because the actual window is 2x this amount: half of the days occur before some target date and half occur after it. I'll create a `window_days` variable to represent how long the actual window of time is.

In [3]:
# full window length in days: `window` days before the target DOY plus `window` days after it
window_days = ar_params["window"] * 2
window_days

150

The quantile to compute, which must be between 0 and 1 inclusive. To be explicit: For some vector V, the q-th quantile of V is the value q of the way from the minimum to the maximum in a sorted copy of V. The default "percentile" is 85, so the q-th quantile in this case is the value estimated at 0.85 of the way between the minimum and maximum values. Note that in the NumPy implementation, the `quantile` and `percentile` functions are equivalent.

In [4]:
# convert the configured percentile (0-100 scale) to the equivalent quantile (0-1 scale)
quantile = ar_params["ivt_percentile"] / 100

# sanity check on the integers 0..100, where the q-th percentile is simply q
test_arr = np.arange(101)

qth_percentile = np.percentile(test_arr, q=ar_params["ivt_percentile"])
qth_quantile = np.quantile(test_arr, q=quantile)

assert qth_percentile == qth_quantile == 85

In [5]:
# create dask client
# NOTE(review): `Client` is not imported explicitly in this notebook; presumably it arrives
# through `from compute_ivt import *` — confirm, or import it explicitly in the imports cell.
client = Client()
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 8
Total threads: 32,Total memory: 251.72 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:42659,Workers: 8
Dashboard: http://127.0.0.1:8787/status,Total threads: 32
Started: Just now,Total memory: 251.72 GiB

0,1
Comm: tcp://127.0.0.1:42170,Total threads: 4
Dashboard: http://127.0.0.1:44802/status,Memory: 31.46 GiB
Nanny: tcp://127.0.0.1:46405,
Local directory: /tmp/dask-scratch-space/worker-8_610_0l,Local directory: /tmp/dask-scratch-space/worker-8_610_0l

0,1
Comm: tcp://127.0.0.1:45043,Total threads: 4
Dashboard: http://127.0.0.1:33489/status,Memory: 31.46 GiB
Nanny: tcp://127.0.0.1:43367,
Local directory: /tmp/dask-scratch-space/worker-4pxcit0p,Local directory: /tmp/dask-scratch-space/worker-4pxcit0p

0,1
Comm: tcp://127.0.0.1:39586,Total threads: 4
Dashboard: http://127.0.0.1:36501/status,Memory: 31.46 GiB
Nanny: tcp://127.0.0.1:39963,
Local directory: /tmp/dask-scratch-space/worker-7su8has0,Local directory: /tmp/dask-scratch-space/worker-7su8has0

0,1
Comm: tcp://127.0.0.1:43778,Total threads: 4
Dashboard: http://127.0.0.1:43801/status,Memory: 31.46 GiB
Nanny: tcp://127.0.0.1:37548,
Local directory: /tmp/dask-scratch-space/worker-2e4pxe8b,Local directory: /tmp/dask-scratch-space/worker-2e4pxe8b

0,1
Comm: tcp://127.0.0.1:36924,Total threads: 4
Dashboard: http://127.0.0.1:33172/status,Memory: 31.46 GiB
Nanny: tcp://127.0.0.1:43617,
Local directory: /tmp/dask-scratch-space/worker-3c3jsp1y,Local directory: /tmp/dask-scratch-space/worker-3c3jsp1y

0,1
Comm: tcp://127.0.0.1:33723,Total threads: 4
Dashboard: http://127.0.0.1:40379/status,Memory: 31.46 GiB
Nanny: tcp://127.0.0.1:40212,
Local directory: /tmp/dask-scratch-space/worker-xnxa44mv,Local directory: /tmp/dask-scratch-space/worker-xnxa44mv

0,1
Comm: tcp://127.0.0.1:45759,Total threads: 4
Dashboard: http://127.0.0.1:36330/status,Memory: 31.46 GiB
Nanny: tcp://127.0.0.1:46366,
Local directory: /tmp/dask-scratch-space/worker-lx3axv9x,Local directory: /tmp/dask-scratch-space/worker-lx3axv9x

0,1
Comm: tcp://127.0.0.1:42247,Total threads: 4
Dashboard: http://127.0.0.1:41275/status,Memory: 31.46 GiB
Nanny: tcp://127.0.0.1:37698,
Local directory: /tmp/dask-scratch-space/worker-axmdjbo2,Local directory: /tmp/dask-scratch-space/worker-axmdjbo2


In [6]:
def compute_magnitude(u, v):
    """
    Compute vapor transport magnitude from its eastward (u) and northward (v) components.

    The magnitude is the element-wise Euclidean norm sqrt(u**2 + v**2), evaluated with
    `np.hypot` through `xr.apply_ufunc`; `dask='parallelized'` lets xarray apply the
    function to dask-backed inputs as well.
    reference: https://www.eol.ucar.edu/content/wind-direction-quick-reference
    """
    ufunc_options = {"dask": "parallelized"}
    return xr.apply_ufunc(np.hypot, u, v, **ufunc_options)


def direction(a, b):
    """
    Compute geographic IVT direction from eastward and northward components.

    The result is in degrees with respect to true north (0=north, 90=east, 180=south,
    270=west) and is the direction that the transport is coming FROM.
    reference: https://www.eol.ucar.edu/content/wind-direction-quick-reference

    Args:
        a: eastward (u) component.
        b: northward (v) component.

    Returns:
        Direction in degrees, wrapped into the half-open interval [0, 360).
    """

    def _from_direction(u, v):
        # Reference formula: Dir = 270 - atan2(v, u) in degrees. Note the argument order:
        # arctan2 takes the northward component first. The unwrapped value can range
        # from 90 up to 450, so fold it back into [0, 360).
        return np.mod(270 - np.degrees(np.arctan2(v, u)), 360)

    return xr.apply_ufunc(_from_direction, a, b, dask='parallelized')


In [7]:
def generate_start_end_doys(day_of_year, window):
    """
    Return the (start, end) day-of-year bounds of a window centered on `day_of_year`.

    Half of `window` (integer division) is subtracted from and added to `day_of_year`,
    and each bound is wrapped with a modulo of the year length. The year length is 366
    only when `day_of_year` itself is 366, otherwise 365.

    Note: because a plain modulo is used, a bound that lands exactly on a multiple of the
    year length is returned as 0 rather than as the last day of the year.
    """
    # DOY 366 is the only target day treated as belonging to a leap year
    year_length = 365 + (day_of_year == 366)
    reach = window // 2

    bounds = tuple((day_of_year + offset) % year_length for offset in (-reach, reach))
    return bounds

In [8]:
def compute_period_of_record_quantile(da, day_of_year, window, target_quantile):
    """
    Compute a single quantile value for IVT magnitude over a day-of-year (DOY) window.

    The data are subset by DOY, so `window` is expressed in days and does not need to be
    scaled to account for the six-hour timesteps. Returned quantiles are for the entire
    period of record: they represent the q-th quantile of all observations within a time
    window (e.g. 5 months) centered on some DOY, for all years in the record. This approach
    is closer to a "climatology" of q-th quantile IVT.

    Args:
        da: DataArray with a `time` dimension and a `doy` coordinate along it.
        day_of_year: target DOY (1-366) at the center of the window.
        window: total window length in days.
        target_quantile: quantile to compute, between 0 and 1 inclusive.

    Returns:
        `da` reduced over `time` with `np.nanquantile` for the selected timesteps.
    """
    # DOY 366 only occurs in leap years, so it is the only target day that uses a
    # 366-day cycle and the only one that keeps DOY 366 observations
    leap_year = (day_of_year == 366)
    total_days = 366 if leap_year else 365

    start_day_of_year, end_day_of_year = generate_start_end_doys(day_of_year, window)

    # a bound that lands exactly on the year boundary can come back as 0; DOYs are
    # 1-based, so interpret it as the last day of the cycle
    if start_day_of_year == 0:
        start_day_of_year = total_days
    if end_day_of_year == 0:
        end_day_of_year = total_days

    # decide whether the window wraps from the bounds themselves: deciding from
    # `day_of_year` misclassifies windows whose edge falls exactly on the year boundary
    if start_day_of_year <= end_day_of_year:
        # window lies within a single year (e.g., DOY is 180)
        in_window = (da.doy >= start_day_of_year) & (da.doy <= end_day_of_year)
    else:
        # window wraps around the year boundary (e.g., DOY is 360 and window is 30 days)
        in_window = (da.doy >= start_day_of_year) | (da.doy <= end_day_of_year)

    if not leap_year:
        # exclude leap-day-shifted DOY 366 observations for ordinary target days
        in_window = in_window & (da.doy != 366)

    subset = da.sel(time=in_window)
    return subset.reduce(np.nanquantile, q=target_quantile, dim="time")
    

In [9]:
ds = xr.open_dataset(era5_fp)

# add DOY coords to input dataset
dsc = ds.assign_coords(doy=ds.time.dt.dayofyear)
# p71.162 = eastward "u" component, p72.162 = northward "v" component
dsc["ivt_mag"] = compute_magnitude(dsc["p71.162"], dsc["p72.162"]).astype(int)
dsc["ivt_dir"] = direction(dsc["p71.162"], dsc["p72.162"]).astype(int)

da = dsc["ivt_mag"]

quantile_results = []

# to test more rapidly, iterate over a few DOYs instead, e.g. tqdm([1, 180, 366]);
# all DOYs takes ~7 minutes with no parallelization efforts
for day_of_year in tqdm(range(1, 367)):
    result = compute_period_of_record_quantile(da, day_of_year, window_days, quantile)
    # label each result with its DOY so the results can be stacked along a 'doy' dimension
    quantile_results.append(result.assign_coords(doy=day_of_year))

# Concatenate the DataArrays along the 'doy' dimension
combined_da = xr.concat(quantile_results, dim='doy')
dsc["ivt_por_normal_quantile"] = combined_da

# write the output before releasing the source file, so that any variable still lazily
# backed by it can be read during the write
dsc.to_netcdf(ard_fp)
ds.close()
client.close()

100%|███████████████████████████████████████████████████████████| 366/366 [36:46<00:00,  6.03s/it]


In [None]:
# CP note: I've renamed this function to be more consistent about quantile vs. percentile to reduce my confusion
def compute_seasonal_quantile(da, window, target_quantile, timesteps_per_day=4):
    """
    Compute a single quantile value for IVT magnitude over a rolling time window.

    The rolling window is defined in timesteps, so the window size (unit: days) is
    multiplied by the number of timesteps per day. The input data have a 6-hr frequency,
    hence the default of 4 timesteps per day. These quantiles are for a single period of
    time (e.g. 5 months) for a given year.

    Args:
        da: DataArray with a `time` dimension.
        window: window length in days.
        target_quantile: quantile to compute, between 0 and 1 inclusive.
        timesteps_per_day: number of timesteps per day in `da` (default 4).

    Returns:
        Result of reducing the centered rolling window with `np.quantile`.
    """
    window_steps = window * timesteps_per_day
    return da.rolling(time=window_steps, center=True).reduce(np.quantile, q=target_quantile)

I want to test my understanding of how the rolling window arguments are interacting with the time dimension of the datacube.
I'll do this by comparing maximum values over a ten day slice centered on a certain date.

In [None]:
# timestamp at the center of the test window, and the test window length in days
test_window_center_date = "2020-12-15T00:00:00"
test_window_days = 10

In [None]:
ds = xr.open_dataset(era5_fp)
# add DOY coords to input dataset
dsc = ds.assign_coords(doy=ds.time.dt.dayofyear)
dsc["ivt_mag"] = compute_magnitude(dsc["p71.162"], dsc["p72.162"]).astype(int)
da = dsc["ivt_mag"]
# compare maximum values over a known ten day slice
# A centered rolling window of 40 six-hourly steps spans the 20 steps before the center
# timestamp, the center itself, and the 19 steps after it: for a center of
# 2020-12-15T00 that is 2020-12-10T00 through 2020-12-19T18 (40 timesteps, not 41).
tslice = da.sel(time=slice("2020-12-10T00:00:00", "2020-12-19T18:00:00")).max(dim="time")
ds.close()
tslice.data

In [None]:
ds = xr.open_dataset(era5_fp)
# add DOY coords to input dataset
dsc = ds.assign_coords(doy=ds.time.dt.dayofyear)
dsc["ivt_mag"] = compute_magnitude(dsc["p71.162"], dsc["p72.162"]).astype(int)
da = dsc["ivt_mag"]
# rolling maximum over the test window, expressed in six-hourly timesteps (4 per day)
rolling_window_max = da.rolling(time=test_window_days * 4, center=True).max()
ds.close()
rolling_window_max.sel(time=test_window_center_date).data

In [None]:
# the rolling maximum at the center date must equal the maximum of the explicit time slice
assert np.all(tslice.data == rolling_window_max.sel(time=test_window_center_date).data)

OK, so I'm confident that we can compute quantiles (or any statistic) over a rolling window using the actual window size in days, multiplied by 4 to account for the six-hour time frequency of the input dataset (four time steps per day).

In [None]:
# inspect the quantile results concatenated along the 'doy' dimension
combined_da

In [None]:
with xr.open_dataset(era5_fp) as ds:

    # add DOY coords to input dataset
    ds = ds.assign_coords(doy=ds.time.dt.dayofyear)

    # chunk to avoid memory error
    ds = ds.chunk({"latitude": 1, "longitude": 1})

    # p71.162 = code eastward "u" component
    # p72.162 = code for northward "v" component
    # use the notebook's own `compute_magnitude`; no function named `magnitude` is defined here
    ds["ivt_mag"] = compute_magnitude(ds["p71.162"], ds["p72.162"]).astype(int)
    ds["ivt_dir"] = direction(ds["p71.162"], ds["p72.162"]).astype(int)

    # ds["ivt_seasonal_quantile"] = ds["ivt_mag"].map_blocks(compute_seasonal_quantile, kwargs={"window": window_days, "target_quantile" : quantile},
    #                                                        template=ds["ivt_mag"]).compute()

    ds["ivt_por_normal_quantile"] = combined_da

    #ds["ivt_por_quantile"] = ds["ivt_mag"].map_blocks(normal_pctile, kwargs={"window": ar_params['window'], "percentile" : q}, template=ds["ivt_mag"]).compute()

    # ds.to_netcdf(ard_fp)
ds