In [None]:
import dask
import dask.dataframe as dd
from dask.distributed import Client
import hvplot.dask
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pprint
import requests

import datetime
from io import StringIO

In [None]:
# Start a local Dask cluster used to run the Hydrocron requests in parallel.
# n_workers: number of worker processes. Match it to the CPUs / parallel processes you
# want to use, or set it to 1 to run everything on a single worker.
client = Client(
    n_workers=10,
)

# Display the client summary as the cell output
client

In [None]:
# Service endpoints used below:
#   FTS_URL       - Feature Translation Service, used to look up SWORD node IDs for a basin
#   HYDROCRON_URL - Hydrocron time-series endpoint, queried once per node
FTS_URL = "https://fts.podaac.earthdata.nasa.gov/v1"
HYDROCRON_URL = "https://soto.podaac.earthdatacloud.nasa.gov/hydrocron/v1/timeseries"

# What to search FTS for. BASIN_IDENTIFIER is a SWORD basin code (look it up in the
# SWORD database). To search by river name instead, use RIVER_NAME.
BASIN_IDENTIFIER = "732"
# RIVER_NAME = "Rhine"

In [None]:
def query_fts(query_url, params, timeout=60):
    """Query the Feature Translation Service (FTS) for one page of SWORD node identifiers.

    Parameters
    ----------
    query_url: str - FTS URL to query (e.g. f"{FTS_URL}/rivers/node/{BASIN_IDENTIFIER}")
    params: dict - Query-string parameters to send, e.g. page_size and page_number
    timeout: float - Seconds to wait for the HTTP response. Without a timeout,
        requests waits indefinitely on a stalled connection.

    Returns
    -------
    dict with keys:
        hits: total number of matches reported by FTS (the response's 'hits' value)
        page_size, page_number: taken from the response's 'search on' section,
            or 0 when that section is absent
        node_ids: list of the 'node_id' values in this page's 'results'
    """
    response = requests.get(query_url, params=params, timeout=timeout)
    payload = response.json()

    hits = payload['hits']

    # 'search on' echoes the paging parameters; treat its absence as "no paging info".
    if 'search on' in payload:
        page_size = payload['search on']['page_size']
        page_number = payload['search on']['page_number']
    else:
        page_size = 0
        page_number = 0

    return {
        "hits": hits,
        "page_size": page_size,
        "page_number": page_number,
        "node_ids": [item['node_id'] for item in payload['results']],
    }

In [None]:
# Search FTS by basin code, paging through the results until every reported hit is collected
query_url = f"{FTS_URL}/rivers/node/{BASIN_IDENTIFIER}"
print(f"Searching by basin... {query_url}")

page_size = 100   # Retrieve 100 results per request
page_number = 1   # Start with page 1
node_ids = []     # Accumulates node IDs across all pages

while True:
    params = {"page_size": page_size, "page_number": page_number}
    results = query_fts(query_url, params)

    # An empty page means FTS has nothing more to return
    if not results.get('node_ids'):
        print(f"No more node IDs returned at page {page_number}. Ending loop.")
        break

    hits = results.get('hits', 0)
    node_ids.extend(results['node_ids'])

    print(
        f"page_size: {page_size}, "
        f"page_number: {page_number}, "
        f"hits: {hits}, "
        f"# node_ids: {len(node_ids)}"
    )

    # Stop once at least as many IDs as FTS reported hits have been collected
    if len(node_ids) >= hits:
        print("All available node IDs fetched.")
        break

    page_number += 1

print("Total number of nodes: ", len(node_ids))
# dict.fromkeys drops duplicates but keeps first-seen order, so downstream batches are
# reproducible (list(set(...)) would reorder the IDs arbitrarily).
node_ids = list(dict.fromkeys(node_ids))
print("Total number of non-duplicate nodes: ", len(node_ids))

In [None]:
# Preview the first 300 node IDs returned by FTS
node_ids[0:300]

In [None]:
@dask.delayed
def query_hydrocron(query_url, node_id, start_time, end_time, fields, timeout=60):
    """Query Hydrocron for the time series of a single SWORD node.

    Decorated with dask.delayed: calling it returns a lazy task, and the HTTP request
    is only sent when that task is computed (e.g. with dask.compute).

    Parameters
    ----------
    query_url: str - Hydrocron timeseries URL to query (HYDROCRON_URL)
    node_id: str - SWORD node identifier
    start_time: str - Start of the query window, e.g. "2023-07-28T00:00:00Z"
    end_time: str - End of the query window
    fields: str - Comma-separated field names to return, e.g. "node_id,time_str,wse"
    timeout: float - Seconds to wait for the HTTP response. Without a timeout,
        requests waits indefinitely on a stalled connection.

    Returns
    -------
    pandas.DataFrame
        The parsed CSV results when the JSON response contains a "results" key.
        Otherwise a one-row placeholder with this node_id, time_str
        "1900-01-01T00:00:00", wse -999999999999.0 and wse_units "m". Presumably this
        lets nodes without data still appear in the combined output; confirm before
        relying on it.
    """
    params = {
        "feature": "Node",
        "feature_id": node_id,
        "output": "csv",
        "start_time": start_time,
        "end_time": end_time,
        "fields": fields,
    }
    response = requests.get(query_url, params=params, timeout=timeout)
    payload = response.json()  # parse the body once and reuse it

    if "results" in payload:
        return pd.read_csv(StringIO(payload["results"]["csv"]))

    # No results for this node/time window: return a sentinel row instead of an empty frame
    return pd.DataFrame({
        "node_id": [np.int64(node_id)],
        "time_str": [datetime.datetime(1900, 1, 1).strftime("%Y-%m-%dT%H:%M:%S")],
        "wse": [-999999999999.0],
        "wse_units": ["m"],
    })

In [None]:
%%time
import psutil

# Define parameters
start_time = "2023-07-28T00:00:00Z"
end_time = "2024-04-16T00:00:00Z"
fields = "node_id,time_str,lat,lon,wse,width,node_q_b,dark_frac,ice_clim_f,ice_dyn_f,p_dist_out"

# Batch processing to reduce task graph size

available_mem = psutil.virtual_memory().available  # Free RAM in bytes
batch_size = min(500, max(100, available_mem // (10 * 1024**2)))  # 10MB per batch
batched_results = []

for i in range(0, len(node_ids), batch_size):
    batch = node_ids[i : i + batch_size]
    
    # Create delayed queries for the batch
    if __name__ == "__main__":
        delayed_queries = [query_hydrocron(HYDROCRON_URL, node, start_time, end_time, fields) for node in batch]
    
    # Compute batch immediately to avoid an excessively large graph
    batch_results = dask.compute(*delayed_queries)
    
    # Convert batch results to a Dask DataFrame
    ddf_batch = dd.from_pandas(pd.concat(batch_results, ignore_index=True), npartitions=20)
    batched_results.append(ddf_batch)

# Combine batches into a final Dask DataFrame
ddf = dd.concat(batched_results)

# Show the first 20 rows (computed lazily)
ddf.head(n=20)