
Initial work on a manual scheduler (#227) #259

Merged 48 commits into master on Feb 20, 2018
Conversation

@wlandau (Member) commented Feb 17, 2018:

This PR addresses #227. Here, make(..., parallelism = "future") uses a manual scheduler to process the targets (imports are still processed with staged parallelism via the default backend). Users may choose between caching = "master" and caching = "worker" to select whether caching happens on the master process or on each worker individually. (Workers may not have cache access, and non-storr_rds() caches may not be thread-safe.) The queue in queue.R is not really a priority queue, just a cheap imitation and placeholder. When dirmeier/datastructures#4 is fixed, we can move to a much better and more scalable one.

This scheduler is a new backend, and none of the other backends are affected. This is a good opportunity to iterate on the new scheduler and make it more efficient as we compare it to the other backends.
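
For reference, a minimal usage sketch of the new interface (my_plan is a placeholder for any drake plan, and multisession is just one possible future backend):

library(drake)
library(future)
plan(multisession)  # pick any future backend

# Targets go through the new manual scheduler; the master
# process does all the caching:
make(my_plan, parallelism = "future", caching = "master")

# Or let each worker cache its own results (requires a
# thread-safe cache the workers can reach, e.g. storr_rds()):
make(my_plan, parallelism = "future", caching = "worker")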

@codecov-io commented Feb 17, 2018:

Codecov Report

Merging #259 into master will not change coverage.
The diff coverage is 100%.


@@          Coverage Diff           @@
##           master   #259    +/-   ##
======================================
  Coverage     100%   100%            
======================================
  Files          64     66     +2     
  Lines        4682   4871   +189     
======================================
+ Hits         4682   4871   +189
Impacted Files Coverage Δ
R/future.R 100% <100%> (ø)
R/parallel_ui.R 100% <100%> (ø) ⬆️
R/make.R 100% <100%> (ø) ⬆️
R/config.R 100% <100%> (ø) ⬆️
R/workplan.R 100% <100%> (ø) ⬆️
R/dependencies.R 100% <100%> (ø) ⬆️
R/queue.R 100% <100%> (ø)

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 763eda6...825f146.

This commit documents the experimental "future"
backend and "caching" argument to `make()`.
These features need time to play out in real projects,
but they are ready for code review and hopefully alpha testing.
@wlandau (Member, Author) commented Feb 18, 2018:

FYI: I just wrapped in a solution to #169. It seems to work fine on an old project I keep around for testing purposes. Details are in the parallelism vignette. cc @kendonB

@kendonB (Contributor) commented Feb 18, 2018:

Amazing, and I look forward to testing this! Just glancing at the vignette:

  1. It looks like the evaluator list is not a column of my_plan. If the interface is to have this object just floating in the environment, I strongly recommend changing it to a column in my_plan. What if I sorted my_plan? Or added to the top of my_plan without adding evaluators? Ideally this would be an argument to the *_plan functions, so I could allocate resources as I build the different pieces of my plan. Maybe this is your intention once this is tested and working?

  2. Last I looked, the future::plan() function returns the previous plan if there was one; otherwise it returns the new one. The code in the vignette would then have the same plan twice in local and remote.

So good that this functionality is getting in. No more making in multiple stages!

Sent with thumbs only.

@wlandau (Member, Author) commented Feb 18, 2018:

Glad to hear your enthusiasm, Kendon. I can think of no better alpha/beta tester than you.

  1. In that place in the vignette, I meant to assign the evaluator list to my_plan$evaluator. It really is supposed to be a column in my_plan; I just made a mistake in the vignette. Will fix soon.
  2. You're right. Each call to future::plan() returns the previous plan, so below, remote ends up holding the default sequential plan and local holds multisession:

> library(future)
> remote <- future::plan(multisession)
> local <- future::plan(multicore)
> remote
sequential:
- args: function (expr, envir = parent.frame(), substitute = TRUE, lazy = FALSE, seed = NULL, globals = TRUE, local = TRUE, earlySignal = FALSE, label = NULL, ...)
- tweaked: FALSE
- call: plan("default", .init = FALSE)
> local
multisession:
- args: function (expr, envir = parent.frame(), substitute = TRUE, lazy = FALSE, seed = NULL, globals = TRUE, persistent = FALSE, workers = availableCores(), gc = FALSE, earlySignal = FALSE, label = NULL, ...)
- tweaked: FALSE
- call: future::plan(multisession)
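
For item 1, the intended interface looks roughly like this (a sketch: it assumes my_plan has exactly two targets, and the tweak() settings are illustrative):

library(future)
# One evaluator per target, stored as a list column of the plan.
# tweak() customizes a strategy without changing the active plan().
my_plan$evaluator <- list(
  tweak(multisession, workers = 2), # a heavy computation target
  sequential                        # a light report-knitting target
)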

@HenrikBengtsson, what is the safest way to collect a bunch of heterogeneous custom evaluators in advance?

@HenrikBengtsson commented:

A plain R list should be fine; "evaluators" are just plain functions with some extra attributes.
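
For instance, a plain list along these lines (a sketch; the element names and tweak() settings are illustrative):

library(future)
# Collect heterogeneous evaluators without calling plan(),
# which would switch the active strategy as a side effect.
evaluators <- list(
  local  = multicore,
  remote = tweak(multisession, workers = 4)
)
# Each element is just a function carrying extra attributes:
stopifnot(all(vapply(evaluators, is.function, logical(1))))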

@wlandau (Member, Author) commented Feb 18, 2018:

@HenrikBengtsson, that's great to hear. It means #169 should be possible. Is there a better way to generate that list of evaluators? As the code fragment above shows, it seems like I am either not using future::plan() correctly here, or I need something other than future::plan() altogether.

@HenrikBengtsson commented:

Without going into the details of what you're doing, I should share that there is a long-term(!) plan to support resource specifications when setting up futures. It's on my todo list to write some of these ideas up to get the discussion going and to help others like you plan ahead. I'm thinking of something like:

a <- future({ expr }, resources = c(ram = ">= 5 GB", mount = "/home/alice/data", local = TRUE))
b <- future({ expr }, resources = c(ram = ">= 3 GB"))

This would make it possible to control which worker / type of worker each future will be resolved on. It sounds related to what's in this issue (at least from a quick look). Implementing this will take a lot of time to get right, but it might help converge thoughts and ideas.

I need to find time to fully understand where you're going, but for you to go forward, the rule of thumb would be to avoid hacking into the internals of futures "too much". If there's a use case of yours that the Future API does not currently provide, it's not unlikely we can figure out a way to add it. Your and others' higher-level usage of the Future API helps form its identity.

@wlandau (Member, Author) commented Feb 18, 2018:

Thanks, Henrik. Now I understand future's current capabilities a little better. From what @kendonB has said, the main use case is the ability to specify different resources. But I think it would also be nice to simultaneously tell some futures to run locally and others to run on HPC compute nodes. For example, some drake targets run long computations, and others just knit little R Markdown reports that annotate result summaries.

@krlmlr (Collaborator) left a review:
Thanks. Just a few minor nits. I'm really excited to try this!

workers <- initialize_workers(config)
# While any targets are queued or running...
while (work_remains(queue = queue, workers = workers, config = config)){
for (id in seq_along(workers)){
@krlmlr (Collaborator):
for (id in get_idle_workers(workers)) { would reduce indent by one, but I'm not sure of the consequences.

@wlandau (Member, Author):
Do you mean it should be outside the while(work_remains(...)){ loop?

@krlmlr (Collaborator):
No, I meant collapsing the for and the if. Probably not worth it.

)
}
}
Sys.sleep(1e-9)
@krlmlr (Collaborator):

Does this actually sleep? Maybe it's cleaner to offload to a function that waits for a task to finish; some parallel backends may provide blocking methods that don't poll.

@wlandau (Member, Author):

In my preliminary debugging/testing, I found it helpful to set the interval to 0.1, and I did notice the sleeping. I am definitely in favor of a better alternative; I just do not know specifically what that would be.

@krlmlr (Collaborator):

Just to make sure we're on the same page: there is polling (which always works), and there are notifications/passive waiting (which need backend support). To avoid full CPU load when polling, it's advisable to sleep -- not too long, but also not too briefly.

Here, I'd suggest extracting a function that returns as soon as the scheduler needs to do some work. That allows passive waiting where available, and polling otherwise.
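
A sketch of the extracted function under discussion (wait_for_work() is hypothetical, not drake's actual internals; it polls with future::resolved(), and a backend with true blocking waits could supply its own implementation instead):

# Block until at least one worker's future is resolved.
wait_for_work <- function(futures, interval = 0.01, max_interval = 0.5) {
  if (!length(futures)) {
    return(invisible(integer(0)))
  }
  repeat {
    # resolved() checks completion without blocking.
    done <- vapply(futures, future::resolved, logical(1))
    if (any(done)) {
      return(invisible(which(done)))
    }
    # Back off gradually so polling does not peg the CPU,
    # but cap the interval to keep the scheduler responsive.
    Sys.sleep(interval)
    interval <- min(2 * interval, max_interval)
  }
}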

future::plan("next")
structure(
future::future(
expr = drake_future_task(
@krlmlr (Collaborator):
To avoid exporting drake_future_task, we'd need to sneak in the triple colon here somehow to avoid R CMD check failures. Not sure if it's worth the effort.

@wlandau (Member, Author):
Yeah, I'm not sure it's worth the effort either.
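
For reference, one common workaround avoids both the export and a literal triple colon by resolving the function at run time (a sketch):

# Look up the unexported function by name instead of writing
# drake:::drake_future_task, which R CMD check complains about.
drake_future_task <- utils::getFromNamespace("drake_future_task", "drake")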

R/future.R Outdated
}

all_concluded <- function(workers, config){
lapply(
@krlmlr (Collaborator):

This is one of the rare situations where I'd use a for loop, for the early exit. I haven't found a purrr verb that implements this.

@wlandau (Member, Author):
And as far as looping is concerned, the number of workers is likely to be small. See c753067.
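
The early-exit pattern krlmlr describes looks like this (a sketch; is_concluded() is a hypothetical predicate standing in for the real per-worker check):

# Return FALSE as soon as any worker is not concluded, rather
# than evaluating the predicate for every worker with lapply().
all_concluded <- function(workers, config) {
  for (worker in workers) {
    if (!is_concluded(worker, config)) {
      return(FALSE)
    }
  }
  TRUE
}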

if (is_idle(worker)){
NULL
} else {
# It's hard to make this line run in a small test workflow
@krlmlr (Collaborator):

Would this line be hit in a parallel workflow with Sleep(1) through Sleep(5) and two processors? Maybe also with Sleep(0.1) through Sleep(0.5)?

@wlandau (Member, Author):
I would hope so. I have had problems with Sys.sleep() in the past, but I think those issues are unrelated.

out
}

decrease_revdep_keys <- function(worker, config, queue){
@krlmlr (Collaborator):
Nomenclature: "rev" vs. "downstream"/"upstream"?

@wlandau (Member, Author):
I chose "revdeps" here because drake talks about "dependencies" a lot in its internals. There is far less "downstream"/"upstream" terminology, and maybe we should make all the terms agree everywhere.

# The real priority queue will be
# https://github.com/dirmeier/datastructures
# once the CRAN version has decrease-key
# (https://github.com/dirmeier/datastructures/issues/4).
@krlmlr (Collaborator):
We could add

Remotes:
    dirmeier/datastructures

to DESCRIPTION.

@wlandau (Member, Author):
Thinking about it. I haven't had a chance to play with decrease-key in datastructures, but I will soon.

self$pop(n = 1, what = what)
}
},
# This is all wrong and inefficient.
@krlmlr (Collaborator):
It's vectorized, so it's not too bad ;-) We'll have to implement efficient name lookups (or a mapping from target names to integers) for full efficiency. I wouldn't worry too much unless it becomes a bottleneck.

@wlandau (Member, Author):
Thanks for the encouragement. It was the best I could do for a cheap imitation priority queue given R's suboptimal scalability when it comes to recursion and looping. I will be thinking about this as we move to datastructures.
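
The name lookup krlmlr mentions could use a hashed environment for constant-time access (a sketch; "some_target" is illustrative):

# Build a hashed map from target names to queue positions once,
# instead of scanning the whole name vector on every lookup.
targets <- config$plan$target
index <- new.env(hash = TRUE, size = length(targets))
for (i in seq_along(targets)) {
  assign(targets[i], i, envir = index)
}
get("some_target", envir = index)  # O(1) lookup by name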

R/workplan.R Outdated

@@ -153,7 +153,7 @@ drake_plan_override <- function(target, field, config){
  if (is.null(in_plan)){
    return(config[[field]])
  } else {
-    return(in_plan[config$plan$target == target])
+    return(in_plan[[which(config$plan$target == target)]])
@krlmlr (Collaborator):
What happens if which() returns a vector of length != 1?

@wlandau (Member, Author):
Thanks for catching that. Fixed in 825f146.
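
A defensive sketch of the guarded lookup inside drake_plan_override() (not necessarily the exact fix in 825f146):

idx <- which(config$plan$target == target)
# which() can return length 0 (no match) or > 1 (duplicated
# targets); fail loudly instead of returning the wrong value.
if (length(idx) != 1) {
  stop("expected exactly one plan row for target ", target)
}
return(in_plan[[idx]])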

# suitable enough for unit testing, but
# I did artificially stall targets and verified that this line
# is reached in the future::multisession backend as expected.
next # nocov
@krlmlr (Collaborator):
Do we also need to Sys.sleep() here?

@wlandau (Member, Author):
I did not see a reason, but maybe I am missing something. What would it do there?

@krlmlr (Collaborator) commented Feb 20, 2018:
Avoid full CPU load?

@wlandau merged commit cfac8fd into master on Feb 20, 2018, and deleted the i227-attempt2 branch.
@krlmlr (Collaborator) commented Feb 20, 2018:

I'm now missing progress output with parallelism = "future_lapply" and future::plan(future.callr::callr).

@wlandau (Member, Author) commented Feb 20, 2018:

Do you mean metadata returned by drake::progress() or do you mean stdout/stderr logs?

@krlmlr (Collaborator) commented Feb 20, 2018:

The green target and the target name.

@HenrikBengtsson commented:

Quick comment on the Future API: adding support for querying the progress of a worker/future will have to fall under "optional" features, since we cannot demand that all future backends support it. Adding optional features to the Future API has to be done in a way that stays compatible with whatever backends a random person might invent in the future (pun/sic!) - that is, extending the API has to be done with great care, which is why it does not "just" exist (it's easy for some backends). I've started HenrikBengtsson/future#172 to discuss how to go forward and find minimal common denominators for these types of features.

@wlandau (Member, Author) commented Feb 20, 2018:

Kirill: With parallelism = "future" and caching = "master", you may have more luck. But with future_lapply(), all those messages get printed by the workers and lost in their futures. Related: HenrikBengtsson/future#67, HenrikBengtsson/future#141, HenrikBengtsson/future#171, and HenrikBengtsson/future#172.

Henrik: thanks for clarifying. It sounds like drake might have to do its own job monitoring at some point.

@krlmlr (Collaborator) commented Feb 21, 2018:

Thanks. I'd expect the scheduler (master) to print progress messages. Will file a new issue.
