Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into docker
Browse files Browse the repository at this point in the history
  • Loading branch information
mllg committed Feb 14, 2017
2 parents 66de166 + 46e2bfe commit a113567
Show file tree
Hide file tree
Showing 45 changed files with 268 additions and 136 deletions.
12 changes: 10 additions & 2 deletions .travis.yml
@@ -1,7 +1,15 @@
language: r
dist: trusty
sudo: false
language: r
cache: packages

r:
- oldrel
- release
- devel

r_packages:
- covr

after_success:
- Rscript -e 'covr::coveralls()'
- if [[ "${TRAVIS_R_VERSION_STRING}" == "release" ]]; then Rscript -e 'covr::coveralls()'; fi
2 changes: 1 addition & 1 deletion DESCRIPTION
Expand Up @@ -50,4 +50,4 @@ Suggests:
snow,
testthat
VignetteBuilder: knitr
RoxygenNote: 6.0.0
RoxygenNote: 6.0.1
2 changes: 2 additions & 0 deletions NEWS.md
Expand Up @@ -8,6 +8,8 @@
* New function `batchReduce()`.
* New function `estimateRuntimes()`.
* New function `removeRegistry()`.
* Missing result files are now handled more consistently: by default, an exception is raised if a result is not available.
  The argument `missing.val` has been added to `reduceResultsList()` and `reduceResultsDataTable()` and removed from `loadResult()` and `batchMapResults()`.
