# Merge ERA5 1-degree model-level and single-level variables

In [1]:
import os
import sys
import yaml
import dask
import zarr
import numpy as np
import xarray as xr
import pandas as pd
from glob import glob

import calendar
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

sys.path.insert(0, os.path.realpath('../libs/'))
import verif_utils as vu

In [2]:
# Root directories used to build every path in this notebook.
# base_dir_mlevel / base_dir_plevel are the prefixes of the input store paths;
# base_dir_output is the prefix of the save_name paths for the merged results.
# All three end with '/' because later cells concatenate onto them directly.
base_dir_mlevel = '/glade/derecho/scratch/ksha/CREDIT_data/ERA5_mlevel_1deg/'
base_dir_plevel = '/glade/derecho/scratch/ksha/CREDIT_data/ERA5_plevel_1deg/'
base_dir_output = '/glade/derecho/scratch/ksha/CREDIT_data/ERA5_mlevel_1deg_stage1/'

In [3]:
# Per-year Zarr store templates: the '{}' placeholder is filled with the year
# via str.format() in the main routine.
fn_fmt_mlevel = base_dir_mlevel + 'upper_air/ERA5_mlevel_1deg_6h_{}_conserve.zarr'
fn_fmt_plevel = base_dir_plevel + 'all_in_one/ERA5_plevel_1deg_6h_{}_conserve.zarr'
# Static store path: it has no '{}' placeholder and is opened as-is.
fn_fmt_static = base_dir_plevel + 'static/ERA5_plevel_1deg_6h_conserve_static.zarr'
# NetCDF file opened in the main routine; only its 'level' coordinate is read there.
# NOTE(review): the file name says 0.25deg while this notebook handles 1deg data — confirm
# that its 'level' values are the intended ones.
fn_mean_std = '/glade/derecho/scratch/ksha/CREDIT_data/mean_6h_1979_2018_16lev_0.25deg.nc'

In [4]:
# Level values requested for every model-level variable (16 entries).
mlevel_inds = [10, 30, 40, 50, 60, 70, 80, 90, 95, 100, 105, 110, 120, 130, 136, 137]

# NOTE(review): not referenced by the cells visible in this notebook; 'SP' and 'VAR_2T'
# are requested through var_plevel below, not var_mlevel — confirm whether this is still needed.
varnames_mlevel = [
    'specific_humidity',
    'temperature',
    'u_component_of_wind',
    'v_component_of_wind',
    'SP',
    'VAR_2T',
]

# Model-level selection: every variable maps to the same mlevel_inds list object.
var_mlevel = dict.fromkeys(
    ['specific_humidity', 'temperature', 'u_component_of_wind', 'v_component_of_wind'],
    mlevel_inds,
)

# Variables requested at level 500 only; the main routine squeezes their 'level' dim.
varnames_500 = ['U', 'V', 'T', 'Q', 'Z']

# Pressure-level / single-level selection passed to vu.ds_subset_everything.
# None presumably means "no level selection" — semantics live in the helper, TODO confirm.
var_plevel = {
    'SP': None,
    'VAR_2T': None,
    'toa_incident_solar_radiation': None,
    **{name: [500] for name in varnames_500},
}

# Final variable names after merging. The 500-level fields take the short names with a
# '500' suffix first, which frees 'U', 'V', 'T', 'Q' for the model-level fields.
var_rename = dict(
    U='U500',
    V='V500',
    T='T500',
    Q='Q500',
    Z='Z500',
    specific_humidity='Q',
    temperature='T',
    u_component_of_wind='U',
    v_component_of_wind='V',
    VAR_2T='t2m',
    toa_incident_solar_radiation='tsi',
)

# Dask chunk sizes per dimension for variables without / with a 'level' dimension.
chunk_size_3d = dict(time=10, latitude=181, longitude=360)
chunk_size_4d = dict(time=10, level=16, latitude=181, longitude=360)

# Zarr encoding chunk tuples, listed in explicit dimension order.
encode_size_3d = {
    'chunks': tuple(chunk_size_3d[dim] for dim in ('time', 'latitude', 'longitude'))
}
encode_size_4d = {
    'chunks': tuple(chunk_size_4d[dim] for dim in ('time', 'level', 'latitude', 'longitude'))
}

### Main routine

In [7]:
# Merge, for every year, the model-level upper-air fields with the selected
# pressure-level / single-level fields into one dataset, then prepare the chunking
# and Zarr encoding for writing it out.

# Only the 'level' coordinate of this file is used (assigned onto ds_merge below).
ds_mean_std = xr.open_dataset(fn_mean_std)

# Loop-invariant settings, built once rather than once per year.
varname_4D = ['Q', 'T', 'U', 'V']  # post-rename names of variables that keep a 'level' dim
compress = zarr.Blosc(cname='zstd', clevel=1, shuffle=zarr.Blosc.SHUFFLE, blocksize=0)

