Skip to content

Commit

Permalink
Merge pull request #36 from mihaiconstantin/feat/apply
Browse files Browse the repository at this point in the history
Add support for the `apply` backend operation. Addresses #30.
  • Loading branch information
mihaiconstantin committed May 7, 2023
2 parents 6880ebc + d0da5df commit 7b43457
Show file tree
Hide file tree
Showing 44 changed files with 968 additions and 35 deletions.
2 changes: 2 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export(evaluate)
export(export)
export(get_option)
export(make_logo)
export(par_apply)
export(par_lapply)
export(par_sapply)
export(peek)
Expand All @@ -43,6 +44,7 @@ importFrom(parallel,clusterEvalQ)
importFrom(parallel,clusterExport)
importFrom(parallel,detectCores)
importFrom(parallel,makeCluster)
importFrom(parallel,parApply)
importFrom(parallel,parLapply)
importFrom(parallel,parSapply)
importFrom(parallel,stopCluster)
Expand Down
13 changes: 11 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
# Development

## Added
- Update implementations of `Service$apply` operation for `Backend` classes to
validate the provided `margin` argument before running the parallel operation.
- Add helper `Helper$check_array_margins` to validate the margins provided to
the `Service$apply` operation.
- Add exception `Exception$array_margins_not_compatible` for using improper
margins in the `Service$apply` operation.
- Add exception `Exception$primitive_as_task_not_allowed` for trying to decorate
primitive functions with progress tracking in the `ProgressTrackingContext`
class.
- Add helper `Helper$is_of_class` to check if an object is of a given class.
- Add optional arguments to the `get_output` operation of `SyncBackend` for
consistency.
- Add more tests to improve coverage.
- Add `par_lapply` function to the user `API`. The `par_lapply` function can be
used to run tasks in parallel akin to `parallel::parLapply`.
- Add implementation for `Service$lapply` and `Service$apply` operations for all
classes that implement the `Service` interface.
- Add `par_lapply` and `par_apply` functions to the user `API`. These functions
can be used to run tasks in parallel akin to `parallel::parLapply` and
`parallel::parApply`, respectively.
- Add `UserApiConsumer` `R6` class that provides an opinionated wrapper around
the developer `API` of the `parabar` package. All parallel operations (e.g.,
`par_sapply` and `par_lapply`) follow more or less the same pattern. The
Expand Down
48 changes: 48 additions & 0 deletions R/AsyncBackend.R
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,21 @@ AsyncBackend <- R6::R6Class("AsyncBackend",
}, args = list(x, fun, dots))
},

# Run tasks asynchronously via the cluster in the session.
.apply = function(x, margin, fun, ...) {
# Capture the `...`.
dots <- list(...)

# Perform the evaluation from the `R` session.
private$.cluster$call(function(x, margin, fun, dots) {
# Run the task.
output <- do.call(parallel::parApply, c(list(cluster, x, margin, fun), dots))

# Return to the session.
return(output)
}, args = list(x, margin, fun, dots))
},

