
# <center>Introduction to Parallel R</center>
<p>
<center>Robert Bjornson</center> 
<p>
<center><i>Yale Center for Research Computing</i></center>

## Tutorial materials
On our github: ``` git clone git@github.com:ycrc/ParallelR.git ```

or download ```https://github.com/ycrc/ParallelR.git```

The presentation is a jupyter notebook.  A static copy is available here:
https://github.com/ycrc/ParallelR/blob/master/ParallelR.ipynb

aka: https://tinyurl.com/rhjc7cx

Code examples are in Examples/Presentation

To run the code (either files or jupyter) you'll need to install the R packages as described below


## What is the Yale Center for Research Computing?


- Independent center under the Provost's office
- Created to support your research computing needs
- Focus is on high performance computing and storage
- ~20 staff, including applications specialists and system engineers
- Available to consult with and educate users
- Manage compute clusters and support users
- Located at 160 St. Ronan st, at the corner of Edwards and St. Ronan
- http://research.computing.yale.edu
- Questions/issues: email hpc@yale.edu

## Overview of Presentation
- Intro to Parallel R
- Using Foreach for parallelism in R
- Examples using Foreach 
- Some other options


## What does "Parallel R" mean?
In increasing level of complexity:

1. Lots of independent, sequential R jobs that could be run in parallel
1. An R program containing a "loop" with completely independent iterations
1. An R program with a decomposition involving communication

## Installing your own Parallel R environment 

We recommend using conda

```
$ module load miniconda
$ conda create --name parallel_r -c conda-forge r-base r-essentials r-doMC r-Rmpi
```

This final step (only necessary for the doMPI backend) must be done on login node.
```
$ R
> install.packages('doMPI')
```

## Multitude of Parallel R packages (confusing)


- Parallel apply approaches:
 - snow: original multihost, tcp/ssh method 
 - multicore: original forking method
 - parallel: unification of multicore and snow
- Foreach: high level, parallel for loop method
- Rmpi: interface to MPI for advanced parallel programming

_Parallel R_ by Stephen Weston, O'Reilly Press (unfortunately predates foreach)

Parallel apply _and_ foreach assume that you are applying a computation to a large set of inputs and each is
independent, producing a large set of outputs.

We will focus on __foreach__


