Skip to content

Commit

Permalink
Add utilities to decompose, store and load NWP cascades for use in bl…
Browse files Browse the repository at this point in the history
…ending (#232)

* First version of NWP decomposition

* Added saving to netCDF

* Completed functions for saving and loading decomposed NWP data

* Added example files for the decomposed NWP functions

* Added compatibility with numpy datetime

* Use default output path_workdir for tmp files in blending/utils.py.

* Update documentation of NWP decomposition functions in utils.py

Co-authored-by: Wout Dewettinck <wout.dewettinck@ugent.be>
Co-authored-by: wdewettin <87696913+wdewettin@users.noreply.github.com>
  • Loading branch information
3 people committed Aug 25, 2021
1 parent 4b2ca89 commit 81e88fe
Show file tree
Hide file tree
Showing 5 changed files with 313 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,6 @@ venv.bak/

# mypy
.mypy_cache/

# example data
examples/radar/
9 changes: 9 additions & 0 deletions examples/read_netcdf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import os, netCDF4
import numpy as np
from pysteps import rcparams
from pysteps.blending.utils import load_NWP

NWP_output = rcparams.outputs["path_workdir"] + "cascade_alaro13_01_20170131110000.nc"
start_time = np.datetime64("2017-01-31T11:20")

print(load_NWP(NWP_output, start_time, 4))
71 changes: 71 additions & 0 deletions examples/test_decomposition_NWP.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import matplotlib.pyplot as plt
import numpy as np

from datetime import datetime
from pprint import pprint
from pysteps import io, nowcasts, rcparams
from pysteps.motion.lucaskanade import dense_lucaskanade
from pysteps.postprocessing.ensemblestats import excprob
from pysteps.utils import conversion, dimension, transformation
from pysteps.visualization import plot_precip_field
from pysteps.blending.utils import decompose_NWP
from pysteps.cascade.bandpass_filters import filter_gaussian

num_cascade_levels = 8

###############################################################################
# Read precipitation field
# ------------------------
#
# First thing, the sequence of Swiss radar composites is imported, converted and
# transformed into units of dBR.


date = datetime.strptime("201701311200", "%Y%m%d%H%M")
data_source = "mch"

# Load data source config
root_path = rcparams.data_sources[data_source]["root_path"]
path_fmt = rcparams.data_sources[data_source]["path_fmt"]
fn_pattern = rcparams.data_sources[data_source]["fn_pattern"]
fn_ext = rcparams.data_sources[data_source]["fn_ext"]
importer_name = rcparams.data_sources[data_source]["importer"]
importer_kwargs = rcparams.data_sources[data_source]["importer_kwargs"]
timestep = rcparams.data_sources[data_source]["timestep"]

# Find the radar files in the archive
fns = io.find_by_date(
date, root_path, path_fmt, fn_pattern, fn_ext, timestep, num_prev_files=12
)

# Read the data from the archive
importer = io.get_method(importer_name, "importer")
R_NWP, _, metadata = io.read_timeseries(fns, importer, legacy=True, **importer_kwargs)

# Convert to rain rate
R_NWP, metadata = conversion.to_rainrate(R_NWP, metadata)

# Log-transform the data
R_NWP, metadata = transformation.dB_transform(
R_NWP, metadata, threshold=0.1, zerovalue=-15.0
)

# Fill in the missing data with the threshold value
R_NWP[~np.isfinite(R_NWP)] = metadata["zerovalue"]

# Find the location to save the NWP files
NWP_output = rcparams.outputs["path_workdir"]

# Define the start time of the NWP forecast
analysis_time = metadata["timestamps"][0]

# Decompose the NWP and save to netCDF file
decompose_NWP(
R_NWP,
"alaro13_01",
analysis_time,
5,
metadata["timestamps"],
num_cascade_levels,
NWP_output,
)
229 changes: 229 additions & 0 deletions pysteps/blending/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,17 @@
stack_cascades
normalize_cascade
blend_optical_flows
decompose_NWP
load_NWP
"""

import numpy as np
from datetime import datetime, timedelta
from pysteps.cascade import get_method as cascade_get_method
from pysteps.cascade.bandpass_filters import filter_gaussian
from pysteps import utils, rcparams
import os
import netCDF4


def stack_cascades(R_d, donorm=True):
Expand Down Expand Up @@ -136,3 +144,224 @@ def blend_optical_flows(flows, weights):
combined_flows = np.sum(all_c_wn, axis=-1)
# combined_flows [2, m, n]
return combined_flows


def decompose_NWP(
R_NWP,
NWP_model,
analysis_time,
timestep,
valid_times,
num_cascade_levels=8,
output_path=rcparams.outputs["path_workdir"],
decomp_method="fft",
fft_method="numpy",
domain="spatial",
normalize=True,
compute_stats=True,
compact_output=True,
):
"""Decomposes the NWP forecast data into cascades and saves it in
a netCDF file
Parameters
----------
R_NWP: array-like
Array of dimension (n_timesteps, x, y) containing the precipitation forecast
from some NWP model.
NWP_model: str
The name of the NWP model
analysis_time: numpy.datetime64
The analysis time of the NWP forecast. The analysis time is assumed to be a
numpy.datetime64 type as imported by the pysteps importer
timestep: int
Timestep in minutes between subsequent NWP forecast fields
valid_times: array_like
Array containing the valid times of the NWP forecast fields. The times are
assumed to be numpy.datetime64 types as imported by the pysteps importer
num_cascade_levels: int, optional
The number of frequency bands to use. Must be greater than 2. Defaults to 8.
output_path: str, optional
The location where to save the file with the NWP cascade. Defaults to the
path_workdir specified in the rcparams file.
Other Parameters
----------------
decomp_method: str, optional
A string defining the decomposition method to use. Defaults to "fft".
fft_method: str or tuple, optional
A string or a (function,kwargs) tuple defining the FFT method to use
(see :py:func:`pysteps.utils.interface.get_method`).
Defaults to "numpy". This option is not used if input_domain and
output_domain are both set to "spectral".
domain: {"spatial", "spectral"}, optional
If "spatial", the output cascade levels are transformed back to the
spatial domain by using the inverse FFT. If "spectral", the cascade is
kept in the spectral domain. Defaults to "spatial".
normalize: bool, optional
If True, normalize the cascade levels to zero mean and unit variance.
Requires that compute_stats is True. Implies that compute_stats is True.
Defaults to False.
compute_stats: bool, optional
If True, the output dictionary contains the keys "means" and "stds"
for the mean and standard deviation of each output cascade level.
Defaults to False.
compact_output: bool, optional
Applicable if output_domain is "spectral". If set to True, only the
parts of the Fourier spectrum with non-negligible filter weights are
stored. Defaults to False.
Returns
-------
Nothing
"""

# Make a NetCDF file
date_string = np.datetime_as_string(analysis_time, "s")
outfn = os.path.join(
output_path,
"cascade_"
+ NWP_model
+ "_"
+ date_string[:4]
+ date_string[5:7]
+ date_string[8:10]
+ date_string[11:13]
+ date_string[14:16]
+ date_string[17:19]
+ ".nc",
)
ncf = netCDF4.Dataset(outfn, "w", format="NETCDF4")

# Express times relative to the zero time
zero_time = np.datetime64("1970-01-01T00:00:00", "ns")
valid_times = np.array(valid_times) - zero_time
analysis_time = analysis_time - zero_time

# Set attributes of decomposition method
ncf.domain = domain
ncf.normalized = int(normalize)
ncf.compact_output = int(compact_output)
ncf.analysis_time = int(analysis_time)
ncf.timestep = int(timestep)

# Create dimensions
time_dim = ncf.createDimension("time", R_NWP.shape[0])
casc_dim = ncf.createDimension("cascade_levels", num_cascade_levels)
x_dim = ncf.createDimension("x", R_NWP.shape[1])
y_dim = ncf.createDimension("y", R_NWP.shape[2])

# Create variables (decomposed cascade, means and standard deviations)
R_d = ncf.createVariable(
"pr_decomposed", np.float64, ("time", "cascade_levels", "x", "y")
)
means = ncf.createVariable("means", np.float64, ("time", "cascade_levels"))
stds = ncf.createVariable("stds", np.float64, ("time", "cascade_levels"))
v_times = ncf.createVariable("valid_times", np.float64, ("time",))
v_times.units = "nanoseconds since 1970-01-01 00:00:00"

# The valid times are saved as an array of floats, because netCDF files can't handle datetime types
v_times[:] = np.array([np.float64(valid_times[i]) for i in range(len(valid_times))])

# Decompose the NWP data
filter_g = filter_gaussian(R_NWP.shape[1:], num_cascade_levels)
fft = utils.get_method(fft_method, shape=R_NWP.shape[1:], n_threads=1)
decomp_method, _ = cascade_get_method(decomp_method)

for i in range(R_NWP.shape[0]):
R_ = decomp_method(
R_NWP[i, :, :],
filter_g,
fft_method=fft,
output_domain=domain,
normalize=normalize,
compute_stats=compute_stats,
compact_output=compact_output,
)

# Save data to netCDF file
R_d[i, :, :, :] = R_["cascade_levels"]
means[i, :] = R_["means"]
stds[i, :] = R_["stds"]

# Close the file
ncf.close()


def load_NWP(input_nc_path, start_time, n_timesteps):
"""Loads the decomposed NWP data from the netCDF files
Parameters
----------
input_nc_path: str
Path to the saved netCDF file containing the decomposed NWP data.
start_time: numpy.datetime64
The start time of the nowcasting. Assumed to be a numpy.datetime64 type
n_timesteps: int
Number of time steps to forecast
Returns
-------
R_d: list
A list of dictionaries with each element in the list corresponding to
a different time step. Each dictionary has the same structure as the
output of the decomposition function
"""

# Open the file
ncf = netCDF4.Dataset(input_nc_path, "r", format="NETCDF4")

# Initialise the decomposition dictionary
decomp_dict = dict()
decomp_dict["domain"] = ncf.domain
decomp_dict["normalized"] = bool(ncf.normalized)
decomp_dict["compact_output"] = bool(ncf.compact_output)

# Convert the start time and the timestep to datetime64 and timedelta64 type
zero_time = np.datetime64("1970-01-01T00:00:00", "ns")
analysis_time = np.timedelta64(int(ncf.analysis_time), "ns") + zero_time

timestep = ncf.timestep
timestep = np.timedelta64(timestep, "m")

valid_times = ncf.variables["valid_times"][:]
valid_times = np.array(
[np.timedelta64(int(valid_times[i]), "ns") for i in range(len(valid_times))]
)
valid_times = valid_times + zero_time

# Add the valid times to the output
decomp_dict["valid_times"] = valid_times

# Find the indices corresponding with the required start and end time
start_i = (start_time - analysis_time) // timestep
assert analysis_time + start_i * timestep == start_time
end_i = start_i + n_timesteps + 1

# Initialise the list of dictionaries which will serve as the output (cf: the STEPS function)
R_d = list()

for i in range(start_i, end_i):
decomp_dict_ = decomp_dict.copy()

cascade_levels = ncf.variables["pr_decomposed"][i, :, :, :]

# In the netcdf file this is saved as a masked array, so we're checking if there is no mask
assert not cascade_levels.mask

means = ncf.variables["means"][i, :]
assert not means.mask

stds = ncf.variables["stds"][i, :]
assert not stds.mask

# Save the values in the dictionary as normal arrays with the filled method
decomp_dict_["cascade_levels"] = np.ma.filled(cascade_levels)
decomp_dict_["means"] = np.ma.filled(means)
decomp_dict_["stds"] = np.ma.filled(stds)

# Append the output list
R_d.append(decomp_dict_)

return R_d
2 changes: 1 addition & 1 deletion pysteps/pystepsrc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"outputs": {
// path_outputs : path where to save results (figures, forecasts, etc)
"path_outputs": "./",
"path_workdir": "./tmp/"
"path_workdir": "./"
},
"plot": {
// "motion_plot" : "streamplot" or "quiver"
Expand Down

0 comments on commit 81e88fe

Please sign in to comment.