# Global mass conservation on pressure level ERA5

This notebook derives the global mass conservation scheme using ERA5.

In [3]:
import numpy as np
import xarray as xr

In [4]:
import matplotlib.pyplot as plt
%matplotlib inline

ERA5 pressure level data:

* `/glade/derecho/scratch/ksha/CREDIT_data/ERA5_plevel_base/upper_air/*.zarr`
* `/glade/derecho/scratch/ksha/CREDIT_data/ERA5_plevel_base/surf/*.zarr`
* `/glade/derecho/scratch/ksha/CREDIT_data/ERA5_plevel_base/accum/*.zarr`

In [5]:
base_dir = '/glade/derecho/scratch/ksha/CREDIT_data/ERA5_plevel_1deg/'
filename = base_dir + 'all_in_one/ERA5_plevel_1deg_6h_1993_bilinear.zarr'

ds_surf = xr.open_zarr(filename)
ds_accum = xr.open_zarr(filename)
ds_upper = xr.open_zarr(filename)
ds_static = xr.open_zarr(base_dir + 'static/ERA5_plevel_1deg_6h_static.zarr')

In [6]:
t0 = 100
t1 = 101

In [7]:
GRAVITY = 9.80665
R = 6371000  # m
RHO_WATER = 1000.0 # kg/m^3

In [8]:
x = ds_surf['longitude']
y = ds_surf['latitude']
lon, lat = np.meshgrid(x, y)
level_p = 100*np.array(ds_upper['level'])

In [9]:
level_p # Pa or kg/m/s2

array([   100.,    200.,    300.,    500.,    700.,   1000.,   2000.,
         3000.,   5000.,   7000.,  10000.,  12500.,  15000.,  17500.,
        20000.,  22500.,  25000.,  30000.,  35000.,  40000.,  45000.,
        50000.,  55000.,  60000.,  65000.,  70000.,  75000.,  77500.,
        80000.,  82500.,  85000.,  87500.,  90000.,  92500.,  95000.,
        97500., 100000.])

In [10]:
# level_diff = np.diff(level_p)
# level_diff_cumsum = np.concatenate(([0], np.cumsum(level_diff)))

In [11]:
q = np.array(ds_upper['Q'].isel(time=slice(t0, t1+1))) # kg/kg
u = np.array(ds_upper['U'].isel(time=slice(t0, t1+1))) # m/s
v = np.array(ds_upper['V'].isel(time=slice(t0, t1+1)))

In [12]:
precip = np.array(ds_accum['total_precipitation'].isel(time=slice(t0, t1+1)))
evapor = np.array(ds_accum['evaporation'].isel(time=slice(t0, t1+1)))

In [13]:
# def geometric_mean(data):
#     return np.exp(np.mean(np.log(data)))

# def weighted_mean(data, weights, axis, keepdims=False):
    
#     expanded_weights = np.broadcast_to(weights, data.shape)
#     weighted_sum = np.sum(data * expanded_weights, axis=axis, keepdims=keepdims)
#     weights_sum = np.sum(expanded_weights, axis=axis, keepdims=keepdims)
#     return weighted_sum / weights_sum

def weighted_sum(data, weights, axis, keepdims=False):
    '''
    Compute the weighted sum of a given quantity

    Args:
        data: the quantity to be sum-ed
        weights: weights that can be broadcasted to the shape of data
        axis: dims to compute the sum
        keepdims: keepdims

    Returns:
        weighted sum
    '''
    expanded_weights = np.broadcast_to(weights, data.shape)
    return np.sum(data * expanded_weights, axis=axis, keepdims=keepdims)

def pressure_integral(q, level_p, output_shape):
    '''
    Compute the pressure level integral of a given quantity using np.trapz

    Args:
        q: the quantity with dims of (level, lat, lon) or (time, level, lat, lon)
        level_p: the pressure level of q as [Pa] and with dims of (level,)
        output_shape: either (lat, lon) or (time, lat, lon)

    Returns:
        Pressure level integrals of q
    '''
    # (level, lat, lon) --> (lat, lon)
    if len(output_shape) == 2:
        Q = np.empty(output_shape)
        for ix in range(output_shape[0]):
            for iy in range(output_shape[1]):
                Q[ix, iy] = np.trapz(q[:, ix, iy], level_p)
                
    # (time, level, lat, lon) --> (time, lat, lon)
    elif len(output_shape) == 3:
        Q = np.empty(output_shape)
        for i_time in range(output_shape[0]):
            for ix in range(output_shape[1]):
                for iy in range(output_shape[2]):
                    Q[i_time, ix, iy] = np.trapz(q[i_time, :, ix, iy], level_p)
                    
    else:
        print('wrong output_shape')
        raise
        
    return Q

## Grid area computation

