Skip to content

Commit

Permalink
Handle FS latency (#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
mllg committed Feb 13, 2017
1 parent 4791e87 commit 46e2bfe
Show file tree
Hide file tree
Showing 31 changed files with 183 additions and 60 deletions.
16 changes: 11 additions & 5 deletions R/clusterFunctions.R
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,17 @@
#' @param array.var [\code{character(1)}]\cr
#' Name of the environment variable set by the scheduler to identify IDs of job arrays.
#' Default is \code{NA} for no array support.
#' @param scheduler.delay [\code{numeric(1)}]\cr
#' Time to sleep after important interactions with the scheduler to ensure a sane state.
#' Currently only used after \code{\link{submitJobs}}.
#' @param store.job [\code{logical(1)}]\cr
#' Flag to indicate that the cluster function implementation of \code{submitJob} can not directly handle \code{\link{JobCollection}} objects.
#' If set to \code{TRUE}, the \code{\link{JobCollection}} is serialized to the file system before submitting the job.
#' @param scheduler.latency [\code{numeric(1)}]\cr
#' Time to sleep after important interactions with the scheduler to ensure a sane state.
#' Currently triggered after calling \code{\link{submitJobs}} and \code{\link{killJobs}}.
#' @param fs.latency [\code{numeric(1)}]\cr
#' Expected maximum latency of the file system, in seconds.
#' Set to a positive number for network file systems like NFS which enables more robust (but also more expensive) mechanisms to
#' access files and directories.
#' Usually safe to set to \code{NA} which disables the expensive heuristic if you are working on a local file system.
#' @param hooks [\code{list}]\cr
#' Named list of functions which will be called on certain events like \dQuote{pre.submit} or \dQuote{post.sync}.
#' See \link{Hooks}.
Expand All @@ -45,7 +50,7 @@
#' @family ClusterFunctions
#' @family ClusterFunctionsHelper
makeClusterFunctions = function(name, submitJob, killJob = NULL, listJobsQueued = NULL, listJobsRunning = NULL,
array.var = NA_character_, store.job = FALSE, scheduler.delay = 0, hooks = list()) {
array.var = NA_character_, store.job = FALSE, scheduler.latency = 0, fs.latency = NA_real_, hooks = list()) {
assertList(hooks, types = "function", names = "unique")
assertSubset(names(hooks), unlist(batchtools$hooks, use.names = FALSE))

Expand All @@ -57,7 +62,8 @@ makeClusterFunctions = function(name, submitJob, killJob = NULL, listJobsQueued
listJobsRunning = assertFunction(listJobsRunning, "reg", null.ok = TRUE),
array.var = assertString(array.var, na.ok = TRUE),
store.job = assertFlag(store.job),
scheduler.delay = assertNumber(scheduler.delay, lower = 0),
scheduler.latency = assertNumber(scheduler.latency, lower = 0),
fs.latency = assertNumber(fs.latency, lower = 0, na.ok = TRUE),
hooks = hooks),
"ClusterFunctions")
}
Expand Down
5 changes: 3 additions & 2 deletions R/clusterFunctionsDocker.R
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#' @return [\code{\link{ClusterFunctions}}].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsDocker = function(image, docker.args = character(0L), image.args = character(0L), scheduler.delay = 1) { # nocov start
makeClusterFunctionsDocker = function(image, docker.args = character(0L), image.args = character(0L), scheduler.latency = 1, fs.latency = 65) { # nocov start
assertString(image)
assertCharacter(docker.args, any.missing = FALSE)
assertCharacter(image.args, any.missing = FALSE)
Expand Down Expand Up @@ -92,5 +92,6 @@ makeClusterFunctionsDocker = function(image, docker.args = character(0L), image.
}

makeClusterFunctions(name = "Docker", submitJob = submitJob, killJob = killJob, listJobsRunning = listJobsRunning,
store.job = TRUE, scheduler.delay = scheduler.delay, hooks = list(pre.submit.job = housekeeping, post.sync = housekeeping))
store.job = TRUE, scheduler.latency = scheduler.latency, fs.latency = fs.latency,
hooks = list(pre.submit.job = housekeeping, post.sync = housekeeping))
} # nocov end
5 changes: 3 additions & 2 deletions R/clusterFunctionsInteractive.R
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
#' Sink the output to log files. Turning logging off can increase the speed of
#' calculations but makes it very difficult to debug.
#' Default is \code{TRUE}.
#' @inheritParams makeClusterFunctions
#' @return [\code{\link{ClusterFunctions}}].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsInteractive = function(external = FALSE, write.logs = TRUE) {
makeClusterFunctionsInteractive = function(external = FALSE, write.logs = TRUE, fs.latency = NA_real_) {
assertFlag(external)
assertFlag(write.logs)

Expand All @@ -35,5 +36,5 @@ makeClusterFunctionsInteractive = function(external = FALSE, write.logs = TRUE)
}
}

