Add support for the lapply backend operation #31

Merged
merged 34 commits into from
May 3, 2023
58a7110
Docs: update parameter docs for `sapply` operation
mihaiconstantin Apr 30, 2023
74664d7
Docs: update docs for `get_output` method
mihaiconstantin Apr 30, 2023
c32e3c7
Feat: add `lapply` method to `Service` interface
mihaiconstantin Apr 30, 2023
6bcba63
Feat: add `lapply` implementation for `SyncBackend`
mihaiconstantin Apr 30, 2023
fd88741
Feat: add `lapply` implementation for `AsyncBackend`
mihaiconstantin Apr 30, 2023
8822d81
Feat: add `lapply` implementation for `Context`
mihaiconstantin Apr 30, 2023
89f5a4d
Feat: add `.execute` method for progress tracking steps
mihaiconstantin Apr 30, 2023
68277ea
Refactor: update `sapply` to use `.execute` private method
mihaiconstantin Apr 30, 2023
50e7dbc
Feat: add `lapply` implementation for `ProgressTrackingContext`
mihaiconstantin Apr 30, 2023
ce9b15a
Docs: update comments in `AsyncBackend`
mihaiconstantin Apr 30, 2023
3183e83
Refactor: simplify `.execute` in `ProgressTrackingContext`
mihaiconstantin Apr 30, 2023
c2b5184
Feat: add `UserApiConsumer` class for the user `API`
mihaiconstantin May 3, 2023
ad21b1b
Refactor: update `par_sapply` to use the consumer `API`
mihaiconstantin May 3, 2023
cb37ada
Feat: add `par_lapply` exported function
mihaiconstantin May 3, 2023
305f386
Docs: add documentation for `par_lapply` function
mihaiconstantin May 3, 2023
85ae54a
Docs: update documentation for `par_sapply` function
mihaiconstantin May 3, 2023
1a2709c
Docs: update package docs to mention `par_lapply`
mihaiconstantin May 3, 2023
5f26f1a
Docs: mention `par_lapply` in other `API` functions
mihaiconstantin May 3, 2023
f4a587b
Docs: update `README` to include `par_lapply`
mihaiconstantin May 3, 2023
8e7c3ad
Build: add `UserApiConsumer` to `Collate` in `DESCRIPTION`
mihaiconstantin May 3, 2023
6c21fa7
Build: update package imports and `NAMESPACE`
mihaiconstantin May 3, 2023
261706e
Build: update `NAMESPACE` with new exports
mihaiconstantin May 3, 2023
d1a7813
Test: add and refactor test helpers
mihaiconstantin May 3, 2023
d0c2828
Test: update test for `AsyncBackend` class
mihaiconstantin May 3, 2023
1a1e760
Test: update tests for `ProgressTrackingContext` class
mihaiconstantin May 3, 2023
093aefe
Test: update tests for the user `API` functions
mihaiconstantin May 3, 2023
75746e2
Docs: regenerate `.Rd` man files
mihaiconstantin May 3, 2023
7dd66a8
Docs: add missing topics to `_pkgdown.yml`
mihaiconstantin May 3, 2023
49ddf1e
Docs: add changes to `NEWS`
mihaiconstantin May 3, 2023
26f1f8a
Ci: update line exclusions for coverage workflow
mihaiconstantin May 3, 2023
34f1845
Fix: update `export` operation to default to parent frame
mihaiconstantin May 3, 2023
06d9e6b
Fix: disable `file.create` warnings in `ProgressTrackingContext`
mihaiconstantin May 3, 2023
3cf757b
Feat: add `...` for `get_output` in `SyncBackend`
mihaiconstantin May 3, 2023
e02619e
Docs: add chanes to `NEWS`
mihaiconstantin May 3, 2023
2 changes: 1 addition & 1 deletion .github/workflows/test-coverage.yaml
Expand Up @@ -43,5 +43,5 @@ jobs:
- name: Test coverage
env:
PROCESSX_NOTIFY_OLD_SIGCHLD: true
-run: covr::codecov(line_exclusions = list("R/parabar-package.R"))
+run: covr::codecov(line_exclusions = list("R/parabar-package.R", "R/UserApiConsumer.R" = c(59:77)))
shell: Rscript {0}
1 change: 1 addition & 0 deletions DESCRIPTION
Expand Up @@ -41,6 +41,7 @@ Collate:
'ProgressTrackingContext.R'
'ContextFactory.R'
'Warning.R'
'UserApiConsumer.R'
'exports.r'
'logo.R'
'parabar-package.R'
Expand Down
4 changes: 3 additions & 1 deletion NAMESPACE
Expand Up @@ -19,13 +19,15 @@ export(Service)
export(Specification)
export(SyncBackend)
export(TaskState)
export(UserApiConsumer)
export(Warning)
export(clear)
export(configure_bar)
export(evaluate)
export(export)
export(get_option)
export(make_logo)
export(par_lapply)
export(par_sapply)
export(peek)
export(set_default_options)
Expand All @@ -41,7 +43,7 @@ importFrom(parallel,clusterEvalQ)
importFrom(parallel,clusterExport)
importFrom(parallel,detectCores)
importFrom(parallel,makeCluster)
-importFrom(parallel,parApply)
+importFrom(parallel,parLapply)
importFrom(parallel,parSapply)
importFrom(parallel,stopCluster)
importFrom(progress,progress_bar)
30 changes: 30 additions & 0 deletions NEWS.md
@@ -1,3 +1,33 @@
# Development

## Added
- Add optional arguments to the `get_output` operation of `SyncBackend` for
consistency.
- Add more tests to improve coverage.
- Add `par_lapply` function to the user `API`. The `par_lapply` function can be
used to run tasks in parallel akin to `parallel::parLapply`.
- Add `UserApiConsumer` `R6` class that provides an opinionated wrapper around
the developer `API` of the `parabar` package. All parallel operations (e.g.,
`par_sapply` and `par_lapply`) follow more or less the same pattern. The
`UserApiConsumer` encapsulates this pattern and makes it easier to extend
`parabar` with new parallel functions (e.g., `par_apply`) while avoiding code
duplication. The `UserApiConsumer` class can also be used as a standalone
class for parallel operations; however, its primary purpose is to be used by
the parallel task execution functions in the user `API`.

## Changed
- Disable warnings for `file.create` in `ProgressTrackingContext` class. This
warning is superfluous since the code handles creation failures.
- Refactor test helpers to avoid code duplication.
- Update `par_sapply` to use the `UserApiConsumer` class.
- Update the developer `API` `R6` classes to implement the `lapply` parallel
operation.

## Fixed
- Fix the `export` operation in the `SyncBackend` and `Context` classes to
fall back to the parent frame (i.e., the caller's environment) if the
argument `environment` is not provided.

# parabar 1.0.3

## Fixed
Expand Down
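For orientation, the `NEWS` entry above describes `par_lapply` as akin to `parallel::parLapply`. The following is a minimal sketch of that base function only, using the `parallel` package that ships with R rather than the `parabar` API. The two-worker cluster and the variable names are assumptions made for illustration.

```r
library(parallel)

# Assumption: a small local cluster with two workers.
cluster <- makeCluster(2)

# `parLapply` always returns a list with one element per input.
as_list <- parLapply(cluster, 1:4, function(i) i^2)

# `parSapply` simplifies the same results to an atomic vector when it can.
as_vector <- parSapply(cluster, 1:4, function(i) i^2)

# Release the workers.
stopCluster(cluster)
```

The difference in output shape (list versus simplified vector) is the main reason to offer both operations side by side.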
47 changes: 42 additions & 5 deletions R/AsyncBackend.R
Expand Up @@ -210,7 +210,7 @@ AsyncBackend <- R6::R6Class("AsyncBackend",
}, args = list(capture))
},

-# Run tasks on the cluster in the session asynchronously.
+# Run tasks asynchronously via the cluster in the session.
.sapply = function(x, fun, ...) {
# Capture the `...`.
dots <- list(...)
Expand All @@ -225,6 +225,21 @@ AsyncBackend <- R6::R6Class("AsyncBackend",
}, args = list(x, fun, dots))
},

# Run tasks asynchronously via the cluster in the session.
.lapply = function(x, fun, ...) {
# Capture the `...`.
dots <- list(...)

# Perform the evaluation from the `R` session.
private$.cluster$call(function(x, fun, dots) {
# Run the task.
output <- do.call(parallel::parLapply, c(list(cluster, x, fun), dots))

# Return to the session.
return(output)
}, args = list(x, fun, dots))
},

# Clear the current output on the backend.
.clear_output = function() {
# Clear output.
Expand Down Expand Up @@ -426,8 +441,7 @@ AsyncBackend <- R6::R6Class("AsyncBackend",
#' @description
#' Run a task on the backend akin to [parallel::parSapply()].
#'
-#' @param x A vector (i.e., usually of integers) to pass to the `fun`
-#' function.
+#' @param x An atomic vector or list to pass to the `fun` function.
#'
#' @param fun A function to apply to each element of `x`.
#'
Expand All @@ -445,6 +459,27 @@ AsyncBackend <- R6::R6Class("AsyncBackend",
private$.sapply(x, fun, ...)
},

#' @description
#' Run a task on the backend akin to [parallel::parLapply()].
#'
#' @param x An atomic vector or list to pass to the `fun` function.
#'
#' @param fun A function to apply to each element of `x`.
#'
#' @param ... Additional arguments to pass to the `fun` function.
#'
#' @return
#' This method returns void. The output of the task execution must be
#' stored in the private field `.output` on the [`parabar::Backend`]
#' abstract class, and is accessible via the `get_output()` method.
lapply = function(x, fun, ...) {
# Throw if backend is busy.
private$.throw_if_backend_is_busy()

# Deploy the task asynchronously.
private$.lapply(x, fun, ...)
},

#' @description
#' Get the output of the task execution.
#'
Expand All @@ -466,8 +501,10 @@ AsyncBackend <- R6::R6Class("AsyncBackend",
#' task is still running.
#'
#' @return
-#' A vector or list of the same length as `x` containing the results of
-#' the `fun`. It resembles the format of [base::sapply()].
+#' A vector, matrix, or list of the same length as `x`, containing the
+#' results of the `fun`. The output format differs based on the specific
+#' operation employed. Check out the documentation for the `apply`
+#' operations of [`parallel::parallel`] for more information.
get_output = function(wait = FALSE) {
# Reset the output on exit.
on.exit({
Expand Down
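The private `.lapply` method above captures `...` into a list and splices it after the fixed arguments through `do.call`. A minimal sketch of that forwarding idea outside of `R6` and without a background session could look as follows. The helper name `run_lapply`, the two-worker cluster, and the `power` argument are hypothetical and serve only as an illustration.

```r
library(parallel)

# Hypothetical helper mirroring the forwarding idea: capture `...` and
# splice it after the fixed arguments of `parLapply`.
run_lapply <- function(cluster, x, fun, ...) {
    # Capture the `...`.
    dots <- list(...)

    # Equivalent to calling `parLapply(cluster, x, fun, <dots>)`.
    do.call(parallel::parLapply, c(list(cluster, x, fun), dots))
}

# Assumption: a small local cluster with two workers.
cluster <- makeCluster(2)

# The named argument `power` travels through `...` to the task function.
result <- run_lapply(cluster, 1:3, function(i, power) i^power, power = 2)

# Release the workers.
stopCluster(cluster)
```

Building the argument list explicitly is convenient when the call has to be shipped to another process, since a plain list can be passed along as a regular argument.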
36 changes: 30 additions & 6 deletions R/Context.R
Expand Up @@ -155,12 +155,17 @@ Context <- R6::R6Class("Context",
#' @param variables A character vector of variable names to export.
#'
#' @param environment An environment object from which to export the
-#' variables.
+#' variables. Defaults to the parent frame.
#'
#' @return This method returns void.
export = function(variables, environment) {
# If no environment is provided.
if (missing(environment)) {
# Use the caller's environment where the variables are defined.
environment <- parent.frame()
}

# Consume the backend API.
-# TODO: Check that this works as expected (i.e., the environment).
private$.backend$export(variables, environment)
},

Expand All @@ -185,8 +190,7 @@ Context <- R6::R6Class("Context",
#' @description
#' Run a task on the backend akin to [parallel::parSapply()].
#'
-#' @param x A vector (i.e., usually of integers) to pass to the `fun`
-#' function.
+#' @param x An atomic vector or list to pass to the `fun` function.
#'
#' @param fun A function to apply to each element of `x`.
#'
Expand All @@ -201,6 +205,24 @@ Context <- R6::R6Class("Context",
private$.backend$sapply(x = x, fun = fun, ...)
},

#' @description
#' Run a task on the backend akin to [parallel::parLapply()].
#'
#' @param x An atomic vector or list to pass to the `fun` function.
#'
#' @param fun A function to apply to each element of `x`.
#'
#' @param ... Additional arguments to pass to the `fun` function.
#'
#' @return
#' This method returns void. The output of the task execution must be
#' stored in the private field `.output` on the [`parabar::Backend`]
#' abstract class, and is accessible via the `get_output()` method.
lapply = function(x, fun, ...) {
# Consume the backend API.
private$.backend$lapply(x = x, fun = fun, ...)
},

#' @description
#' Get the output of the task execution.
#'
Expand All @@ -217,8 +239,10 @@ Context <- R6::R6Class("Context",
#' task.
#'
#' @return
-#' A vector or list of the same length as `x` containing the results of
-#' the `fun`. It resembles the format of [base::sapply()].
+#' A vector, matrix, or list of the same length as `x`, containing the
+#' results of the `fun`. The output format differs based on the specific
+#' operation employed. Check out the documentation for the `apply`
+#' operations of [`parallel::parallel`] for more information.
get_output = function(...) {
# Consume the backend API.
private$.backend$get_output(...)
Expand Down
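The `export` change above relies on `missing()` and `parent.frame()` to default to the caller's environment. A minimal sketch of that behaviour in plain R is shown below. The function `collect` is a hypothetical stand-in that only looks the variables up instead of sending them to a cluster, and `caller` is likewise an illustrative name.

```r
# Hypothetical stand-in for an `export`-style function.
collect <- function(variables, environment) {
    # If no environment is provided.
    if (missing(environment)) {
        # Use the caller's environment where the variables are defined.
        environment <- parent.frame()
    }

    # Look the variables up in the chosen environment only.
    mget(variables, envir = environment)
}

# The variable lives in the frame of `caller`, not in the global environment.
caller <- function() {
    local_value <- 42
    collect("local_value")
}

# Resolved through the default, i.e., the frame of `caller`.
from_caller <- caller()

# Resolved through an explicitly provided environment.
explicit_env <- new.env()
assign("local_value", 7, envir = explicit_env)
from_explicit <- collect("local_value", explicit_env)
```

Calling `parent.frame()` inside the body, rather than relying on a lookup in the global environment, lets variables defined inside a function be found without passing the environment by hand.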
72 changes: 55 additions & 17 deletions R/ProgressTrackingContext.R
Expand Up @@ -126,7 +126,7 @@ ProgressTrackingContext <- R6::R6Class("ProgressTrackingContext",
file_path <- Helper$get_option("progress_log_path")

# Create the temporary file.
-creation_status <- file.create(file_path)
+creation_status <- file.create(file_path, showWarnings = FALSE)

# If the file creation failed.
if (!creation_status) {
Expand Down Expand Up @@ -212,6 +212,27 @@ ProgressTrackingContext <- R6::R6Class("ProgressTrackingContext",

# Close and remove the progress bar.
private$.bar$terminate()
},

# Template function for tracking progress of backend operations.
.execute = function(operation, x, fun) {
# Create file for logging progress.
log <- private$.make_log()

# Clear the temporary file on function exit.
on.exit({
# Remove.
unlink(log)
})

# Decorate the task function.
fun <- private$.decorate(task = fun, log = log)

# Evaluate the operation now referencing the decorated task.
eval(operation)

# Show the progress bar and block the main process.
private$.show_progress(total = length(x), log = log)
}
),

Expand Down Expand Up @@ -265,8 +286,7 @@ ProgressTrackingContext <- R6::R6Class("ProgressTrackingContext",
#' Run a task on the backend akin to [parallel::parSapply()], but with a
#' progress bar.
#'
-#' @param x A vector (i.e., usually of integers) to pass to the `fun`
-#' function.
+#' @param x An atomic vector or list to pass to the `fun` function.
#'
#' @param fun A function to apply to each element of `x`.
#'
Expand All @@ -277,23 +297,41 @@ ProgressTrackingContext <- R6::R6Class("ProgressTrackingContext",
#' stored in the private field `.output` on the [`parabar::Backend`]
#' abstract class, and is accessible via the `get_output()` method.
sapply = function(x, fun, ...) {
-# Create file for logging progress.
-log <- private$.make_log()

-# Clear the temporary file on function exit.
-on.exit({
-# Remove.
-unlink(log)
-})
+# Prepare the backend operation with early evaluated `...`.
+operation <- bquote(
+do.call(
+super$sapply, c(list(x = x, fun = fun), .(list(...)))
+)
+)

-# Decorate task function.
-task <- private$.decorate(task = fun, log = log)
+# Execute the task using the desired backend operation.
+private$.execute(operation = operation, x = x, fun = fun)
+},

-# Execute the decorated task.
-super$sapply(x = x, fun = task, ...)
#' @description
#' Run a task on the backend akin to [parallel::parLapply()], but with a
#' progress bar.
#'
#' @param x An atomic vector or list to pass to the `fun` function.
#'
#' @param fun A function to apply to each element of `x`.
#'
#' @param ... Additional arguments to pass to the `fun` function.
#'
#' @return
#' This method returns void. The output of the task execution must be
#' stored in the private field `.output` on the [`parabar::Backend`]
#' abstract class, and is accessible via the `get_output()` method.
lapply = function(x, fun, ...) {
# Prepare the backend operation with early evaluated `...`.
operation <- bquote(
do.call(
super$lapply, c(list(x = x, fun = fun), .(list(...)))
)
)

-# Show the progress bar and block the main process.
-private$.show_progress(total = length(x), log = log)
+# Execute the task via the `lapply` backend operation.
+private$.execute(operation = operation, x = x, fun = fun)
}
),

Expand Down
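The `.execute` template above receives an unevaluated call built with `bquote()`, in which `...` is evaluated early via `.()` while `x` and `fun` stay as symbols, so that `fun` can be swapped for the decorated task before `eval()` runs. A minimal sketch of that mechanism in plain R follows. The functions `backend_sapply`, `decorate`, `execute`, and `tracked_sapply` are hypothetical stand-ins, and the decorator merely adds `0.5` to make its effect visible instead of logging progress.

```r
# Hypothetical stand-in for the backend operation.
backend_sapply <- function(x, fun, ...) sapply(x, fun, ...)

# Hypothetical decorator; a real one would log progress to a file.
decorate <- function(task) {
    function(...) {
        # Tag the result so the decoration is observable.
        task(...) + 0.5
    }
}

# Template that evaluates the prepared operation with a decorated `fun`.
execute <- function(operation, x, fun) {
    # Rebind `fun` to the decorated task.
    fun <- decorate(fun)

    # Evaluate in this frame so that `fun` resolves to the decorated task.
    eval(operation)
}

tracked_sapply <- function(x, fun, ...) {
    # `.()` evaluates `list(...)` right away, whereas `x` and `fun` remain
    # symbols that are resolved only when the call is evaluated.
    operation <- bquote(
        do.call(backend_sapply, c(list(x = x, fun = fun), .(list(...))))
    )

    # Hand the unevaluated operation over to the template.
    execute(operation = operation, x = x, fun = fun)
}

# The `offset` argument is captured early; `fun` is decorated late.
result <- tracked_sapply(1:3, function(i, offset) i + offset, offset = 10)
```

Evaluating `...` early matters because the dots of `tracked_sapply` are not available inside `execute`, while deferring `fun` is what allows the template to substitute the decorated version.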
32 changes: 25 additions & 7 deletions R/Service.R
Expand Up @@ -101,8 +101,7 @@ Service <- R6::R6Class("Service",
#' @description
#' Run a task on the backend akin to [parallel::parSapply()].
#'
-#' @param x A vector (i.e., usually of integers) to pass to the `fun`
-#' function.
+#' @param x An atomic vector or list to pass to the `fun` function.
#'
#' @param fun A function to apply to each element of `x`.
#'
Expand All @@ -116,22 +115,41 @@ Service <- R6::R6Class("Service",
Exception$method_not_implemented()
},

#' @description
#' Run a task on the backend akin to [parallel::parLapply()].
#'
#' @param x An atomic vector or list to pass to the `fun` function.
#'
#' @param fun A function to apply to each element of `x`.
#'
#' @param ... Additional arguments to pass to the `fun` function.
#'
#' @return
#' This method returns void. The output of the task execution must be
#' stored in the private field `.output` on the [`parabar::Backend`]
#' abstract class, and is accessible via the `get_output()` method.
lapply = function(x, fun, ...) {
Exception$method_not_implemented()
},

#' @description
#' Get the output of the task execution.
#'
#' @param ... Additional optional arguments that may be used by concrete
#' implementations.
#'
#' @details
#' This method fetches the output of the task execution after calling
#' the `sapply()` method. It returns the output and immediately removes
#' it from the backend. Therefore, subsequent calls to this method are
#' not advised. This method should be called after the execution of a
#' task.
#'
#' @param ... Additional optional arguments that may be used by concrete
#' implementations.
#'
#' @return
-#' A vector or list of the same length as `x` containing the results of
-#' the `fun`. It resembles the format of [base::sapply()].
+#' A vector, matrix, or list of the same length as `x`, containing the
+#' results of the `fun`. The output format differs based on the specific
+#' operation employed. Check out the documentation for the `apply`
+#' operations of [`parallel::parallel`] for more information.
get_output = function(...) {
Exception$method_not_implemented()
}
Expand Down