# Exploring National Water Model (NWM) operational streamflow output using teehr

This notebook demonstrates how to retrieve and explore National Water Model streamflow output using the `teehr` package. The first step is to install the package. For more information, see the `teehr` documentation page: https://rtiinternational.github.io/teehr/. Currently, `teehr` v0.3.25 is compatible with Python 3.10 and 3.11. You can download the latest release of `teehr` from https://github.com/RTIInternational/teehr/tags and install it using `pip`. This notebook also requires the `dask[distributed]` and `dask[dataframe]` packages.

```console
$ python3 -m pip install teehr-0.3.25.zip
```
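
Because this release of `teehr` targets Python 3.10 and 3.11, it can be worth confirming the environment before going further. The sketch below is a hypothetical helper, not part of `teehr`. It uses only the standard library and assumes the package is installed under the distribution name `teehr`.

```python
import sys
from importlib import metadata

# Python versions stated above as compatible with teehr v0.3.25
SUPPORTED_PYTHON = {(3, 10), (3, 11)}


def check_environment():
    """Hypothetical helper: report whether this Python is supported and which teehr is installed."""
    python_ok = sys.version_info[:2] in SUPPORTED_PYTHON
    try:
        # Assumes the distribution is published as "teehr"
        teehr_version = metadata.version("teehr")
    except metadata.PackageNotFoundError:
        teehr_version = None  # teehr is not installed in this environment
    return python_ok, teehr_version


python_ok, teehr_version = check_environment()
print(f"Python {sys.version_info.major}.{sys.version_info.minor} supported: {python_ok}; teehr: {teehr_version}")
```

If `python_ok` is `False`, creating a Python 3.10 or 3.11 virtual environment before installing is likely the simplest fix.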

```python
%pip -q install "dask[distributed]" "dask[dataframe]"
```

Note: you may need to restart the kernel to use updated packages.


## Setup `dask` cluster

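Setting up a local `dask` cluster lets `teehr` take advantage of parallel processing by spreading the work across your computer's cores. The cell below starts one fewer worker process than the number of CPU cores reported by `os.cpu_count()`, with a minimum of one.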

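```python
import os
from dask.distributed import Client

# Use one fewer worker process than the number of CPU cores (at least one).
# os.cpu_count() can return None, so fall back to 1 in that case.
n_workers = max((os.cpu_count() or 1) - 1, 1)
client = Client(n_workers=n_workers)
client
```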

| Property | Value |
|---|---|
| Cluster type | `distributed.LocalCluster` |
| Scheduler | `tcp://127.0.0.1:44977` |
| Dashboard | http://127.0.0.1:8787/status |
| Status | running (using processes) |
| Workers | 5 (2 threads and 3.00 GiB memory each) |
| Total threads | 10 |
| Total memory | 14.99 GiB |
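
The cell above passes only `n_workers` and lets `dask` choose the number of threads per worker. If you want to control both explicitly, one option is to compute the keyword arguments first, since `Client` forwards `n_workers` and `threads_per_worker` to the `LocalCluster` it creates. The `plan_workers` helper below is hypothetical, and its `reserve_cores` parameter is an illustrative assumption rather than a `dask` or `teehr` setting.

```python
import os


def plan_workers(reserve_cores=1, threads_per_worker=1):
    """Hypothetical helper: size a LocalCluster, leaving `reserve_cores` cores unused."""
    cores = os.cpu_count() or 1  # os.cpu_count() may return None
    usable = max(cores - reserve_cores, 1)
    # At least one worker, each running `threads_per_worker` threads
    n_workers = max(usable // threads_per_worker, 1)
    return {"n_workers": n_workers, "threads_per_worker": threads_per_worker}


print(plan_workers())
```

Calling `Client(**plan_workers())` would then start that many single-threaded worker processes. Fewer, multi-threaded workers share memory more easily, while more single-threaded workers isolate tasks from one another; which is faster for NWM ingest depends on your machine.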


## Import `teehr` and set options

This example is taken directly from the TEEHR User Guide. The first steps are to import `teehr` and then set the options needed to retrieve the relevant operational NWM output and store it locally.

```python
from pathlib import Path
import teehr.loading.nwm.nwm_points as tlp
```

```python
CONFIGURATION = "analysis_assim_extend_no_da"  # Other examples: analysis_assim, short_range, analysis_assim_hawaii, medium_range_mem1
OUTPUT_TYPE = "channel_rt"
VARIABLE_NAME = "streamflow"
T_MINUS = [0, 1, 2]  # Only used if an assimilation run is selected

NWM_VERSION = "nwm22"  # Currently accepts "nwm22" or "nwm30"
                       # Use "nwm22" for dates prior to 2023-09-19

DATA_SOURCE = "GCS"    # Remote location from which to fetch the data
                       # ("GCS", "NOMADS", "DSTOR")

KERCHUNK_METHOD = "auto"  # When DATA_SOURCE = "GCS", specifies how Kerchunk reference JSON files are obtained:
                          # "local" - always create new JSON files from the NetCDF files in GCS and save them locally, if they do not already exist
                          # "remote" - read the CIROH pre-generated JSONs from S3, ignoring any that are unavailable
                          # "auto" - read the CIROH pre-generated JSONs from S3, and create any that are unavailable, storing them locally

PROCESS_BY_Z_HOUR = True  # If True, NWM files are processed by z-hour per day. If False, files are
                          # processed in chunks (defined by STEPSIZE). This can help if you want to read many reaches
                          # at once (for example, all ~2.7 million for medium range).

STEPSIZE = 100  # Only used if PROCESS_BY_Z_HOUR = False. Controls how many files are processed in memory at once.
                # Higher values can increase performance at the expense of memory (default: 100)

IGNORE_MISSING_FILE = True  # If True, missing files are skipped and processing continues
                            # If False, TEEHR fails when it encounters a missing NWM file

OVERWRITE_OUTPUT = True  # If True, existing output files are overwritten
                         # If False (default), existing files are retained

START_DATE = "2023-03-18"
INGEST_DAYS = 1

OUTPUT_ROOT = Path("teehr-data")
JSON_DIR = Path(OUTPUT_ROOT, "zarr", CONFIGURATION)
OUTPUT_DIR = Path(OUTPUT_ROOT, "timeseries", CONFIGURATION)

# For this simple example, we'll get data for 10 NWM reaches that coincide with USGS gauges
LOCATION_IDS = [7086109, 7040481, 7053819, 7111205, 7110249, 14299781, 14251875, 14267476, 7152082, 14828145]
```
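
The comment on `NWM_VERSION` ties the version to the requested dates, so a quick check can catch a mismatch before any data is downloaded. This is a sketch: `choose_nwm_version` is a hypothetical helper, and it treats 2023-09-19 as the first date requiring `"nwm30"`, following the comment in the options cell.

```python
from datetime import date, timedelta

# First date for which the options cell says to use "nwm30" (assumption based on that comment)
NWM30_START = date(2023, 9, 19)


def choose_nwm_version(start_date, ingest_days=1):
    """Hypothetical helper: pick "nwm22" or "nwm30" for a START_DATE / INGEST_DAYS pair."""
    first = date.fromisoformat(start_date)
    last = first + timedelta(days=ingest_days - 1)  # last day that will be ingested
    if first < NWM30_START <= last:
        raise ValueError("Date range spans the nwm22 -> nwm30 switch; split it into two requests.")
    return "nwm22" if last < NWM30_START else "nwm30"


print(choose_nwm_version("2023-03-18", 1))  # prints nwm22
```

In the notebook, `assert choose_nwm_version(START_DATE, INGEST_DAYS) == NWM_VERSION` placed after the options cell would stop the run early if the two disagree.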

## Retrieve data

With all the options set, we can use `teehr.loading.nwm.nwm_points.nwm_to_parquet` to retrieve the NWM data and store it locally as Parquet files. These Parquet files are directly compatible with other `teehr` methods and modules used to pair and evaluate model output.

```python
# Fetch NWM streamflow for the selected reaches and write it as Parquet under OUTPUT_DIR
tlp.nwm_to_parquet(
    configuration=CONFIGURATION,
    output_type=OUTPUT_TYPE,
    variable_name=VARIABLE_NAME,
    start_date=START_DATE,
    ingest_days=INGEST_DAYS,
    location_ids=LOCATION_IDS,
    json_dir=JSON_DIR,
    output_parquet_dir=OUTPUT_DIR,
    nwm_version=NWM_VERSION,
    data_source=DATA_SOURCE,
    kerchunk_method=KERCHUNK_METHOD,
    t_minus_hours=T_MINUS,
    process_by_z_hour=PROCESS_BY_Z_HOUR,
    stepsize=STEPSIZE,
    ignore_missing_file=IGNORE_MISSING_FILE,
    overwrite_output=OVERWRITE_OUTPUT
)
```
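
Before loading anything, you may want to confirm that files were actually written. This matters especially with `IGNORE_MISSING_FILE = True`, where missing NWM files are skipped rather than raising an error. The hypothetical helper below searches a directory recursively for `*.parquet` files, so it makes no assumption about how `teehr` names or nests its output.

```python
from pathlib import Path


def summarize_parquet_output(output_dir):
    """Hypothetical helper: count the Parquet files under output_dir and their total size in bytes."""
    files = sorted(Path(output_dir).rglob("*.parquet"))
    total_bytes = sum(f.stat().st_size for f in files)
    return {"n_files": len(files), "total_bytes": total_bytes, "files": files}
```

In the notebook you would call `summarize_parquet_output(OUTPUT_DIR)`. An `n_files` of 0 suggests that nothing was retrieved for the chosen dates and reaches.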

## Load the data

Normally, we'd use another method from `teehr` to evaluate or visualize NWM output from parquet. However, in this case we just use `dask.dataframe.read_parquet` to look at the data directly.

```python
import dask.dataframe as dd

# Lazily open the Parquet files generated earlier (no data is read yet)
df = dd.read_parquet(OUTPUT_DIR)

# Run the compute method to load the data into memory
df.compute()
```

| | value | reference_time | location_id | value_time | configuration | variable_name | measurement_unit |
|---|---|---|---|---|---|---|---|
| 0 | 0.37 | 2023-03-18 16:00:00 | nwm22-7086109 | 2023-03-18 16:00:00 | analysis_assim_extend_no_da | streamflow | m3/s |
| 1 | 0.0 | 2023-03-18 16:00:00 | nwm22-7040481 | 2023-03-18 16:00:00 | analysis_assim_extend_no_da | streamflow | m3/s |
| 2 | 0.01 | 2023-03-18 16:00:00 | nwm22-7053819 | 2023-03-18 16:00:00 | analysis_assim_extend_no_da | streamflow | m3/s |
| 3 | 0.03 | 2023-03-18 16:00:00 | nwm22-7111205 | 2023-03-18 16:00:00 | analysis_assim_extend_no_da | streamflow | m3/s |
| 4 | 0.01 | 2023-03-18 16:00:00 | nwm22-7110249 | 2023-03-18 16:00:00 | analysis_assim_extend_no_da | streamflow | m3/s |
| 5 | 0.01 | 2023-03-18 16:00:00 | nwm22-14299781 | 2023-03-18 16:00:00 | analysis_assim_extend_no_da | streamflow | m3/s |
| 6 | 0.0 | 2023-03-18 16:00:00 | nwm22-14251875 | 2023-03-18 16:00:00 | analysis_assim_extend_no_da | streamflow | m3/s |
| 7 | 0.0 | 2023-03-18 16:00:00 | nwm22-14267476 | 2023-03-18 16:00:00 | analysis_assim_extend_no_da | streamflow | m3/s |
| 8 | 0.0 | 2023-03-18 16:00:00 | nwm22-7152082 | 2023-03-18 16:00:00 | analysis_assim_extend_no_da | streamflow | m3/s |
| 9 | 0.02 | 2023-03-18 16:00:00 | nwm22-14828145 | 2023-03-18 16:00:00 | analysis_assim_extend_no_da | streamflow | m3/s |
