# NetCDF Zarr Sequential Recipe
### NOTE - all files and catalogs here are temporary and not ready for general use!!!

- Can go to https://aws-cloudnode.esgfed.org/thredds/fileServer/CMIP6 to get a sample netcdf file
- Take home messages: 
    - there are very few datasets which have netcdf files with constant number of time slices in each file
    - almost all datasets need a working `cftime` environment (the same notebook works with our usual `open_mfdataset` call
    - pre-processing is not available in open_dataset
    - most individual netcdf files are already too big for one chunk - need to be split up

- tests[0]: 
    - This is the only test case which works with the basic recipe
    - I can drop the height coordinate using `xarray_open_kwargs` 
    - can't do pre-processing to make the *_bnds Data Variables into Coordinates since `open_dataset` does not take a pre-processing argument
- tests[1]: 
    - The last netcdf file has fewer time slices 
    - each netcdf file is 1.5G - so chunking by whole netcdf files is not great
    - the single chunk reports that it is the size of the whole dataset (ds_chunk.nbytes = 13449.395536 MB)?
    - cftime is not working properly - issue with 'hours since 1850-01-16 12:00:00.000000' with "calendar 'noleap'"
- tests[2]
    - The last netcdf file has fewer time slices (does it work if we drop the last file?)
    - each netcdf file is about 600M - not great
    - unable to decode time units
- tests[3]
    - each netcdf file is about 2.5G - not great
    - unable to decode time units 'hours since 0001-01-16 12:00:00.000000' with "calendar 'noleap'"

In [1]:
import pandas as pd
import xarray as xr
from cftime import DatetimeNoLeap

In [2]:
# the versions are fine for cftime:
import zarr
import cftime
xr.__version__, zarr.__version__, cftime.__version__, pd.__version__

('0.16.2', '2.6.1', '1.3.1', '1.2.1')

In [3]:
df_s3 = pd.read_csv('http://fletcher.ldeo.columbia.edu/catalogs/s3-world.csv.gz')
tests = ['CMIP/MPI-M/MPI-ESM1-2-HR/historical/r1i1p1f1/Amon/tas/gn/v20190710/'
        ,'CMIP/NOAA-GFDL/GFDL-CM4/historical/r1i1p1f1/Omon/tos/gn/v20180701/'
        ,'CMIP/NASA-GISS/GISS-E2-1-G/historical/r1i1p1f1/Amon/ua/gn/v20180827/'
        ,'CMIP/THU/CIESM/piControl/r1i1p1f1/Amon/zg/gr/v20191202/'
        ,'CMIP/CNRM-CERFACS/CNRM-ESM2-1/piControl/r1i1p1f2/Amon/co2/gr/v20181115/'
        ]
files_per_chunk = [10,1,1,1,1]

In [4]:
# pick a dataset to save as zarr:
test_number = 4
vstore = tests[test_number]
inputs_per_chunk = files_per_chunk[test_number]

df_vstore = df_s3[df_s3.vstore == vstore]
var = df_vstore.variable_id.unique()[0]
# make sure we are looking at the last available version:
last_version = sorted(df_vstore.version.unique())[-1]
dze = df_vstore[df_vstore.version == last_version].reset_index(drop=True)

print(f"number of files = {len(dze)}")
dze.ncfile.values

number of files = 5


array(['co2_Amon_CNRM-ESM2-1_piControl_r1i1p1f2_gr_185001-194912.nc',
       'co2_Amon_CNRM-ESM2-1_piControl_r1i1p1f2_gr_195001-204912.nc',
       'co2_Amon_CNRM-ESM2-1_piControl_r1i1p1f2_gr_205001-214912.nc',
       'co2_Amon_CNRM-ESM2-1_piControl_r1i1p1f2_gr_215001-224912.nc',
       'co2_Amon_CNRM-ESM2-1_piControl_r1i1p1f2_gr_225001-234912.nc'],
      dtype=object)

In [5]:
# Choose a Recipe:
from pangeo_forge.recipe import NetCDFtoZarrSequentialRecipe

In [6]:
import tempfile
from fsspec.implementations.local import LocalFileSystem
from pangeo_forge.storage import FSSpecTarget, CacheFSSpecTarget

fs_local = LocalFileSystem()

cache_dir = tempfile.TemporaryDirectory()
cache_target = CacheFSSpecTarget(fs_local, cache_dir.name)
print(cache_target)

target_dir = tempfile.TemporaryDirectory()
target = FSSpecTarget(fs_local, target_dir.name)
print(target)

CacheFSSpecTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7f12c2edaa10>, root_path='/tmp/tmpuzs0xw63')
FSSpecTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7f12c2edaa10>, root_path='/tmp/tmpasix04o2')


