# Job generator for sensitivity test precursor run


In [None]:
%load_ext dotenv
%dotenv
%load_ext autoreload
%autoreload 2

In [None]:
from pathlib import Path
from src.config import get_config, get_rng

config = get_config()

In [None]:
import xarray as xr
import pandas as pd
import numpy as np
from scipy.interpolate import CubicSpline
from scipy.ndimage import gaussian_filter

from src.job_generation import (
    Job,
    read_namelist,
    Driver,
    set_radiation_to_dynamic,
    set_surface_pressure_to_dynamic,
    get_nest_mask,
)

In [None]:
rng = get_rng(config)  # Ensure reproducibility

## Initial setup of data structures for inputs


In [None]:
job = Job("slurb_pre_default")
job.p3d = read_namelist(
    Path(config.path.experiments.sensitivity) / "precursor_default_p3d.yml"
)

Add restart configuration for the run, cutting the simulation in half just before the high-frequency data output begins.


In [None]:
job.p3dr = read_namelist(
    Path(config.path.experiments.sensitivity) / "precursor_default_p3d.yml"
)

In [None]:
static_driver = Driver()
job.register_driver("static", static_driver)

In [None]:
static_driver.set_grid("s", vertical=False)
static_driver.set_attrs(
    Path(config.path.experiments.sensitivity) / "global_attributes.yml"
)
static_driver

For dynamic driver, we only need the vertical grid for 1D initial profiles (not very dynamic _per se_). For radiation forcing, we'll use one minute temporal resolution for the time dimension, to which we'll interpolate radiation inputs with spline interpolation.


In [None]:
dynamic_driver = Driver()
job.register_driver("dynamic", dynamic_driver)
dynamic_driver.set_grid()
dynamic_driver.set_zsoil()
dynamic_driver.set_time(freq="1min", coord="time_rad")
dynamic_driver.set_time(freq="1h", coord="time")
dynamic_driver.set_attrs(
    Path(config.path.experiments.sensitivity) / "global_attributes.yml"
)

## Surface description for the static driver


### Surface fraction

We'll use all-vegetation surface, corresponding to surface fraction index 1.


In [None]:
static_driver.ds = static_driver.ds.assign_coords(
    nsurface_fraction=np.arange(3, dtype=np.int8)
)
static_driver.ds = static_driver.ds.assign_coords(
    nvegetation_pars=np.arange(12, dtype=np.int8)
)
static_driver.ds["surface_fraction"] = (
    ("nsurface_fraction", "y", "x"),
    np.full(
        (
            static_driver.ds.nsurface_fraction.size,
            static_driver.ds.y.size,
            static_driver.ds.x.size,
        ),
        0,
        dtype=np.int8,
    ),
)
static_driver.ds["vegetation_pars"] = (
    ("nvegetation_pars", "y", "x"),
    np.full(
        (
            static_driver.ds.nvegetation_pars.size,
            static_driver.ds.y.size,
            static_driver.ds.x.size,
        ),
        -9999.0,
        dtype=np.float32,
    ),
)
static_driver.ds["surface_fraction"][0, :, :] = 1
static_driver.ds["surface_fraction"].attrs["long_name"] = "surface fraction"
static_driver.ds["surface_fraction"].attrs["units"] = "1"

### Vegetation type

For vegetation type, we sample the pre-computed vegetation type fractions with a Gaussian field to have some spatial autocorrelation.


In [None]:
vegetation_fractions = pd.read_csv(
    Path(config.path.data.interim) / "land_cover" / "vegetation_fractions.csv",
    index_col=0,
)

In [None]:
npatches = 128
patch_centers = rng.integers(0, 256, size=(npatches, 2))
patch_values = rng.choice(
    vegetation_fractions.index, p=vegetation_fractions["Fraction"], size=(npatches,)
)

