Skip to content

Commit

Permalink
Implement new iteration strategy (#353)
Browse files Browse the repository at this point in the history
New `req_perform_iteratively()` takes the callback and returns a list of responses. Paired with `iterate_with_offset()` and friends to do the iteration, and `resps_combine()` and friends to work with the results.

Fixes #341. Fixes #298.

Co-authored-by: Maximilian Girlich <maximilian.girlich@metoda.com>
  • Loading branch information
hadley and mgirlich committed Oct 25, 2023
1 parent c73661e commit 5524237
Show file tree
Hide file tree
Showing 21 changed files with 705 additions and 1,052 deletions.
12 changes: 6 additions & 6 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ export(curl_help)
export(curl_translate)
export(example_github_client)
export(example_url)
export(iterate_next_request)
export(iterate_with_cursor)
export(iterate_with_link_url)
export(iterate_with_offset)
export(jwt_claim)
export(jwt_encode_hmac)
export(jwt_encode_sig)
Expand Down Expand Up @@ -68,11 +70,6 @@ export(req_oauth_device)
export(req_oauth_password)
export(req_oauth_refresh)
export(req_options)
export(req_paginate)
export(req_paginate_next_url)
export(req_paginate_offset)
export(req_paginate_page_index)
export(req_paginate_token)
export(req_perform)
export(req_perform_iteratively)
export(req_perform_parallel)
Expand Down Expand Up @@ -117,6 +114,9 @@ export(resp_url_queries)
export(resp_url_query)
export(response)
export(response_json)
export(resps_combine)
export(resps_errors)
export(resps_responses)
export(secret_decrypt)
export(secret_decrypt_file)
export(secret_encrypt)
Expand Down
115 changes: 115 additions & 0 deletions R/iterate-helpers.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#' Iteration helpers
#'
#' @description
#' These functions are intended for use with the `next_req` argument to
#' [req_perform_iteratively()]. Each implements iteration for a common
#' pagination pattern:
#'
#' * `iterate_with_offset()` increments a query parameter, e.g. `?page=1`,
#' `?page=2`, or `?offset=1`, `offset=21`.
#' * `iterate_with_cursor()` updates a query parameter with the value of a
#' cursor found somewhere in the response.
#' * `iterate_with_link_url()` follows the url found in the `Link` header.
#' See `resp_link_url()` for more details.
#'
#' @param param_name Name of query parameter.
#' @param start Starting value.
#' @param offset Offset for each page.
#' @param resp_complete A callback function that takes a response (`resp`)
#' and returns `TRUE` if there are no further pages.
#' @param resp_pages A callback function that takes a response (`resp`) and
#' returns the total number of pages, or `NULL` if unknown. It will only
#' be called once.
#' @export
#' @examples
#' req <- request(example_url()) |>
#' req_url_path("/iris") |>
#' req_throttle(10) |>
#' req_url_query(limit = 50)
#'
#' # If you don't know the total number of pages in advance, you can
#' # provide a `resp_complete()` callback
#' is_complete <- function(resp) {
#' length(resp_body_json(resp)$data) == 0
#' }
#' resps <- req_perform_iteratively(
#' req,
#' next_req = iterate_with_offset("page_index", resp_complete = is_complete),
#' max_reqs = Inf
#' )
#'
#' \dontrun{
#' # Alternatively, if the response returns the total number of pages (or you
#' # can easily calculate it), you can use the `resp_pages()` callback which
#' # will generate a better progress bar.
#'
#' resps <- req_perform_iteratively(
#' req %>% req_url_query(limit = 1),
#' next_req = iterate_with_offset(
#' "page_index",
#' resp_pages = function(resp) resp_body_json(resp)$pages
#' ),
#' max_reqs = Inf)
#' }
#'
iterate_with_offset <- function(param_name,
start = 1,
offset = 1,
resp_pages = NULL,
resp_complete = NULL) {
check_string(param_name)
check_number_whole(start)
check_number_whole(offset, min = 1)
check_function2(resp_pages, args = "resp", allow_null = TRUE)
check_function2(resp_complete, args = "resp", allow_null = TRUE)
resp_complete <- resp_complete %||% function(resp) FALSE

known_total <- FALSE
i <- start # assume already fetched

function(resp, req) {
if (!is.null(resp_pages) && !known_total) {
n <- resp_pages(resp)
if (!is.null(n)) {
known_total <<- TRUE
signal("", class = "httr2_total_pages", n = n)
}
}

if (!isTRUE(resp_complete(resp))) {
i <<- i + offset
req %>% req_url_query(!!param_name := i)
}
}
}

#' @rdname iterate_with_offset
#' @export
#' @param resp_param_value A callback function that takes a response (`resp`)
#' and returns the next cursor value. Return `NULL` if there are no further
#' pages.
iterate_with_cursor <- function(param_name, resp_param_value) {
check_string(param_name)
check_function2(resp_param_value, args = "resp")

function(resp, req) {
value <- resp_param_value(resp)
if (!is.null(value)) {
req %>% req_url_query(!!param_name := value)
}
}
}

#' @rdname iterate_with_offset
#' @export
#' @param rel The "link relation type" to use to retrieve the next page.
iterate_with_link_url <- function(rel = "next") {
check_string(rel)

function(resp, req) {
url <- resp_link_url(resp, rel)
if (!is.null(url)) {
req %>% req_url(url)
}
}
}
31 changes: 31 additions & 0 deletions R/iterate-responses.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#' Tools for working with lists of responses
#'
#' * `resps_combine()` combines the data from each response into a single
#' vector.
#' * `resps_response()` returns all successful responses.
#' * `resps_error()` returns all errors.
#'
#' @export
#' @param resps A list of responses (possibly including errors).
#' @param resp_data A function that takes a response (`resp`) and
#' returns its data as a vector or data frame.
resps_combine <- function(resps, resp_data) {
check_installed("vctrs")

check_function2(resp_data, "resp")
vctrs::list_unchop(lapply(resps, resp_data))
}
resps_is_resp <- function(resps) {
vapply(resps, is_response, logical(1))
}

#' @export
#' @rdname resps_combine
resps_responses <- function(resps) {
resps[resps_is_resp(resps)]
}
#' @export
#' @rdname resps_combine
resps_errors <- function(resps) {
resps[!resps_is_resp(resps)]
}
118 changes: 118 additions & 0 deletions R/iterate.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#' Perform requests iteratively, generating new requests from previous responses
#'
#' @description
#' `r lifecycle::badge("experimental")`
#'
#' `req_perform_iteratively()` iteratively generates and performs requests,
#' using a callback function, `next_req`, to define the next request based on
#' the current request and response. You will probably want to it pair with an
#' [iteration helper][iterate_with_offset] and use a
#' [multi-response handler][resps_combine] to process the result.
#'
#' @inheritParams req_perform
#' @param next_req A function that takes the previous response (`resp`) and
#' request (`req`) and returns a [request] for the next page or `NULL` if
#' the iteration should terminate.
#' @param max_reqs The maximum number of requests to perform. Use `Inf` to
#' perform all requests until `next_req()` returns `NULL`.
#' @param progress Display a progress bar? Use `TRUE` to turn on a basic
#' progress bar, use a string to give it a name, or see [progress_bars] to
#' customise it in other ways.
#' @param path Optionally, path to save the body of request. This should be
#' a glue string that uses `{i}` to distinguish different requests.
#' Useful for large responses because it avoids storing the response in
#' memory.
#' @return A list of [response()]s.
#' @export
#' @examples
#' req <- request(example_url()) |>
#' req_url_path("/iris") |>
#' req_throttle(10) |>
#' req_url_query(limit = 5)
#'
#' resps <- req_perform_iteratively(req, iterate_with_offset("page_index"))
#'
#' resps |> resps_combine(function(resp) {
#' data <- resp_body_json(resp)$data
#' data.frame(
#' Sepal.Length = sapply(data, `[[`, "Sepal.Length"),
#' Sepal.Width = sapply(data, `[[`, "Sepal.Width"),
#' Petal.Length = sapply(data, `[[`, "Petal.Length"),
#' Petal.Width = sapply(data, `[[`, "Petal.Width"),
#' Species = sapply(data, `[[`, "Species")
#' )
#' })
req_perform_iteratively <- function(req,
next_req,
path = NULL,
max_reqs = 20,
progress = TRUE) {
check_request(req)
check_function2(next_req, args = c("resp", "req"))
check_number_whole(max_reqs, allow_infinite = TRUE, min = 1)
check_string(path, allow_empty = FALSE, allow_null = TRUE)

get_path <- function(i) {
if (is.null(path)) {
NULL
} else {
glue::glue(path)
}
}

progress <- create_progress_bar(
total = max_reqs,
name = "Iterating",
config = progress
)

resps <- vector("list", length = if (is.finite(max_reqs)) max_reqs else 100)
i <- 1L

tryCatch({
repeat {
resps[[i]] <- resp <- req_perform(req, path = get_path(i))
progress$update()

withCallingHandlers(
{
req <- next_req(resp = resp, req = req)
},
httr2_total_pages = function(cnd) {
# Allow next_req() to shrink the number of pages remaining
# Most important in max_req = Inf case
if (cnd$n < max_reqs) {
max_reqs <<- cnd$n
progress$update(total = max_reqs, inc = 0)
}
}
)

if (is.null(req) || i >= max_reqs) {
break
}
check_request(req, arg = "next_req()")

i <- i + 1L
if (i > length(resps)) {
signal("", class = "httr2:::doubled")
length(resps) <- length(resps) * 2
}
}
}, interrupt = function(cnd) {
# interrupt might occur after i was incremented
if (is.null(resps[[i]])) {
i <<- i - 1
}
cli::cli_alert_warning(
"Terminating iteration; returning {i} response{?s}."
)
})
progress$done()

if (i < length(resps)) {
resps <- resps[seq_len(i)]
}

resps
}
Loading

0 comments on commit 5524237

Please sign in to comment.