Merge pull request dynamicslab#196 from kratzert/master
* Fix typo in models doc (dynamicslab#190)

* CPU environment now pulls from conda forge, which is necessary to ensure an up-to-date xarray. (dynamicslab#193)

* Fix handling of weekly frequencies. (dynamicslab#194)

From neuralhydrology/neuralhydrology#111:
pd.infer_freq will return strings like "W-SUN" for weekly data, which
pd.Timestamp doesn't understand. We now convert these frequencies to
their equivalent multiple of 7D, as sketched below.
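A minimal sketch of that conversion (the dates are made up for illustration; the actual fix lives in neuralhydrology/datautils/utils.py and is shown in the diff below):

import re
import pandas as pd

# pd.infer_freq yields anchored strings like 'W-SUN' for weekly data ...
index = pd.date_range('2000-01-02', periods=10, freq='W-SUN')
native_frequency = pd.infer_freq(index)  # 'W-SUN'
if native_frequency[0] not in '0123456789':  # prepend a value so to_timedelta works
    native_frequency = f'1{native_frequency}'  # '1W-SUN'

# ... which pd.Timestamp/pd.Timedelta cannot parse, so we rewrite
# '<n>W(-<weekday>)?' as the equivalent multiple of 7 days:
match = re.match(r'(\d+)W(-(MON|TUE|WED|THU|FRI|SAT|SUN))?$', native_frequency)
if match is not None:
    native_frequency = f'{int(match[1]) * 7}D'  # '7D'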

* Correcting the pre model hook and UMAL sampling (dynamicslab#195)

* The current use of `pre_model_hook` only applied the hook during training.
This can be useful, but UMAL requires the hook in validation and
test as well. Thus, with the old setup UMAL only worked during training.
My proposed changes make the `pre_model_hook` part of the model, apply
it everywhere, and allow UMAL validation and evaluation (see the sketch
below).

I think in the future we should also allow for different hook behaviors
depending on the setting in which it is called. But for now the
proposed changes are enough.
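Roughly, the hook now lives on the model and is invoked in every setting; a minimal runnable sketch (`TinyModel` is a made-up stand-in, not the actual `BaseModel`):

from typing import Dict

import torch
import torch.nn as nn


class TinyModel(nn.Module):
    """Toy stand-in for a neuralhydrology BaseModel subclass (illustration only)."""

    def pre_model_hook(self, data: Dict[str, torch.Tensor], is_train: bool) -> Dict[str, torch.Tensor]:
        # a UMAL head would extend the batch here; identity for other heads
        return data

    def forward(self, data: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
        return {'y_hat': data['x_d'].sum(-1, keepdim=True)}


model = TinyModel()
data = {'x_d': torch.rand(4, 10, 3)}

# training step: the hook runs before the forward pass ...
predictions = model(model.pre_model_hook(data, is_train=True))
# ... and now likewise during validation/test, which is what UMAL needs:
predictions = model(model.pre_model_hook(data, is_train=False))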

* Simpler Hook and Cleaner Pipeline

This commit comprises two things.
(1) A pre-model hook that is simpler than the one in the original PR.
(2) An idea to avoid copying the whole dataset that just involves
a copy of the labels. That is still suboptimal in terms of memory
use, but makes the overall code simpler. I am not sure if it is the
best version. However, at some point we have to extend the labels for
the loss, and we need to do so without breaking the whole
downstream procedure. So maybe this is a good middle ground.

* Simplified UMAL Sampling Logic

This commit implements an idea from Martin that simplifies the sampling logic of the UMAL sampling util: the sampler now automatically checks whether the data have already been extended.
Thus, no extra argument is required when calling `sample_umal`; see the sketch below.
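The check itself is tiny (schematic lines mirroring the samplingutils diff further down; `model` and `data` come from the surrounding sampling code):

# inside sample_umal: extend the batch only if the pre-model hook
# has not already done so (the hook adds the 'y_extended' key)
if 'y_extended' not in data:
    data = model.pre_model_hook(data, is_train=False)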

* Spell correction for neuralhydrology/modelzoo/basemodel.py

Updated the comment so that "additional" is spelled correctly.

Co-authored-by: Martin Gauch <15731649+gauchm@users.noreply.github.com>

* Spell correction for neuralhydrology/utils/samplingutils.py

UMAl -> UMAL

Co-authored-by: Martin Gauch <15731649+gauchm@users.noreply.github.com>

* Improvement of Comments

Made some minor changes to the comments.

---------

Co-authored-by: Martin Gauch <15731649+gauchm@users.noreply.github.com>

* Update __about__.py

---------

Co-authored-by: Martin Gauch <15731649+gauchm@users.noreply.github.com>
Co-authored-by: Grey Nearing <grey.nearing@gmail.com>
Co-authored-by: Daniel Klotz <klotz@ml.jku.at>
4 people committed May 17, 2023
2 parents 9304f7c + 8828524 commit f8e96a6
Showing 13 changed files with 106 additions and 112 deletions.
2 changes: 1 addition & 1 deletion docs/source/usage/models.rst
@@ -28,7 +28,7 @@ The number of components can be set in the config.yml using ``n_distributions``.

UMAL
^^^^
-:py:class:`neuralhydrology.modelzoo.head.CMAL` implements an *Uncountable Mixture of Asymmetric Laplacians* head. That is, a mixture density network that uses an uncountable amount of asymmetric Laplace distributions as components. The *uncountable property* is achieved by implicitly learning the conditional density and approximating it, when needed, with a Monte-Carlo integration, using sampled asymmetry parameters. The UMAL components are defined by two parameters (the location and the scale) and linked by a set of weights. The current implementation uses two hidden layers. The output activation for the scale has some major differences to the original implementation, since it is upper bounded (using :py:func:`0.5*torch.sigmoid`).
+:py:class:`neuralhydrology.modelzoo.head.UMAL` implements an *Uncountable Mixture of Asymmetric Laplacians* head. That is, a mixture density network that uses an uncountable amount of asymmetric Laplace distributions as components. The *uncountable property* is achieved by implicitly learning the conditional density and approximating it, when needed, with a Monte-Carlo integration, using sampled asymmetry parameters. The UMAL components are defined by two parameters (the location and the scale) and linked by a set of weights. The current implementation uses two hidden layers. The output activation for the scale has some major differences to the original implementation, since it is upper bounded (using :py:func:`0.5*torch.sigmoid`).

During inference, the number of components and weights used for the Monte-Carlo approximation is defined in the config.yml by ``n_taus``. The additional argument ``umal_extend_batch`` allows one to explicitly account for this integration step during training by repeatedly sampling the asymmetry parameter and extending the batch by ``n_taus``. Furthermore, depending on the used output activation, the sampling of the asymmetry parameters can yield unwarranted model behavior. Therefore, the lower and upper bounds of the sampling can be adjusted using the ``tau_down`` and ``tau_up`` options in the config.yml.
The sampling for UMAL is defined by choosing the number of samples (``n_samples``), and the approach for handling negative samples (``negative_sample_handling``).
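For intuition, the bounded scale activation and the asymmetry sampling amount to roughly the following (a minimal sketch; the bounds 0.01 and 0.99 are made-up example values for ``tau_down`` and ``tau_up``):

import torch

# scale parameter b: upper-bounded via a scaled sigmoid, so b lies in (0, 0.5)
raw_scale = torch.randn(4, 1)
b = 0.5 * torch.sigmoid(raw_scale)

# asymmetry parameters tau: uniform samples within [tau_down, tau_up]
tau_down, tau_up = 0.01, 0.99  # example bounds, adjustable in the config.yml
tau = (tau_up - tau_down) * torch.rand(4, 1) + tau_down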
4 changes: 2 additions & 2 deletions environments/environment_cpu.yml
@@ -2,7 +2,7 @@ name: neuralhydrology
channels:
- pytorch
- defaults
-  - anaconda
+  - conda-forge
dependencies:
- bokeh
- cpuonly
@@ -29,4 +29,4 @@ dependencies:
- sphinx-rtd-theme
- nbsphinx
- nbsphinx-link
- parso==0.8.0
- parso==0.8.0
2 changes: 1 addition & 1 deletion neuralhydrology/__about__.py
@@ -1 +1 @@
__version__ = "1.6.0"
__version__ = "1.7.0"
6 changes: 6 additions & 0 deletions neuralhydrology/datautils/utils.py
@@ -173,6 +173,12 @@ def infer_frequency(index: Union[pd.DatetimeIndex, np.ndarray]) -> str:
if native_frequency[0] not in '0123456789': # add a value to the unit so to_timedelta works
native_frequency = f'1{native_frequency}'

# pd.Timedelta doesn't understand weekly (W) frequencies, so we convert them to the equivalent multiple of 7D.
weekly_freq = re.match('(\d+)W(-(MON|TUE|WED|THU|FRI|SAT|SUN))?$', native_frequency)
if weekly_freq is not None:
n = int(weekly_freq[1]) * 7
native_frequency = f'{n}D'

# Assert that the frequency corresponds to a positive time delta. We first add one offset to the base datetime
# to make sure it's aligned with the frequency. Otherwise, adding an offset of e.g. 0Y would round up to the
# nearest year-end, so we'd incorrectly miss a frequency of zero.
1 change: 1 addition & 0 deletions neuralhydrology/evaluation/tester.py
@@ -422,6 +422,7 @@ def _evaluate(self, model: BaseModel, loader: DataLoader, frequencies: List[str]
for key in data:
if not key.startswith('date'):
data[key] = data[key].to(self.device)
+data = model.pre_model_hook(data, is_train=False)
predictions, loss = self._get_predictions_and_loss(model, data)

for freq in frequencies:
26 changes: 25 additions & 1 deletion neuralhydrology/modelzoo/basemodel.py
@@ -4,7 +4,7 @@
import torch.nn as nn

from neuralhydrology.utils.config import Config
-from neuralhydrology.utils.samplingutils import sample_pointpredictions
+from neuralhydrology.utils.samplingutils import sample_pointpredictions, umal_extend_batch


class BaseModel(nn.Module):
@@ -69,3 +69,27 @@ def forward(self, data: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
Model output and potentially any intermediate states and activations as a dictionary.
"""
raise NotImplementedError

def pre_model_hook(self, data: Dict[str, torch.Tensor], is_train: bool) -> Dict[str, torch.Tensor]:
"""A function to execute before the model in training, validaton and test.
The beahvior can be adapted depending on the run configuration and the provided arguments.
Parameters
----------
data : Dict[str, torch.Tensor]
Dictionary, containing input features as key-value pairs and labels y.
is_train : bool
Defines if the hook is executed in train mode or in validation/test mode.
Returns
-------
data : Dict[str, torch.Tensor]
The modified (or unmodified) data that are used for the training or evaluation.
"""
if self.cfg.head.lower() == "umal":
data = umal_extend_batch(data, self.cfg, n_taus=self.cfg.n_taus, extend_y=True)
else:
# here one can implement additional pre model hooks
pass

return data
6 changes: 2 additions & 4 deletions neuralhydrology/training/basetrainer.py
@@ -284,8 +284,8 @@ def _train_epoch(self, epoch: int):
if not key.startswith('date'):
data[key] = data[key].to(self.device)

-# apply possible subclass pre-processing
-data = self._pre_model_hook(data)
+# apply possible pre-processing to the batch before the forward pass
+data = self.model.pre_model_hook(data, is_train=True)

# get predictions
predictions = self.model(data)
@@ -382,5 +382,3 @@ def _create_folder_structure(self):
self.cfg.img_log_dir = self.cfg.run_dir / "img_log"
self.cfg.img_log_dir.mkdir(parents=True)

-def _pre_model_hook(self, data: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
-    return data
6 changes: 3 additions & 3 deletions neuralhydrology/training/loss.py
@@ -360,15 +360,15 @@ class MaskedUMALLoss(BaseLoss):
def __init__(self, cfg, eps: float = 1e-5):
super(MaskedUMALLoss, self).__init__(cfg,
prediction_keys=['mu', 'b'],
-ground_truth_keys=['y', 'tau'],
+ground_truth_keys=['y_extended', 'tau'],
output_size_per_target=2)
self.eps = eps
self._n_taus_count = cfg.n_taus
self._n_taus_log = torch.as_tensor(np.log(cfg.n_taus).astype('float32'))

def _get_loss(self, prediction: Dict[str, torch.Tensor], ground_truth: Dict[str, torch.Tensor], **kwargs):
-mask = ~torch.isnan(ground_truth['y']).any(1).any(1)
-y = ground_truth['y'][mask]
+mask = ~torch.isnan(ground_truth['y_extended']).any(1).any(1)
+y = ground_truth['y_extended'][mask]
t = ground_truth['tau'][mask]
m = prediction['mu'][mask]
b = prediction['b'][mask]
5 changes: 1 addition & 4 deletions neuralhydrology/training/train.py
@@ -1,5 +1,4 @@
from neuralhydrology.training.basetrainer import BaseTrainer
-from neuralhydrology.training.umaltrainer import UMALTrainer
from neuralhydrology.utils.config import Config


@@ -13,10 +12,8 @@ def start_training(cfg: Config):
"""
# MC-LSTM is a special case, where the head returns an empty string but the model is trained as regression model.
-if cfg.head.lower() in ['regression', 'gmm', 'cmal', '']:
+if cfg.head.lower() in ['regression', 'gmm', 'umal', 'cmal', '']:
trainer = BaseTrainer(cfg=cfg)
-elif cfg.head.lower() == 'umal':
-    trainer = UMALTrainer(cfg=cfg)
else:
raise ValueError(f"Unknown head {cfg.head}.")
trainer.initialize_training()
25 changes: 0 additions & 25 deletions neuralhydrology/training/umaltrainer.py

This file was deleted.

67 changes: 0 additions & 67 deletions neuralhydrology/training/utils.py

This file was deleted.

66 changes: 63 additions & 3 deletions neuralhydrology/utils/samplingutils.py
@@ -6,7 +6,6 @@
from torch.distributions import Categorical, Uniform

from neuralhydrology.utils.config import Config
-from neuralhydrology.training.utils import umal_extend_batch


def sample_pointpredictions(model: 'BaseModel', data: Dict[str, torch.Tensor],
@@ -186,7 +185,7 @@ def __init__(self, model: 'BaseModel', data: Dict[str, torch.Tensor], head: str)
else:
self.freq_suffixes = ['']

-self.batch_size_data = data[f'x_d{self.freq_suffixes[0]}'].shape[0]
+self.batch_size_data = data[f'y{self.freq_suffixes[0]}'].shape[0]

def _get_frequency_last_n(self, freq_suffix: str):
if isinstance(self.predict_last_n, int):
@@ -486,7 +485,8 @@ def sample_umal(model: 'BaseModel', data: Dict[str, torch.Tensor], n_samples: in
model.train()

# n_taus expands the batch by itself and adds a sampled tau as input (new_batch_size = n_taus*batch_size):
-data = umal_extend_batch(data, setup.cfg, n_taus=setup.cfg.n_taus)
+if 'y_extended' not in data:
+    data = model.pre_model_hook(data, is_train=False)

# make predictions:
pred = model(data)
@@ -543,6 +543,66 @@ def sample_umal(model: 'BaseModel', data: Dict[str, torch.Tensor], n_samples: in
samples.update({freq_key: sample_points})
return samples

def umal_extend_batch(data: Dict[str, torch.Tensor], cfg: Config, n_taus: int = 1, extend_y: bool = False) \
-> Dict[str, torch.Tensor]:
"""This function extends the batch for the usage in UMAL (see: [#]_).
UMAL makes an MC approximation to a mixture integral by sampling random asymmetry parameters (tau). This can be
parallelized by expanding the batch for each tau.
Parameters
----------
data : Dict[str, torch.Tensor]
Dictionary, containing input features as key-value pairs.
cfg : Config
The run configuration.
n_taus : int
Number of taus to expand the batch.
extend_y : bool
Option to also extend the labels/y.
Returns
-------
Dict[str, torch.Tensor]
Dictionary, containing expanded input features and tau samples as key-value pairs.
References
----------
.. [#] A. Brando, J. A. Rodriguez, J. Vitria, and A. R. Munoz: Modelling heterogeneous distributions
with an Uncountable Mixture of Asymmetric Laplacians. Advances in Neural Information Processing Systems,
pp. 8838-8848, 2019.
"""
# setup:
if cfg.use_frequencies:
freq_suffixes = [f'_{freq}' for freq in cfg.use_frequencies]
else:
freq_suffixes = ['']

for freq_suffix in freq_suffixes:
batch_size, seq_length, input_size = data[f'x_d{freq_suffix}'].shape

if isinstance(cfg.predict_last_n, int):
predict_last_n = cfg.predict_last_n
else:
predict_last_n = cfg.predict_last_n[freq_suffix[1:]]

# sample tau within [tau_down, tau_up] and add to data:
tau = (cfg.tau_up - cfg.tau_down) * torch.rand(batch_size * n_taus, 1, 1) + cfg.tau_down
tau = tau.repeat(1, seq_length, 1) # in our convention tau remains the same over all inputs
tau = tau.to(data[f'x_d{freq_suffix}'].device)
data[f'tau{freq_suffix}'] = tau[:, -predict_last_n:, :]

# extend dynamic inputs with tau and expand batch:
x_d = data[f'x_d{freq_suffix}'].repeat(n_taus, 1, 1)
data[f'x_d{freq_suffix}'] = torch.cat([x_d, tau], dim=-1)
data[f'y_extended{freq_suffix}'] = data[f'y{freq_suffix}'].repeat(n_taus, 1, 1)
if f'x_s{freq_suffix}' in data:
data[f'x_s{freq_suffix}'] = data[f'x_s{freq_suffix}'].repeat(n_taus, 1)
if f'x_one_hot{freq_suffix}' in data:
data[f'x_one_hot{freq_suffix}'] = data[f'x_one_hot{freq_suffix}'].repeat(n_taus, 1)

return data
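For a sense of the resulting shapes, a sketch with made-up sizes (single frequency, n_taus=3, predict_last_n=1; the claimed shapes follow from the repeat/cat calls above):

import torch

# hypothetical batch: 8 samples, sequence length 365, 10 dynamic inputs, 1 target
data = {'x_d': torch.rand(8, 365, 10), 'y': torch.rand(8, 365, 1)}

# after umal_extend_batch(data, cfg, n_taus=3, extend_y=True):
#   data['x_d']        -> (24, 365, 11)  # batch repeated 3x, sampled tau appended as input
#   data['tau']        -> (24, 1, 1)     # only the last predict_last_n=1 steps are kept
#   data['y_extended'] -> (24, 365, 1)   # labels repeated to match the extended batch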


@njit
def bernoulli_subseries_sampler(
2 changes: 1 addition & 1 deletion test/test_datautils.py
@@ -20,7 +20,7 @@ def test_infer_frequency():
"""Test the logic to infer frequencies. """
assert infer_frequency(pd.date_range('2000-01-01', '2000-01-10', freq='D')) == '1D'
assert infer_frequency(pd.date_range('2000-01-01', '2000-01-10', freq='48H')) == '2D'
-assert infer_frequency(pd.date_range('2000-01-01', '2000-03-01', freq='W-MON')) == '1W-MON'
+assert infer_frequency(pd.date_range('2000-01-01', '2000-03-01', freq='W-MON')) == '7D'

# just a single date
pytest.raises(ValueError, infer_frequency, [pd.to_datetime('2000-01-01')])