x = np.arange(256)
y = np.arange(256)
xx, yy = np.meshgrid(x, y)
vegetation_type = np.zeros((256, 256), dtype=int)
for i in range(256):
    for j in range(256):
        distances = np.sqrt(
            (xx[i, j] - patch_centers[:, 0]) ** 2
            + (yy[i, j] - patch_centers[:, 1]) ** 2
        )
        # Find the index of the nearest point
        nearest_point_id = np.argmin(distances)
        # Store the ID in the array
        vegetation_type[i, j] = patch_values[nearest_point_id]

In [None]:
static_driver.ds["vegetation_type"] = (
    ("y", "x"),
    vegetation_type,
)
static_driver.ds["vegetation_type"] = static_driver.ds["vegetation_type"].where(
    static_driver.ds.surface_fraction[0, :, :] == 1, other=-127
)
static_driver.ds["vegetation_type"].attrs["long_name"] = (
    "vegetation type classification"
)
static_driver.ds["vegetation_type"].attrs["units"] = "1"
static_driver.ds["vegetation_type"].plot.imshow(cmap="tab10")

In [None]:
static_driver.ds["water_type"] = (
    ("y", "x"),
    np.full((static_driver.ds.y.size, static_driver.ds.x.size), 1),
)
static_driver.ds["water_type"] = static_driver.ds["water_type"].where(
    static_driver.ds.surface_fraction[2, :, :] == 1, other=-127
)
static_driver.ds["water_type"].attrs["long_name"] = "water type classification"
static_driver.ds["water_type"].attrs["units"] = "1"


static_driver.ds["pavement_type"] = (
    ("y", "x"),
    np.full((static_driver.ds.y.size, static_driver.ds.x.size), -127),
)
static_driver.ds["pavement_type"] = static_driver.ds["pavement_type"].where(
    static_driver.ds.surface_fraction[1, :, :] == 1, other=-127
)
static_driver.ds["pavement_type"].attrs["long_name"] = "pavement type classification"
static_driver.ds["pavement_type"].attrs["units"] = "1"

### Soil type

For soil type, we simply want to use the most prevalent (modal value) soil type.


In [None]:
soil_fractions = pd.read_csv(
    Path(config.path.data.interim) / "land_cover" / "soil_fractions.csv",
    index_col=0,
)
soil_type = soil_fractions.iloc[soil_fractions["Count"].argmax()]

In [None]:
static_driver.ds["soil_type"] = (
    ("y", "x"),
    np.full(
        (static_driver.ds.y.size, static_driver.ds.x.size),
        np.int8(soil_type.name),
    ),
)
static_driver.ds["soil_type"] = static_driver.ds.soil_type.where(
    static_driver.ds.surface_fraction[0, :, :] == 1, other=np.int8(-127)
)
static_driver.ds["soil_type"].attrs["lod"] = 1
static_driver.ds["soil_type"].attrs["long_name"] = "soil type classification"
static_driver.ds["soil_type"].attrs["units"] = "1"

## Initial states for soil temperature and soil moisture

Initial states will be derived from a 10-year monthly average (March 2013-2022) by hour of day (00Z, PALM simulation initialization time). The area of interest is the same region over Central Europe as used when computing the surface fractions (bounding box lat=45,55;lon=0,20), the sub-region was extracted when data was downloaded from the CDS.


## Initial soil temperature and moisture


Reading ERA5 grib file with cfgrib engine needs a bit of a workaround, as data from separate levels cannot be read in at the same time.


In [None]:
soil_t_m = xr.Dataset()
era5_surf = xr.open_dataset(
    config.path.data.raw + "era5/era5_march_2013-2022_surf_diurnal.nc"
)
for var in ["stl1", "stl2", "stl3", "stl4", "swvl1", "swvl2", "swvl3", "swvl4"]:
    temp = xr.open_dataset(
        config.path.data.raw + "era5-land/era5-land_march_2013-2022_soil_00Z.grib",
        engine="cfgrib",
        filter_by_keys={"shortName": var},
    )
    temp = temp.rename({"depthBelowLandLayer": f"depthBelowLandLayer_{var}"})
    soil_t_m = xr.merge([soil_t_m, temp])
