# CREDIT cached dataset

In [1]:
import os
import sys
import zarr
import yaml
import time
import numpy as np
import xarray as xr
from glob import glob

In [2]:
# Put the resolved '../libs/' directory first on the module search path so
# that the project-local preprocess_utils module can be imported below.
sys.path.insert(0, os.path.realpath('../libs/'))
import preprocess_utils as pu

## compression and chunking settings

In [6]:
# Read the YAML configuration that drives this notebook.
config_name = os.path.realpath('data_config_6h.yml')
with open(config_name, 'r') as stream:
    conf = yaml.safe_load(stream)

cache_conf = conf['cache']

# Datasets holding the per-variable mean / std used for z-score scaling
zscore_mean = xr.open_dataset(cache_conf['mean_loc'])
zscore_std = xr.open_dataset(cache_conf['std_loc'])

# Variable names are the keys of the 'cache' section minus its last five
# entries (save_loc and others).
# NOTE(review): this relies on the key order of the YAML file -- confirm
# that the five non-variable keys always come last.
varnames = list(cache_conf)[:-5]

# Every name other than U, V, T, Q is treated as a surface variable.
varname_surf = list(set(varnames) - {'U', 'V', 'T', 'Q'})

# np.arange excludes the stop value, so years_range[1] itself is not included.
years_range = cache_conf['years_range']
years = np.arange(years_range[0], years_range[1])

# This notebook works on the first year of the range only.
year = years[0]

In [13]:
# Blosc (zstd, level 1, byte shuffle) compressor shared by every variable.
compress = zarr.Blosc(cname='zstd', clevel=1, shuffle=zarr.Blosc.SHUFFLE, blocksize=0)

zarr_opt = conf['zarr_opt']

# Chunk shapes read from the config, ordered (time, latitude, longitude)
# for surface variables and (time, level, latitude, longitude) otherwise.
chunk_size_3d = {
    'chunks': tuple(zarr_opt['chunk_size_3d'][dim]
                    for dim in ('time', 'latitude', 'longitude'))
}
chunk_size_4d = {
    'chunks': tuple(zarr_opt['chunk_size_4d'][dim]
                    for dim in ('time', 'level', 'latitude', 'longitude'))
}

# Per-variable encoding: same compressor everywhere, chunk shape chosen by
# whether the variable is listed in varname_surf.
dict_encoding = {
    var: {'compressor': compress,
          **(chunk_size_3d if var in varname_surf else chunk_size_4d)}
    for var in varnames
}

In [14]:
dict_encoding

