In [1]:
import os

# Prepend the folder containing cdo to PATH
os.environ["PATH"] = "/sw/spack-levante/cdo-2.2.2-4z4icb/bin:" + os.environ["PATH"]

from cdo import Cdo
cdo = Cdo()
print(cdo.version())


2.2.2


In [2]:
import xarray as xr
from joblib import Parallel, delayed
import multiprocessing
import pandas as pd
import shutil

# Define paths
data_path_ml = "/pool/data/ERA5/E5/ml/an/1H/"
data_path_pl = "/pool/data/ERA5/E5/pl/an/1H/"
data_path_sf = "/pool/data/ERA5/E5/sf/an/1H/"

scratch_path = "/scratch/u/u301827/mse/"
final_path   = "/work/uc1275/u301827/02_MSE/PNW/"

# Ensure output directories exist
os.makedirs(scratch_path, exist_ok=True)
os.makedirs(final_path, exist_ok=True)

# Define region bounds (North Atlantic)
lon_min, lon_max = -125, -115
lat_min, lat_max = 49, 59

def process_era5_daily(year, month, day, var_num, var, levels="ml", level="137"):
    """
    Process ERA5 daily data for a given variable and date.
    
    Parameters
    ----------
    year, month, day : int
        Date of the data.
    var_num : str
        Variable number (e.g. '133' for specific humidity).
    var : str
        Variable name (e.g. 'q' or 't').
    levels : {'ml', 'pl'}
        'ml' for model levels or 'pl' for pressure levels.
    level : str
        Level number for model levels (e.g. '137') or pressure level (e.g. '500').
    """

    cdo = Cdo()
    date_str = f"{year}-{month:02d}-{day:02d}"

    # Choose correct path
    if levels == "ml":
        data_path = data_path_ml
    elif levels == "pl":
        data_path = data_path_pl
    else:
        data_path = data_path_sf

    var_file = f"{data_path}{var_num}/E5{levels}00_1H_{date_str}_{var_num}.grb"
    if not os.path.exists(var_file):
        print(f"Skipped {var_file}: file does not exist")
        return

    # Create scratch folder
    var_scratch_path = os.path.join(scratch_path, var)
    os.makedirs(var_scratch_path, exist_ok=True)

    var_final_path = os.path.join(final_path, var)
    os.makedirs(var_final_path, exist_ok=True)

    # Intermediate file paths
    reg_file    = os.path.join(var_scratch_path, f"reg_{var}_{date_str}.nc")
    cut_file    = os.path.join(var_scratch_path, f"{var}_{date_str}_cut.nc")
    sel_file    = os.path.join(var_final_path, f"sel_{var}_{date_str}_level.nc")
    regrid_file = os.path.join(var_scratch_path, f"{var}_{date_str}_0.5deg.nc")

    # Step 1: Convert to netCDF and regular grid
    cdo.setgridtype("regular", input=var_file, output=reg_file, options="-f nc --eccodes")

    # Step 3: Regrid to 0.5° × 0.5° grid (720x360 global grid)
    cdo.remapbil("r720x360", input=reg_file, output=regrid_file)

    # Step 2: Select region
    cdo.sellonlatbox(lon_min, lon_max, lat_min, lat_max, input=regrid_file, output=cut_file)

    # Step 3: Select level
    if levels == "ml":
        # Select lowest model level
        cdo.sellevel(level, input=cut_file, output=sel_file)
        os.remove(cut_file)  
        
    elif levels == "pl":
        # Select desired pressure level with xarray
        ds = xr.open_dataset(cut_file)
        level_val = int(level)  # ensure numeric)
        if "plev" in ds:
            ds_level = ds.sel(plev=level_val)
        else:
            raise KeyError("No valid vertical coordinate ('level' or 'plev') found.")

        ds_level["time"] = pd.to_datetime(ds_level["time"].values).round("H")
    
        ds_level.to_netcdf(sel_file)
        ds.close()
        os.remove(cut_file)  
        
    else:
        # Step 2: Select region
        shutil.move(cut_file, sel_file)
            
    os.remove(reg_file)
        
    return sel_file

In [3]:
era5_vars = {
    130: "t",   # Temperature
}

In [4]:
import pandas as pd
from joblib import Parallel, delayed
import multiprocessing

# === Variable mapping ===
era5_vars = {
    130: "t",   # Temperature, pl @ 500 hPa
    129: "z",   # Geopotential, pl @ 500 hPa
    133: "q",   # Specific humidity, ml @ level 137
    167: "2t",  # 2m temperature, surface (no levels)
}

# === Date range ===
start_date = "2021-06-21"
end_date   = "2021-07-05"
dates = pd.date_range(start_date, end_date, freq="D")

# === Wrapper to apply correct levels automatically ===
def run_process(date, var_num, var):
    if var_num in [130, 129]:      # pressure-level vars
        return process_era5_daily(date.year, date.month, date.day,
                                  var_num=str(var_num), var=var,
                                  levels="pl", level="50000")
    elif var_num == 133:           # model-level var
        return process_era5_daily(date.year, date.month, date.day,
                                  var_num=str(var_num), var=var,
                                  levels="ml", level="137")
    elif var_num == 167:           # surface var (no levels)
        return process_era5_daily(date.year, date.month, date.day,
                                  var_num=str(var_num), var=var,
                                  levels="sf")
    else:
        print(f"Variable {var_num} not configured.")
        return None

# === Parallel execution across variables and dates ===
n_jobs = 50

results = Parallel(n_jobs=n_jobs, verbose = 10)(
    delayed(run_process)(date, var_num, var)
    for var_num, var in era5_vars.items()
    for date in dates
)

print("Processing completed.")

[Parallel(n_jobs=50)]: Using backend LokyBackend with 50 concurrent workers.
[Parallel(n_jobs=50)]: Done   1 tasks      | elapsed:    5.4s
[Parallel(n_jobs=50)]: Done   3 out of  60 | elapsed:    5.5s remaining:  1.7min
[Parallel(n_jobs=50)]: Done  10 out of  60 | elapsed:    8.4s remaining:   42.1s
[Parallel(n_jobs=50)]: Done  17 out of  60 | elapsed:   31.3s remaining:  1.3min
[Parallel(n_jobs=50)]: Done  24 out of  60 | elapsed:   34.0s remaining:   51.0s
[Parallel(n_jobs=50)]: Done  31 out of  60 | elapsed:   35.9s remaining:   33.6s
[Parallel(n_jobs=50)]: Done  38 out of  60 | elapsed:   36.7s remaining:   21.2s
[Parallel(n_jobs=50)]: Done  45 out of  60 | elapsed:   39.2s remaining:   13.1s
[Parallel(n_jobs=50)]: Done  52 out of  60 | elapsed:  1.7min remaining:   15.7s


Processing completed.


[Parallel(n_jobs=50)]: Done  60 out of  60 | elapsed:  1.8min finished