soil_t_m = xr.merge([soil_t_m, era5_surf["z"].mean(dim="time").interp_like(soil_t_m)])
soil_t_m = soil_t_m.where(soil_t_m["z"] / 9.80665 < 512.0)

Assing the soil layer into proper coordinates instead of separate values and then interpolate to desired 8-layer configuration. For soil moisture, we use the 0.25 quantile value instead of mean value to reflect the cloud-free case we are simulating.


In [None]:
soil_t_m = soil_t_m.assign_coords(zsoil=[0.035, 0.175, 0.64, 1.945])
soil_t_m["init_soil_t"] = xr.concat(
    [soil_t_m.stl1, soil_t_m.stl2, soil_t_m.stl3, soil_t_m.stl4], dim="zsoil1"
)
soil_t_m["init_soil_m"] = xr.concat(
    [soil_t_m.swvl1, soil_t_m.swvl2, soil_t_m.swvl3, soil_t_m.swvl4],
    dim="zsoil1",
)
soil_t_m = soil_t_m.interp(zsoil1=dynamic_driver.ds.zsoil)
soil_t_m["init_soil_m"] = soil_t_m["init_soil_m"].quantile(
    0.25, dim=("time", "latitude", "longitude")
)
soil_t_m = soil_t_m.mean(dim=("time", "latitude", "longitude"))
soil_t_m

In [None]:
dynamic_driver.ds["init_soil_t"] = (
    ("zsoil",),
    soil_t_m.init_soil_t.data,
)
dynamic_driver.ds["init_soil_m"] = (
    ("zsoil",),
    soil_t_m.init_soil_m.data,
)
dynamic_driver.ds["init_soil_t"].attrs["lod"] = 1
dynamic_driver.ds["init_soil_t"].attrs["units"] = "K"
dynamic_driver.ds["init_soil_t"].attrs["long_name"] = "initial soil temperature"
dynamic_driver.ds["init_soil_m"].attrs["lod"] = 1
dynamic_driver.ds["init_soil_m"].attrs["units"] = "m**3 m**-3"
dynamic_driver.ds["init_soil_m"].attrs["long_name"] = "initial soil volumetric moisture"

## Atmospheric initial state

The atmospheric initial state is derived similarly to the surface initial state. A 10-year monthly average (March 2013-2022) by hour of day (00Z, PALM simulation initialization time) is used. As the ERA5 data is from pressure levels, the vertical coordinate is first transformed to Cartesian. After this, the initial 1D profiles are interpolated from the data.


In [None]:
era5_atm = xr.open_dataset(
    config.path.data.raw + "era5/era5_march_2013-2022_pres_00Z.nc"
)
era5_surf = xr.open_dataset(
    config.path.data.raw + "era5/era5_march_2013-2022_surf_diurnal.nc"
)
g = 9.80665
era5_surf["z"] = era5_surf["z"].where(era5_surf["z"] > 0.0, other=0.0)
era5_atm["phi"] = era5_atm["z"].copy()  # true geopotential
era5_atm["z"] = (
    era5_atm.z - era5_surf["z"].isel(time=0)
) / g  # geopotential height from orography top
era5_atm["pt"] = era5_atm.t * (1000.0 / era5_atm.level) ** 0.286
# era5_atm = era5_atm.mean(dim=("longitude", "latitude", "time"))
era5_atm = era5_atm.reindex(level=list(reversed(era5_atm.level)))

Mask out terrain over 512 m.


In [None]:
era5_atm = era5_atm.where(era5_surf.z.isel(time=0) / 9.81 < 512.0)

First, compute geostrophic wind, as it is easier in the pressure coordinates.


