Skip to content

Commit

Permalink
Update embedded async
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborcsardi committed Mar 18, 2024
1 parent 6c1a84b commit b23f9dc
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 12 deletions.
238 changes: 231 additions & 7 deletions R/aaa-async.R
Original file line number Diff line number Diff line change
Expand Up @@ -1087,9 +1087,10 @@ deferred <- R6Class(
initialize = function(action = NULL, on_progress = NULL, on_cancel = NULL,
parents = NULL, parent_resolve = NULL,
parent_reject = NULL, type = NULL,
call = sys.call(-1))
call = sys.call(-1), event_emitter = NULL)
async_def_init(self, private, action, on_progress, on_cancel,
parents, parent_resolve, parent_reject, type, call),
parents, parent_resolve, parent_reject, type, call,
event_emitter),
then = function(on_fulfilled)
def_then(self, private, on_fulfilled),
catch = function(...)
Expand All @@ -1098,7 +1099,9 @@ deferred <- R6Class(
def_finally(self, private, on_finally),
cancel = function(reason = "Cancelled")
def_cancel(self, private, reason),
share = function() { private$shared <<- TRUE; invisible(self) }
share = function() { private$shared <<- TRUE; invisible(self) },

event_emitter = NULL
),

private = list(
Expand Down Expand Up @@ -1150,14 +1153,15 @@ deferred <- R6Class(

async_def_init <- function(self, private, action, on_progress,
on_cancel, parents, parent_resolve,
parent_reject, type, call) {
parent_reject, type, call, event_emitter) {

private$type <- type
private$id <- get_id()
private$event_loop <- get_default_event_loop()
private$parents <- parents
private$action <- action
private$mycall <- call
self$event_emitter <- event_emitter

"!DEBUG NEW `private$id` (`type`)"

Expand Down Expand Up @@ -1773,12 +1777,15 @@ el_add_http <- function(self, private, handle, callback, progress, file,
pool = private$pool,
done = function(response) {
task <- private$tasks[[id]]
task$data$data$event_emitter$emit("end")
private$tasks[[id]] <- NULL
response$content <- do.call(c, as.list(content))
response$file <- outfile
task$callback(NULL, response)
},
data = function(bytes, ...) {
task <- private$tasks[[id]]
task$data$data$event_emitter$emit("data", bytes)
if (!is.null(outfile)) {
## R runs out of connections very quickly, especially because they
## are not removed until a gc(). However, calling gc() is
Expand Down Expand Up @@ -2268,7 +2275,7 @@ el__update_curl_data <- function(self, private) {
#'
#' In an error happen within an `error` listener, then the same happens,
#' the last `synchronise()` or `run_event_loop()` call fails. You can
#' want to wrap the body of the error listeners in a `tryCatch()` call,
#' wrap the body of the error listeners in a `tryCatch()` call,
#' if you want to avoid this.
#'
#' @noRd
Expand Down Expand Up @@ -2491,12 +2498,200 @@ async_reject <- function(.x, .p, ...) {
}

async_reject <- mark_as_async(async_reject)
#' HTTP event emitter for server-sent events
#'
#' Server-sent events are a technique to stream events from a web server
#' to a client, through an open HTTP connection.
#'
#' This class implements an event emitter on an async HTTP query created
#' with [http_get()] and friends, that fires an `"event"` event when the
#' server sends an event. An `"end"` event is emitted when the server
#' closes the connection.
#'
#' An event is a named character vector, the names are the keys of the
#' events.
#'
#' Example using our built-in toy web app:
#' ```r
#' http <- webfakes::new_app_process(async:::sseapp())
#' stream_events <- function() {
#' query <- http_get(http$url("/sse"))
#' sse <- sse_events$new(query)
#' sse$
#' listen_on("event", function(event) {
#' writeLines("Got an event:")
#' print(event)
#' })$
#' listen_on("end", function() {
#' writeLines("Done.")
#' })
#' query
#' }
#'
#' response <- synchronise(stream_events())
#' ```
#'
#'
#' @noRd

sse_events <- R6Class(
"sse_events",
inherit = event_emitter,
public = list(
initialize = function(http_handle) {
super$initialize()
http_handle$event_emitter$listen_on("data", function(bytes) {
private$data <- c(private$data, bytes)
private$emit_events()
})
http_handle$event_emitter$listen_on("end", function() {
self$emit("end")
})
}
),

private = list(
data = NULL,
sep = as.raw(c(0xaL, 0xaL)),
emit_events = function() {
evs <- chunk_sse_events(private$data, private$sep)
private$data <- evs$rest
for (ev in evs$events) {
self$emit("event", ev)
}
}
)
)

chunk_sse_events <- function(data, sep = NULL) {
# skip leading \n
no <- 0L
while (no <= length(data) && data[no + 1] == 0x0a) {
no <- no + 1L
}
if (no > 0) {
data <- data[(no + 1L):length(data)]
}
sep <- sep %||% as.raw(c(0xaL, 0xaL))
mtch <- grepRaw(sep, data, fixed = TRUE, all = TRUE)
# shortcut for no events
if (length(mtch) == 0) {
return(list(events = list(), rest = data))
}

events <- vector("list", length(mtch))
for (p in seq_along(mtch)) {
from <- if (p == 1) 1L else mtch[p - 1] + 2L
to <- mtch[p] - 1L
events[[p]] <- parse_sse_event(data[from:to])
}
events <- drop_nulls(events)

restfrom <- mtch[length(mtch)] + 2L
rest <- if (restfrom <= length(data)) {
data[restfrom:length(data)]
} else {
raw()
}
list(events = events, rest = rest)
}

parse_sse_event <- function(data) {
txt <- rawToChar(data)
Encoding(txt) <- "UTF-8"
lines <- strsplit(txt, "\n", fixed = TRUE)[[1]]
lines <- lines[lines != ""]
if (length(lines) == 0) {
return(NULL)
}
keys <- sub(":.*$", "", lines)
vals <- sub("^[^:]*:[ ]*", "", lines)
structure(vals, names = keys)
}

drop_nulls <- function(x) {
x[!vapply(x, is.null, logical(1))]
}

sseapp <- function() {
app <- webfakes::new_app()
app$get("/sse", function(req, res) {
`%||%` <- function(l, r) if (is.null(l)) r else l
if (is.null(res$locals$sse)) {
duration <- as.double(req$query$duration %||% 2)
delay <- as.double(req$query$delay %||% 0)
numevents <- as.integer(req$query$numevents %||% 5)
pause <- max(duration / numevents, 0.01)
res$locals$sse <- list(
sent = 0,
numevents = numevents,
pause = pause
)

res$
set_header("cache-control", "no-cache")$
set_header("content-type", "text/event-stream")$
set_header("access-control-allow-origin", "*")$
set_header("connection", "keep-alive")$
set_status(200)

if (delay > 0) {
return(res$delay(delay))
}
}

msg <- paste0(
"event: ", res$locals$sse$sent + 1L, "\n",
"message: live long and prosper\n\n"
)
res$locals$sse$sent <- res$locals$sse$sent + 1L
res$write(msg)

if (res$locals$sse$sent == res$locals$sse$numevents) {
res$send("")
} else {
res$delay(res$locals$sse$pause)
}
})
}

#' Asynchronous HTTP GET request
#'
#' Start an HTTP GET request in the background, and report its completion
#' via a deferred.
#'
#' @section HTTP event emitters:
#' An async HTTP deferred object is also an event emitter, see
#' [event_emitter]. Use `$event_emitter` to access the event emitter API,
#' and call `$event_emitter$listen_on()` etc. to listen on HTTP events,
#' etc.
#'
#' * `"data"` is emitted when we receive data from the server, the data is
#' passed on to the listeners as a raw vector. Note that zero-length
#' raw vectors might also happen.
#' * `"end"` is emitted at the end of the HTTP data stream, without
#' additional arguments (Also on error.)
#'
#' Here is an example, that uses the web server from the webfakes
#' package:
#' ```r
#' http <- webfakes::new_app_process(webfakes::httpbin_app())
#' stream_http <- function() {
#' query <- http_get(http$url("/drip?duration=3&numbytes=10"))
#' query$event_emitter$
#' listen_on("data", function(bytes) {
#' writeLines(paste("Got", length(bytes), "byte(s):"))
#' print(bytes)
#' })$
#' listen_on("end", function() {
#' writeLines("Done.")
#' })
#' query
#' }
#'
#' response <- synchronise(stream_http())
#' ```
#'
#' @param url URL to connect to.
#' @param headers HTTP headers to send.
#' @param file If not `NULL`, it must be a string, specifying a file.
Expand Down Expand Up @@ -2571,6 +2766,9 @@ http_get <- mark_as_async(http_get)

#' Asynchronous HTTP HEAD request
#'
#' An async HTTP deferred object is also an event emitter, see
#' [http_get()] for details, and also [event_emitter].
#'
#' @inheritParams http_get
#' @return Deferred object.
#'
Expand Down Expand Up @@ -2619,6 +2817,9 @@ http_head <- mark_as_async(http_head)
#' Start an HTTP POST request in the background, and report its completion
#' via a deferred value.
#'
#' An async HTTP deferred object is also an event emitter, see
#' [http_get()] for details, and also [event_emitter].
#'
#' @inheritParams http_get
#' @param data Data to send. Either a raw vector, or a character string
#' that will be converted to raw with [base::charToRaw]. At most one of
Expand Down Expand Up @@ -2721,9 +2922,30 @@ get_default_curl_options <- function(options) {
)
}

http_events <- R6Class(
"http_events",
inherit = event_emitter,
public = list(
listen_on = function(event, callback) {
private$check(event)
super$listen_on(event, callback)
},
listen_off = function(event, callback) {
private$check(event)
super$listen_off(event, callback)
}
),
private = list(
check = function(event) {
stopifnot(event %in% c("data", "end"))
}
)
)

make_deferred_http <- function(cb, file) {
cb; file
id <- NULL
ee <- http_events$new()
deferred$new(
type = "http", call = sys.call(),
action = function(resolve, progress) {
Expand All @@ -2738,11 +2960,13 @@ make_deferred_http <- function(cb, file) {
function(err, res) if (is.null(err)) resolve(res) else reject(err),
progress,
file,
data = ho$options)
data = c(ho$options, list(event_emitter = ee))
)
},
on_cancel = function(reason) {
if (!is.null(id)) get_default_event_loop()$cancel(id)
}
},
event_emitter = ee
)
}

Expand Down
8 changes: 4 additions & 4 deletions tests/async/test-event-emitter-async.R
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ test_that("named arguments to listeners", {

test_that("all listeners are called", {
arg1 <- arg2 <- arg3 <- NULL
arg11 <- arg21 <- arg31 <- NULL
arg11 <- arg12 <- arg13 <- NULL

deadline <- Sys.time() + as.difftime(2, units = "secs")

do <- function() {
x <- event_emitter$new(async = TRUE)
x$listen_on("foo", function(a1, a2, a3) {
Expand Down Expand Up @@ -141,7 +141,7 @@ test_that("one shot listener is only called once", {
called <- called1 <- 0L

deadline <- Sys.time() + as.difftime(2, units = "secs")

do <- function() {
x <- event_emitter$new(async = TRUE)
x$listen_on("foo", function() { called <<- called + 1L })
Expand All @@ -160,7 +160,7 @@ test_that("can remove listener", {
called <- called2 <- 0L

deadline <- Sys.time() + as.difftime(2, units = "secs")

cb1 <- function() { called <<- called + 1L }
cb2 <- function(x = 1) { called2 <<- called2 + 1L }
do <- function() {
Expand Down
2 changes: 1 addition & 1 deletion tests/async/test-event-emitter.R
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ test_that("named arguments to listeners", {

test_that("all listeners are called", {
arg1 <- arg2 <- arg3 <- NULL
arg11 <- arg21 <- arg31 <- NULL
arg11 <- arg12 <- arg13 <- NULL
do <- function() {
x <- event_emitter$new(async = FALSE)
x$listen_on("foo", function(a1, a2, a3) {
Expand Down
1 change: 1 addition & 0 deletions tests/async/test-http.R
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ test_that("http_post file", {
})

test_that("http_post form", {
local_edition(3)
resp <- NULL
tmp <- tempfile()
on.exit(unlink(tmp), add = TRUE)
Expand Down

0 comments on commit b23f9dc

Please sign in to comment.