# Clear the current output on the backend.
.clear_output = function() {
# Clear output.
Expand Down Expand Up @@ -480,6 +495,39 @@ AsyncBackend <- R6::R6Class("AsyncBackend",
private$.lapply(x, fun, ...)
},


#' @description
#' Run a task on the backend akin to [parallel::parApply()].
#'
#' @param x An array to pass to the `fun` function.
#'
#' @param margin A numeric vector indicating the dimensions of `x` the
#' `fun` function should be applied over. For example, for a matrix,
#' `margin = 1` indicates applying `fun` rows-wise, `margin = 2`
#' indicates applying `fun` columns-wise, and `margin = c(1, 2)`
#' indicates applying `fun` element-wise. Named dimensions are also
#' possible depending on `x`. See [parallel::parApply()] and
#' [base::apply()] for more details.
#'
#' @param fun A function to apply to `x` according to the `margin`.
#'
#' @param ... Additional arguments to pass to the `fun` function.
#'
#' @return
#' This method returns void. The output of the task execution must be
#' stored in the private field `.output` on the [`parabar::Backend`]
#' abstract class, and is accessible via the `get_output()` method.
apply = function(x, margin, fun, ...) {
# Throw if backend is busy.
private$.throw_if_backend_is_busy()

# Validate provided margins.
Helper$check_array_margins(margin, dim(x))

# Deploy the task asynchronously.
private$.apply(x, margin, fun, ...)
},

#' @description
#' Get the output of the task execution.
#'
Expand Down
26 changes: 26 additions & 0 deletions R/Context.R
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,32 @@ Context <- R6::R6Class("Context",
private$.backend$lapply(x = x, fun = fun, ...)
},

#' @description
#' Run a task on the backend akin to [parallel::parApply()].
#'
#' @param x An array to pass to the `fun` function.
#'
#' @param margin A numeric vector indicating the dimensions of `x` the
#' `fun` function should be applied over. For example, for a matrix,
#' `margin = 1` indicates applying `fun` rows-wise, `margin = 2`
#' indicates applying `fun` columns-wise, and `margin = c(1, 2)`
#' indicates applying `fun` element-wise. Named dimensions are also
#' possible depending on `x`. See [parallel::parApply()] and
#' [base::apply()] for more details.
#'
#' @param fun A function to apply to `x` according to the `margin`.
#'
#' @param ... Additional arguments to pass to the `fun` function.
#'
#' @return
#' This method returns void. The output of the task execution must be
#' stored in the private field `.output` on the [`parabar::Backend`]
#' abstract class, and is accessible via the `get_output()` method.
apply = function(x, margin, fun, ...) {
# Consume the backend API.
private$.backend$apply(x = x, margin = margin, fun = fun, ...)
},

#' @description
#' Get the output of the task execution.
#'
Expand Down
17 changes: 17 additions & 0 deletions R/Exception.R
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#' \item{\code{Exception$type_not_assignable()}}{Exception for when providing incorrect object types.}
#' \item{\code{Exception$unknown_package_option()}}{Exception for when requesting unknown package options.}
#' \item{\code{Exception$primitive_as_task_not_allowed()}}{Exception for when decorating primitive functions with progress tracking.}
#' \item{\code{Exception$array_margins_not_compatible(actual, allowed)}}{Exception for using improper margins in the `Service$apply` operation.}
#' }
#'
#' @export
Expand Down Expand Up @@ -127,3 +128,19 @@ Exception$primitive_as_task_not_allowed <- function() {
stop(message, call. = FALSE)
}

# Exception for providing incompatible margins in the `apply` operation.
Exception$array_margins_not_compatible <- function(margins, dimensions) {
# Convert the margins to character.
margins <- paste(margins, collapse = ", ")

# Convert the dimensions to character.
dimensions <- paste(dimensions, collapse = ", ")

# Construct exception message.
message = paste0(
"Margins {", margins, "} not compatible with array dimensions {", dimensions, "}."
)

# Throw the error.
stop(message, call. = FALSE)
}
19 changes: 19 additions & 0 deletions R/Helper.R
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#' \item{\code{Helper$get_option()}}{Get package option, or corresponding default value.}
#' \item{\code{Helper$set_option()}}{Set package option.}
#' \item{\code{Helper$check_object_type()}}{Check the type of a given object.}
#' \item{\code{Helper$check_array_margins(margins, dimensions)}}{Helper to check array margins for the `Service$apply` operation.}
#' }
#'
#' @export
Expand Down Expand Up @@ -74,3 +75,21 @@ Helper$check_object_type <- function(object, expected_type) {
Exception$type_not_assignable(type, expected_type)
}
}