In [None]:
omega = 7.2921e-5
a = 6371e3
era5_atm["latitude"] = np.deg2rad(era5_atm["latitude"])
era5_atm["longitude"] = np.deg2rad(era5_atm["longitude"])
era5_atm["f"] = 2.0 * omega * np.sin(era5_atm["latitude"])
era5_atm["u_g"] = (
    -1 / era5_atm["f"] * 1.0 / a * era5_atm["phi"].differentiate(coord="latitude")
)
era5_atm["v_g"] = (
    1
    / era5_atm["f"]
    * 1.0
    / (a * np.cos(era5_atm["latitude"]))
    * era5_atm["phi"].differentiate(coord="longitude")
)

In [None]:
era5_atm = era5_atm.assign_coords(height=dynamic_driver.ds.z.data)


# Slightly awkward loop
def interp_column(ds: xr.Dataset, source: str, target: str):
    ds[target] = (
        ("time", "longitude", "latitude", "height"),
        np.full(
            (ds.time.size, ds.longitude.size, ds.latitude.size, ds.height.size),
            np.nan,
        ),
    )
    for it, iy, ix in np.ndindex((ds.time.size, ds.longitude.size, ds.latitude.size)):
        var = ds[source].isel(time=it, longitude=iy, latitude=ix)
        z = ds["z"].isel(time=it, longitude=iy, latitude=ix)
        if any(np.isnan(z)) or any(np.isnan(var.data)):
            continue
        cs_var = CubicSpline(z, var.data)
        ds[target][it, iy, ix] = cs_var(ds.height)


interp_column(era5_atm, "u", "u_z")
interp_column(era5_atm, "v", "v_z")
interp_column(era5_atm, "u_g", "u_g_z")
interp_column(era5_atm, "v_g", "v_g_z")
interp_column(era5_atm, "pt", "pt_z")
interp_column(era5_atm, "q", "q_z")
era5_atm

In [None]:
era5_atm["v_z"].mean(dim=("time", "longitude", "latitude")).plot.line()

Set wind in free atmosphere (FA) wind to a constant value in the sponge layer ($z>1536~\mathrm{m}$). By this, any thermal wind in the FA is effectively ignored and the geostrophic wind speed is that of the reference height.


In [None]:
damping_height = job.p3d["initialization_parameters"]["rayleigh_damping_height"]
era5_atm["u_z"] = era5_atm["u_z"].where(
    era5_atm["height"] < damping_height,
    other=era5_atm["u_z"].sel(height=damping_height, method="nearest"),
)
era5_atm["v_z"] = era5_atm["v_z"].where(
    era5_atm["height"] < damping_height,
    other=era5_atm["v_z"].sel(height=damping_height, method="nearest"),
)
era5_atm["u_g_z"] = era5_atm["u_g_z"].where(
    era5_atm["height"] < damping_height,
    other=era5_atm["u_g_z"].sel(height=damping_height, method="nearest"),
)
era5_atm["v_g_z"] = era5_atm["u_g_z"].where(
    era5_atm["height"] < damping_height,
    other=era5_atm["u_g_z"].sel(height=damping_height, method="nearest"),
)

Instead of directly assigning the wind components, we want to rotate them such that the average volume flow is appoximately x-directional. Average horizontal wind velocity at height levels needs to be computed before spatiotemporal averaging. Thus, we first need to compute the mean volume flow direction for every column. After this, we compute the average wind spiral, i.e. the angle to volume flow (x-axis) by height. We later use this spiral to rotate the mean geostrophic wind speed along the column to obtain final wind vectors at height levels.


In [None]:
era5_atm["wspeed"] = np.sqrt(era5_atm.u_z**2 + era5_atm.v_z**2).mean(
    dim=("time", "longitude", "latitude")
)
era5_atm["wspeed_g"] = np.sqrt(era5_atm.u_g_z**2 + era5_atm.v_g_z**2).mean(
    dim=("time", "longitude", "latitude")
)

