> From the PO.DAAC Cookbook, to access the GitHub version of the notebook, follow [this link](insert link to notebook).

# Hydrocron API: SWOT Time Series Examples

#### *Authors: Nikki Tebaldi and Cassandra Nickles, NASA PO.DAAC*

### Summary:

[Hydrocron](https://podaac.github.io/hydrocron/intro.html) is an API that repackages the `SWOT_L2_HR_RIVERSP_2.0` dataset into CSV or GeoJSON formats that make time series analysis easier. This notebook shows how to use Hydrocron and convert its output into a readable geodatabase of data from multiple SWOT reaches identified in the [SWORD Database](https://www.swordexplorer.com/).

### Requirements:
Any compute environment, local or the cloud.

### Learning Objectives:
- Obtain a list of SWORD IDs for a region of interest
- Access SWOT river vector product attributes for multiple reach IDs via the Hydrocron API
- Convert the accessed time series data into a readable database
- Plot one time series variable from multiple reaches

### Cite the Hydrocron tool via the following:

## Import Packages

In [1]:
import dask
import dask.dataframe as dd
from dask.distributed import Client
import hvplot.dask
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pprint
import requests

from io import StringIO

In [2]:
# Set up Dask workers
client = Client(n_workers=4)
client

Client output (summary): distributed.LocalCluster, connection method: Cluster object, status: running, using processes: True.
Workers: 4, total threads: 12 (3 per worker), total memory: 64.00 GiB (16.00 GiB per worker).
Dashboard: http://127.0.0.1:8787/status




## Constants

In [3]:
FTS_URL = "https://fts.podaac.earthdata.nasa.gov/v1"
HYDROCRON_URL = "https://soto.podaac.sit.earthdatacloud.nasa.gov/hydrocron/v1/timeseries"

## Obtain SWORD IDs for a river

In this section, we query the Feature Translation Service (FTS) for a list of river IDs from the SWOT River Database [(SWORD)](https://www.swordexplorer.com/). A river reach ID has the format `CBBBBBRRRRT` and a node ID has the format `CBBBBBRRRRNNNT`, where `C` stands for continent, `B` for basin, `R` for reach, `N` for node, and `T` for type. The first 6 digits of the ID (`CBBBBB`) are the [HydroBASINS](https://www.hydrosheds.org/products/hydrobasins) Pfafstetter level 6 basin code, where the first digit represents one of nine continental regions (1 = Africa, 2 = Europe, 3 = Siberia, 4 = Asia, 5 = Oceania, 6 = South America, 7 = North America, 8 = Arctic, 9 = Greenland), and the remaining digits are nested basin levels 2–6. We recommend finding your region of interest in the SWORD database and identifying the code for your basin or river of interest.

In this example, we use the basin code `732520`. It covers multiple reaches along the Savannah River and its tributaries in Georgia, USA. Note that this code includes every reach within the broader basin category, for example the individual reach `73252000161`.
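The ID layout described above can be illustrated with a small helper. This is a minimal sketch, not part of the FTS or Hydrocron APIs: the function name `parse_sword_id` is hypothetical, and the 14-digit node ID used in the second call is made up purely to show the `NNN` field.

```python
def parse_sword_id(sword_id: str) -> dict:
    """Split a SWORD reach (11-digit) or node (14-digit) ID into its parts."""
    if not sword_id.isdigit() or len(sword_id) not in (11, 14):
        raise ValueError(f"Expected an 11- or 14-digit ID, got {sword_id!r}")
    parts = {
        "continent": sword_id[0],   # C
        "basin": sword_id[:6],      # CBBBBB, the Pfafstetter level 6 basin code
        "reach": sword_id[6:10],    # RRRR
        "type": sword_id[-1],       # T
    }
    if len(sword_id) == 14:
        parts["node"] = sword_id[10:13]  # NNN, present only in node IDs
    return parts


# Reach ID taken from the text above
parsed = parse_sword_id("73252000161")
print(parsed)

# Hypothetical node ID, used only to illustrate the NNN field
node_parsed = parse_sword_id("73252000160011")
print(node_parsed["node"])

# The reach belongs to basin 732520 because its ID starts with that code
print("73252000161".startswith("732520"))
```

The leading `7` places the reach in North America, and the first six digits match the basin code `732520` used in this example.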

In [None]:
Savannah_reaches = requests.get("https://fts.podaac.earthdata.nasa.gov/rivers/reach/732520")

pprint.pprint(Savannah_reaches.json(), compact=True, width=60, depth=2)

In [None]:
# This captures only the first page of results; the remaining pages of the response still need to be retrieved and appended
IDs = list(Savannah_reaches.json()['results'].keys())

## Locate SWORD reach identifiers for a river name

The `query_fts` function queries the Feature Translation Service (FTS) for reach identifiers by river name. Here it is used to retrieve the reach identifiers associated with the Rhine river. Combine the `page_size` and `page_number` parameters to page through all reach identifier results for a query.

In [4]:
def query_fts(query_url, params):
    """Query Feature Translation Service (FTS) for reach identifiers by river name.

    Parameters
    ----------
    query_url: str - URL to use to query FTS
    params: dict - Dictionary of parameters to pass to query

    Returns
    -------
    dict of results: hits, page_size, page_number, reach_ids
    """

    rhine_reaches = requests.get(query_url, params=params)
    rhine_reaches_json = rhine_reaches.json()

    hits = rhine_reaches_json['hits']
    if 'search on' in rhine_reaches_json.keys():
        page_size = rhine_reaches_json['search on']['page_size']
        page_number = rhine_reaches_json['search on']['page_number']
    else:
        page_size = 0
        page_number = 0

    return {
        "hits": hits,
        "page_size": page_size,
        "page_number": page_number,
        "reach_ids": [ item['reach_id'] for item in rhine_reaches_json['results'] ]
    }

In [5]:
# Search by river name
print("Searching by river name...")
query_url = f"{FTS_URL}/rivers/Rhine"
page_size = 500    # Set FTS to retrieve 500 results at a time
page_number = 1    # Set FTS to retrieve the first page of results
hits = 1           # Set hits to initial value to start while loop
reach_ids = []
while (page_size * page_number) != 0 and len(reach_ids) < hits:
    params = { "page_size": page_size, "page_number": page_number }
    results = query_fts(query_url, params)
    
    hits = results['hits']
    page_size = results['page_size']
    page_number = results['page_number'] + 1
    reach_ids.extend(results['reach_ids'])

    print("page_size: ", page_size, ", page_number: ", page_number - 1, ", hits: ", hits, ", # reach_ids: ", len(reach_ids))
    
print("Total number of reaches: ", len(reach_ids))
reach_ids = list(set(reach_ids))    # Remove duplicates
print("Total number of non-duplicate reaches: ", len(reach_ids))

Searching by river name...
page_size:  500 , page_number:  1 , hits:  4592 , # reach_ids:  500
page_size:  500 , page_number:  2 , hits:  4592 , # reach_ids:  1000
page_size:  500 , page_number:  3 , hits:  4592 , # reach_ids:  1500
page_size:  500 , page_number:  4 , hits:  4592 , # reach_ids:  2000
page_size:  500 , page_number:  5 , hits:  4592 , # reach_ids:  2500
page_size:  500 , page_number:  6 , hits:  4592 , # reach_ids:  3000
page_size:  500 , page_number:  7 , hits:  4592 , # reach_ids:  3500
page_size:  500 , page_number:  8 , hits:  4592 , # reach_ids:  4000
page_size:  500 , page_number:  9 , hits:  4592 , # reach_ids:  4500
page_size:  500 , page_number:  10 , hits:  4592 , # reach_ids:  4592
Total number of reaches:  4592
Total number of non-duplicate reaches:  144
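The number of requests made by the loop follows directly from `hits` and `page_size`. As a quick check of the arithmetic, using the values printed in the output above:

```python
import math

hits = 4592       # total matches reported by FTS in the output above
page_size = 500   # results requested per page

# Number of pages needed to cover every hit
pages = math.ceil(hits / page_size)

# The final page holds whatever is left after the full pages
last_page_count = hits - (pages - 1) * page_size

print(pages, last_page_count)
```

This gives 10 pages with 92 results on the last one, which matches the jump from 4500 to 4592 reach identifiers on page 10.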


## Query Hydrocron for time series data for all Rhine reach identifiers

Once you have a list of reach identifiers, you can query Hydrocron for SWOT time series data. To keep the example small, the next cell keeps only the first 11 reach identifiers from the list.

In [6]:
reach_ids = reach_ids[:11]
reach_ids

['23269000254',
 '23267000051',
 '23269000024',
 '23261000221',
 '23261000361',
 '23261000411',
 '23267000604',
 '23261000431',
 '23267000494',
 '23267000081',
 '23267000111']

In [7]:
@dask.delayed
def query_hydrocron(query_url, reach_id, start_time, end_time, fields, empty_df):
    """Query Hydrocron for reach-level time series data.

    Parameters
    ----------
    query_url: str - URL to use to query Hydrocron
    reach_id: str - String SWORD reach identifier
    start_time: str - String time to start query
    end_time: str - String time to end query
    fields: list - List of fields to return in query response
    empty_df: pandas.DataFrame that contains empty query results

    Returns
    -------
    pandas.DataFrame that contains query results
    """

    params = {
        "feature": "Reach",
        "feature_id": reach_id,
        "output": "csv",
        "start_time": start_time,
        "end_time": end_time,
        "fields": fields
    }
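    results = requests.get(query_url, params=params)
    results_json = results.json()    # Parse the response body once
    if "results" in results_json:
        results_csv = results_json["results"]["csv"]
        df = pd.read_csv(StringIO(results_csv))
    else:
        # No results for this reach: fall back to the placeholder row
        df = empty_df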

    return df
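To see what a single Hydrocron request looks like, the query string can be built without sending anything over the network. This is a minimal sketch using the standard library's `urlencode` rather than `requests`; the parameter names come from `query_hydrocron` above, and the reach ID and time range are the ones used elsewhere in this notebook.

```python
from urllib.parse import urlencode

HYDROCRON_URL = "https://soto.podaac.sit.earthdatacloud.nasa.gov/hydrocron/v1/timeseries"

params = {
    "feature": "Reach",
    "feature_id": "23267000051",
    "output": "csv",
    "start_time": "2023-07-28T00:00:00Z",
    "end_time": "2024-04-16T00:00:00Z",
    "fields": "reach_id,time_str,wse",
}

# urlencode percent-encodes reserved characters such as ":" and ","
query = f"{HYDROCRON_URL}?{urlencode(params)}"
print(query)
```

Pasting the printed URL into a browser or `curl` is a convenient way to inspect the raw JSON response for one reach before running the full set of queries.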

In [8]:
# Create delayed queries that each return a pandas.DataFrame
start_time = "2023-07-28T00:00:00Z"
end_time = "2024-04-16T00:00:00Z"
fields = "reach_id,time_str,wse"
results = []
for reach in reach_ids:
    # Create an empty dataframe for cases where no data is returned for a reach identifier
    empty_df = pd.DataFrame({
        "reach_id": np.int64(reach),
        "time_str": "no_data",
        "wse": -1.000000e+12,
        "wse_units": "m"
    }, index=[0])
    results.append(query_hydrocron(HYDROCRON_URL, reach, start_time, end_time, fields, empty_df))
    # df = query_hydrocron(HYDROCRON_URL, reach, start_time, end_time, fields, df)
    # print(df)

# Load DataFrame results
ddf = dd.from_delayed(results)
ddf.head(n=20, npartitions=len(reach_ids))

|   | reach_id | time_str | wse | wse_units |
|---|---|---|---|---|
| 0 | 23269000254 | 2024-01-26T00:59:39Z | -1000000000000.0 | m |
| 1 | 23269000254 | 2024-02-04T23:21:47Z | -1000000000000.0 | m |
| 2 | 23269000254 | 2024-03-25T01:53:24Z | -1000000000000.0 | m |
| 3 | 23269000254 | 2024-03-28T15:14:52Z | -1000000000000.0 | m |
| 4 | 23269000254 | 2024-04-14T22:38:29Z | -1000000000000.0 | m |
| 0 | 23267000051 | 2024-01-26T01:00:00Z | 484.617 | m |
| 1 | 23267000051 | no_data | -1000000000000.0 | m |
| 2 | 23267000051 | 2024-03-25T01:53:03Z | 288.6828 | m |
| 3 | 23267000051 | 2024-03-28T15:15:13Z | 383.9036 | m |
| 4 | 23267000051 | 2024-04-14T22:38:09Z | 95.4392 | m |


In [9]:
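# Load results into memory as a pandas DataFrame to filter out missing observations
df = ddf.compute()
df = df[df.time_str != "no_data"]              # Drop rows with no observation time
df = df[~np.isclose(df.wse, -1.000000e+12)]    # Drop rows where wse is the -1e12 placeholder value

# Load back as a Dask DataFrame (npartitions is set explicitly for older Dask versions)
ddf = dd.from_pandas(df, npartitions=1)
ddf.head(n=20)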

|   | reach_id | time_str | wse | wse_units |
|---|---|---|---|---|
| 0 | 23267000051 | 2024-01-26T01:00:00Z | 484.617 | m |
| 0 | 23267000111 | 2024-01-26T00:59:50Z | 622.5545 | m |
| 0 | 23261000221 | 2024-01-27T01:01:11Z | 53.925 | m |
| 0 | 23267000081 | 2024-01-26T00:59:59Z | 555.2445 | m |
| 0 | 23261000361 | 2024-01-27T01:01:11Z | 71.4789 | m |
| 0 | 23261000431 | 2024-01-27T01:01:10Z | 115.8828 | m |
| 0 | 23261000411 | 2024-01-27T01:01:10Z | 102.8595 | m |
| 1 | 23267000111 | 2024-03-25T01:53:04Z | 372.7833 | m |
| 1 | 23267000081 | 2024-03-25T01:53:04Z | 330.8292 | m |
| 1 | 23261000431 | 2024-03-25T01:52:24Z | 110.2209 | m |
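The `time_str` column holds strings, so sorting or plotting against it treats the values as text rather than as timestamps. One optional step, sketched here with plain pandas and not part of the original workflow, is to parse the strings into a timezone-aware `time` column (a name chosen for this illustration). The rows are copied from the outputs above, including one `no_data` placeholder row to show how unparseable values are handled.

```python
import pandas as pd

# Rows copied from the query outputs above, including one placeholder row
df = pd.DataFrame({
    "reach_id": [23267000111, 23267000051, 23267000051, 23267000111],
    "time_str": [
        "2024-03-25T01:53:04Z",
        "2024-01-26T01:00:00Z",
        "no_data",
        "2024-01-26T00:59:50Z",
    ],
    "wse": [372.7833, 484.617, -1.0e12, 622.5545],
})

# Parse ISO 8601 strings; values that cannot be parsed (e.g. "no_data") become NaT
df["time"] = pd.to_datetime(df["time_str"], errors="coerce", utc=True)

# Drop the unparseable rows and order each reach chronologically
df = df.dropna(subset=["time"]).sort_values(["reach_id", "time"])
print(df[["reach_id", "time", "wse"]])
```

With a real datetime column, passing `x="time"` to a plotting call should give a chronological axis instead of a categorical one.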


In [10]:
# Plot results
line_plot = ddf.hvplot(x="time_str", y="wse", by="reach_id", kind="line", persist=True)
line_plot.opts(xrotation=90)

scatter_plot = ddf.hvplot(x="time_str", y="wse", by="reach_id", kind="scatter", persist=True)
line_plot * scatter_plot

In [11]:
client.close()