Skip to content

Commit

Permalink
Improve hpc checksums in light of #283 and #469
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau committed Jul 16, 2018
1 parent 968252c commit f2efdf4
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 35 deletions.
2 changes: 1 addition & 1 deletion NAMESPACE
Expand Up @@ -8,7 +8,6 @@ export(as_file)
export(available_hash_algos)
export(backend)
export(bind_plans)
export(build_clustermq_staged)
export(build_drake_graph)
export(build_graph)
export(build_times)
Expand All @@ -20,6 +19,7 @@ export(check)
export(check_plan)
export(clean)
export(cleaned_namespaces)
export(cmq_staged_build)
export(config)
export(configure_cache)
export(contains)
Expand Down
41 changes: 37 additions & 4 deletions R/mc_utils.R
Expand Up @@ -173,22 +173,37 @@ mc_get_checksum <- function(target, config){
safe_get_hash(key = target, namespace = "kernels", config = config),
safe_get_hash(key = target, namespace = "meta", config = config),
safe_get_hash(key = target, namespace = "attempt", config = config),
file_dependency_hash(target, config, "output_files"),
mc_output_file_checksum(target, config),
sep = " "
)
}

mc_output_file_checksum <- function(target, config){
files <- unlist(igraph::vertex_attr(
graph = config$graph,
name = "output_files",
index = target
))
vapply(
X = sort(files),
FUN = rehash_file,
FUN.VALUE = character(1),
config = config
) %>%
digest::digest(algo = config$long_hash_algo)
}

