
foreach with doParallel and clustermq backends returns different results #126

Closed
Zhuk66 opened this issue Feb 25, 2019 · 4 comments

Zhuk66 commented Feb 25, 2019

I am trying to implement nested loops using clustermq and doParallel.
I noticed inconsistent behavior. For example:

setwd("~/zmq")
library(clustermq)
library(itertools)
library(foreach)
library(doParallel)
library(purrr)

myFunc <- function(x, ...) {
  list(return=1:x, error=rep(NA, x))
}

testCMQ <- function(x, parallel = FALSE, cmq = FALSE, ...) {
  
  args <- list(...)
  
  exports <- list(isplitVector=isplitVector, `%dopar%`=`%dopar%`, parallel=parallel, 
                  foreach=foreach, registerDoParallel=registerDoParallel, 
                  myFunc=myFunc, safely=safely, args=args)
  
  if(parallel) {
    if(cmq) {
      register_dopar_cmq(n_jobs=8, memory=4096, export=exports)
    } else {
      registerDoParallel(cores=4)
    }
  } else {
      registerDoSEQ()
  }
  
  
  rv <- foreach(chunkIt = isplitVector(1:x, chunkSize=4),
                      .combine  = append,
                      .inorder  = TRUE,
                      .verbose  = FALSE ) %dopar% {
                        
                        registerDoParallel(cores=4)
                        foreach( v = isplitVector(chunkIt,  chunkSize=1),
                                 .inorder  = TRUE,
                                 .verbose  = FALSE) %dopar% {
                                   
                                   args <- c(list(x=v), args)
                                   do.call(safely(myFunc), args)
                                 }
                      } %>% transpose
     return(rv)
}
1. Parallel execution with clustermq:
> res <- testCMQ(10, parallel=T, cmq=T)
Submitting 3 worker jobs (ID: 7288) ...
Running 3 calculations (1 calls/chunk) ...
Master: [1.5s 2.6% CPU]; Worker: [avg 98.8% CPU, max 242.5 Mb]                             
Warning messages:
1: Element 2 has length 3 not 4 
2: Element 3 has length 3 not 4 
> length(res)
[1] 4 
2. Parallel execution with doParallel:
> res <- testCMQ(10, parallel=T, cmq=F)
> length(res)
[1] 2
3. Sequential execution:
> res <- testCMQ(10, parallel=F)
> length(res)
[1] 2
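
For reference, purrr::transpose() on a list of safely() results should always yield a two-element list ($result and $error), so length 2 is the expected value for every backend. The length-4 result (and the "Element 2 has length 3 not 4" warnings) suggest that under clustermq the per-chunk lists reach transpose() unflattened. A minimal sketch of the expected shape:

library(purrr)
out <- lapply(1:3, safely(function(x) 1:x))  # three safely() results
tr <- transpose(out)
names(tr)   # "result" "error"
length(tr)  # 2, regardless of how many calls were made
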
mschubert (Owner) commented

Can you simplify your example a bit?

For instance, the inner foreach should never change as far as I understand.

Zhuk66 commented Feb 26, 2019

The inner loop can be executed with registerDoParallel or registerDoSEQ; that does not change the result. The result only differs depending on whether registerDoParallel (or registerDoSEQ) or register_dopar_cmq is registered before the outer foreach.

Zhuk66 commented Mar 21, 2019

Here is a hack that solved this issue for me. In master.r I added a function:
flatten <- function(data, rettype) {
  res <- list()
  for (i in 1:length(data)) {
    for (j in 1:length(data[[i]])) {
      res <- c(res, data[[i]][j])
    }
  }
  type <- paste0('as.', rettype)
  res <- do.call(type, list(res))
  return(res)
}

... and I am calling it right after we've finished building job_result:
        } else # or else shut it down
            qsys$send_shutdown_worker()
    }

    job_result <- flatten(job_result, rettype)

    if (qsys$reusable || qsys$cleanup())
        on.exit(NULL)

    summarize_result(job_result, n_errors, n_warnings, cond_msgs,
                     min(submit_index)-1, fail_on_error)
}
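
For reference, the same flattening can be written more compactly in base R; a sketch assuming data is a list of per-chunk result lists, as in the hack above:

flatten <- function(data, rettype) {
  # drop one level of nesting, then coerce to the requested return type
  res <- unlist(data, recursive = FALSE)
  do.call(paste0('as.', rettype), list(res))
}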

mschubert commented Apr 5, 2019

Minimal example:

register_dopar_cmq(n_jobs=0)
res = foreach(i=1:3, .combine=c) %dopar% sqrt(i)
cmp = foreach(i=1:3, .combine=c) %do% sqrt(i)

Fixed in 1f045e2.
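
The underlying problem appears to be that workers return their results grouped by chunk, so the chunk lists have to be flattened back into one flat list before foreach applies .combine. A sketch of the idea (not the actual patch in 1f045e2):

# Results arrive as one list per chunk, e.g. for sqrt over 1:3 in two chunks:
chunked <- list(list(sqrt(1), sqrt(2)), list(sqrt(3)))
# Flatten one level so .combine (here c) sees individual results:
flat <- unlist(chunked, recursive = FALSE)
do.call(c, flat)  # 1.000000 1.414214 1.732051, matching the %do% result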
