Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Functions to store and compute climatological weights #231

Merged
merged 12 commits into from
Aug 18, 2021
Merged
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/psf/black
rev: 21.6b0
rev: 21.7b0
hooks:
- id: black
language_version: python3
176 changes: 176 additions & 0 deletions pysteps/blending/clim.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
"""
pysteps.blending.clim
=====================

Module with methods to read, write and compute past and climatological model weights.

.. autosummary::
    :toctree: ../generated/

    get_default_weights
    save_weights
    calc_clim_weights
"""

import numpy as np
from os.path import exists, join
import pickle
from pysteps import rcparams


def get_default_weights(n_cascade_levels=8, n_models=1):
    """
    Get the default weights as given in BPS2004.
    Take a subset of the first n_cascade_levels, or pad with small values
    (1e-4) if n_cascade_levels differs from 8.

    Parameters
    ----------
    n_cascade_levels: int, optional
        Number of cascade levels.
    n_models: int, optional
        Number of NWP models.

    Returns
    -------
    default_weights: array-like
        Array of shape [model, scale_level] containing the default weights,
        with the same defaults repeated for every model.

    """

    # Default weight per cascade level from BPS2004.
    default_weights = np.array(
        [0.848, 0.537, 0.237, 0.065, 0.020, 0.0044, 0.0052, 0.0040]
    )
    n_weights = default_weights.shape[0]
    if n_cascade_levels < n_weights:
        # Keep only the first (largest-scale) levels.
        default_weights = default_weights[0:n_cascade_levels]
    elif n_cascade_levels > n_weights:
        # Pad the extra (smallest-scale) levels with a small positive value.
        default_weights = np.append(
            default_weights, np.repeat(1e-4, n_cascade_levels - n_weights)
        )
    # np.resize cycles the flattened data, so each model row gets the same
    # default weights.
    return np.resize(default_weights, (n_models, n_cascade_levels))


def save_weights(
    current_weights,
    validtime,
    outdir_path=rcparams.outputs["path_workdir"],
    window_length=30,
):
    """
    Add the current NWP weights to update today's daily average weight. If the
    day is over, update the list of daily average weights covering a rolling
    window.

    Parameters
    ----------
    current_weights: array-like
        Array of shape [model, scale_level, ...]
        containing the current weights of the different NWP models per cascade
        level.
    validtime: datetime.datetime
        Valid time of the current weights; its date is compared against the
        previously stored valid time to detect the start of a new day.
    outdir_path: string, optional
        Path to folder where the historical weights are stored.
    window_length: int, optional
        Length of window (in days) over which to compute the climatological
        weights.

    Returns
    -------
    None

    """

    # Load weights_today, a dictionary containing
    # {mean_weights, n, last_validtime}.
    weights_today_file = join(outdir_path, "NWP_weights_today.pkl")
    if exists(weights_today_file):
        with open(weights_today_file, "rb") as f:
            weights_today = pickle.load(f)
    else:
        weights_today = {
            "mean_weights": np.copy(current_weights),
            "n": 0,
            "last_validtime": validtime,
        }

    # Load the past weights, an array with dimensions day x model x scale_level.
    past_weights_file = join(outdir_path, "NWP_weights_window.npy")
    past_weights = None
    if exists(past_weights_file):
        past_weights = np.load(past_weights_file)
    # First check if we have started a new day wrt the last written weights, in
    # which case we should update the daily weights file and reset daily
    # statistics.
    if weights_today["last_validtime"].date() < validtime.date():
        # Append yesterday's mean to the list of the past X daily averages.
        if past_weights is not None:
            past_weights = np.append(
                past_weights, [weights_today["mean_weights"]], axis=0
            )
        else:
            past_weights = np.array([weights_today["mean_weights"]])
        # Remove oldest if the number of entries exceeds the window length.
        if past_weights.shape[0] > window_length:
            past_weights = np.delete(past_weights, 0, axis=0)
        # FIXME also write out last_validtime.date() in this file?
        # In that case it will need pickling or netcdf.
        # Write out the past weights within the rolling window.
        np.save(past_weights_file, past_weights)
        # Reset statistics for today; the scalar 0 broadcasts against
        # current_weights in the running-mean update below.
        weights_today["n"] = 0
        weights_today["mean_weights"] = 0

    # Compute the online average of today's weights from the current weights
    # using the numerically stable incremental-mean update.
    weights_today["n"] += 1
    weights_today["mean_weights"] += (
        current_weights - weights_today["mean_weights"]
    ) / weights_today["n"]
    weights_today["last_validtime"] = validtime
    with open(weights_today_file, "wb") as f:
        pickle.dump(weights_today, f)

    return None


