when combining, workers persist for longer than needed #1080

Closed · 3 tasks done
psadil opened this issue Nov 24, 2019 · 15 comments

@psadil

psadil commented Nov 24, 2019

Prework

  • Read and abide by drake's code of conduct.
  • Search for duplicates among the existing issues, both open and closed.
  • If you think your issue has a quick and definite solution, consider posting to Stack Overflow under the drake-r-package tag. (If you anticipate extended follow-up and discussion, you are already in the right place!)

Description

This is closely related to #751, but I think it is slightly different (sorry if it's actually the same issue!). The workflow's first stage is easy to parallelize (the B targets below). The second stage combines those outputs (target C) and requires fewer workers; that is the end of the workflow. However, it seems that workers from the first stage are not shut down until the second stage begins, so they can persist for much longer than they are needed.

Reproducible example

Two workers get going on the B targets. B_10 completes quickly; B_60 takes a bit longer. Only one of those workers will be needed for C, but the worker that built B_10 hangs around until B_60 has finished.

options(
  clustermq.scheduler = "lsf",
  clustermq.template = "/home/ps52a/lsf_clustermq.tmpl")

plan <- drake::drake_plan(
  B = drake::target(
    Sys.sleep(A),
    transform = map(A = c(1, 6))),
  C = drake::target(
    Sys.sleep(prod(B)),
    transform = combine(B))
)

con <- drake::drake_config(
  plan = plan,
  jobs = 2,
  parallelism = 'clustermq',
  template = list(log_file = 'log.txt'),
  verbose = 2
)

drake::make(
  plan = plan,
  config = con
)

Sample output

zeromq 4.1.4 is located under /share/pkg/zeromq/4.1.4
R 3.6.0 is located under /share/pkg/R/3.6.0
When compiling modules for this, be sure to load gcc/8.1.0

R version 3.6.0 (2019-04-26) -- "Planting of a Tree"
Copyright (C) 2019 The R Foundation for Statistical Computing
Platform: x86_64-pc-linux-gnu (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

  Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.

Type 'demo()' for some demos, 'hel> clustermq:::worker("tcp://ghpcc06:7236", timeout=600)
2019-11-24 10:55:51.727918 | Master: tcp://ghpcc06:7236
2019-11-24 10:55:51.790999 | WORKER_UP to: tcp://ghpcc06:7236
2019-11-24 10:55:52.702796 | > DO_SETUP (0.905s wait)
2019-11-24 10:55:52.703698 | token from msg: set_common_data_token
2019-11-24 10:55:52.727205 | > DO_CALL (0.000s wait)
05506 | token from msg: set_common_data_token
2019-11-24 10:55:52.741344 | > DO_CALL (0.000s wait)
2019-11-24 10:56:02.785387 | eval'd: drake::cmq_buildtargetmetadepslayoutconfig
2019-11-24 10:56:02.867493 | > WORKER_WAIT (0.000s wait)
2019-11-24 10:56:02.868206 | waiting 0.10s
2019-11-24 10:56:02.970602 | > WORKER_WAIT (0.000s wait)
2019-11-24 10:56:02.971313 | waiting 0.10s
2019-11-24 10:56:03.073992 | > WORKER_WAIT (0.001s wait)
2019-11-24 10:56:03.074686 | waiting 0.10s
2019-11-24 10:56:03.177123 | > WORKER_WAIT (0.000s wait)
2019-11-24 10:56:03.177838 | waiting 0.10s
2019-11-24 10:56:03.288285 | > WORKER_WAIT (0.000s wait)
...

The worker continues to wait for a while, and one of the workers (I'm not sure which) shuts down when C starts.

Desired result

When a worker has finished a job and there are enough other currently engaged workers to finish the remaining jobs, that finished worker should be shut down.

Does this mean I should use transient workers?

Session info

> sessionInfo()
R version 3.6.0 (2019-04-26)                       
Platform: x86_64-pc-linux-gnu (64-bit)             
Running under: Red Hat Enterprise Linux Server release 6.10 (Santiago)                                

Matrix products: default                           
BLAS:   /share/pkg/R/3.6.0/lib64/R/lib/libRblas.so 
LAPACK: /share/pkg/R/3.6.0/lib64/R/lib/libRlapack.so                                                  

locale:                                            
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C                                                          
 [3] LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8                                                
 [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8                                               
 [7] LC_PAPER=en_US.UTF-8       LC_NAME=C                                                             
 [9] LC_ADDRESS=C               LC_TELEPHONE=C                                                        
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C                                                   

attached base packages:                            
[1] stats     graphics  grDevices utils     datasets  methods   base                                  

other attached packages:                           
[1] drake_7.7.0                                    

loaded via a namespace (and not attached):         
 [1] txtq_0.2.0       digest_0.6.22    crayon_1.3.4     assertthat_0.2.1                              
 [5] R6_2.4.0         backports_1.1.5  storr_1.2.1      magrittr_1.5                                  
 [9] pillar_1.4.2     rlang_0.4.1      cli_1.1.0        filelock_1.0.2                                
[13] igraph_1.2.4.1   compiler_3.6.0   pkgconfig_2.0.3  rzmq_0.9.6                                    
[17] base64url_1.4    clustermq_0.8.8  tibble_2.1.3
@wlandau
Collaborator

wlandau commented Nov 24, 2019

Yeah, I see what you mean, and I agree it does occupy resources unnecessarily.

One limitation is that drake's master process cannot see or control what is happening on each individual clustermq worker. (It can only talk to the pool of persistent workers as a whole.) Before you spoke up, I thought it would be impossible to work around this. But you got me thinking, and there is actually something we can try.

Proposal: send a shutdown message whenever the number of unscheduled targets (ones we have not yet sent to workers) drops below the number of workers. I predict that extra idle workers will terminate and workers in the middle of building targets will remain unaffected. I think the shutdown message will go to the next available idle worker, and even if it does go to a worker in the middle of a job, I do not think it will interrupt the job (@mschubert, would you confirm?)
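
A runnable toy of that rule (mock values only, not drake or clustermq code): the master replies with a shutdown message instead of a wait message whenever live workers outnumber the targets that have not yet been sent out.

# Toy version of the proposed policy: decide what to tell an idle worker that
# just reported back, given how many workers are alive and how many targets
# have not yet been scheduled.
next_reply <- function(workers_alive, targets_unscheduled) {
  if (workers_alive > targets_unscheduled) "shutdown" else "wait"
}
next_reply(workers_alive = 2, targets_unscheduled = 1)  # "shutdown": one worker is spare
next_reply(workers_alive = 1, targets_unscheduled = 1)  # "wait": still needed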

@wlandau
Collaborator

wlandau commented Nov 24, 2019

On second thought, I do not think that idea will work. The development version of drake supports dynamic branching, which lets you create new (sub-)targets while make() is running. This means we do not always know in advance how many targets we will have, which makes it hard to safely drop workers on the fly. I guess if we could dynamically add workers to a clustermq QSys object, we would be able to drop them safely, but I do not think this is currently possible in clustermq (@mschubert, please correct me if I am wrong).

So @psadil, like you said initially, I think a better way to conserve resources in your case is to use transient workers, e.g. make(parallelism = "future").

So close yet so far...
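
For reference, a minimal sketch of the transient-worker route on an LSF cluster like the one above, reusing the plan from the reproducible example (the template file name and jobs count are placeholders, and future.batchtools must be installed):

# Transient workers: each target runs in its own scheduler job, which exits as
# soon as the target finishes, so no worker idles while other targets run.
library(drake)
future::plan(future.batchtools::batchtools_lsf(
  template = "lsf_batchtools.tmpl"  # placeholder: a batchtools LSF template file
))
make(plan, parallelism = "future", jobs = 2)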

@wlandau
Collaborator

wlandau commented Nov 24, 2019

Depending on mschubert/clustermq#182, I might actually come back to this issue. @psadil, on the off chance that that happens, would you install https://github.com/ropensci/drake/tree/1080 and see if those extra workers disappear?

@mschubert

Your current handling in f449117 seems reasonable to me.

Using the worker API, you will always have to decide whether to shut down a worker or have it wait, because clustermq does not know how much work remains.

@psadil
Author

psadil commented Nov 25, 2019

@wlandau, thanks for the response. I'd be happy to try out your fix. But I think it'll be a few days before I can run it.

@ooo

ooo bot commented Nov 25, 2019

👋 Hey @psadil...

Letting you know, @wlandau is currently OOO until Monday, December 2nd 2019. ❤️

@wlandau
Collaborator

wlandau commented Nov 25, 2019

Forgot to mention: if you supply a drake_config() object to make(), all other arguments are ignored, so I recommend not setting any other arguments when you pass a config.
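
In other words (a hedged sketch; argument names as in drake 7.x), pick one of these two forms rather than mixing them:

# Either let make() build the config from the arguments...
drake::make(plan, parallelism = "clustermq", jobs = 2)

# ...or build the config explicitly and pass nothing but the config:
con <- drake::drake_config(plan, parallelism = "clustermq", jobs = 2)
drake::make(config = con)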

@wlandau
Collaborator

wlandau commented Nov 25, 2019

Hmm... it actually doesn't work on my end because config$counter$remaining includes all the targets currently running. But like I said, I may revisit the issue when mschubert/clustermq#182 is fixed.

@psadil
Author

psadil commented Dec 2, 2019

Hi, @wlandau. Am I reading your last message correctly that you've already tested the fix you implemented in f449117, so there's no need for someone else to run it?

Either way, thanks for the responses (even while you were OOO!)

@wlandau
Collaborator

wlandau commented Dec 2, 2019

Yeah, do not worry about trying it yourself. It does not work yet, and we cannot roll out a solution anyway until mschubert/clustermq#182 is addressed.

@mschubert

> we cannot roll out a solution anyway until mschubert/clustermq#182 is addressed

If I'm not misunderstanding the point, this is not entirely true.

Let's say drake knows that target 1 runs on 100 nodes, target 2 runs on one node, and the workflow consists of only those two targets.

You can use the worker API as it is right now to never have more workers than remaining targets (if you send the shutdown signal instead of the wait signal to all superfluous workers).

I thought this was what you did in f449117? This should work, and improve the situation. The only thing that depends on mschubert/clustermq#182 is if there is a target 3 that requires more workers again, and you want to temporarily scale your workers down and then back up.

@wlandau
Collaborator

wlandau commented Dec 2, 2019

> I thought this was what you did in f449117? This should work, and improve the situation.

It almost works, but config$counter$remaining takes both unassigned and currently building targets into account. I suspect we need config$queue$size() instead.

if (config$counter$workers > config$counter$remaining) {

> The only thing that depends on mschubert/clustermq#182 is if there is a target 3 that requires more workers again, and you want to temporarily scale your workers down and then back up.

Dynamic branching creates such additional targets while make() is running, so we need the ability to dynamically restore workers back up to the nominal level.
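
To make the distinction concrete, here is a toy illustration (mock objects, not drake's real internals) of the scenario from this issue: one of the B targets is still building on a worker, C has not been sent to anyone yet, and two workers are alive.

# Mock of the two counters discussed above. remaining counts unassigned plus
# currently building targets; queue$size() counts only unassigned targets.
config <- list(
  counter = list(workers = 2, remaining = 2),
  queue   = list(size = function() 1)
)
config$counter$workers > config$counter$remaining  # FALSE: the idle worker stays around
config$counter$workers > config$queue$size()       # TRUE:  the idle worker could retire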

@wlandau
Collaborator

wlandau commented Dec 2, 2019

FYI: I just pushed a quick fix to the 1080 branch (8895b1d) which worked on my company's SGE cluster. In the following script, drake dropped down to 1 worker for most of the runtime.

library(drake)
options(
  clustermq.scheduler = "sge",
  clustermq.template = "sge.tmpl" # drake_hpc_template_file("sge_clustermq.tmpl")
)
clean(destroy = TRUE)

plan <- drake_plan(
  a = target(Sys.sleep(x), transform = map(x = c(1, 10))),
  b = target(Sys.sleep(prod(a)), transform = combine(a))
)

make(plan, parallelism = "clustermq", jobs = 3)

@brendanf
Contributor

Wouldn't it at least be safe to shut down extra workers once there are 0 remaining unassigned targets?

@wlandau
Collaborator

wlandau commented Dec 10, 2019

Yes, and drake does do this already.
