# Create Kerchunk Reference Files from ARISE Data on AWS

## Imports

In [48]:
import fsspec
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr
from distributed import Client, LocalCluster
import dask
import ujson 
import nc_time_axis
import numpy as np
import ecgtools
from ecgtools import Builder
from ecgtools.builder import INVALID_ASSET, TRACEBACK
from ecgtools.parsers.cesm import parse_cesm_timeseries
import traceback
from pathlib import Path
import cf_xarray
import glob
import intake
import xarray as xr

## Spin up a Cluster
Let's spin up a Dask Cluster on our local machine! This will help compute our reference files in parallel.

In [2]:
cluster = LocalCluster()
client = Client(cluster)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 64853 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:64853/status,

0,1
Dashboard: http://127.0.0.1:64853/status,Workers: 4
Total threads: 12,Total memory: 16.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:64856,Workers: 4
Dashboard: http://127.0.0.1:64853/status,Total threads: 12
Started: Just now,Total memory: 16.00 GiB

0,1
Comm: tcp://127.0.0.1:64877,Total threads: 3
Dashboard: http://127.0.0.1:64878/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:64861,
Local directory: /Users/mgrover/git_repos/cloud-for-climate/notebooks/dask-worker-space/worker-he55lsw3,Local directory: /Users/mgrover/git_repos/cloud-for-climate/notebooks/dask-worker-space/worker-he55lsw3

0,1
Comm: tcp://127.0.0.1:64880,Total threads: 3
Dashboard: http://127.0.0.1:64881/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:64862,
Local directory: /Users/mgrover/git_repos/cloud-for-climate/notebooks/dask-worker-space/worker-phbnfwnd,Local directory: /Users/mgrover/git_repos/cloud-for-climate/notebooks/dask-worker-space/worker-phbnfwnd

0,1
Comm: tcp://127.0.0.1:64871,Total threads: 3
Dashboard: http://127.0.0.1:64873/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:64859,
Local directory: /Users/mgrover/git_repos/cloud-for-climate/notebooks/dask-worker-space/worker-85kt3elj,Local directory: /Users/mgrover/git_repos/cloud-for-climate/notebooks/dask-worker-space/worker-85kt3elj

0,1
Comm: tcp://127.0.0.1:64872,Total threads: 3
Dashboard: http://127.0.0.1:64875/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:64860,
Local directory: /Users/mgrover/git_repos/cloud-for-climate/notebooks/dask-worker-space/worker-nrz1ee01,Local directory: /Users/mgrover/git_repos/cloud-for-climate/notebooks/dask-worker-space/worker-nrz1ee01


## Create our Reference Files
This is a process you will only need to do once for each file on Amazon S3

In [3]:
fs = fsspec.filesystem('s3',
                       skip_instance_cache=True)

### Setup our AWS Credentials - **Do This Before Running this Section**

We need to set which bucket to use -  before running this notebook or running throught this analysis, make sure to setup your credentials (email Brian Dobbins from NCAR if you need the credentials) using the [AWS Command Line Interface (CLI)](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html).

Once you install the CLI, go to the command line and run:

```bash
aws configure
```

Which will prompt you for the credentials.

### List Files on the Bucket

In [4]:
bucket = 'ncar-cesm2-arise'

Let's start with monthly atmospheric output!

In [5]:
files = fs.glob(f"s3://{bucket}/*/atm/proc/tseries/month_1/*TREFHT*")

In [6]:
urls = ["s3://" + f for f in files]

so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')

### Setup a Function to Generate our Reference Files

In [33]:
def gen_json(u):
    with fs.open(u, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, u, inline_threshold=300)
        outf = f"jsons/{path.stem}.json"
        print(outf)
        with open(outf, 'wb') as f:
            f.write(ujson.dumps(h5chunks.translate()).encode());

Now that we have our function to operate on each file, let's compute this in parallel using dask

In [8]:
%%time
dask.compute(*[dask.delayed(gen_json)(u) for u in urls], retries=10);

