simplified sinking
mllg committed Aug 22, 2016
1 parent 4a974c1 commit 9099f46
Showing 8 changed files with 44 additions and 52 deletions.
4 changes: 2 additions & 2 deletions R/clusterFunctionsInteractive.R
@@ -26,10 +26,10 @@ makeClusterFunctionsInteractive = function(external = FALSE, write.logs = TRUE)

submitJob = function(reg, jc) {
if (external) {
runOSCommand(Rscript(), sprintf("-e \"batchtools::doJobCollection('%s', con = '%s')\"", jc$uri, jc$log.file))
runOSCommand(Rscript(), sprintf("-e \"batchtools::doJobCollection('%s', output = '%s')\"", jc$uri, jc$log.file))
makeSubmitJobResult(status = 0L, batch.id = "cfInteractive", msg = "")
} else {
doJobCollection(jc, con = jc$log.file)
doJobCollection(jc, output = jc$log.file)
makeSubmitJobResult(status = 0L, batch.id = "cfInteractive", msg = "")
}
}
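
In both branches, the con argument of doJobCollection() is replaced by output, which takes a file path instead of a connection. A minimal usage sketch of the new argument (the throwaway registry and file names are illustrative, not part of this commit):

library(batchtools)
reg = makeRegistry(file.dir = tempfile())   # temporary registry just for the sketch
batchMap(function(x) x^2, 1:3, reg = reg)   # define three trivial jobs
jc = makeJobCollection(1:3, reg = reg)      # bundle them into a JobCollection
doJobCollection(jc, output = tempfile())    # execute and write all output to a file
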
16 changes: 10 additions & 6 deletions R/clusterFunctionsMulticore.R
@@ -19,7 +19,7 @@ Multicore = R6Class("Multicore",
while(!self$collect()) {}

i = wf(is.na(self$pids))
self$pids[i] = parallel::mcparallel(doJobCollection(jc, con = jc$log.file), mc.set.seed = FALSE)$pid
self$pids[i] = parallel::mcparallel(doJobCollection(jc, output = jc$log.file), mc.set.seed = FALSE)$pid
self$hashes[i] = jc$job.hash
invisible(jc$job.hash)
},
@@ -39,7 +39,7 @@ Multicore = R6Class("Multicore",
)
)

#' @title ClusterFunctions for Parallel Execution
#' @title ClusterFunctions for Parallel Multicore Execution
#'
#' @description
#' Jobs are spawned asynchronously using the package \pkg{parallel}.
@@ -50,15 +50,19 @@ Multicore = R6Class("Multicore",
#'
#' @param ncpus [\code{integer(1)}]\cr
#' Number of VPUs of worker.
#' Default is to use all cores but one, where total number of cores "available" is given by option \code{mc.cores}
#' Default is to use all cores. The total number of cores "available" is given by option \code{mc.cores}
#' and defaults to the heuristic implemented in \code{\link[parallel]{detectCores}}.
#' @return [\code{\link{ClusterFunctions}}].
#' @family clusterFunctions
#' @export
makeClusterFunctionsMulticore = function(ncpus = max(getOption("mc.cores", parallel::detectCores()), 1L)) {
makeClusterFunctionsMulticore = function(ncpus = NA_integer_) {
if (testOS("windows"))
stop("ClusterFunctionsMulticore do not support Windows. Use makeClusterFunctionsSocket instead.")
assertCount(ncpus, positive = TRUE)
assertCount(ncpus, positive = TRUE, na.ok = TRUE)
if (is.na(ncpus)) {
ncpus = max(getOption("mc.cores", parallel::detectCores()), 1L)
info("Auto-detected %i CPUs", ncpus)
}
p = Multicore$new(ncpus)

submitJob = function(reg, jc) {
@@ -71,6 +75,6 @@ makeClusterFunctionsMulticore = function(ncpus = max(getOption("mc.cores", paral
p$list()
}

makeClusterFunctions(name = "Parallel", submitJob = submitJob, listJobsRunning = listJobsRunning,
makeClusterFunctions(name = "Multicore", submitJob = submitJob, listJobsRunning = listJobsRunning,
hooks = list(pre.sync = function(reg, fns) p$collect()), store.job = FALSE)
}
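
makeClusterFunctionsMulticore() now defaults ncpus to NA_integer_ and resolves it at call time from the mc.cores option, falling back to parallel::detectCores(). A sketch of the resulting behaviour, assuming the new signature above (the CPU counts are illustrative):

cf = makeClusterFunctionsMulticore()             # NA: auto-detects, e.g. "Auto-detected 8 CPUs"
options(mc.cores = 2L)
cf = makeClusterFunctionsMulticore()             # auto-detection honours mc.cores and picks 2
cf = makeClusterFunctionsMulticore(ncpus = 4L)   # an explicit count skips auto-detection
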
12 changes: 8 additions & 4 deletions R/clusterFunctionsSocket.R
@@ -17,7 +17,7 @@ Socket = R6Class("Socket",
self$pids[self$pids == res$tag] = ""
}
i = wf(!nzchar(self$pids))
snow::sendCall(self$cl[[i]], doJobCollection, list(jc = jc, con = jc$log.file), return = FALSE, tag = jc$job.hash)
snow::sendCall(self$cl[[i]], doJobCollection, list(jc = jc, output = jc$log.file), return = FALSE, tag = jc$job.hash)
self$pids[i] = jc$job.hash
invisible(jc$job.hash)
},
@@ -45,13 +45,17 @@ Socket = R6Class("Socket",
#'
#' @param ncpus [\code{integer(1)}]\cr
#' Number of VPUs of worker.
#' Default is to use all cores but one, where total number of cores "available" is given by option \code{mc.cores}
#' Default is to use all cores. The total number of cores "available" is given by option \code{mc.cores}
#' and defaults to the heuristic implemented in \code{\link[parallel]{detectCores}}.
#' @return [\code{\link{ClusterFunctions}}].
#' @family clusterFunctions
#' @export
makeClusterFunctionsSocket = function(ncpus = max(getOption("mc.cores", parallel::detectCores()), 1L)) {
assertCount(ncpus, positive = TRUE)
makeClusterFunctionsSocket = function(ncpus = NA_integer_) {
assertCount(ncpus, positive = TRUE, na.ok = TRUE)
if (is.na(ncpus)) {
ncpus = max(getOption("mc.cores", parallel::detectCores()), 1L)
info("Auto-detected %i CPUs", ncpus)
}
p = Socket$new(ncpus)

submitJob = function(reg, jc) {
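
makeClusterFunctionsSocket() gets the same NA default and auto-detection. Because the multicore backend stops with an error on Windows (see the check above), the socket variant is the drop-in alternative there, e.g.:

cf = makeClusterFunctionsSocket()   # NA: auto-detect ncpus; also works on Windows
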
40 changes: 13 additions & 27 deletions R/doJobCollection.R
@@ -8,27 +8,27 @@
#' Either an object of class \dQuote{JobCollection} as returned by
#' \code{\link{makeJobCollection}} or a string pointing to a file containing a
#' \dQuote{JobCollection} (saved with \code{\link[base]{saveRDS}}).
#' @param con [\code{\link[base]{connection}} | \code{character(1)}]\cr
#' A connection for the output. Defaults to \code{\link[base]{stdout}}.
#' Alternatively the name of a file to write to.
#' @param output [\code{character(1)}]\cr
#' Path to a file to write the output to. Defaults to \code{NULL} which means
#' that output is written to the active \code{\link[base]{sink}}.
#' @return [\code{character(1)}]: Hash of the \code{\link{JobCollection}} executed.
#' @export
doJobCollection = function(jc, con = stdout()) {
doJobCollection = function(jc, output = NULL) {
UseMethod("doJobCollection")
}


#' @export
doJobCollection.character = function(jc, con = stdout()) {
doJobCollection.character = function(jc, output = NULL) {
obj = readRDS(jc)
if (!obj$debug)
file.remove(jc)
doJobCollection.JobCollection(obj, con = con)
doJobCollection.JobCollection(obj, output = output)
}


#' @export
doJobCollection.JobCollection = function(jc, con = stdout()) {
doJobCollection.JobCollection = function(jc, output = NULL) {
error = function(msg, ...) {
updates = data.table(job.id = jc$defs$job.id, started = ustamp(), done = ustamp(),
error = stri_trunc(stri_trim_both(sprintf(msg, ...)), 500L, " [truncated]"),
@@ -44,28 +44,14 @@ doJobCollection.JobCollection = function(jc, con = stdout()) {
options(warn = 1L)
}

# sink output
if (!inherits(con, "terminal")) {
close.sinks = function() {
sink(type = "message")
sink(type = "output")
if (close.file)
close(con)
}

if (!inherits(con, "connection")) {
con = file(con, open = "wt")
close.file = TRUE
} else {
close.file = FALSE
}

sink(file = con)
sink(file = con, type = "message")
on.exit(close.sinks(), add = TRUE)
if (!is.null(output)) {
assertString(output)
fp = file(output, open = "wt")
sink(file = fp)
sink(file = fp, type = "message")
on.exit({ sink(type = "message"); sink(type = "output"); close(fp) }, add = TRUE)
}


# subset array jobs
if (isTRUE(jc$resources$chunks.as.arrayjobs) && !is.na(jc$array.var)) {
i = Sys.getenv(jc$array.var)
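
The simplification in doJobCollection.JobCollection(): instead of distinguishing terminals, connections and file names and tracking a close.file flag, the new code accepts only an optional file path, opens it, diverts both the output and the message stream to it, and restores everything via on.exit(). The same pattern extracted into a standalone sketch (not code from this commit):

run_logged = function(expr, output = NULL) {
  if (!is.null(output)) {
    fp = file(output, open = "wt")              # open the log file for writing
    sink(file = fp)                             # divert stdout
    sink(file = fp, type = "message")           # divert messages and warnings
    on.exit({ sink(type = "message"); sink(type = "output"); close(fp) }, add = TRUE)
  }
  force(expr)                                   # evaluate the lazily passed expression with sinks active
}
run_logged(print("hello"), output = tempfile())  # the printed output lands in the temp file, not on the console
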
8 changes: 4 additions & 4 deletions man/doJobCollection.Rd


7 changes: 3 additions & 4 deletions man/makeClusterFunctionsMulticore.Rd


5 changes: 2 additions & 3 deletions man/makeClusterFunctionsSocket.Rd


4 changes: 2 additions & 2 deletions tests/testthat/test_doJobCollection.R
@@ -7,7 +7,7 @@ test_that("doJobCollection handles bulky log output", {
batchMap(fun, N, reg = reg)
jc = makeJobCollection(1, reg = reg)
fn = tempfile()
doJobCollection(jc, con = fn)
doJobCollection(jc, output = fn)
lines = readLines(fn)
expect_true(any(nchar(lines) >= N))
})
@@ -19,7 +19,7 @@ test_that("doJobCollection truncates error messages", {
batchMap(fun, N, reg = reg)
jc = makeJobCollection(1, reg = reg)
fn = tempfile()
doJobCollection(jc, con = fn)
doJobCollection(jc, output = fn)
syncRegistry(reg = reg)
msg = getErrorMessages(reg = reg)$message
expect_true(stri_endswith_fixed(msg, " [truncated]"))