# Helper for checking the array margins provided for the `apply` operation.
Helper$check_array_margins <- function(margins, dimensions) {
# Conditions to ensure the margins are valid.
violations <- c(
# Ensure all margins are unique.
duplicated(margins),

# Ensure all margins are within the array dimensions.
margins > length(dimensions)
)

# If any violations are found.
if (any(violations)) {
# Throw an error.
Exception$array_margins_not_compatible(margins, dimensions)
}
}
36 changes: 36 additions & 0 deletions R/ProgressTrackingContext.R
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,42 @@ ProgressTrackingContext <- R6::R6Class("ProgressTrackingContext",

# Execute the task via the `lapply` backend operation.
private$.execute(operation = operation, fun = fun, total = length(x))
},

#' @description
#' Run a task on the backend akin to [parallel::parApply()].
#'
#' @param x An array to pass to the `fun` function.
#'
#' @param margin A numeric vector indicating the dimensions of `x` the
#' `fun` function should be applied over. For example, for a matrix,
#' `margin = 1` indicates applying `fun` rows-wise, `margin = 2`
#' indicates applying `fun` columns-wise, and `margin = c(1, 2)`
#' indicates applying `fun` element-wise. Named dimensions are also
#' possible depending on `x`. See [parallel::parApply()] and
#' [base::apply()] for more details.
#'
#' @param fun A function to apply to `x` according to the `margin`.
#'
#' @param ... Additional arguments to pass to the `fun` function.
#'
#' @return
#' This method returns void. The output of the task execution must be
#' stored in the private field `.output` on the [`parabar::Backend`]
#' abstract class, and is accessible via the `get_output()` method.
apply = function(x, margin, fun, ...) {
# Determine the number of task executions.
total <- prod(dim(x)[margin])

# Prepare the backend operation with early evaluated `...`.
operation <- bquote(
do.call(
super$apply, c(list(x = .(x), margin = .(margin), fun = fun), .(list(...)))
)
)

# Execute the task via the `lapply` backend operation.
private$.execute(operation = operation, fun = fun, total = total)
}
),

Expand Down
25 changes: 25 additions & 0 deletions R/Service.R
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,31 @@ Service <- R6::R6Class("Service",
Exception$method_not_implemented()
},

#' @description
#' Run a task on the backend akin to [parallel::parApply()].
#'
#' @param x An array to pass to the `fun` function.
#'
#' @param margin A numeric vector indicating the dimensions of `x` the
#' `fun` function should be applied over. For example, for a matrix,
#' `margin = 1` indicates applying `fun` rows-wise, `margin = 2`
#' indicates applying `fun` columns-wise, and `margin = c(1, 2)`
#' indicates applying `fun` element-wise. Named dimensions are also
#' possible depending on `x`. See [parallel::parApply()] and
#' [base::apply()] for more details.
#'
#' @param fun A function to apply to `x` according to the `margin`.
#'
#' @param ... Additional arguments to pass to the `fun` function.
#'
#' @return
#' This method returns void. The output of the task execution must be
#' stored in the private field `.output` on the [`parabar::Backend`]
#' abstract class, and is accessible via the `get_output()` method.
apply = function(x, margin, fun, ...) {
Exception$method_not_implemented()
},

#' @description
#' Get the output of the task execution.
#'
Expand Down
35 changes: 35 additions & 0 deletions R/SyncBackend.R
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ SyncBackend <- R6::R6Class("SyncBackend",
parallel::parLapply(private$.cluster, X = x, fun = fun, ...)
},

# A wrapper around `parallel:parApply` to run tasks on the cluster.
.apply = function(x, margin, fun, ...) {
# Run the task and return the results.
parallel::parApply(private$.cluster, X = x, MARGIN = margin, FUN = fun, ...)
},

