# Pre-process ERA5 pressure level data for CREDIT

This notebook provides methods for gathering ERA5 pressure-level data from NCAR/RDA and ARCO-ERA5; part of the data is regridded onto the C404 WRF domain. The RDA data require internal access to NCAR's glade file system.

## Data preparation

* **Pressure-level analysis (RDA)**
    * geopotential, u_component_of_wind, v_component_of_wind, temperature, specific_humidity
* **Single-level analysis (RDA)**
    * surface_pressure, mean_sea_level_pressure
    * sea_surface_temperature, skin_temperature, 2m_temperature
    * 10m_u_component_of_wind, 10m_v_component_of_wind, total_cloud_cover
* **Single-level forecasts (ARCO)**
    * total_precipitation, evaporation
    * top_net_solar_radiation, top_net_thermal_radiation
    * surface_net_solar_radiation, surface_net_thermal_radiation, surface_latent_heat_flux, surface_sensible_heat_flux

**References**

* NCAR/RDA
    * [ERA5 Reanalysis (0.25 Degree Latitude-Longitude Grid)](https://rda.ucar.edu/datasets/d633000/)
    * glade storage: `/glade/campaign/collections/rda/data/d633000/`
* ARCO-ERA5
    * [Google Cloud storage](https://console.cloud.google.com/storage/browser/gcp-public-data-arco-era5)
    * [Project page at GitHub](https://github.com/google-research/arco-era5)
    * Complete hourly file: `gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3`



In [1]:
import os
import sys
import yaml
import dask
import zarr
import numpy as np
import xesmf as xe
import xarray as xr
import pandas as pd
from glob import glob
from dask.utils import SerializableLock

import calendar
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

sys.path.insert(0, os.path.realpath('../libs/'))
import verif_utils as vu

In [2]:
# import multiprocessing
# from dask.distributed import Client
# from dask_jobqueue import PBSCluster

In [3]:
import matplotlib.pyplot as plt
%matplotlib inline

In [4]:
# Data-source / output configuration (paths, variable names, chunk sizes).
config_name = os.path.realpath('data_config.yml')
with open(config_name) as config_file:
    conf = yaml.safe_load(config_file)

### C404 domain static

In [5]:
# C404 (CONUS) WRF domain: its 2-D XLAT/XLONG grid is the regridding target.
static_WRF_name = '/glade/derecho/scratch/ksha/DWC_data/CONUS_domain_GP/static/C404_GP_static.zarr'
ds_WRF_static = xr.open_zarr(static_WRF_name)
XLAT = ds_WRF_static['XLAT'].values
XLONG = ds_WRF_static['XLONG'].values
# expose the 2-D grid as 'lat'/'lon' coordinates for xesmf.Regridder
ds_WRF_static = ds_WRF_static.assign_coords(
    lat=(("south_north", "west_east"), XLAT),
    lon=(("south_north", "west_east"), XLONG),
)

# Index coordinates attached to the regridded output. The same array is used for both
# horizontal dims downstream, so the domain must be square; take its size from the data
# rather than hard-coding it.
n_south_north = ds_WRF_static.sizes['south_north']
n_west_east = ds_WRF_static.sizes['west_east']
if n_south_north != n_west_east:
    raise ValueError(f'Expected a square domain, got {n_south_north} x {n_west_east}')
domain_inds = np.arange(n_south_north, dtype=np.float32)

# indices into the 37-level ERA5 axis for
# 1000.,  950.,  850.,  700.,  600.,  500.,  400.,  300.,  200.,  100., 50. hPa
ind_pick = [36, 34, 30, 25, 23, 21, 19, 17, 14, 10, 8]

In [6]:
# Year to process and how many of its days (currently only part of the year).
year, N_days = 2025, 180
# full year: N_days = 366 if calendar.isleap(year) else 365

In [10]:
# increase the file cache size (open_mfdataset is called once per day)
xr.set_options(file_cache_maxsize=500)
# lock for safe parallel access
netcdf_lock = SerializableLock()

# the first N_days days of `year` (N_days may cover only part of the year)
start_time = datetime(year, 1, 1, 0, 0)
dt_list = [start_time + timedelta(days=i) for i in range(N_days)]

# upper-air var names: the config values are matched against the RDA filenames
varnames = list(conf['RDA']['varname_upper_air'].values())

ds_list = []

for i_day, dt in enumerate(dt_list):
    # upper air
    # ============================================================================================ #
    # file source info
    base_dir = dt.strftime(conf['RDA']['source']['anpl_format'])
    dt_pattern = dt.strftime(conf['RDA']['source']['anpl_dt_pattern_format'])

    # get upper-air vars: exactly one file per variable. Indexing glob(...)[0] directly would
    # raise a bare IndexError for a missing file (so a length check afterwards could never
    # fire); collect the misses and report them instead. Matches are sorted so the choice is
    # deterministic if a pattern ever matches more than one file.
    filename_collection = []
    missing_vars = []
    for var in varnames:
        matches = sorted(glob(base_dir + f'*{var}*{dt_pattern}*'))
        if matches:
            filename_collection.append(matches[0])
        else:
            missing_vars.append(var)

    if missing_vars:
        raise ValueError(
            f'Year {year}, day {i_day} ({dt:%Y-%m-%d}) has incomplete files; missing: {missing_vars}')

    # Open with a lock to avoid race conditions when accessing files
    ds = xr.open_mfdataset(filename_collection, combine='by_coords', parallel=True, lock=netcdf_lock)

    # drop useless var and keep only the pressure levels listed in ind_pick
    ds = ds.drop_vars('utc_date', errors='ignore')
    ds = ds.isel(level=ind_pick)

    # ======================================================== #
    # Interpolation block
    # wrap longitudes into [-180, 180) (presumably the convention of the WRF XLONG grid)
    ds['longitude'] = (ds['longitude'] + 180) % 360 - 180
    ds = ds.rename({'longitude': 'lon', 'latitude': 'lat'})

    # bilinear weights are built on the first day and reused for all later days
    if i_day == 0:
        regridder = xe.Regridder(ds, ds_WRF_static, method='bilinear')

    ds_ERA5_interp = regridder(ds)

    # attach index coordinates to the horizontal dims of the target grid
    ds_ERA5_interp = ds_ERA5_interp.assign_coords(
        south_north=domain_inds,
        west_east=domain_inds
    )

    ds_ERA5_interp = ds_ERA5_interp.drop_vars(['lon', 'lat'])

    ds_list.append(ds_ERA5_interp)

# concatenate the days along time
ds_yearly = xr.concat(ds_list, dim='time')

# save to zarr
base_dir = conf['RDA']['save_loc'] + 'upper_air/'
os.makedirs(base_dir, exist_ok=True)

save_name = base_dir + conf['RDA']['prefix'] + '_upper_air_{}.zarr'.format(year)

# ds_yearly.to_zarr(save_name, mode='w', consolidated=True, compute=True)

In [11]:
# Inspect the concatenated upper-air dataset (dask-backed; nothing is computed until it is saved).
ds_yearly

Unnamed: 0,Array,Chunk
Bytes,19.99 GiB,113.70 MiB
Shape,"(4320, 11, 336, 336)","(24, 11, 336, 336)"
Dask graph,180 chunks in 1263 graph layers,180 chunks in 1263 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 19.99 GiB 113.70 MiB Shape (4320, 11, 336, 336) (24, 11, 336, 336) Dask graph 180 chunks in 1263 graph layers Data type float32 numpy.ndarray",4320  1  336  336  11,

Unnamed: 0,Array,Chunk
Bytes,19.99 GiB,113.70 MiB
Shape,"(4320, 11, 336, 336)","(24, 11, 336, 336)"
Dask graph,180 chunks in 1263 graph layers,180 chunks in 1263 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,19.99 GiB,113.70 MiB
Shape,"(4320, 11, 336, 336)","(24, 11, 336, 336)"
Dask graph,180 chunks in 1263 graph layers,180 chunks in 1263 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 19.99 GiB 113.70 MiB Shape (4320, 11, 336, 336) (24, 11, 336, 336) Dask graph 180 chunks in 1263 graph layers Data type float32 numpy.ndarray",4320  1  336  336  11,

Unnamed: 0,Array,Chunk
Bytes,19.99 GiB,113.70 MiB
Shape,"(4320, 11, 336, 336)","(24, 11, 336, 336)"
Dask graph,180 chunks in 1263 graph layers,180 chunks in 1263 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,19.99 GiB,113.70 MiB
Shape,"(4320, 11, 336, 336)","(24, 11, 336, 336)"
Dask graph,180 chunks in 1263 graph layers,180 chunks in 1263 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 19.99 GiB 113.70 MiB Shape (4320, 11, 336, 336) (24, 11, 336, 336) Dask graph 180 chunks in 1263 graph layers Data type float32 numpy.ndarray",4320  1  336  336  11,

Unnamed: 0,Array,Chunk
Bytes,19.99 GiB,113.70 MiB
Shape,"(4320, 11, 336, 336)","(24, 11, 336, 336)"
Dask graph,180 chunks in 1263 graph layers,180 chunks in 1263 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,19.99 GiB,113.70 MiB
Shape,"(4320, 11, 336, 336)","(24, 11, 336, 336)"
Dask graph,180 chunks in 1263 graph layers,180 chunks in 1263 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 19.99 GiB 113.70 MiB Shape (4320, 11, 336, 336) (24, 11, 336, 336) Dask graph 180 chunks in 1263 graph layers Data type float32 numpy.ndarray",4320  1  336  336  11,

Unnamed: 0,Array,Chunk
Bytes,19.99 GiB,113.70 MiB
Shape,"(4320, 11, 336, 336)","(24, 11, 336, 336)"
Dask graph,180 chunks in 1263 graph layers,180 chunks in 1263 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,19.99 GiB,113.70 MiB
Shape,"(4320, 11, 336, 336)","(24, 11, 336, 336)"
Dask graph,180 chunks in 1263 graph layers,180 chunks in 1263 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 19.99 GiB 113.70 MiB Shape (4320, 11, 336, 336) (24, 11, 336, 336) Dask graph 180 chunks in 1263 graph layers Data type float32 numpy.ndarray",4320  1  336  336  11,

Unnamed: 0,Array,Chunk
Bytes,19.99 GiB,113.70 MiB
Shape,"(4320, 11, 336, 336)","(24, 11, 336, 336)"
Dask graph,180 chunks in 1263 graph layers,180 chunks in 1263 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


### Single-level variables

In [9]:
# Year and number of months to process for the single-level fields.
year, N_months = 2000, 12

config_name = os.path.realpath('data_config.yml')
with open(config_name) as config_file:
    conf = yaml.safe_load(config_file)

In [10]:
# Allow more simultaneously open netCDF files for open_mfdataset.
xr.set_options(file_cache_maxsize=500)
# one shared lock so parallel netCDF/HDF5 reads do not race
netcdf_lock = SerializableLock()

# first day of each of the N_months months of the year
start_time = datetime(year, 1, 1)
dt_list = [start_time + relativedelta(months=n) for n in range(N_months)]

In [11]:
compress = zarr.Blosc(cname='zstd', clevel=1, shuffle=zarr.Blosc.SHUFFLE, blocksize=0)

# zarr chunk shape (time, latitude, longitude) from the RDA section of the config
rda_chunks = conf['RDA']['chunk_size_3d']
chunk_size_3d = dict(chunks=(rda_chunks['time'], rda_chunks['latitude'], rda_chunks['longitude']))

# the same compressor and chunking for every single-level variable (keys of varname_single)
dict_encoding = {var: {'compressor': compress, **chunk_size_3d} for var in conf['RDA']['varname_single']}

In [13]:
# single-level var names: the config values are matched against the RDA filenames
varnames = list(conf['RDA']['varname_single'].values())

ds_list = []

# NOTE(review): only the first two months are processed (test subset), while save_name below
# is labelled with the whole year. Iterate over dt_list instead of dt_list[:2] before enabling
# the to_zarr call, otherwise a partial year is written under a full-year name.
for i_mon, dt in enumerate(dt_list[:2]):
    # file source info
    base_dir = dt.strftime(conf['RDA']['source']['ansfc_format'])

    # the filename pattern is filled with the first and last day of dt's month
    first_day = datetime(dt.year, dt.month, 1)
    last_day = datetime(dt.year, dt.month, calendar.monthrange(dt.year, dt.month)[1])

    dt_pattern = dt.strftime(conf['RDA']['source']['ansfc_dt_pattern_format'])
    dt_pattern = dt_pattern.format(first_day.day, last_day.day)

    # get single-level vars: exactly one file per variable. Indexing glob(...)[0] directly
    # would raise a bare IndexError for a missing file (so a length check afterwards could
    # never fire); collect the misses and report them instead. Matches are sorted so the
    # choice is deterministic if a pattern ever matches more than one file.
    filename_collection = []
    missing_vars = []
    for var in varnames:
        matches = sorted(glob(base_dir + f'*{var}*{dt_pattern}*'))
        if matches:
            filename_collection.append(matches[0])
        else:
            missing_vars.append(var)

    if missing_vars:
        raise ValueError(
            f'Year {dt.year}, month {dt.month} has incomplete files; missing: {missing_vars}')

    # Open with a lock to avoid race conditions when accessing files
    ds = xr.open_mfdataset(filename_collection, combine='by_coords', parallel=True, lock=netcdf_lock)

    # drop useless var and rechunk with the sizes from the config
    ds = ds.drop_vars('utc_date', errors='ignore')
    ds = ds.chunk(conf['RDA']['chunk_size_3d'])

    # ======================================================== #
    # Interpolation block
    # wrap longitudes into [-180, 180)
    ds['longitude'] = (ds['longitude'] + 180) % 360 - 180
    ds = ds.rename({'longitude': 'lon', 'latitude': 'lat'})

    # bilinear weights are built for the first month and reused afterwards
    if i_mon == 0:
        regridder = xe.Regridder(ds, ds_WRF_static, method='bilinear')

    ds_ERA5_interp = regridder(ds)

    # attach index coordinates to the horizontal dims of the target grid
    ds_ERA5_interp = ds_ERA5_interp.assign_coords(
        south_north=domain_inds,
        west_east=domain_inds
    )

    ds_ERA5_interp = ds_ERA5_interp.drop_vars(['lon', 'lat'])

    ds_list.append(ds_ERA5_interp)

# concatenate the months along time
ds_yearly = xr.concat(ds_list, dim='time')

# save to zarr
base_dir = conf['RDA']['save_loc'] + 'surf/'
os.makedirs(base_dir, exist_ok=True)

save_name = base_dir + conf['RDA']['prefix'] + '_surf_{}.zarr'.format(year)

# ds_yearly.to_zarr(save_name, mode='w', consolidated=True, compute=True, encoding=dict_encoding)

In [15]:
# destination path of the single-level zarr file
save_name

'/glade/campaign/ral/hap/ksha/ERA5_data/surf/ERA5_plevel_1h_surf_2000.zarr'

In [14]:
# Inspect the regridded single-level dataset (dask-backed; not yet computed).
ds_yearly

Unnamed: 0,Array,Chunk
Bytes,620.16 MiB,1.72 MiB
Shape,"(1440, 336, 336)","(4, 336, 336)"
Dask graph,360 chunks in 21 graph layers,360 chunks in 21 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 620.16 MiB 1.72 MiB Shape (1440, 336, 336) (4, 336, 336) Dask graph 360 chunks in 21 graph layers Data type float32 numpy.ndarray",336  336  1440,

Unnamed: 0,Array,Chunk
Bytes,620.16 MiB,1.72 MiB
Shape,"(1440, 336, 336)","(4, 336, 336)"
Dask graph,360 chunks in 21 graph layers,360 chunks in 21 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,620.16 MiB,1.72 MiB
Shape,"(1440, 336, 336)","(4, 336, 336)"
Dask graph,360 chunks in 21 graph layers,360 chunks in 21 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 620.16 MiB 1.72 MiB Shape (1440, 336, 336) (4, 336, 336) Dask graph 360 chunks in 21 graph layers Data type float32 numpy.ndarray",336  336  1440,

Unnamed: 0,Array,Chunk
Bytes,620.16 MiB,1.72 MiB
Shape,"(1440, 336, 336)","(4, 336, 336)"
Dask graph,360 chunks in 21 graph layers,360 chunks in 21 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,620.16 MiB,1.72 MiB
Shape,"(1440, 336, 336)","(4, 336, 336)"
Dask graph,360 chunks in 21 graph layers,360 chunks in 21 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 620.16 MiB 1.72 MiB Shape (1440, 336, 336) (4, 336, 336) Dask graph 360 chunks in 21 graph layers Data type float32 numpy.ndarray",336  336  1440,

Unnamed: 0,Array,Chunk
Bytes,620.16 MiB,1.72 MiB
Shape,"(1440, 336, 336)","(4, 336, 336)"
Dask graph,360 chunks in 21 graph layers,360 chunks in 21 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,620.16 MiB,1.72 MiB
Shape,"(1440, 336, 336)","(4, 336, 336)"
Dask graph,360 chunks in 21 graph layers,360 chunks in 21 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 620.16 MiB 1.72 MiB Shape (1440, 336, 336) (4, 336, 336) Dask graph 360 chunks in 21 graph layers Data type float32 numpy.ndarray",336  336  1440,

Unnamed: 0,Array,Chunk
Bytes,620.16 MiB,1.72 MiB
Shape,"(1440, 336, 336)","(4, 336, 336)"
Dask graph,360 chunks in 21 graph layers,360 chunks in 21 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,620.16 MiB,1.72 MiB
Shape,"(1440, 336, 336)","(4, 336, 336)"
Dask graph,360 chunks in 21 graph layers,360 chunks in 21 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 620.16 MiB 1.72 MiB Shape (1440, 336, 336) (4, 336, 336) Dask graph 360 chunks in 21 graph layers Data type float32 numpy.ndarray",336  336  1440,

Unnamed: 0,Array,Chunk
Bytes,620.16 MiB,1.72 MiB
Shape,"(1440, 336, 336)","(4, 336, 336)"
Dask graph,360 chunks in 21 graph layers,360 chunks in 21 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


### Check time coordinates

In [16]:
def is_regular_time_axis(time, gap):
    """Return True if ``time`` is strictly increasing with a constant step of ``gap``.

    Parameters
    ----------
    time : array-like of numpy.datetime64
        Time coordinate values, e.g. ``ds['time'].values``.
    gap : numpy.timedelta64
        Expected spacing between consecutive entries (e.g. ``np.timedelta64(1, 'h')``).

    Returns
    -------
    bool
        True when every consecutive difference is positive and equal to ``gap``.
        An axis with fewer than two entries is trivially regular.
    """
    time_diff = np.diff(np.asarray(time))
    is_monotonic = bool(np.all(time_diff > np.timedelta64(0, 'ns')))
    is_gap = bool(np.all(time_diff == gap))
    return is_monotonic and is_gap


# The RDA surf files written above appear to be hourly (prefix *_1h*); pass
# np.timedelta64(6, 'h') instead when checking 6-hourly products.
# for year in range(1979, 2024):
#     base_dir = conf['RDA']['save_loc'] + 'surf/'
#     save_name = base_dir + conf['RDA']['prefix'] + '_surf_{}.zarr'.format(year)
#     ds_year = xr.open_zarr(save_name)
#     ok = is_regular_time_axis(ds_year['time'].values, np.timedelta64(1, 'h'))
#     print(year, "time coordinate monotonic with a regular gap?", ok)

## Single-level accumulated variables

**RDA**

* Example file listing: `/glade/campaign/collections/rda/data/d633000/e5.oper.fc.sfc.accumu/197901`

**ARCO**

* Hourly data from `gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3`
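* Accumulate hourly to 6-hourly: shift by one hour (`shift(time=-1)`), sum with `xarray.resample(time='6h').sum()`, then add 6 h to each label so it marks the end of its 6-hour window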

In [6]:
year = 1979
# calendar.isleap also handles century years (1900, 2100), which `year % 4 == 0` gets wrong
N_days = 366 if calendar.isleap(year) else 365

# Data-source / output configuration for the accumulated variables.
config_name = os.path.realpath('data_config.yml')
with open(config_name) as config_file:
    conf = yaml.safe_load(config_file)

In [7]:
# output directory for the 6-hourly accumulated variables
base_dir = conf['ARCO']['save_loc'] + 'accum/'
os.makedirs(base_dir, exist_ok=True)

compress = zarr.Blosc(cname='zstd', clevel=1, shuffle=zarr.Blosc.SHUFFLE, blocksize=0)

# zarr chunk shape (time, latitude, longitude) from the ARCO section of the config
chunk_size_3d = dict(chunks=(conf['ARCO']['chunk_size_3d']['time'],
                             conf['ARCO']['chunk_size_3d']['latitude'],
                             conf['ARCO']['chunk_size_3d']['longitude']))

# the same compressor and chunking for every accumulated variable
dict_encoding = {var: {'compressor': compress, **chunk_size_3d} for var in conf['ARCO']['varname_accum']}

In [9]:
# Build 6-hourly accumulations from the hourly ARCO-ERA5 store.
ERA5_1h = xr.open_zarr(
    "gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3",
    chunks=None,
    storage_options=dict(token='anon'),)

# Start in the previous year: the first saved window (labelled {year}-01-01T00) sums hours
# from the end of the previous day, so at least the last 6 h of {year-1} are needed.
time_start = '{}-12-31T00'.format(year-1)
time_start_save = '{}-01-01T00'.format(year)
# NOTE(review): short test range (3 days); switch to the commented line for a full year.
time_end = '{}-01-03T23'.format(year)
#time_end = '{}-12-31T23'.format(year)
ERA5_1h_yearly = ERA5_1h.sel(time=slice(time_start, time_end))

# {variable name: levels}; None presumably means "no level selection" (see verif_utils)
variables_levels = {}
for varname in conf['ARCO']['varname_accum']:
    variables_levels[varname] = None

ERA5_1h_save = vu.ds_subset_everything(ERA5_1h_yearly, variables_levels)

# Each hourly value presumably holds the accumulation over the hour ending at its timestamp
# (ERA5 convention - confirm). shift(time=-1) moves the value ending at t+1h onto label t,
# so the left-closed 6-h bin [T, T+6h) sums the hours ending at T+1h ... T+6h. Adding 6 h to
# the left-edge bin labels then stamps each sum with the END of its window.
# The last hour becomes NaN after the shift (skipped by sum); its bin ends after time_end and
# is removed by the slice below.
ERA5_1h_shifted = ERA5_1h_save.shift(time=-1)
ERA5_6h = ERA5_1h_shifted.resample(time='6h').sum()
ERA5_6h['time'] = ERA5_6h['time'] + pd.Timedelta(hours=6)

# drop the windows that belong to the previous year
ERA5_6h_save = ERA5_6h.sel(time=slice(time_start_save, time_end))

ERA5_6h_save = ERA5_6h_save.chunk(conf['ARCO']['chunk_size_3d'])
save_name = base_dir + conf['ARCO']['prefix'] + '_accum_{}.zarr'.format(year)
# ERA5_6h_save.to_zarr(save_name, mode="w", consolidated=True, compute=True, encoding=dict_encoding)

In [10]:
# hourly ARCO data selected for the accumulation (starts on the last day of the previous year)
ERA5_1h_yearly

**Compare the accumulated hourly data with the previously available, pre-computed 6-hourly data**

In [15]:
# Pre-computed 6-hourly ARCO-ERA5 product, used only as a reference for the check below.
ERA5_6h = xr.open_zarr(
    "gs://gcp-public-data-arco-era5/ar/1959-2022-6h-1440x721.zarr",
    chunks=None,
    storage_options={'token': 'anon'},
)

tp_6h_ref = ERA5_6h['total_precipitation_6hr'].sel(time=slice(time_start_save, time_end))

In [18]:
# Compare the hourly-accumulated precipitation with the pre-computed 6-hourly product for the
# first three 6-h windows. Absolute differences are summed so that positive and negative
# errors cannot cancel to a misleading 0 (same metric as the RDA check further down), and
# both samples are required to refer to the same valid time.
for ind_check in range(3):
    tp_new = ERA5_6h_save['total_precipitation'].isel(time=ind_check)
    tp_ref = tp_6h_ref.isel(time=ind_check)
    if tp_new['time'].values != tp_ref['time'].values:
        raise ValueError(f"time mismatch: {tp_new['time'].values} vs {tp_ref['time'].values}")
    diff = np.sum(np.abs(np.array(tp_new) - np.array(tp_ref)))
    print(diff)

0.0
0.0
0.0


### Static variables

**ARCO**

* Hourly data from `gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3`

In [11]:
# 6-hourly configuration (output location/prefix for the static fields).
config_name = os.path.realpath('data_config_6h.yml')
with open(config_name) as config_file:
    conf = yaml.safe_load(config_file)

In [12]:
# output directory for the static fields
base_dir = conf['ARCO']['save_loc'] + 'static/'
os.makedirs(base_dir, exist_ok=True)

compress = zarr.Blosc(cname='zstd', clevel=1, shuffle=zarr.Blosc.SHUFFLE, blocksize=0)

# only the compressor is set for the static variables; chunking is left to the defaults
dict_encoding = {var: {'compressor': compress} for var in conf['ARCO']['varname_static']}

In [13]:
# The static fields are taken from a single hourly snapshot.
ERA5_1h = xr.open_zarr(
    "gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3",
    chunks=None,
    storage_options={'token': 'anon'},
)
ERA5_1h_slice = ERA5_1h.sel(time='2000-01-01T00')

# {variable name: None}; None presumably means "no level selection" (see verif_utils)
variables_levels = dict.fromkeys(conf['ARCO']['varname_static'])
ERA5_static = vu.ds_subset_everything(ERA5_1h_slice, variables_levels)

In [20]:
# Normalise surface geopotential to zero mean / unit std. The statistics are kept in
# mean_val / std_val so they can be added to the mean/std files later.
mean_val = float(ERA5_static['geopotential_at_surface'].mean(skipna=False))
std_val = float(ERA5_static['geopotential_at_surface'].std(skipna=False))
ERA5_static['z_norm'] = (ERA5_static['geopotential_at_surface'] - mean_val) / std_val

# Scale soil type by 7 (presumably the largest ERA5 soil-type code, giving values in [0, 1]).
# The 'normalized_by' attribute marks the variable as already scaled so that re-running this
# cell does not divide by 7 a second time.
SOIL_TYPE_MAX = 7
if 'normalized_by' not in ERA5_static['soil_type'].attrs:
    ERA5_static['soil_type'] = (ERA5_static['soil_type'] / SOIL_TYPE_MAX).assign_attrs(
        normalized_by=SOIL_TYPE_MAX)

# fix coordinate names/values: reuse latitude/longitude from an existing all-in-one file so
# the static file lines up exactly with it
ds_example = xr.open_zarr(
    '/glade/derecho/scratch/ksha/CREDIT_data/ERA5_plevel_base/all_in_one/ERA5_plevel_6h_1979.zarr')
ERA5_static['latitude'] = ds_example['latitude']
ERA5_static['longitude'] = ds_example['longitude']
# errors='ignore' keeps the cell re-runnable once the scalar time coordinate is gone
ERA5_static = ERA5_static.drop_vars('time', errors='ignore')

save_name = base_dir + conf['ARCO']['prefix'] + '_static.zarr'
# ERA5_static.to_zarr(save_name, mode="w", consolidated=True, compute=True, encoding=dict_encoding)

In [23]:
# destination path of the static zarr file
save_name

'/glade/derecho/scratch/ksha/CREDIT_data/ERA5_plevel_base/static/ERA5_plevel_6h_static.zarr'

**Add coslat variable**

In [19]:
# base_dir = conf['ARCO']['save_loc'] + 'static/' 
# save_name = base_dir + conf['ARCO']['prefix'] + '_static.zarr'
# ERA5_static = xr.open_zarr(save_name)

In [25]:
# ERA5_static['geopotential_at_surface']
# ERA5_static['z_norm']

**Add mean and std to their files**

In [27]:
# mean_path = conf['ARCO']['save_loc'] + 'mean_std/mean_6h_1979_2019_13lev_0.25deg.nc'
# std_path = conf['ARCO']['save_loc'] + 'mean_std/std_residual_6h_1979_2019_13lev_0.25deg.nc'

# ds_mean = xr.open_dataset(mean_path)
# ds_mean['z_norm'] = mean_val

# ds_std = xr.open_dataset(std_path)
# ds_std['z_norm'] = std_val

# ds_mean.to_netcdf(mean_path, mode="w")
# ds_std.to_netcdf(std_path, mode="w")

## Others

### Total precipitation from RDA


```python
# Check that ARCO total precipitation matches RDA convective + large-scale precipitation.
xr_ARCO = xr.open_zarr('/glade/derecho/scratch/ksha/CREDIT_data/ERA5_plevel_base/test_data/surf_test.zarr')
tp_ARCO = xr_ARCO['total_precipitation']

# RDA forecast accumulations: convective (128_143_cp) and large-scale (128_142_lsp) precipitation
base_dir = '/glade/campaign/collections/rda/data/d633000/e5.oper.fc.sfc.accumu/197901/'
xr_RDA_CP = xr.open_dataset(base_dir+'e5.oper.fc.sfc.accumu.128_143_cp.ll025sc.1979010106_1979011606.nc')
xr_RDA_LP = xr.open_dataset(base_dir+'e5.oper.fc.sfc.accumu.128_142_lsp.ll025sc.1979010106_1979011606.nc')

# give both the same variable name so they can be added directly
xr_RDA_CP = xr_RDA_CP.drop_vars('utc_date', errors='ignore')
xr_RDA_CP = xr_RDA_CP.rename({'CP': 'TP'})
xr_RDA_LP = xr_RDA_LP.drop_vars('utc_date', errors='ignore')
xr_RDA_LP = xr_RDA_LP.rename({'LSP': 'TP'})

# total precipitation = convective + large-scale
da = xr_RDA_CP + xr_RDA_LP

# valid time = forecast_initial_time + forecast_hour, for every (init, lead) pair;
# flatten() is row-major, i.e. initial time outer, lead hour inner
time_deltas = pd.to_timedelta(da["forecast_hour"].values, unit="h")
new_times = np.add.outer(da["forecast_initial_time"].values, time_deltas)
new_times = new_times.flatten()

# collapse (init, lead) into a single time axis in the same order as new_times
da_an = da.stack(time=("forecast_initial_time", "forecast_hour"))
da_an = da_an.drop_vars(['forecast_hour', 'forecast_initial_time', 'time'])
da_an = da_an.assign_coords(time=new_times)

for i_hour in range(10):
    # i + 7 because ini_time = 06Z and the forecast lead time starts at 01 h
    # (assumes the ARCO test file starts at 00Z on the first day - confirm)
    tp_ARCO_np = np.array(tp_ARCO.isel(time=i_hour+7))
    da_np = np.array(da_an['TP'].isel(time=i_hour))
    print(np.sum(np.abs(tp_ARCO_np - da_np)))

# ARCO vs. RDA: first initialisation only; lead index 3 is compared with ARCO time index 3 + 7
data_var = da['TP']
tp_RDA = data_var.isel(forecast_initial_time=0)
tp_RDA_np = np.array(tp_RDA)
tp_ARCO_np = np.array(tp_ARCO.isel(time=slice(7, 7+12)))
np.sum(np.abs(tp_ARCO_np[3, ...] - tp_RDA_np[3, ...]))
```

### Combine Q components

In [4]:
# 6-hourly configuration (zarr options and save locations).
config_name = os.path.realpath('data_config_6h.yml')
with open(config_name) as config_file:
    conf = yaml.safe_load(config_file)

In [12]:
# Combine specific humidity with the cloud-water species for one year.
year = 1989

# zarr chunk shape (time, level, latitude, longitude) from the config
chunk_size_4d = dict(chunks=(conf['zarr_opt']['chunk_size_4d']['time'],
                             conf['zarr_opt']['chunk_size_4d']['level'],
                             conf['zarr_opt']['chunk_size_4d']['latitude'],
                             conf['zarr_opt']['chunk_size_4d']['longitude']))

base_dir = conf['zarr_opt']['save_loc']
zarr_name_upper = sorted(glob(base_dir+'upper_air/ERA5_plevel_6h_upper_air_*.zarr'))
zarr_name_cloud = sorted(glob(base_dir+'cloud/*.zarr'))

# first path containing the year; [0] raises IndexError if no file matches
fn_upper = [fn for fn in zarr_name_upper if str(year) in fn][0]
fn_cloud = [fn for fn in zarr_name_cloud if str(year) in fn][0]

# keep only Q from the upper-air file (None presumably means "all levels" - see verif_utils)
variables_levels = {}
variables_levels['Q'] = None

ds_upper = xr.open_zarr(fn_upper).chunk(conf['zarr_opt']['chunk_size_4d'])
ds_upper = vu.ds_subset_everything(ds_upper, variables_levels)

ds_cloud = xr.open_zarr(fn_cloud).chunk(conf['zarr_opt']['chunk_size_4d'])

# Total water = Q + CLWC + CRWC + CSWC + CIWC (presumably cloud liquid, rain, snow and ice
# water contents). Keep the sum in one parenthesised expression when enabling it: a second
# line starting with '+' would be a separate statement and silently drop CSWC and CIWC.
# ds_upper['Q'] = (ds_upper['Q'] + ds_cloud['CLWC'] + ds_cloud['CRWC']
#                  + ds_cloud['CSWC'] + ds_cloud['CIWC'])

# ds_upper = ds_upper.rename({'Q': 'specific_total_water'})

# NOTE(review): this encoding targets 'specific_total_water', which only exists once the two
# commented statements above are enabled; to_zarr will presumably reject it otherwise.
dict_encoding = {}
compress = zarr.Blosc(cname='zstd', clevel=1, shuffle=zarr.Blosc.SHUFFLE, blocksize=0)
dict_encoding['specific_total_water'] = {'compressor': compress, **chunk_size_4d}

save_name = base_dir+'upper_air/ERA5_plevel_6h_Q_{}.zarr'.format(year)
# ds_upper.to_zarr(save_name, mode='w', consolidated=True, compute=True, encoding=dict_encoding)

In [10]:
# root directory of the 6-hourly products
base_dir

'/glade/derecho/scratch/ksha/CREDIT_data/ERA5_plevel_base/'

In [13]:
# Load one time step (index 999, an arbitrary sample) of humidity and the cloud-water
# species into memory for a quick magnitude comparison.
sample_time = 999
Q = np.array(ds_upper['Q'].isel(time=sample_time))
CIWC, CLWC, CRWC, CSWC = (np.array(ds_cloud[name].isel(time=sample_time))
                          for name in ('CIWC', 'CLWC', 'CRWC', 'CSWC'))

In [42]:
# Mean and max of Q, CLWC and CRWC on one level. i_level counts from the end of the level
# axis; which pressure that is depends on the file's level list (see `levels`), so check it
# before interpreting the numbers.
i_level = -5
for field in (Q, CLWC, CRWC):
    level_slice = field[i_level, ...]
    print(level_slice.mean(), level_slice.max())

0.005934408 0.019964522
2.2040711e-05 0.0011463165
2.2253564e-06 0.0015803277


In [35]:
# pressure levels stored in the upper-air file
levels = ds_upper.level.values

### Combine all in one

In [29]:
# year and root directory for the all-in-one merge
year, base_dir = 1979, conf['zarr_opt']['save_loc']

In [30]:
# Per-year zarr path templates; '{}' is filled with the year via str.format.
# surface, accumulated and forcing fields
zarr_name_surf        = base_dir + 'surf/ERA5_plevel_6h_surf_{}.zarr'
zarr_name_surf_extra  = base_dir + 'surf/ERA5_plevel_6h_surf_extend_{}.zarr'
zarr_name_accum       = base_dir + 'accum/ERA5_plevel_6h_accum_{}.zarr'
zarr_name_forcing     = base_dir + 'forcing/ERA5_plevel_6h_forcing_{}.zarr'
# upper-air fields: level subset and full level set
zarr_name_upper       = base_dir + 'upper_subset/ERA5_subset_6h_upper_air_{}.zarr'
zarr_name_upper_full  = base_dir + 'upper_air/ERA5_plevel_6h_upper_air_{}.zarr'
zarr_name_upper_Q     = base_dir + 'upper_air/ERA5_plevel_6h_Q_{}.zarr'
zarr_name_Q           = base_dir + 'upper_subset/ERA5_subset_6h_Q_{}.zarr'

In [31]:
# open every component for `year`
ds_surf = xr.open_zarr(zarr_name_surf.format(year))
ds_surf_extra = xr.open_zarr(zarr_name_surf_extra.format(year))
ds_accum = xr.open_zarr(zarr_name_accum.format(year))
ds_forcing = xr.open_zarr(zarr_name_forcing.format(year))
ds_upper = xr.open_zarr(zarr_name_upper.format(year))
ds_upper_full = xr.open_zarr(zarr_name_upper_full.format(year))
ds_upper_Q = xr.open_zarr(zarr_name_upper_Q.format(year))
ds_Q = xr.open_zarr(zarr_name_Q.format(year))

# 500 hPa slice of the full upper-air fields; index 21 corresponds to 500 hPa assuming the same
# 37-level ordering as ind_pick at the top of the notebook
LEVEL_500HPA = 21
ds_500hPa = xr.merge([ds.isel(level=LEVEL_500HPA) for ds in (ds_upper_full, ds_upper_Q)])

In [32]:
# suffix the 500 hPa copies so they do not collide with the 4-D variables of the same name
rename_500 = {name: name + '500' for name in ('T', 'U', 'V', 'Z', 'Q')}
rename_500['specific_total_water'] = 'specific_total_water_500'
ds_500hPa = ds_500hPa.rename(rename_500)

In [33]:
# Combine the static land-sea mask with sea-ice cover into a single field: where the mask is 0
# and sea ice is present the value is -CI, everywhere else it is the land-sea mask itself.
ds_static = xr.open_zarr(base_dir+'static/ERA5_plevel_6h_static.zarr')
land_sea_mask = ds_static['land_sea_mask']
sea_ice_cover = ds_surf_extra['CI']

land_sea_mask_expanded = land_sea_mask.broadcast_like(sea_ice_cover)
is_ice_over_water = (land_sea_mask_expanded == 0) & (sea_ice_cover > 0)
land_sea_CI_mask = xr.where(
    is_ice_over_water, -sea_ice_cover, land_sea_mask_expanded
).rename('land_sea_CI_mask')

ds_mask = land_sea_CI_mask.to_dataset()

In [34]:
# all components in one dataset; SSTK is not carried into the combined file
ds_merge = xr.merge(
    [ds_surf, ds_accum, ds_forcing, ds_upper, ds_Q, ds_mask, ds_500hPa]
).drop_vars('SSTK')

In [36]:
varnames = list(ds_merge.keys())
varname_4D = ['U', 'V', 'T', 'Z', 'Q', 'specific_total_water']

# rechunk each variable with the 4-D or 3-D chunk spec from the config
for var in varnames:
    chunk_key = 'chunk_size_4d' if var in varname_4D else 'chunk_size_3d'
    ds_merge[var] = ds_merge[var].chunk(conf['zarr_opt'][chunk_key])

In [37]:
# ========================================================================== #
# zarr encodings: one compressor, and a chunk tuple that matches each variable's rank
chunk_opts = conf['zarr_opt']
chunk_size_3d = dict(chunks=tuple(chunk_opts['chunk_size_3d'][dim]
                                  for dim in ('time', 'latitude', 'longitude')))
chunk_size_4d = dict(chunks=tuple(chunk_opts['chunk_size_4d'][dim]
                                  for dim in ('time', 'level', 'latitude', 'longitude')))

compress = zarr.Blosc(cname='zstd', clevel=1, shuffle=zarr.Blosc.SHUFFLE, blocksize=0)

dict_encoding = {
    var: {'compressor': compress, **(chunk_size_4d if var in varname_4D else chunk_size_3d)}
    for var in varnames
}

In [38]:
# destination of the combined per-year file
save_name = base_dir+'all_in_one/ERA5_plevel_6h_{}.zarr'.format(year)
# ds_merge.to_zarr(save_name, mode='w', consolidated=True, compute=True, encoding=dict_encoding)