# HDF Reference Recipe for CMIP6

This example illustrates how to create a Reference Recipe using CMIP6 data.
This recipe does not actually copy the original source data.
Instead, it generates metadata files which reference and index the original data, allowing it to be accessed more efficiently. It does this by using the Python library, [Kerchunk](https://fsspec.github.io/kerchunk/) under the hood. Pangeo-Forge is acting as a runner for Kerchunk to generate reference files. 

As the input for this recipe, we will use some CMIP6 NetCDF4 files provided by ESGF and stored in Amazon S3 ([CMIP6 AWS Open Data Page](https://registry.opendata.aws/cmip6/)).
Many CMIP6 simulations spread their outputs over many HDF5/ NetCDF4 files, in order to limit the individual file size.
This can be inconvenient for analysis.
In this recipe, we will see how to virtually concatenate many HDF5 files into one big virtual Zarr dataset.

## Define the FilePattern

Let's pick a random dataset: ocean model output from the GFDL ocean model from the [OMIP](https://www.wcrp-climate.org/modelling-wgcm-mip-catalogue/cmip6-endorsed-mips-article/1063-modelling-cmip6-omip) experiments.

In [1]:
import s3fs
fs = s3fs.S3FileSystem(anon=True)
base_path = 's3://esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/'
all_paths = fs.ls(base_path)
all_paths

['esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_170801-172712.nc',
 'esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_172801-174712.nc',
 'esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_174801-176712.nc',
 'esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_176801-178712.nc',
 'esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_178801-180712.nc',
 'esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_180801-182712.nc',
 'esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_182801-184712.nc',
 'esgf-world/

We see there are 15 individual NetCDF files. Let's time how long it takes to open and display one of them using Xarray.

```{note}
The argument `decode_coords='all'` helps Xarray promote all of the `_bnds` variables to coordinates (rather than data variables).
```

In [2]:
import xarray as xr

In [3]:
%%time
ds_orig = xr.open_dataset(fs.open(all_paths[0]), engine='h5netcdf', chunks={}, decode_coords='all')
ds_orig



CPU times: user 713 ms, sys: 324 ms, total: 1.04 s
Wall time: 4.41 s


Unnamed: 0,Array,Chunk
Bytes,2.81 kiB,2.81 kiB
Shape,"(180, 2)","(180, 2)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 2.81 kiB 2.81 kiB Shape (180, 2) (180, 2) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",2  180,

Unnamed: 0,Array,Chunk
Bytes,2.81 kiB,2.81 kiB
Shape,"(180, 2)","(180, 2)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,5.62 kiB,5.62 kiB
Shape,"(360, 2)","(360, 2)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 5.62 kiB 5.62 kiB Shape (360, 2) (360, 2) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",2  360,

Unnamed: 0,Array,Chunk
Bytes,5.62 kiB,5.62 kiB
Shape,"(360, 2)","(360, 2)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,3.75 kiB,3.75 kiB
Shape,"(240, 2)","(240, 2)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,object numpy.ndarray,object numpy.ndarray
"Array Chunk Bytes 3.75 kiB 3.75 kiB Shape (240, 2) (240, 2) Dask graph 1 chunks in 2 graph layers Data type object numpy.ndarray",2  240,

Unnamed: 0,Array,Chunk
Bytes,3.75 kiB,3.75 kiB
Shape,"(240, 2)","(240, 2)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,object numpy.ndarray,object numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,560 B,560 B
Shape,"(35, 2)","(35, 2)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 560 B 560 B Shape (35, 2) (35, 2) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",2  35,

Unnamed: 0,Array,Chunk
Bytes,560 B,560 B
Shape,"(35, 2)","(35, 2)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,2.03 GiB,2.03 GiB
Shape,"(240, 35, 180, 360)","(240, 35, 180, 360)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 2.03 GiB 2.03 GiB Shape (240, 35, 180, 360) (240, 35, 180, 360) Dask graph 1 chunks in 2 graph layers Data type float32 numpy.ndarray",240  1  360  180  35,

Unnamed: 0,Array,Chunk
Bytes,2.03 GiB,2.03 GiB
Shape,"(240, 35, 180, 360)","(240, 35, 180, 360)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


It took ~5 seconds to open this one dataset. So it would take over a minute for us to open every file.

As a first step in our recipe, we create a `File Pattern <../../recipe_user_guide/file_patterns>` to represent the input files.
In this case, since we already have a list of inputs, we just use the `pattern_from_file_sequence` convenience function.

In [4]:
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
pattern = pattern_from_file_sequence(['s3://' + path for path in all_paths], 'time')
pattern

<FilePattern {'time': 15}>

## Write the Recipe

Once we have our `FilePattern`, describing our input file paths, we can construct out `beam` pipeline. A beam pipeline is a chained together list of (Apache Beam transformations)[https://beam.apache.org/documentation/programming-guide/#transforms].


### Specify where our target data should be written
Here, we are creating a temporary directory to store the written reference files. If we wanted these reference files to persist locally, we would want to specify another file path. 


In [5]:
import os
from tempfile import TemporaryDirectory
td = TemporaryDirectory()
target_root = td.name
store_name = "output.json"
target_store = os.path.join(target_root, store_name)

## Construct a Pipeline
Next, we will construct a beam pipeline. This should look similar to the other standard Zarr examples, but will involve a few different transforms. 

In [6]:
import apache_beam as beam
from pangeo_forge_recipes.transforms import OpenWithKerchunk, CombineReferences, WriteCombinedReference

store_name = "cmip6_reference"
transforms = (
    # Create a beam PCollection from our input file pattern
    beam.Create(pattern.items())
    # Open with Kerchunk and create references for each file
    | OpenWithKerchunk(file_type=pattern.file_type, storage_options={'anon':True})
    # Use Kerchunk's `MultiZarrToZarr` functionality to combine the reference files into a single
    # reference file. *Note*: Setting the correct contact_dims and identical_dims is important.
    | CombineReferences(
        concat_dims=["time"], 
        identical_dims=["lat", "lat_bnds", "lon", "lon_bnds", "lev_bnds", "lev"],
        mzz_kwargs = {"remote_protocol": "s3"},
    )
    # Write the combined Kerchunk reference to file.
    | WriteCombinedReference(target_root=target_root, store_name=store_name)
)

## Execute the Recipe

In [7]:
with beam.Pipeline() as p:
    p | transforms



## Examine the Result

Here we are creating an fsspec mapper of the reference file and then passing it to Xarray's `open_dataset` to be read as if it were a Zarr store.

In [8]:
import fsspec
import xarray as xr
full_path = os.path.join(target_root, store_name, "reference.json")
mapper = fsspec.get_mapper("reference://", fo=full_path, remote_protocol="s3",)
ds = xr.open_dataset(mapper, engine="zarr", decode_coords='all', backend_kwargs={"consolidated": False})


  ds = xr.open_dataset(mapper, engine="zarr", decode_coords='all', backend_kwargs={"consolidated": False})


In [9]:
ds

## Make a Map

In [10]:
ds_ann = ds.resample(time='A').mean()
sst_diff = ds_ann.thetao.isel(time=-1, lev=0) - ds_ann.thetao.isel(time=0, lev=0)
sst_diff.plot()