Commit

Merge pull request #31 from mihaiconstantin/feat/lapply
Add support for the `lapply` backend operation. It partially addresses #30.
mihaiconstantin committed May 3, 2023
2 parents 3fdba7e + e02619e commit ab8d896
Showing 43 changed files with 1,561 additions and 489 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-coverage.yaml
@@ -43,5 +43,5 @@ jobs:
- name: Test coverage
env:
PROCESSX_NOTIFY_OLD_SIGCHLD: true
run: covr::codecov(line_exclusions = list("R/parabar-package.R"))
run: covr::codecov(line_exclusions = list("R/parabar-package.R", "R/UserApiConsumer.R" = c(59:77)))
shell: Rscript {0}
1 change: 1 addition & 0 deletions DESCRIPTION
@@ -41,6 +41,7 @@ Collate:
'ProgressTrackingContext.R'
'ContextFactory.R'
'Warning.R'
'UserApiConsumer.R'
'exports.r'
'logo.R'
'parabar-package.R'
4 changes: 3 additions & 1 deletion NAMESPACE
@@ -19,13 +19,15 @@ export(Service)
export(Specification)
export(SyncBackend)
export(TaskState)
export(UserApiConsumer)
export(Warning)
export(clear)
export(configure_bar)
export(evaluate)
export(export)
export(get_option)
export(make_logo)
export(par_lapply)
export(par_sapply)
export(peek)
export(set_default_options)
@@ -41,7 +43,7 @@ importFrom(parallel,clusterEvalQ)
importFrom(parallel,clusterExport)
importFrom(parallel,detectCores)
importFrom(parallel,makeCluster)
importFrom(parallel,parApply)
importFrom(parallel,parLapply)
importFrom(parallel,parSapply)
importFrom(parallel,stopCluster)
importFrom(progress,progress_bar)
30 changes: 30 additions & 0 deletions NEWS.md
@@ -1,3 +1,33 @@
# Development

## Added
- Add optional arguments to the `get_output` operation of `SyncBackend` for
consistency.
- Add more tests to improve coverage.
- Add `par_lapply` function to the user `API`. The `par_lapply` function can be
used to run tasks in parallel akin to `parallel::parLapply`.
- Add `UserApiConsumer` `R6` class that provides an opinionated wrapper around
the developer `API` of the `parabar` package. All parallel operations (e.g.,
`par_sapply` and `par_lapply`) follow more or less the same pattern. The
`UserApiConsumer` encapsulates this pattern and makes it easier to extend
`parabar` with new parallel functions (e.g., `par_apply`) while avoiding code
duplication. The `UserApiConsumer` class can also be used as a standalone
class for parallel operations; however, its primary purpose is to be used by
the parallel task execution functions in the user `API`.

## Changed
- Disable warnings for `file.create` in `ProgressTrackingContext` class. This
warning is superfluous since the code handles creation failures.
- Refactor test helpers to avoid code duplication.
- Update `par_sapply` to use the `UserApiConsumer` class.
- Update the developer `API` `R6` classes to implement the `lapply` parallel
operation.

## Fixed
- Fix the `export` operation in the `SyncBackend` and `Context` classes to
fall back to the parent environment if the argument `environment` is not
provided.
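
The `export` fallback described in the entry above can be illustrated with a minimal base `R` sketch. The function `export_sketch` and the variable `local_value` are hypothetical names used only for illustration; they are not part of `parabar`, and the sketch merely reports whether a variable is visible instead of sending it to a cluster.

```r
# Hypothetical stand-in for an `export` operation. It only reports which of
# the requested variables exist in the chosen environment.
export_sketch <- function(variables, environment) {
    # If no environment is provided, use the caller's frame.
    if (missing(environment)) {
        environment <- parent.frame()
    }

    # Look up each variable in that environment only (no inheritance).
    vapply(variables, exists, logical(1), envir = environment, inherits = FALSE)
}

caller <- function() {
    # A variable that lives only in this function's frame.
    local_value <- 42

    # No `environment` argument, so the lookup happens in the frame of `caller`.
    export_sketch("local_value")
}

# The variable is found when the fallback is used.
found_by_default <- caller()

# The variable is not found in an unrelated, empty environment.
found_in_empty <- export_sketch("local_value", environment = new.env())
```

The point of the fallback is that variables defined next to the call site are picked up without the caller having to pass `environment()` explicitly.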

# parabar 1.0.3

## Fixed
47 changes: 42 additions & 5 deletions R/AsyncBackend.R
@@ -210,7 +210,7 @@ AsyncBackend <- R6::R6Class("AsyncBackend",
}, args = list(capture))
},

# Run tasks on the cluster in the session asynchronously.
# Run tasks asynchronously via the cluster in the session.
.sapply = function(x, fun, ...) {
# Capture the `...`.
dots <- list(...)
@@ -225,6 +225,21 @@ AsyncBackend <- R6::R6Class("AsyncBackend",
}, args = list(x, fun, dots))
},

# Run tasks asynchronously via the cluster in the session.
.lapply = function(x, fun, ...) {
# Capture the `...`.
dots <- list(...)

# Perform the evaluation from the `R` session.
private$.cluster$call(function(x, fun, dots) {
# Run the task.
output <- do.call(parallel::parLapply, c(list(cluster, x, fun), dots))

# Return to the session.
return(output)
}, args = list(x, fun, dots))
},

# Clear the current output on the backend.
.clear_output = function() {
# Clear output.
@@ -426,8 +441,7 @@ AsyncBackend <- R6::R6Class("AsyncBackend",
#' @description
#' Run a task on the backend akin to [parallel::parSapply()].
#'
#' @param x A vector (i.e., usually of integers) to pass to the `fun`
#' function.
#' @param x An atomic vector or list to pass to the `fun` function.
#'
#' @param fun A function to apply to each element of `x`.
#'
@@ -445,6 +459,27 @@ AsyncBackend <- R6::R6Class("AsyncBackend",
private$.sapply(x, fun, ...)
},

#' @description
#' Run a task on the backend akin to [parallel::parLapply()].
#'
#' @param x An atomic vector or list to pass to the `fun` function.
#'
#' @param fun A function to apply to each element of `x`.
#'
#' @param ... Additional arguments to pass to the `fun` function.
#'
#' @return
#' This method returns void. The output of the task execution must be
#' stored in the private field `.output` on the [`parabar::Backend`]
#' abstract class, and is accessible via the `get_output()` method.
lapply = function(x, fun, ...) {
# Throw if backend is busy.
private$.throw_if_backend_is_busy()

# Deploy the task asynchronously.
private$.lapply(x, fun, ...)
},

#' @description
#' Get the output of the task execution.
#'
@@ -466,8 +501,10 @@ AsyncBackend <- R6::R6Class("AsyncBackend",
#' task is still running.
#'
#' @return
#' A vector or list of the same length as `x` containing the results of
#' the `fun`. It resembles the format of [base::sapply()].
#' A vector, matrix, or list of the same length as `x`, containing the
#' results of the `fun`. The output format differs based on the specific
#' operation employed. Check out the documentation for the `apply`
#' operations of [`parallel::parallel`] for more information.
get_output = function(wait = FALSE) {
# Reset the output on exit.
on.exit({
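
The updated `get_output` documentation notes that the output format depends on the operation used. The sketch below uses only the base `parallel` package (not `parabar`) to show the difference, with the same `do.call` shape as the new `.lapply` method for forwarding captured `...` arguments. The worker count, the `task` function, and the `power` argument are assumptions made for illustration.

```r
library(parallel)

# Start a small local cluster (two workers is an arbitrary choice).
cluster <- makeCluster(2)

# A task with an extra argument that is forwarded via the captured dots.
task <- function(x, power) x^power
dots <- list(power = 2)

# Same call shape as in the diff: fixed arguments first, then the dots.
as_list <- do.call(parLapply, c(list(cluster, 1:3, task), dots))
as_vector <- do.call(parSapply, c(list(cluster, 1:3, task), dots))

# Release the workers.
stopCluster(cluster)
```

Here `as_list` is a list with one element per input, whereas `as_vector` is simplified to a numeric vector, which is why the return documentation can no longer refer to the `sapply` format alone.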
36 changes: 30 additions & 6 deletions R/Context.R
@@ -155,12 +155,17 @@ Context <- R6::R6Class("Context",
#' @param variables A character vector of variable names to export.
#'
#' @param environment An environment object from which to export the
#' variables.
#' variables. Defaults to the parent frame.
#'
#' @return This method returns void.
export = function(variables, environment) {
# If no environment is provided.
if (missing(environment)) {
# Use the caller's environment where the variables are defined.
environment <- parent.frame()
}

# Consume the backend API.
# TODO: Check that this works as expected (i.e., the environment).
private$.backend$export(variables, environment)
},

@@ -185,8 +190,7 @@ Context <- R6::R6Class("Context",
#' @description
#' Run a task on the backend akin to [parallel::parSapply()].
#'
#' @param x A vector (i.e., usually of integers) to pass to the `fun`
#' function.
#' @param x An atomic vector or list to pass to the `fun` function.
#'
#' @param fun A function to apply to each element of `x`.
#'
@@ -201,6 +205,24 @@ Context <- R6::R6Class("Context",
private$.backend$sapply(x = x, fun = fun, ...)
},

#' @description
#' Run a task on the backend akin to [parallel::parLapply()].
#'
#' @param x An atomic vector or list to pass to the `fun` function.
#'
#' @param fun A function to apply to each element of `x`.
#'
#' @param ... Additional arguments to pass to the `fun` function.
#'
#' @return
#' This method returns void. The output of the task execution must be
#' stored in the private field `.output` on the [`parabar::Backend`]
#' abstract class, and is accessible via the `get_output()` method.
lapply = function(x, fun, ...) {
# Consume the backend API.
private$.backend$lapply(x = x, fun = fun, ...)
},

#' @description
#' Get the output of the task execution.
#'
@@ -217,8 +239,10 @@ Context <- R6::R6Class("Context",
#' task.
#'
#' @return
#' A vector or list of the same length as `x` containing the results of
#' the `fun`. It resembles the format of [base::sapply()].
#' A vector, matrix, or list of the same length as `x`, containing the
#' results of the `fun`. The output format differs based on the specific
#' operation employed. Check out the documentation for the `apply`
#' operations of [`parallel::parallel`] for more information.
get_output = function(...) {
# Consume the backend API.
private$.backend$get_output(...)
72 changes: 55 additions & 17 deletions R/ProgressTrackingContext.R
@@ -126,7 +126,7 @@ ProgressTrackingContext <- R6::R6Class("ProgressTrackingContext",
file_path <- Helper$get_option("progress_log_path")

# Create the temporary file.
creation_status <- file.create(file_path)
creation_status <- file.create(file_path, showWarnings = FALSE)

# If the file creation failed.
if (!creation_status) {
@@ -212,6 +212,27 @@ ProgressTrackingContext <- R6::R6Class("ProgressTrackingContext",

# Close and remove the progress bar.
private$.bar$terminate()
},

# Template function for tracking progress of backend operations.
.execute = function(operation, x, fun) {
# Create file for logging progress.
log <- private$.make_log()

# Clear the temporary file on function exit.
on.exit({
# Remove.
unlink(log)
})

# Decorate the task function.
fun <- private$.decorate(task = fun, log = log)

# Evaluate the operation now referencing the decorated task.
eval(operation)

# Show the progress bar and block the main process.
private$.show_progress(total = length(x), log = log)
}
),

@@ -265,8 +286,7 @@ ProgressTrackingContext <- R6::R6Class("ProgressTrackingContext",
#' Run a task on the backend akin to [parallel::parSapply()], but with a
#' progress bar.
#'
#' @param x A vector (i.e., usually of integers) to pass to the `fun`
#' function.
#' @param x An atomic vector or list to pass to the `fun` function.
#'
#' @param fun A function to apply to each element of `x`.
#'
@@ -277,23 +297,41 @@ ProgressTrackingContext <- R6::R6Class("ProgressTrackingContext",
#' stored in the private field `.output` on the [`parabar::Backend`]
#' abstract class, and is accessible via the `get_output()` method.
sapply = function(x, fun, ...) {
# Create file for logging progress.
log <- private$.make_log()

# Clear the temporary file on function exit.
on.exit({
# Remove.
unlink(log)
})
# Prepare the backend operation with early evaluated `...`.
operation <- bquote(
do.call(
super$sapply, c(list(x = x, fun = fun), .(list(...)))
)
)

# Decorate task function.
task <- private$.decorate(task = fun, log = log)
# Execute the task using the desired backend operation.
private$.execute(operation = operation, x = x, fun = fun)
},

# Execute the decorated task.
super$sapply(x = x, fun = task, ...)
#' @description
#' Run a task on the backend akin to [parallel::parLapply()], but with a
#' progress bar.
#'
#' @param x An atomic vector or list to pass to the `fun` function.
#'
#' @param fun A function to apply to each element of `x`.
#'
#' @param ... Additional arguments to pass to the `fun` function.
#'
#' @return
#' This method returns void. The output of the task execution must be
#' stored in the private field `.output` on the [`parabar::Backend`]
#' abstract class, and is accessible via the `get_output()` method.
lapply = function(x, fun, ...) {
# Prepare the backend operation with early evaluated `...`.
operation <- bquote(
do.call(
super$lapply, c(list(x = x, fun = fun), .(list(...)))
)
)

# Show the progress bar and block the main process.
private$.show_progress(total = length(x), log = log)
# Execute the task via the `lapply` backend operation.
private$.execute(operation = operation, x = x, fun = fun)
}
),
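
The `bquote` and `eval` template used by `.execute` can be tried out in isolation. The following is a minimal base `R` sketch, not the package's code: `backend_sapply`, `execute`, and `run` are hypothetical stand-ins for `super$sapply`, `private$.execute`, and the public `sapply` method, and the decorator counts calls instead of writing to a progress log.

```r
# Hypothetical stand-in for `super$sapply`. It just applies `fun` to `x`.
backend_sapply <- function(x, fun, ...) sapply(x, fun, ...)

# Template that rebinds `fun` to a decorated task, then runs the operation.
execute <- function(operation, x, fun) {
    # Keep a reference to the original task.
    task <- fun

    # Decorate the task (here: count calls instead of logging to a file).
    calls <- 0
    fun <- function(...) {
        calls <<- calls + 1
        task(...)
    }

    # `x` and `fun` in the quoted call resolve in this frame, so the
    # operation picks up the decorated `fun`.
    output <- eval(operation)

    list(output = output, calls = calls)
}

run <- function(x, fun, ...) {
    # `.(list(...))` splices the evaluated dots in now, while `x` and `fun`
    # remain symbols to be resolved later, inside `execute`.
    operation <- bquote(
        do.call(backend_sapply, c(list(x = x, fun = fun), .(list(...))))
    )

    execute(operation = operation, x = x, fun = fun)
}

result <- run(1:3, function(value, offset) value + offset, offset = 10)
```

Evaluating the dots early matters because `...` belongs to the frame of `run` and would not be available when the quoted call is evaluated inside `execute`. Leaving `fun` as a symbol is what lets the template swap in the decorated task.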

32 changes: 25 additions & 7 deletions R/Service.R
@@ -101,8 +101,7 @@ Service <- R6::R6Class("Service",
#' @description
#' Run a task on the backend akin to [parallel::parSapply()].
#'
#' @param x A vector (i.e., usually of integers) to pass to the `fun`
#' function.
#' @param x An atomic vector or list to pass to the `fun` function.
#'
#' @param fun A function to apply to each element of `x`.
#'
@@ -116,22 +115,41 @@ Service <- R6::R6Class("Service",
Exception$method_not_implemented()
},

#' @description
#' Run a task on the backend akin to [parallel::parLapply()].
#'
#' @param x An atomic vector or list to pass to the `fun` function.
#'
#' @param fun A function to apply to each element of `x`.
#'
#' @param ... Additional arguments to pass to the `fun` function.
#'
#' @return
#' This method returns void. The output of the task execution must be
#' stored in the private field `.output` on the [`parabar::Backend`]
#' abstract class, and is accessible via the `get_output()` method.
lapply = function(x, fun, ...) {
Exception$method_not_implemented()
},

#' @description
#' Get the output of the task execution.
#'
#' @param ... Additional optional arguments that may be used by concrete
#' implementations.
#'
#' @details
#' This method fetches the output of the task execution after calling
#' the `sapply()` method. It returns the output and immediately removes
#' it from the backend. Therefore, subsequent calls to this method are
#' not advised. This method should be called after the execution of a
#' task.
#'
#' @param ... Additional optional arguments that may be used by concrete
#' implementations.
#'
#' @return
#' A vector or list of the same length as `x` containing the results of
#' the `fun`. It resembles the format of [base::sapply()].
#' A vector, matrix, or list of the same length as `x`, containing the
#' results of the `fun`. The output format differs based on the specific
#' operation employed. Check out the documentation for the `apply`
#' operations of [`parallel::parallel`] for more information.
get_output = function(...) {
Exception$method_not_implemented()
}