CPU times: user 29.9 s, sys: 5.18 s, total: 35.1 s
Wall time: 5min 52s


(None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None)

## Read back in the jsons (**Start here if you already have the reference files**)

Now that we have all of our reference files, we can combine them into a single data catalog, using a catalog generation tool (ecgtools)

In [41]:
def parse_cesm_timeseries_json(file, user_streams_dict={}, xarray_open_kwargs=None):
    """Parser for CESM timeseries files"""
    info = dict()
    path = Path(file)

    try:
        ds = xr.open_dataset(
            "reference://",
            engine="zarr",
            backend_kwargs={
                "storage_options": {
                    "fo": file,
                    "remote_protocol": "s3",
                },
                "consolidated": False
            },
            chunks={}
            )

        info["path"] = file
        
        info["variable"] = path.stem.split(".")[-2]
        info["experiment"] = path.stem.split(".")[-6]
        info["member_id"] = path.stem.split(".")[-5]

        # Get the long name from dataset
        info['long_name'] = ds[info['variable']].attrs.get('long_name')

        # Grab the units of the variable
        info['units'] = ds[info['variable']].attrs.get('units')

        # Set the default of # of vertical levels to 1
        info['vertical_levels'] = 1

        try:
            info['vertical_levels'] = ds[ds.cf['vertical'].name].size
        except (KeyError, AttributeError, ValueError):
            pass

        try:
            info['frequency'] = ds.attrs['time_period_freq']

        except (KeyError, AttributeError):
            warnings.warn('Using the default frequency definitions')
            info['frequency'] = stream.frequency
        info['path'] = str(file)
        return info

    except Exception:
        return {INVALID_ASSET: file, TRACEBACK: traceback.format_exc()}

### Setup the Builder
We first define where the reference files (jsons) are located.

In [42]:
b = Builder(paths=["jsons/"])

In [43]:
b.build(parsing_func=parse_cesm_timeseries_json)

Builder(paths=['jsons/'], storage_options={}, depth=0, exclude_patterns=[], include_patterns=[], joblib_parallel_kwargs={})

In [46]:
b.save(
    name='catalogs/arise-catalog',
    # Column name including filepath
    path_column_name='path',
    # Column name including variables
    variable_column_name='variable',
    # Data file format - could be netcdf or zarr (in this case, netcdf)
    data_format="reference",
    # Which attributes to groupby when reading in variables using intake-esm
    groupby_attrs=["frequency", "experiment", "member_id"],
    # Aggregations which are fed into xarray when reading in data using intake
    aggregations=[],
)

Successfully wrote ESM catalog json file to: file:///Users/mgrover/git_repos/cloud-for-climate/notebooks/./catalogs/arise-catalog.json


## Read the Catalog

In [49]:
data_catalog = intake.open_esm_datastore("catalogs/arise-catalog.json")

In [54]:
dsets = data_catalog.search(variable='TREFHT',
                            member_id=1).to_dataset_dict()


--> The keys in the returned dictionary of datasets are constructed as follows:
	'frequency.experiment.member_id'


In [57]:
dsets['month_1.SSP245-TSMLT-GAUSS-DEFAULT.1']

Unnamed: 0,Array,Chunk
Bytes,88.59 MiB,216.00 kiB
Shape,"(420, 192, 288)","(1, 192, 288)"
Count,421 Tasks,420 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 88.59 MiB 216.00 kiB Shape (420, 192, 288) (1, 192, 288) Count 421 Tasks 420 Chunks Type float32 numpy.ndarray",288  192  420,

