A quicker end to staged parallelism #369

Closed · 13 tasks done
wlandau opened this issue May 1, 2018 · 15 comments

Comments

wlandau commented May 1, 2018

The problem

With the exception of make(parallelism = "future") and make(parallelism = "Makefile"), drake has an embarrassing approach to parallelism, both literally and figuratively. It divides the dependency graph into conditionally independent stages and then runs those stages sequentially. Targets only run in parallel within those stages. In other words, drake waits for the slowest target to finish before moving to the next stage.


At the time, I could not find an R package to traverse an arbitrary network of jobs in parallel. (Stop me right here if such a tool actually exists!) Parallel computing tools like foreach, parLapply, mclapply, and future_lapply all assume the workers can act independently. So I thought staged parallelism was natural. However, I was thinking within the constraints of the available technology, and parallel efficiency suffered.
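
To make the contrast concrete, here is a rough sketch of what staged scheduling amounts to. This is hypothetical code, not drake's internals: it assumes the dependency graph is an igraph object and that build_target() is a stand-in for building one target.

run_staged <- function(graph, jobs) {
  # Keep peeling off "stages": targets whose dependencies are all built.
  while (igraph::gorder(graph) > 0) {
    stage <- names(which(igraph::degree(graph, mode = "in") == 0))
    # Build the whole stage in parallel, then wait for the slowest target
    # before moving on -- this synchronization point is the bottleneck.
    parallel::mclapply(stage, build_target, mc.cores = jobs)
    graph <- igraph::delete_vertices(graph, stage)
  }
}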

Long-term work

In late February, I started the workers package. If you give it a job network and a parallel computing technology, the goal is to traverse the graph using a state-of-the-art algorithm. It has a proof-of-concept implementation and an initial design specification, but it has a long way to go. At this stage, it really needs an R-focused message queue. I was hoping liteq would work, but due to an apparent race condition and some speed/overhead issues on my end, I am stuck for the time being.

Now

I say we rework the make(parallelism = "mclapply") and make(parallelism = "parLapply") backends to use persistent workers natively. As I see it, make(parallelism = "mclapply", jobs = 2) should fork 3 processes: 2 workers and 1 master. The master sends jobs to the workers in the correct order at the correct time, and the workers keep coming back for more without stopping. The goal is to minimize idleness and overhead.
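
For illustration, each persistent worker's loop could look roughly like this sketch. Here, mc_is_done(), mc_get_target(), mc_set_idle(), and build_target() are stand-ins, not necessarily drake's actual internals.

worker_loop <- function(worker, config) {
  repeat {
    if (mc_is_done(worker = worker, config = config)) {
      break  # the master has signaled that no work remains
    }
    target <- mc_get_target(worker = worker, config = config)
    if (!length(target)) {
      Sys.sleep(0.01)  # nothing assigned yet; poll again shortly
      next
    }
    build_target(target = target, config = config)
    mc_set_idle(worker = worker, config = config)  # report back for more work
  }
}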

The implementation will be similar to make(parallelism = "future", caching = "worker") (see future.R). I think the implementation should go into drake itself because

  1. The code should be relatively short, and
  2. I do not want to lock in the design of workers until I get the chance to do an extensive lit review of parallel graph algorithms.

To-do's:

  • (Re)implement the low-overhead parallel backends.
    • run_lapply() (Just loop through a toposort of the targets; see the sketch after this list.)
    • run_mclapply()
    • run_parLapply()
    • run_future_lapply() (We still might want persistent workers on a cluster.)
  • Use on.exit(set_done(...)) to handle failures on both the master process and the workers.
  • Skip on CRAN any unit tests that spawn more than 2 processes.
  • For all non-distributed parallel backends, do not wait for all the imports before starting the targets.
  • Visualize graphs as regular DAGs without any parallel staging.
  • Retire functions parallel_stages(), max_useful_jobs(), and rate_limiting_times().
  • Re-implement predict_runtime().
  • Rewrite the parallel computing vignette and the timing vignette
  • Remove mentions of staged parallelism from the rest of the documentation.
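
As mentioned in the run_lapply() item above, the sequential backend can just walk a topological sort. A minimal sketch, assuming config$graph is an igraph object and build_target() is a hypothetical single-target builder:

run_lapply <- function(config) {
  ordered <- names(igraph::topo_sort(config$graph))
  # Dependencies come before the targets that need them,
  # so a plain sequential loop is enough.
  lapply(ordered, build_target, config = config)
  invisible()
}
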
wlandau commented May 1, 2018

I have begun work on this in the i369-mclapply branch. It has loads of errors and I am super short on time these days, but I am optimistic because this is a problem I think I already know how to solve.

wlandau commented May 2, 2018

Super excited about #370! Most of the work and most of the gains!

wlandau commented May 3, 2018

The next step now is to make the "future_lapply" backend use persistent workers rather than staged parallelism. Why am I bothering with this when we already have make(parallelism = "future")? Because the "future" backend (without the "lapply" part) supports transient workers, not persistent workers. That's one worker per job. It's the right call for clusters with strict wall time limits, but it's a lot of overhead, especially if those jobs get sent to a cluster. I want make(parallelism = "future_lapply", jobs = 4), on the other hand, to launch 4 cluster workers that build any number of targets within the time constraints.
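
Conceptually, the persistent-worker version could look something like the sketch below: each of the jobs futures runs an entire worker loop, so a single cluster worker builds many targets. This is only a sketch; future_lapply() is available from the future.apply package these days, and worker_loop() is a hypothetical persistent-worker loop like the one sketched earlier in this thread.

library(future.apply)

future::plan(future::multisession, workers = 4)

# Launch 4 persistent workers. Each future runs a whole worker loop and
# keeps pulling targets until the master says the queue is empty.
future_lapply(seq_len(4), function(id) {
  worker_loop(worker = id, config = config)
})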

Two things I learned so far in the i369-future_lapply branch:

  1. Workers in future_lapply() are not always concurrent, particularly with future::plan(sequential). So we need a special callr::r_bg() process to act as master (see the sketch after this list).
  2. My first attempt at a non-staged make(parallelism = "future_lapply") hangs/stalls, and I do not know why just yet. And tomorrow I need to deal with a surprise at work, so I am unlikely to finish it by the end of the week.
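
Here is a rough idea of launching a background master with callr. It is a sketch under assumptions: master_loop() stands in for drake's internal scheduling loop and is not a real exported function.

library(callr)

# Run the scheduler in its own background R process so it keeps assigning
# targets even when the future_lapply() workers are not truly concurrent.
master <- r_bg(
  func = function(cache_path) {
    master_loop(cache_path)  # hypothetical stand-in for the scheduling loop
  },
  args = list(cache_path = ".drake")
)

# ... launch the persistent workers here ...

while (master$is_alive()) {
  Sys.sleep(0.1)  # wait for the master to finish handing out targets
}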

wlandau commented May 3, 2018

A very important piece I forgot: on.exit({traceback(); set_all_workers_done()}). Otherwise, one failure on the master will leave the workers hanging in their while() loops. In general, I need a robust way to detect crashed workers.

We should also use a form of try_message() that stores the error in config$cache instead of printing it out.

In liteq, this is accomplished with connections. With multiple sessions running drake workers, I might instead have each worker call on.exit({traceback(); set_done()}) to make sure the master process doesn't worry about it any more.
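
The failure-safety idea, sketched with the helper names used in this thread (a sketch, not the final implementation):

mc_master <- function(config) {
  on.exit({
    traceback()              # show where the master died
    mc_set_done_all(config)  # release every worker so their loops can exit
  })
  # ... scheduling loop ...
}

mc_worker <- function(worker, config) {
  # Whatever happens on the worker, mark it done so the master
  # stops waiting on it.
  on.exit(mc_set_done(worker = worker, config = config))
  # ... build loop ...
}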

I should implement this before I debug the future_lapply backend. If it works, make(parallelism = "future_lapply") will stop hanging.

krlmlr commented May 5, 2018

I'm now seeing some overhead for targets that are cheap to compute but require large-ish input data:

library(tidyverse)
library(drake)

N <- 500

gen_data <- function() {
  tibble(a = seq_len(N), b = 1, c = 2, d = 3)
}

plan_data <- drake_plan(
  data = gen_data()
)

plan_sub <-
  gen_data() %>%
  transmute(
    target = paste0("data", a),
    command = paste0("data[", a, ", ]")
  )

plan <- 
  bind_rows(plan_data, plan_sub)

system.time(drake::make(plan, jobs = 8))

CRAN version with mclapply(): 17 seconds user time, 4 seconds elapsed

GitHub revision 08c67c8: 68 seconds user time, 30 seconds elapsed

Is this intended/expected?

wlandau commented May 5, 2018

Your reprex is a perfect demonstration of what we're potentially sacrificing in #369. And yes, I did expect to have more overhead, especially at first. Staged parallelism excels when we have a large number of conditionally independent targets (i.e. tall dependency graphs).


In the CRAN version of drake, you're essentially just using mclapply() to get all the rows of the data, and all the workers are autonomous. No bottleneck for embarrassingly/perfectly parallel tasks. But in the development version, the master process needs to check all the workers and assign them targets one by one. My guess is that our biggest loss in efficiency happens when the master loops over all the workers one by one to check if each one is idle.

drake/R/mclapply.R, lines 55 to 75 at 7fe45f0:

mc_master <- function(config){
  on.exit(mc_set_done_all(config))
  config$queue <- new_target_queue(config = config)
  while (mc_work_remains(config)){
    for (worker in config$workers){
      if (mc_is_idle(worker = worker, config = config)){
        mc_conclude_target(worker = worker, config = config)
        if (!config$queue$size()){
          mc_set_done(worker = worker, config = config)
          next
        }
        target <- config$queue$pop0(what = "names")
        if (length(target)){
          mc_set_target(worker = worker, target = target, config = config)
          mc_set_running(worker = worker, config = config)
        }
      }
    }
    Sys.sleep(mc_wait)
  }
}

If there is a way for the master to jump straight to the idle workers, I suspect/hope the master will be faster.

Part of the trouble is that I am using storr to communicate with the workers. While storr's RDS cache is reasonably thread safe, it's not actually designed for this purpose. As you alluded to before, what we really need in the long term is a solid, fast, scalable, thread-safe message queue. I tried liteq for workers, but the locking mechanism has a lot of overhead of its own and is not totally thread safe for all variants of SQL.

krlmlr commented May 5, 2018

In the code chunk above I don't understand what happens if length(target) is zero.

We're seeing a lot of user time spent for the GitHub version; this can't be just because the master is busy. Are the workers using busy waiting?

Profiling the master loop shows that most of the time is spent in mc_is_idle(): http://rpubs.com/krlmlr/drake-369-profile.

Can you implement an efficient "enumerate all idle workers" with storr? Perhaps if you had a storr namespace for each state and each worker would register/unregister itself from that namespace when the state changes?

wlandau commented May 5, 2018

In the code chunk above I don't understand what happens if length(target) is zero.

That means all targets have unmet dependencies. $pop0() pops the head of the priority queue if it has priority 0 and just returns character(0) otherwise.

We're seeing a lot of user time spent for the GitHub version, this can't be just because the master is busy. Are the workers using busy waiting?

I suspect many of them are. I am only just wrapping up the initial implementation, so I have not spent a lot of time profiling.

Profiling the master loop shows that most of the time is spent in mc_is_idle(): http://rpubs.com/krlmlr/drake-369-profile.

Thanks so much! That really narrows down the focus.

Can you implement an efficient "enumerate all idle workers" with storr? Perhaps if you had a storr namespace for each state and each worker would register/unregister itself from that namespace when the state changes?

Sounds easy to implement. mc_set_idle() and friends could set/read it.
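
For example, the namespace-per-state idea could be sketched with storr like this. This is hedged: it is just the register/unregister pattern described above, not drake's actual implementation, and the helper names are hypothetical.

cache <- storr::storr_rds(".drake_workers")

mc_set_state <- function(worker, from, to, cache) {
  # Unregister the worker from its old state and register it under the new one.
  cache$del(key = worker, namespace = from)
  cache$set(key = worker, value = Sys.time(), namespace = to)
}

mc_idle_workers <- function(cache) {
  cache$list(namespace = "idle")  # enumerate idle workers in a single call
}

mc_set_state("worker_1", from = "running", to = "idle", cache = cache)
mc_idle_workers(cache)
#> [1] "worker_1"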

By the way, I am about to add additional mclapply_staged and parLapply_staged backends. These staged parallel backends are really an afterthought, but they may still help with the use case you brought up.

wlandau commented May 5, 2018

For anyone else looking on, I have added back staged parallelism in some additional backends: make(parallelism = "mclapply_staged") and make(parallelism = "parLapply_staged"). The make(parallelism = "mclapply") and make(parallelism = "parLapply") backends still use persistent workers and have nothing to do with staged parallelism.

All the code for staged parallelism is in R/staged.R, and if we ever decide to remove staged parallelism completely, all we need to do is remove the file. The difference that #369 makes is that staged parallelism is just an afterthought and not entangled in the core internals.

wlandau commented May 6, 2018

Update: we have a new timing vignette. See the strategy section for how predict_runtime() and predict_load_balancing() can help you plan the scale of your workflow. I am super excited about this! Drake is starting to mature as a serious parallel computing engine!

wlandau closed this as completed in ea17878 on May 6, 2018
wlandau commented May 6, 2018

We now have a new and improved parallelism guide. Much clearer and up to date.

This issue was quite the exciting sprint. I am really looking forward to testing and iterating on the newly reworked backends. Please help spread the word.

wlandau commented May 8, 2018

To recap: drake now has three parallel scheduling strategies, as demonstrated by the videos embedded here. @krlmlr and @gaborcsardi, I have been thinking for the last 3 months about our vigorous conversation about revdepcheck at RStudio::conf(2018), and I am happy to say that drake is much better able to perform this kind of work efficiently.

wlandau commented Jun 11, 2018

FYI: @krlmlr, coordination among persistent workers seems to be much faster now after the work I did this weekend. Scheduling is now based on drake's own liteq-like message queue, which I will soon release as a separate package.

wlandau commented Jun 13, 2018

Things just got even faster in fea4acb. We'll let it play out, but there is a good chance we'll find we won't need staged parallelism anymore.

wlandau commented Oct 27, 2018

"mclapply_staged" also appears faster than the clustermq-based backends.

system.time(drake::make(plan, parallelism = "mclapply_staged", jobs = 8))
#> target data
#>    user  system elapsed 
#>   2.638   0.133   4.490
options(clustermq.scheduler = "multicore")
system.time(drake::make(plan, parallelism = "clustermq", jobs = 8, verbose = FALSE))
#> Submitting 8 worker jobs (ID: 6150) ...
#> Master: [11.3s 97.5% CPU]; Worker: [avg 1.8% CPU, max 311.3 Mb]
#>    user  system elapsed 
#>  13.854   0.523  14.698
options(clustermq.scheduler = "multicore")
system.time(drake::make(plan, parallelism = "clustermq_staged", jobs = 8, verbose = FALSE))
#> Submitting 8 worker jobs (ID: 6752) ...
#> Running 1 calculations (1 calls/chunk) ...
#> Running 500 calculations (1 calls/chunk) ...
#> Master: [4.7s 86.1% CPU]; Worker: [avg 4.7% CPU, max 310.8 Mb]
#>    user  system elapsed 
#>   6.465   0.299   7.415

On the other hand, I am not sure how much drake should accommodate projects that spread work so thinly over so many targets. Maybe packing and unpacking could compensate (#304).