## Foreach
- designed and implemented by Stephen Weston
- easy to use
- more general than parallel apply 
- natural semantics, similar to _for_ loop
- [documentation](https://www.rdocumentation.org/packages/foreach/versions/1.4.7/topics/foreach)
- [vignette](https://cran.r-project.org/web/packages/foreach/vignettes/foreach.pdf)
- works on multiple cpus on one machine, or on many machines
- manages environments for you

Foreach iterates over 1 or more indices, executes an expression on those indices, and returns a collection, by default a list:
```
l <- foreach (arguments ...) %dopar% expr
```

## Foreach backends
- Foreach code is independent of parallel "backend"
- Code registers a parallel backend
- Variety of available backends
 - doMC (only linux and macos)
 - doSNOW
 - doParallel (also works on windows)
 - doMPI

In [None]:
# setup for "forked" parallelism
library(foreach)
library(doMC)
registerDoMC(8)

In [None]:
res <- foreach(i=1:5) %dopar% {
    i*i
}
res

In [None]:
# simple function that simulates computing for set time
spin<-function(sec) {

  start<-proc.time()[[3]]

  while (TRUE) {
    z=1;
    for (i in 1:100000) {
      z<-z+1
    }
    now<-proc.time()[[3]]
    if (now-start > sec) { break }
  }
  now-start
}

In [None]:
spin(3)

In [None]:
f<-function(i){
    spin(5)
    i*i
}

system.time(
res<-foreach (i=1:8) %dopar% 
{
   f(i)
}
)

res

In [None]:
# we can use %do% instead of %dopar% for testing
f<-function(i){
    spin(5)
    i*i
}

system.time(
res<-foreach (i=1:2) %do% 
{
   f(i)
}
)

res

In [None]:
# Rather than use a function, we can just put code in the block directly
# foreach will return the value of the last expression
res<-foreach (i=1:8) %dopar% 
{
    spin(10)
    i*i
}

res

## Combiners
Rather than returning the raw list, we can combine (reduce) the values.  Foreach supports many combiners, using the named parameter .combine:
 - "c", "+", "*", "cbind", "rbind"
 - arbitrary user-supplied function of two variables

In [None]:
# Here, we add all of the results into a single integer
res<-foreach (i=1:8, .combine="+") %dopar% 
{
    spin(i)
    i*i
}

res

In [None]:
# check that we got the right answer
sum(1:8 * 1:8)

In [None]:
mymax <- function(a,b) {
    if (a>b) a else b
}

In [None]:
# User supplied combiner function.  
# Note: you can also use .combine="mymax"
res<-foreach (i=1:8, .combine=mymax) %dopar% 
{
    spin(i)
    i*i
}

res


In [None]:
# Be very careful not to forget the dot!
# Here, combine is an iteration variable with only 1 value
# So, we only use 1 value from i, and iterate once!

res<-foreach (i=1:10, combine="+") %dopar% 
{
    spin(i)
    i*i
}

res

## Multiple Indices and Nested Foreach's

In [None]:
# This is how to iterate over multiple indices

res<-foreach (i=1:3, j=10:12) %dopar%
{  
    spin(i)
    i*j
}

res

In [None]:
# Be careful, foreach will iterate over the shortest sequence!
# This was the problem with combine="+", above!
res<-foreach (i=1:4, j=10:12) %dopar%
{  
    spin(i)
    i*j
}

res

In [None]:
# This is how to nest foreach's using %:%
# result is a list of lists

res<-foreach (i=1:3) %:% 
  foreach (j=1:4) %dopar% {
    i*j
}

res

In [None]:
# check the list of list claim
typeof(res)
typeof(res[[1]])

In [None]:
# Maybe a list of lists isn't what we want
# By clever use of combiners, we can arrange to get a matrix back
system.time(
  res<-foreach (i=1:3, .combine='cbind') %:% 
  foreach (j=1:4, .combine='c') %dopar% {
    i*j
  }
)

res

## Error handling
- What happens if fatal errors happen within foreach?
- Multiple possibilities, controlled by .errorhandling option
     - stop (default): call stop()
     - pass return error as result of iteration
     - remove omit this task from result

In [None]:
# What about errors?  Foo is an undefined function
# default is 'stop', i.e. the foreach quits with an error
res<-foreach (i=1:3) %dopar%
{  
   if (i==3) {
       foo(i)
   } else {
       sqrt(i)
   }
    
}


In [None]:
# or return the error
res<-foreach (i=1:3, .errorhandling='pass') %dopar%
{  
   if (i==3) {
       foo(i)
   } else {
       sqrt(i)
   }
    
}
res

In [None]:
# Or just omit it
res<-foreach (i=1:3, .errorhandling='remove') %dopar%
{  
   if (i==3) {
       foo(i)
   } else {
       sqrt(i)
   }
    
}
res

In [None]:
# We can also return a list of errors as the result of the foreach
res<-foreach (i=1:3, .errorhandling='pass') %dopar%
{  
    foo()
}
res

## Summary: we've seen how to
- replace for loop with foreach
- loop over multiple indices
- next foreach loops
- combine (reduce) results

## A note on Foreach on Windows
- doMC (multicore) is fork-based and not supported
- Use doParallel instead
```
library(doParallel)
cl <- makeCluster(4)
registerDoParallel(cl)
foreach(i=1:100) %dopar% sqrt(i)
```

## Installing on Windows
- use anaconda, much like on linux
```
conda create --name parallel_r -c conda-forge r-base \
r-essentials r-doParallel
```

## Moving beyond the jupyter notebook
- So far, I've been using the cpus my jupyter notebook has allocated
- How do I run parallel R codes on the cluster using slurm?

## Integrating foreach+doMC with Slurm
- Using slurm, request desired cores using -c
- In R code, query slurm env variable to get core count for registration:
```
cores<-strtoi(Sys.getenv('SLURM_CPUS_PER_TASK', unset=1))
registerDoMC(cores)
```


## Example batch script using foreach and doMC
```
#!/bin/bash
#SBATCH -c 4 

module load miniconda
source activate parallel_r
R --slave -f ex1.R
```

Example scripts:  Examples/Presentation/ex1.sh and ex1.R

## Iterators library
- Since foreach likes to iterate over things, the _iterators_ library can be very useful

https://cran.r-project.org/web/packages/iterators/vignettes/iterators.pdf

```
icount(n)  # iterates n times
iter(func) # wraps func with iterator
```

In [None]:
foreach(icount(1000), .combine='+') %do% 1

In [None]:
ifun <- iter(function() sample(0:9, 4, replace=TRUE))

In [None]:
nextElem(ifun)

In [None]:
# more complex use of iterators
foreach(icount(10), v=iter(function() sample(0:9, 4, replace=TRUE))) %do%
    v

In [None]:
# more complex use of iterators
foreach(i=icount(10), v=iter(function() sample(0:9, 4, replace=TRUE))) %do%
    c(i, v)

# Using multiple machines/nodes
- Thus far, we've only used multiple cpus on a single machine, via doMC
- Advantages:
 - Very simple to use
 - Environment automatically inherited
- But:
 - Limits the degree of parallelism to cpus on one machine (e.g. 4-28)
- Using the MPI "backend" allows us to scale to 100s or 1000s of cpus
- (Almost) no change required to code body. We just create and register a different backend 
- Make sure to clean up at end:
```
closeCluster(cl)
mpi.quit()
```

## Changes to R code to use MPI backend on HPC
replace
```
library(doMC)
registerDoMC(cores)
```
with
```
library(doMPI)
cl<-startMPIcluster(verbose=TRUE, logdir="log")
registerDoMPI(cl)
...

closeCluster(cl)
mpi.quit()
```

## doMPI Foreach batch script
```
#!/bin/bash
#SBATCH -n 4 -N 4

module load miniconda
source activate parallel_r
mpirun R --slave -f ex1mpi.R
```

## Notes on using multiple nodes
- It is possible to use other backends (doParallel or doSnow) to run on multiple nodes.  On our HPC clusters, we recommend doMPI+slurm
- It is possible to specify the number of workers: ```startMPIcluster(count=3)``` but best to let slurm handle it
- The number of workers is slurm ntasks-1
- DON'T query SLURM_NTASKS; only the master sees the right value (???)


## Run Example ex1mpi.sh, ex1mpi.R

## Notes on using random numbers with Foreach
- Each worker is a separate process with its own random number stream.  
- Setting seed in the master won't make results reproduceable
- To make results reproduceable, set seed inside foreach loop.
- General advice: be careful with random numbers and parallel programs
    - ensure workers don't use the same stream
    - if you want reproduciblity, test it!

http://www.sugarscape.net/blog/random-number-seed-in-foreach/

In [None]:
## This works as expected
library(foreach)
library(doMC)

registerDoMC(4) 
set.seed(10)
r <- foreach(i = 1:5, .combine = "c") %do% {
    rnorm(1)
}
r

In [None]:
## We get different results each time, despite setting seed.  Reason: the workers reset the seed
library(foreach)
library(doMC)

registerDoMC(4) 
set.seed(1)
r <- foreach(i = 1:5, .combine = "c") %dopar% {
    rnorm(1)
}
r

In [None]:
set.seed(1)
rnorm(5)

In [None]:
## Now our results are reproduceable
library(foreach)
library(doMC)

registerDoMC(4) 

r <- foreach(i = 1:5, .combine = "c") %dopar% {
    set.seed(i)
    rnorm(1)
}
r

In [None]:
## Don't do this!
library(foreach)
library(doMC)

registerDoMC(4) 

r <- foreach(i = 1:5, .combine = "c") %dopar% {
    set.seed(1)
    rnorm(1)
}
r

## Notes on foreach environment (Subtle but important!)

The interations of foreach are done by "workers"

The environment seen by the workers doing the interations of the foreach block differ between doMC and doMPI!

- doMC is based on linux fork().  Workers get a full copy of the master environment, including libraries, variables, etc.
- doMC starts fresh workers for every foreach.
- in doMPI, the master and all workers execute the code up to the registerMPI().  The master continues on, but workers enter a worker loop there.  Thus, they have access only to things existing at that point, unless otherwise provided.
- But, in addition, Foreach does its best to introspect the parallel code block, and provide needed variables.
- You can explicitly provide packages using foreach's .packages
- You can explicitly control export of variables using .export or .noexport
- doMC can be fine-tuned using underlying options for multicore (e.g. scheduling)



## A Real life example
- We received a request for help from a researcher whose code was running very slowly
- It had been running for a week already, and not made much progress

_Thanks to Anabelle Cardoso for sharing the code!_


In [None]:
Gs <- (1:330)/330       # define what Gs you want on the x-axis (smoothness of lines)
ls <- (1:100)/100       # how many lambdas do you want (how many lines)
reps <- 100
...
for (i in 1:length(Gs)) {
        G <- Gs[i]
        landscape <- matrix(data=runif(size*size),ncol=size,nrow=size)
        landscape[landscape <= G] <- 0
        landscape[landscape > G] <- 1

        for (j in 1:length(ls)) {
                l <- ls[j]
                temp <- rep.int(NA,reps)
                for (m in 1:reps) {
                        spreadfire(space = landscape,"vonNeu",l=l)
                        temp[m] <- length(endspace[endspace >= 2])/length(endspace)
                        }
                pburn[i,j] <- max(temp)
        }
write.csv(pburn,"test1_F1_theoretical.csv")}


## Notes about code
- triply nested loop, invoking spreadfire function 3,300,000 times!
- end result is 330x100 element matrix written as csv file
- each inner invocation is independent of all others.  THIS IS KEY!
- we could replace all three for's with foreach's, but plenty of parallelism in outer loop

In [None]:
library(foreach)
library(doMC)
cores<-strtoi(Sys.getenv('SLURM_CPUS_PER_TASK', unset=1))
registerDoMC(cores)
...
pburn <- foreach (i=1:length(Gs), .combine=rbind) %dopar% {
        G <- Gs[i]
        landscape <- matrix(data=runif(size*size),ncol=size,nrow=size)
        landscape[landscape <= G] <- 0
        landscape[landscape > G] <- 1

        tmppburn <- numeric(length(ls))

        for (j in 1:length(ls)) {
                l <- ls[j]
                temp <- rep.int(NA,reps)
                for (m in 1:reps) {
                        spreadfire(space = landscape,"vonNeu",l=l)
                        temp[m] <- length(endspace[endspace >= 2])/length(endspace)
                        }
                tmppburn[j] <- max(temp)
        }
        tmppburn
}
write.csv(pburn,"rob.csv")

## Strategy
- Only parallelize outer loop
- Each foreach interation produces 1 row of the final result (tmppburn)
- .combine=rbind glues rows into final result

## Result
- very modest changes to the code
- code runs nearly perfectly in parallel, using almost no additional memory.


## Random Forest Example
Presentations/ex2.R and ex2mpi.R

Illustrating:
- use of .combine and .packages




## Alternatives to Foreach
- As mentioned, Parallel, Snow, and Multicore are other packages for Parallel R
- Parallel is a unification of snow and multicore
- all three provide various forms of parallel apply function
- The "unification" can be extremely confusing
- Less support for providing environment


In [None]:
# example using forking via parallel package.  Also see ex3.R

library(parallel)

nstarts=20
cores=4

f<-function(i) {
  spin(1)
  i*i
}

system.time({
results <- mclapply(1:nstarts, f, mc.cores=cores)
})

print(Reduce('+', results))

## Rmpi
- MPI is a standard for writing parallel programs
- Supports direct passing of messages between processes and collective operations: barriers, broadcasts, etc.

Rmpi supports all of that, plus a higher level "master/worker" model.  It is very complicated.
I've searched for compelling example programs.  Best found so far: pipelined prime number generator
http://heather.cs.ucdavis.edu/~matloff/158/Rmpi.pdf

- My advice; stay away!



## dSQ
dSQ makes it easy to run large numbers of independent jobs on the cluster, including R jobs

First, create file containing list of commands to run (jobs.txt). I usually do this via a mkjobs script.
```
Rscript mkjobs.R
```
output
```
module load R; Rscript ex4.R 1 2 100 
module load R; Rscript ex4.R 2 3 100 
...
```
Then, generate the batch script and submit the job:
```
module load dSQ
dSQ --job-file jobs.txt --mem-10g -t 0-1
Batch script generated. To submit your jobs, run:
 sbatch dsq-jobs-2019-08-22.sh
sbatch dsq-jobs-2019-08-22.sh
dSQAutopsy dsq-jobs-*.sh *_status.tsv 
```

Each job generates a single .RData file.  We need a final script that finds the best output, like we did with foreach

```
Rscript comb.R
```



## dSQ versus Foreach
- dSQ is more adaptive on the cluster.  It will use as many cpus as it can get, growing and shrinking dynamically.  It can also use scavenge partition
- each dSQ task will require its own memory.  This may result in more total memory usage.
- dSQ has a much higher startup overhead per task
- the main program used in the dSQ tasks is EXACTLY the sequential program
- foreach is more concise, one program to write versus one program and several scripts.  Especially true if the parallelizable portion of the program is (textually) small.

## Resources

https://www.rdocumentation.org/packages/foreach/versions/1.4.7/topics/foreach

https://cran.r-project.org/web/packages/foreach/vignettes/foreach.pdf

https://cran.r-project.org/web/packages/doMC/vignettes/gettingstartedMC.pdf

https://cran.r-project.org/web/packages/doMPI/vignettes/doMPI.pdf