Skip to content

Commit

Permalink
Place ensemble member number determination for blending inside foreca…
Browse files Browse the repository at this point in the history
…st loop to prevent out of memory issues (#273)

Determine which member is blended with which NWP member per time step instead of at once to reduce memory usage and requirements.
  • Loading branch information
RubenImhoff committed Mar 18, 2022
1 parent b44c945 commit 8936236
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 102 deletions.
191 changes: 91 additions & 100 deletions pysteps/blending/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
method to decompose and store the NWP model fields whenever a new NWP model
field is present, is present in pysteps.blending.utils.decompose_NWP.
#. Estimate AR parameters for the extrapolation nowcast and noise cascade.
#. Before starting the forecast loop, determine which NWP models will be
combined with which nowcast ensemble members. The number of output ensemble
members equals the maximum number of (ensemble) members in the input, which
can be either the defined number of (nowcast) ensemble members or the number
of NWP models/members.
#. Initialize all the random generators.
#. Calculate the initial skill of the NWP model forecasts at t=0.
#. Start the forecasting loop:
#. Determine which NWP models will be combined with which nowcast ensemble
member. The number of output ensemble members equals the maximum number
of (ensemble) members in the input, which can be either the defined
number of (nowcast) ensemble members or the number of NWP models/members.
#. Determine the skill and weights of the forecasting components
(extrapolation, NWP and noise) for that lead time.
#. Regress the extrapolation and noise cascades separately to the subsequent
Expand Down Expand Up @@ -149,7 +148,9 @@ def forecast(
Datetime object containing the date and time for which the forecast
is issued.
n_ens_members: int
The number of ensemble members to generate.
The number of ensemble members to generate. This number should always be
equal to or larger than the number of NWP ensemble members / number of
NWP models.
n_cascade_levels: int, optional
The number of cascade levels to use. Default set to 8 due to default
climatological skill values on 8 levels.
Expand Down Expand Up @@ -561,34 +562,19 @@ def forecast(
precip_cascade, ar_order, n_cascade_levels, MASK_thr
)

# 5. Before calling the worker for the forecast loop, determine which (NWP)
# models will be combined with which nowcast ensemble members. With the
# way it is implemented at this moment: n_ens_members of the output equals
# the maximum number of (ensemble) members in the input (either the nowcasts or NWP).
(
precip_cascade,
precip_models_cascade,
precip_models_pm,
velocity_models,
mu_models,
sigma_models,
n_ens_members,
n_model_indices,
) = _find_nwp_combination(
precip_cascade,
precip_models_cascade,
precip_models_pm,
velocity_models,
mu_models,
sigma_models,
n_ens_members,
ar_order,
n_cascade_levels,
blend_nwp_members,
)
# 5. Repeat precip_cascade for n ensemble members
# First, discard all except the p-1 last cascades because they are not needed
# for the AR(p) model
precip_cascade = [precip_cascade[i][-ar_order:] for i in range(n_cascade_levels)]

precip_cascade = [
[precip_cascade[j].copy() for j in range(n_cascade_levels)]
for i in range(n_ens_members)
]
precip_cascade = np.stack(precip_cascade)

# Also initialize the cascade of temporally correlated noise, which has the
# same shape as R_c, but starts with value zero.
# same shape as precip_cascade, but starts with value zero.
noise_cascade = np.zeros(precip_cascade.shape)

# 6. Initialize all the random generators and prepare for the forecast loop
Expand Down Expand Up @@ -617,17 +603,7 @@ def forecast(

precip = precip[-1, :, :]

# 7. Calculate the initial skill of the (NWP) model forecasts at t=0
rho_nwp_models = _compute_initial_nwp_skill(
precip_cascade,
precip_models_cascade,
domain_mask,
issuetime,
outdir_path_skill,
clim_kwargs,
)

# Also initizalize the current and previous extrapolation forecast scale
# 7. initizalize the current and previous extrapolation forecast scale
# for the nowcasting component
rho_extr_prev = np.repeat(1.0, PHI.shape[0])
rho_extr = PHI[:, 0] / (1.0 - PHI[:, 1]) # phi1 / (1 - phi2), see BPS2004
Expand Down Expand Up @@ -681,9 +657,43 @@ def forecast(
if measure_time:
starttime = time.time()

# 8.1.1 Determine the skill of the components for lead time (t0 + t)
# First for the extrapolation component. Only calculate it when t > 0.
# 8.1.1 Before calling the worker for the forecast loop, determine which (NWP)
# models will be combined with which nowcast ensemble members. With the
# way it is implemented at this moment: n_ens_members of the output equals
# the maximum number of (ensemble) members in the input (either the nowcasts or NWP).
(
precip_models_cascade_temp,
precip_models_pm_temp,
velocity_models_temp,
mu_models_temp,
sigma_models_temp,
n_model_indices,
) = _find_nwp_combination(
precip_models_cascade[:, t, :, :, :],
precip_models_pm[:, t, :, :],
velocity_models[:, t, :, :, :],
mu_models[:, t, :],
sigma_models[:, t, :],
n_ens_members,
ar_order,
n_cascade_levels,
blend_nwp_members,
)

if t == 0:
# 8.1.2 Calculate the initial skill of the (NWP) model forecasts at t=0
rho_nwp_models = _compute_initial_nwp_skill(
precip_cascade,
precip_models_cascade_temp,
domain_mask,
issuetime,
outdir_path_skill,
clim_kwargs,
)

if t > 0:
# 8.1.3 Determine the skill of the components for lead time (t0 + t)
# First for the extrapolation component. Only calculate it when t > 0.
(
rho_extr,
rho_extr_prev,
Expand Down Expand Up @@ -740,18 +750,22 @@ def worker(j):
# Only the weights of the components without the extrapolation
# cascade will be determined here. The full set of weights are
# determined after the extrapolation step in this method.
if blend_nwp_members and precip_models_cascade.shape[0] > 1:
if blend_nwp_members and precip_models_cascade_temp.shape[0] > 1:
weights_model_only = np.zeros(
(precip_models_cascade.shape[0] + 1, n_cascade_levels)
(precip_models_cascade_temp.shape[0] + 1, n_cascade_levels)
)
for i in range(n_cascade_levels):
# Determine the normalized covariance matrix (containing)
# the cross-correlations between the models
cov = np.corrcoef(
np.stack(
[
precip_models_cascade[n_model, t, i, :, :].flatten()
for n_model in range(precip_models_cascade.shape[0])
precip_models_cascade_temp[
n_model, i, :, :
].flatten()
for n_model in range(
precip_models_cascade_temp.shape[0]
)
]
)
)
Expand Down Expand Up @@ -883,12 +897,12 @@ def worker(j):
V_stack = np.concatenate(
(
velocity_pert[None, :, :, :],
velocity_models[:, t, :, :, :],
velocity_models_temp,
),
axis=0,
)
else:
V_model_ = velocity_models[j, t, :, :, :]
V_model_ = velocity_models_temp[j]
V_stack = np.concatenate(
(velocity_pert[None, :, :, :], V_model_[None, :, :, :]),
axis=0,
Expand Down Expand Up @@ -983,11 +997,11 @@ def worker(j):
# Stack the perturbed extrapolation and the NWP velocities
if blend_nwp_members:
V_stack = np.concatenate(
(velocity_pert[None, :, :, :], velocity_models[:, t, :, :, :]),
(velocity_pert[None, :, :, :], velocity_models_temp),
axis=0,
)
else:
V_model_ = velocity_models[j, t, :, :, :]
V_model_ = velocity_models_temp[j]
V_stack = np.concatenate(
(velocity_pert[None, :, :, :], V_model_[None, :, :, :]), axis=0
)
Expand Down Expand Up @@ -1050,32 +1064,32 @@ def worker(j):
cascades_stacked = np.concatenate(
(
R_f_ep_out[None, t_index],
precip_models_cascade[:, t],
precip_models_cascade_temp,
Yn_ep_out[None, t_index],
),
axis=0,
) # [(extr_field, n_model_fields, noise), n_cascade_levels, ...]
means_stacked = np.concatenate(
(mu_extrapolation[None, :], mu_models[:, t]), axis=0
(mu_extrapolation[None, :], mu_models_temp), axis=0
)
sigmas_stacked = np.concatenate(
(sigma_extrapolation[None, :], sigma_models[:, t]),
(sigma_extrapolation[None, :], sigma_models_temp),
axis=0,
)
else:
cascades_stacked = np.concatenate(
(
R_f_ep_out[None, t_index],
precip_models_cascade[None, j, t],
precip_models_cascade_temp[None, j],
Yn_ep_out[None, t_index],
),
axis=0,
) # [(extr_field, n_model_fields, noise), n_cascade_levels, ...]
means_stacked = np.concatenate(
(mu_extrapolation[None, :], mu_models[None, j, t]), axis=0
(mu_extrapolation[None, :], mu_models_temp[None, j]), axis=0
)
sigmas_stacked = np.concatenate(
(sigma_extrapolation[None, :], sigma_models[None, j, t]),
(sigma_extrapolation[None, :], sigma_models_temp[None, j]),
axis=0,
)

Expand Down Expand Up @@ -1173,15 +1187,15 @@ def worker(j):
R_pm_stacked = np.concatenate(
(
R_pm_ep[None, t_index],
precip_models_pm[:, t],
precip_models_pm_temp,
),
axis=0,
)
else:
R_pm_stacked = np.concatenate(
(
R_pm_ep[None, t_index],
precip_models_pm[None, j, t],
precip_models_pm_temp[None, j],
),
axis=0,
)
Expand All @@ -1198,11 +1212,11 @@ def worker(j):
weights_pm_normalized_mod_only.reshape(
weights_pm_normalized_mod_only.shape[0], 1, 1
)
* precip_models_pm[:, t],
* precip_models_pm_temp,
axis=0,
)
else:
R_pm_blended_mod_only = precip_models_pm[j, t]
R_pm_blended_mod_only = precip_models_pm_temp[j]

# The extrapolation components are NaN outside the advected
# radar domain. This results in NaN values in the blended
Expand Down Expand Up @@ -1816,7 +1830,6 @@ def _estimate_ar_parameters_radar(R_c, ar_order, n_cascade_levels, MASK_thr):


def _find_nwp_combination(
R_c,
precip_models,
R_models_pm,
velocity_models,
Expand All @@ -1831,26 +1844,22 @@ def _find_nwp_combination(
With the way it is implemented at this moment: n_ens_members of the output equals
the maximum number of (ensemble) members in the input (either the nowcasts or NWP).
"""
###
# First, discard all except the p-1 last cascades because they are not needed
# for the AR(p) model
R_c = [R_c[i][-ar_order:] for i in range(n_cascade_levels)]
# Make sure the number of model members is not larger than than or equal to
# n_ens_members
n_model_members = precip_models.shape[0]
if n_model_members > n_ens_members:
raise ValueError(
"The number of NWP model members is larger than the given number of ensemble members. n_model_members <= n_ens_members."
)

# Check if NWP models/members should be used individually, or if all of
# them are blended together per nowcast ensemble member.
if blend_nwp_members:
# stack the extrapolation cascades into a list containing all ensemble members
R_c = [
[R_c[j].copy() for j in range(n_cascade_levels)]
for i in range(n_ens_members)
]
R_c = np.stack(R_c)
n_model_indices = None

else:
# Start with determining the maximum and mimimum number of members/models
# in both input products
n_model_members = precip_models.shape[0]
n_ens_members_max = max(n_ens_members, n_model_members)
n_ens_members_min = min(n_ens_members, n_model_members)
# Also make a list of the model index numbers. These indices are needed
Expand All @@ -1871,20 +1880,12 @@ def _find_nwp_combination(
# member 5, etc.), until 10 is reached.
if n_ens_members_min != n_ens_members_max:
if n_model_members == 1:
precip_models = np.repeat(
precip_models[:, :, :, :, :], n_ens_members_max, axis=0
)
mu_models = np.repeat(mu_models[:, :, :], n_ens_members_max, axis=0)
sigma_models = np.repeat(
sigma_models[:, :, :], n_ens_members_max, axis=0
)
velocity_models = np.repeat(
velocity_models[:, :, :, :], n_ens_members_max, axis=0
)
precip_models = np.repeat(precip_models, n_ens_members_max, axis=0)
mu_models = np.repeat(mu_models, n_ens_members_max, axis=0)
sigma_models = np.repeat(sigma_models, n_ens_members_max, axis=0)
velocity_models = np.repeat(velocity_models, n_ens_members_max, axis=0)
# For the prob. matching
R_models_pm = np.repeat(
R_models_pm[:, :, :, :], n_ens_members_max, axis=0
)
R_models_pm = np.repeat(R_models_pm, n_ens_members_max, axis=0)
# Finally, for the model indices
n_model_indices = np.repeat(n_model_indices, n_ens_members_max, axis=0)

Expand All @@ -1903,22 +1904,12 @@ def _find_nwp_combination(
# Finally, for the model indices
n_model_indices = np.repeat(n_model_indices, repeats, axis=0)

R_c = [
[R_c[j].copy() for j in range(n_cascade_levels)]
for i in range(n_ens_members_max)
]
R_c = np.stack(R_c)

n_ens_members = n_ens_members_max

return (
R_c,
precip_models,
R_models_pm,
velocity_models,
mu_models,
sigma_models,
n_ens_members,
n_model_indices,
)

Expand Down Expand Up @@ -2015,7 +2006,7 @@ def _compute_initial_nwp_skill(
rho_nwp_models = [
blending.skill_scores.spatial_correlation(
obs=R_c[0, :, -1, :, :],
mod=precip_models[n_model, 0, :, :, :],
mod=precip_models[n_model, :, :, :],
domain_mask=domain_mask,
)
for n_model in range(precip_models.shape[0])
Expand All @@ -2024,7 +2015,7 @@ def _compute_initial_nwp_skill(

# Ensure that the model skill decreases with increasing scale level.
for n_model in range(precip_models.shape[0]):
for i in range(1, precip_models.shape[2]):
for i in range(1, precip_models.shape[1]):
if rho_nwp_models[n_model, i] > rho_nwp_models[n_model, i - 1]:
# Set it equal to the previous scale level
rho_nwp_models[n_model, i] = rho_nwp_models[n_model, i - 1]
Expand Down
4 changes: 2 additions & 2 deletions pysteps/tests/test_blending_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
(1, 3, 4, 6, "incremental", "cdf", False, "bps", False, 4),
(1, 3, 4, 9, "incremental", "cdf", False, "spn", True, 4),
(2, 3, 10, 8, "incremental", "cdf", False, "spn", True, 10),
(5, 3, 4, 8, "incremental", "cdf", False, "spn", True, 5),
(5, 3, 5, 8, "incremental", "cdf", False, "spn", True, 5),
(1, 10, 1, 8, "incremental", "cdf", False, "spn", True, 1),
(5, 3, 2, 8, "incremental", "cdf", True, "spn", True, 2),
(2, 3, 2, 8, "incremental", "cdf", True, "spn", True, 2),
]


Expand Down

0 comments on commit 8936236

Please sign in to comment.