# Prepare Basin Averaged Forcing for NGEN

**Authors:**  
   - Tony Castronova <acastronova@cuahsi.org>    
   - Irene Garousi-Nejad <igarousi@cuahsi.org>  
    
**Last Updated:** 06.21.2023   

**Description**:  

The purpose of this Jupyter Notebook is to demonstrate how to prepare basin averaged forcing input from for the [NOAA Next Generation (NextGen) Water Resource Modeling Framework](https://github.com/NOAA-OWP/ngen). This notebook demonstrates how these data can be prepared from AORC **v1.0** kerchunk header files.

**Link to data**:
- Original: https://noaa-nwm-retrospective-2-1-pds.s3.amazonaws.com/index.html#forcing/
- Kerchunk: https://ciroh-nwm-zarr-retrospective-data-copy.s3.amazonaws.com/index.html#noaa-nwm-retrospective-2-1-zarr-pds/

**Software Requirements**:  

The software and operating system versions used to develop this notebook are listed below. To avoid encountering issues related to version conflicts among Python packages, we recommend creating a new environment variable and installing the required packages specifically for this notebook.

Tested on: Windows (`python: 3.9.16`) 

> dask: 2023.5.1 \
  numpy: 1.24.3 \
    xarray: 2023.5.0 \
    pyproj: 3.5.0 \
    pandas: 2.0.2 \
    requests: 2.31.0 \
    geopandas: 0.13.2 \
    matplotlib: 3.7.1 \
    zarr: 2.15.0 \
    fsspec: 2023.6.0 \
    s3fs: 2023.6.0 \
    kerchunk:0.1.2 
    
---

In [None]:
import re
import dask
import numpy
import xarray
import pyproj
import pandas
import requests
import geopandas
from matplotlib import colors
import matplotlib.pyplot as plt
from dask.distributed import Client
from dask.distributed import progress

import zarr
import fsspec
from pyproj import Transformer
from s3fs import S3FileSystem
from kerchunk.combine import MultiZarrToZarr

In [None]:
# install rioxarray and geocube
!pip install rioxarray -q
!pip install geocube -q

In [None]:
import rioxarray
from geocube.api.core import make_geocube

Initiate the Dask client. This will enable us to parallelize our computations.

In [None]:
# use a try accept loop so we only instantiate the client
# if it doesn't already exist.
try:
    print(client.dashboard_link)
except:    
    # The client should be customized to your workstation resources.
    client = Client(n_workers=6, memory_limit='2GB') # per worker
    print(client.dashboard_link)


---

## Load Forcing Data into Memory

In this notebook we'll be working with AORC v1.0 meteorological forcing. These data are publicly available for the entire CONUS, spanning from 1980 to 2020. Kerchunk header files have been created by the Alabama Water Institute team and this is an ongoing project. Please note that this jupyter notebook works for data within 2007-2019, but it cannot work with data prior to 2006.  

In [None]:
# define the selected watershed boundary 
wb_id = 'wb-2620227'

# define the year of interest
year=2010

In [None]:
bucket = 's3://ciroh-nwm-zarr-retrospective-data-copy/noaa-nwm-retrospective-2-1-zarr-pds/forcing/'

# create an instace of the S3FileSystem class from s3fs
s3 = S3FileSystem(anon=True)
files = s3.ls(f'{bucket}{year}')  

new_files = []
for f in files:
    parts = f.split('/')
    parts[0] += '.s3.amazonaws.com'
    parts.insert(0, 'https:/')
    new_name = '/'.join(parts)
    new_files.append(new_name)
    

In [None]:
print(len(new_files))
new_files[0]

Considering the memory limitations, it is necessary to choose a smaller subset of the dataset. Afterwards, we can utilize the `MultiZarrToZarr` function from the kerchunk library to merge the individual header files and generate a single kerchunk file.

In [None]:
%%time
json_list = new_files[0:217] 

mzz = MultiZarrToZarr(json_list,
    remote_protocol='s3',
    remote_options={'anon':True},
    concat_dims=['valid_time'])

d = mzz.translate()

backend_args = {"consolidated": False, "storage_options": {"fo": d}, "consolidated": False}

ds = xarray.open_dataset("reference://", engine="zarr", backend_kwargs=backend_args)

In [None]:
ds

In [None]:
ds.valid_time

Use SQUEEZE function to remove the Time dimension that has a size of 1 .

In [None]:
ds = ds.squeeze(dim='Time')

In [None]:
ds

## Add spatial metadata to the dataset 


Load the metadata dataset using `xarray` and add spatial metadata to it.

In [None]:
ds_meta = xarray.open_dataset('http://thredds.hydroshare.org/thredds/dodsC/hydroshare/resources/2a8a3566e1c84b8eb3871f30841a3855/data/contents/WRF_Hydro_NWM_geospatial_data_template_land_GIS.nc')

leny = len(ds_meta.y)
x = ds_meta.x.values
y = ds_meta.y.values

ds = ds.rename({'valid_time': 'time', 'south_north':'y', 'west_east':'x'})
#ds.rename_dims(south_north='y', west_east='x', valid_time='time')

X, Y = numpy.meshgrid(x, y)

# define the input crs
wrf_proj = pyproj.Proj(proj='lcc',
                       lat_1=30.,
                       lat_2=60., 
                       lat_0=40.0000076293945, lon_0=-97., # Center point
                       a=6370000, b=6370000)

# define the output crs
wgs_proj = pyproj.Proj(proj='latlong', datum='WGS84')

# transform X, Y into Lat, Lon
transformer = pyproj.Transformer.from_crs(wrf_proj.crs, wgs_proj.crs)
lon, lat = transformer.transform(X, Y)

ds = ds.assign_coords(lon = (['y', 'x'], lon))
ds = ds.assign_coords(lat = (['y', 'x'], lat))
ds = ds.assign_coords(x = x)
ds = ds.assign_coords(y = y)

ds.x.attrs['axis'] = 'X'
ds.x.attrs['standard_name'] = 'projection_x_coordinate'
ds.x.attrs['long_name'] = 'x-coordinate in projected coordinate system'
ds.x.attrs['resolution'] = 1000.  # cell size

ds.y.attrs['axis'] = 'Y' 
ds.y.attrs['standard_name'] = 'projection_y_coordinate'
ds.y.attrs['long_name'] = 'y-coordinate in projected coordinate system'
ds.y.attrs['resolution'] = 1000.  # cell size

ds.lon.attrs['units'] = 'degrees_east'
ds.lon.attrs['standard_name'] = 'longitude' 
ds.lon.attrs['long_name'] = 'longitude'

ds.lat.attrs['units'] = 'degrees_north'
ds.lat.attrs['standard_name'] = 'latitude' 
ds.lat.attrs['long_name'] = 'latitude'

# add crs to netcdf file
ds.rio.write_crs(ds_meta.crs.attrs['spatial_ref'], inplace=True
                ).rio.set_spatial_dims(x_dim="x",
                                       y_dim="y",
                                       inplace=True,
                                       ).rio.write_coordinate_system(inplace=True);


In [None]:
ds

## Add spatial reference to the model domain

Load the geodatabase of our `ngen` domain. This can be obtained using the `ngen-hydrofabric-subset.ipynb` notebook.

In [None]:
# prepare geometries for spatial averaging
gdf = geopandas.read_file(f'{wb_id}/config/{wb_id.split("_")[0]}_upstream_subset.gpkg', layer='divides')

gdf['geometry'].values

In [None]:
# convert these data into the projection of our forcing data
target_crs = pyproj.Proj(proj='lcc',
                       lat_1=30.,
                       lat_2=60., 
                       lat_0=40.0000076293945, lon_0=-97., # Center point
                       a=6370000, b=6370000) 

gdf = gdf.to_crs(target_crs.crs)

gdf['geometry'].values

In [None]:
# important step
# rechunk the dataset to solve the memory limit issue
ds = ds.chunk(chunks={'time': 1})

## Clip AORC to the extent of the subset hydrofabric geometries

Add catchment ids to the geodataset. These will be used to perform zonal statistics later on.

In [None]:
%%time

# create zonal id column
gdf['cat'] = gdf.id.str.split('-').str[-1].astype(int)

# clip AORC to the extent of the hydrofabric geometries
ds = ds.rio.clip(gdf.geometry.values,
                 gdf.crs,
                 drop=True,
                 invert=False, from_disk=True)

# select a single array of data to use as a template
lwdown_data = ds.isel(time=0).LWDOWN

# create a grid for the geocube
out_grid = make_geocube(
    vector_data=gdf,
    measurements=["cat"],
    like=ds # ensure the data are on the same grid
)

# add the catchment variable to the original dataset
ds = ds.assign_coords(cat = (['y','x'], out_grid.cat.data))

# compute the unique catchment IDs which will be used to compute zonal statistics
catchment_ids = numpy.unique(ds.cat.data[~numpy.isnan(ds.cat.data)])

print(f'The dataset contains {len(catchment_ids)} catchments')

In [None]:
ds

## Preview the gridded catchments over the watershed vector boundary

Note that the method we're using will associate grid cell with the watershed that it overlaps the most with. There are more advanced ways to create a mapping using various interpolation methods that will distribute values cells across all watershed boundaries that they intersect with. This is left as a future exercize. 

In [None]:
figure, ax = plt.subplots(figsize=(10,7))


# plot the gridded catchment mapping
ds.cat.plot()

## create a discrete color mapping such that each catchment 
## is represented by a single color
# cmap = colors.ListedColormap(['green', 'lightskyblue', 'cyan', 'red', 'navy'])
# bounds = [catchment_ids[0]] + [c+0.9 for c in catchment_ids]
# norm = colors.BoundaryNorm(bounds, cmap.N)
# ds.cat.plot(cmap=cmap, norm=norm, ax=ax)

# preview map geometries
gdf.iloc[:].plot(ax=ax, linewidth=2, edgecolor='k', facecolor='None');


## Compute basin-averaged forcing data

Define functions that will be used to perform basin averages on the AORC data. These functions leverage `dask` to parallelize the computation.

In [None]:
# call once per catchment
# distribute zonal stats to sub processes
def perform_zonal_computation(ds, cat_id):

    # subset by catchment id
    ds_catchment = ds.where(ds.cat==cat_id, drop=True)
#    ds_catchement_future = client.scatter(ds_catchment, broadcast=True)
    
    delayed = []
    # loop over variables   
    for variable in ['LWDOWN', 'PSFC',
                     'Q2D', 'RAINRATE', 'SWDOWN',
                     'T2D', 'U2D', 'V2D']:
                
        delay = dask.delayed(compute_zonal_mean)(ds_catchment[variable], variable)
        delayed.append(delay)
        
    res = dask.compute(*delayed)
    
    # combine outputs (list of dicts) into a single dict.
    res = {k: v for d in res for k, v in d.items()}
    
    # return results
    return {f'cat-{int(cat_id)}': res}

def compute_zonal_mean(ds, variable):
    return {variable: ds.mean(dim=['x','y']).values}

Slice the data to the temporal period of our choice.

In [None]:
# define the start and end time of the data we want to use
start_time = f'{year}-01-01 00:00'
end_time = f'{year}-01-10 00:00'

# isolate the desired time period of our data
ds_subset = ds.sortby('time').sel(time=slice(start_time, end_time))

print(f'The dataset contains {len(ds_subset.time)} timesteps')

Let's rechunk our data now that we have many fewer elements.

In [None]:
ds_subset = ds_subset.chunk(chunks={'time': 1000})

In [None]:
ds_subset.chunks

Drop all data that we don't need. The goal here is to make the dataset as small as possible before we start running computations on the data.

In [None]:
# drop unused coordinates
ds_subset = ds_subset.drop(['lat','lon'])

Tell `dask` to perform the subsetting computations on the data now. That way when we process the zonal statistics, the entire dataset won't need to be moved around. This will save a considerable amount of processing in future steps time.

In [None]:
%%time 
ds_subset = ds_subset.compute()

In [None]:
ds_subset

Scatter the dataset to the cluster so all workers will have access to it. This is good practice and especially necessary if working on a large dataset.

In [None]:
%%time
scattered_ds = client.scatter(ds_subset, broadcast=True)

Build a list of `delayed` tasks. This will not execute the computation.

In [None]:
%%time
delayed = []

# loop over each catchment in our domain
# create delayed tasks to compute zonal mean
for cat_id in catchment_ids:
    delay = dask.delayed(perform_zonal_computation)(scattered_ds, cat_id)
    delayed.append(delay)

Invoke the computation using `dask.compute`.

In [None]:
%%time 

# run the computation
results = dask.compute(*delayed)

Save the basin averaged meteorological data in the format expected by `ngen`.

The summarized AORC variables need to be mapped to the `ngen` model that we'll be using. The following table illustrates the mapping.

|AORC Variable Name|NGEN Variable Name|Description|
|---|---|---|
| LWDOWN   | DLWRF_surface         | Surface downward long-wave radiation flux (W m-2) 
| PSFC     | PRES_surface          | Surface Pressure (Pa)
| Q2D      | SPFH_2maboveground    | 2-m Specific Humidity (kg kg-1)
| RAINRATE | ---                   | precipitation_flux (mm s^-1)
| SWDOWN   | DSWRF_surface         | Surface downward short-wave radiation flux (W m-2)
| T2D      | TMP_2maboveground     | 2-m Air Temperature (K)
| U2D      | UGRD_10maboveground   | 10-m U-component of wind (m s-1)
| V2D      | VGRD_10maboveground   | 10-m V-component of wind (m s-1)
| ---      | APCP_surface          | Surface precipitation (kg/m^2)

Note: our `ngen` model is expecting shortwave and longwave radiation at a height of 0 meters above ground whereas the AORC data has values are 2 meters above ground.

References: [tshirt_c.h](https://github.com/NOAA-OWP/ngen/blob/f2725dfbb52f3af5083ce927e69733edbf059f57/models/tshirt/include/tshirt_c.h#L52), [sample forcing csv](https://github.com/NOAA-OWP/ngen/blob/master/data/forcing/cat-27_2015-12-01%2000_00_00_2015-12-30%2023_00_00.csv)

## Save data as csv files

In [None]:

# compute the date range for our data using start and end times
# that were used in the subsetting process.
dates = pandas.date_range(start_time, end_time, freq="60min")

# save the zonal means for each catchment
for dat in results:
    for cat in dat:
        df = pandas.DataFrame({k:list(v) for k,v in dat[cat].items()})
        df.fillna(0., inplace=True)
        
        # convert rainrate from mm/s to kg/m2
        # mm/s - mm/hr = df.RAINRATE * 3600
        # since the timestep is one hour, this is effectively the total rain in mm.
        # 1 mm of rainfall is equal to 1kg/m2 so our conversion is:
        # NOTE: we should only be considering the fraction of liquid precip which can
        #       be computed using LQFRAC. However LQFRAC is zero for our data which 
        #       does not seem correct, so we'll assume that all precip is liquid. This
        #       is something that needs to be revisited.
        df['APCP_surface'] = df.RAINRATE * 3600

        # rename columns to match the variable names expected by the ngen t-shirt model
        df.rename(columns={
            'LWDOWN'   : 'DLWRF_surface',
            'PSFC'     : 'PRES_surface',
            'Q2D'      : 'SPFH_2maboveground',
            'SWDOWN'   : 'DSWRF_surface',
            'T2D'      : 'TMP_2maboveground',
            'U2D'      : 'UGRD_10maboveground',
            'V2D'      : 'VGRD_10maboveground',
            'RAINRATE' : 'precip_rate',
        },
                  inplace=True)
               
        # add the time index
        df['time'] = dates
        df.set_index('time', inplace=True)


        # write to file
        with open(f'{wb_id}/forcings/{cat}.csv', 'w') as f:
            # Note: saving "precip_rate" because this column exists in the example 
            #       forcing files. It's not clear if this is being used or not.
            df.to_csv(f,
                      columns = ['APCP_surface',
                                 'DLWRF_surface',
                                 'DSWRF_surface',
                                 'PRES_surface',
                                 'SPFH_2maboveground',
                                 'TMP_2maboveground',
                                 'UGRD_10maboveground',
                                 'VGRD_10maboveground',
                                 'precip_rate'])
            

In [None]:
# check the number of catchments. 
print(gdf.shape[0])
print(len(results))

# If these are not equal, run the following code cell.

Here is an example showing why some catchments are missing in the results.

<img src="./figures/missing_catchment_example.png">

In [None]:
computed_catchments = [list(r.keys())[0] for r in results]
for cat_id in gdf['cat'].values:
    known_catchment = f'cat-{int(cat_id)}'
    if known_catchment not in computed_catchments:
        print(f'Creating Synthetic Forcing for {known_catchment}')
        synthetic_df = pandas.DataFrame(0, index=df.index, columns=['APCP_surface',
                                                                    'DLWRF_surface',
                                                                    'PRES_surface',
                                                                    'SPFH_2maboveground',
                                                                    'DSWRF_surface',
                                                                    'TMP_2maboveground',
                                                                    'UGRD_10maboveground',
                                                                    'VGRD_10maboveground',
                                                                    'precip_rate'])
        # write to file
        with open(f'{wb_id}/forcings/{known_catchment}.csv', 'w') as f:
            df.to_csv(f,
                      columns = ['APCP_surface',
                                 'DLWRF_surface',
                                 'DSWRF_surface',
                                 'PRES_surface',
                                 'SPFH_2maboveground',
                                 'TMP_2maboveground',
                                 'UGRD_10maboveground',
                                 'VGRD_10maboveground',
                                 'precip_rate'])
            
        