* Timestamps are now stored with sub-second accuracy.
* Renamed Torque to TORQUE. This especially affects the function `makeClusterFunctionsTorque`, which must now be called via `makeClusterFunctionsTORQUE()`.
* `chunkIds()` has been deprecated. Use `chunk()`, `lpt()` or `binpack()` instead.
Expand Down
11 changes: 4 additions & 7 deletions R/batchMapResults.R
Expand Up @@ -10,7 +10,6 @@
#' @template ids
#' @param ... [ANY]\cr
#' Arguments to vectorize over (list or vector). Passed to \code{\link{batchMap}}.
#' @template missing.val
#' @template more.args
#' @param target [\code{\link{Registry}}]\cr
#' Empty Registry in which the new jobs are created.
Expand All @@ -36,9 +35,9 @@
#' # Map old to new ids. First, get a table with results and parameters
#' results = rjoin(getJobPars(reg = target), reduceResultsDataTable(reg = target))
#'
#' # Parameter '..id' points to job.id in 'source'. Use an inner join to combine:
#' # Parameter '..id' points to job.id in 'source'. Use an inner join to combine:
#' ijoin(results, reduceResultsDataTable(reg = tmp), by = c("..id" = "job.id"))
batchMapResults = function(fun, ids = NULL, ..., missing.val, more.args = list(), target, source = getDefaultRegistry()) {
batchMapResults = function(fun, ids = NULL, ..., more.args = list(), target, source = getDefaultRegistry()) {
assertRegistry(source, sync = TRUE)
assertRegistry(target, sync = TRUE)
assertFunction(fun)
Expand All @@ -49,13 +48,11 @@ batchMapResults = function(fun, ids = NULL, ..., missing.val, more.args = list()
stop("Target registry 'target' must be empty")

more.args = c(list(..file.dir = source$file.dir, ..fun = fun), more.args)
if (!missing(missing.val))
more.args["..missing.val"] = list(missing.val)
args = c(list(..id = ids$job.id), list(...))

batchMap(batchMapResultsWrapper, args = args, more.args = more.args, reg = target)
}

batchMapResultsWrapper = function(..fun, ..file.dir, ..id, ..missing.val, ...) {
..fun(.loadResult(..file.dir, ..id, ..missing.val), ...)
# Worker-side wrapper for batchMap() jobs created by batchMapResults():
# reads the stored result of job '..id' from the source registry's file.dir
# and applies the user function to it, forwarding the vectorized arguments
# via '...'. The '..'-prefixed parameter names avoid clashes with names the
# user passes through '...'.
batchMapResultsWrapper = function(..fun, ..file.dir, ..id, ...) {
  # getResultFiles presumably maps (file.dir, id) to the result RDS path
  # -- confirm against R/filenames.R.
  ..fun(readRDS(getResultFiles(..file.dir, ..id)), ...)
}
16 changes: 11 additions & 5 deletions R/clusterFunctions.R
Expand Up @@ -31,12 +31,17 @@
#' @param array.var [\code{character(1)}]\cr
#' Name of the environment variable set by the scheduler to identify IDs of job arrays.
#' Default is \code{NA} for no array support.
#' @param scheduler.delay [\code{numeric(1)}]\cr
#' Time to sleep after important interactions with the scheduler to ensure a sane state.
#' Currently only used after \code{\link{submitJobs}}.
#' @param store.job [\code{logical(1)}]\cr
#' Flag to indicate that the cluster function implementation of \code{submitJob} can not directly handle \code{\link{JobCollection}} objects.
#' If set to \code{FALSE}, the \code{\link{JobCollection}} is serialized to the file system before submitting the job.
#' @param scheduler.latency [\code{numeric(1)}]\cr
#' Time to sleep after important interactions with the scheduler to ensure a sane state.
#' Currently only triggered after calling \code{\link{submitJobs}}.
#' @param fs.latency [\code{numeric(1)}]\cr
#' Expected maximum latency of the file system, in seconds.
#' Set to a positive number for network file systems like NFS which enables more robust (but also more expensive) mechanisms to
#' access files and directories.
#' Usually safe to set to \code{NA} which disables the expensive heuristic if you are working on a local file system.
#' @param hooks [\code{list}]\cr
#' Named list of functions which will be called on certain events like \dQuote{pre.submit} or \dQuote{post.sync}.
#' See \link{Hooks}.
Expand All @@ -45,7 +50,7 @@
#' @family ClusterFunctions
#' @family ClusterFunctionsHelper
makeClusterFunctions = function(name, submitJob, killJob = NULL, listJobsQueued = NULL, listJobsRunning = NULL,
array.var = NA_character_, store.job = FALSE, scheduler.delay = 0, hooks = list()) {
array.var = NA_character_, store.job = FALSE, scheduler.latency = 0, fs.latency = NA_real_, hooks = list()) {
assertList(hooks, types = "function", names = "unique")
assertSubset(names(hooks), unlist(batchtools$hooks, use.names = FALSE))

Expand All @@ -57,7 +62,8 @@ makeClusterFunctions = function(name, submitJob, killJob = NULL, listJobsQueued
listJobsRunning = assertFunction(listJobsRunning, "reg", null.ok = TRUE),
array.var = assertString(array.var, na.ok = TRUE),
store.job = assertFlag(store.job),
scheduler.delay = assertNumber(scheduler.delay, lower = 0),
scheduler.latency = assertNumber(scheduler.latency, lower = 0),
fs.latency = assertNumber(fs.latency, lower = 0, na.ok = TRUE),
hooks = hooks),
"ClusterFunctions")
}
Expand Down
5 changes: 3 additions & 2 deletions R/clusterFunctionsDocker.R
Expand Up @@ -33,7 +33,7 @@
#' @return [\code{\link{ClusterFunctions}}].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsDocker = function(image, docker.args = character(0L), image.args = character(0L), scheduler.delay = 1) { # nocov start
makeClusterFunctionsDocker = function(image, docker.args = character(0L), image.args = character(0L), scheduler.latency = 1, fs.latency = 65) { # nocov start
assertString(image)
assertCharacter(docker.args, any.missing = FALSE)
assertCharacter(image.args, any.missing = FALSE)
Expand Down Expand Up @@ -109,5 +109,6 @@ makeClusterFunctionsDocker = function(image, docker.args = character(0L), image.
}

makeClusterFunctions(name = "Docker", submitJob = submitJob, killJob = killJob, listJobsRunning = listJobsRunning,
store.job = TRUE, scheduler.delay = scheduler.delay, hooks = list(pre.submit.job = housekeeping, post.sync = housekeeping))
store.job = TRUE, scheduler.latency = scheduler.latency, fs.latency = fs.latency,
hooks = list(pre.submit.job = housekeeping, post.sync = housekeeping))
} # nocov end
5 changes: 3 additions & 2 deletions R/clusterFunctionsInteractive.R
Expand Up @@ -16,10 +16,11 @@
#' Sink the output to log files. Turning logging off can increase the speed of
#' calculations but makes it very difficult to debug.
#' Default is \code{TRUE}.
#' @inheritParams makeClusterFunctions
#' @return [\code{\link{ClusterFunctions}}].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsInteractive = function(external = FALSE, write.logs = TRUE) {
makeClusterFunctionsInteractive = function(external = FALSE, write.logs = TRUE, fs.latency = NA_real_) {
assertFlag(external)
assertFlag(write.logs)

Expand All @@ -35,5 +36,5 @@ makeClusterFunctionsInteractive = function(external = FALSE, write.logs = TRUE)
}
}

