Skip to content

Commit

Permalink
...
Browse files Browse the repository at this point in the history
  • Loading branch information
mllg committed Mar 4, 2020
1 parent 31a833a commit 43c68bc
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 12 deletions.
9 changes: 7 additions & 2 deletions R/Registry.R
Expand Up @@ -266,8 +266,13 @@ assertRegistry = function(reg, class = NULL, writeable = FALSE, sync = FALSE, ru
if (!running.ok && nrow(.findOnSystem(reg = reg)) > 0L)
stop("This operation is not allowed while jobs are running on the system")

if (sync && sync(reg))
saveRegistry(reg)
if (sync) {
merged = sync(reg)
if (length(merged)) {
saveRegistry(reg)
file_remove(merged)
}
}

invisible(TRUE)
}
Expand Down
3 changes: 2 additions & 1 deletion R/killJobs.R
Expand Up @@ -48,10 +48,11 @@ killJobs = function(ids = NULL, reg = getDefaultRegistry()) {
warningf("Could not kill %i jobs", sum(!tab$killed))

# reset killed jobs
sync(reg = reg)
merged = sync(reg = reg)
cols = c("submitted", "started", "done", "error", "mem.used", "resource.id", "batch.id", "log.file", "job.hash")
reg$status[tab[tab$killed], (cols) := list(NA_real_, NA_real_, NA_real_, NA_character_, NA_real_, NA_integer_, NA_character_, NA_character_, NA_character_)]
saveRegistry(reg)
file_remove(merged)

tab = setkeyv(tab[, c("job.id", "batch.id", "killed")], "job.id")
Sys.sleep(reg$cluster.functions$scheduler.latency)
Expand Down
5 changes: 4 additions & 1 deletion R/loadRegistry.R
Expand Up @@ -68,8 +68,11 @@ loadRegistry = function(file.dir, work.dir = NULL, conf.file = findConfFile(), m
if (make.default)
batchtools$default.registry = reg

if (sync(reg = reg) || updated)
merged = sync(reg = reg)
if (length(merged) || updated) {
saveRegistry(reg)
file_remove(merged)
}
return(reg)
}

Expand Down
14 changes: 7 additions & 7 deletions R/syncRegistry.R
Expand Up @@ -10,18 +10,20 @@
#' @export
syncRegistry = function(reg = getDefaultRegistry()) {
assertRegistry(reg)
altered = sync(reg)
if (altered)
merged = sync(reg)
if (length(merged)) {
saveRegistry(reg)
altered
file_remove(merged)
}
length(merged) > 0L
}


sync = function(reg) {
"!DEBUG [syncRegistry]: Triggered syncRegistry"
fns = list.files(dir(reg, "updates"), full.names = TRUE)
if (length(fns) == 0L)
return(invisible(FALSE))
return(character())

runHook(reg, "pre.sync", fns = fns)

Expand All @@ -45,10 +47,8 @@ sync = function(reg) {
if (nrow(updates) > 0L) {
expr = quote(`:=`(started = i.started, done = i.done, error = i.error, mem.used = i.mem.used))
reg$status[updates, eval(expr), on = "job.id"]
if (reg$writeable)
file_remove(fns[!failed])
}

runHook(reg, "post.sync", updates = updates)
invisible(nrow(updates) > 0L)
if (reg$writeable) fns[!failed] else character()
}
5 changes: 4 additions & 1 deletion R/waitForJobs.R
Expand Up @@ -115,7 +115,10 @@ waitForJobs = function(ids = NULL, sleep = NULL, timeout = 604800, expire.after
return(FALSE)
}

if (suppressMessages(sync(reg = reg)))
merged = suppressMessages(sync(reg = reg))
if (length(merged)) {
saveRegistry(reg)
file_remove(merged)
}
}
}

0 comments on commit 43c68bc

Please sign in to comment.