makeClusterFunctions(name = "Interactive", submitJob = submitJob, store.job = external)
makeClusterFunctions(name = "Interactive", submitJob = submitJob, store.job = external, fs.latency = fs.latency)
}
4 changes: 2 additions & 2 deletions R/clusterFunctionsLSF.R
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#' @return [\code{\link{ClusterFunctions}}].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsLSF = function(template = findTemplateFile("lsf"), scheduler.delay = 1) { # nocov start
makeClusterFunctionsLSF = function(template = findTemplateFile("lsf"), scheduler.latency = 1, fs.latency = 65) { # nocov start
template = cfReadBrewTemplate(template)

# When LSB_BJOBS_CONSISTENT_EXIT_CODE = Y, the bjobs command exits with 0 only
Expand Down Expand Up @@ -73,5 +73,5 @@ makeClusterFunctionsLSF = function(template = findTemplateFile("lsf"), scheduler
}

makeClusterFunctions(name = "LSF", submitJob = submitJob, killJob = killJob, listJobsQueued = listJobsQueued,
listJobsRunning = listJobsRunning, store.job = TRUE, scheduler.delay = scheduler.delay)
listJobsRunning = listJobsRunning, store.job = TRUE, scheduler.latency = scheduler.latency, fs.latency = fs.latency)
} # nocov end
5 changes: 3 additions & 2 deletions R/clusterFunctionsMulticore.R
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,11 @@ Multicore = R6Class("Multicore",
#' Does not work on Windows, use \code{\link{makeClusterFunctionsSocket}} instead.
#'
#' @template ncpus
#' @inheritParams makeClusterFunctions
#' @return [\code{\link{ClusterFunctions}}].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsMulticore = function(ncpus = NA_integer_) {
makeClusterFunctionsMulticore = function(ncpus = NA_integer_, fs.latency = NA_real_) {
if (testOS("windows"))
stop("ClusterFunctionsMulticore do not support Windows. Use makeClusterFunctionsSocket instead.")
ncpus = asCount(ncpus, na.ok = TRUE, positive = TRUE)
Expand All @@ -98,5 +99,5 @@ makeClusterFunctionsMulticore = function(ncpus = NA_integer_) {
}

makeClusterFunctions(name = "Multicore", submitJob = submitJob, listJobsRunning = listJobsRunning,
store.job = FALSE, hooks = list(pre.sync = function(reg, fns) p$collect(1)))
store.job = FALSE, fs.latency = fs.latency, hooks = list(pre.sync = function(reg, fns) p$collect(1)))
}
4 changes: 2 additions & 2 deletions R/clusterFunctionsOpenLava.R
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#' @return [\code{\link{ClusterFunctions}}].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsOpenLava = function(template = findTemplateFile("openlava"), scheduler.delay = 1) { # nocov start
makeClusterFunctionsOpenLava = function(template = findTemplateFile("openlava"), scheduler.latency = 1, fs.latency = 65) { # nocov start
template = cfReadBrewTemplate(template)

# When LSB_BJOBS_CONSISTENT_EXIT_CODE = Y, the bjobs command exits with 0 only
Expand Down Expand Up @@ -73,5 +73,5 @@ makeClusterFunctionsOpenLava = function(template = findTemplateFile("openlava"),
}

makeClusterFunctions(name = "OpenLava", submitJob = submitJob, killJob = killJob, listJobsQueued = listJobsQueued,
listJobsRunning = listJobsRunning, store.job = TRUE, scheduler.delay = scheduler.delay)
listJobsRunning = listJobsRunning, store.job = TRUE, scheduler.latency = scheduler.latency, fs.latency = fs.latency)
} # nocov end
4 changes: 2 additions & 2 deletions R/clusterFunctionsSGE.R
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#' @return [\code{\link{ClusterFunctions}}].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsSGE = function(template = findTemplateFile("sge"), scheduler.delay = 1) { # nocov start
makeClusterFunctionsSGE = function(template = findTemplateFile("sge"), scheduler.latency = 1, fs.latency = 65) { # nocov start
template = cfReadBrewTemplate(template)

submitJob = function(reg, jc) {
Expand Down Expand Up @@ -65,5 +65,5 @@ makeClusterFunctionsSGE = function(template = findTemplateFile("sge"), scheduler
}

makeClusterFunctions(name = "SGE", submitJob = submitJob, killJob = killJob, listJobsQueued = listJobsQueued,
listJobsRunning = listJobsRunning, store.job = TRUE, scheduler.delay = scheduler.delay)
listJobsRunning = listJobsRunning, store.job = TRUE, scheduler.latency = scheduler.latency, fs.latency = fs.latency)
} # nocov end
6 changes: 4 additions & 2 deletions R/clusterFunctionsSSH.R
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#'
#' @param workers [\code{list} of \code{\link{Worker}}]\cr
#' List of Workers as constructed with \code{\link{Worker}}.
#' @inheritParams makeClusterFunctions
#'
#' @note
#' If you use a custom \dQuote{.ssh/config} file, make sure your
Expand All @@ -22,7 +23,7 @@
#' # cluster functions for multicore execution on the local machine
#' makeClusterFunctionsSSH(list(Worker$new("localhost", ncpus = 2)))
#' }
makeClusterFunctionsSSH = function(workers) { # nocov start
makeClusterFunctionsSSH = function(workers, fs.latency = 65) { # nocov start
assertList(workers, types = "Worker")
nodenames = vcapply(workers, "[[", "nodename")
if (anyDuplicated(nodenames))
Expand Down Expand Up @@ -62,5 +63,6 @@ makeClusterFunctionsSSH = function(workers) { # nocov start
unlist(lapply(workers, function(w) w$list(reg)), use.names = FALSE)
}

makeClusterFunctions(name = "SSH", submitJob = submitJob, killJob = killJob, listJobsRunning = listJobsRunning, store.job = TRUE)
makeClusterFunctions(name = "SSH", submitJob = submitJob, killJob = killJob, listJobsRunning = listJobsRunning,
store.job = TRUE, fs.latency = fs.latency)
} # nocov end
5 changes: 3 additions & 2 deletions R/clusterFunctionsSlurm.R
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
#' @return [\code{\link{ClusterFunctions}}].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsSlurm = function(template = findTemplateFile("slurm"), clusters = NULL, array.jobs = TRUE, scheduler.delay = 1) { # nocov start
makeClusterFunctionsSlurm = function(template = findTemplateFile("slurm"), clusters = NULL, array.jobs = TRUE, scheduler.latency = 1, fs.latency = 65) { # nocov start
if (!is.null(clusters))
assertString(clusters, min.chars = 1L)
assertFlag(array.jobs)
Expand Down Expand Up @@ -100,5 +100,6 @@ makeClusterFunctionsSlurm = function(template = findTemplateFile("slurm"), clust
}

makeClusterFunctions(name = "Slurm", submitJob = submitJob, killJob = killJob, listJobsRunning = listJobsRunning,
listJobsQueued = listJobsQueued, array.var = "SLURM_ARRAY_TASK_ID", store.job = TRUE, scheduler.delay = scheduler.delay)
listJobsQueued = listJobsQueued, array.var = "SLURM_ARRAY_TASK_ID", store.job = TRUE,
scheduler.latency = scheduler.latency, fs.latency = fs.latency)
} # nocov end
5 changes: 3 additions & 2 deletions R/clusterFunctionsSocket.R
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ Socket = R6Class("Socket",
#' Jobs are spawned asynchronously using the package \pkg{snow}.
#'
#' @template ncpus
#' @inheritParams makeClusterFunctions
#' @return [\code{\link{ClusterFunctions}}].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsSocket = function(ncpus = NA_integer_) {
makeClusterFunctionsSocket = function(ncpus = NA_integer_, fs.latency = 65) {
assertCount(ncpus, positive = TRUE, na.ok = TRUE)
if (is.na(ncpus)) {
ncpus = max(getOption("mc.cores", parallel::detectCores()), 1L, na.rm = TRUE)
Expand All @@ -70,5 +71,5 @@ makeClusterFunctionsSocket = function(ncpus = NA_integer_) {
}

makeClusterFunctions(name = "Socket", submitJob = submitJob, listJobsRunning = listJobsRunning,
store.job = FALSE, hooks = list(pre.sync = function(reg, fns) p$list()))
store.job = FALSE, fs.latency = fs.latency, hooks = list(pre.sync = function(reg, fns) p$list()))
}
5 changes: 3 additions & 2 deletions R/clusterFunctionsTORQUE.R
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#' @return [\code{\link{ClusterFunctions}}].
#' @family ClusterFunctions
#' @export
makeClusterFunctionsTORQUE = function(template = findTemplateFile("torque"), scheduler.delay = 1) { # nocov start
makeClusterFunctionsTORQUE = function(template = findTemplateFile("torque"), scheduler.latency = 1, fs.latency = 65) { # nocov start
template = cfReadBrewTemplate(template, "##")

submitJob = function(reg, jc) {
Expand Down Expand Up @@ -66,5 +66,6 @@ makeClusterFunctionsTORQUE = function(template = findTemplateFile("torque"), sch
}

makeClusterFunctions(name = "TORQUE", submitJob = submitJob, killJob = killJob, listJobsQueued = listJobsQueued,
listJobsRunning = listJobsRunning, array.var = "PBS_ARRAYID", store.job = TRUE, scheduler.delay = scheduler.delay)
listJobsRunning = listJobsRunning, array.var = "PBS_ARRAYID", store.job = TRUE,
scheduler.latency = scheduler.latency, fs.latency = fs.latency)
} # nocov end
10 changes: 5 additions & 5 deletions R/filenames.R
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
# Returns the path of the "results" subdirectory inside the registry's file directory (reg$file.dir).
getResultPath = function(reg = getDefaultRegistry()) {
getResultPath = function(reg) {
file.path(reg$file.dir, "results")
}
# Returns the path of the "logs" subdirectory inside the registry's file directory (reg$file.dir).
getLogPath = function(reg = getDefaultRegistry()) {
getLogPath = function(reg) {
file.path(reg$file.dir, "logs")
}

# Returns the path of the "jobs" subdirectory inside the registry's file directory (reg$file.dir).
getJobPath = function(reg = getDefaultRegistry()) {
getJobPath = function(reg) {
file.path(reg$file.dir, "jobs")
}

# Returns the path of the "updates" subdirectory inside the registry's file directory (reg$file.dir).
getUpdatePath = function(reg = getDefaultRegistry()) {
getUpdatePath = function(reg) {
file.path(reg$file.dir, "updates")
}

# Returns the path of the "external" subdirectory inside the registry's file directory (reg$file.dir).
getExternalPath = function(reg = getDefaultRegistry()) {
getExternalPath = function(reg) {
file.path(reg$file.dir, "external")
}

Expand Down
5 changes: 5 additions & 0 deletions R/helpers.R
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,8 @@ with_seed = function(seed, expr) {
}
eval.parent(expr)
}

# Set difference for character vectors: keeps the elements of `x` that do not
# occur in `y`, in their original order.
#
# x: character vector; duplicates are NOT removed, so callers are expected to
#    pass a duplicate-free vector.
# y: character vector of elements to drop from `x`.
# Returns the subset of `x` without a match in `y`.
chsetdiff = function(x, y) {
  # chmatch() yields 0L for every element of `x` that is absent from `y`
  pos = chmatch(x, y, nomatch = 0L)
  x[pos == 0L]
}
2 changes: 1 addition & 1 deletion R/killJobs.R
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ killJobs = function(ids = NULL, reg = getDefaultRegistry()) {
saveRegistry(reg)

tab = setkeyv(tab[, c("job.id", "batch.id", "killed")], "job.id")
Sys.sleep(reg$cluster.functions$scheduler.delay)
Sys.sleep(reg$cluster.functions$scheduler.latency)
runHook(reg, "post.kill", tab)
return(tab)
}
2 changes: 1 addition & 1 deletion R/submitJobs.R
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ submitJobs = function(ids = NULL, resources = list(), sleep = default.sleep, reg
pb$tick(len = 1, tokens = list(status = "Submitting"))
}

Sys.sleep(reg$cluster.functions$scheduler.delay)
Sys.sleep(reg$cluster.functions$scheduler.latency)
runHook(reg, "post.submit")

# return ids, registry is saved via on.exit()
Expand Down
1 change: 0 additions & 1 deletion R/sweepRegistry.R
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
sweepRegistry = function(reg = getDefaultRegistry()) {
assertRegistry(reg, writeable = TRUE, running.ok = FALSE)
"!DEBUG [sweepRegistry]: Running sweepRegistry"
chsetdiff = function(x, y) x[chmatch(x, y, 0L) == 0L] # assume x to have no duplicates

path = getResultPath(reg)
obsolete = chsetdiff(
Expand Down
33 changes: 33 additions & 0 deletions R/waitForFiles.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Wait until all files `fns` have become visible in the directory `path`.
#
# A cheap file.exists() check is tried first. For the files it does not find,
# the directory is listed with list.files(), as this seems to trick the nfs cache,
# see https://github.com/mllg/batchtools/issues/85
# The listing is repeated every 0.5 seconds until all files showed up or the
# timeout is exceeded.
#
# path:    directory which is expected to contain the files.
# fns:     character vector of file names relative to `path` (base names, as they are
#          compared against the output of list.files(path)); must be free of duplicates.
# timeout: maximum time to wait in seconds. NA disables all checks.
# Returns TRUE as soon as all files are found (or if timeout is NA);
# raises an error if files are still missing after `timeout` seconds.
waitForFiles = function(path, fns, timeout = NA_real_) {
  if (is.na(timeout))
    return(TRUE)

  # `fns` are relative to `path`, so they must be resolved against `path` here;
  # a bare file.exists(fns) would look them up in the current working directory.
  fns = fns[!file.exists(file.path(path, fns))]
  if (length(fns) == 0L)
    return(TRUE)

  "!DEBUG [waitForFiles]: `length(fns)` files not found via 'file.exists()'"
  fns = chsetdiff(fns, list.files(path))
  if (length(fns) == 0L)
    return(TRUE)

  # from here on `timeout` holds the absolute point in time to give up
  timeout = timeout + Sys.time()
  repeat {
    Sys.sleep(0.5)
    fns = chsetdiff(fns, list.files(path))
    if (length(fns) == 0L)
      return(TRUE)
    if (Sys.time() > timeout)
      stopf("Timeout while waiting for %i files, e.g. '%s'", length(fns), fns[1L])
  }
}

# Wait until the result files "<job.id>.rds" of all jobs in `ids` are visible
# in the result directory of the registry.
#
# reg: registry; its result directory is determined via getResultPath(), and
#      reg$cluster.functions$fs.latency is used as timeout (NA disables waiting).
# ids: table of job ids, the column `job.id` is used to construct the file names.
# Returns the value of waitForFiles(), which raises an error on a timeout.
waitForResults = function(reg, ids) {
  waitForFiles(
    path = getResultPath(reg),
    fns = sprintf("%i.rds", ids$job.id),
    timeout = reg$cluster.functions$fs.latency
  )
}
2 changes: 2 additions & 0 deletions R/waitForJobs.R
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ waitForJobs = function(ids = NULL, sleep = default.sleep, timeout = 604800, stop
if (nrow(ids.nt) == 0L) {
"!DEBUG [waitForJobs]: All jobs terminated"
pb$update(1)
waitForResults(reg, ids)
return(nrow(.findErrors(reg, ids)) == 0L)
}

Expand Down Expand Up @@ -85,6 +86,7 @@ waitForJobs = function(ids = NULL, sleep = default.sleep, timeout = 604800, stop
if (nrow(ids.nt[!ids.on.sys, on = "job.id"][ids.disappeared, on = "job.id", nomatch = 0L]) > 0L) {
warning("Some jobs disappeared from the system")
pb$update(1)
waitForResults(reg, ids)
return(FALSE)
}
}
Expand Down
12 changes: 9 additions & 3 deletions man/makeClusterFunctions.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions man/makeClusterFunctionsDocker.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 46e2bfe

Please sign in to comment.