
Within-target parallelism fails #675

Closed
pat-s opened this issue Jan 15, 2019 · 20 comments

@pat-s
Member

pat-s commented Jan 15, 2019

I've read through #295 and related issues but could not find a clear answer to whether within-target parallelism is already supported. I saw you added jobs_preprocess as an argument to make(), but I am not sure what it applies to.
Also, to avoid cluttering the other issue and keep it clean, I decided to open a new one.

I've tried adding a column named "resources" listing CPUs for specific targets, as mentioned here in the manual. I ran the plan with parallelism = "future", jobs = 20 but saw no parallelization going on.

Note that in the code I use furrr::future_pwalk() and furrr::future_imap() to parallelize targets.

Here is the drake.R file, and here is one of the exemplary future_pmap() calls.

Have a great time at rstudio::conf!

@wlandau
Collaborator

wlandau commented Jan 15, 2019

Read first

Onlookers, please jump straight to #675 (comment).

Original post

Thanks for sharing the code. Unfortunately, I am getting a 404 error for both links. Is the repo private?

Also, which future::plan() are you using? Within-target parallelism may not work as you expect if you are using future::plan(future::multicore) for parallelism among targets. It sounds like you are using future.batchtools and a cluster, but I just want to make sure.

The resources column tries to make multiple cores available to the target, but it is up to the command itself to actually use them. I recommend something like this:

drake_hpc_template_file("slurm_batchtools.tmpl") # Edit by hand.
library(future.batchtools)
future::plan(batchtools_slurm, template = "slurm_batchtools.tmpl")

plan <- drake_plan(
  a = mclapply(parallel_substeps_a, my_function, mc.cores = ignore(4)),
  b = mclapply(parallel_substeps_b, my_function, mc.cores = ignore(4))
)

make(plan, parallelism = "future", jobs = 2)

That should use distributed computing among targets and local parallel computing within targets. Above, ignore() makes sure up-to-date targets stay up to date if you change mc.cores.

One last question: is there a reason why you are using future parallelism instead of clustermq parallelism? Just trying to get a sense of what people need. I usually recommend clustermq because there is less overhead.

@wlandau
Collaborator

wlandau commented Jan 15, 2019

It has been a while since I looked at #295. I will consider reopening it.

@pat-s
Member Author

pat-s commented Jan 15, 2019

I am getting a 404 error for both links. Is the repo private?

Ah, yes 🤦‍♂️ . Not anymore :)

Also, which future::plan() are you using? Within-target parallelism may not work as you expect if you are using future::plan(future::multicore) for parallelism among targets. It sounds like you are using future.batchtools and a cluster, but I just want to make sure.

I am using future::multiprocess. The reason is that I want to parallelize furrr::future_Xmap(). That's why I also cannot use mclapply() here (I use it elsewhere in the plan). Also, this is currently not (yet) running on a cluster, just on a standalone server.

So future::multicore is currently not usable within drake to parallelize within targets? Unfortunately there is no other way to parallelize the furrr functions. Then I probably need to stick to running those sequentially?

I usually recommend clustermq because there is less overhead.

I will use that once I have access to a cluster :)

@wlandau
Collaborator

wlandau commented Jan 15, 2019

Yeah, I doubt pbmclapply() within targets will work when you already have future::multicore() for parallelism among targets. And even with future parallelism for targets on a cluster, I doubt future_pmap() or furrr::future_Xmap() will work within targets because of the way future::plan() is designed. Once you have access to a cluster, I recommend trying this stuff out with clustermq parallelism among targets and future/multicore parallelism within targets. Please let me know how it goes.

However, there is a small chance nested futures might allow you to do what you want.
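To make the nested-futures idea concrete, a minimal sketch might look like the following. This is a hypothetical illustration, not a tested recommendation: the backend choices and worker count are assumptions, and the outer level governs how drake schedules targets while the inner level governs parallelism inside a target.

```r
library(drake)
library(future)

# Outer level: scheduling among targets (sequential here).
# Inner level: parallelism within a target's command.
future::plan(list(
  future::tweak(future::sequential),
  future::tweak(future::multisession, workers = 4L)
))

plan <- drake_plan(x = furrr::future_map(1:4, sqrt))
make(plan, parallelism = "future", jobs = 1)
```

Whether the inner plan actually takes effect depends on how future::plan() propagates into the target's evaluation environment.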

@wlandau wlandau closed this as completed Jan 15, 2019
@pat-s
Member Author

pat-s commented Jan 15, 2019

Yeah, I doubt pbmclapply() within targets will work when you already have future::multicore() for parallelism among targets.

I think you might have misunderstood me here.

  • pbmclapply() works fine within targets.
  • Any "future-based" within-target parallelism does not work (e.g. using furrr::future_pmap())
  • Currently I do not use among-target parallelism at all (until I have access to a cluster)

So basically, I want to do the following:

  • Run the plan sequentially (for now) (no "among-target" parallelism)
  • and use within-target parallelism based on "future"

(Sorry if I understood you wrong here and you might have already understood me correctly before.)

@wlandau
Collaborator

wlandau commented Jan 15, 2019

Ah, now I think I understand a little better. The jobs argument and the resources column in the plan only work with make(parallelism = "future") for the purposes of scheduling targets. They do not affect any custom calls to future that you put in your commands or functions. The only thing that carries over is your future::plan(). Everything else is up to the user.

Also, I realized I forgot to answer your question about jobs_preprocess. That argument is the number of local multicore jobs for setting up the graph, processing the imports, etc. It helps make() get started faster, and it does not come into play once the targets start to build.
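For concreteness, a call using both arguments might look like this (the plan itself is a placeholder):

```r
library(drake)
plan <- drake_plan(x = sqrt(1:10))

# jobs: how many targets may build concurrently.
# jobs_preprocess: local multicore jobs for drake's setup phase
# (building the graph, processing imports) before any target builds.
make(plan, parallelism = "future", jobs = 2, jobs_preprocess = 4)
```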

@wlandau
Collaborator

wlandau commented Jan 15, 2019

The code for the future backend is in this file. The jobs argument is used to constrain the number of futures deployed at a given time.

drake/R/backend-future.R

Lines 187 to 192 in 71ba8bd

initialize_workers <- function(config) {
  out <- list()
  for (i in seq_len(config$jobs))
    out[[i]] <- empty_worker(target = NA_character_)
  out
}

And the resources column is applied directly to each target-specific future:

drake/R/backend-future.R

Lines 89 to 99 in 71ba8bd

future::future(
  expr = drake::future_build(
    target = DRAKE_GLOBALS__$target,
    meta = DRAKE_GLOBALS__$meta,
    config = DRAKE_GLOBALS__$config,
    protect = DRAKE_GLOBALS__$protect
  ),
  globals = globals,
  label = target,
  resources = as.list(layout$resources)
),

@pat-s
Member Author

pat-s commented Jan 15, 2019

Hmm. While my function works fine outside of drake, it seems that there are serious clashes when using future within-target parallelism in drake.

I even tried using plan(list(tweak(sequential), tweak(multiprocess, workers = 20L))) before the make() call to build on the idea of nested futures as described in the linked post.

I guess for now I just run it sequentially (takes about 8h so not that much of a problem). However, running in parallel with 20 cores would result in a significant speed up :)

To summarize, I've tried the following options:

  • Initialize future::plan() before drake::make()
  • Call future::plan() within a function that is used by drake::make()
  • Running drake::make() with parallelism = "future" and without
  • Try using nested futures by using a future::plan() with multiple backends (plan(list(tweak(sequential), tweak(multiprocess, workers = 20L))))

@wlandau
Collaborator

wlandau commented Jan 16, 2019

You are right. I am not sure why multicore futures do not like being called inside make(). I get the same error even when future::plan() is part of the command.

library(drake)
future::plan(future::multicore, workers = 4)
plan <- drake_plan(x = furrr::future_map(list(1, 1, 1, 1), Sys.sleep))
make(plan)
#> target x
#> fail x
#> Error: Target `x` failed. Call `diagnose(x)` for details. Error message:
#>   Detected an error ('fatal error in wrapper code') by the 'parallel' package while trying to retrieve the value of a MulticoreFuture ('<none>'). This could be because the forked R process that evaluates the future was terminated before it was completed: '{; ...future.f.env <- environment(...future.f); if (!is.null(...future.f.env$`~`)) {; if (is_bad_rlang_tilde(...future.f.env$`~`)) {; ...future.f.env$`~` <- base::`~`; }; ...; .out; }); }'

Created on 2019-01-15 by the reprex package (v0.2.1)

For now, please try multisession parallelism instead.

library(drake)
future::plan(future::multisession, workers = 4)
plan <- drake_plan(x = furrr::future_map(list(1, 1, 1, 1), Sys.sleep))
make(plan)
#> target x
build_times()
#> # A tibble: 1 x 4
#>   target elapsed        user           system        
#>   <chr>  <S4: Duration> <S4: Duration> <S4: Duration>
#> 1 x      1.202s         0.067s         0.005s

Created on 2019-01-15 by the reprex package (v0.2.1)

@pat-s
Member Author

pat-s commented Jan 16, 2019

Oh. multisession was really the only setting I did not check...

Thanks for finding that out!! It works.

@wlandau
Collaborator

wlandau commented Jan 17, 2019

Glad that works for you.

Indeed, the multicore part fails even without future.

library(drake)
library(parallel)
plan <- drake_plan(x = mclapply(1:2, sqrt, mc.cores = 2))
plan
#> # A tibble: 1 x 2
#>   target command                          
#>   <chr>  <chr>                            
#> 1 x      mclapply(1:2, sqrt, mc.cores = 2)
tmp <- mclapply(1:2, sqrt, mc.cores = 2)
make(plan)
#> target x
#> Warning: target x warnings:
#>   all scheduled cores encountered errors in user code

Created on 2019-01-16 by the reprex package (v0.2.1)

@wlandau
Collaborator

wlandau commented Jan 17, 2019

Explanation: lock_envir = TRUE. The multicore functions in the parallel package modify the global environment. If we set lock_envir to FALSE in make(), things work. So we should figure out what exactly parallel is modifying and exempt it from environment locking.

library(drake)
library(parallel)
plan <- drake_plan(x = mclapply(1:2, sqrt, mc.cores = 2))
plan
#> # A tibble: 1 x 2
#>   target command                          
#>   <chr>  <chr>                            
#> 1 x      mclapply(1:2, sqrt, mc.cores = 2)
tmp <- mclapply(1:2, sqrt, mc.cores = 2)
make(plan, lock_envir = FALSE)
#> target x

Created on 2019-01-16 by the reprex package (v0.2.1)

library(drake)
future::plan(future::multicore, workers = 4)
plan <- drake_plan(x = furrr::future_map(list(1, 1, 1, 1), Sys.sleep))
make(plan, lock_envir = FALSE)
#> target x

Created on 2019-01-16 by the reprex package (v0.2.1)

@wlandau
Collaborator

wlandau commented Jan 17, 2019

The future.callr backend also works.

library(drake)
future::plan(future.callr::callr, workers = 4)
plan <- drake_plan(x = furrr::future_map(list(1, 1, 1, 1), Sys.sleep))
make(plan)
#> target x

Created on 2019-01-16 by the reprex package (v0.2.1)

@wlandau
Collaborator

wlandau commented Jan 17, 2019

I do not think this is a problem I can solve. I cannot actually change what mclapply() does, and the user can add or remove it at will. All I can do is add to the documentation in the manual: ropensci-books/drake@5a8f587. If someone has an answer here, great. If not, I consider the documentation my due diligence.

@pat-s
Member Author

pat-s commented Jan 17, 2019

I'm always fascinated by how deep you dig into such issues.

I'm fine with using plan("multisession"). Thanks for checking this out!

@wlandau
Collaborator

wlandau commented Jan 17, 2019

Thanks, @pat-s! Depth is part of the joy of this project.

That, and mclapply() within targets is such a natural thing for people to try that I really want to avoid surprises if possible.

@wlandau
Collaborator

wlandau commented Jan 28, 2019

A recap:

The problem

make(plan) fails if you use mclapply() or future::plan(future::multicore) inside targets.

library(drake)
plan <- drake_plan(x = parallel::mclapply(1:2, identity, mc.cores = 2))
make(plan)
#> target x
#> Warning: target x warnings:
#>   all scheduled cores encountered errors in user code

Created on 2019-01-28 by the reprex package (v0.2.1)

Explanation

  1. make(lock_envir = TRUE) (default) locks the user's environment, which is usually the global environment (if you don't use the envir argument of make()). That means bindings (assignments of values to variables) cannot be added to or removed from the global environment.
  2. By default, mclapply() tries to remove .Random.seed from the global environment. Since the default RNG algorithm is Mersenne Twister, we dive into the else block of parallel::mc.set.stream() below.
> parallel:::mc.set.stream
function () 
{
    if (RNGkind()[1L] == "L'Ecuyer-CMRG") {
        assign(".Random.seed", get("LEcuyer.seed", envir = RNGenv), 
            envir = .GlobalEnv)
    }
    else {
        if (exists(".Random.seed", envir = .GlobalEnv, inherits = FALSE)) 
            rm(".Random.seed", envir = .GlobalEnv, inherits = FALSE)
    }
}
<bytecode: 0x4709808>
<environment: namespace:parallel>
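The locking behavior itself can be reproduced with base R, independent of drake. This is a minimal sketch using lockEnvironment(), which appears to be the same mechanism make(lock_envir = TRUE) relies on; the environment and binding names are illustrative:

```r
e <- new.env()
assign(".Random.seed", 1L, envir = e)

# Lock the environment: bindings can no longer be added or removed.
lockEnvironment(e)

# This mirrors what mc.set.stream() attempts against the global environment:
try(rm(".Random.seed", envir = e))
# Error: cannot remove bindings from a locked environment
```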

Workarounds

  • Avoid mclapply() and future::plan(future::multicore). furrr::future_map() and parallel::parLapply() are more dependable alternatives anyway. In the case of furrr, invoke future::plan(future.callr::callr) or future::plan(future::multisession) first.
  • In make(), set the lock_envir argument to FALSE. This approach deactivates important reproducibility guardrails, so be careful.
  • In mclapply(), set the mc.set.seed argument to FALSE. If your computations require pseudo-random numbers (rnorm(), runif(), etc.) you will need to manually set a different seed for each parallel process, e.g.
parallel::mclapply(X = 1:4, mc.cores = 4, FUN = function(i) {
  set.seed(sum(.Random.seed) + i)
  # Do some work...
})

@wlandau wlandau changed the title Within-target parallelism using the "future" backend Within-target parallelism fails Jan 28, 2019
@wlandau
Collaborator

wlandau commented Feb 5, 2019

Update: in 300cea8, I added an informative error message that points to workarounds.

library(drake)
library(parallel)
plan <- drake_plan(x = mclapply(1:2, identity, mc.cores = 2))
make(plan)
#> target x
#> Warning: target x warnings:
#>   
#>  Having problems with parallel::mclapply(), future::future(), or furrr::future_map() in drake? Try one of the workarounds at https://ropenscilabs.github.io/drake-manual/hpc.html#parallel-computing-within-targets or https://github.com/ropensci/drake/issues/675. 
#> 
#> 
#>   all scheduled cores encountered errors in user code

library(future)
library(furrr)
plan(multicore)
plan <- drake_plan(x = future_map(1:2, identity))
make(plan)
#> target x
#> fail x
#> Error: Target `x` failed. Call `diagnose(x)` for details. Error message:
#>   
#>  Having problems with parallel::mclapply(), future::future(), or furrr::future_map() in drake? Try one of the workarounds at https://ropenscilabs.github.io/drake-manual/hpc.html#parallel-computing-within-targets or https://github.com/ropensci/drake/issues/675. 
#> 
#> Detected an error ('fatal error in wrapper code') by the 'parallel' package while trying to retrieve the value of a MulticoreFuture ('<none>'). This could be because the forked R process that evaluates the future was terminated before it was completed: '{; ...future.f.env <- environment(...future.f); if (!is.null(...future.f.env$`~`)) {; if (is_bad_rlang_tilde(...future.f.env$`~`)) {; ...future.f.env$`~` <- base::`~`; }; ...; .out; }); }'

Created on 2019-02-05 by the reprex package (v0.2.1.9000)

@PedramNavid

Just wanted to thank you for the very helpful error message. Made debugging this about 100x easier than if I had discovered it otherwise.