{'U': {'compressor': Blosc(cname='zstd', clevel=1, shuffle=SHUFFLE, blocksize=0),
  'chunks': (10, 2, 640, 1280)},
 'V': {'compressor': Blosc(cname='zstd', clevel=1, shuffle=SHUFFLE, blocksize=0),
  'chunks': (10, 2, 640, 1280)},
 'T': {'compressor': Blosc(cname='zstd', clevel=1, shuffle=SHUFFLE, blocksize=0),
  'chunks': (10, 2, 640, 1280)},
 'Q': {'compressor': Blosc(cname='zstd', clevel=1, shuffle=SHUFFLE, blocksize=0),
  'chunks': (10, 2, 640, 1280)},
 'SP': {'compressor': Blosc(cname='zstd', clevel=1, shuffle=SHUFFLE, blocksize=0),
  'chunks': (10, 640, 1280)},
 't2m': {'compressor': Blosc(cname='zstd', clevel=1, shuffle=SHUFFLE, blocksize=0),
  'chunks': (10, 640, 1280)},
 'V500': {'compressor': Blosc(cname='zstd', clevel=1, shuffle=SHUFFLE, blocksize=0),
  'chunks': (10, 640, 1280)},
 'U500': {'compressor': Blosc(cname='zstd', clevel=1, shuffle=SHUFFLE, blocksize=0),
  'chunks': (10, 640, 1280)},
 'T500': {'compressor': Blosc(cname='zstd', clevel=1, shuffle=SHUFFLE, blocksize=0)

## Cached files in zarr format

In [8]:
# Build ds_base: one z-scored variable at a time, merged into a single
# dataset and chunked according to the zarr options in the config.
for i_var, var in enumerate(varnames):

    # Pick the first file matching this variable's glob pattern whose path
    # contains the selected year.
    filenames = glob(conf['cache'][var])
    fn = [path for path in filenames if str(year) in path][0]

    ds_original = pu.get_forward_data(fn)
    ds_var = ds_original[var]

    # z-score: (x - mean) / std, then wrap the result as a Dataset for merging
    ds_zscore_var = ((ds_var - zscore_mean[var]) / zscore_std[var]).to_dataset()

    # The first variable starts the dataset; the others are merged into it.
    ds_base = ds_zscore_var if i_var == 0 else ds_base.merge(ds_zscore_var)

    # chunk data: surface variables use the 3d chunk sizes, the rest the 4d ones
    chunk_key = 'chunk_size_3d' if var in varname_surf else 'chunk_size_4d'
    ds_base[var] = ds_base[var].chunk(conf['zarr_opt'][chunk_key])

# save_name = xxxx
# ds_base.to_zarr(save_name, mode="w", consolidated=True, compute=True, encoding=dict_encoding)

## Cached files in netCDF4 format

In [11]:
# zlib compression (level 1) applied to every variable in the netCDF4 file.
compress = {'zlib': True, 'complevel': 1}  # shuffle=True

zarr_opt = conf['zarr_opt']

# Chunk shapes reuse the zarr chunk settings from the config, ordered
# (time, latitude, longitude) and (time, level, latitude, longitude).
chunk_size_3d = {
    'chunksizes': tuple(zarr_opt['chunk_size_3d'][dim]
                        for dim in ('time', 'latitude', 'longitude'))
}
chunk_size_4d = {
    'chunksizes': tuple(zarr_opt['chunk_size_4d'][dim]
                        for dim in ('time', 'level', 'latitude', 'longitude'))
}

# Per-variable encoding: compression options plus the chunk shape chosen by
# whether the variable is listed in varname_surf.
dict_encoding = {
    var: {**compress,
          **(chunk_size_3d if var in varname_surf else chunk_size_4d)}
    for var in varnames
}

In [12]:
dict_encoding

{'U': {'zlib': True, 'complevel': 1, 'chunksizes': (10, 2, 640, 1280)},
 'V': {'zlib': True, 'complevel': 1, 'chunksizes': (10, 2, 640, 1280)},
 'T': {'zlib': True, 'complevel': 1, 'chunksizes': (10, 2, 640, 1280)},
 'Q': {'zlib': True, 'complevel': 1, 'chunksizes': (10, 2, 640, 1280)},
 'SP': {'zlib': True, 'complevel': 1, 'chunksizes': (10, 640, 1280)},
 't2m': {'zlib': True, 'complevel': 1, 'chunksizes': (10, 640, 1280)},
 'V500': {'zlib': True, 'complevel': 1, 'chunksizes': (10, 640, 1280)},
 'U500': {'zlib': True, 'complevel': 1, 'chunksizes': (10, 640, 1280)},
 'T500': {'zlib': True, 'complevel': 1, 'chunksizes': (10, 640, 1280)},
 'Z500': {'zlib': True, 'complevel': 1, 'chunksizes': (10, 640, 1280)},
 'Q500': {'zlib': True, 'complevel': 1, 'chunksizes': (10, 640, 1280)},
 'tsi': {'zlib': True, 'complevel': 1, 'chunksizes': (10, 640, 1280)}}

In [None]:
# Get zscore values
zscore_mean = xr.open_dataset(conf['cache']['mean_loc'])
zscore_std = xr.open_dataset(conf['cache']['std_loc'])

# Get variable names
varnames = list(conf['cache'].keys())
varnames = varnames[:-5] # remove save_loc and others
varname_surf = list(set(varnames) - set(['U', 'V', 'T', 'Q']))

years_range = conf['cache']['years_range']
years = np.arange(years_range[0], years_range[1])
year = years[0]

for i_var, var in enumerate(varnames):
    
    filenames = glob(conf['cache'][var])
    fn = [fn for fn in filenames if str(year) in fn][0]

    ds_original = pu.get_forward_data(fn)
    ds_var = ds_original[var]

    ds_zscore_var = (ds_var - zscore_mean[var])/zscore_std[var]
    ds_zscore_var = ds_zscore_var.to_dataset()
    
    if i_var == 0:
        ds_base = ds_zscore_var
    else:
        ds_base = ds_base.merge(ds_zscore_var)

# save_name = xxxx
# ds_base.to_netcdf(save_name, format='NETCDF4', encoding=dict_encoding)

## check file I/O correctness

In [14]:
# Round-trip check: read 'tsi' at time index 888 back from the saved zarr
# store and compare it with the in-memory ds_base; the summed absolute
# difference is the value displayed by this cell.
test = xr.open_zarr('/glade/derecho/scratch/ksha/CREDIT_data/arXiv_cached/cache_arXiv_6h_1979.zarr')
test_tsi = test['tsi'].isel(time=888)
real_tsi = ds_base['tsi'].isel(time=888)
np.abs(np.asarray(test_tsi) - np.asarray(real_tsi)).sum()

0.0

## Save using Dask workers 

In [None]:
# from dask.distributed import Client

In [None]:
# Client is needed by the `with` statement below; the only other import of
# it in this notebook is commented out, which made this cell fail with a
# NameError.
from dask.distributed import Client

# Output store: <save_loc><prefix>_<year>.zarr
save_name = f"{conf['cache']['save_loc']}{conf['cache']['prefix']}_{year}.zarr"

print(f'Save to {save_name}')

# save to zarr using dask client; the client (and its workers) is shut down
# when the `with` block exits.
# NOTE(review): unlike the commented-out to_zarr call in the zarr caching
# cell, this call does not pass encoding=dict_encoding -- confirm whether the
# compression settings are meant to apply here.
with Client(n_workers=100, threads_per_worker=2) as client:
    ds_base.to_zarr(save_name,
                    mode="w",
                    consolidated=True,
                    compute=True)