Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mrc-4177 allow passing of dependencies to task queue #31

Merged
merged 12 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test-coverage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
steps:
- uses: actions/checkout@v2

- uses: r-lib/actions/setup-r@v1
- uses: r-lib/actions/setup-r@v2

- uses: r-lib/actions/setup-pandoc@v1

Expand Down
4 changes: 2 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: queuer
Title: Queue Tasks
Version: 0.4.0
Version: 0.5.0
Authors@R: c(person("Rich", "FitzJohn", role = c("aut", "cre"),
email = "rich.fitzjohn@gmail.com"),
person("Imperial College of Science, Technology and Medicine",
Expand All @@ -13,7 +13,7 @@ Depends:
R (>= 3.2.2)
Imports:
R6,
context (>= 0.4.0),
context (>= 0.5.0),
ids,
lazyeval,
progress (>= 1.1.1)
Expand Down
13 changes: 7 additions & 6 deletions R/bulk.R
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
qlapply <- function(obj, private, X, FUN, ...,
envir = parent.frame(),
timeout = 0, time_poll = 1, progress = NULL,
name = NULL, overwrite = FALSE) {
name = NULL, overwrite = FALSE, depends_on = NULL) {
## TODO: The dots here are going to cause grief at some point. I
## may need a more robust way of passing additional arguments in,
## but not sure what that looks like...
enqueue_bulk(obj, private, X, FUN, ..., do_call = FALSE,
timeout = timeout, time_poll = time_poll,
progress = progress, name = name,
envir = envir, overwrite = overwrite)
envir = envir, overwrite = overwrite, depends_on = depends_on)
}

## A downside of the current treatment of dots is there are quite a
Expand All @@ -23,10 +23,11 @@ enqueue_bulk <- function(obj, private, X, FUN, ..., do_call = TRUE,
envir = parent.frame(),
timeout = 0, time_poll = 1, progress = NULL,
name = NULL, use_names = TRUE,
overwrite = FALSE) {
overwrite = FALSE, depends_on = NULL) {

obj <- enqueue_bulk_submit(obj, private, X, FUN, ..., do_call = do_call,
envir = envir, progress = progress, name = name,
use_names = use_names, overwrite = overwrite)
use_names = use_names, overwrite = overwrite, depends_on = depends_on)
if (timeout > 0) {
## TODO: this is possibly going to change as interrupt changes in
## current R-devel (as of 3.3.x)
Expand All @@ -41,7 +42,7 @@ enqueue_bulk_submit <- function(obj, private, X, FUN, ..., DOTS = NULL,
do_call = FALSE,
envir = parent.frame(), progress = NULL,
name = NULL, use_names = TRUE,
overwrite = FALSE) {
overwrite = FALSE, depends_on = NULL) {
## TODO: If I push this to *only* be a method, then the assertion is
## not needed.
if (!inherits(obj, "queue_base")) {
Expand All @@ -61,7 +62,7 @@ enqueue_bulk_submit <- function(obj, private, X, FUN, ..., DOTS = NULL,
DOTS <- lapply(lazyeval::lazy_dots(...), "[[", "expr")
}
ids <- context::bulk_task_save(X, FUN, obj$context, DOTS,
do_call, use_names, envir)
do_call, use_names, envir, depends_on)

message(sprintf("submitting %s tasks", length(ids)))
private$submit_or_delete(ids, names(ids))
Expand Down
42 changes: 30 additions & 12 deletions R/queue_base.R
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,13 @@ queue_base <- R6::R6Class(
##'
##' @param submit Logical indicating if the task should be submitted
##'
##' @param depends_on Optional vector of task ids to depend on
##'
##' @param name Optional name for the task
enqueue = function(expr, envir = parent.frame(), submit = TRUE,
name = NULL) {
name = NULL, depends_on = NULL) {
## TODO: when is submit = FALSE wanted?
self$enqueue_(substitute(expr), envir, submit, name)
self$enqueue_(substitute(expr), envir, submit, name, depends_on)
},

##' @description Queue a task
Expand All @@ -199,10 +201,13 @@ queue_base <- R6::R6Class(
##' @param submit Logical indicating if the task should be submitted
##'
##' @param name Optional name for the task
##'
##' @param depends_on Optional vector of task ids to depend on
##'
enqueue_ = function(expr, envir = parent.frame(), submit = TRUE,
name = NULL) {
name = NULL, depends_on = NULL) {
self$initialize_context()
task_id <- context::task_save(expr, self$context, envir)
task_id <- context::task_save(expr, self$context, envir, depends_on = depends_on)
if (submit) {
private$submit_or_delete(task_id, name)
}
Expand Down Expand Up @@ -245,15 +250,18 @@ queue_base <- R6::R6Class(
##'
##' @param name Optional name for a created bundle
##'
##' @param depends_on Optional task ids to depend on (see
##' [context::bulk_task_save()]).
##'
##' @param overwrite Logical, indicating if we should overwrite any
##' bundle that exists with name `name`.
enqueue_bulk = function(X, FUN, ..., do_call = TRUE,
envir = parent.frame(),
timeout = 0, time_poll = 1, progress = NULL,
name = NULL, overwrite = FALSE) {
name = NULL, overwrite = FALSE, depends_on = NULL) {
enqueue_bulk(self, private, X, FUN, ..., do_call = do_call, envir = envir,
timeout = timeout, time_poll = time_poll,
progress = progress, name = name, overwrite = overwrite)
progress = progress, name = name, overwrite = overwrite, depends_on = depends_on)
},

##' @description Apply a function over a list of data. This is
Expand Down Expand Up @@ -282,12 +290,15 @@ queue_base <- R6::R6Class(
##'
##' @param overwrite Logical, indicating if we should overwrite any
##' bundle that exists with name `name`.
##'
##' @param depends_on Optional task ids to depend on (see
##' [context::bulk_task_save()]).
lapply = function(X, FUN, ..., envir = parent.frame(),
timeout = 0, time_poll = 1, progress = NULL,
name = NULL, overwrite = FALSE) {
name = NULL, overwrite = FALSE, depends_on = NULL) {
qlapply(self, private, X, FUN, ..., envir = envir,
timeout = timeout, time_poll = time_poll,
progress = progress, name = name, overwrite = overwrite)
progress = progress, name = name, overwrite = overwrite, depends_on = depends_on)
},

##' @description A wrapper like `mapply`
Expand Down Expand Up @@ -330,17 +341,20 @@ queue_base <- R6::R6Class(
##' bundle that exists with name `name`.
##'
##' @param use_names Use names
##'
##' @param depends_on Optional task ids to depend on (see
##' [context::bulk_task_save()]).
mapply = function(FUN, ..., MoreArgs = NULL,
envir = parent.frame(), timeout = 0,
time_poll = 1, progress = NULL, name = NULL,
use_names = TRUE, overwrite = FALSE) {
use_names = TRUE, overwrite = FALSE, depends_on = NULL) {
## TODO: consider deleting
X <- mapply_X(...)
self$enqueue_bulk(X, FUN, DOTS = MoreArgs, do_call = TRUE,
envir = envir, timeout = timeout,
time_poll = time_poll, progress = progress,
name = name, use_names = use_names,
overwrite = overwrite)
overwrite = overwrite, depends_on = depends_on)
},

##' @description Submit a task into a queue. This is a stub
Expand All @@ -350,7 +364,10 @@ queue_base <- R6::R6Class(
##' @param task_ids Vector of tasks to submit
##'
##' @param names Optional vector of names of tasks
submit = function(task_ids, names = NULL) {
##'
##' @param depends_on Optional named list of task ids to vectors of
##' dependencies, e.g. list("t3" = c("t", "t1"), "t4" = "t)
submit = function(task_ids, names = NULL, depends_on = NULL) {
},

##' @description Unsubmit a task from the queue. This is a stub
Expand All @@ -367,11 +384,12 @@ queue_base <- R6::R6Class(
db = NULL,

submit_or_delete = function(task_ids, name = NULL) {
dependencies <- context::task_deps(task_ids, private$db, named = TRUE)
delete_these_tasks <- function(e) {
message("Deleting task as submission failed")
context::task_delete(task_ids, private$root)
}
withCallingHandlers(self$submit(task_ids, name),
withCallingHandlers(self$submit(task_ids, name, dependencies),
error = delete_these_tasks)
}
))
2 changes: 1 addition & 1 deletion R/queue_local.R
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ queue_local <- R6::R6Class(

## Ordinarily there would be two objects created; one worker and
## one queue. but for the local queue these are the same.
submit = function(task_ids, names = NULL) {
submit = function(task_ids, names = NULL, depends_on = NULL) {
if (!is.null(self$log_path)) {
private$db$mset(task_ids, self$log_path, "log_path")
}
Expand Down
62 changes: 56 additions & 6 deletions man/queue_base.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions tests/testthat/test-bulk.R
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,25 @@ test_that("$enqueue_bulk", {
expect_equal(bundle$function_name(), "base::I")
})

test_that("enqueue_bulk with dependencies", {
ctx <- context::context_save(tempfile(), storage_type = "environment")
ctx <- context::context_load(ctx, new.env(parent = .GlobalEnv))
obj <- queue_base$new(ctx)
t <- obj$enqueue(sin(1))

expect_error(obj$enqueue_bulk(1:3, quote(I), depends_on = "123"),
"Failed to save as dependency 123 does not exist")

expect_error(obj$enqueue_bulk(1:3, quote(I), depends_on = list(t$id)),
"'depends_on' must either be a vector or a list of length 3")

t2 <- obj$enqueue_bulk(1:3, quote(I), depends_on = t$id)
t3 <- obj$enqueue_bulk(1:3, quote(I), depends_on = list(t$id, t2$id, list(t$id, t2$id)))

expect_equal(context::task_deps(t2$ids, ctx), rep(list(t$id), 3))
expect_equal(context::task_deps(t3$ids, ctx), list(t$id, t2$id, list(t$id, t2$id)))
})

test_that("exotic functions", {
skip_if_not_using_local_queue()
Sys.setenv(R_TESTS = "")
Expand Down
Loading