Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@
is now deprecated (#314).

* `req_perform_parallel()` replaces `multi_req_perform()`. `multi_req_perform()`
is now deprecated (#314).
is now deprecated (#314). `req_perform_parallel()` has a new error handling
strategy matching `req_perform_iterative()` and `req_perform_sequential()`.
It will bubble up errors by default but you can choose an alternative strategy
with the `on_error` argument.

* `oauth_flow_auth_code()` allows the user to enter a URL that contains
authorization `code` and `state` parameters (@fh-mthomson, #326).
Expand Down
6 changes: 3 additions & 3 deletions R/iterate-responses.R
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
#'
#' @description
#' These function provide a basic toolkit for operating with lists of
#' responses as returned [req_perform_parallel()] and
#' [req_perform_iterative()].
#' responses and possibly errors, as returned by [req_perform_parallel()],
#' [req_perform_sequential()] and [req_perform_iterative()].
#'
#' * `resps_successes()` returns a list successful responses.
#' * `resps_failures()` returns a list failed responses (i.e. errors).
Expand All @@ -23,7 +23,7 @@
#' request(example_url()) |> req_template("/status/:status", status = 404),
#' request("INVALID")
#' )
#' resps <- req_perform_parallel(reqs)
#' resps <- req_perform_parallel(reqs, on_error = "continue")
#'
#' # find successful responses
#' resps |> resps_successes()
Expand Down
37 changes: 31 additions & 6 deletions R/iterate.R
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,24 @@
#' the iteration should terminate.
#' @param max_reqs The maximum number of requests to perform. Use `Inf` to
#' perform all requests until `next_req()` returns `NULL`.
#' @param on_error What should happen if a request fails?
#'
#' * `"stop"`, the default: stop iterating with an error.
#' * `"return"`: stop iterating, returning all the successful responses so
#' far, as well as an error object for the failed request.
#' @param progress Display a progress bar? Use `TRUE` to turn on a basic
#' progress bar, use a string to give it a name, or see [progress_bars] to
#' customise it in other ways.
#' @param path Optionally, path to save the body of request. This should be
#' a glue string that uses `{i}` to distinguish different requests.
#' Useful for large responses because it avoids storing the response in
#' memory.
#' @return A list of [response()]s.
#' @return
#' A list, at most length `max_reqs`, containing [response]s and possibly one
#' error object, if `on_error` is `"return"` and one of the requests errors.
#' If present, the error object will always be the last element in the list.
#'
#' Only httr2 errors are captured; see [req_error()] for more details.
#' @export
#' @examples
#' req <- request(example_url()) |>
Expand All @@ -99,14 +109,16 @@
#' )
#' })
req_perform_iterative <- function(req,
next_req,
path = NULL,
max_reqs = 20,
progress = TRUE) {
next_req,
path = NULL,
max_reqs = 20,
on_error = c("stop", "return"),
progress = TRUE) {
check_request(req)
check_function2(next_req, args = c("resp", "req"))
check_number_whole(max_reqs, allow_infinite = TRUE, min = 1)
check_string(path, allow_empty = FALSE, allow_null = TRUE)
on_error <- arg_match(on_error)

get_path <- function(i) {
if (is.null(path)) {
Expand All @@ -127,7 +139,20 @@ req_perform_iterative <- function(req,

tryCatch({
repeat {
resps[[i]] <- resp <- req_perform(req, path = get_path(i))
httr2_error <- switch(on_error,
stop = function(cnd) zap(),
return = function(cnd) cnd
)
resp <- try_fetch(
req_perform(req, path = get_path(i)),
httr2_error = httr2_error
)
resps[[i]] <- resp

if (on_error == "return" && is_error(resp)) {
break
}

progress$update()

withCallingHandlers(
Expand Down
147 changes: 96 additions & 51 deletions R/multi-req.R
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,19 @@
#'
#' @description
#' This variation on [req_perform()] performs multiple requests in parallel.
#' Unlike `req_perform()` it always succeeds; it will never throw an error.
#' Instead it will return error objects, which are your responsibility to
#' handle.
#'
#' Exercise caution when using this function; it's easy to pummel a server
#' with many simultaneous requests. Only use it with hosts designed to serve
#' many files at once.
#' many files at once, which are typically web servers, not API servers.
#'
#' # Limitations
#' `req_perform_parallel()` has a few limitations:
#'
#' * Will not retrieve a new OAuth token if it expires part way through
#' the requests.
#' * Does not perform throttling with [req_throttle()].
#' * Does not attempt retries as described by [req_retry()].
#' * Consults the cache set by [req_cache()] before/after all requests.
#' * Only consults the cache set by [req_cache()] before/after all requests.
#'
#' In general, where [req_perform()] might make multiple requests due to retries
#' or OAuth failures, `req_perform_parallel()` will only make 1. If any of
#' these limitations are problematic, you may want to use
#' If any of these limitations are problematic for your use case, we recommend
#' [req_perform_sequential()] instead.
#'
#' @param reqs A list of [request]s.
Expand All @@ -29,12 +23,21 @@
#' @param pool Optionally, a curl pool made by [curl::new_pool()]. Supply
#' this if you want to override the defaults for total concurrent connections
#' (100) or concurrent connections per host (6).
#' @param cancel_on_error Should all pending requests be cancelled when you
#' hit an error. Set this to `TRUE` to stop all requests as soon as you
#' hit an error. Responses that were never performed will have class
#' `httr2_cancelled` in the result.
#' @returns A list the same length as `reqs` where each element is either a
#' [response] or an `error`.
#' @param on_error What should happen if one of the requests fails?
#'
#' * `stop`, the default: stop iterating with an error.
#' * `return`: stop iterating, returning all the successful responses
#' received so far, as well as an error object for the failed request.
#' * `continue`: continue iterating, recording errors in the result.
#' @return
#' A list, the same length as `reqs`, containing [response]s and possibly
#' error objects, if `on_error` is `"return"` or `"continue"` and one of the
#' responses errors. If `on_error` is `"return"` and it errors on the ith
#' request, the ith element of the result will be an error object, and the
#' remaining elements will be `NULL`. If `on_error` is `"continue"`, it will
#' be a mix of requests and error objects.
#'
#' Only httr2 errors are captured; see [req_error()] for more details.
#' @export
#' @examples
#' # Requesting these 4 pages one at a time would take 2 seconds:
Expand All @@ -48,74 +51,103 @@
#' # But it's much faster if you request in parallel
#' system.time(resps <- req_perform_parallel(reqs))
#'
#' # req_perform_parallel() will fail on error
#' reqs <- list(
#' request_base |> req_url_path("/status/200"),
#' request_base |> req_url_path("/status/400"),
#' request("FAILURE")
#' )
#' # req_perform_parallel() will always succeed
#' resps <- req_perform_parallel(reqs)
#' try(resps <- req_perform_parallel(reqs))
#'
#' # but can use on_error to capture all successful results
#' resps <- req_perform_parallel(reqs, on_error = "continue")
#'
#' # Inspect the successful responses
#' resps |> resps_successes()
#'
#' # And the failed responses
#' resps |> resps_failures() |> resps_requests()
req_perform_parallel <- function(reqs, paths = NULL, pool = NULL, cancel_on_error = FALSE) {
if (!is.null(paths)) {
if (length(reqs) != length(paths)) {
cli::cli_abort("If supplied, {.arg paths} must be the same length as {.arg req}.")
}
}
req_perform_parallel <- function(reqs,
paths = NULL,
pool = NULL,
on_error = c("stop", "return", "continue")) {
check_paths(paths, reqs)
on_error <- arg_match(on_error)

perfs <- vector("list", length(reqs))
for (i in seq_along(reqs)) {
perfs[[i]] <- Performance$new(req = reqs[[i]], path = paths[[i]])
perfs[[i]] <- Performance$new(
req = reqs[[i]],
path = paths[[i]],
error_call = environment()
)
perfs[[i]]$submit(pool)
}

pool_run(pool, perfs, cancel_on_error = cancel_on_error)
pool_run(pool, perfs, on_error = on_error)
map(perfs, ~ .$resp)
}


#' Perform a list of requests in parallel
#'
#' @description
#' `r lifecycle::badge("deprecated")`
#'
#' Please use [req_perform_parallel()] instead, and note:
#'
#' * `cancel_on_error = FALSE` is now `on_error = "continue"`
#' * `cancel_on_error = TRUE` is now `on_error = "return"`
#'
#' @export
#' @rdname req_perform_parallel
#' @usage NULL
multi_req_perform <- function(reqs, paths = NULL, pool = NULL, cancel_on_error = FALSE) {
#' @param cancel_on_error Should all pending requests be cancelled when you
#' hit an error? Set this to `TRUE` to stop all requests as soon as you
#' hit an error. Responses that were never performed be `NULL` in the result.
#' @inheritParams req_perform_parallel
#' @keywords internal
multi_req_perform <- function(reqs,
paths = NULL,
pool = NULL,
cancel_on_error = FALSE) {
lifecycle::deprecate_warn(
"1.0.0",
"multi_req_perform()",
"req_perform_parallel()"
)
check_bool(cancel_on_error)

req_perform_parallel(
reqs = reqs,
paths = paths,
pool = pool,
cancel_on_error = cancel_on_error
on_error = if (cancel_on_error) "continue" else "return"
)
}

pool_run <- function(pool, perfs, cancel_on_error = FALSE) {
poll_until_done <- function(pool) {
pool_run <- function(pool, perfs, on_error = "continue") {
on.exit(pool_cancel(pool, perfs), add = TRUE)

# The done and fail callbacks for curl::multi_add() are designed to always
# succeed. If the request actually failed, they raise a `httr_fail`
# signal (not error) that wraps the error. Here we catch that error and
# handle it based on the value of `on_error`
httr2_fail <- switch(on_error,
stop = function(cnd) cnd_signal(cnd$error),
continue = function(cnd) zap(),
return = function(cnd) NULL
)

try_fetch(
repeat({
# TODO: progress bar
run <- curl::multi_run(0.1, pool = pool, poll = TRUE)
if (run$pending == 0) {
break
}
})
}

cancel <- function(cnd) pool_cancel(pool, perfs)
if (!cancel_on_error) {
tryCatch(poll_until_done(pool), interrupt = cancel)
} else {
tryCatch(poll_until_done(pool), interrupt = cancel, `httr2:::failed` = cancel)
}

# Ensuring any pending handles are still completed
curl::multi_run(pool = pool)
}),
interrupt = function(cnd) NULL,
httr2_fail = httr2_fail
)

invisible()
}
Expand All @@ -128,10 +160,12 @@ Performance <- R6Class("Performance", public = list(
handle = NULL,
resp = NULL,
pool = NULL,
error_call = NULL,

initialize = function(req, path = NULL) {
initialize = function(req, path = NULL, error_call = NULL) {
self$req <- req
self$path <- path
self$error_call <- error_call

req <- auth_oauth_sign(req)
req <- cache_pre_fetch(req)
Expand All @@ -145,12 +179,13 @@ Performance <- R6Class("Performance", public = list(

submit = function(pool = NULL) {
if (!is.null(self$resp)) {
# cached
return()
}

self$pool <- pool
self$resp <- error_cnd("httr2_cancelled", message = "Request cancelled")
curl::multi_add(self$handle,
curl::multi_add(
handle = self$handle,
pool = self$pool,
data = self$path,
done = self$succeed,
Expand All @@ -171,18 +206,27 @@ Performance <- R6Class("Performance", public = list(
)
resp <- cache_post_fetch(self$reqs, resp, path = self$paths)

self$resp <- tryCatch(resp_check_status(resp), error = identity)
self$resp <- tryCatch(
resp_check_status(resp, error_call = self$error_call),
error = identity
)
if (is_error(self$resp)) {
signal("", class = "httr2:::failed")
signal("", error = self$resp, class = "httr2_fail")
}
},

fail = function(msg) {
self$resp <- error_cnd("httr2_failure", message = msg, request = self$req)
signal("", class = "httr2:::failed")
self$resp <- error_cnd(
"httr2_failure",
message = msg,
request = self$req,
call = self$error_call
)
signal("", error = self$resp, class = "httr2_fail")
},

cancel = function() {
# No handle if response was cached
if (!is.null(self$handle)) {
curl::multi_cancel(self$handle)
}
Expand All @@ -191,4 +235,5 @@ Performance <- R6Class("Performance", public = list(

pool_cancel <- function(pool, perfs) {
walk(perfs, ~ .x$cancel())
curl::multi_run(pool = pool)
}
Loading