In [7]:
dze.url[0]

'https://aws-cloudnode.esgfed.org/thredds/fileServer/CMIP6/CMIP/CNRM-CERFACS/CNRM-ESM2-1/piControl/r1i1p1f2/Amon/co2/gr/v20181115/co2_Amon_CNRM-ESM2-1_piControl_r1i1p1f2_gr_185001-194912.nc'

In [8]:
# Look at the first file from the dataset, using the OPeNDAP url:

url = dze.url[0].replace('fileServer','dodsC')
print(url)
ds = xr.open_dataset(url)
ntimes = len(ds.time)
print(f"number of time slices in first file = {ntimes}")
print(f"Dataset size is {ds.nbytes/1e6} MB")  # Too large - but don't know how to split
ds.coords, ds.data_vars

https://aws-cloudnode.esgfed.org/thredds/dodsC/CMIP6/CMIP/CNRM-CERFACS/CNRM-ESM2-1/piControl/r1i1p1f2/Amon/co2/gr/v20181115/co2_Amon_CNRM-ESM2-1_piControl_r1i1p1f2_gr_185001-194912.nc
number of time slices in first file = 1200
Dataset size is 2988.473548 MB


(Coordinates:
   * lat      (lat) float64 -88.93 -87.54 -86.14 -84.74 ... 86.14 87.54 88.93
   * lon      (lon) float64 0.0 1.406 2.812 4.219 ... 354.4 355.8 357.2 358.6
   * plev     (plev) float32 1e+05 9.25e+04 8.5e+04 7e+04 ... 1e+03 500.0 100.0
   * time     (time) datetime64[ns] 1850-01-16T12:00:00 ... 1949-12-16T12:00:00,
 Data variables:
     time_bounds  (time, axis_nbounds) datetime64[ns] ...
     co2          (time, plev, lat, lon) float32 ...)

In [9]:
# Check number of time slices in first and last netcdf files to make sure they are the same
input_urls = dze.url.tolist()

nc_bads = []
for index, row in dze.iterrows():
    url = row.url.replace('fileServer','dodsC') # To open here we need the dods version
    ncfile = row.ncfile

    if index in [0,len(dze)-1]:
        try:
            ds = xr.open_dataset(url)
            print(f"{index}:{ncfile} dataset size={ds.nbytes/1e6} MB, number of time slices = {len(ds.time)}")
        except:
            print(f"cannot open {url}")
            nc_bads += [url]

0:co2_Amon_CNRM-ESM2-1_piControl_r1i1p1f2_gr_185001-194912.nc dataset size=2988.473548 MB, number of time slices = 1200
4:co2_Amon_CNRM-ESM2-1_piControl_r1i1p1f2_gr_225001-234912.nc dataset size=2988.473548 MB, number of time slices = 1200


  dtype = _decode_cf_datetime_dtype(data, units, calendar, self.use_cftime)
  dtype = _decode_cf_datetime_dtype(data, units, calendar, self.use_cftime)
  return array(a, dtype, copy=False, order=order)


In [10]:
def set_bnds_as_coords(ds):
    ds = set_bnds_as_coords(ds)
    return ds

