Functions to store and compute climatological weights (#231)
* Implement the functions get_default_weights, save_weights, calc_clim_weights. 

These functions are used to evolve the weights in the scale- and skill-dependent blending with NWP in the STEPS blending algorithm. The current weights, based on the correlations per cascade level, are regressed towards these climatological weights in the course of the forecast.

These functions save the current weights and compute the climatological weights (a running mean of the weights over the past n days, where typically n=30). Daily averages are stored first and are then averaged over the running window of n days.
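
A minimal sketch of this mechanism, with toy weight values and illustrative regression time scales (the actual implementation lives in pysteps.blending.clim and pysteps.blending.skill_scores):

    import numpy as np

    # Toy daily-average weights for a 10-day window, shaped (day, model, cascade_level);
    # in pysteps these would come from save_weights / the NWP_weights_window.npy file.
    rng = np.random.default_rng(0)
    past_weights = rng.uniform(0.1, 0.9, size=(10, 1, 8))

    # Climatological weights: geometric mean over the rolling window of daily averages,
    # as in calc_clim_weights.
    clim_weights = np.exp(np.log(past_weights).mean(axis=0))  # shape (1, 8)

    # Regress the current (t=0) weights towards the climatology with lead time,
    # in the spirit of eq. 24 in BPS2004; the time scales below are assumptions.
    current_weights = rng.uniform(0.2, 0.95, size=(1, 8))
    regr_pars = np.array([[130.0] * 8, [155.0] * 8])  # assumed per-level time scales
    for lt in (5, 60, 180):  # lead time in minutes
        qm = np.exp(-lt / regr_pars[0]) * (2 - np.exp(-lt / regr_pars[1]))
        rho = qm * current_weights + (1 - qm) * clim_weights
        print(lt, rho.round(3))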

* Add tests for pysteps climatological weight io and calculations.

* Add path_workdir to outputs section in pystepsrc file and use it as a default path to store/retrieve blending weights (see the usage sketch after this list).

* Minor changes to docstrings, changes to skill scores and testing scripts

* Completed documentation for blending clim module, cleanup.
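
For reference, a hedged usage sketch of the new module; the output directory and timestamps below are made up, and outdir_path would normally default to the new path_workdir entry in pystepsrc:

    import os
    from datetime import datetime, timedelta

    from pysteps.blending import clim

    outdir = "./tmp/"  # stands in for rcparams.outputs["path_workdir"]
    os.makedirs(outdir, exist_ok=True)

    # Default (BPS2004) weights for one NWP model and 8 cascade levels.
    weights = clim.get_default_weights(n_cascade_levels=8, n_models=1)

    # During a run, store the weights valid at each timestep; save_weights keeps a
    # daily average and a rolling window of the last 30 daily averages on disk.
    validtime = datetime(2021, 8, 18, 12, 0)
    for step in range(3):
        clim.save_weights(
            weights, validtime + timedelta(hours=step), outdir_path=outdir, window_length=30
        )

    # Retrieve the climatological (geometric-mean) weights; until a full window of
    # daily averages exists this falls back to the default weights.
    clim_weights = clim.calc_clim_weights(
        n_cascade_levels=8, outdir_path=outdir, n_models=1, window_length=30
    )
    print(clim_weights.shape)  # (1, 8)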

Co-authored-by: RubenImhoff <r.o.imhoff@live.nl>
ladc and RubenImhoff authored Aug 18, 2021
1 parent 2dfd531 commit c6368ab
Showing 6 changed files with 385 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -1,6 +1,6 @@
 repos:
   - repo: https://github.com/psf/black
-    rev: 21.6b0
+    rev: 21.7b0
     hooks:
       - id: black
         language_version: python3
188 changes: 188 additions & 0 deletions pysteps/blending/clim.py
@@ -0,0 +1,188 @@
"""
pysteps.blending.clim
=====================
Module with methods to read, write and compute past and climatological model weights.
.. autosummary::
:toctree: ../generated/
get_default_weights
save_weights
calc_clim_weights
"""

import numpy as np
from os.path import exists, join
import pickle
from pysteps import rcparams


def get_default_weights(n_cascade_levels=8, n_models=1):
"""
Get the default weights as given in BPS2004.
Take subset of n_cascade_levels or add entries with small values (1e-4) if
n_cascade_levels differs from 8.
Parameters
----------
n_cascade_levels: int, optional
Number of cascade levels. Defaults to 8.
n_models: int, optional
Number of NWP models. Defaults to 1.
Returns
-------
default_weights: array-like
Array of shape [model, scale_level] containing the climatological weights.
"""

default_weights = np.array(
[0.848, 0.537, 0.237, 0.065, 0.020, 0.0044, 0.0052, 0.0040]
)
n_weights = default_weights.shape[0]
if n_cascade_levels < n_weights:
default_weights = default_weights[0:n_cascade_levels]
elif n_cascade_levels > n_weights:
default_weights = np.append(
default_weights, np.repeat(1e-4, n_cascade_levels - n_weights)
)
return np.resize(default_weights, (n_models, n_cascade_levels))


def save_weights(
current_weights,
validtime,
outdir_path=rcparams.outputs["path_workdir"],
window_length=30,
):
"""
Add the current NWP weights to update today's daily average weight. If the
day is over, update the list of daily average weights covering a rolling
window.
Parameters
----------
current_weights: array-like
Array of shape [model, scale_level, ...]
containing the current weights of the different NWP models per cascade
level.
validtime: datetime
Datetime object containing the date and time for which the current
weights are valid.
outdir_path: string, optional
Path to folder where the historical weights are stored. Defaults to
path_workdir from rcparams.
window_length: int, optional
Length of window (in days) of daily weights that should be retained.
Defaults to 30.
Returns
-------
None
"""

n_cascade_levels = current_weights.shape[1]

# Load weights_today, a dictionary containing {mean_weights, n, last_validtime}
weights_today_file = join(outdir_path, "NWP_weights_today.pkl")
if exists(weights_today_file):
with open(weights_today_file, "rb") as f:
weights_today = pickle.load(f)
else:
weights_today = {
"mean_weights": np.copy(current_weights),
"n": 0,
"last_validtime": validtime,
}

# Load the past weights, an array with dimensions day x model x scale_level
past_weights_file = join(outdir_path, "NWP_weights_window.npy")
past_weights = None
if exists(past_weights_file):
past_weights = np.load(past_weights_file)
# First check if we have started a new day wrt the last written weights, in which
# case we should update the daily weights file and reset daily statistics.
if weights_today["last_validtime"].date() < validtime.date():
# Append weights to the list of the past X daily averages.
if past_weights is not None:
past_weights = np.append(
past_weights, [weights_today["mean_weights"]], axis=0
)
else:
past_weights = np.array([weights_today["mean_weights"]])
print(past_weights.shape)
# Remove oldest if the number of entries exceeds the window length.
if past_weights.shape[0] > window_length:
past_weights = np.delete(past_weights, 0, axis=0)
# FIXME also write out last_validtime.date() in this file?
# In that case it will need pickling or netcdf.
# Write out the past weights within the rolling window.
np.save(past_weights_file, past_weights)
# Reset statistics for today.
weights_today["n"] = 0
weights_today["mean_weights"] = 0

# Reset today's weights if needed and/or compute online average from the
# current weights using a numerically stable algorithm
weights_today["n"] += 1
weights_today["mean_weights"] += (
current_weights - weights_today["mean_weights"]
) / weights_today["n"]
weights_today["last_validtime"] = validtime
with open(weights_today_file, "wb") as f:
pickle.dump(weights_today, f)

return None


def calc_clim_weights(
n_cascade_levels=8,
outdir_path=rcparams.outputs["path_workdir"],
n_models=1,
window_length=30,
):
"""
Return the climatological weights based on the daily average weights in the
rolling window. This is done using a geometric mean.
Parameters
----------
n_cascade_levels: int, optional
Number of cascade levels.
outdir_path: string, optional
Path to folder where the historical weights are stored. Defaults to
path_workdir from rcparams.
n_models: int, optional
Number of NWP models. Defaults to 1.
window_length: int, optional
Length of window (in days) over which to compute the climatological
weights. Defaults to 30.
Returns
-------
climatological_mean_weights: array-like
Array of shape [model, scale_level, ...] containing the climatological
(geometric) mean weights.
"""
past_weights_file = join(outdir_path, "NWP_weights_window.npy")
# past_weights has dimensions date x model x scale_level x ....
if exists(past_weights_file):
past_weights = np.load(past_weights_file)
else:
past_weights = None
# check if there's enough data to compute the climatological skill
if past_weights is None or past_weights.shape[0] < window_length:
return get_default_weights(n_cascade_levels, n_models)
# reduce window if necessary
else:
past_weights = past_weights[-window_length:]

# Calculate climatological weights from the past_weights using the
# geometric mean.
geomean_weights = np.exp(np.log(past_weights).mean(axis=0))

return geomean_weights
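
A quick, hedged check of the truncation and padding behaviour documented in get_default_weights above (the expected shapes and trailing 1e-4 entries follow from the code shown, not from separate pysteps documentation):

    from pysteps.blending import clim

    # Fewer levels than the eight BPS2004 values: the defaults are truncated.
    print(clim.get_default_weights(n_cascade_levels=6, n_models=1).shape)  # (1, 6)

    # More levels: the missing entries are filled with small (1e-4) weights and the
    # same profile is repeated for every model by np.resize.
    w = clim.get_default_weights(n_cascade_levels=10, n_models=2)
    print(w.shape)    # (2, 10)
    print(w[0, -2:])  # [0.0001 0.0001]
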
43 changes: 27 additions & 16 deletions pysteps/blending/skill_scores.py
@@ -65,7 +65,7 @@ def spatial_correlation(obs, mod):
     return rho


-def lt_dependent_cor_nwp(lt, correlations, outdir_path, skill_kwargs=dict()):
+def lt_dependent_cor_nwp(lt, correlations, skill_kwargs=dict()):
     """Determine the correlation of a model field for lead time lt and
     cascade k, by assuming that the correlation determined at t=0 regresses
     towards the climatological values.
@@ -79,10 +79,9 @@ def lt_dependent_cor_nwp(lt, correlations, outdir_path, skill_kwargs=dict()):
         Array of shape [n_cascade_levels] containing per cascade_level the
         correlation between the normalized cascade of the observed (radar)
         rainfall field and the normalized cascade of the model field.
-    outdir_path: str
-        Path to folder where the historical weights are stored.
     skill_kwargs : dict, optional
-        Dictionary containing e.g. the nmodels and window_length parameters.
+        Dictionary containing e.g. the outdir_path, nmodels and window_length
+        parameters.

     Returns
     -------
@@ -99,9 +98,7 @@ def lt_dependent_cor_nwp(lt, correlations, outdir_path, skill_kwargs=dict()):
     """
     # Obtain the climatological values towards which the correlations will
     # regress
-    clim_cor_values, regr_pars = clim_regr_values(
-        len(correlations), outdir_path, skill_kwargs=None
-    )
+    clim_cor_values, regr_pars = clim_regr_values(n_cascade_levels=len(correlations))
     # Determine the speed of the regression (eq. 24 in BPS2004)
     qm = np.exp(-lt / regr_pars[0, :]) * (2 - np.exp(-lt / regr_pars[1, :]))
     # Determine the correlation for lead time lt
@@ -158,7 +155,8 @@ def lt_dependent_cor_extrapolation(PHI, correlations=None, correlations_prev=None):
     return rho, rho_prev


-def clim_regr_values(n_cascade_levels, outdir_path, skill_kwargs=dict()):
+# TODO: Make sure the method can also handle multiple model inputs..
+def clim_regr_values(n_cascade_levels, skill_kwargs=dict()):
     """Obtains the climatological correlation values and regression parameters
     from a file called NWP_weights_window.bin in the outdir_path. If this file
     is not present yet, the values from :cite:`BPS2004` are used.
@@ -168,10 +166,9 @@ def clim_regr_values(n_cascade_levels, outdir_path, skill_kwargs=dict()):
     ----------
     n_cascade_levels : int
         The number of cascade levels to use.
-    outdir_path: str
-        Path to folder where the historical weights are stored.
     skill_kwargs : dict, optional
-        Dictionary containing e.g. the nmodels and window_length parameters.
+        Dictionary containing e.g. the outdir_path, nmodels and window_length
+        parameters.

     Returns
     -------
@@ -201,16 +198,18 @@ def clim_regr_values(n_cascade_levels, outdir_path, skill_kwargs=dict()):
     # First, obtain climatological skill values
     try:
         clim_cor_values = clim.calc_clim_weights(
-            outdir_path, n_cascade_levels, **skill_kwargs
+            n_cascade_levels=n_cascade_levels, **skill_kwargs
         )
     except FileNotFoundError:
         # The climatological skill values file does not exist yet, so we'll
         # use the default values from BPS2004.
         clim_cor_values = np.array(
-            [0.848, 0.537, 0.237, 0.065, 0.020, 0.0044, 0.0052, 0.0040]
+            [[0.848, 0.537, 0.237, 0.065, 0.020, 0.0044, 0.0052, 0.0040]]
         )

-    # Check if clim_cor_values has only one dimension, otherwise it has
+    clim_cor_values = clim_cor_values[0]
+
+    # Check if clim_cor_values has only one model, otherwise it has
     # returned the skill values for multiple models
     if clim_cor_values.ndim != 1:
         raise IndexError(
@@ -224,16 +223,28 @@ def clim_regr_values(n_cascade_levels, outdir_path, skill_kwargs=dict()):
         # Get the number of cascade levels that is missing
         n_extra_lev = n_cascade_levels - clim_cor_values.shape[0]
         # Append the array with correlation values of 10e-4
-        clim_cor_values = np.append(clim_cor_values, np.repeat(10e-4, n_extra_lev))
+        clim_cor_values = np.append(clim_cor_values, np.repeat(1e-4, n_extra_lev))

     # Get the regression parameters (L in eq. 24 in BPS2004) - Hard coded,
     # change to own values when present.
+    # TODO: Regression pars for different cascade levels
     regr_pars = np.array(
         [
             [130.0, 165.0, 120.0, 55.0, 50.0, 15.0, 15.0, 10.0],
             [155.0, 220.0, 200.0, 75.0, 10e4, 10e4, 10e4, 10e4],
         ]
     )

+    # Check whether the number of cascade_levels is correct
+    if regr_pars.shape[1] > n_cascade_levels:
+        regr_pars = regr_pars[:, 0:n_cascade_levels]
+    elif regr_pars.shape[1] < n_cascade_levels:
+        # Get the number of cascade levels that is missing
+        n_extra_lev = n_cascade_levels - regr_pars.shape[1]
+        # Append the array with correlation values of 10e-4
+        regr_pars = np.append(
+            regr_pars,
+            [np.repeat(10.0, n_extra_lev), np.repeat(10e4, n_extra_lev)],
+            axis=1,
+        )
+
     return clim_cor_values, regr_pars
3 changes: 2 additions & 1 deletion pysteps/pystepsrc
@@ -4,7 +4,8 @@
     "silent_import": false,
     "outputs": {
         // path_outputs : path where to save results (figures, forecasts, etc)
-        "path_outputs": "./"
+        "path_outputs": "./",
+        "path_workdir": "./tmp/"
     },
     "plot": {
         // "motion_plot" : "streamplot" or "quiver"