Unnamed: 0,Array,Chunk
Bytes,88.59 MiB,216.00 kiB
Shape,"(420, 192, 288)","(1, 192, 288)"
Count,421 Tasks,420 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 3.28 kiB 3.28 kiB Shape (420,) (420,) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",420  1,

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 3.28 kiB 3.28 kiB Shape (420,) (420,) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",420  1,

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 3.28 kiB 3.28 kiB Shape (420,) (420,) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",420  1,

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,8 B
Shape,"(420,)","(1,)"
Count,421 Tasks,420 Chunks
Type,object,numpy.ndarray
"Array Chunk Bytes 3.28 kiB 8 B Shape (420,) (1,) Count 421 Tasks 420 Chunks Type object numpy.ndarray",420  1,

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,8 B
Shape,"(420,)","(1,)"
Count,421 Tasks,420 Chunks
Type,object,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 3.28 kiB 3.28 kiB Shape (420,) (420,) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",420  1,

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 3.28 kiB 3.28 kiB Shape (420,) (420,) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",420  1,

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 3.28 kiB 3.28 kiB Shape (420,) (420,) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",420  1,

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.50 kiB,1.50 kiB
Shape,"(192,)","(192,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 1.50 kiB 1.50 kiB Shape (192,) (192,) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",192  1,

Unnamed: 0,Array,Chunk
Bytes,1.50 kiB,1.50 kiB
Shape,"(192,)","(192,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,568 B,568 B
Shape,"(71,)","(71,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 568 B 568 B Shape (71,) (71,) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",71  1,

Unnamed: 0,Array,Chunk
Bytes,568 B,568 B
Shape,"(71,)","(71,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,560 B,560 B
Shape,"(70,)","(70,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 560 B 560 B Shape (70,) (70,) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",70  1,

Unnamed: 0,Array,Chunk
Bytes,560 B,560 B
Shape,"(70,)","(70,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,568 B,568 B
Shape,"(71,)","(71,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 568 B 568 B Shape (71,) (71,) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",71  1,

Unnamed: 0,Array,Chunk
Bytes,568 B,568 B
Shape,"(71,)","(71,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,560 B,560 B
Shape,"(70,)","(70,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 560 B 560 B Shape (70,) (70,) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",70  1,

Unnamed: 0,Array,Chunk
Bytes,560 B,560 B
Shape,"(70,)","(70,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 3.28 kiB 3.28 kiB Shape (420,) (420,) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",420  1,

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 3.28 kiB 3.28 kiB Shape (420,) (420,) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",420  1,

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 3.28 kiB 3.28 kiB Shape (420,) (420,) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",420  1,

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 3.28 kiB 3.28 kiB Shape (420,) (420,) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",420  1,

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 3.28 kiB 3.28 kiB Shape (420,) (420,) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",420  1,

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,3.28 kiB
Shape,"(420,)","(420,)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,6.56 kiB,16 B
Shape,"(420, 2)","(1, 2)"
Count,421 Tasks,420 Chunks
Type,object,numpy.ndarray
"Array Chunk Bytes 6.56 kiB 16 B Shape (420, 2) (1, 2) Count 421 Tasks 420 Chunks Type object numpy.ndarray",2  420,

Unnamed: 0,Array,Chunk
Bytes,6.56 kiB,16 B
Shape,"(420, 2)","(1, 2)"
Count,421 Tasks,420 Chunks
Type,object,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,8 B
Shape,"(420,)","(1,)"
Count,421 Tasks,420 Chunks
Type,object,numpy.ndarray
"Array Chunk Bytes 3.28 kiB 8 B Shape (420,) (1,) Count 421 Tasks 420 Chunks Type object numpy.ndarray",420  1,

Unnamed: 0,Array,Chunk
Bytes,3.28 kiB,8 B
Shape,"(420,)","(1,)"
Count,421 Tasks,420 Chunks
Type,object,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,16 B,16 B
Shape,"(1, 2)","(1, 2)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 16 B 16 B Shape (1, 2) (1, 2) Count 2 Tasks 1 Chunks Type float64 numpy.ndarray",2  1,

Unnamed: 0,Array,Chunk
Bytes,16 B,16 B
Shape,"(1, 2)","(1, 2)"
Count,2 Tasks,1 Chunks
Type,float64,numpy.ndarray
