/
AsyncBackend.R
612 lines (538 loc) · 21.2 KB
/
AsyncBackend.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
#' @include Exception.R Backend.R Specification.R TaskState.R
#' @title
#' AsyncBackend
#'
#' @description
#' This is a concrete implementation of the abstract class [`parabar::Backend`]
#' that implements the [`parabar::Service`] interface. This backend executes
#' tasks in parallel asynchronously (i.e., without blocking the main `R`
#' session) on a [parallel::makeCluster()] cluster created in a background `R`
#' [`session`][`callr::r_session`].
#'
#' @examples
#' # Create a specification object.
#' specification <- Specification$new()
#'
#' # Set the number of cores.
#' specification$set_cores(cores = 2)
#'
#' # Set the cluster type.
#' specification$set_type(type = "psock")
#'
#' # Create an asynchronous backend object.
#' backend <- AsyncBackend$new()
#'
#' # Start the cluster on the backend.
#' backend$start(specification)
#'
#' # Check if there is anything on the backend.
#' backend$peek()
#'
#' # Create a dummy variable.
#' name <- "parabar"
#'
#' # Export the variable to the backend.
#' backend$export("name")
#'
#' # Remove variable from current environment.
#' rm(name)
#'
#' # Run an expression on the backend, using the exported variable `name`.
#' backend$evaluate({
#' # Print the name.
#' print(paste0("Hello, ", name, "!"))
#' })
#'
#' # Run a task in parallel (i.e., approx. 2.5 seconds).
#' backend$sapply(
#' x = 1:10,
#' fun = function(x) {
#' # Sleep a bit.
#' Sys.sleep(0.5)
#'
#' # Compute something.
#' output <- x + 1
#'
#' # Return the result.
#' return(output)
#' }
#' )
#'
#' # Right now the main process is free and the task is executing on a `psock`
#' # cluster started in a background `R` session.
#'
#' # Trying to get the output immediately will throw an error, indicating that the
#' # task is still running.
#' try(backend$get_output())
#'
#' # However, we can block the main process and wait for the task to complete
#' # before fetching the results.
#' backend$get_output(wait = TRUE)
#'
#' # Clear the backend.
#' backend$clear()
#'
#' # Check that there is nothing on the cluster.
#' backend$peek()
#'
#' # Stop the backend.
#' backend$stop()
#'
#' # Check that the backend is not active.
#' backend$active
#'
#' @seealso
#' [`parabar::Service`], [`parabar::Backend`], [`parabar::SyncBackend`],
#' [`parabar::ProgressTrackingContext`], and [`parabar::TaskState`].
#'
#' @export
AsyncBackend <- R6::R6Class("AsyncBackend",
    inherit = Backend,

    private = list(
        # The progress tracking capabilities of the backend implementation.
        .supports_progress = TRUE,

        # Create a parallel cluster in the `.GlobalEnv` of the separate `R`
        # session. The superassignment (`<<-`) makes the cluster object persist
        # in that session, so later requests can refer to it as `cluster`.
        .make_cluster = function(specification) {
            private$.cluster$run(function(cores, type) {
                # Make the actual cluster.
                cluster <<- parallel::makeCluster(spec = cores, type = type)
            }, args = list(
                specification$cores, specification$type
            ))
        },

        # Stop the cluster existing in the separate `R` session.
        .close_cluster = function() {
            private$.cluster$run(function() {
                # Stop the cluster.
                parallel::stopCluster(cluster)
            })
        },

        # Start a cluster in a separate `R` session.
        .start = function(specification) {
            # Refuse to start on top of an already active cluster.
            if (private$.active) {
                Exception$cluster_active()
            }

            # Create a permanent separate `R` session.
            session <- callr::r_session$new()
            private$.cluster <- session

            # If creating or sanitizing the cluster fails, the backend is never
            # marked active, so `stop()` could not reach the session. Close it
            # here instead of leaving an orphaned background process. The
            # cleanup is wrapped in `try()` so it cannot mask the original error.
            started <- FALSE
            on.exit({
                if (!started) {
                    try(session$close(), silent = TRUE)
                    private$.cluster <- NULL
                }
            }, add = TRUE)

            # Create cluster in session based on specification.
            private$.make_cluster(specification)

            # Sanitize the cluster.
            private$.clear()

            # Toggle the active flag.
            private$.toggle_active_state()

            # Start-up completed; keep the session.
            started <- TRUE
        },

        # Stop the cluster in the session.
        .stop = function() {
            # There must be an active cluster to stop.
            if (!private$.active) {
                Exception$cluster_not_active()
            }

            # Terminate the cluster in the separate `R` session.
            private$.close_cluster()

            # Terminate the separate `R` session.
            private$.cluster$close()

            # Reset the cluster field.
            private$.cluster <- NULL

            # Toggle the active flag.
            private$.toggle_active_state()
        },

        # Sanitize the cluster in the session by removing every object
        # (including hidden ones) from each node.
        .clear = function() {
            private$.cluster$run(function() {
                # Evaluate the expression on the cluster.
                parallel::clusterEvalQ(cluster, rm(list = ls(all.names = TRUE)))
            })

            # Remain silent.
            invisible()
        },

        # Inspect what is on the cluster nodes, not in the session itself.
        .peek = function() {
            private$.cluster$run(function() {
                # Check what is on the cluster.
                parallel::clusterEvalQ(cluster, ls(all.names = TRUE))
            })
        },

        # Export variables on the cluster in the session.
        .export = function(variables, environment) {
            # Create a new environment holding only the variables being
            # exported, so that only they are shipped to the session.
            new_environment <- new.env()

            # Copy each requested variable into the new environment.
            for (variable in variables) {
                assign(variable, get(variable, environment), new_environment)
            }

            # Export to the cluster via the session.
            private$.cluster$run(function(variables, environment) {
                # The actual export command.
                parallel::clusterExport(cluster, variables, environment)
            }, args = list(variables, new_environment))

            # Remain silent.
            invisible()
        },

        # Evaluate an expression on the cluster in the session.
        .evaluate = function(expression) {
            # Capture the (unevaluated) expression.
            capture <- substitute(expression)

            # Perform the evaluation on the cluster via the `R` session.
            private$.cluster$run(function(expression) {
                # Evaluate the expression.
                parallel::clusterCall(cluster, eval, expression)
            }, args = list(capture))
        },

        # Deploy a `parSapply` task asynchronously via the cluster in the session.
        .sapply = function(x, fun, ...) {
            # Capture the `...`.
            dots <- list(...)

            # Perform the evaluation from the `R` session without blocking.
            private$.cluster$call(function(x, fun, dots) {
                # Run the task.
                output <- do.call(parallel::parSapply, c(list(cluster, x, fun), dots))

                # Return to the session.
                return(output)
            }, args = list(x, fun, dots))
        },

        # Deploy a `parLapply` task asynchronously via the cluster in the session.
        .lapply = function(x, fun, ...) {
            # Capture the `...`.
            dots <- list(...)

            # Perform the evaluation from the `R` session without blocking.
            private$.cluster$call(function(x, fun, dots) {
                # Run the task.
                output <- do.call(parallel::parLapply, c(list(cluster, x, fun), dots))

                # Return to the session.
                return(output)
            }, args = list(x, fun, dots))
        },

        # Deploy a `parApply` task asynchronously via the cluster in the session.
        .apply = function(x, margin, fun, ...) {
            # Capture the `...`.
            dots <- list(...)

            # Perform the evaluation from the `R` session without blocking.
            private$.cluster$call(function(x, margin, fun, dots) {
                # Run the task.
                output <- do.call(parallel::parApply, c(list(cluster, x, margin, fun), dots))

                # Return to the session.
                return(output)
            }, args = list(x, margin, fun, dots))
        },

        # Clear the current output on the backend.
        .clear_output = function() {
            private$.output <- NULL
        },

        # Set the output based on the session read.
        .set_output = function() {
            # Get all session output.
            output <- private$.cluster$read()

            # If an error occurred in the session, rethrow it in the main session.
            if (!is.null(output$error)) {
                Exception$async_task_error(output$error)
            }

            # Otherwise, store the relevant results from the output.
            private$.output <- output$result
        },

        # Get the current task state (i.e., what is happening in the session).
        .get_task_state = function() {
            # The backend must have an active cluster (i.e., session here).
            if (!private$.active) {
                Exception$cluster_not_active()
            }

            # Create task state object holding the current state.
            task_state <- TaskState$new(private$.cluster)

            return(task_state)
        },

        # Throw an exception if the backend is not ready to be used.
        .throw_if_backend_is_busy = function() {
            # Get task state.
            task_state <- private$.get_task_state()

            # A task is still running.
            if (task_state$task_is_running) {
                Exception$async_task_running()
            }

            # A task completed but its results have not been read yet.
            if (task_state$task_is_completed) {
                Exception$async_task_completed()
            }
        },

        # Wait for the task to finish and fetch the results.
        .wait_to_fetch_results = function() {
            # Get task state.
            task_state <- private$.get_task_state()

            # No task has been started (i.e., deployed) on the backend.
            if (task_state$task_not_started) {
                Exception$async_task_not_started()
            }

            # Block only if the task is still running (`-1` disables the
            # timeout); a completed task can be read right away.
            if (task_state$task_is_running) {
                private$.cluster$poll_process(-1)
            }

            # Read the session and set the output.
            private$.set_output()
        },

        # Attempt to fetch the results if the task finished.
        .fetch_results = function() {
            # Get task state.
            task_state <- private$.get_task_state()

            # No task has been started (i.e., deployed) on the backend.
            if (task_state$task_not_started) {
                Exception$async_task_not_started()
            }

            # The task is still running on the backend.
            if (task_state$task_is_running) {
                Exception$async_task_running()
            }

            # Otherwise, read the results of a completed task.
            private$.set_output()
        }
    ),

    public = list(
        #' @description
        #' Create a new [`parabar::AsyncBackend`] object.
        #'
        #' @return
        #' An object of class [`parabar::AsyncBackend`].
        initialize = function() { invisible() },

        #' @description
        #' Destroy the current [`parabar::AsyncBackend`] instance, stopping the
        #' backend first if it is still active.
        #'
        #' @return
        #' This method returns void.
        finalize = function() {
            # If a cluster is active, stop before deleting the instance.
            if (private$.active) {
                private$.stop()
            }
        },

        #' @description
        #' Start the backend.
        #'
        #' @param specification An object of class [`parabar::Specification`]
        #' that contains the backend configuration.
        #'
        #' @return
        #' This method returns void. The resulting backend must be stored in the
        #' `.cluster` private field on the [`parabar::Backend`] abstract class,
        #' and accessible to any concrete backend implementations via the active
        #' binding `cluster`.
        start = function(specification) {
            private$.start(specification)
        },

        #' @description
        #' Stop the backend.
        #'
        #' @return
        #' This method returns void.
        stop = function() {
            private$.stop()
        },

        #' @description
        #' Remove all objects from the backend. This function is equivalent to
        #' calling `rm(list = ls(all.names = TRUE))` on each node in the
        #' backend.
        #'
        #' @return
        #' This method returns void.
        clear = function() {
            private$.clear()
        },

        #' @description
        #' Inspect the backend for variables available in the `.GlobalEnv`.
        #'
        #' @return
        #' This method returns a list of character vectors, where each element
        #' corresponds to a node in the backend. The character vectors contain
        #' the names of the variables available in the `.GlobalEnv` on each
        #' node.
        peek = function() {
            private$.peek()
        },

        #' @description
        #' Export variables from a given environment to the backend.
        #'
        #' @param variables A character vector of variable names to export.
        #'
        #' @param environment An environment object from which to export the
        #' variables. Defaults to the caller's environment.
        #'
        #' @return This method returns void.
        export = function(variables, environment) {
            # If no environment is provided, use the caller's environment,
            # where the variables are defined.
            if (missing(environment)) {
                environment <- parent.frame()
            }

            # Export and return the output.
            private$.export(variables, environment)
        },

        #' @description
        #' Evaluate an arbitrary expression on the backend.
        #'
        #' @param expression An unquoted expression to evaluate on the backend.
        #'
        #' @return
        #' This method returns the result of the expression evaluation.
        evaluate = function(expression) {
            # Capture the expression.
            capture <- substitute(expression)

            # Inline the captured expression into the private call, so the
            # private method's own `substitute()` sees the user's expression.
            capture_call <- bquote(private$.evaluate(.(capture)))

            # Perform the call.
            eval(capture_call)
        },

        #' @description
        #' Run a task on the backend akin to [parallel::parSapply()].
        #'
        #' @param x An atomic vector or list to pass to the `fun` function.
        #'
        #' @param fun A function to apply to each element of `x`.
        #'
        #' @param ... Additional arguments to pass to the `fun` function.
        #'
        #' @return
        #' This method returns void. The output of the task execution must be
        #' stored in the private field `.output` on the [`parabar::Backend`]
        #' abstract class, and is accessible via the `get_output()` method.
        sapply = function(x, fun, ...) {
            # Throw if backend is busy.
            private$.throw_if_backend_is_busy()

            # Deploy the task asynchronously.
            private$.sapply(x, fun, ...)
        },

        #' @description
        #' Run a task on the backend akin to [parallel::parLapply()].
        #'
        #' @param x An atomic vector or list to pass to the `fun` function.
        #'
        #' @param fun A function to apply to each element of `x`.
        #'
        #' @param ... Additional arguments to pass to the `fun` function.
        #'
        #' @return
        #' This method returns void. The output of the task execution must be
        #' stored in the private field `.output` on the [`parabar::Backend`]
        #' abstract class, and is accessible via the `get_output()` method.
        lapply = function(x, fun, ...) {
            # Throw if backend is busy.
            private$.throw_if_backend_is_busy()

            # Deploy the task asynchronously.
            private$.lapply(x, fun, ...)
        },

        #' @description
        #' Run a task on the backend akin to [parallel::parApply()].
        #'
        #' @param x An array to pass to the `fun` function.
        #'
        #' @param margin A numeric vector indicating the dimensions of `x` the
        #' `fun` function should be applied over. For example, for a matrix,
        #' `margin = 1` indicates applying `fun` rows-wise, `margin = 2`
        #' indicates applying `fun` columns-wise, and `margin = c(1, 2)`
        #' indicates applying `fun` element-wise. Named dimensions are also
        #' possible depending on `x`. See [parallel::parApply()] and
        #' [base::apply()] for more details.
        #'
        #' @param fun A function to apply to `x` according to the `margin`.
        #'
        #' @param ... Additional arguments to pass to the `fun` function.
        #'
        #' @return
        #' This method returns void. The output of the task execution must be
        #' stored in the private field `.output` on the [`parabar::Backend`]
        #' abstract class, and is accessible via the `get_output()` method.
        apply = function(x, margin, fun, ...) {
            # Throw if backend is busy.
            private$.throw_if_backend_is_busy()

            # Validate provided margins.
            Helper$check_array_margins(margin, dim(x))

            # Deploy the task asynchronously.
            private$.apply(x, margin, fun, ...)
        },

        #' @description
        #' Get the output of the task execution.
        #'
        #' @param wait A logical value indicating whether to wait for the task
        #' to finish executing before fetching the results. Defaults to `FALSE`.
        #' See the **Details** section for more information.
        #'
        #' @details
        #' This method fetches the output of the task execution after calling
        #' the `sapply()`, `lapply()`, or `apply()` method. It returns the
        #' output and immediately removes it from the backend. Subsequent calls
        #' to this method will throw an error if no additional tasks have been
        #' executed in the meantime. This method should be called after the
        #' execution of a task.
        #'
        #' If `wait = TRUE`, the method will block the main process until the
        #' backend finishes executing the task and the results are available. If
        #' `wait = FALSE`, the method will immediately attempt to fetch the
        #' results from the background `R` session, and throw an error if the
        #' task is still running.
        #'
        #' @return
        #' A vector, matrix, or list of the same length as `x`, containing the
        #' results of the `fun`. The output format differs based on the specific
        #' operation employed. Check out the documentation for the `apply`
        #' operations of [`parallel::parallel`] for more information.
        get_output = function(wait = FALSE) {
            # Reset the output on exit (after the return value is computed).
            on.exit({
                private$.clear_output()
            })

            if (wait) {
                # Block until the task finishes, then fetch the results.
                private$.wait_to_fetch_results()
            } else {
                # Try to fetch the results now (i.e., without waiting).
                private$.fetch_results()
            }

            # Return the output.
            return(private$.output)
        }
    ),

    active = list(
        #' @field task_state A list of logical values indicating the state of
        #' the task execution. See the [`parabar::TaskState`] class for more
        #' information on how the statuses are determined. The following
        #' statuses are available:
        #' - `task_not_started`: Indicates whether the backend is free. `TRUE`
        #' signifies that no task has been started and the backend is free to
        #' deploy.
        #' - `task_is_running`: Indicates whether a task is currently running on
        #' the backend.
        #' - `task_is_completed`: Indicates whether a task has finished
        #' executing. `TRUE` signifies that the output of the task has not been
        #' fetched. Calling the method `get_output()` will move the output from
        #' the background `R` session to the main `R` session. Once the output
        #' has been fetched, the backend is free to deploy another task.
        task_state = function() {
            # Get a task state instance with the state.
            task_state <- private$.get_task_state()

            # Return a simplified state.
            return(list(
                task_not_started = task_state$task_not_started,
                task_is_running = task_state$task_is_running,
                task_is_completed = task_state$task_is_completed
            ))
        }
    )
)