## Random Numbers and Reproducibility
Now you might ask, can I reproduce my results if the application uses random numbers? Can I generate the same results regardless of if the code runs sequentially or in parallel? This chapter will answer these questions. You will learn about a random number generator well suited to a parallel environment and how the various packages make use of it.

### SOCK vs. FORK
In addition to the code in the previous exercise, we also created a FORK cluster for you.

cl.fork <- makeCluster(2, type = "FORK")
Your job is to register the two cluster objects with the preloaded doParallel package and compare results obtained with parallel foreach. How do the results differ in terms of reproducibility?

In [4]:
# Load packages

library(parallel)
library(foreach)
library(doParallel)


# Register the SOCK cluster
cl_sock <- makeCluster(2, type = "SOCK")
registerDoParallel(cl.sock)

replicate(
  # Use 2 replicates
  n = 2, 
  expr = {
    # Set the seed to 100
    set.seed(100)
    # Run two iterations in parallel, bound by rows
    foreach(i = 1:2, .combine = rbind) %dopar% rnorm(3)
  }, 
  simplify = FALSE
)

0,1,2,3
result.1,-1.6514072,0.767988,0.4183381
result.2,-0.2876203,-0.9179266,-0.1580649

0,1,2,3
result.1,-0.3895553,-0.2045693,-1.0037024
result.2,-1.6960712,1.4756209,0.4041849


In [5]:
# Change this to register the FORK cluster
cl_sock <- makeCluster(2, type = "FORK")
registerDoParallel(cl.fork)

# Run this again and look at the output!
replicate(
  n = 2, 
  expr = {
    set.seed(100)
    foreach(i = 1:2, .combine = rbind) %dopar% rnorm(3)
  }, 
  simplify = FALSE
)

# fyi - FORK generates the same random numbers on each node because nodes are copies of the master.

ERROR: Error in makeForkCluster(nnodes = spec, ...): fork clusters are not supported on Windows


### Setting an RNG
In this exercise, you will look at the impact of the clusterSetRNGStream() function by checking the RNG kind before and after the RNG initialization.

clusterSetRNGStream() takes two arguments: a cluster object, and an integer for the random seed.


In [6]:
# Create a cluster
cl <- makeCluster(2)

# Check RNGkind on workers
clusterCall(cl, RNGkind)

# Set the RNG seed on workers
clusterSetRNGStream(cl, 100)

# Check RNGkind on workers
clusterCall(cl, RNGkind)

# You confirmed that by using clusterSetRNGStream() the worker's RNG switches to the L'Ecuyer generator.

### Reproducible results in parallel
Now you are ready to make your results reproducible. You will use the simple embarrassingly parallel application for computing a mean of random numbers, mean_of_rnorm(), which we parallelized in the second chapter using clusterApply(). The parallel package, mean_of_rnorm(), n_vec (vector of sample sizes of 1000 replicated 5 times) are available in your workspace. You will now call clusterApply() repeatedly to check if results can be reproduced, without and with initializing the RNG.

In [7]:
# The cluster, & how many numbers to generate

n_vec = rep(1000, 5)

# define mean_of_rnorm
mean_of_rnorm <- function(n) {
  # Generate normally distributed random numbers
  random_numbers <- rnorm(n)
  # Calculate the mean of the random numbers
  mean(random_numbers)
}

t(replicate(
  # Use 3 replicates
  n = 3,
  expr = {
    # Spread across cl, apply mean_of_rnorm() to n_vec
    clusterApply(cl, n_vec, mean_of_rnorm)
  }
))

0,1,2,3,4
-0.01586949,0.01799809,0.0176576,-0.05922263,-0.0549694
-0.02851165,-0.01373292,-0.01867162,0.06237399,-0.03204695
-0.02196545,-0.01464207,-0.001095244,-0.01865088,-9.867087e-06


In [8]:
# Update the replicated expression so that you set the cluster's random number generation stream's seed to 1234 
# before you call clusterApply().

t(replicate(
  n = 3,
  expr = {
    # Set the cluster's RNG stream seed to 1234
    clusterSetRNGStream(cl, iseed = 1234)
    clusterApply(cl, n_vec, mean_of_rnorm)
  }
))

0,1,2,3,4
-0.008597904,-0.006089337,-0.01398052,-0.06629339,0.004297755
-0.008597904,-0.006089337,-0.01398052,-0.06629339,0.004297755
-0.008597904,-0.006089337,-0.01398052,-0.06629339,0.004297755


In [9]:
# run again
# Look at each row of the output. You should start to experience a sensation of deja vu.
t(replicate(
  n = 3,
  expr = {
    # Set the cluster's RNG stream seed to 1234
    clusterSetRNGStream(cl, iseed = 1234)
    clusterApply(cl, n_vec, mean_of_rnorm)
  }
))