makeClusterFunctions(name = "Interactive", submitJob = submitJob, store.job = external)
makeClusterFunctions(name = "Interactive", submitJob = submitJob, store.job = external, fs.latency = fs.latency)
}
4 changes: 2 additions & 2 deletions R/clusterFunctionsLSF.R
Expand Up @@ -24,7 +24,7 @@
#' @return [\code{\link{ClusterFunctions}}].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsLSF = function(template = findTemplateFile("lsf"), scheduler.delay = 1) { # nocov start
makeClusterFunctionsLSF = function(template = findTemplateFile("lsf"), scheduler.latency = 1, fs.latency = 65) { # nocov start
template = cfReadBrewTemplate(template)

# When LSB_BJOBS_CONSISTENT_EXIT_CODE = Y, the bjobs command exits with 0 only
Expand Down Expand Up @@ -73,5 +73,5 @@ makeClusterFunctionsLSF = function(template = findTemplateFile("lsf"), scheduler
}

makeClusterFunctions(name = "LSF", submitJob = submitJob, killJob = killJob, listJobsQueued = listJobsQueued,
listJobsRunning = listJobsRunning, store.job = TRUE, scheduler.delay = scheduler.delay)
listJobsRunning = listJobsRunning, store.job = TRUE, scheduler.latency = scheduler.latency, fs.latency = fs.latency)
} # nocov end
5 changes: 3 additions & 2 deletions R/clusterFunctionsMulticore.R
Expand Up @@ -73,10 +73,11 @@ Multicore = R6Class("Multicore",
#' Does not work on Windows, use \code{\link{makeClusterFunctionsSocket}} instead.
#'
#' @template ncpus
#' @inheritParams makeClusterFunctions
#' @return [\code{\link{ClusterFunctions}}].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsMulticore = function(ncpus = NA_integer_) {
makeClusterFunctionsMulticore = function(ncpus = NA_integer_, fs.latency = NA_real_) {
if (testOS("windows"))
stop("ClusterFunctionsMulticore do not support Windows. Use makeClusterFunctionsSocket instead.")
ncpus = asCount(ncpus, na.ok = TRUE, positive = TRUE)
Expand All @@ -98,5 +99,5 @@ makeClusterFunctionsMulticore = function(ncpus = NA_integer_) {
}

makeClusterFunctions(name = "Multicore", submitJob = submitJob, listJobsRunning = listJobsRunning,
store.job = FALSE, hooks = list(pre.sync = function(reg, fns) p$collect(1)))
store.job = FALSE, fs.latency = fs.latency, hooks = list(pre.sync = function(reg, fns) p$collect(1)))
}
4 changes: 2 additions & 2 deletions R/clusterFunctionsOpenLava.R
Expand Up @@ -24,7 +24,7 @@
#' @return [\code{\link{ClusterFunctions}}].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsOpenLava = function(template = findTemplateFile("openlava"), scheduler.delay = 1) { # nocov start
makeClusterFunctionsOpenLava = function(template = findTemplateFile("openlava"), scheduler.latency = 1, fs.latency = 65) { # nocov start
template = cfReadBrewTemplate(template)

# When LSB_BJOBS_CONSISTENT_EXIT_CODE = Y, the bjobs command exits with 0 only
Expand Down Expand Up @@ -73,5 +73,5 @@ makeClusterFunctionsOpenLava = function(template = findTemplateFile("openlava"),
}

makeClusterFunctions(name = "OpenLava", submitJob = submitJob, killJob = killJob, listJobsQueued = listJobsQueued,
listJobsRunning = listJobsRunning, store.job = TRUE, scheduler.delay = scheduler.delay)
listJobsRunning = listJobsRunning, store.job = TRUE, scheduler.latency = scheduler.latency, fs.latency = fs.latency)
} # nocov end
4 changes: 2 additions & 2 deletions R/clusterFunctionsSGE.R
Expand Up @@ -25,7 +25,7 @@
#' @return [\code{\link{ClusterFunctions}}].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsSGE = function(template = findTemplateFile("sge"), scheduler.delay = 1) { # nocov start
makeClusterFunctionsSGE = function(template = findTemplateFile("sge"), scheduler.latency = 1, fs.latency = 65) { # nocov start
template = cfReadBrewTemplate(template)

submitJob = function(reg, jc) {
Expand Down Expand Up @@ -65,5 +65,5 @@ makeClusterFunctionsSGE = function(template = findTemplateFile("sge"), scheduler
}

makeClusterFunctions(name = "SGE", submitJob = submitJob, killJob = killJob, listJobsQueued = listJobsQueued,
listJobsRunning = listJobsRunning, store.job = TRUE, scheduler.delay = scheduler.delay)
listJobsRunning = listJobsRunning, store.job = TRUE, scheduler.latency = scheduler.latency, fs.latency = fs.latency)
} # nocov end
6 changes: 4 additions & 2 deletions R/clusterFunctionsSSH.R
Expand Up @@ -7,6 +7,7 @@
#'
#' @param workers [\code{list} of \code{\link{Worker}}]\cr
#' List of Workers as constructed with \code{\link{Worker}}.
#' @inheritParams makeClusterFunctions
#'
#' @note
#' If you use a custom \dQuote{.ssh/config} file, make sure your
Expand All @@ -22,7 +23,7 @@
#' # cluster functions for multicore execution on the local machine
#' makeClusterFunctionsSSH(list(Worker$new("localhost", ncpus = 2)))
#' }
makeClusterFunctionsSSH = function(workers) { # nocov start
makeClusterFunctionsSSH = function(workers, fs.latency = 65) { # nocov start
assertList(workers, types = "Worker")
nodenames = vcapply(workers, "[[", "nodename")
if (anyDuplicated(nodenames))
Expand Down Expand Up @@ -62,5 +63,6 @@ makeClusterFunctionsSSH = function(workers) { # nocov start
unlist(lapply(workers, function(w) w$list(reg)), use.names = FALSE)
}

makeClusterFunctions(name = "SSH", submitJob = submitJob, killJob = killJob, listJobsRunning = listJobsRunning, store.job = TRUE)
makeClusterFunctions(name = "SSH", submitJob = submitJob, killJob = killJob, listJobsRunning = listJobsRunning,
store.job = TRUE, fs.latency = fs.latency)
} # nocov end
5 changes: 3 additions & 2 deletions R/clusterFunctionsSlurm.R
Expand Up @@ -30,7 +30,7 @@
#' @return [\code{\link{ClusterFunctions}}].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsSlurm = function(template = findTemplateFile("slurm"), clusters = NULL, array.jobs = TRUE, scheduler.delay = 1) { # nocov start
makeClusterFunctionsSlurm = function(template = findTemplateFile("slurm"), clusters = NULL, array.jobs = TRUE, scheduler.latency = 1, fs.latency = 65) { # nocov start
if (!is.null(clusters))
assertString(clusters, min.chars = 1L)
assertFlag(array.jobs)
Expand Down Expand Up @@ -100,5 +100,6 @@ makeClusterFunctionsSlurm = function(template = findTemplateFile("slurm"), clust
}

makeClusterFunctions(name = "Slurm", submitJob = submitJob, killJob = killJob, listJobsRunning = listJobsRunning,
listJobsQueued = listJobsQueued, array.var = "SLURM_ARRAY_TASK_ID", store.job = TRUE, scheduler.delay = scheduler.delay)
listJobsQueued = listJobsQueued, array.var = "SLURM_ARRAY_TASK_ID", store.job = TRUE,
scheduler.latency = scheduler.latency, fs.latency = fs.latency)
} # nocov end
5 changes: 3 additions & 2 deletions R/clusterFunctionsSocket.R
Expand Up @@ -45,10 +45,11 @@ Socket = R6Class("Socket",
#' Jobs are spawned asynchronously using the package \pkg{snow}.
#'
#' @template ncpus
#' @inheritParams makeClusterFunctions
#' @return [\code{\link{ClusterFunctions}}].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsSocket = function(ncpus = NA_integer_) {
makeClusterFunctionsSocket = function(ncpus = NA_integer_, fs.latency = 65) {
assertCount(ncpus, positive = TRUE, na.ok = TRUE)
if (is.na(ncpus)) {
ncpus = max(getOption("mc.cores", parallel::detectCores()), 1L, na.rm = TRUE)
Expand All @@ -70,5 +71,5 @@ makeClusterFunctionsSocket = function(ncpus = NA_integer_) {
}

makeClusterFunctions(name = "Socket", submitJob = submitJob, listJobsRunning = listJobsRunning,
store.job = FALSE, hooks = list(pre.sync = function(reg, fns) p$list()))
store.job = FALSE, fs.latency = fs.latency, hooks = list(pre.sync = function(reg, fns) p$list()))
}
5 changes: 3 additions & 2 deletions R/clusterFunctionsTORQUE.R
Expand Up @@ -20,7 +20,7 @@
#' @return [\code{\link{ClusterFunctions}}].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsTORQUE = function(template = findTemplateFile("torque"), scheduler.delay = 1) { # nocov start
makeClusterFunctionsTORQUE = function(template = findTemplateFile("torque"), scheduler.latency = 1, fs.latency = 65) { # nocov start
template = cfReadBrewTemplate(template, "##")

submitJob = function(reg, jc) {
Expand Down Expand Up @@ -66,5 +66,6 @@ makeClusterFunctionsTORQUE = function(template = findTemplateFile("torque"), sch
}

makeClusterFunctions(name = "TORQUE", submitJob = submitJob, killJob = killJob, listJobsQueued = listJobsQueued,
listJobsRunning = listJobsRunning, array.var = "PBS_ARRAYID", store.job = TRUE, scheduler.delay = scheduler.delay)
listJobsRunning = listJobsRunning, array.var = "PBS_ARRAYID", store.job = TRUE,
scheduler.latency = scheduler.latency, fs.latency = fs.latency)
} # nocov end
10 changes: 5 additions & 5 deletions R/filenames.R
@@ -1,19 +1,19 @@
getResultPath = function(reg = getDefaultRegistry()) {
# Directory below the registry's file.dir where job results are stored.
getResultPath = function(reg) {
  file.path(reg$file.dir, "results")
}
getLogPath = function(reg = getDefaultRegistry()) {
# Directory below the registry's file.dir where job log files are stored.
getLogPath = function(reg) {
  file.path(reg$file.dir, "logs")
}

getJobPath = function(reg = getDefaultRegistry()) {
# Directory below the registry's file.dir where serialized job files are stored.
getJobPath = function(reg) {
  file.path(reg$file.dir, "jobs")
}

getUpdatePath = function(reg = getDefaultRegistry()) {
# Directory below the registry's file.dir where status-update files are stored.
getUpdatePath = function(reg) {
  file.path(reg$file.dir, "updates")
}

getExternalPath = function(reg = getDefaultRegistry()) {
# Directory below the registry's file.dir reserved for external/user files.
getExternalPath = function(reg) {
  file.path(reg$file.dir, "external")
}

Expand Down
5 changes: 5 additions & 0 deletions R/helpers.R
Expand Up @@ -159,3 +159,8 @@ with_seed = function(seed, expr) {
}
eval.parent(expr)
}

# Set difference for character vectors: returns the elements of x that do not
# occur in y, preserving order. Assumes x has no duplicates, so no
# deduplication is performed (unlike base::setdiff). Uses chmatch --
# presumably data.table's fast character match; confirm against the package
# imports -- with nomatch = 0L so unmatched elements map to index 0.
chsetdiff = function(x, y) {
  hits = chmatch(x, y, 0L)
  x[hits == 0L]
}
2 changes: 1 addition & 1 deletion R/killJobs.R
Expand Up @@ -54,7 +54,7 @@ killJobs = function(ids = NULL, reg = getDefaultRegistry()) {
saveRegistry(reg)

tab = setkeyv(tab[, c("job.id", "batch.id", "killed")], "job.id")
Sys.sleep(reg$cluster.functions$scheduler.delay)
Sys.sleep(reg$cluster.functions$scheduler.latency)
runHook(reg, "post.kill", tab)
return(tab)
}
19 changes: 4 additions & 15 deletions R/loadResult.R
Expand Up @@ -4,26 +4,15 @@
#' Loads the result of a single job.
#'
#' @template id
#' @template missing.val
#' @template reg
#' @return [\code{ANY}]. The saved result or \code{missing.val} if result file
#' is not found.
#' @return [\code{ANY}]. The stored result.
#' @family Results
#' @export
loadResult = function(id, missing.val, reg = getDefaultRegistry()) {
loadResult = function(id, reg = getDefaultRegistry()) {
assertRegistry(reg)
id = convertId(reg, id)
if (missing(missing.val) && nrow(.findDone(reg, id)) == 0L)
if (nrow(.findDone(reg, id)) == 0L)
stopf("Job with id %i not terminated", id$job.id)
.loadResult(reg$file.dir, id$job.id, missing.val)
}

.loadResult = function(file.dir, id, missing.val) {
fn = getResultFiles(file.dir, id)
if (!file.exists(fn)) {
if (missing(missing.val))
stopf("Result for job with id=%i not found in %s", id, fn)
return(missing.val)
}
fn = getResultFiles(reg$file.dir, id$job.id)
return(readRDS(fn))
}

0 comments on commit a113567

Please sign in to comment.