Reference:
* [ERA5 grid geometry](https://confluence.ecmwf.int/display/CKB/ERA5%3A+What+is+the+spatial+reference)

\begin{equation}
A = R^2 \cdot \Delta\left(\sin \phi \right) \cdot \Delta \lambda
\end{equation}

In [14]:
GRAVITY = 9.80665
RHO_WATER = 1000.0 # kg/m^3
RAD_EARTH = 6371000 # m
LH_WATER = 2.26e6  # J/kg
CP_DRY = 1005 # J/kg K
CP_VAPOR = 1846 # J/kg K

def grid_area(lat, lon):
    '''
    Compute grid cell areas using the exact formula for spherical quadrilaterals.

    Args:
        lat, lon: 2D arrays of latitude and longitude in degrees.

    Return:
        area: 2D array of grid cell areas in square meters.
    '''
    # Convert latitude and longitude to radians
    lat_rad = np.deg2rad(lat)
    lon_rad = np.deg2rad(lon)
    
    # Compute sine of latitude
    sin_lat_rad = np.sin(lat_rad)
    
    # Compute gradient of sine of latitude (d_phi)
    d_phi = np.gradient(sin_lat_rad, axis=0, edge_order=2)
    
    # Compute gradient of longitude (d_lambda)
    d_lambda = np.gradient(lon_rad, axis=1, edge_order=2)
    
    # Adjust d_lambda to be within -π and π
    d_lambda = (d_lambda + np.pi) % (2 * np.pi) - np.pi
    
    # Compute grid cell area
    area = np.abs(RAD_EARTH**2 * d_phi * d_lambda)
    
    return area

area = grid_area(lat, lon)
w_lat = area #/ np.sum(area)

## Negative humidity fixes

**Plan A**

Given $q_{min}=10^{-12}$ as the lowest possible specific humidity in an atmospheric column. Negative humidity at pressure layer $p_k$ will be corrected to $q_{min}$

\begin{equation}
q_{fill}\left(p_k\right) = q_{min}
\end{equation}

The humidity at layer $k-1$ will be adjusted to conserve the mass of water vapor

\begin{equation}
\Delta q\left(p_{k-1}\right) = \left(q_{min} - q_k\right)\frac{\Delta p_k}{\Delta p_{k-1}}
\end{equation}

\begin{equation}
q_{fill}\left(p_{k-1}\right) = q\left(p_{k-1}\right) - \Delta q\left(p_{k-1}\right)
\end{equation}

If $k=0$ or $q_{fill}\left(p_{k-1}\right)$ is negative, i.e., not enough mass above $p_k$ to be relocated, no adjustments will be made for $q\left(p_{k-1}\right)$.

**Plan B**

Fix the layer without any adjustments on other layers:

\begin{equation}
q_{fill}\left(p_k\right) = q_{min}
\end{equation}

The conservation of total dry air mass will fix the mass imbalance.

## Conservation of total dry air mass

Equation on a single air column (flux form equation with unit of kg/m^2/s):

\begin{equation}
\mathbf{\nabla} \cdot \frac{1}{g} \int_{p_0}^{p_1}{\left[\left(1-q\right)\mathbf{v}\right]}dp + \frac{1}{g}\frac{\partial}{\partial t}\int_{p_0}^{p_1}{\left(1-q\right)}dp = 0
\end{equation}

For global sum, the first term (the divergence of vertically integrated dry air mass flux) is zero. So the second term (the time tendency of vertically integrated dry air mass per area) is also zero. 

From the second term = 0, the global sum of dry air mass $\overline{m_d}$ stays unchanged (kg) :

\begin{equation}
m_d = \sum{\frac{1}{g}\int_{p_0}^{p_1}{\left(1-q\right)}dp}
\end{equation}

\begin{equation}
m_d\left(\mathrm{y_input}\right) - m_d\left(\mathrm{y_pred}\right) = 0
\end{equation}

For any residuals of this conservation, we apply multiplicative correction to specific humidty to close the budget:

\begin{equation}
q^*\left(\mathrm{y_pred}\right) = 1 - \left[1 - q\left(\mathrm{y_pred}\right)\right]*\frac{m_d\left(\mathrm{y_input}\right)}{m_d\left(\mathrm{y_pred}\right)}
\end{equation}

In [28]:
# # toy model 1 
# a = np.array([[0.1, 0.2, 0.3, 0.4, 0.5], [0.15, 0.25, 0.35, 0.45, 0.55]])
# w = np.array([150, 100])
# y_level = np.array([100, 200, 300, 700, 1000])

# amount = np.trapz(1-a, y_level)
# amount_weighted_sum = np.sum(amount*w)

# correct_sum = 151750

# ratio = correct_sum / amount_weighted_sum
# a_correct = 1 - (1 - a) * ratio
# amount_fix = np.trapz(1-a_correct, y_level)
# amount_sum_fix = np.sum(amount_fix*w)

# error = correct_sum - amount_sum_fix

In [40]:
# # toy model 2
# a = np.array([
#     [0.1, 0.2, 0.3, 0.4, 0.5],
#     [0.15, 0.25, 0.35, 0.45, 0.55]
# ])
# w = np.array([150, 100])
# y_level = np.array([100, 200, 300, 700, 1000])
# correct_sum = 151750

# # Amount from the first two columns (fixed)
# amount_fixed = np.trapz(1 - a[:, :2], y_level[:2], axis=-1)
# amount_fixed_weighted_sum = np.sum(amount_fixed * w)

# # Amount from the last three columns (to be adjusted)
# amount_adjust = np.trapz(1 - a[:, 2:], y_level[2:], axis=-1)
# amount_adjust_weighted_sum = np.sum(amount_adjust * w)

# # Compute the needed amount from the adjustable columns
# needed_amount_adjust = correct_sum - amount_fixed_weighted_sum

# # Compute the ratio
# ratio = needed_amount_adjust / amount_adjust_weighted_sum

# # Apply the correction
# a_correct = a.copy()
# a_correct[:, 2:] = 1 - (1 - a[:, 2:]) * ratio

# # Recalculate the corrected amounts
# amount_fix = np.trapz(1 - a_correct[:, 2:], y_level[2:], axis=-1)
# amount_sum_fix = amount_fixed_weighted_sum + np.sum(amount_fix * w)

# # Compute the error
# error = correct_sum - amount_sum_fix

# print("Corrected a:\n", a_correct)
# print("Error:", error)

In [24]:
q_correct = np.copy(q) # <-- corrected y_pred on q
output_shape = (2,)+lon.shape

correction_cycle_num = 1 # iterative to handle numrical precision

ind_fix = 25 # fix starts from this layer

for i in range(correction_cycle_num):

    ratio_dry_air = 1-q_correct

    ratio_dry_air_hold = ratio_dry_air[:, :ind_fix, ...]
    level_p_hold = level_p[:ind_fix]
    
    ratio_dry_air_fix = ratio_dry_air[:, (ind_fix-1):, ...]
    level_p_fix = level_p[(ind_fix-1):]
    
    mass_dry_per_area_hold = pressure_integral(ratio_dry_air_hold, level_p_hold, output_shape) / GRAVITY
    mass_dry_sum_hold = weighted_sum(mass_dry_per_area_hold, w_lat, axis=(1, 2), keepdims=False)

    mass_dry_per_area_fix = pressure_integral(ratio_dry_air_fix, level_p_fix, output_shape) / GRAVITY
    mass_dry_sum_fix = weighted_sum(mass_dry_per_area_fix, w_lat, axis=(1, 2), keepdims=False)
    
    # ----------------------------------------------------------------------- #
    # check residual term
    mass_dry_res = (mass_dry_sum_hold + mass_dry_sum_fix)[0] - (mass_dry_sum_hold + mass_dry_sum_fix)[1]
    print('Residual to conserve the dry air mass [kg]: {}'.format(mass_dry_res))
    print('Ratio to the total amount of air [kg/kg]: {}'.format(mass_dry_res/5.148e18))
    # ----------------------------------------------------------------------- #
    
    # correction
    print('Correction iter {}'.format(i))
    # get correction ratio
    q_correct_ratio = ((mass_dry_sum_hold + mass_dry_sum_fix)[0] - mass_dry_sum_hold[1]) / mass_dry_sum_fix[1]
    q_correct[1, ind_fix:, ...] = 1 - (1 - q_correct[1, ind_fix:, ...]) * q_correct_ratio

# final checks
mass_dry_per_area = pressure_integral(1-q_correct, level_p, output_shape) / GRAVITY
mass_dry_sum = weighted_sum(mass_dry_per_area, w_lat, axis=(1, 2), keepdims=False)

# ----------------------------------------------------------------------- #
# check residual term
mass_dry_res = mass_dry_sum[0] - mass_dry_sum[1]
print('Residual to conserve the dry air mass [kg]: {}'.format(mass_dry_res))
print('Ratio to the total amount of air [kg/kg]: {}'.format(mass_dry_res/5.148e18))

Residual to conserve the dry air mass [kg]: -3595688023040.0
Ratio to the total amount of air [kg/kg]: -6.984630969386169e-07
Correction iter 0
Residual to conserve the dry air mass [kg]: -275043307520.0
Ratio to the total amount of air [kg/kg]: -5.342721591297591e-08


In [21]:
q_correct = np.copy(q) # <-- corrected y_pred on q
output_shape = (2,)+lon.shape

correction_cycle_num = 1 # iterative to handle numrical precision

for i in range(correction_cycle_num):
    
    mass_dry_per_area = pressure_integral(1-q_correct, level_p, output_shape) / GRAVITY # kg/m^2
    mass_dry_sum = weighted_sum(mass_dry_per_area, w_lat, axis=(1, 2), keepdims=False) # kg
    
    # ----------------------------------------------------------------------- #
    # check residual term
    mass_dry_res = mass_dry_sum[0] - mass_dry_sum[1]
    print('Residual to conserve the dry air mass [kg]: {}'.format(mass_dry_res))
    print('Ratio to the total amount of air [kg/kg]: {}'.format(mass_dry_res/5.148e18))
    # ----------------------------------------------------------------------- #
    # correction
    print('Correction iter {}'.format(i))
    # get correction ratio
    q_correct_ratio = mass_dry_sum[0] / mass_dry_sum[1] # no p level weighting
    q_correct[1, ...] = 1 - (1 - q_correct[1, ...]) * q_correct_ratio

# final checks
mass_dry_per_area = pressure_integral(1-q_correct, level_p, output_shape) / GRAVITY
mass_dry_sum = weighted_sum(mass_dry_per_area, w_lat, axis=(1, 2), keepdims=False)

# ----------------------------------------------------------------------- #
# check residual term
mass_dry_res = mass_dry_sum[0] - mass_dry_sum[1]
print('Residual to conserve the dry air mass [kg]: {}'.format(mass_dry_res))
print('Ratio to the total amount of air [kg/kg]: {}'.format(mass_dry_res/5.148e18))

Residual to conserve the dry air mass [kg]: -3595688023040.0
Ratio to the total amount of air [kg/kg]: -6.984630969386169e-07
Correction iter 0
Residual to conserve the dry air mass [kg]: 120504224768.0
Ratio to the total amount of air [kg/kg]: 2.340796906915307e-08


## Moisture budget

Equation on a single air column (flux form equation with unit of kg/m^2/s):

\begin{equation}
\mathbf{\nabla} \cdot \frac{1}{g} \int_{0}^{p_s}{\left(\mathbf{v}q\right)}dp = -\frac{1}{g}\frac{\partial}{\partial t}\int_{0}^{p_s}{q}dp - E - P
\end{equation}

For global sum, the first term (the divergence of integrated moisture flux) is zero. So the second term (the time tendency of total column water $Q$) is balanced by evaporation $E$ and precipitation $P$ (kg/s):

\begin{equation}
\overline{\left(\frac{\partial Q}{\partial t}\right)} =  - \overline{E} - \overline{P}
\end{equation}

For any residuals of this conservation, we use precipitation to close the budge:

\begin{equation}
\overline{P}^* = \overline{\left[\frac{Q\left(\mathrm{y_pred}\right) - Q\left(\mathrm{y_input}\right)}{\mathrm{second}}\right]} - \overline{E}
\end{equation}

\begin{equation}
P^* = P * \frac{\overline{P}^*}{\overline{P}}
\end{equation}

In [14]:
# q_correct = np.copy(q)

In [15]:
N_seconds = 3600 * 6 # 6 hourly data
output_shape = (2,)+lon.shape

precip_flux = precip[1, ...] * RHO_WATER / N_seconds # m/hour --> kg/m^2/s, positive
evapor_flux = evapor[1, ...] * RHO_WATER / N_seconds # kg/m^2/s, negative

precip_correct = np.copy(precip_flux) # <-- corrected y_pred on precip

correction_cycle_num = 1

# pre-compute TWC
TWC = pressure_integral(q_correct, level_p, output_shape) / GRAVITY # kg/m^2
dTWC_dt = (TWC[1, ...] - TWC[0, ...]) / N_seconds # kg/m^2/s
TWC_sum = weighted_sum(dTWC_dt, w_lat, axis=(0, 1), keepdims=False) # kg/s

# pre-compute evaporation
E_sum = weighted_sum(evapor_flux, w_lat, axis=(0, 1), keepdims=False) # kg/s

for i in range(correction_cycle_num):
    
    P_sum = weighted_sum(precip_correct, w_lat, axis=(0, 1), keepdims=False) # kg/s
    residual = -TWC_sum - E_sum - P_sum # kg/s
    print('Residual to conserve moisture budge [kg/s]: {}'.format(residual))
    P_correct = P_sum + residual # kg/s
    P_correct_ratio = (P_sum + residual) / P_sum
    print('correction ratio: {}'.format(P_correct_ratio))
    precip_correct = precip_correct * P_correct_ratio

# final checks
P_sum = weighted_sum(precip_correct, w_lat, axis=(0, 1), keepdims=False)
residual = -TWC_sum - E_sum - P_sum
print('Residual to conserve moisture budge [kg/s]: {}'.format(residual))

Residual to conserve moisture budge [kg/s]: -256320051.6473465
correction ratio: 0.9829969112402189
Residual to conserve moisture budge [kg/s]: -442.4025478363037