# volume flow direction
era5_atm["vflow_dir"] = np.arctan2(
    era5_atm.v_z.where(era5_atm["height"] < damping_height).mean(dim="height"),
    era5_atm.u_z.where(era5_atm["height"] < damping_height).mean(dim="height"),
)

era5_atm["u_rot"] = era5_atm.u_z * np.cos(-era5_atm.vflow_dir) - era5_atm.v_z * np.sin(
    -era5_atm.vflow_dir
)
era5_atm["v_rot"] = era5_atm.u_z * np.sin(-era5_atm.vflow_dir) + era5_atm.v_z * np.cos(
    -era5_atm.vflow_dir
)

# wind direction at height levels with respect to mean volume flow
era5_atm["vflow_dir_rot"] = np.arctan2(era5_atm.v_rot, era5_atm.u_rot).mean(
    dim=["time", "longitude", "latitude"]
)

Finally, rotate the mean wind speed along the spiral and store.


In [None]:
dynamic_driver.ds["ls_forcing_ug"] = (
    ("time", "z"),
    np.zeros((dynamic_driver.ds.time.size, dynamic_driver.ds.z.size), dtype=np.float32),
)
dynamic_driver.ds["ls_forcing_vg"] = (
    ("time", "z"),
    np.zeros((dynamic_driver.ds.time.size, dynamic_driver.ds.z.size), dtype=np.float32),
)
dynamic_driver.ds["ls_forcing_ug"][:] = (
    era5_atm.wspeed_g * np.cos(-era5_atm.vflow_dir.mean())
).data
dynamic_driver.ds["ls_forcing_vg"][:] = (
    era5_atm.wspeed_g * np.sin(-era5_atm.vflow_dir.mean())
).data

dynamic_driver.ds["init_atmosphere_u"] = dynamic_driver.ds["ls_forcing_ug"].isel(time=0)
dynamic_driver.ds["init_atmosphere_v"] = dynamic_driver.ds["ls_forcing_vg"].isel(time=0)
dynamic_driver.ds["init_atmosphere_pt"] = (
    ("z",),
    era5_atm.pt_z.mean(dim=("time", "longitude", "latitude")).data,
)
dynamic_driver.ds["init_atmosphere_qv"] = (
    ("z",),
    era5_atm.q_z.mean(dim=("time", "longitude", "latitude")).data,
)

In [None]:
# dynamic_driver.ds["init_atmosphere_u"] = dynamic_driver.ds.init_atmosphere_u[-1].data
# dynamic_driver.ds["init_atmosphere_v"] = dynamic_driver.ds.init_atmosphere_v[-1].data
# dynamic_driver.ds["init_atmosphere_u"] = dynamic_driver.ds["init_atmosphere_u"].where(
#   dynamic_driver.ds.z > 500.0,
#    dynamic_driver.ds["init_atmosphere_u"] * dynamic_driver.ds.z / 500.0,
# )
# dynamic_driver.ds["init_atmosphere_v"] = dynamic_driver.ds["init_atmosphere_v"].where(
#    dynamic_driver.ds.z > 500.0,
#    dynamic_driver.ds["init_atmosphere_v"] * dynamic_driver.ds.z / 500.0,
# )

In [None]:
print(
    f"Mean profile: {dynamic_driver.ds.init_atmosphere_u.mean():.3f} v={dynamic_driver.ds.init_atmosphere_v.mean():.3f}"
)
print(
    f"Model top BC: {dynamic_driver.ds.init_atmosphere_u[-1]:.3f} v={dynamic_driver.ds.init_atmosphere_v[-1]:.3f}"
)
print(
    f"Rotation angle at top: {np.rad2deg(np.arctan2(-dynamic_driver.ds.init_atmosphere_v[-1],dynamic_driver.ds.init_atmosphere_u[-1])):.1f} deg"
)

To expedite the boundary layer development, neutralise (mix) the initial temperature profile below 768 m.


