Skip to content

Commit

Permalink
Merge pull request #1267 from ropensci/1266
Browse files Browse the repository at this point in the history
Marshalling/branching patch
  • Loading branch information
wlandau committed Apr 14, 2024
2 parents e9a582a + 4e655c5 commit 751281e
Show file tree
Hide file tree
Showing 13 changed files with 75 additions and 7 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Description: Pipeline tools coordinate the pieces of computationally
The methodology in this package
borrows from GNU 'Make' (2015, ISBN:978-9881443519)
and 'drake' (2018, <doi:10.21105/joss.00550>).
Version: 1.6.0.9002
Version: 1.6.0.9003
License: MIT + file LICENSE
URL: https://docs.ropensci.org/targets/, https://github.com/ropensci/targets
BugReports: https://github.com/ropensci/targets/issues
Expand Down
4 changes: 4 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ S3method(target_get_type_cli,tar_stem)
S3method(target_is_branchable,default)
S3method(target_is_branchable,tar_pattern)
S3method(target_is_branchable,tar_stem)
S3method(target_marshal_value,default)
S3method(target_marshal_value,tar_pattern)
S3method(target_needs_worker,default)
S3method(target_needs_worker,tar_builder)
S3method(target_needs_worker,tar_pattern)
Expand Down Expand Up @@ -277,6 +279,8 @@ S3method(target_skip,tar_pattern)
S3method(target_skip,tar_stem)
S3method(target_sync_file_meta,default)
S3method(target_sync_file_meta,tar_builder)
S3method(target_unmarshal_value,default)
S3method(target_unmarshal_value,tar_pattern)
S3method(target_update_depend,tar_builder)
S3method(target_update_depend,tar_pattern)
S3method(target_validate,tar_branch)
Expand Down
4 changes: 3 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
# targets 1.6.0.9002
# targets 1.6.0.9003

## Invalidating changes