mc_is_good_checksum <- function(target, checksum, config){
stamp <- mc_get_checksum(target = target, config = config)
if (!identical(stamp, checksum)){
local_checksum <- mc_get_checksum(target = target, config = config)
if (!identical(local_checksum, checksum)){
return(FALSE)
}
if (identical("failed", get_progress_single(target, cache = config$cache))){
return(TRUE) # covered with parallel processes # nocov
}
all(
vapply(
X = unlist(strsplit(stamp, " "))[1:3], # Exclude attempt flag (often NA).
X = unlist(strsplit(local_checksum, " "))[1:3], # Exclude attempt flag (often NA). # nolint
config$cache$exists_object,
FUN.VALUE = logical(1)
)
Expand All @@ -212,6 +227,24 @@ mc_wait_checksum <- function(target, checksum, config, timeout = 300){
)
}

mc_wait_outfile_checksum <- function(target, checksum, config, timeout = 300){
i <- 0
while (i < timeout / mc_wait){
local_checksum <- mc_output_file_checksum(target, config)
if (identical(local_checksum, checksum)){
return()
} else {
Sys.sleep(mc_wait)
}
i <- i + 1
}
drake_error(
"Target `", target, "` did not download from your ",
"network file system. Checksum verification timed out after about ",
timeout, " seconds.", config = config
)
}

mc_abort_with_errored_workers <- function(config){
if (length(failed_workers <- config$cache$list("mc_error"))){
if (!identical(config$keep_going, TRUE)){
Expand Down
9 changes: 6 additions & 3 deletions R/meta.R
Expand Up @@ -101,7 +101,8 @@ dependency_hash <- function(target, config) {
file_dependency_hash <- function(
target,
config,
which = c("input_files", "output_files")
which = c("input_files", "output_files"),
size_cutoff = rehash_file_size_cutoff
){
which <- match.arg(which)
files <- unlist(igraph::vertex_attr(
Expand All @@ -113,7 +114,8 @@ file_dependency_hash <- function(
X = sort(files),
FUN = file_hash,
FUN.VALUE = character(1),
config = config
config = config,
size_cutoff = size_cutoff
) %>%
digest::digest(algo = config$long_hash_algo)
}
Expand Down Expand Up @@ -153,7 +155,8 @@ should_rehash_file <- function(filename, new_mtime, old_mtime,
do_rehash
}

file_hash <- function(target, config, size_cutoff = 1e5) {
file_hash <- function(
target, config, size_cutoff = rehash_file_size_cutoff) {
if (is_file(target)) {
filename <- drake::drake_unquote(target)
} else {
Expand Down
32 changes: 9 additions & 23 deletions R/staged.R
Expand Up @@ -213,7 +213,7 @@ run_clustermq_staged <- function(config){
fun = function(target){
# This call is actually tested in tests/testthat/test-clustermq.R.
# nocov start
drake::build_clustermq_staged(
drake::cmq_staged_build(
target = target,
meta_list = meta_list,
config = config
Expand All @@ -223,10 +223,14 @@ run_clustermq_staged <- function(config){
workers = workers,
export = export
)
tmp <- lightly_parallelize(
lightly_parallelize(
X = builds,
FUN = function(build){
wait_for_file(build = build, config = config)
mc_wait_outfile_checksum(
target = build$target,
checksum = build$checksum,
config = config
)
conclude_build(
target = build$target,
value = build$value,
Expand All @@ -246,7 +250,7 @@ run_clustermq_staged <- function(config){
#' @keywords internal
#' @inheritParams drake_build
#' @param meta_list list of metadata
build_clustermq_staged <- function(target, meta_list, config){
cmq_staged_build <- function(target, meta_list, config){
# This function is actually tested in tests/testthat/test-clustermq.R.
# nocov start
do_prework(config = config, verbose_packages = FALSE)
Expand All @@ -256,25 +260,7 @@ build_clustermq_staged <- function(target, meta_list, config){
meta = meta_list[[target]],
config = config
)
build$checksum <- file_dependency_hash(target, config, "output_files")
build$checksum <- mc_output_file_checksum(target, config)
build
# nocov end
}

wait_for_file <- function(build, config){
R.utils::withTimeout({
while (!all(file.exists(drake_unquote(build$meta$output_files)))){
Sys.sleep(mc_wait) # nocov
}
while (
!identical(
file_dependency_hash(build$target, config, "output_files"),
build$checksum
)
){
Sys.sleep(mc_wait) # nocov
}
},
timeout = 60
)
}
2 changes: 2 additions & 0 deletions R/utils.R
Expand Up @@ -27,6 +27,8 @@ is_not_file <- function(x){
!is_file(x)
}

rehash_file_size_cutoff <- 1e5

braces <- function(x) {
paste("{\n", x, "\n}")
}
Expand Down
6 changes: 3 additions & 3 deletions man/build_clustermq_staged.Rd → man/cmq_staged_build.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion tests/testthat/test-intermediate-file.R
Expand Up @@ -77,11 +77,14 @@ test_with_dir("imported file_in file", {
load_mtcars_example() # for report.Rmd
config <- drake_config(dbug_plan(), envir = envir, verbose = 4)
testrun(config)
for (file in c("report.Rmd", paste0(letters[1:2], ".rds"))){
for (file in paste0(letters[1:2], ".rds")){
saveRDS(2, file)
testrun(config)
expect_equal(sort(justbuilt(config)), sort(c("nextone", "yourinput")))
}
write("new content", file = "report.Rmd", append = TRUE)
testrun(config)
expect_equal(sort(justbuilt(config)), sort(c("nextone", "yourinput")))
saveRDS(2, "c.rds")
testrun(config)
expect_equal(sort(justbuilt(config)), sort(c(
Expand Down
7 changes: 7 additions & 0 deletions tests/testthat/test-parallel.R
Expand Up @@ -222,4 +222,11 @@ test_with_dir("checksum functionality", {
expect_error(
mc_wait_checksum(
target = "combined", checksum = bad, config = config, timeout = 0.1))
checksum <- mc_output_file_checksum(target = "combined", config = config)
expect_silent(
mc_wait_outfile_checksum(
target = "combined", checksum = checksum, config = config, timeout = 0.1))
expect_error(
mc_wait_outfile_checksum(
target = "combined", checksum = bad, config = config, timeout = 0.1))
})

0 comments on commit f2efdf4

Please sign in to comment.