In [None]:
# dynamic_driver.ds["init_atmosphere_pt"] = dynamic_driver.ds["init_atmosphere_pt"].where(
#    dynamic_driver.ds.z > 500.0,
#    dynamic_driver.ds["init_atmosphere_pt"].sel(z=500.0, method="nearest"),
# )
# dynamic_driver.ds["init_atmosphere_pt"] = dynamic_driver.ds["init_atmosphere_pt"].where(
#    dynamic_driver.ds.z > 1500.0,
#    dynamic_driver.ds["init_atmosphere_pt"].sel(z=1500.0, method="nearest"),
# )

In [None]:
# pt_bl_mean = dynamic_driver.ds["init_atmosphere_pt"].where(dynamic_driver.ds.z <= 512.0).mean()
# dynamic_driver.ds["init_atmosphere_pt"] = dynamic_driver.ds["init_atmosphere_pt"].where(
#    dynamic_driver.ds.z > 512.0,
#    pt_bl_mean,
# )

To limit BLH growth, decrease lapse rate to 4 K km^-1 above reference height.


In [None]:
h_ref = 768.0
dynamic_driver.ds["init_atmosphere_pt"] = dynamic_driver.ds["init_atmosphere_pt"].where(
    dynamic_driver.ds["z"] < h_ref,
    other=dynamic_driver.ds["init_atmosphere_pt"].sel(z=h_ref, method="nearest")
    + (-4 + 9.8) * (dynamic_driver.ds["z"] - h_ref) / 1000.0,
)

We'll set the initial humidity in the low levels to a value above the boundary layer to simulate relatively dry boundary layer.


In [None]:
# dynamic_driver.ds["init_atmosphere_qv"] = dynamic_driver.ds["init_atmosphere_qv"].where(
#    dynamic_driver.ds.z > 1500.0,
#    dynamic_driver.ds["init_atmosphere_qv"].sel(z=1500.0, method="nearest"),
# )

In [None]:
dynamic_driver.ds["init_atmosphere_qv"][:] = dynamic_driver.ds[
    "init_atmosphere_qv"
].isel(z=-1)

## Radiative forcing

For radiative forcing, i.e. the incoming shortwave and longwave radiation, we use the seme principle as before. However, we do not select a single hour of day, but rather construct a representative diurnal from 00Z to 00Z the next day. This is not as straightforward, ad the hour angle depends on longitude, hence the data cannot be directly averaged over the AOI. Instead, we first transform the temporal dimension into hour angle $H=2\pi\left(\frac{t_{UTC}}{86400~\mathrm{s}}\right) + \phi - \pi$, where $\phi$ is the longitude (in radians). As the hour angle as a function of solar time is simply $H=2\pi\left(\frac{t_{solar}}{86400~\mathrm{s}}\right)- \pi$, solving for $t_{solar}$ yields $t_{solar}=\frac{\pi t_{UTC} + 43200 \phi}{\pi}$. As the reference longitude in our simulation is 0, $t_{local}=t_{solar}$ for us.


In [None]:
dynamic_driver = set_radiation_to_dynamic(
    era5_surf,
    dynamic_driver,
    time_offset=3 * 3600,
)

### Surface pressure

For surface pressure, we use constant reference pressure 1000 hPa.


In [None]:
dynamic_driver = set_surface_pressure_to_dynamic(dynamic_driver, p0=1e5)

## Write job files to storage


In [None]:
job.p3d["runtime_parameters"]["end_time"] = job.p3d["runtime_parameters"][
    "skip_time_do2d_yz"
]
job.p3dr["initialization_parameters"]["initializing_actions"] = "read_restart_data"

In [None]:
dynamic_driver.ds["init_atmosphere_w"] = (
    ("zw",),
    np.full(dynamic_driver.ds.zw.size, 0.0),
)

In [None]:
for var in dynamic_driver.ds.variables:
    dynamic_driver.ds[var].attrs["lod"] = 1

In [None]:
job.register_driver("static", static_driver)
job.register_driver("dynamic", dynamic_driver)

In [None]:
job.write()