From c6368ab173991ed34c86f516004ac9033b20efd6 Mon Sep 17 00:00:00 2001 From: Lesley De Cruz Date: Wed, 18 Aug 2021 14:37:01 +0200 Subject: [PATCH] Functions to store and compute climatological weights (#231) * Implement the functions get_default_weights, save_weights, calc_clim_weights. These functions are used to evolve the weights in the scale- and skill-dependent blending with NWP in the STEPS blending algorithm. The current weights, based on the correlations per cascade level, are regressed towards these climatological weights in the course of the forecast. These functions save the current and compute the climatological weights (a running mean of the weights of the past n days, where typically n=30). First daily averages are stored and these are then averaged over the running window of n days. * Add tests for pysteps climatological weight io and calculations. * Add path_workdir to outputs section in pystepsrc file and use it as a default path to store/retrieve blending weights. * Minor changes to docstrings, changes to skill scores and testing scripts * Completed documentation for blending clim module, cleanup. Co-authored-by: RubenImhoff --- .pre-commit-config.yaml | 2 +- pysteps/blending/clim.py | 188 ++++++++++++++++++++ pysteps/blending/skill_scores.py | 43 +++-- pysteps/pystepsrc | 3 +- pysteps/tests/test_blending_clim.py | 148 +++++++++++++++ pysteps/tests/test_blending_skill_scores.py | 31 ++-- 6 files changed, 385 insertions(+), 30 deletions(-) create mode 100644 pysteps/blending/clim.py create mode 100644 pysteps/tests/test_blending_clim.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c80571951..f35ad636b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - repo: https://github.com/psf/black - rev: 21.6b0 + rev: 21.7b0 hooks: - id: black language_version: python3 \ No newline at end of file diff --git a/pysteps/blending/clim.py b/pysteps/blending/clim.py new file mode 100644 index 000000000..fbb03e1ff --- /dev/null +++ b/pysteps/blending/clim.py @@ -0,0 +1,188 @@ +""" +pysteps.blending.clim +===================== + +Module with methods to read, write and compute past and climatological model weights. + +.. autosummary:: + :toctree: ../generated/ + + get_default_weights + save_weights + calc_clim_weights +""" + +import numpy as np +from os.path import exists, join +import pickle +from pysteps import rcparams + + +def get_default_weights(n_cascade_levels=8, n_models=1): + """ + Get the default weights as given in BPS2004. + Take subset of n_cascade_levels or add entries with small values (1e-4) if + n_cascade_levels differs from 8. + + Parameters + ---------- + n_cascade_levels: int, optional + Number of cascade levels. Defaults to 8. + n_models: int, optional + Number of NWP models. Defaults to 1. + + Returns + ------- + default_weights: array-like + Array of shape [model, scale_level] containing the climatological weights. + + """ + + default_weights = np.array( + [0.848, 0.537, 0.237, 0.065, 0.020, 0.0044, 0.0052, 0.0040] + ) + n_weights = default_weights.shape[0] + if n_cascade_levels < n_weights: + default_weights = default_weights[0:n_cascade_levels] + elif n_cascade_levels > n_weights: + default_weights = np.append( + default_weights, np.repeat(1e-4, n_cascade_levels - n_weights) + ) + return np.resize(default_weights, (n_models, n_cascade_levels)) + + +def save_weights( + current_weights, + validtime, + outdir_path=rcparams.outputs["path_workdir"], + window_length=30, +): + """ + Add the current NWP weights to update today's daily average weight. If the + day is over, update the list of daily average weights covering a rolling + window. + + Parameters + ---------- + current_weights: array-like + Array of shape [model, scale_level, ...] + containing the current weights of the different NWP models per cascade + level. + validtime: datetime + Datetime object containing the date and time at for which the current + weights are valid. + outdir_path: string, optional + Path to folder where the historical weights are stored. Defaults to + path_workdir from rcparams. + window_length: int, optional + Length of window (in days) of daily weights that should be retained. + Defaults to 30. + + Returns + ------- + None + + """ + + n_cascade_levels = current_weights.shape[1] + + # Load weights_today, a dictionary containing {mean_weights, n, last_validtime} + weights_today_file = join(outdir_path, "NWP_weights_today.pkl") + if exists(weights_today_file): + with open(weights_today_file, "rb") as f: + weights_today = pickle.load(f) + else: + weights_today = { + "mean_weights": np.copy(current_weights), + "n": 0, + "last_validtime": validtime, + } + + # Load the past weights which is an array with dimensions day x model x scale_level + past_weights_file = join(outdir_path, "NWP_weights_window.npy") + past_weights = None + if exists(past_weights_file): + past_weights = np.load(past_weights_file) + # First check if we have started a new day wrt the last written weights, in which + # case we should update the daily weights file and reset daily statistics. + if weights_today["last_validtime"].date() < validtime.date(): + # Append weights to the list of the past X daily averages. + if past_weights is not None: + past_weights = np.append( + past_weights, [weights_today["mean_weights"]], axis=0 + ) + else: + past_weights = np.array([weights_today["mean_weights"]]) + print(past_weights.shape) + # Remove oldest if the number of entries exceeds the window length. + if past_weights.shape[0] > window_length: + past_weights = np.delete(past_weights, 0, axis=0) + # FIXME also write out last_validtime.date() in this file? + # In that case it will need pickling or netcdf. + # Write out the past weights within the rolling window. + np.save(past_weights_file, past_weights) + # Reset statistics for today. + weights_today["n"] = 0 + weights_today["mean_weights"] = 0 + + # Reset today's weights if needed and/or compute online average from the + # current weights using numerically stable algorithm + weights_today["n"] += 1 + weights_today["mean_weights"] += ( + current_weights - weights_today["mean_weights"] + ) / weights_today["n"] + weights_today["last_validtime"] = validtime + with open(weights_today_file, "wb") as f: + pickle.dump(weights_today, f) + + return None + + +def calc_clim_weights( + n_cascade_levels=8, + outdir_path=rcparams.outputs["path_workdir"], + n_models=1, + window_length=30, +): + """ + Return the climatological weights based on the daily average weights in the + rolling window. This is done using a geometric mean. + + Parameters + ---------- + n_cascade_levels: int, optional + Number of cascade levels. + outdir_path: string, optional + Path to folder where the historical weights are stored. Defaults to + path_workdir from rcparams. + n_models: int, optional + Number of NWP models. Defaults to 1. + window_length: int, optional + Length of window (in days) over which to compute the climatological + weights. Defaults to 30. + + Returns + ------- + climatological_mean_weights: array-like + Array of shape [model, scale_level, ...] containing the climatological + (geometric) mean weights. + + """ + past_weights_file = join(outdir_path, "NWP_weights_window.npy") + # past_weights has dimensions date x model x scale_level x .... + if exists(past_weights_file): + past_weights = np.load(past_weights_file) + else: + past_weights = None + # check if there's enough data to compute the climatological skill + if not past_weights or past_weights.shape[0] < window_length: + return get_default_weights(n_cascade_levels, n_models) + # reduce window if necessary + else: + past_weights = past_weights[-window_length:] + + # Calculate climatological weights from the past_weights using the + # geometric mean. + geomean_weights = np.exp(np.log(past_weights).mean(axis=0)) + + return geomean_weights diff --git a/pysteps/blending/skill_scores.py b/pysteps/blending/skill_scores.py index f8225959f..d53ac3548 100644 --- a/pysteps/blending/skill_scores.py +++ b/pysteps/blending/skill_scores.py @@ -65,7 +65,7 @@ def spatial_correlation(obs, mod): return rho -def lt_dependent_cor_nwp(lt, correlations, outdir_path, skill_kwargs=dict()): +def lt_dependent_cor_nwp(lt, correlations, skill_kwargs=dict()): """Determine the correlation of a model field for lead time lt and cascade k, by assuming that the correlation determined at t=0 regresses towards the climatological values. @@ -79,10 +79,9 @@ def lt_dependent_cor_nwp(lt, correlations, outdir_path, skill_kwargs=dict()): Array of shape [n_cascade_levels] containing per cascade_level the correlation between the normalized cascade of the observed (radar) rainfall field and the normalized cascade of the model field. - outdir_path: str - Path to folder where the historical weights are stored. skill_kwargs : dict, optional - Dictionary containing e.g. the nmodels and window_length parameters. + Dictionary containing e.g. the outdir_path, nmodels and window_length + parameters. Returns ------- @@ -99,9 +98,7 @@ def lt_dependent_cor_nwp(lt, correlations, outdir_path, skill_kwargs=dict()): """ # Obtain the climatological values towards which the correlations will # regress - clim_cor_values, regr_pars = clim_regr_values( - len(correlations), outdir_path, skill_kwargs=None - ) + clim_cor_values, regr_pars = clim_regr_values(n_cascade_levels=len(correlations)) # Determine the speed of the regression (eq. 24 in BPS2004) qm = np.exp(-lt / regr_pars[0, :]) * (2 - np.exp(-lt / regr_pars[1, :])) # Determine the correlation for lead time lt @@ -158,7 +155,8 @@ def lt_dependent_cor_extrapolation(PHI, correlations=None, correlations_prev=Non return rho, rho_prev -def clim_regr_values(n_cascade_levels, outdir_path, skill_kwargs=dict()): +# TODO: Make sure the method can also handle multiple model inputs.. +def clim_regr_values(n_cascade_levels, skill_kwargs=dict()): """Obtains the climatological correlation values and regression parameters from a file called NWP_weights_window.bin in the outdir_path. If this file is not present yet, the values from :cite:`BPS2004` are used. @@ -168,10 +166,9 @@ def clim_regr_values(n_cascade_levels, outdir_path, skill_kwargs=dict()): ---------- n_cascade_levels : int The number of cascade levels to use. - outdir_path: str - Path to folder where the historical weights are stored. skill_kwargs : dict, optional - Dictionary containing e.g. the nmodels and window_length parameters. + Dictionary containing e.g. the outdir_path, nmodels and window_length + parameters. Returns ------- @@ -201,16 +198,18 @@ def clim_regr_values(n_cascade_levels, outdir_path, skill_kwargs=dict()): # First, obtain climatological skill values try: clim_cor_values = clim.calc_clim_weights( - outdir_path, n_cascade_levels, **skill_kwargs + n_cascade_levels=n_cascade_levels, **skill_kwargs ) except FileNotFoundError: # The climatological skill values file does not exist yet, so we'll # use the default values from BPS2004. clim_cor_values = np.array( - [0.848, 0.537, 0.237, 0.065, 0.020, 0.0044, 0.0052, 0.0040] + [[0.848, 0.537, 0.237, 0.065, 0.020, 0.0044, 0.0052, 0.0040]] ) - # Check if clim_cor_values has only one dimension, otherwise it has + clim_cor_values = clim_cor_values[0] + + # Check if clim_cor_values has only one model, otherwise it has # returned the skill values for multiple models if clim_cor_values.ndim != 1: raise IndexError( @@ -224,11 +223,10 @@ def clim_regr_values(n_cascade_levels, outdir_path, skill_kwargs=dict()): # Get the number of cascade levels that is missing n_extra_lev = n_cascade_levels - clim_cor_values.shape[0] # Append the array with correlation values of 10e-4 - clim_cor_values = np.append(clim_cor_values, np.repeat(10e-4, n_extra_lev)) + clim_cor_values = np.append(clim_cor_values, np.repeat(1e-4, n_extra_lev)) # Get the regression parameters (L in eq. 24 in BPS2004) - Hard coded, # change to own values when present. - # TODO: Regression pars for different cascade levels regr_pars = np.array( [ [130.0, 165.0, 120.0, 55.0, 50.0, 15.0, 15.0, 10.0], @@ -236,4 +234,17 @@ def clim_regr_values(n_cascade_levels, outdir_path, skill_kwargs=dict()): ] ) + # Check whether the number of cascade_levels is correct + if regr_pars.shape[1] > n_cascade_levels: + regr_pars = regr_pars[:, 0:n_cascade_levels] + elif regr_pars.shape[1] < n_cascade_levels: + # Get the number of cascade levels that is missing + n_extra_lev = n_cascade_levels - regr_pars.shape[1] + # Append the array with correlation values of 10e-4 + regr_pars = np.append( + regr_pars, + [np.repeat(10.0, n_extra_lev), np.repeat(10e4, n_extra_lev)], + axis=1, + ) + return clim_cor_values, regr_pars diff --git a/pysteps/pystepsrc b/pysteps/pystepsrc index 7e2b2c9d7..ff8fcb6cc 100644 --- a/pysteps/pystepsrc +++ b/pysteps/pystepsrc @@ -4,7 +4,8 @@ "silent_import": false, "outputs": { // path_outputs : path where to save results (figures, forecasts, etc) - "path_outputs": "./" + "path_outputs": "./", + "path_workdir": "./tmp/" }, "plot": { // "motion_plot" : "streamplot" or "quiver" diff --git a/pysteps/tests/test_blending_clim.py b/pysteps/tests/test_blending_clim.py new file mode 100644 index 000000000..e790984c7 --- /dev/null +++ b/pysteps/tests/test_blending_clim.py @@ -0,0 +1,148 @@ +# -*- coding: utf-8 -*- +import numpy as np +import pytest + +from pysteps.blending.clim import save_weights, calc_clim_weights +import random +from datetime import datetime, timedelta +from os.path import join, exists +import pickle +from numpy.testing import assert_array_equal + +random.seed(12356) +n_cascade_levels = 7 +model_names = ["alaro13", "arome13"] +default_start_weights = [0.8, 0.5] +""" Helper functions """ + + +def generate_random_weights(n_cascade_levels, n_models=1): + """ + Generate random weights which decay exponentially with scale. + """ + start_weights = np.array([random.uniform(0.5, 0.99) for i in range(n_models)]) + powers = np.arange(1, n_cascade_levels + 1) + return pow(start_weights[:, np.newaxis], powers) + + +def generate_fixed_weights(n_cascade_levels, n_models=1): + """ + Generate weights starting at default_start_weights which decay exponentially with scale. + """ + start_weights = np.resize(default_start_weights, n_models) + powers = np.arange(1, n_cascade_levels + 1) + return pow(start_weights[:, np.newaxis], powers) + + +""" Test arguments """ + +clim_arg_names = ("startdatestr", "enddatestr", "n_models", "expected_weights_today") + +test_enddates = ["20210701235500", "20210702000000", "20200930235500"] + +clim_arg_values = [ + ( + "20210701230000", + "20210701235500", + 1, + { + "mean_weights": generate_fixed_weights(n_cascade_levels), + "n": 12, + "last_validtime": datetime.strptime(test_enddates[0], "%Y%m%d%H%M%S"), + }, + ), + ( + "20210701235500", + "20210702000000", + 1, + { + "mean_weights": generate_fixed_weights(n_cascade_levels), + "n": 1, + "last_validtime": datetime.strptime(test_enddates[1], "%Y%m%d%H%M%S"), + }, + ), + ( + "20200801000000", + "20200930235500", + 1, + { + "mean_weights": generate_fixed_weights(n_cascade_levels), + "n": 288, + "last_validtime": datetime.strptime(test_enddates[2], "%Y%m%d%H%M%S"), + }, + ), + ( + "20210701230000", + "20210701235500", + 2, + { + "mean_weights": generate_fixed_weights(n_cascade_levels, 2), + "n": 12, + "last_validtime": datetime.strptime(test_enddates[0], "%Y%m%d%H%M%S"), + }, + ), + ( + "20210701230000", + "20210702000000", + 2, + { + "mean_weights": generate_fixed_weights(n_cascade_levels, 2), + "n": 1, + "last_validtime": datetime.strptime(test_enddates[1], "%Y%m%d%H%M%S"), + }, + ), + ( + "20200801000000", + "20200930235500", + 2, + { + "mean_weights": generate_fixed_weights(n_cascade_levels, 2), + "n": 288, + "last_validtime": datetime.strptime(test_enddates[2], "%Y%m%d%H%M%S"), + }, + ), +] + + +@pytest.mark.parametrize(clim_arg_names, clim_arg_values) +def test_save_weights( + startdatestr, enddatestr, n_models, expected_weights_today, tmpdir +): + """Test if the weights are saved correctly and the daily average is computed""" + + # get validtime + currentdate = datetime.strptime(startdatestr, "%Y%m%d%H%M%S") + enddate = datetime.strptime(enddatestr, "%Y%m%d%H%M%S") + timestep = timedelta(minutes=5) + + outdir_path = tmpdir + + while currentdate <= enddate: + current_weights = generate_fixed_weights(n_cascade_levels, n_models) + print("Saving weights: ", current_weights, currentdate, outdir_path) + save_weights(current_weights, currentdate, outdir_path, window_length=2) + currentdate += timestep + + weights_today_file = join(outdir_path, "NWP_weights_today.pkl") + assert exists(weights_today_file) + with open(weights_today_file, "rb") as f: + weights_today = pickle.load(f) + + # Check type + assert type(weights_today) == type({}) + assert "mean_weights" in weights_today + assert "n" in weights_today + assert "last_validtime" in weights_today + assert_array_equal( + weights_today["mean_weights"], expected_weights_today["mean_weights"] + ) + assert weights_today["n"] == expected_weights_today["n"] + assert weights_today["last_validtime"] == expected_weights_today["last_validtime"] + + +if __name__ == "__main__": + save_weights( + generate_fixed_weights(n_cascade_levels, 1), + datetime.strptime("20200801000000", "%Y%m%d%H%M%S"), + "/tmp/", + ) diff --git a/pysteps/tests/test_blending_skill_scores.py b/pysteps/tests/test_blending_skill_scores.py index ced452fbc..0bf4f1a98 100644 --- a/pysteps/tests/test_blending_skill_scores.py +++ b/pysteps/tests/test_blending_skill_scores.py @@ -19,7 +19,7 @@ ) clim_cor_values_6lev = np.array([0.848, 0.537, 0.237, 0.065, 0.020, 0.0044]) clim_cor_values_9lev = np.array( - [0.848, 0.537, 0.237, 0.065, 0.020, 0.0044, 0.0052, 0.0040, 10e-4] + [0.848, 0.537, 0.237, 0.065, 0.020, 0.0044, 0.0052, 0.0040, 1e-4] ) # Set the regression values @@ -191,8 +191,6 @@ # The test @pytest.mark.parametrize(skill_scores_arg_names, skill_scores_arg_values) - -# TODO: Also test the pysteps.blending.clim.calc_clim_weights functionality in # The test function to be used def test_blending_skill_scores( obs, @@ -216,7 +214,7 @@ def test_blending_skill_scores( """ # Calculate the spatial correlation of the given model field - correlations_t0 = spatial_correlation(obs, mod) + correlations_t0 = np.array(spatial_correlation(obs, mod)) # Check if the field has the same number of cascade levels as the model # field and as the given n_cascade_levels @@ -231,27 +229,32 @@ def test_blending_skill_scores( assert_array_almost_equal( correlations_t0, expected_cor_t0, - "Returned spatial correlation is not the same as the expected value", + decimal=3, + err_msg="Returned spatial correlation is not the same as the expected value", ) # Test if the NWP correlation regresses towards the correct value given # a lead time in minutes # First, check if the climatological values are returned correctly - correlations_clim, regr_clim = clim_regr_values(n_cascade_levels, "random_path") + correlations_clim, regr_clim = clim_regr_values(n_cascade_levels=n_cascade_levels) assert ( correlations_clim.shape[0] == n_cascade_levels ), "Number of cascade levels should be the same as n_cascade_levels" assert_array_almost_equal( correlations_clim, clim_cor_values, - "Not the correct climatological correlations were returned", + decimal=3, + err_msg="Not the correct climatological correlations were returned", ) assert_array_almost_equal( - regr_clim, regr_pars, "Not the correct regression parameters were returned" + regr_clim, + regr_pars, + decimal=3, + err_msg="Not the correct regression parameters were returned", ) # Then, check the regression of the correlation values - correlations_nwp_lt = lt_dependent_cor_nwp(lt, correlations_t0, "random_path") + correlations_nwp_lt = lt_dependent_cor_nwp(lt=lt, correlations=correlations_t0) assert ( correlations_nwp_lt.shape[0] == mod.shape[0] ), "Number of cascade levels should be the same as in the model field" @@ -261,14 +264,17 @@ def test_blending_skill_scores( assert_array_almost_equal( correlations_nwp_lt, expected_cor_nwp_lt, - "Correlations of NWP not equal to the expected correlations", + decimal=3, + err_msg="Correlations of NWP not equal to the expected correlations", ) # Finally, make sure nowcast correlation regresses towards the correct # value given some PHI-values. - correlations_nowcast_lt = lt_dependent_cor_extrapolation( + correlations_nowcast_lt, __ = lt_dependent_cor_extrapolation( PHI, correlations_t0, cor_prev ) + + print(correlations_nowcast_lt) assert ( correlations_nowcast_lt.shape[0] == mod.shape[0] ), "Number of cascade levels should be the same as in the model field" @@ -278,5 +284,6 @@ def test_blending_skill_scores( assert_array_almost_equal( correlations_nowcast_lt, expected_cor_nowcast_lt, - "Correlations of nowcast not equal to the expected correlations", + decimal=3, + err_msg="Correlations of nowcast not equal to the expected correlations", )