# Demoing code for `intake`, `fsspec`, `dask`, and `xarray` libraries

These libraries are used in most of the HyTEST notebooks and understanding how they work together will help understand the code more.

Author: Andrew Laws - USGS Web Informatics and Mapping (WIM) <br>

Date: 3/20/2023

Focus: `intake`, `fsspec`, `dask`, and `xarray` libraries use in climate and forcings big data

In [None]:
# library imports
import fsspec
import hvplot.xarray
import intake
import os
import warnings
import rioxarray
import dask

from dask.distributed import LocalCluster, Client
from pygeohydro import pygeohydro

import xarray as xr
import geopandas as gpd
import pandas as pd
import geoviews as gv
import dask.dataframe as dd

warnings.filterwarnings('ignore')

#### Dask is a super useful Python package for parallel and lazy computing. 

When working with a large dataset that is multiple terrabytes in size such as CONUS404, Dask allows you to perform lazy operations, meaning the whole dataset is not read into memory (RAM) all at once but only once you're ready for it. A prime example is work within HyTEST where we lazily ready in CONUS404, subset the data to certain variables and a limited geographic extend and then load it into memory. For a more in-depth dive and before you start to use it in-depth, make sure you read through the [Dask documentation](https://docs.dask.org/en/stable/).

**Another huge benefit of this**: you can avoid the annoying step of downloading data to a local folder, instead just using a  web service, saving time. This means if the base data is updated, you don't have to download the dataset again and it is automatically available for your notebook.

#### Starting up dask on HPC or Nebari using a helper function

In [None]:
def configure_cluster(machine):
    ''' Helper function to configure cluster
    '''
    if machine == 'denali':
        from dask.distributed import LocalCluster, Client
        cluster = LocalCluster(threads_per_worker=1)
        client = Client(cluster)
    
    elif machine == 'tallgrass':
        from dask.distributed import Client
        from dask_jobqueue import SLURMCluster
        cluster = SLURMCluster(queue='cpu', cores=1, interface='ib0',
                               job_extra=['--nodes=1', '--ntasks-per-node=1', '--cpus-per-task=1'],
                               memory='6GB')
        cluster.adapt(maximum_jobs=30)
        client = Client(cluster)
        
    elif machine == 'local':
        import os
        import warnings
        from dask.distributed import LocalCluster, Client
        warnings.warn("Running locally can result in costly data transfers!\n")
        n_cores = os.cpu_count() # set to match your machine
        cluster = LocalCluster(threads_per_worker=n_cores)
        client = Client(cluster)
        
    elif machine in ['esip-qhub-gateway-v0.4']:   
        import sys, os
        sys.path.append(os.path.join(os.environ['HOME'],'shared','users','lib'))
        import ebdpy as ebd
        aws_profile = 'esip-qhub'
        ebd.set_credentials(profile=aws_profile)

        aws_region = 'us-west-2'
        endpoint = f's3.{aws_region}.amazonaws.com'
        ebd.set_credentials(profile=aws_profile, region=aws_region, endpoint=endpoint)
        worker_max = 30
        client,cluster = ebd.start_dask_cluster(profile=aws_profile, worker_max=worker_max, 
                                              region=aws_region, use_existing_cluster=True,
                                              adaptive_scaling=False, wait_for_cluster=False, 
                                              worker_profile='Medium Worker', propagate_env=True)
        
    return client, cluster

# if-else to determine HPC or Nebari
if 'SLURM_CLUSTER_NAME' in os.environ: #USGS HPC use SLURM CLUSTER to handle jobs, otherwise...
    machine = os.environ['SLURM_CLUSTER_NAME']
    cluster = configure_cluster(machine)
else:  # use the Nebari machine
    machine = 'esip-qhub-gateway-v0.4'
    client, cluster = configure_cluster(machine)

