In [None]:
# Dask imports

from dask_jobqueue import PBSCluster
from dask.distributed import Client

In [None]:
# Configure the PBS-backed Dask cluster (no jobs are requested until it is scaled)

cluster = PBSCluster(
    # --- PBS job directives ---
    job_name='hackathon-rechunk',
    queue='casper',
    walltime='120:00',
    # NOTE: replace the project code below with your own before running this notebook!
    account='UCSG0002',
    log_directory='dask-logs',
    # --- Resources requested for each job ---
    cores=1,
    memory='10GiB',
    resource_spec='select=1:ncpus=1:mem=10GB',
    # --- Worker processes started within each job ---
    processes=1,
    # Directory Dask spills data to on disk when memory is exhausted
    local_directory='/local_scratch/pbs.$PBS_JOBID/dask/spill',
    # Network interface the cluster communicates over
    interface='ext',
)

In [None]:
# Connect a client to the cluster; this also makes the Dashboard available
client = Client(address=cluster)

# Leave the client as the cell's last expression so its repr is rendered
client

In [None]:
# Request the workers, then block until all of them have joined

N_WORKERS = 40  # used for both the scale request and the wait below

cluster.scale(N_WORKERS)
client.wait_for_workers(N_WORKERS)

In [None]:
import xarray as xr
import pandas as pd
import dask

# Read in files: every Zarr store matching the glob is opened and combined
# into a single dataset.
ds = xr.open_mfdataset('/glade/derecho/scratch/ksha/CREDIT_data/ERA5_mlevel_arXiv/SixHourly_y_TOTAL_202*.zarr',
                       engine = 'zarr',
                       consolidated=True,
                       data_vars='minimal',
                       coords='minimal',
                       compat='override',
                       parallel=True)

# Rechunk the data
ds = ds.chunk({"time": 1, "level": 1, "latitude": 640, "longitude": 1280})

# Remove the old encoding info and set compression to none.
# pop(..., None) is used instead of `del` so that a variable which does not
# carry one of these encoding keys is skipped rather than raising KeyError.
for var in ds.variables.values():
    var.encoding['compressors'] = None
    var.encoding.pop('chunks', None)
    var.encoding.pop('preferred_chunks', None)

# Alternative: to have the default compression apply when written to Zarr,
# remove the 'compressors' key as well (var.encoding.pop('compressors', None))
# instead of setting it to None.


In [None]:
# Combine the individual variables into one array with a 'channel' dimension.
# Skip this cell to write the variables out as separate arrays instead.

full_variables = ['Q', 'T', 'U', 'V']
single_level_variables = ['Q500', 'T500', 'U500', 'V500', 'Z500', 't2m', 'SP']

# Dimension order shared by both halves of the combined array
channel_dims = ('time', 'channel', 'latitude', 'longitude')

# Single-level variables: concatenate along a new 'channel' index labelled
# with the variable names.
single_level_index = pd.Index(single_level_variables, name='channel')
ds1 = xr.concat([ds[name] for name in single_level_variables], single_level_index)
ds1 = ds1.transpose(*channel_dims)

# Multi-level variables: concatenate along a new dimension labelled by the
# variable names. A plain list is passed as `dim`; the resulting dimension is
# referred to as 'concat_dim' in the steps below.
c = xr.concat([ds[name] for name in full_variables], dim=full_variables)

# Flatten (variable, level) into a single 'channel' dimension, then drop the
# multi-index so the channel coordinate can be replaced with plain labels.
s = c.stack(channel=('concat_dim', 'level'))
s = s.transpose(*channel_dims)
s = s.reset_index('channel')

# Label each channel as <variable name><level>
s['channel'] = s['concat_dim'] + s['level'].astype('str')

ds2 = s.drop_vars(['level', 'concat_dim'])

# Join single-level and multi-level channels into one named array
combined = xr.concat([ds1, ds2], dim='channel').rename('combined')

# Display the encoding that the combined array carries
combined.encoding

In [None]:
# Write to Zarr v3 with consolidated metadata.
# `zarr_format` is used rather than `zarr_version`, which xarray keeps only
# as a deprecated alias.

combined.to_zarr('/glade/derecho/scratch/katelynw/era5/rechunked_stacked_uncompressed_test.zarr',
                 zarr_format=3,
                 consolidated=True)

In [None]:
# Shut down the cluster once the preceding write cell has run

client.shutdown()

In [None]:
# Re-open the store written by the to_zarr cell and inspect the encoding it ended up with

output_store = '/glade/derecho/scratch/katelynw/era5/rechunked_stacked_uncompressed_test.zarr'
ds_new = xr.open_dataset(output_store)

ds_new['combined'].encoding