-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathasync.R
More file actions
211 lines (202 loc) · 6.11 KB
/
async.R
File metadata and controls
211 lines (202 loc) · 6.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
#' Make an async function
#'
#' @description
#'
#' `async()` functions are building blocks for cooperative
#' concurrency.
#'
#' - They are _concurrent_ because they are jointly managed by a
#' scheduler in charge of running them.
#'
#' - They are _cooperative_ because they decide on their own when they
#' can no longer make quick progress and need to __await__ some
#' result. This is done with the `await()` keyword which suspends
#' the async function and gives control back to the scheduler. The
#' scheduler waits until the next async operation is ready to make
#' progress.
#'
#' The async framework used by `async()` functions is implemented in
#' the [later](https://github.com/r-lib/later/) and
#' [promises](https://rstudio.github.io/promises/) packages:
#'
#' - You can chain async functions created with coro to promises.
#' - You can await promises. You can also await futures created with
#' the [future](https://github.com/HenrikBengtsson/future) package
#' because they are coercible to promises.
#'
#' @param fn An anonymous function within which `await()` calls are
#' allowed.
#' @return A function that returns a [promises::promise()] invisibly.
#'
#' @seealso [async_generator()] and [await_each()];
#' [coro_debug()] for step-debugging.
#'
#' @examples
#' # This async function counts down from `n`, sleeping for 2 seconds
#' # at each iteration:
#' async_count_down <- async(function(n) {
#' while (n > 0) {
#' cat("Down", n, "\n")
#' await(async_sleep(2))
#' n <- n - 1
#' }
#' })
#'
#' # This async function counts up until `stop`, sleeping for 0.5
#' # seconds at each iteration:
#' async_count_up <- async(function(stop) {
#' n <- 1
#' while (n <= stop) {
#' cat("Up", n, "\n")
#' await(async_sleep(0.5))
#' n <- n + 1
#' }
#' })
#'
#' # You can run these functions concurrently using `promise_all()`
#' if (interactive()) {
#' promises::promise_all(async_count_down(5), async_count_up(5))
#' }
#' @export
async <- function(fn) {
assert_lambda(substitute(fn))
generator0(fn, type = "async")
}
#' @rdname async
#' @param x An awaitable value, i.e. a [promise][promises::promise].
#' @export
await <- function(x) {
abort("`await()` can't be called directly or within function arguments.")
}
#' Sleep asynchronously
#' @param seconds The number of second to sleep.
#' @return A chainable promise.
#' @export
async_sleep <- function(seconds) {
promises::promise(function(resolve, reject) {
later::later(~ resolve(NULL) , delay = seconds)
})
}
#' Construct an async generator
#'
#' @description
#'
#' An async generator constructs iterable functions that are also
#' awaitables. They support both the `yield()` and `await()` syntax.
#' An async iterator can be looped within async functions and
#' iterators using `await_each()` on the input of a `for` loop.
#'
#' The iteration protocol is derived from the one described in
#' [`iterator`][iterator]. An async iterator always returns a
#' promise. When the iterator is exhausted, it returns a resolved
#' promise to the exhaustion sentinel.
#'
#' @param fn An anonymous function describing an async generator
#' within which `await()` calls are allowed.
#' @return A generator factory. Generators constructed with this
#' factory always return [promises::promise()].
#'
#' @seealso [async()] for creating awaitable functions;
#' [async_collect()] for collecting the values of an async iterator;
#' [coro_debug()] for step-debugging.
#' @examples
#' # Creates awaitable functions that transform their inputs into a stream
#' generate_stream <- async_generator(function(x) for (elt in x) yield(elt))
#'
#' # Maps a function to a stream
#' async_map <- async_generator(function(.i, .fn, ...) {
#' for (elt in await_each(.i)) {
#' yield(.fn(elt, ...))
#' }
#' })
#'
#' # Example usage:
#' if (interactive()) {
#' library(magrittr)
#' generate_stream(1:3) %>% async_map(`*`, 2) %>% async_collect()
#' }
#' @export
async_generator <- function(fn) {
assert_lambda(substitute(fn))
generator0(fn, type = "async_generator")
}
#' @rdname async_generator
#' @inheritParams await
#' @export
await_each <- function(x) {
abort("`await_each()` must be called within a `for` loop of an async function.")
}
#' @export
print.coro_async <- function(x, ..., internals = FALSE) {
writeLines("<async>")
print_generator(x, ..., internals = internals)
}
#' @export
print.coro_async_generator <- function(x, ..., internals = FALSE) {
writeLines("<async/generator>")
print_generator(x, ..., internals = internals)
}
#' Async operations
#'
#' @description
#'
#' `r lifecycle::badge("experimental")`
#'
#' Customisation point for the _async_ package or any concurrency
#' framework that defines a "then" operation. Assign the result of
#' `async_ops()` to the `.__coro_async_ops__.` symbol in your
#' namespace.
#'
#' @param package The package name of the framework as a
#' string. `async()` and `async_generator()` check that the package
#' is installed at runtime.
#' @param then A function of two arguments. The first argument is a
#' promise object (as created by `as_promise`). The second argument
#' is a callback function that must be called once the promise
#' object is resolved.
#' @param as_promise A function of one argument. It should be a no-op
#' when passed a promise object, and otherwise wrap the value in a
#' resolved promise.
#'
#' @keywords internal
#' @export
async_ops <- function(package, then, as_promise) {
stopifnot(
is_string(package),
is_function(then),
is_function(as_promise)
)
structure(
list(
package = package,
then = then,
as_promise = as_promise
),
class = "coro_async_ops"
)
}
get_async_ops <- function(env) {
ops <- env_get(env, ".__coro_async_ops__.", inherit = TRUE, default = NULL)
if (!is_null(ops)) {
return(ops)
}
async_ops(
package = "promises",
then = op_then,
as_promise = op_as_promise
)
}
op_then <- function(x, callback) {
promises::then(
x,
onFulfilled = callback,
onRejected = function(cnd) callback(stop(cnd))
)
}
op_as_promise <- function(x) {
if (promises::is.promise(x)) {
x
} else {
promises::promise_resolve(x)
}
}