for year in range(1979, 2024):
    print(year)
    ds_mlevel = xr.open_zarr(fn_fmt_mlevel.format(year))
    ds_plevel = xr.open_zarr(fn_fmt_plevel.format(year))

    # vu.ds_subset_everything is a project helper; presumably it keeps only the listed
    # variables and the listed levels — TODO confirm against verif_utils.
    ds_mlevel_sub = vu.ds_subset_everything(ds_mlevel, var_mlevel)
    ds_plevel_sub = vu.ds_subset_everything(ds_plevel, var_plevel)

    # Drop the 'level' dimension of the 500-level fields, then remove the 'level'
    # variable itself from the pressure-level subset before merging with the
    # model-level subset.
    for var in varnames_500:
        ds_plevel_sub[var] = ds_plevel_sub[var].squeeze(dim="level")
    ds_plevel_sub = ds_plevel_sub.drop_vars(['level',])

    ds_merge = xr.merge([ds_mlevel_sub, ds_plevel_sub])
    ds_merge = ds_merge.rename(var_rename)

    # Replace the level coordinate with the one stored in the mean/std file.
    ds_merge['level'] = ds_mean_std['level']

    # One pass over the variables: apply the dask chunking and record the matching
    # Zarr encoding (4D settings for varname_4D, 3D settings for everything else).
    varnames = list(ds_merge.keys())
    dict_encoding = {}
    for var in varnames:
        if var in varname_4D:
            chunk_size, encode_size = chunk_size_4d, encode_size_4d
        else:
            chunk_size, encode_size = chunk_size_3d, encode_size_3d
        ds_merge[var] = ds_merge[var].chunk(chunk_size)
        dict_encoding[var] = {'compressor': compress, **encode_size}

    save_name = base_dir_output + 'all_in_one/ERA5_mlevel_1deg_6h_lev16_{}.zarr'.format(year)
    # The write is commented out as in the original; dict_encoding and save_name are
    # only consumed by this call.
    #ds_merge.to_zarr(save_name, mode='w', consolidated=True, compute=True, encoding=dict_encoding)

### Static file

In [13]:
# Static (time-independent) fields to extract; None presumably means "no level
# selection" for vu.ds_subset_everything — TODO confirm against verif_utils.
var_static = {
    'land_sea_mask': None,
    'geopotential_at_surface': None
}

# Output names for the static fields.
var_static_rename = {
    'land_sea_mask': 'LSM',
    'geopotential_at_surface': 'Z_GDS4_SFC'
}


# Chunk sizes for 2D (latitude, longitude) fields.
chunk_size_2d = {
    'latitude': 181,
    'longitude': 360
}

# Zarr encoding chunks for 2D fields. Built from chunk_size_2d (not chunk_size_3d)
# so this cell does not silently depend on the 3D settings defined in another cell.
encode_size_2d = dict(
    chunks=(
        chunk_size_2d['latitude'],
        chunk_size_2d['longitude']
    )
)


In [15]:
# Open the static store, subset it with var_static via the project helper, and
# rename the result to the output names.
ds_static = xr.open_zarr(fn_fmt_static)
ds_static = vu.ds_subset_everything(ds_static, var_static)
ds_static = ds_static.rename(var_static_rename)

varnames = list(ds_static.keys())

# Same compressor settings as the main routine; every static field uses the 2D chunks.
compress = zarr.Blosc(cname='zstd', clevel=1, shuffle=zarr.Blosc.SHUFFLE, blocksize=0)
dict_encoding = {var: {'compressor': compress, **encode_size_2d} for var in varnames}

save_name = base_dir_output + 'static/ERA5_mlevel_1deg_static.zarr'
# The write is commented out as in the original; dict_encoding and save_name are
# only consumed by this call.
#ds_static.to_zarr(save_name, mode='w', consolidated=True, compute=True, encoding=dict_encoding)

<xarray.backends.zarr.ZarrStore at 0x150f2d31fed0>

In [12]:
# Display the renamed static dataset for a visual check of its variables and chunking.
ds_static

Unnamed: 0,Array,Chunk
Bytes,254.53 kiB,254.53 kiB
Shape,"(181, 360)","(181, 360)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 254.53 kiB 254.53 kiB Shape (181, 360) (181, 360) Dask graph 1 chunks in 2 graph layers Data type float32 numpy.ndarray",360  181,

Unnamed: 0,Array,Chunk
Bytes,254.53 kiB,254.53 kiB
Shape,"(181, 360)","(181, 360)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,254.53 kiB,254.53 kiB
Shape,"(181, 360)","(181, 360)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 254.53 kiB 254.53 kiB Shape (181, 360) (181, 360) Dask graph 1 chunks in 2 graph layers Data type float32 numpy.ndarray",360  181,

Unnamed: 0,Array,Chunk
Bytes,254.53 kiB,254.53 kiB
Shape,"(181, 360)","(181, 360)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
