Skip to content

Commit

Permalink
Merge branch 'steps_blending' into pysteps-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
RubenImhoff committed Oct 11, 2021
2 parents bccb8fc + e9059cb commit 2c639f8
Show file tree
Hide file tree
Showing 11 changed files with 1,634 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,6 @@ venv.bak/

# mypy
.mypy_cache/

# example data
examples/radar/
9 changes: 9 additions & 0 deletions examples/read_netcdf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import os, netCDF4
import numpy as np
from pysteps import rcparams
from pysteps.blending.utils import load_NWP

NWP_output = rcparams.outputs["path_workdir"] + "cascade_alaro13_01_20170131110000.nc"
start_time = np.datetime64("2017-01-31T11:20")

print(load_NWP(NWP_output, start_time, 4))
71 changes: 71 additions & 0 deletions examples/test_decomposition_NWP.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import matplotlib.pyplot as plt
import numpy as np

from datetime import datetime
from pprint import pprint
from pysteps import io, nowcasts, rcparams
from pysteps.motion.lucaskanade import dense_lucaskanade
from pysteps.postprocessing.ensemblestats import excprob
from pysteps.utils import conversion, dimension, transformation
from pysteps.visualization import plot_precip_field
from pysteps.blending.utils import decompose_NWP
from pysteps.cascade.bandpass_filters import filter_gaussian

num_cascade_levels = 8

###############################################################################
# Read precipitation field
# ------------------------
#
# First thing, the sequence of Swiss radar composites is imported, converted and
# transformed into units of dBR.


date = datetime.strptime("201701311200", "%Y%m%d%H%M")
data_source = "mch"

# Load data source config
root_path = rcparams.data_sources[data_source]["root_path"]
path_fmt = rcparams.data_sources[data_source]["path_fmt"]
fn_pattern = rcparams.data_sources[data_source]["fn_pattern"]
fn_ext = rcparams.data_sources[data_source]["fn_ext"]
importer_name = rcparams.data_sources[data_source]["importer"]
importer_kwargs = rcparams.data_sources[data_source]["importer_kwargs"]
timestep = rcparams.data_sources[data_source]["timestep"]

# Find the radar files in the archive
fns = io.find_by_date(
date, root_path, path_fmt, fn_pattern, fn_ext, timestep, num_prev_files=12
)

# Read the data from the archive
importer = io.get_method(importer_name, "importer")
R_NWP, _, metadata = io.read_timeseries(fns, importer, legacy=True, **importer_kwargs)

# Convert to rain rate
R_NWP, metadata = conversion.to_rainrate(R_NWP, metadata)

# Log-transform the data
R_NWP, metadata = transformation.dB_transform(
R_NWP, metadata, threshold=0.1, zerovalue=-15.0
)

# Fill in the missing data with the threshold value
R_NWP[~np.isfinite(R_NWP)] = metadata["zerovalue"]

# Find the location to save the NWP files
NWP_output = rcparams.outputs["path_workdir"]

# Define the start time of the NWP forecast
analysis_time = metadata["timestamps"][0]

# Decompose the NWP and save to netCDF file
decompose_NWP(
R_NWP,
"alaro13_01",
analysis_time,
5,
metadata["timestamps"],
num_cascade_levels,
NWP_output,
)
4 changes: 4 additions & 0 deletions pysteps/blending/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# -*- coding: utf-8 -*-
"""Methods for blending NWP model(s) with nowcasts."""

from pysteps.blending.interface import get_method
188 changes: 188 additions & 0 deletions pysteps/blending/clim.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
"""
pysteps.blending.clim
=====================
Module with methods to read, write and compute past and climatological model weights.
.. autosummary::
:toctree: ../generated/
get_default_weights
save_weights
calc_clim_weights
"""

import numpy as np
from os.path import exists, join
import pickle
from pysteps import rcparams


def get_default_weights(n_cascade_levels=8, n_models=1):
"""
Get the default weights as given in BPS2004.
Take subset of n_cascade_levels or add entries with small values (1e-4) if
n_cascade_levels differs from 8.
Parameters
----------
n_cascade_levels: int, optional
Number of cascade levels. Defaults to 8.
n_models: int, optional
Number of NWP models. Defaults to 1.
Returns
-------
default_weights: array-like
Array of shape [model, scale_level] containing the climatological weights.
"""

default_weights = np.array(
[0.848, 0.537, 0.237, 0.065, 0.020, 0.0044, 0.0052, 0.0040]
)
n_weights = default_weights.shape[0]
if n_cascade_levels < n_weights:
default_weights = default_weights[0:n_cascade_levels]
elif n_cascade_levels > n_weights:
default_weights = np.append(
default_weights, np.repeat(1e-4, n_cascade_levels - n_weights)
)
return np.resize(default_weights, (n_models, n_cascade_levels))