You can also use Dask on your personal computer. This [bit of Dask documentation](https://docs.dask.org/en/stable/deploying-python.html) explains how easy this is to set up.

In [None]:
# cluster = LocalCluster()
# client = Client(cluster)

#### With HyTEST, we use `intake` catalogs in our code to making data I/O more uniform. 

If we change where a dataset gets imported from, we only have to change it in one place rather than in each notebook. They can also be nested as you'll see next.

[Intake readthedocs site](https://intake.readthedocs.io/en/latest/overview.html)

[Example of HyTEST catalogs](https://github.com/hytest-org/hytest/tree/main/dataset_catalog)

Note: Select datasets that end in "onprem" if running on Denali/Tallgrass HPC or cloud data if working on QHub or local.

In [None]:
url = 'https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml'
cat = intake.open_catalog(url)
list(cat)

Notice the 'conus404-drb-eval-tutorial-catalog'? That is a nested catalog for data we use in a specific tutorial series and can be accessed like this.

In [None]:
nested_cat = cat['conus404-drb-eval-tutorial-catalog']
list(nested_cat)

And if you want to see the details about the dataset

In [None]:
nested_cat['conus404-drb-OSN']

The other handy bit about `intake` is that you can open a dataset as a Dask data object easily (though the base class will be an `xarray` dataset). This is done lazily. But why the `xarray` dataset? From [this read about the Dask ecosystem:](https://docs.dask.org/en/stable/ecosystem.html?highlight=xarray) "xarray: Wraps Dask Array, offering the same scalability, but with axis labels which add convenience when dealing with complex datasets."

In [None]:
c404_drb = nested_cat['conus404-drb-OSN'].to_dask()
c404_drb

#### What is `xarray`? 
From [the Xarray website](https://docs.xarray.dev/en/stable/getting-started-guide/why-xarray.html): "Xarray introduces labels in the form of dimensions, coordinates and attributes on top of raw NumPy-like multidimensional arrays, which allows for a more intuitive, more concise, and less error-prone developer experience." 

If you've used `numpy` and `pandas` before, the API will be relatively familiar. As mentioned above, since it wraps a Dask array, it allows computation on large datasets in parallel. It can also take in many cloud and local file types. See [this Xarray documentation about that](https://docs.xarray.dev/en/stable/user-guide/io.html). As with Dask, an upfront time investment in understanding Xarray will pay long-term efficiency rewards.

But what if you don't want to use `intake` catalogs for a bunch of datasets you don't maintain?

#### `fsspec` allows a user to connect with many different filesystems for querying, reading, and writing data.

Want data from an AWS S3 bucket? `fsspec` has you covered, whether it requires credentials (requester pays and no anonymous read) or not (anonymous reads). Local files? Yep. Open data portals using something such as [THREDDS Data Server](https://www.unidata.ucar.edu/software/tds/)? Still good. For a more in-depth look at `fsspec`, read through the [fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/).

An additional benefit is that many libraries (`dask`, `xarray`, and `pandas`) already use `fsspec` in the background for reading and writing data. Next, you'll see a couple examples of calling in data using `fsspec` directly and indirectly (through `xarray`), as well as incorporating `dask`.

First up, some data is only available through older file transfer protocal (FTP) methods. Using `fsspec`'s [FTPFileSystem object](https://filesystem-spec.readthedocs.io/en/latest/api.html?highlight=ftpfilesystem#fsspec.implementations.ftp.FTPFileSystem).

In [None]:
from fsspec.implementations.ftp import FTPFileSystem

One thing to note: FTP server calls typically have a timeout, meaning if you take too long after instantiating the connecting it will disconnect. All you have to do is reconnect by running the code again (next cell). In the next few cells, we are reading in Climate Reference Network station data over an FTP connection.

First, create a FTP file system connection.

In [None]:
fs = FTPFileSystem("ftp.ncei.noaa.gov")

In [None]:
fs.ls("/pub/data/uscrn/products/")

Since the file type is *tab-separated values (tsv)*, we will use the *pd.read_table* function to create a Dataframe

In [None]:
uscrn_all = pd.read_table(fs.open("/pub/data/uscrn/products/stations.tsv")) 
uscrn_all.head()

What about reading from a permissioned S3 bucket? Lets find the URL for a dataset from the `intake` catalog. Note: this requires you to have AWS credentials.

**If you are unsure of what permissioned/requester pays bucket is, read this [resource](https://docs.aws.amazon.com/AmazonS3/latest/userguide/RequesterPaysBuckets.html).**

In [None]:
conus404_cat = cat["conus404_catalog"]
conus404_cat['conus404-hourly-cloud']

Using the urlpath in `xarray`, we can read in the dataset using saved AWS credentials and the `fsspec.get_mapper()` method to pass the correct parameters to `xarray.open_zarr()`.

In [None]:
c404_hourly_url = "s3://nhgf-development/conus404/conus404_hourly_202209.zarr"

fs_gm = fsspec.get_mapper(c404_hourly_url, # given the url, fsspec will know it is speaking to an S3 file system
                          anon=False,
                          requester_pays=True, # our S3 credentials will be charged
                          client_kwargs={'region_name':'us-west-2'})

# open dataset
c404_hourly = xr.open_zarr(fs_gm)
c404_hourly

Or you can open the dataset lazily with Dask by setting the parameter chunks={}

In [None]:
# open dataset lazily 
# this happens by setting chunks={}
# though you can specify chunks where applicable
c404_hourly_dask = xr.open_zarr(fs_gm, chunks={})
c404_hourly_dask

The datasets look the same but the second takes up less space in memory. Now to show an example of lazy computation using radiation variables to calculate a made up variable, FAKERAD. 
Here are the equations:
\begin{equation}FAKERAD = DNB - UPB\end{equation}
\begin{equation}DNB = ACSWDNB - ACLWDNB\end{equation} 
\begin{equation}UPB = ACSWUPB - ACLWUPB\end{equation}

In [None]:
c404_hourly_dask_sub = c404_hourly_dask[["ACSWDNB", "ACSWUPB", "ACLWDNB", "ACLWUPB"]]
c404_hourly_dask_sub = c404_hourly_dask_sub.sel(time=slice('1979-10-01T00:00:00.000000000', '1979-10-01T00:00:00.000000000')) #subset to single time step
c404_hourly_dask_sub

First, calculate DNB and UPB

In [None]:
c404_hourly_dask_sub["DNB"] = c404_hourly_dask_sub['ACSWDNB'] - c404_hourly_dask_sub['ACLWDNB']
c404_hourly_dask_sub['UPB'] = c404_hourly_dask_sub['ACSWUPB'] - c404_hourly_dask_sub['ACLWUPB']

Then calculate FAKERAD.

In [None]:
c404_hourly_dask_sub['FAKERAD'] = c404_hourly_dask_sub['DNB'] - c404_hourly_dask_sub['UPB']

This has all happened so fast! But, has anything actually happened? Dask has been keeping track of the operations and won't run them all until the compute() method is passed. The execution will take longer time than the last couple of cells but will be much faster than runner all of these operations on an in-memory dataset.

In [None]:
c404_hourly_memory_sub = c404_hourly_dask_sub.compute()
c404_hourly_memory_sub

If you are working with large tabular data, Dask Dataframes performs the same way and is modeled after `pandas` dataframes. The `.compute()` graphs can be visualized for these and more information can be found [here.](https://docs.dask.org/en/stable/graphviz.html)

In [None]:
c404_hourly_dask_sub['ACSWDNB'].data.visualize()

Finally, here are some examples of calling in some datasets from various open access sources.

##### OpenDAP data

Now, looking at CERES-EBAF, called in from an OpenDAP source.

In [None]:
# ceres-ebaf url
ceres_url = "https://opendap.larc.nasa.gov/opendap/CERES/EBAF/Edition4.1/CERES_EBAF_Edition4.1_200003-202111.nc"

# bring in ceres-ebaf
ceres = xr.open_dataset(ceres_url, decode_coords="all")
ceres

##### Data on a THREDD server

In [None]:
# open daymet lazily 
# this happens by setting chunks={}
# though you can specify chunks where applicable
daymet = xr.open_dataset("https://thredds.daac.ornl.gov/thredds/dodsC/daymet-v4-agg/na.ncml", chunks={})
daymet

#### Clipping xarray datasets to a bounding box
Here are two possible ways to clip to a bounding box and when each might suit you best. Why a bounding box and not an exact clip? If storing an intermediate dataset, this makes the data smaller as well as giving flexibility to exact area extraction or only including cells inside of the area of interest.

The first is using the `xarray` index methods. This is beneficial because it doesn't require using extra libraries but there have been isues when the spatial resolution of the dataset is very large and the geographical area is small. In some cases, it will drop some grid cells compared to the second method.

In [None]:
from pygeohydro import WBD
from cartopy import crs as ccrs
# bring in boundaries of DRB and create single polygon
drb = WBD("huc6", outfields=["huc6", "name"]).byids("huc6", ["020401", "020402"])

# create a column where all entries have the same value
drb["name"] = "DRB"

# dissolve by that column
drb = drb.dissolve(by="name")

# set CRS to match ds
drb = drb.iloc[[0]].to_crs(ccrs.LambertConformal())

# tuple of bounding box
drb_bbox = list(drb.total_bounds)

drb_bbox

In [None]:
daymet_sel = daymet.sel(x=slice(drb_bbox[1],drb_bbox[3]), y=slice(drb_bbox[2],drb_bbox[0]))
daymet_sel

In the above, the indexing returned a subset along the x dimension but nothing along the y due to underlying CRS/indexing conflicts.

The second is the `rioxarray.clip_box` method. `rioxarray` is used to apply `rasterio` to `xarray` and adds a `rio` accessor to `xarray` objects and [documentation can be found here about rioxarray.](https://corteva.github.io/rioxarray/stable/). This method does require an additional library but does ensure that all grid cells that are inside the bounding box are included.

In [None]:
import rioxarray

# add crs to dataset
daymet.rio.write_crs(ccrs.LambertConformal(), inplace=True)

# drop time_bnds, otherwise it will throw an error in the clip_box
daymet = daymet.drop(["time_bnds"])

#clip to bbox
daymet_bbox = daymet.rio.clip_box(minx=drb_bbox[0],
                            miny=drb_bbox[1],
                            maxx=drb_bbox[2],
                            maxy=drb_bbox[3],
                            crs=ccrs.LambertConformal())

daymet_bbox

As can be seen, the indexing works great but does have one less grid cell (see y dim)

In [None]:
print("Index select:\n", daymet_sel.dims)
print("Rioxarray:\n", daymet_bbox.dims)

Finally, it is always important to shutdown the client and cluster.

In [None]:
client.close(); cluster.shutdown()