# Cache NWIS data to Zarr
* The HyRiver `pygeohydro` package extracts streamflow from NWIS
* Here we query all the gages identified in the National Water Model 2.1 over the simulation period and store the results in Zarr for faster access

[Rendered notebook with output](https://nbviewer.org/gist/3d38160704a7d8f606f99a3ee07680ec)

In [None]:
import logging
# Raise the root logger's threshold so only CRITICAL messages are emitted.
logging.getLogger().setLevel(logging.CRITICAL)

In [None]:
import warnings
# Suppress all Python warnings for the rest of the notebook.
warnings.filterwarnings('ignore') 

In [None]:
%%time
import pandas as pd
import xarray as xr
import fsspec
import hvplot.xarray
from pathlib import Path
import numpy as np
import dask
from dask.distributed import Client, LocalCluster
from zarr.convenience import consolidate_metadata

Load model dataset that contains stations and time range of interest:

In [None]:
# S3 filesystem handle, created with requester-pays access enabled.
fs2 = fsspec.filesystem('s3', requester_pays=True)

In [None]:
fs2.ls('s3://nhgf-development/nwm/')

In [None]:
# Zarr store whose gage ids and time axis drive the NWIS requests below.
url = 's3://nhgf-development/nwm/chanobs.zarr'

In [None]:
%%time
# Open the store through an fsspec mapper without consolidated metadata;
# chunks={} keeps the variables dask-backed instead of loading them.
ds_chanobs = xr.open_dataset(
    fs2.get_mapper(url),
    engine='zarr',
    backend_kwargs={'consolidated': False},
    chunks={},
)

In [None]:
ds_chanobs

In [None]:
# Convert each gage id to str and strip its leading whitespace.
gage_ids_str = [
    raw_id.astype('str').lstrip()
    for raw_id in ds_chanobs['gage_id'].values
]

In [None]:
#ds_chanobs = ds_chanobs.assign_coords({'site_id':xr.DataArray(gage_ids_str, dims='feature_id')})

In [None]:
# First and last entries of the model time axis bound the request window.
start, stop = ds_chanobs.time[0].values, ds_chanobs.time[-1].values
print(start, stop)

#### Extract obs data using HyRiver

In [None]:
from pygeohydro import NWIS

In [None]:
# NWIS client used for every streamflow request below.
nwis = NWIS()

In [None]:
# (start, stop) pair handed to every NWIS streamflow request.
dates = start, stop
print(dates)

If we request only one station, the returned time series contains only the times with good data, so it may not span the full time window. We therefore request two stations:

In [None]:
%%time
# Request the first two gages over the full window, returned as an xarray object.
ds_obs = nwis.get_streamflow(gage_ids_str[:2], dates, to_xarray=True)

In [None]:
ds_obs

In [None]:
# Rename the station dimension/coordinate to gage_id and the discharge
# variable to streamflow.
ds_obs = (
    ds_obs
    .rename_dims({'station_id': 'gage_id'})
    .rename({'station_id': 'gage_id', 'discharge': 'streamflow'})
)

In [None]:
ds_obs

Define the time base used to interpolate subsequent NWIS data requests:

In [None]:
# Time axis of the two-station request; per-gage requests are interpolated onto it.
time_base = ds_obs.time.values

In [None]:
# Local-filesystem handle, used below to remove an existing store.
fs = fsspec.filesystem('file')

In [None]:
# Scratch directory and the Zarr store that will be written inside it.
# NOTE(review): this absolute path is specific to one system/user; adjust before running elsewhere.
dir_scratch = Path('/caldera/projects/usgs/hazards/cmgp/woodshole/rsignell/conus404/zarr')
file_chanobs = dir_scratch / 'nwis_chanobs2.zarr'

In [None]:
# Remove any store directory left over from a previous run before re-creating it.
if file_chanobs.is_dir():
    fs.rm(str(file_chanobs),recursive=True)

In [None]:
len(gage_ids_str)

In [None]:
#source_dataset = ds_obs.drop_vars(drop_vars)
# The template is derived from the full two-station result; no variables are dropped.
source_dataset = ds_obs

In [None]:
# Build a dask-backed, all-zero template covering every gage.
template = xr.zeros_like(source_dataset.chunk())

# Drop the existing gage_id axis, then re-create it at full length as the last axis.
template = template.isel(gage_id=0, drop=True)
template = template.expand_dims(gage_id=len(gage_ids_str), axis=-1)

# Label the new axis with 'USGS-'-prefixed gage ids.
template = template.assign_coords(
    {'gage_id': [f'USGS-{gid}' for gid in gage_ids_str]}
)

# One chunk per gage spanning the whole time axis.
template = template.chunk({'time': len(ds_obs.time), 'gage_id': 1})

In [None]:
template

Specify appropriate dtypes and fill values (otherwise int64 and float64 are used by default):

In [None]:
# Per-variable on-disk dtype and fill value passed to to_zarr.
encoding = {
    'alt_acy_va': {'_FillValue': -2147483647, 'dtype': np.int32},
    'alt_va': {'_FillValue': 9.96921e+36, 'dtype': np.float32},
    'dec_lat_va': {'_FillValue': None, 'dtype': np.float32},
    'dec_long_va': {'_FillValue': None, 'dtype': np.float32},
    'streamflow': {'_FillValue': 9.96921e+36, 'dtype': np.float32},
}

In [None]:
# Writes no data (yet)
# compute=False defers the array writes (they are never triggered here), so this
# lays out the store with the requested encoding; mode='w' overwrites any existing store.
template.to_zarr(file_chanobs, compute=False, encoding=encoding, consolidated=True, mode='w')

In [None]:
# Number of time steps; upper bound of the time slice in the region writes.
nt = len(ds_obs.time)

In [None]:
# Fill the first two gage slots with the two-station request made earlier.
ds_obs.to_zarr(file_chanobs, region={'time':slice(0, nt), 'gage_id': slice(0, 2)})

In [None]:
def ind2zarr(n):
    """Fetch NWIS streamflow for one gage and write it into its Zarr slot.

    Looks up ``gage_ids_str[n]``, requests its streamflow over ``dates``,
    interpolates the result onto ``time_base``, renames the station
    dimension/coordinate to ``gage_id`` and ``discharge`` to ``streamflow``,
    and writes it into region ``gage_id=n`` of ``file_chanobs``.

    This is deliberately best-effort: a failure for one gage must not abort
    the whole batch, so errors are reported and the slot is left unwritten.
    Only ``Exception`` is caught, so KeyboardInterrupt/SystemExit still
    propagate.

    Parameters
    ----------
    n : int
        Index into ``gage_ids_str``; also the position along ``gage_id``
        in the target store.

    Returns
    -------
    bool
        True if the gage was written, False if the request or write failed.
    """
    site_id = gage_ids_str[n]
    try:
        ds_site = nwis.get_streamflow(site_id, dates, to_xarray=True).interp(time=time_base)
        ds_site = ds_site.rename_dims({'station_id': 'gage_id'}).rename(
            {'station_id': 'gage_id', 'discharge': 'streamflow'})
        ds_site.to_zarr(file_chanobs, region={'time': slice(0, nt), 'gage_id': slice(n, n + 1)})
    except Exception as err:
        # Report which gage failed and why instead of discarding the error silently.
        print(f'ind2zarr: skipping gage {site_id} (index {n}): {err!r}')
        return False
    return True

Use a Dask cluster to make NWIS station requests in parallel:

In [None]:
# Start a dask.distributed client with default settings.
client = Client()

In [None]:
client

This is where all the work gets done (a list of delayed tasks is created and then executed by the Dask cluster):

In [None]:
%%time
# takes less than 5 minutes with a local cluster on Denali:
# One delayed ind2zarr task per gage index; the computed results are discarded.
# NOTE(review): ind2zarr catches its own exceptions, so retries=10 may never be
# exercised by failed requests — confirm whether that is intended.
_ = dask.compute(*[dask.delayed(ind2zarr)(i) for i in range(len(gage_ids_str))], retries=10);

Call Zarr convenience function to consolidate the metadata:

In [None]:
# Consolidate the store's metadata once the region writes are done, so it can be
# opened with consolidated=True.
_ = consolidate_metadata(file_chanobs)

#### Check out the resulting dataset

In [None]:
# Re-open the finished store using the consolidated metadata; chunks={} keeps it dask-backed.
dst = xr.open_dataset(file_chanobs, engine='zarr', chunks={}, backend_kwargs=dict(consolidated=True))
dst

In [None]:
# Quick visual check: plot streamflow against time for the gage at index 100.
dst.streamflow.isel(gage_id=100).hvplot(x='time', grid=True)