def save_weights(
current_weights,
validtime,
outdir_path=rcparams.outputs["path_workdir"],
window_length=30,
):
"""
Add the current NWP weights to update today's daily average weight. If the
day is over, update the list of daily average weights covering a rolling
window.
Parameters
----------
current_weights: array-like
Array of shape [model, scale_level, ...]
containing the current weights of the different NWP models per cascade
level.
validtime: datetime
Datetime object containing the date and time at for which the current
weights are valid.
outdir_path: string, optional
Path to folder where the historical weights are stored. Defaults to
path_workdir from rcparams.
window_length: int, optional
Length of window (in days) of daily weights that should be retained.
Defaults to 30.
Returns
-------
None
"""

n_cascade_levels = current_weights.shape[1]

# Load weights_today, a dictionary containing {mean_weights, n, last_validtime}
weights_today_file = join(outdir_path, "NWP_weights_today.pkl")
if exists(weights_today_file):
with open(weights_today_file, "rb") as f:
weights_today = pickle.load(f)
else:
weights_today = {
"mean_weights": np.copy(current_weights),
"n": 0,
"last_validtime": validtime,
}

# Load the past weights which is an array with dimensions day x model x scale_level
past_weights_file = join(outdir_path, "NWP_weights_window.npy")
past_weights = None
if exists(past_weights_file):
past_weights = np.load(past_weights_file)
# First check if we have started a new day wrt the last written weights, in which
# case we should update the daily weights file and reset daily statistics.
if weights_today["last_validtime"].date() < validtime.date():
# Append weights to the list of the past X daily averages.
if past_weights is not None:
past_weights = np.append(
past_weights, [weights_today["mean_weights"]], axis=0
)
else:
past_weights = np.array([weights_today["mean_weights"]])
print(past_weights.shape)
# Remove oldest if the number of entries exceeds the window length.
if past_weights.shape[0] > window_length:
past_weights = np.delete(past_weights, 0, axis=0)
# FIXME also write out last_validtime.date() in this file?
# In that case it will need pickling or netcdf.
# Write out the past weights within the rolling window.
np.save(past_weights_file, past_weights)
# Reset statistics for today.
weights_today["n"] = 0
weights_today["mean_weights"] = 0

# Reset today's weights if needed and/or compute online average from the
# current weights using numerically stable algorithm
weights_today["n"] += 1
weights_today["mean_weights"] += (
current_weights - weights_today["mean_weights"]
) / weights_today["n"]
weights_today["last_validtime"] = validtime
with open(weights_today_file, "wb") as f:
pickle.dump(weights_today, f)

return None


def calc_clim_weights(
n_cascade_levels=8,
outdir_path=rcparams.outputs["path_workdir"],
n_models=1,
window_length=30,
):
"""
Return the climatological weights based on the daily average weights in the
rolling window. This is done using a geometric mean.
Parameters
----------
n_cascade_levels: int, optional
Number of cascade levels.
outdir_path: string, optional
Path to folder where the historical weights are stored. Defaults to
path_workdir from rcparams.
n_models: int, optional
Number of NWP models. Defaults to 1.
window_length: int, optional
Length of window (in days) over which to compute the climatological
weights. Defaults to 30.
Returns
-------
climatological_mean_weights: array-like
Array of shape [model, scale_level, ...] containing the climatological
(geometric) mean weights.
"""
past_weights_file = join(outdir_path, "NWP_weights_window.npy")
# past_weights has dimensions date x model x scale_level x ....
if exists(past_weights_file):
past_weights = np.load(past_weights_file)
else:
past_weights = None
# check if there's enough data to compute the climatological skill
if not past_weights or past_weights.shape[0] < window_length:
return get_default_weights(n_cascade_levels, n_models)
# reduce window if necessary
else:
past_weights = past_weights[-window_length:]

# Calculate climatological weights from the past_weights using the
# geometric mean.
geomean_weights = np.exp(np.log(past_weights).mean(axis=0))

return geomean_weights
56 changes: 56 additions & 0 deletions pysteps/blending/interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
"""
pysteps.blending.interface
==========================
Interface for the blending module. It returns a callable function for computing
blended nowcasts with NWP models.
.. autosummary::
:toctree: ../generated/
get_method
"""

from pysteps.blending import steps

_blending_methods = dict()
_blending_methods["steps"] = steps.forecast


def get_method(name):
"""Return a callable function for computing nowcasts.
Description:
Return a callable function for computing deterministic or ensemble
precipitation nowcasts.
Implemented methods:
+-----------------+-------------------------------------------------------+
| Name | Description |
+=================+=======================================================+
+-----------------+-------------------------------------------------------+
| steps | the STEPS stochastic nowcasting blending method |
| | described in :cite:`Seed2003`, :cite:`BPS2006` and |
| | :cite:`SPN2013`. The blending weights approach |
| | currently follows :cite:`BPS2006`. |
+-----------------+-------------------------------------------------------+
"""
if isinstance(name, str):
name = name.lower()
else:
raise TypeError(
"Only strings supported for the method's names.\n"
+ "Available names:"
+ str(list(_blending_methods.keys()))
) from None

try:
return _blending_methods[name]
except KeyError:
raise ValueError(
"Unknown blending method {}\n".format(name)
+ "The available methods are:"
+ str(list(_blending_methods.keys()))
) from None
Loading

0 comments on commit 2c639f8

Please sign in to comment.