In [11]:
if test_number == 1:
    recipe = NetCDFtoZarrSequentialRecipe(
        input_urls=input_urls,
        #xarray_open_kwargs={'preprocess':set_bnds_as_coords},
        xarray_open_kwargs={'drop_variables':'height'},
        sequence_dim="time",
        inputs_per_chunk=inputs_per_chunk,
        nitems_per_input=ntimes
    )
else:
    recipe = NetCDFtoZarrSequentialRecipe(
        input_urls=input_urls,
        sequence_dim="time",
        xarray_open_kwargs={'use_cftime':True},  #, 'decode_coords':'all'},
        #xarray_open_kwargs={'decode_times':False},  # this makes a problem with 365_day calendar in recipe.prepare_target()
        #xarray_open_kwargs={'use_cftime':True,'decode_times':False}, # recipe.prepare_target() fails when using decode_times
        inputs_per_chunk=inputs_per_chunk,
        nitems_per_input=ntimes
    )
recipe

NetCDFtoZarrSequentialRecipe(sequence_dim='time', inputs_per_chunk=1, nitems_per_input=1200, target=<pangeo_forge.storage.UninitializedTarget object at 0x7f12c244da10>, input_cache=<pangeo_forge.storage.UninitializedTarget object at 0x7f12c244d9d0>, require_cache=True, consolidate_zarr=True, xarray_open_kwargs={'use_cftime': True}, xarray_concat_kwargs={}, delete_input_encoding=True)

In [12]:
# set the cache and target location
recipe.input_cache = cache_target
recipe.target = target
recipe

NetCDFtoZarrSequentialRecipe(sequence_dim='time', inputs_per_chunk=1, nitems_per_input=1200, target=FSSpecTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7f12c2edaa10>, root_path='/tmp/tmpasix04o2'), input_cache=CacheFSSpecTarget(fs=<fsspec.implementations.local.LocalFileSystem object at 0x7f12c2edaa10>, root_path='/tmp/tmpuzs0xw63'), require_cache=True, consolidate_zarr=True, xarray_open_kwargs={'use_cftime': True}, xarray_concat_kwargs={}, delete_input_encoding=True)

In [13]:
# Must cache the first chunk as well as any others you want to look at
all_chunks = list(recipe.iter_chunks())

for input_file in recipe.inputs_for_chunk(all_chunks[0]):
    print(input_file)
    recipe.cache_input(input_file)

https://aws-cloudnode.esgfed.org/thredds/fileServer/CMIP6/CMIP/CNRM-CERFACS/CNRM-ESM2-1/piControl/r1i1p1f2/Amon/co2/gr/v20181115/co2_Amon_CNRM-ESM2-1_piControl_r1i1p1f2_gr_185001-194912.nc


TimeoutError: 

In [None]:
# put basic info in target directory
recipe.prepare_target()  # This needs to be done on the FIRST chunk

In [None]:
# Now lets look at the last one:
for input_file in recipe.inputs_for_chunk(all_chunks[-1]):
    print(input_file)
    recipe.cache_input(input_file)

In [None]:
ds_chunk = recipe.open_chunk(all_chunks[-1])   
print(f'Total chunk size: {ds_chunk.nbytes / 1e6} MB')  

In [None]:
# store first chunk
zgroup = zarr.open(target_dir.name)
print(zgroup.tree())

In [None]:
from cftime import DatetimeNoLeap
recipe.store_chunk(all_chunks[0])
zgroup[var].info

In [None]:
recipe.store_chunk(all_chunks[-1])



In [None]:
# check first chunk
ds = xr.open_zarr(target_dir.name)
ds[var][0,0].plot()

In [None]:
hurl = recipe.inputs_for_chunk(all_chunks[-1])[0]
url = hurl.replace('fileServer','dodsC')
print('source URL = ',url)
print('open netcdf dataset')
ds = xr.open_dataset(url)
print(f"first time = {ds.time[0]}")

zbdir = 'zarr-tmp'
print('save zarr dataset')
ds.to_zarr(zbdir, consolidated=True, mode='w')
ds2 = xr.open_zarr(zbdir)
print(f"first time = {ds2.time[0]}")