# Access HYCOM Reanalysis data on AWS Open Data
Access the  63,341 NetCDF 64-bit offset files from the HYbrid Coordinate Ocean Model (HYCOM) Global Ocean Forecast System Reanalysis (1994-2015) on [AWS Open Data](https://registry.opendata.aws/hycom-gofs-3pt1-reanalysis/) as a single Kerchunk-generated virtual dataset

In [None]:
import fsspec
import xarray as xr
import hvplot.xarray
import intake

## Open the virtual dataset

Open using Intake:

In [None]:
cat = intake.open_catalog('https://ncsa.osn.xsede.org/esip/rsignell/hycom.yaml')

In [None]:
%%time
ds = cat['gofs-3pt1'].read()

In [None]:
# The much more verbose way non-Intake way to open the dataset using the xarray kerchunk engine:
# combined_parquet_aws = 's3://esip/rsignell/hycom.parq'

# so = dict(anon=True)    # data stored on AWS Open Data S3
# to = dict(anon=True,    # refs stored on OSN 
#           client_kwargs={'endpoint_url': 'https://ncsa.osn.xsede.org'})

# ds = xr.open_dataset(combined_parquet_aws, engine='kerchunk', chunks={},
#                     backend_kwargs=dict(storage_options=dict(target_options=to,
#                     remote_protocol='s3', lazy=True, remote_options=so)))

## Read Data

<u> Case 1: Read a 3D field at a specific time step.  </u>

We don't need a cluster for this since just one chunk of data (24MB) is actually loaded

In [None]:
ds['water_temp'].sel(depth=0, time='2012-10-29 17:00', method='nearest')

In [None]:
%%time
da = ds['water_temp'].sel(depth=0, time='2012-10-29 17:00', method='nearest').load()

In [None]:
da.hvplot.quadmesh(x='lon', y='lat', geo=True, global_extent=True, tiles='ESRI', cmap='viridis', rasterize=True)

<u> Case 2: Load a time series at a specific location. </u>

Because each chunk only contains one time value, we want to read chunks in parallel using a Dask cluster.  Here we use coiled.io to generate a cluster on AWS in the same region as the data.   We also specify a cheap ARM instance type to keep the cost low. 

In [None]:
cluster_type = 'Coiled'

In [None]:
if cluster_type == 'Local':
    from dask.distributed import Client

    client = Client()

In [None]:
if cluster_type == 'Coiled':
    import coiled
    cluster = coiled.Cluster(
        region="us-west-2",
        arm=True,
        worker_vm_types=["t4g.small"],  # cheap, small ARM instances, 2cpus, 2GB RAM
        worker_options={'nthreads':2},
        n_workers=100,
        wait_for_workers=True,
        compute_purchase_option="spot_with_fallback",
        name='hycom',
        software='esip-pangeo-arm',
        workspace='esip-lab',
        timeout=300   # leave cluster running for 5 min in case we want to use it again
    )

    client = cluster.get_client()

We open the dataset again and tell Dask to load 20 time values (20 chunks) for each task.  Loading multiple time steps means we only incur object storage latency once for each task.  Each task will use more memory, however.  We picked 20 because it fits within memory on the 2GB ARM `t4g.small` instance types (and larger than 20 doesn't give significant performance benefit). 

In [None]:
ds = cat['gofs-3pt1'].read(chunks={"time": 20})   # Intake way to re-open the dataset with different Dask chunks

In [None]:
# non-Intake way to re-open the dataset with different Dask chunks:
# ds = xr.open_dataset(combined_parquet_aws, engine='kerchunk', chunks={'time':20},
#                    backend_kwargs=dict(storage_options=dict(target_options=to,
#                    remote_protocol='s3', lazy=True, remote_options=so)))

Extract the time series:

In [None]:
%%time
da = ds['water_temp'].isel(depth=0).sel(lon=-69.6, lat=42.5, method='nearest').load(retries=10)

Interactive visualization of the time series:

In [None]:
da.hvplot(x='time', grid=True)