# Clear the current output on the backend.
.clear_output = function() {
# Clear output.
Expand Down Expand Up @@ -310,6 +316,35 @@ SyncBackend <- R6::R6Class("SyncBackend",
private$.output = private$.lapply(x, fun, ...)
},

#' @description
#' Run a task on the backend akin to [parallel::parApply()].
#'
#' @param x An array to pass to the `fun` function.
#'
#' @param margin A numeric vector indicating the dimensions of `x` the
#' `fun` function should be applied over. For example, for a matrix,
#' `margin = 1` indicates applying `fun` rows-wise, `margin = 2`
#' indicates applying `fun` columns-wise, and `margin = c(1, 2)`
#' indicates applying `fun` element-wise. Named dimensions are also
#' possible depending on `x`. See [parallel::parApply()] and
#' [base::apply()] for more details.
#'
#' @param fun A function to apply to `x` according to the `margin`.
#'
#' @param ... Additional arguments to pass to the `fun` function.
#'
#' @return
#' This method returns void. The output of the task execution must be
#' stored in the private field `.output` on the [`parabar::Backend`]
#' abstract class, and is accessible via the `get_output()` method.
apply = function(x, margin, fun, ...) {
# Validate provided margins.
Helper$check_array_margins(margin, dim(x))

# Deploy the task synchronously.
private$.output = private$.apply(x, margin, fun, ...)
},

#' @description
#' Get the output of the task execution.
#'
Expand Down
42 changes: 42 additions & 0 deletions R/UserApiConsumer.R
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,48 @@ UserApiConsumer <- R6::R6Class("UserApiConsumer",
)
)

# Execute the `lapply` operation accordingly and return the results.
private$.execute(backend, parallel, sequential)
},

#' @description
#' Execute a task in parallel akin to [parallel::parApply()].
#'
#' @param backend An object of class [`parabar::Backend`] as returned by
#' the [parabar::start_backend()] function. It can also be `NULL` to run
#' the task sequentially via [base::apply()].
#'
#' @param x An array to pass to the `fun` function.
#'
#' @param margin A numeric vector indicating the dimensions of `x` the
#' `fun` function should be applied over. For example, for a matrix,
#' `margin = 1` indicates applying `fun` rows-wise, `margin = 2`
#' indicates applying `fun` columns-wise, and `margin = c(1, 2)`
#' indicates applying `fun` element-wise. Named dimensions are also
#' possible depending on `x`. See [parallel::parApply()] and
#' [base::apply()] for more details.
#'
#' @param fun A function to apply to `x` according to the `margin`.
#'
#' @return
#' The dimensions of the output vary according to the `margin` argument.
#' Consult the documentation of [base::apply()] for a detailed
#' explanation on how the output is structured.
apply = function(backend, x, margin, fun, ...) {
# Prepare the sequential operation.
sequential <- bquote(
do.call(
base::apply, c(list(X = .(x), MARGIN = .(margin), FUN = .(fun)), .(list(...)))
)
)

# Prepare the parallel operation.
parallel <- bquote(
do.call(
context$apply, c(list(x = .(x), margin = .(margin), fun = .(fun)), .(list(...)))
)
)

# Execute the `lapply` operation accordingly and return the results.
private$.execute(backend, parallel, sequential)
}
Expand Down
12 changes: 11 additions & 1 deletion R/exports.r
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,16 @@ par_lapply <- function(backend = NULL, x, fun, ...) {
# Create an user API consumer.
consumer <- UserApiConsumer$new()

# Execute the task using the `sapply` parallel operation.
# Execute the task using the `lapply` parallel operation.
consumer$lapply(backend = backend, x = x, fun = fun, ...)
}

#' @template par-apply
#' @export
par_apply <- function(backend = NULL, x, margin, fun, ...) {
# Create an user API consumer.
consumer <- UserApiConsumer$new()

# Execute the task using the `apply` parallel operation.
consumer$apply(backend = backend, x = x, margin = margin, fun = fun, ...)
}
2 changes: 1 addition & 1 deletion R/parabar-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

# Imports.
#' @importFrom parallel detectCores makeCluster stopCluster clusterExport
#' @importFrom parallel clusterEvalQ parSapply parLapply clusterCall
#' @importFrom parallel clusterEvalQ parSapply parLapply parApply clusterCall
#' @importFrom R6 R6Class
#' @importFrom progress progress_bar
#' @importFrom callr r_session
Expand Down

0 comments on commit 7b43457

Please sign in to comment.