**Description:**
This Jupyter notebook showcases the use of JSON header files for faster access to National Water Model (NWM) data files. Here is a brief overview of the imported modules:

- `joblib`: Used for parallel processing and efficient caching.
- `numpy`: A fundamental library for numerical operations.
- `xarray`: A library for working with labeled multi-dimensional arrays, often used in scientific data analysis.
- `fsspec`: Provides a common interface for working with various filesystem-like protocols.
- `ujson`: A fast JSON encoder and decoder for handling JSON data.
- `matplotlib.pyplot`: Used for creating data visualizations.
- `psutil`: Provides information on system resource utilization.
- `concurrent.futures`: A module for asynchronously executing functions using threads or processes.
- `multiprocessing`: A library for parallel and concurrent computing.
- `urlgennwm`: An external library/module for generating URLs specific to the NWM (National Water Model).



In [None]:
import joblib
import numpy as np
import xarray as xr
import fsspec
import ujson
import matplotlib.pyplot as plt
import psutil
import concurrent.futures
import multiprocessing
import urlgennwm 


This code snippet retrieves system information: the number of physical CPU cores and the available memory in gigabytes (GB). It also computes a memory limit per worker process by dividing the available memory by the core count. The core count sets the number of parallel workers used later in the notebook.


In [3]:
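# Get the number of physical CPU cores (logical=True would also count hyperthreads).
# cpu_count can return None when the count cannot be determined, so fall back to 1.
num_cores = psutil.cpu_count(logical=False) or psutil.cpu_count() or 1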

# Calculate the available memory in GB
available_memory_gb = psutil.virtual_memory().available / (1024 ** 3)  # Bytes to GB

# Calculate a memory limit per worker by splitting available memory evenly
# across cores; scale this down if other processes need headroom
memory_per_worker_gb = available_memory_gb / num_cores
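
The sizing logic above can be wrapped in a small helper that also caps the worker count by memory. This is a minimal sketch, not part of the original notebook: `plan_workers`, its parameters, and the 2 GB per-worker figure are illustrative assumptions, and it uses only the standard library so that it runs without `psutil`.

```python
import os

def plan_workers(cpu_count, available_gb, min_gb_per_worker=2.0):
    """Return (n_workers, gb_per_worker) for a hypothetical worker pool.

    cpu_count may be None (some platforms cannot report it), in which case
    a single worker is assumed.
    """
    cores = cpu_count or 1
    # Never plan more workers than the memory budget allows, but keep at least one
    by_memory = max(1, int(available_gb // min_gb_per_worker))
    n_workers = min(cores, by_memory)
    return n_workers, available_gb / n_workers

# Example with the standard library's CPU count and an assumed 8 GB of free memory
n_workers, gb_each = plan_workers(os.cpu_count(), available_gb=8.0)
print(n_workers, round(gb_each, 2))
```

In the notebook itself, the two arguments would come from `num_cores` and `available_memory_gb`.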

This code defines a set of functions for working with remote datasets described by JSON header (reference) files. It includes functions to load remote JSON content, open a dataset from that JSON, select streamflow data, and select time values. The `process_file` function combines these operations to process a single file for one or more feature IDs and returns the streamflow and time values.

In [8]:
# Define a function to load remote JSON content
def load_remote_json(file_url):
    of = fsspec.open(file_url)
    with of as f:
        return ujson.load(f)

# Define a function to load a remote dataset from JSON content
def load_remote_ds(json_obj):
    backend_args = {
        "consolidated": False,
        "storage_options": {
            "fo": json_obj,
        },
    }
    return xr.open_dataset("reference://", engine="zarr", backend_kwargs=backend_args)

# Define a function to select streamflow data from a dataset
def select_flow(ds, feature_id):
    cords = ds.streamflow.sel(feature_id=feature_id)
    return cords.values

# Define a function to select time
def select_time(ds):
    dstime = ds.time
    return dstime.values

# Define a function to process a single file for a given feature ID
def process_file(file_url, feature_id):
    json_obj = load_remote_json(file_url)
    ds = load_remote_ds(json_obj)
    streamflow_value = select_flow(ds, feature_id)
    time_value = select_time(ds)
    return streamflow_value, time_value
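
The JSON header files are what the `reference://` filesystem in `fsspec` reads through the `fo` option: a mapping from Zarr keys either to inline metadata or to a `[url, offset, length]` byte range inside the original data file. The sketch below builds a tiny hand-written example of that layout and counts the two kinds of entries. It assumes the kerchunk-style version 1 layout with a top-level `refs` key; the bucket path, the offsets, and the `summarize_refs` helper are made up for illustration.

```python
import json

# Hand-written miniature of a reference file; real files contain many more
# keys. Every URL, offset, and length below is invented.
example = {
    "version": 1,
    "refs": {
        ".zgroup": json.dumps({"zarr_format": 2}),
        "streamflow/.zarray": json.dumps({"shape": [4], "chunks": [2]}),
        "streamflow/0": ["s3://example-bucket/example.nc", 1000, 200],
        "streamflow/1": ["s3://example-bucket/example.nc", 1200, 180],
    },
}

def summarize_refs(reference):
    """Split reference entries into inline metadata keys and remote byte totals."""
    inline, remote_bytes = [], 0
    for key, value in reference["refs"].items():
        if isinstance(value, str):
            # Inline content stored directly in the JSON
            inline.append(key)
        elif len(value) == 3:
            # [url, offset, length]: a byte range in a remote file
            _url, _offset, length = value
            remote_bytes += length
    return sorted(inline), remote_bytes

inline_keys, total = summarize_refs(example)
print(inline_keys)
print(total)
```

Because only the referenced byte ranges are fetched, reading one variable does not require downloading the whole file, which is where the speed-up comes from.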



This code defines a list of feature IDs, which are typically used as identifiers for specific geographic or data features. In this case, the list includes feature IDs `8153461`, `8153027`, and `18210860`, representing specific features of interest. These feature IDs are used in the subsequent data retrieval. Feature IDs for other locations can be found at https://water.noaa.gov/map#forecast-chart


In [None]:
# Define a list of feature IDs
feature_ids = [8153461, 8153027, 18210860]

This code sets the input variables `start_date`, `end_date`, and `fcst_cycle` to specify the date range and forecast cycles. It then calls the `urlgennwm.generate_urls` function to generate a list of URLs for accessing National Water Model (NWM) data for the specified dates and forecast cycles. These URLs are used to retrieve specific NWM data.

In [None]:
# Setting input variables to generate NWM JSON header urls
start_date = "202201120000"
end_date   = "202201130000"
fcst_cycle = list(range(24))  # all 24 hourly forecast cycles, 0 through 23
urlgennwm.generate_urls(start_date, end_date, fcst_cycle)
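
To get a feel for how many date and cycle combinations such a request spans, the sketch below parses the same `YYYYMMDDHHMM` strings and enumerates (date, cycle) pairs. It does not reproduce the internals of `urlgennwm`; treating the end date as inclusive and the helper name `enumerate_cycles` are assumptions made for illustration.

```python
from datetime import datetime, timedelta

def enumerate_cycles(start_date, end_date, fcst_cycle):
    """Yield (YYYYMMDD, cycle_hour) pairs for every day in the range.

    Assumes dates are 'YYYYMMDDHHMM' strings and that the end day is included.
    """
    start = datetime.strptime(start_date, "%Y%m%d%H%M").date()
    end = datetime.strptime(end_date, "%Y%m%d%H%M").date()
    day = start
    while day <= end:
        for cycle in fcst_cycle:
            yield day.strftime("%Y%m%d"), cycle
        day += timedelta(days=1)

pairs = list(enumerate_cycles("202201120000", "202201130000", range(24)))
print(len(pairs))            # number of (date, cycle) combinations
print(pairs[0], pairs[-1])   # first and last combination
```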

This code snippet initializes an empty list called `files` to store URLs. It then opens "filenamelist.txt", a file containing one URL per line, in read mode, strips leading and trailing whitespace from each line, and appends each URL to the `files` list. Finally, it initializes an empty dictionary called `extracted_values_dict`, intended to store extracted values associated with specific feature IDs.

In [None]:
# Initialize an empty list to store the URLs
files = []

# Define the name of the file containing the URLs
filename = "filenamelist.txt"

# Open the file in read mode and read the URLs line by line
with open(filename, "r") as file:
    for line in file:
        # Remove leading and trailing whitespace and append the URL to the list
        url = line.strip()
        files.append(url)
# Create a dictionary to store the extracted values for each feature ID
extracted_values_dict = {}
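
The loop above keeps blank lines as empty strings, which would later be passed to `process_file` as URLs. A slightly more defensive variant is sketched below; `read_url_list` is a hypothetical helper and the URLs in the demo file are placeholders, not real NWM paths.

```python
from pathlib import Path
import tempfile

def read_url_list(path):
    """Return the non-empty, whitespace-stripped lines of a text file, in order."""
    with open(path, "r") as handle:
        return [line.strip() for line in handle if line.strip()]

# Demonstrate on a temporary file that contains a blank line and padded whitespace
with tempfile.TemporaryDirectory() as tmp:
    demo = Path(tmp) / "filenamelist.txt"
    demo.write_text("https://example.com/a.json\n\n  https://example.com/b.json  \n")
    urls = read_url_list(demo)

print(urls)
```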

This code snippet uses the `joblib` library to run `process_file` over the list of `files` in a thread pool with `num_cores` workers, passing each `file_url` and the `feature_ids` list as arguments. Each result is a `(streamflow, time)` tuple. The results are stored in `result_values` and sorted chronologically by the first timestamp of each tuple (`x[1][0]`). The extracted streamflow values are then stored in the `Streamflow` list, and the associated timestamps are stored in the `Timestamp` list, both of which can be used for analysis or visualization.

In [None]:
# Run process_file over all files in a joblib thread pool with num_cores workers
with joblib.parallel_backend("threading", n_jobs=num_cores):
    result_values = joblib.Parallel()(joblib.delayed(process_file)(file_url, feature_ids) for file_url in files)
# Sort results chronologically by the first timestamp of each (streamflow, time) tuple
result_values = sorted(result_values, key=lambda x: x[1][0])
# Split the sorted tuples into parallel lists of streamflow values and timestamps
Streamflow = [item[0] for item in result_values]
Timestamp = [item[1] for item in result_values]
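
The same fan-out, sort, and split pattern can be written with `concurrent.futures`, which the notebook imports but does not use. The sketch below swaps joblib for `ThreadPoolExecutor` and swaps `process_file` for a stand-in (`fake_process_file`, invented here) that returns made-up values, so it runs without network access.

```python
from concurrent.futures import ThreadPoolExecutor

def fake_process_file(file_url, feature_ids):
    """Stand-in for process_file: returns (streamflow_values, time_values)."""
    hour = int(file_url.rsplit("_", 1)[1])       # e.g. "file_02" -> 2
    flows = [fid % 10 + hour for fid in feature_ids]
    return flows, [hour]

feature_ids = [8153461, 8153027, 18210860]
files = ["file_02", "file_00", "file_01"]        # deliberately out of time order

with ThreadPoolExecutor(max_workers=2) as pool:
    # map returns results in input order, so a sort is still needed for time order
    results = list(pool.map(lambda url: fake_process_file(url, feature_ids), files))

results.sort(key=lambda item: item[1][0])        # order by first timestamp
streamflow = [flows for flows, _ in results]
timestamps = [times for _, times in results]
print(timestamps)
print(streamflow)
```

Threads suit this workload because most of the time is spent waiting on remote reads rather than on computation.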


This code snippet plots the extracted streamflow values with Matplotlib. It draws a single figure containing one line per feature ID in the `feature_ids` list, each labeled with the corresponding feature ID. The streamflow values come from the `Streamflow` list. The x-axis is the position of each file in the time-sorted results, which corresponds to hours when consecutive files are one hour apart, and the y-axis represents streamflow values. Finally, the code displays the figure.


In [None]:
# Draw one figure with one line per feature ID
plt.figure()
plt.plot(Streamflow, label=[f"Feature ID {feature_id}" for feature_id in feature_ids])
plt.xlabel("Time (hours)")
plt.ylabel("Streamflow")
plt.title("Streamflow Time Series")
plt.legend()
# Display the figure
plt.show()
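
`extracted_values_dict` is created earlier but never filled. One way it could be populated is to transpose the per-file results into one series per feature ID, which also makes it easy to plot each feature separately. The sketch below uses small made-up numbers in place of the real `Streamflow` list; `to_feature_series` is a hypothetical helper.

```python
def to_feature_series(feature_ids, per_file_values):
    """Turn [[v_f1, v_f2, ...] per file] into {feature_id: [v per file]}."""
    columns = zip(*per_file_values)              # transpose files x features
    return {fid: list(col) for fid, col in zip(feature_ids, columns)}

feature_ids = [8153461, 8153027, 18210860]
streamflow = [[1.0, 7.0, 0.5], [1.2, 6.8, 0.4], [1.5, 6.1, 0.4]]  # invented values

extracted_values_dict = to_feature_series(feature_ids, streamflow)
for fid, series in extracted_values_dict.items():
    print(fid, series)
```

Each series can then be passed to its own `plt.plot` call, or drawn in its own `plt.figure()`, if separate plots per feature ID are wanted.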