This notebook demonstrates how to convert a netCDF dataset stored as a collection of HDF5 files in blob storage to Zarr format.

In [1]:
import argparse
import itertools

import adlfs
import dask.array
import fsspec
import h5py
import numpy as np
import xarray as xr
import zarr

# The Source Dataset

We're working with the [Daymet](https://azure.microsoft.com/en-us/services/open-datasets/catalog/daymet/) dataset. There are a few aspects of this dataset that matter for the conversion to Zarr.

- There are several variables of interest that share the same coordinates (`tmin`, `tmax`, `prcp`, etc.). These are indexed by `(time, x, y)`.
- A new version of this dataset is generated each year. We want to make it easy to expand this in time.
- The source files are netCDF datasets (stored as directories of HDF5 files) named like `{variable}_{year}_{region}.nc4`.

So the raw data is like

```
daymetv3_dayl_1980_hawaii.nc4
daymetv3_tmax_1980_hawaii.nc4
...
daymetv3_tmax_1980_na.nc4
...
daymetv3_tmax_2010_hawaii.nc4
```

And here's the repr for the dataset that we'll generate for Hawaii.

```
<xarray.Dataset>
Dimensions:                  (nv: 2, time: 14600, x: 284, y: 584)
Coordinates:
    lat                      (y, x) float32 dask.array<chunksize=(584, 284), meta=np.ndarray>
    lon                      (y, x) float32 dask.array<chunksize=(584, 284), meta=np.ndarray>
  * time                     (time) datetime64[ns] 1980-01-01T12:00:00 ... 20...
  * x                        (x) float32 -5.802e+06 -5.801e+06 ... -5.519e+06
  * y                        (y) float32 -3.9e+04 -4e+04 ... -6.21e+05 -6.22e+05
Dimensions without coordinates: nv
Data variables:
    dayl                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    lambert_conformal_conic  float32 ...
    prcp                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    srad                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    swe                      (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    time_bnds                (time, nv) datetime64[ns] dask.array<chunksize=(14600, 2), meta=np.ndarray>
    tmax                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    tmin                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    vp                       (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>
    yearday                  (time) int16 dask.array<chunksize=(14600,), meta=np.ndarray>
```


Our overall strategy is to

1. Treat each region completely independenlty. They'll end up in different Zarr stores.
2. Construct a dictionary with all of the `attrs` we whish to preserve (global attributes and per-variable attributes)
3. "Allocate" an empty Zarr Group containing the dataset. "Allocate" is in quotes, since we aren't writing any actual data here. We just want to get the layout of the Zarr metadata right. We'll copy the actual data later.
4. Copy the data for "static" variables. These are coordinates / variables like `lat`, `lon`, or `lambert_conformal_conic` that *not* indexed by `time`. We only need to copy them once.
5. Process each year's worth of data. We'll go into this in detail later.

This is a deliberately low-level strategy that's attempting to do the conversion as efficiently as possible.
In particular, we avoid any expensive inference operations in xarray, and we've set up the problem to be as parallel as possible.

We have a few helpers. These do the bulk of the work of actually copying the data from HDF5 to Zarr.
The most important aspect to notice is that we pass in an `offset`. Recall that each *source* file is in a separate netCDF file. But our output Zarr store
will contain data for *all* the years. `offset` tells us where in the *output* array this data goes.

In [2]:
# -----------------------------------------------------------------------------
# utils
def process_file(file, offset, url, storage_options=None):
    """
    Copy the data for a single variable for a single year. 
    """
    storage_options = storage_options or {}
    fs = adlfs.AzureBlobFileSystem(account_name="daymeteuwest")
    h5file = h5py.File(fs.open(file))
    group = zarr.group(fsspec.get_mapper(url, **storage_options))

    variable = file.split("_")[2]
    dataset = h5file[variable]

    slices = dask.array.core.slices_from_chunks(dask.array.empty_like(dataset).chunks)
    for slice_ in slices:
        time_slice, *rest = slice_
        time_slice = slice(time_slice.start + offset, time_slice.stop + offset)
        target_slice = (time_slice,) + tuple(rest)
        group[variable][target_slice] = dataset[slice_]

