In [1]:
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from joblib import Parallel, delayed

# Pseudo Random Numbers and parallel computing
This notebook highlights the dangers of using pseudo random numbers in our parallelised research code. We will consider three types of code execution: *serial execution* in which instances of a function are called succesively, *concurrent esecution* which takes advantage of multi-threaded cores to run functions asynchronously on the same core, and *parallel execution* which runs instances of a function on multiple cores. 


## Example : randomly parameterised hydrological model
We will consider the hydrological model developed in the research paper. We have a `HydrologicalModel` object, which has parameters that are drawn from distrubutions using `np.random`. We then use the initialised model to produce 6 modelled discharge values for a catchment with `area=1000` km.

We will do this process using series, concurrent and parallel execution, and look at how our results differ. In each instance, we will initialise a new random seed prior to each execution using `random.seed(42)`, so each attempt *should* have access to the same list of random numbers.

In [5]:
class HydrologicalModel:
    def __init__(self):
        """Initialize the model with a stochastic random state.

        attributes:
            alpha (np.float32) : normally distributed intercept.
            beta (np.float32) : normally distributed slope.
            sigma (np.float32) : uniformally distributed residual std.
            
        """
        self.alpha = np.random.normal(-3.462, 0.529)  # Intercept variation
        self.beta = np.random.normal(0.755, 0.085)  # Slope variation
        self.sigma = np.random.uniform(1.15, 0.084)  # Error term variation
    
    def predict_discharge(self, area):
        """Predict discharge using a stochastic model.
        args:
            area (int) : the area of catchment for prediction. 
        returns:
            np.float32 : the estimated discharge. 
        """
        return np.exp(self.alpha + self.beta * np.log(area)) + np.random.normal(0, self.sigma)

# Stochastic simulation function
def simulate_discharge(_):
    model = HydrologicalModel()
    return model.predict_discharge(area=1000)  # Example catchment area (km²)

In [3]:
# set random state
np.random.seed(42)
# Serial Execution
serial_results = [simulate_discharge(i) for i in range(6)]

np.random.seed(42)
# concurrent Execution 
with ThreadPoolExecutor(max_workers=3) as executor:
    thread_results = list(executor.map(simulate_discharge, range(6)))

np.random.seed(42)
# Parallel Execution
process_results = Parallel(n_jobs=3)(delayed(simulate_discharge)(i) for i in range(6))

In [4]:
print(f"Serial Results: ", np.round(serial_results, 3))
print("Threaded Results: ", np.round(thread_results, 3))
print("Multiprocessing Results: ", np.round(process_results, 3))

Serial Results:  [6.513 8.451 5.648 1.562 3.474 1.646]
Threaded Results:  [6.513 8.451 5.648 1.562 3.474 1.646]
Multiprocessing Results:  [ 3.573  7.126  6.493 10.309  3.188  2.927]


## Fixing the code : Useing `np-.random.RandomState` and sharing between models
The improved code below adds an argument `rng` to the model's `__init__` function. This allows a numpy random number generator to be passed to the model. We also add an argument `seed` to the `simulate_discharge()` function which allows us to set the random state of the `rng` used in a model prior to it being called. By iterating over seeds `[42, 43, 44, 45, 46, 47]` we can ensure that a consistent random seed is used, even in parallel execution. 

This new approach gets around the issue of `np.random.seed()` not being accessible to the parallel tasks by ensuring that a specific random number generator is allocated to model instances. 

In [9]:
class HydrologicalModel:
    def __init__(self, rng):
        """Initialize the model with a stochastic random state.
        args:
            rng (np.default_rng) : a numpy random number generator.
        attributes:
            alpha (np.float32) : normally distributed intercept.
            beta (np.float32) : normally distributed slope.
            sigma (np.float32) : uniformally distributed residual std.
            
        """
        self.rng = rng
        self.alpha = rng.normal(-3.462, 0.529)  # Intercept variation
        self.beta = rng.normal(0.755, 0.085)  # Slope variation
        self.sigma = rng.uniform(1.15, 0.084)  # Error term variation
    
    def predict_discharge(self, area):
        """Predict discharge using a stochastic model.
        args:
            area (int) : the area of catchment for prediction. 
        returns:
            np.float32 : the estimated discharge. 
        """
        return np.exp(self.alpha + self.beta * np.log(area)) + self.rng.normal(0, self.sigma)

# Stochastic simulation function
def simulate_discharge(seed):
    rng = np.random.RandomState(seed)
    model = HydrologicalModel(rng)
    return model.predict_discharge(area=1000)  # Example catchment area (km²)

In [10]:
# set random state
# Serial Execution
serial_results = [simulate_discharge(42+i) for i in range(6)]

# concurrent Execution 
with ThreadPoolExecutor(max_workers=3) as executor:
    thread_results = list(executor.map(simulate_discharge, [42 + i for i in range(6)]))

# Parallel Execution
process_results = Parallel(n_jobs=3)(delayed(simulate_discharge)(42+i) for i in range(6))

In [11]:
print(f"Serial Results: ", np.round(serial_results, 3))
print("Threaded Results: ", np.round(thread_results, 3))
print("Multiprocessing Results: ", np.round(process_results, 3))

Serial Results:  [ 6.513  3.113  7.242  6.734 15.514  8.157]
Threaded Results:  [ 6.513  3.113  7.242  6.734 15.514  8.157]
Multiprocessing Results:  [ 6.513  3.113  7.242  6.734 15.514  8.157]