def calc_clim_weights(
    outdir_path=rcparams.outputs["path_workdir"],
    n_cascade_levels=8,
    nmodels=1,
    window_length=30,
):
    """
    Return the climatological weights based on the daily average weights in the
    rolling window. This is done using a geometric mean.

    Parameters
    ----------
    outdir_path: string, optional
        Path to folder where the historical weights are stored.
    n_cascade_levels: int, optional
        Number of cascade levels.
    nmodels: int, optional
        Number of NWP models.
    window_length: int, optional
        Length of window (in days) over which to compute the climatological
        weights.

    Returns
    -------
    climatological_mean_weights: array-like
        Array of shape [model, scale_level, ...] containing the climatological
        weights.

    """
    past_weights_file = join(outdir_path, "NWP_weights_window.npy")
    # past_weights has dimensions date x model x scale_level x ....
    # BUGFIX: past_weights must be initialized, otherwise it is undefined
    # (NameError) when the file does not exist yet.
    past_weights = None
    if exists(past_weights_file):
        past_weights = np.load(past_weights_file)
    # Check if there's enough data to compute the climatological skill.
    # BUGFIX: use an explicit None check — `not past_weights` raises
    # ValueError on a multi-element ndarray. Also pass the arguments to
    # get_default_weights in the correct order (they were swapped).
    if past_weights is None or past_weights.shape[0] < window_length:
        return get_default_weights(n_cascade_levels, nmodels)
    # Reduce window if necessary.
    past_weights = past_weights[-window_length:]

    # Calculate climatological weights from the past_weights using the
    # geometric mean.
    geomean_weights = np.exp(np.log(past_weights).mean(axis=0))

    return geomean_weights