`process_expanding` needs to be called once per variable per year. This updates variables like `yearday` that are indexed by `time`.
However, they aren't (nescesesarily?) chunked, and so we can't simply use `process_file`.

In [3]:
def process_expanding(variable, files, offsets, url, storage_options=None):
    """
    Write the all the years for an "expanding" variable like time.
    """
    storage_options = storage_options or {}
    fs = adlfs.AzureBlobFileSystem(account_name="daymeteuwest")

    for file, offset in zip(files, offsets):
        h5file = h5py.File(fs.open(file))
        group = zarr.group(fsspec.get_mapper(url, **storage_options))

        dataset = h5file[variable]
        slice_ = (slice(offset, dataset.shape[0] + offset),)
        if dataset.ndim > 1:
            slice_ += tuple(slice(i) for i in dataset.shape[1:])

        group[variable][slice_] = dataset


def decode_encode_attr(v):
    if isinstance(v, bytes):
        v = v.decode("utf-8")  # is this always utf-8?
    return xr.backends.zarr.encode_zarr_attr_value(v)

## Static Metadata

This section defines all the *static* metadata like the URL of the source data, the various varibles. We just know this ahead of time.

In [4]:
ACCOUNT_NAME = "daymeteuwest"

# TODO: Think about what's static vs. dynamic.
# VARIABLES, REGIONS, and YEARS could be dynamic.
VARIABLES = [
    "tmin",
    "tmax",
    "prcp",
    "srad",
    "vp",
    "swe",
    "dayl",
]
YEARS = list(range(1980, 2020))
REGIONS = ["na", "hawaii", "puertorico"]

# These are all static properties of the dataset. Must be provided
# by the user.
# Coordinates in xarray parlance.
COORDINATES = ["lat", "lon", "time", "x", "y"]
# These are dimensions in xarray's data model, but they aren't our
# main variables of interest. They are the same for all the variables
# within a year, so we only need to copy them once per year.
EXTRA_VARIABLBES = ["lambert_conformal_conic", "time_bnds", "yearday"]
# These variables expand in time. They must be copied each year.
EXPANDING = {"time", "time_bnds", "yearday"}

missing_value = -9999.0

# These are attrs that are dropped by xarray. I discovered these
# by manually loading with `open_mdfdataset` on a couple of years
# and merging.
ATTRS_DROP = {
    "_FillValue",
    "DIMENSION_LIST",
    "_nc3_strict",
    "CLASS",
    "NAME",
    "_Netcdf4Dimid",
    "REFERENCE_LIST",
}


def year_key(x):
    return x.split("_")[3]

We're writing to Azure Blob Storage. We'll use fsspec and adlfs to create the mapping for the Zarr store. You could replace this with any fsspec-compatible URL.

In [5]:
import os

region = "hawaii"
account_name = "daymeteuwest"
store_url = "az://daymet-zarr/daymet_v3_{region}.zarr"
sas_token = os.environ['SAS_TOKEN']  # Grants permission to write the Azure Blob container
url = store_url.format(region=region)

storage_options = dict(
    account_name=account_name,
    sas_token=sas_token,
)
store = fsspec.get_mapper(url, **storage_options)
group = zarr.group(store)

In [6]:
# -------------------------------------------------------------------------
# Metadata Processing
# Gather a bit of metadata ahead of time.
fs = adlfs.AzureBlobFileSystem(account_name=ACCOUNT_NAME)

## Attrs

Now we gather all of the `attrs` that xarray typically handles for us. For the most part, this involves opening up the HDF5 dataset for each variable, copying its attrs (after decoding the bytes), and then fixing a few up.
We also handle the attrs for the dataset as a whole. To verify this, I used xarray to merge the data for a couple years and compared my attrs against xarray.