* Use `secretbase::siphash13()` instead of `digest(algo = "xxhash64", serializationVersion = 3)` so hashes of in-memory objects no longer depend on serialization version 3 headers (#1244, @shikokuchuo). Unfortunately, pipelines built with earlier versions of `targets` will need to rerun.

## Other improvements

* Ensure patterns marshal properly (#1266, #1264, https://github.com/njtierney/geotargets/issues/52, @Aariq, @njtierney).
* Inform and prompt the user when the pipeline was built with an old version of `targets` and changes to the package will cause the current work to rerun (#1244). For the `tar_make*()` functions, `utils::menu()` prompts the user to give people a chance to downgrade if necessary.
* For type safety in the internal database class, read all columns as character vectors in `data.table::fread()`, then convert them to the correct types afterwards.
* Add a new `tar_resources_custom_format()` function which can pass environment variables to customize the behavior of custom `tar_format()` storage formats (#1263, #1232, @Aariq, @noamross).
* Only marshal dependencies if actually sending the target to a parallel worker.

# targets 1.6.0

Expand Down
11 changes: 9 additions & 2 deletions R/class_builder.R
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ target_prepare.tar_builder <- function(
)
builder_ensure_deps(target, pipeline, "main")
builder_update_subpipeline(target, pipeline)
builder_marshal_subpipeline(target)
}

# nocov start
Expand Down Expand Up @@ -127,7 +126,6 @@ target_run.tar_builder <- function(target, envir, path_store) {
target$subpipeline <- NULL
})
target_gc(target)
builder_unmarshal_subpipeline(target)
builder_ensure_deps(target, target$subpipeline, "worker")
frames <- frames_produce(envir, target, target$subpipeline)
builder_set_tar_runtime(target, frames, path_store)
Expand All @@ -154,6 +152,7 @@ target_run_worker.tar_builder <- function(
tar_runtime$fun <- fun
tar_options$import(options)
set_envvars(envvars)
builder_unmarshal_subpipeline(target)
target_run(target, envir, path_store)
builder_marshal_value(target)
target
Expand Down Expand Up @@ -316,6 +315,14 @@ builder_unmarshal_subpipeline <- function(target) {
if (!is.null(subpipeline) && identical(retrieval, "main")) {
pipeline_unmarshal_values(target$subpipeline)
}
patterns <- fltr(
names(subpipeline$targets),
~inherits(pipeline_get_target(subpipeline, .x), "tar_pattern")
)
map(
setdiff(patterns, target$settings$dimensions),
~target_ensure_value(pipeline_get_target(subpipeline, .x), subpipeline)
)
}

builder_handle_warnings <- function(target, scheduler) {
Expand Down
1 change: 1 addition & 0 deletions R/class_clustermq.R
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ clustermq_class <- R6::R6Class(
length(need_workers) > 0L
},
run_worker = function(target) {
builder_marshal_subpipeline(target)
if (self$garbage_collection) {
gc()
}
Expand Down
1 change: 1 addition & 0 deletions R/class_crew.R
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ crew_class <- R6::R6Class(
meta = self$meta,
pending = FALSE
)
builder_marshal_subpipeline(target)
self$sync_meta_time()
self$controller$push(
command = command,
Expand Down
1 change: 1 addition & 0 deletions R/class_future.R
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ future_class <- R6::R6Class(
self$worker_list <- memory_init()
},
run_worker = function(target) {
builder_marshal_subpipeline(target)
if (self$garbage_collection) {
gc()
}
Expand Down
10 changes: 10 additions & 0 deletions R/class_pattern.R
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,16 @@ target_bootstrap.tar_pattern <- function(
invisible()
}

#' @export
target_marshal_value.tar_pattern <- function(target) {
target$value <- NULL
}

#' @export
target_unmarshal_value.tar_pattern <- function(target) {
target$value <- NULL
}

#' @export
print.tar_pattern <- function(x, ...) {
cat(
Expand Down
3 changes: 2 additions & 1 deletion R/class_pipeline.R
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,9 @@ pipeline_marshal_values <- function(pipeline) {
}

pipeline_unmarshal_values <- function(pipeline) {
names <- pipeline_get_names(pipeline)
map(
pipeline_get_names(pipeline),
names,
~target_unmarshal_value(pipeline_get_target(pipeline, .x))
)
}
Expand Down
10 changes: 10 additions & 0 deletions R/class_target.R
Original file line number Diff line number Diff line change
Expand Up @@ -413,12 +413,22 @@ target_sync_file_meta.default <- function(target, meta) {
}

target_marshal_value <- function(target) {
UseMethod("target_marshal_value")
}

#' @export
target_marshal_value.default <- function(target) {
if (!is.null(target$value)) {
store_marshal_value(target$store, target)
}
}

target_unmarshal_value <- function(target) {
UseMethod("target_unmarshal_value")
}

#' @export
target_unmarshal_value.default <- function(target) {
if (!is.null(target$value)) {
store_unmarshal_value(target$store, target)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/hpc/sge_batchtools.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
#$ -V
#$ -N <%= job.name %>
#$ -pe smp <%= if (is.null(resources$slots)) 1 else resources$slots %>
module load R/4.2.2
module load R/4.3.2
Rscript -e 'batchtools::doJobCollection("<%= uri %>")'
exit 0
2 changes: 1 addition & 1 deletion tests/hpc/sge_clustermq.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@
#$ -cwd
#$ -V
#$ -pe smp {{ cores | 1 }}
module load R/4.2.2
module load R/4.3.2
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
31 changes: 31 additions & 0 deletions tests/testthat/test-tar_format.R
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,34 @@ tar_test("custom format envvar resources", {
expect_equal(tar_read(target_name), data.frame(x = 1L))
expect_equal(Sys.getenv("SERIALIZATION", unset = ""), "")
})

tar_test("patterns are marshaled correctly", {
skip_cran()
skip_on_os("windows")
skip_on_os("solaris")
skip_if_not_installed("crew", minimum_version = "0.9.0")
skip_if_not_installed("torch")
on.exit(crew_test_sleep())
tar_script({
tar_option_set(controller = crew::crew_controller_local())
list(
tar_target(x, c(1L, 2L)),
tar_target(
y,
torch::torch_tensor(x),
pattern = map(x),
format = "torch",
iteration = "list"
),
tar_target(
z,
y[[1]] + y[[2]],
format = "torch"
)
)
})
tar_make(callr_function = NULL)
tar_load(z)
expect_s3_class(z, "torch_tensor")
expect_true(identical(as.integer(z), 3L))
})

0 comments on commit 751281e

Please sign in to comment.