# Workflow and Organization

project templates:

* https://github.com/pavopax/r-workshop-odsc/tree/master/templates/sample-directory-structure
* http://projecttemplate.net/architecture.html

tidyverse 1.0: all the packages you need, all in one package

https://blog.rstudio.org/2016/09/15/tidyverse-1-0-0/

## Asynchronicity

A **future** is a promise that the result of a computation will be made available at some point in the future. If you query a future in a resolved state, the value is made available. But if you query a future in an unresolved state, you have to wait for the computation to finish. This is called being *blocked*.

The benefit of futures is that your code is no longer limited to sequential operation. Futures can be resolved *lazily* (only when requested), *eagerly* (as soon as it's created), or *asynchronously* (using parallel processing). And when you declare a future, the rest of your code can proceed while you wait for the computation to finish. It's even possible to check the status of a future in a non-blocking way.

In [None]:
# https://github.com/HenrikBengtsson/future
library(future)

In [None]:
# Implicitly create a future
a %<-% { 2 + 2 }

`f <- future({ expr })`

`v <- value(f)`

In [None]:
value(future({2 + 3}))

*Question*: How would a for loop of futures work?

## Parallelism

Hardware requirements:
* Threads
* Cores
* GPUs
* Clusters
* Cloud

Software requirements:
* Tasks
* Libraries
* Algorithms

We'll focus today on paralellizing tasks and distributed computation libraries using multiple cores and some simple local clusters.

For distributed algorithm information check the docs, for example SparkR has open sourced exactly how they parallelize model training.

For utilizing GPUs, check out the package `gputools`.

*Question*: Can you name an embarrassingly parallel problem? What about a problem difficult to paralellize?

In [None]:
library(dplyr)
library(glm2)

In [None]:
df <- iris
str(df)

In [None]:
features <- c('Sepal.Length','Sepal.Width','Petal.Length','Petal.Width')
label <- 'Species'
classes <- df[[label]] %>% unique() %>% sort()

In [None]:
fitBinomialClassifier <- function(label, class, features, data) {
  formula <- paste('(', label,'=="', class,'") ~ ',
                   paste(features, collapse=' + '), sep='')
  print(formula)
  glm(as.formula(formula), family = binomial, data = data)
}

In [None]:
system.time(
for(class in classes) {
  print("*****")
  print(class)
  print(fitBinomialClassifier(label, class, features, df))
}
)

Fine, but we don't need to train each classifier one at a time. Just as we replaced iteration with vectorized transforms in data frames, we can replace this for loop with a map-type operation called `lapply`.

In [None]:
worker <- function(class) {  # A curried function
  fitBinomialClassifier(label, class, features, df)
}

In [None]:
system.time({
models <- lapply(classes, worker)  # map(seq, f)
names(models) <- classes
print(models)
})

In [None]:
df[12:17,] %>%
  select(Petal.Width) %>%
  lapply(function(x) c(-x, pi*(x/2)^2)) %>%
  print()

*Exercise*: Use a larger dataset and `system.time` to measure the performance improvement, if any, of using `lapply`. Does it vary by the complexity of the calulation? What about the number of observations? What about the number of features?

## Parallel

[Documentation](https://stat.ethz.ch/R-manual/R-devel/library/parallel/doc/parallel.pdf)

In [None]:
library(parallel)

In [None]:
cores <- detectCores() - 1

In [None]:
parallelCluster <- makeCluster(cores)
print(parallelCluster)

In [None]:
parLapply(parallelCluster,
          df$Sepal.Width,
          function(x) pi*(x/2)^2)

In [None]:
tryCatch(
  models <- parLapply(parallelCluster,
                      classes, worker),
  error = function(e) print(e)
)

Notes:
* Libraries must be defined on each remote machine
* Prefer package-style notation such as stats::glm() when calling library functions (as calling library(...) on each remote node would be wasteful)

In [None]:
base <- 3

In [None]:
parLapply(parallelCluster, 
          2:4, 
          function(exponent) base^exponent)

In [None]:
clusterExport(parallelCluster, "base")
base <- 2

Similarly, `clusterEvalQ(cluster, library(<package>)` can be useful.

In [None]:
stopCluster(parallelCluster)

In [None]:
parallelCluster <- makeCluster(detectCores()-1, type = "FORK")
print(parallelCluster)

In [None]:
# build the single argument function we are going to pass to parallel
mkWorker <- function(class, features, df) {
  # make sure each of the three values we need passed 
  # are available in this environment
  force(class)
  force(features)
  force(df)
  # define any and every function our worker function 
  # needs in this environment
  fitBinomialClassifier <- function(label, class, features, data) {
    formula <- paste('(', label, '=="', class, '") ~ ',
                     paste(features, collapse=' + '), sep='')
    glm(as.formula(formula), family = binomial, data = data)
  }
  # Finally: define and return our worker function.
  # The function worker's "lexical closure" 
  # (where it looks for unbound variables)
  # is mkWorker's activation/execution environment 
  # and not the usual Global environment.
  # The parallel library is willing to transport 
  # this environment (which it does not
  # do for the Global environment).
  worker <- function(class) {
    fitBinomialClassifier(label, class, features, df)
  }
  return(worker)
}

system.time({
models <- parLapply(parallelCluster, classes,
                    mkWorker(class, features, df))
names(models) <- classes
print(models)
})

Meet the whole family:
* `parSapply` (`sapply` is a simple wrapper around `lapply`)
* `parLapplyLB` and `parSapplyLB` include load balancing
* `parApply`, `parRapply`, and `parCapply` for matrices, rows, and columns

In [None]:
parSapply(parallelCluster, as.character(2:4), 
          function(exponent){
            x <- as.numeric(exponent)
            c(base = base^x, self = x^x)
          })

In [None]:
# A safer way to stop the cluster
if(!is.null(parallelCluster)) {
  stopCluster(parallelCluster)
  parallelCluster <- c()
}

### And futures as well

In [None]:
availableCores()

In [None]:
cl <- makeCluster(availableCores() - 1)
plan(cluster, workers=cl)
cl

In [None]:
pid <- Sys.getpid()
pid

In [None]:
a %<-% {
  cat("Calculating a...\n")
  Sys.getpid()
}

b %<-% {
  rm(pid)
  cat("Calculating b...\n")
  Sys.getpid()
}

In [None]:
cat(a, b, pid)

In [None]:
# Works on all OS
plan(multisession)

In [None]:
# Won't work on Windows, the multiprocess plan defaults to this
plan(multicore)

In [None]:
plan(lazy)

In [None]:
stopCluster()

### Foreach

Another popular package worth mentioning that uses `doParallel`. It lies somewhere between a `for` loop and `lapply`. [Docs](https://cran.r-project.org/web/packages/doParallel/vignettes/gettingstartedParallel.pdf)

## Caching

It's often useful and performant to cache results to prevent having to rerun calculations. There are several useful libraries including `R.cache` and `DataCache`. This example just does it manually using `digest` to hash.

In [None]:
cl <- makeCluster(cores)

In [None]:
cacheParallel <- function() {
  vars <- 1:2
  tmp <- clusterEvalQ(cl, 
                      library(digest))
 
  parSapply(cl, vars, function(var) {
    fn <- function(x) x^2
    dg <- digest(list(fn, var))
    cache_fn <- 
      sprintf("Cache_%s.Rdata", dg)
    if (file.exists(cache_fn)) {
      load(cache_fn)
    } else {
      var <- fn(var); 
      Sys.sleep(5)
      save(var, file = cache_fn)
    }
    return(var)
  })
}

In [None]:
system.time(out <- cacheParallel())
out

In [None]:
system.time(out <- cacheParallel())
out

In [None]:
# Cleanup
file.remove(list.files(pattern = "Cache.+.Rdata"))

In [None]:
stopCluster(cl)

*Question*: How will caching impact the performance of eg. `parLapply` above?

## Memory

Memory is often *the* limiting factor in distributed jobs. Network bandwidth can make things slow but memory problems kill jobs. Using `FORK` if you can does help as it prevents having to define copies of variables in different closures. In addition:
* Use rm() frequently to keep the environment clean
* Call gc() to run the garbage collector
* Limit the concurrent cores running based on the environment memory
* Selectively apply parallelism to your project

Sources:
* `http://www.win-vector.com/blog/2016/01/parallel-computing-in-r/`
* `https://www.r-bloggers.com/how-to-go-parallel-in-r-basics-tips/`
* `https://github.com/HenrikBengtsson/future`

*Copyright &copy; 2016 The Data Incubator.  All rights reserved.*