The most important addition here is the `_ARRAY_DIMENSIONS` variable. This is a convention used by xarray when loading datasets from Zarr. I've explicitly created them by looking at what xarray did. I suspect they can be discovered from the dataset.

In [7]:
templates = [x for x in files if "1980" in x]

# These are expected by xarray when reading a Zarr file.
# I don't know how best to discover these dynamically.
ATTRS_ARRAY_DIMENSIONS = {
    "time_bnds": ["time", "nv"],
    "srad": ["time", "y", "x"],
    "tmin": ["time", "y", "x"],
    "time": ["time"],
    "swe": ["time", "y", "x"],
    "dayl": ["time", "y", "x"],
    "prcp": ["time", "y", "x"],
    "x": ["x"],
    "y": ["y"],
    "vp": ["time", "y", "x"],
    "tmax": ["time", "y", "x"],
    "yearday": ["time"],
    "lat": ["y", "x"],
    "lon": ["y", "x"],
}

# Stage 1: Attrs handling
global_attrs = {
    k: decode_encode_attr(v)
    for k, v in h5py.File(fs.open(templates[0])).attrs.items()
    if k not in ATTRS_DROP
}
attrs = {".zattrs": global_attrs}
for file in templates:
    h5file = h5py.File(fs.open(file))
    variable = file.split("_")[2]
    dataset = h5file[variable]
    attrs[variable] = {
        k: decode_encode_attr(v)
        for k, v in dataset.attrs.items()
        if k not in ATTRS_DROP
    }
    attrs[variable]["_ARRAY_DIMENSIONS"] = ATTRS_ARRAY_DIMENSIONS[variable]
    attrs[variable]["missing_value"] = missing_value

h5file = h5py.File(fs.open(templates[0]))

for variable in EXTRA_VARIABLBES + COORDINATES:
    dataset = h5file[variable]
    attrs[variable] = {
        k: decode_encode_attr(v)
        for k, v in dataset.attrs.items()
        if k not in ATTRS_DROP
    }
    attrs[variable]["_ARRAY_DIMENSIONS"] = ATTRS_ARRAY_DIMENSIONS.get(variable, [])
    if "_FILL_VALUE" in attrs[variable]:
        attrs[variable]["missing_value"] = missing_value

# final attrs fixups.
# I have no idea how xarray gets these.
attrs["time_bnds"].update(
    calendar="proleptic_gregorian", units="days since 1980-01-01 00:00:00"
)
attrs["time"].update(units="days since 1980-01-01T00:00:00+00:00")

## Pre-allocate Zarr Groups for Variables

We now will "preallocate" the Zarr groups, one for each variable, using the netCDF files from 1980 as a "template". This is fast, since we just write the `.zarray` and `.zattrs` files. We aren't actually copying any data yet.
Note that we do a bit of work here to get "nice" chunks. The chunks in the HDF5 file were a bit small for my taste (something like 4-8 MB if memory serves). We use
`dask.array` to infer a good chunksize. We also explicitly set the chunking along the first dimension (`time`) to be `365`. This works well for our problem, and results
in uniform chunks for each year.

Note that we use the `full_shape`, which covers all the years, rather than the shape of the dataset (which only covers one year).

In [12]:
for file in templates:  # this is fast, no need to parallelize
    h5file = h5py.File(fs.open(file))
    variable = file.split("_")[2]
    dataset = h5file[variable]
    chunks = dask.array.empty_like(dataset).chunksize
    # We explicitly use a chunksize of 365 on the time axis.
    # This makes it easier to append in the future, since the chunks will always be
    # uniform. We won't have to rechunk a dangling chunk of data from the previous year.
    chunks = (365,) + chunks[1:]
    v = group.empty(
        dataset.name, shape=full_shape, dtype=dataset.dtype, chunks=chunks, overwrite=True
    )
    v.attrs.update(attrs[dataset.name.lstrip("/")])

