Skip to content

Commit

Permalink
simulations now in parallel for multiple parameter sets, resolves #7
Browse files Browse the repository at this point in the history
  • Loading branch information
kratzert committed Oct 28, 2017
1 parent e87299d commit 27e420d
Show file tree
Hide file tree
Showing 7 changed files with 402 additions and 323 deletions.
73 changes: 36 additions & 37 deletions docs/source/examples/model_api_example.rst

Large diffs are not rendered by default.

70 changes: 35 additions & 35 deletions examples/model_api_example.ipynb

Large diffs are not rendered by default.

86 changes: 56 additions & 30 deletions rrmpg/models/abcmodel.py
Expand Up @@ -14,7 +14,7 @@

import numpy as np

from numba import njit
from numba import njit, prange
from scipy import optimize

from .basemodel import BaseModel
Expand Down Expand Up @@ -102,7 +102,8 @@ def get_random_params(self, num=1):

return params

def simulate(self, prec, initial_state=0, return_storage=False):
def simulate(self, prec, initial_state=0, return_storage=False,
params=None):
"""Simulate the streamflow for the passed precipitation.
This function makes sanity checks on the input and then calls the
Expand All @@ -114,6 +115,10 @@ def simulate(self, prec, initial_state=0, return_storage=False):
initial_state: (optional) Initial value for the storage.
return_storage: (optional) Boolean, wether or not to return the
simulated storage for each timestep.
params: (optional) Numpy array of parameter sets, that will be
evaluated a once in parallel. Must be of the models own custom
data type. If nothing is passed, the parameters, stored in the
model object, will be used.
Returns:
An array with the simulated stream flow for each timestep and
Expand Down Expand Up @@ -144,10 +149,21 @@ def simulate(self, prec, initial_state=0, return_storage=False):
if not isinstance(return_storage, bool):
raise TypeError("The return_storage arg must be a boolean.")

# Create custom numpy data struct containing the model parameters
params = np.zeros(1, dtype=self._dtype)
for param in self._param_list:
params[param] = getattr(self, param)
# If no parameters were passed, prepare array w. params from attributes
if params is None:
params = np.zeros(1, dtype=self._dtype)
for param in self._param_list:
params[param] = getattr(self, param)

# Else, check the param input for correct datatype
else:
if params.dtype != self._dtype:
msg = ["The model parameters must be a numpy array of the ",
"models own custom data type."]
raise TypeError("".join(msg))
# if only one parameter set is passed, expand dimensions to 1D
if isinstance(params, np.void):
params = np.expand_dims(params, params.ndim)

# Call ABC-model simulation function and return results
if return_storage:
Expand Down Expand Up @@ -228,30 +244,40 @@ def _loss(X, *args):
return loss_value


@njit
@njit(parallel=True)
def _simulate_abc(params, prec, initial_state):
"""Run a simulation of the ABC-model for given input and params."""
# Unpack model parameters
a = params['a'][0]
b = params['b'][0]
c = params['c'][0]

"""Run a simulation of the ABC-model for given input and param sets."""
# Number of simulation timesteps
num_timesteps = len(prec)

# Initialize array for the simulated stream flow and the storage
qsim = np.zeros(num_timesteps, np.float64)
storage = np.zeros(num_timesteps, np.float64)

# Set the initial storage value
storage[0] = initial_state

# Model simulation
for t in range(1, num_timesteps):
# Calculate the streamflow
qsim[t] = (1 - a - b) * prec[t] + c * storage[t-1]

# Update the storage
storage[t] = (1 - c) * storage[t-1] + a * prec[t]

return qsim, storage

# Initialize arrays for all simulations of all parameter sets.
qsim_all = np.zeros((num_timesteps, params.size), dtype=np.float64)
storage_all = np.zeros((num_timesteps, params.size), dtype=np.float64)

# Process different param sets in parallel through 'prange'-loop
for i in prange(params.size):
# Unpack model parameters
a = params['a'][i]
b = params['b'][i]
c = params['c'][i]

# Initialize array for the simulated stream flow and the storage
qsim = np.zeros(num_timesteps, np.float64)
storage = np.zeros(num_timesteps, np.float64)

# Set the initial storage value
storage[0] = initial_state

# Model simulation
for t in range(1, num_timesteps):
# Calculate the streamflow
qsim[t] = (1 - a - b) * prec[t] + c * storage[t-1]

# Update the storage
storage[t] = (1 - c) * storage[t-1] + a * prec[t]

# copy results into arrays of all qsims and storages
qsim_all[:, i] = qsim
storage_all[:, i] = storage

return qsim_all, storage_all

0 comments on commit 27e420d

Please sign in to comment.