0,1,2,3,4
-0.008597904,-0.006089337,-0.01398052,-0.06629339,0.004297755
-0.008597904,-0.006089337,-0.01398052,-0.06629339,0.004297755
-0.008597904,-0.006089337,-0.01398052,-0.06629339,0.004297755


### Non-reproducible results in parallel
Now we know that we can make results reproducible, so let's look at a situation when we cannot do so. We will run the previous example on clusters of different sizes, once on a cluster with two workers and on a cluster with four workers. Again, the parallel package, mean_of_rnorm(), and n_vec, are in your workspace.

In [10]:
# Make a cluster of size 2
cl2 <- makeCluster(2)

# Set the cluster's RNG stream seed to 1234
clusterSetRNGStream(cl2, iseed = 1234)

# Spread across the cluster, apply mean_of_rnorm() to n_vec
unlist(clusterApply(cl2, n_vec, mean_of_rnorm))

In [11]:
# Make a cluster of size 4
cl4 <- makeCluster(4)

# Set the cluster's RNG stream seed to 1234
clusterSetRNGStream(cl4, iseed = 1234)

# Spread across the cluster, apply mean_of_rnorm() to n_vec
unlist(clusterApply(cl4, n_vec, mean_of_rnorm))

### Reproducing migration app with foreach
In the previous chapters, you parallelized an application of migration projections. Now let's make sure the application is reproducible in both parallel as well as sequential environments. You will use the foreach framework with the doParallel, doSEQ, and doRNG backends for the parallel, sequential, and RNG environments, respectively. The function to use is ar1_block_of_trajectories() which you will instruct to generate 10 migration trajectories for each of the first 5 rows of the ar1est parameter dataset.

All the necessary functions, the dataset, seed (set to 123), and packages doParallel and doRNG are loaded in your workspace.

In [15]:
# define ar1_multiple_blocks_of_trajectories (again)

ar1est = read.csv("US migration.csv", header = TRUE)

ar1_one_value <- function(est, r) {
    est['mu'] + est['phi'] * (r - est['mu']) + 
        rnorm(1, sd = est['sigma'])
}

ar1_one_trajectory <- function(est, rate0, len = 15) {
    trajectory <- rep(NA, len)
    rate <- rate0
    for (time in seq_len(len)) {
        trajectory[time] <- ar1_one_value(est, r = rate)
        rate <- trajectory[time]
    }
    trajectory
}

ar1_block_of_trajectories <- function(id, rate0 = 0.015, traj_len = 15, block_size = 10) {
    trajectories <- matrix(NA, nrow = block_size, ncol = traj_len)
    for (i in seq_len(block_size)) 
        trajectories[i,] <- ar1_one_trajectory(unlist(ar1est[id, ]), rate0 = rate0, len = traj_len)
    trajectories
}

# Function definition of ar1_multiple_blocks_of_trajectories()
ar1_multiple_blocks_of_trajectories <- function(ids, ...) {
  # Call ar1_block_of_trajectories() for each ids
  trajectories_by_block <- lapply(ids, ar1_block_of_trajectories, ...)
  
  # rbind results
  do.call(rbind,trajectories_by_block)
}

# load doRNG
# install.packages("doRNG")
library(doRNG)
# Register doParallel and doRNG
seed = 123
registerDoParallel(cores = 2)
registerDoRNG(seed)

# Call ar1_block_of_trajectories via foreach
mpar <- foreach(r = 1:5) %dopar% ar1_block_of_trajectories(r)

# Register sequential backend, set seed and run foreach
registerDoSEQ()
set.seed(seed)
mseq <- foreach(r = 1:5) %dorng% ar1_block_of_trajectories(r)

# Check if results identical
identical(mpar, mseq)

### Reproducing migration app with future.apply
In this exercise, you will implement the same application as before, but now using the future.apply package. The package is available in your workspace, as are all functions and datasets needed for this exercise. The seed is set to 99

In [None]:
## NOT RUN

# IMPORTANT: The future_lapply() function has been moved to the future.apply package. 
# The version in this package (future) is defunct and gives an error if used. It will be fully removed in an upcoming release. 

# Set multiprocess plan 
plan(multiprocess, workers = 2)

# Call ar1_block_of_trajectories via future_lapply
mfpar <- future_lapply(1:5, ar1_block_of_trajectories, future.seed = seed)

# Set sequential plan and repeat future_lapply()
plan(sequential)
mfseq <- future_lapply(1:5, ar1_block_of_trajectories, future.seed = seed)

# Check if results are identical
identical(mfpar, mfseq)