Next, we process the coordinates (like `time`, `x`, `y`) and "extra variables" (like `lambert_conformal_conic`) -- there's probably a better name for the "extra variables".
We do handle these differently based on whether they're indexed by `time` ("expanding"). If they are expanding, we again pre-allocate the *full* array shape, rather than
the shape from just this file (which represents a year).

If a coordinate is not expanding (i.e. it's static for every year, like `lat`) then we just copy it now. This is the first time we're actually copying some data, rather
than just metadata! This could be parallized, but doesn't take too long. I'm also not sure what the best practices are for chunking coordinate dimensions.

We also handle `lambert_conformal_conic` special since it's a scalar.

In [12]:
h5file = h5py.File(fs.open(files[0]))

for coord in (
    COORDINATES + EXTRA_VARIABLBES
):  # this is slowish. Maybe parallize on 1 machine.
    dataset = h5file[coord]
    print("handle", coord)
    if coord == "lambert_conformal_conic":
        # scalar!
        group[dataset.name] = np.array(dataset)
        v = group[dataset.name]
    else:
        if coord not in EXPANDING:
            shape = dataset.shape
        else:
            shape = (full_shape[0],) + dataset.shape[1:]
        shape = tuple(int(x) for x in shape)  # NumPy ints -> Python ints
        # We explicitly *don't* want this chunked, and so we specify chunks=shape
        # TODO: figure out a good chunking for NA. Its `lat` array is 250MB
        v = group.empty(
            dataset.name, shape=shape, dtype=dataset.dtype, overwrite=True, chunks=shape
        )
        if coord not in EXPANDING:
            # Actually read some static data
            v[:] = np.array(dataset)
    v.attrs.update(attrs[dataset.name.lstrip("/")])

handle lat
handle lon
handle time
handle x
handle y
handle lambert_conformal_conic
handle time_bnds
handle yearday


## Computing Offsets

The raw data is split into separate files by year like `prcp_1980_hawaii.zarr`, `prcp_1981_hawaii.zarr`. Ideally all of the `prcp` data will be stored in a single Zarr Group.
One option would be to write the first group and then serially `append` to that dataset year by year. This should work, but would be slow since we need to wait for 1980 to finish
before we can start on 1981, and so on.

To maximize parallelism, we pre-compute a list of `offsets`. This indicates the *starting index* for each year. By discovering these ahead of time, we can write to the Zarr
store in parallel. (The offsets happen to be 365 for every year, so we *could* just define this like `365 * np.aranage(len(YEARS))`. But discovering this from
the data isn't too expensive.

In [None]:
files = fs.glob(f"daymet/daymet_v3_*_{region}.nc4")
dayl_files = [x for x in files if "dayl" in x]
shapes = [h5py.File(fs.open(f))["dayl"].shape for f in dayl_files]
full_shape = (np.array(shapes)[:, 0].sum(),) + shapes[0][1:]
offsets = np.cumsum(np.array(shapes)[:, 0])
offsets -= offsets[0]

## Copy Data

We're now ready to copy the data. This is the expensive step so we've taken care to make it easy to parallelize. We'll use the `process_file` from earlier, which will be mapped over all the `(variable, year)` combinations. The only trick here is ensuring that we assocate the right `offset` with each `year`. So we group by year and then index into `offsets`.

In [8]:
# Stage 3: Process
batches = itertools.groupby(sorted(files, key=year_key), year_key)

params = []
for i, (_, files_) in enumerate(batches):
    for file in files_:
        params.append((file, offsets[i]))

files2, offsets2 = zip(*params)

In [22]:
files2[:21:7], offsets2[:21:7]

(('daymet/daymet_v3_dayl_1980_hawaii.nc4',
  'daymet/daymet_v3_dayl_1981_hawaii.nc4',
  'daymet/daymet_v3_dayl_1982_hawaii.nc4'),
 (0, 365, 730))

We could parallize this in any number of ways. We've set stuff up to work with the `concurrent.futures` api, which Dask implements. But we could also use Azure Batch at this point.

The memory usage on each worker will be modest. Just the maximum size of a chunk (something like 250 Mb) times the number of tasks it's running concurrently.

In [None]:
from dask_gateway import GatewayCluster
from distributed import wait

cluster = GatewayCluster()
cluster.adapt(minimum=1, maximum=50)

client = cluster.get_client()

cluster

In [26]:
futures = client.map(process_file, files2, offsets2, url=url, storage_options=storage_options)

We also need to copy the data from the `EXPANDING` variables, which grow in time but aren't chunked.
These can be done in parallel *by variable* but not by time (since they aren't chunked).

In [14]:
dayl_params = [x for x in params if "dayl" in x[0]]
dayl_files, dayl_offsets = zip(*dayl_params)

In [15]:
extra_futures = client.map(
    process_expanding, EXPANDING, files=dayl_files, offsets=offsets, url=url, storage_options=storage_options
)

In [35]:
_ = wait(futures + extra_futures)

## Conslidate Metadata

As usual, we want to consolidate the zarr metadata.

In [17]:
# Stage 4: Finalize
zarr.consolidate_metadata(store)

<zarr.hierarchy.Group '/'>

## Reading

At this point, we should have a cloud-optimized, analysis ready dataset that can be read by Zarr or Xarray.

In [4]:
import xarray as xr
import fsspec

Note: you'll want a version of adlfs with these two pull requests included

- https://github.com/dask/adlfs/pull/155
- https://github.com/dask/adlfs/pull/157

In [3]:
import fsspec
import xarray as xr

store = fsspec.get_mapper(
    "az://daymet-zarr/daymet_v3_hawaii.zarr",
    account_name="daymeteuwest",
)
%time ds = xr.open_zarr(store, consolidated=True)

CPU times: user 187 ms, sys: 31.3 ms, total: 219 ms
Wall time: 555 ms


In [4]:
ds

Unnamed: 0,Array,Chunk
Bytes,663.42 kB,663.42 kB
Shape,"(584, 284)","(584, 284)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 663.42 kB 663.42 kB Shape (584, 284) (584, 284) Count 2 Tasks 1 Chunks Type float32 numpy.ndarray",284  584,

Unnamed: 0,Array,Chunk
Bytes,663.42 kB,663.42 kB
Shape,"(584, 284)","(584, 284)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,663.42 kB,663.42 kB
Shape,"(584, 284)","(584, 284)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 663.42 kB 663.42 kB Shape (584, 284) (584, 284) Count 2 Tasks 1 Chunks Type float32 numpy.ndarray",284  584,

Unnamed: 0,Array,Chunk
Bytes,663.42 kB,663.42 kB
Shape,"(584, 284)","(584, 284)"
Count,2 Tasks,1 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,9.69 GB,242.15 MB
Shape,"(14600, 584, 284)","(365, 584, 284)"
Count,41 Tasks,40 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 9.69 GB 242.15 MB Shape (14600, 584, 284) (365, 584, 284) Count 41 Tasks 40 Chunks Type float32 numpy.ndarray",284  584  14600,

Unnamed: 0,Array,Chunk
Bytes,9.69 GB,242.15 MB
Shape,"(14600, 584, 284)","(365, 584, 284)"
Count,41 Tasks,40 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,9.69 GB,242.15 MB
Shape,"(14600, 584, 284)","(365, 584, 284)"
Count,41 Tasks,40 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 9.69 GB 242.15 MB Shape (14600, 584, 284) (365, 584, 284) Count 41 Tasks 40 Chunks Type float32 numpy.ndarray",284  584  14600,

Unnamed: 0,Array,Chunk
Bytes,9.69 GB,242.15 MB
Shape,"(14600, 584, 284)","(365, 584, 284)"
Count,41 Tasks,40 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,9.69 GB,242.15 MB
Shape,"(14600, 584, 284)","(365, 584, 284)"
Count,41 Tasks,40 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 9.69 GB 242.15 MB Shape (14600, 584, 284) (365, 584, 284) Count 41 Tasks 40 Chunks Type float32 numpy.ndarray",284  584  14600,

Unnamed: 0,Array,Chunk
Bytes,9.69 GB,242.15 MB
Shape,"(14600, 584, 284)","(365, 584, 284)"
Count,41 Tasks,40 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,9.69 GB,242.15 MB
Shape,"(14600, 584, 284)","(365, 584, 284)"
Count,41 Tasks,40 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 9.69 GB 242.15 MB Shape (14600, 584, 284) (365, 584, 284) Count 41 Tasks 40 Chunks Type float32 numpy.ndarray",284  584  14600,

Unnamed: 0,Array,Chunk
Bytes,9.69 GB,242.15 MB
Shape,"(14600, 584, 284)","(365, 584, 284)"
Count,41 Tasks,40 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,233.60 kB,233.60 kB
Shape,"(14600, 2)","(14600, 2)"
Count,2 Tasks,1 Chunks
Type,datetime64[ns],numpy.ndarray
"Array Chunk Bytes 233.60 kB 233.60 kB Shape (14600, 2) (14600, 2) Count 2 Tasks 1 Chunks Type datetime64[ns] numpy.ndarray",2  14600,

Unnamed: 0,Array,Chunk
Bytes,233.60 kB,233.60 kB
Shape,"(14600, 2)","(14600, 2)"
Count,2 Tasks,1 Chunks
Type,datetime64[ns],numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,9.69 GB,242.15 MB
Shape,"(14600, 584, 284)","(365, 584, 284)"
Count,41 Tasks,40 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 9.69 GB 242.15 MB Shape (14600, 584, 284) (365, 584, 284) Count 41 Tasks 40 Chunks Type float32 numpy.ndarray",284  584  14600,

Unnamed: 0,Array,Chunk
Bytes,9.69 GB,242.15 MB
Shape,"(14600, 584, 284)","(365, 584, 284)"
Count,41 Tasks,40 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,9.69 GB,242.15 MB
Shape,"(14600, 584, 284)","(365, 584, 284)"
Count,41 Tasks,40 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 9.69 GB 242.15 MB Shape (14600, 584, 284) (365, 584, 284) Count 41 Tasks 40 Chunks Type float32 numpy.ndarray",284  584  14600,

Unnamed: 0,Array,Chunk
Bytes,9.69 GB,242.15 MB
Shape,"(14600, 584, 284)","(365, 584, 284)"
Count,41 Tasks,40 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,9.69 GB,242.15 MB
Shape,"(14600, 584, 284)","(365, 584, 284)"
Count,41 Tasks,40 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 9.69 GB 242.15 MB Shape (14600, 584, 284) (365, 584, 284) Count 41 Tasks 40 Chunks Type float32 numpy.ndarray",284  584  14600,

Unnamed: 0,Array,Chunk
Bytes,9.69 GB,242.15 MB
Shape,"(14600, 584, 284)","(365, 584, 284)"
Count,41 Tasks,40 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,29.20 kB,29.20 kB
Shape,"(14600,)","(14600,)"
Count,2 Tasks,1 Chunks
Type,int16,numpy.ndarray
"Array Chunk Bytes 29.20 kB 29.20 kB Shape (14600,) (14600,) Count 2 Tasks 1 Chunks Type int16 numpy.ndarray",14600  1,

Unnamed: 0,Array,Chunk
Bytes,29.20 kB,29.20 kB
Shape,"(14600,)","(14600,)"
Count,2 Tasks,1 Chunks
Type,int16,numpy.ndarray


## Open Questions

This is a lot of code. Some of it can certainly be abstracted and put in a library (pangeo-forge?). Some of it
is specific to the dataset. But hopefully we can establish some best practices.

1. What parts of this workflow can be made generic to other datasets?
2. Should coordinates be chunked? Does it matter whether they're dimension coordinates or non-dimension coordinates?
3. I've completely thrown out xarray from the processing. What pieces can we keep?