Skip to content

Commit

Permalink
Shiny-related improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau committed Mar 14, 2024
1 parent ce39cda commit c572f22
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 15 deletions.
6 changes: 4 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# crew 0.9.0.9000 (development)

# crew 0.9.0.9001 (development)

* Clarify the intent of `controller$promise(mode = "one")` in the vignette on promises (@jcheng5).
* Implement an `error` argument in `pop()` which may help with integration with `ExtendedTask` (@jcheng5).
* Handle task errors in the Shiny vignette (@jcheng5).

# crew 0.9.0

Expand Down
20 changes: 18 additions & 2 deletions R/crew_controller.R
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ crew_class_controller <- R6::R6Class(
#' then `iterate[[names]]` must be a character vector.
#' @param save_command Logical of length 1, whether to store
#' a text string version of the R command in the output.
#' @param error Character vector of length 1, choice of action if
#' @param error Character of length 1, choice of action if
#' a task has an error. Possible values:
#' * `"stop"`: throw an error in the main R session instead of returning
#' a value. In case of an error, the results from the last errored
Expand Down Expand Up @@ -785,6 +785,7 @@ crew_class_controller <- R6::R6Class(
message = "cannot map() until all prior tasks are completed and popped"
)
crew_assert(substitute, isTRUE(.) || isFALSE(.))
crew_assert(error %in% c("stop", "warn", "silent"))
if (substitute) {
command <- substitute(command)
}
Expand Down Expand Up @@ -964,12 +965,19 @@ crew_class_controller <- R6::R6Class(
#' within the last `seconds_interval` seconds. `FALSE` to auto-scale
#' every time `scale()` is called. Throttling avoids
#' overburdening the `mirai` dispatcher and other resources.
#' @param error Character of length 1, choice of action if
#' the popped task threw an error. Possible values:
#' * `"stop"`: throw an error in the main R session instead of returning
#' a value.
#' * `"warn"`: throw a warning.
#' * `"silent"`: do nothing special.
#' @param controllers Not used. Included to ensure the signature is
#' compatible with the analogous method of controller groups.
pop = function(
scale = TRUE,
collect = NULL,
throttle = TRUE,
error = "silent",
controllers = NULL
) {
crew_deprecate(
Expand Down Expand Up @@ -1030,16 +1038,24 @@ crew_class_controller <- R6::R6Class(
return(out)
}
# nocov end
error_message <- .subset2(out, "error")
on.exit({
index <- .subset2(out, "worker")
private$.log$tasks[index] <- .subset2(log, "tasks")[index] + 1L
private$.log$seconds[index] <- .subset2(log, "seconds")[index] +
.subset2(out, "seconds")
private$.log$errors[index] <- .subset2(log, "errors")[index] +
!anyNA(.subset2(out, "error"))
!anyNA(error_message)
private$.log$warnings[index] <- .subset2(log, "warnings")[index] +
!anyNA(.subset2(out, "warnings"))
}, add = TRUE)
if (!anyNA(error_message)) {
if (identical(error, "stop")) {
crew_error(message = error_message)
} else if (identical(error, "warn")) {
crew_warning(message = error_message)
}
}
out
},
#' @description Pop all available task results and return them in a tidy
Expand Down
13 changes: 12 additions & 1 deletion R/crew_controller_group.R
Original file line number Diff line number Diff line change
Expand Up @@ -673,17 +673,28 @@ crew_class_controller_group <- R6::R6Class(
#' within the last `seconds_interval` seconds. `FALSE` to auto-scale
#' every time `scale()` is called. Throttling avoids
#' overburdening the `mirai` dispatcher and other resources.
#' @param error Character of length 1, choice of action if
#' the popped task threw an error. Possible values:
#' * `"stop"`: throw an error in the main R session instead of returning
#' a value.
#' * `"warn"`: throw a warning.
#' * `"silent"`: do nothing special.
#' @param controllers Character vector of controller names.
#' Set to `NULL` to select all controllers.
pop = function(
scale = TRUE,
collect = NULL,
throttle = TRUE,
error = "silent",
controllers = NULL
) {
control <- private$.select_controllers(controllers)
for (controller in control) {
out <- controller$pop(scale = scale, throttle = throttle)
out <- controller$pop(
scale = scale,
throttle = throttle,
error = error
)
if (!is.null(out)) {
return(out)
}
Expand Down
12 changes: 11 additions & 1 deletion man/crew_class_controller.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions man/crew_class_controller_group.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 41 additions & 0 deletions tests/testthat/test-crew_controller_group.R
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,47 @@ crew_test("crew_controller_group()", {
}
})

crew_test("crew_controller_group() can relay task errors as local errors", {
skip_on_cran()
skip_on_os("windows")
a <- crew_controller_local(
seconds_idle = 360
)
x <- crew_controller_group(a)
on.exit({
x$terminate()
rm(x)
gc()
crew_test_sleep()
})
x$start()
x$push(command = stop("this is an error"), name = "warnings_and_errors")
x$wait(seconds_timeout = 5)
expect_crew_error(x$pop(scale = FALSE, error = "stop"))
})

crew_test("crew_controller_group() can relay task errors as local warnings", {
skip_on_cran()
skip_on_os("windows")
a <- crew_controller_local(
seconds_idle = 360
)
x <- crew_controller_group(a)
on.exit({
x$terminate()
rm(x)
gc()
crew_test_sleep()
})
x$start()
x$push(command = stop("this is an error"), name = "warnings_and_errors")
x$wait(seconds_timeout = 5)
expect_warning(
x$pop(scale = FALSE, error = "warn"),
class = "crew_warning"
)
})

crew_test("crew_controller_group() select", {
skip_on_cran()
skip_on_os("windows")
Expand Down
39 changes: 39 additions & 0 deletions tests/testthat/test-crew_controller_local.R
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,45 @@ crew_test("crew_controller_local() warnings and errors", {
)
})

crew_test("crew_controller_local() can relay task errors as local errors", {
skip_on_cran()
skip_on_os("windows")
x <- crew_controller_local(
seconds_idle = 360
)
on.exit({
x$terminate()
rm(x)
gc()
crew_test_sleep()
})
x$start()
x$push(command = stop("this is an error"), name = "warnings_and_errors")
x$wait(seconds_timeout = 5)
expect_crew_error(x$pop(scale = FALSE, error = "stop"))
})

crew_test("crew_controller_local() can relay task errors as local warnings", {
skip_on_cran()
skip_on_os("windows")
x <- crew_controller_local(
seconds_idle = 360
)
on.exit({
x$terminate()
rm(x)
gc()
crew_test_sleep()
})
x$start()
x$push(command = stop("this is an error"), name = "warnings_and_errors")
x$wait(seconds_timeout = 5)
expect_warning(
x$pop(scale = FALSE, error = "warn"),
class = "crew_warning"
)
})

crew_test("crew_controller_local() can terminate a lost worker", {
skip_on_cran()
skip_on_os("windows")
Expand Down
28 changes: 19 additions & 9 deletions vignettes/shiny.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,21 @@ Every time the user presses the "Submit a task (5 seconds)" button, the app push
})
```

At the bottom of `server()` is the crux of the app: the event loop that scans for results. The leading `req(reactive_poll())` ensures the loop only runs when results could come in, and the `reactive_poll(controller$nonempty())` at the end turns off polling when everything is all done. `invalidateLater(millis = 100)` says to poll every 0.1 seconds when polling is activated.^[`controller$pop()` is inexpensive, especially when there are no workers to relaunch or results to collect. In practice, the polling interval is a balance between responsiveness and CPU usage, and it may not be 0.1 seconds for every app.] The other three lines collect a result, plot it if available, and update the status message.
At the bottom of `server()` is the crux of the app: the event loop that scans for results. The leading `req(reactive_poll())` ensures the loop only runs when results could come in, and `invalidateLater(millis = 100)` says to poll every 0.1 seconds when polling is activated.^[`controller$pop()` is inexpensive, especially when there are no workers to relaunch or results to collect. In practice, the polling interval is a balance between responsiveness and CPU usage, and it may not be 0.1 seconds for every app.] The `if()` statement decides what to do if the task threw an error. If the task succeeded, then the app shows the plot, updates the status, and resumes polling if needed. If the task failed, then the app reports the error message of the task as the status, and polling stops.

```r
observe({
req(reactive_poll())
invalidateLater(millis = 100)
result <- controller$pop()$result
if (!is.null(result)) reactive_result(result[[1]])
reactive_status(status_message(n = length(controller$tasks)))
reactive_poll(controller$nonempty())
output <- controller$pop()
if (anyNA(output$error)) { # Task succeeded.
reactive_result(output$result[[1]])
reactive_status(status_message(n = length(controller$tasks)))
reactive_poll(controller$nonempty())
} else if (!is.null(output)) { # Task threw an error.
reactive_status(paste("Task error:", output$error))
reactive_poll(FALSE)
}
})
}
```
Expand Down Expand Up @@ -210,10 +215,15 @@ server <- function(input, output, session) {
observe({
req(reactive_poll())
invalidateLater(millis = 100)
result <- controller$pop()$result
if (!is.null(result)) reactive_result(result[[1]])
reactive_status(status_message(n = length(controller$tasks)))
reactive_poll(controller$nonempty())
output <- controller$pop()
if (anyNA(output$error)) { # Task succeeded.
reactive_result(output$result[[1]])
reactive_status(status_message(n = length(controller$tasks)))
reactive_poll(controller$nonempty())
} else if (!is.null(output)) { # Task threw an error.
reactive_status(paste("Task error:", output$error))
reactive_poll(FALSE)
}
})
}

Expand Down

0 comments on commit c572f22

Please sign in to comment.