3 changes: 2 additions & 1 deletion pysteps/pystepsrc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"silent_import": false,
"outputs": {
// path_outputs : path where to save results (figures, forecasts, etc)
"path_outputs": "./"
"path_outputs": "./",
"path_workdir": "./tmp/"
},
"plot": {
// "motion_plot" : "streamplot" or "quiver"
Expand Down
148 changes: 148 additions & 0 deletions pysteps/tests/test_blending_clim.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# -*- coding: utf-8 -*-
import numpy as np
import pytest

from pysteps.blending.clim import save_weights, calc_clim_weights
import random
from datetime import datetime, timedelta
from os.path import join, exists
import pickle
from numpy.testing import assert_array_equal

# Fixed seed so the randomly generated weights are reproducible across runs.
random.seed(12356)
# Number of cascade levels used throughout these tests.
n_cascade_levels = 7
# NWP model identifiers, one per model.
model_names = ["alaro13", "arome13"]
# Per-model base weights used by generate_fixed_weights.
default_start_weights = [0.8, 0.5]
""" Helper functions """


def generate_random_weights(n_cascade_levels, n_models=1):
    """
    Generate random weights which decay exponentially with scale.
    """
    # One random base weight in (0.5, 0.99) per model; raising it to the
    # level index makes the weights decay with scale.
    base = np.array([random.uniform(0.5, 0.99) for _ in range(n_models)])
    levels = np.arange(1, n_cascade_levels + 1)
    return base[:, np.newaxis] ** levels


def generate_fixed_weights(n_cascade_levels, n_models=1):
    """
    Generate weights starting at default_start_weights which decay exponentially with scale.
    """
    # np.resize cycles default_start_weights to supply one base per model.
    base = np.resize(default_start_weights, n_models)[:, np.newaxis]
    levels = np.arange(1, n_cascade_levels + 1)
    return base ** levels


""" Test arguments """

# Parameter names for the parametrized test below.
clim_arg_names = ("startdatestr", "enddatestr", "n_models", "expected_weights_today")

# End timestamps (format YYYYmmddHHMMSS) shared by the test cases below.
test_enddates = ["20210701235500", "20210702000000", "20200930235500"]

# Each tuple: (start date, end date, number of models, expected contents of
# the "NWP_weights_today.pkl" dictionary after calling save_weights every
# 5 minutes from start to end date).
clim_arg_values = [
    (
        "20210701230000",
        "20210701235500",
        1,
        {
            "mean_weights": generate_fixed_weights(n_cascade_levels),
            "n": 12,
            "last_validtime": datetime.strptime(test_enddates[0], "%Y%m%d%H%M%S"),
        },
    ),
    (
        "20210701235500",
        "20210702000000",
        1,
        {
            "mean_weights": generate_fixed_weights(n_cascade_levels),
            "n": 1,
            "last_validtime": datetime.strptime(test_enddates[1], "%Y%m%d%H%M%S"),
        },
    ),
    (
        "20200801000000",
        "20200930235500",
        1,
        {
            "mean_weights": generate_fixed_weights(n_cascade_levels),
            "n": 288,
            "last_validtime": datetime.strptime(test_enddates[2], "%Y%m%d%H%M%S"),
        },
    ),
    (
        "20210701230000",
        "20210701235500",
        2,
        {
            "mean_weights": generate_fixed_weights(n_cascade_levels, 2),
            "n": 12,
            "last_validtime": datetime.strptime(test_enddates[0], "%Y%m%d%H%M%S"),
        },
    ),
    (
        "20210701230000",
        "20210702000000",
        2,
        {
            "mean_weights": generate_fixed_weights(n_cascade_levels, 2),
            "n": 1,
            "last_validtime": datetime.strptime(test_enddates[1], "%Y%m%d%H%M%S"),
        },
    ),
    (
        "20200801000000",
        "20200930235500",
        2,
        {
            "mean_weights": generate_fixed_weights(n_cascade_levels, 2),
            "n": 288,
            "last_validtime": datetime.strptime(test_enddates[2], "%Y%m%d%H%M%S"),
        },
    ),
]


@pytest.mark.parametrize(clim_arg_names, clim_arg_values)
def test_save_weights(
    startdatestr, enddatestr, n_models, expected_weights_today, tmpdir
):
    """Test if the weights are saved correctly and the daily average is computed"""

    # Simulate calling save_weights every 5 minutes from start to end date.
    currentdate = datetime.strptime(startdatestr, "%Y%m%d%H%M%S")
    enddate = datetime.strptime(enddatestr, "%Y%m%d%H%M%S")
    timestep = timedelta(minutes=5)

    outdir_path = tmpdir

    while currentdate <= enddate:
        current_weights = generate_fixed_weights(n_cascade_levels, n_models)
        save_weights(current_weights, currentdate, outdir_path, window_length=2)
        currentdate += timestep

    # The running daily-average file must exist after saving.
    weights_today_file = join(outdir_path, "NWP_weights_today.pkl")
    assert exists(weights_today_file)
    with open(weights_today_file, "rb") as f:
        weights_today = pickle.load(f)

    # Check type and contents of the stored dictionary.
    assert isinstance(weights_today, dict)
    assert "mean_weights" in weights_today
    assert "n" in weights_today
    assert "last_validtime" in weights_today
    assert_array_equal(
        weights_today["mean_weights"], expected_weights_today["mean_weights"]
    )
    assert weights_today["n"] == expected_weights_today["n"]
    assert weights_today["last_validtime"] == expected_weights_today["last_validtime"]


# Ad-hoc manual run: write a single set of fixed weights for one model to
# /tmp/ so the on-disk files can be inspected outside pytest.
if __name__ == "__main__":
    save_weights(
        generate_fixed_weights(n_cascade_levels, 1),
        datetime.strptime("20200801000000", "%Y%m%d%H%M%S"),
        "/tmp/",
    )