In [1]:
import dask
import logging
import numpy as np
import xarray as xr
from distributed import Client

from nwsspc.sharp.calc import constants
from nwsspc.sharp.calc import parcel
from nwsspc.sharp.calc import params
from nwsspc.sharp.calc import thermo
from nwsspc.sharp.calc import layer

client = Client(n_workers=16, memory_limit='4GB', silence_logs=logging.ERROR)
client

In [2]:
chunks = {"y": 25, "x": -1}


ds_hybrid = xr.open_dataset(
    "hrrr-hybrid.json", 
    engine="kerchunk", 
    decode_timedelta=True
)[["pres", "gh", "t", "q"]].astype("float32").load().chunk(dict(hybrid=-1, **chunks))

ds_2m = xr.open_dataset(
    "hrrr-2m.json", 
    engine="kerchunk", 
    decode_timedelta=True
)[["t2m", "sh2"]].astype("float32").load().chunk(chunks)

ds_sfc = xr.open_dataset(
    "hrrr-surface.json", 
    engine="kerchunk", 
    decode_timedelta=True
)[["sp"]].astype("float32").load().chunk(chunks)

In [3]:
print(ds_hybrid)
print(ds_2m)

<xarray.Dataset> Size: 2GB
Dimensions:     (hybrid: 50, y: 1059, x: 1799)
Coordinates:
  * hybrid      (hybrid) float64 400B 1.0 2.0 3.0 4.0 ... 47.0 48.0 49.0 50.0
    latitude    (y, x) float64 15MB dask.array<chunksize=(1059, 1799), meta=np.ndarray>
    longitude   (y, x) float64 15MB dask.array<chunksize=(1059, 1799), meta=np.ndarray>
    step        (hybrid) timedelta64[ns] 400B dask.array<chunksize=(50,), meta=np.ndarray>
    time        (hybrid) datetime64[ns] 400B dask.array<chunksize=(50,), meta=np.ndarray>
    valid_time  (hybrid) datetime64[ns] 400B dask.array<chunksize=(50,), meta=np.ndarray>
Dimensions without coordinates: y, x
Data variables:
    pres        (hybrid, y, x) float32 381MB dask.array<chunksize=(50, 1059, 1799), meta=np.ndarray>
    gh          (hybrid, y, x) float32 381MB dask.array<chunksize=(50, 1059, 1799), meta=np.ndarray>
    t           (hybrid, y, x) float32 381MB dask.array<chunksize=(50, 1059, 1799), meta=np.ndarray>
    q           (hybrid, y, x) f

In [4]:
def compute_everything(pres, hght, tmpk, spfh, sp, t2m, sh2, use_2m=True):
    mixr = thermo.mixratio(spfh)
    mixr_2m = thermo.mixratio(sh2)

    mixr[mixr < constants.TOL] = constants.TOL
    if (mixr_2m < constants.TOL): mixr_2m = constants.TOL

    vtmp = thermo.virtual_temperature(tmpk, mixr)
    dwpk = thermo.temperature_at_mixratio(mixr, pres)

    if (use_2m):
        dwpk_2m = thermo.temperature_at_mixratio(mixr_2m, sp)
        pcl = parcel.Parcel(sp, t2m, dwpk_2m, parcel.LPL.SFC)
    else: 
        pcl = parcel.Parcel(pres[0], tmpk[0], dwpk[0], parcel.LPL.SFC)

    lifter = parcel.lifter_cm1()
    lifter.ma_type = thermo.adiabat.pseudo_liq
    lifter.converge = 0.15

    pcl_vtmp = pcl.lift_parcel(lifter, pres)
    pcl_buoy = thermo.buoyancy(pcl_vtmp, vtmp)
    cape, cinh = pcl.cape_cinh(pres, hght, pcl_buoy)

    mupcl = parcel.Parcel()
    eil = params.effective_inflow_layer(
        lifter,
        pres,
        hght,
        tmpk,
        dwpk,
        vtmp,
        mupcl=mupcl
    )

    return cape, cinh, eil.bottom, eil.top

In [5]:
input_core_dims = [
    ["hybrid"], ["hybrid"],
    ["hybrid"], ["hybrid"],
    [], [], []
]

In [None]:
%%time
cape, cinh, eil_pbot, eil_ptop = xr.apply_ufunc(
    compute_everything, 
    ds_hybrid["pres"],
    ds_hybrid["gh"],
    ds_hybrid["t"],
    ds_hybrid["q"],
    ds_sfc["sp"],
    ds_2m["t2m"],
    ds_2m["sh2"],
    input_core_dims=input_core_dims,
    output_core_dims=[[], [], [], []],
    vectorize=True,
    dask="parallelized",
    output_dtypes=[np.float32, np.float32, np.float32, np.float32],
    kwargs={"use_2m": True},
)

cape, cinh, eil_pbot, eil_ptop = dask.compute(cape, cinh, eil_pbot, eil_ptop)

In [None]:
print(cape.max(), cinh.min(), eil_pbot.min(), eil_pbot.max())