Skip to content

Commit

Permalink
Merge pull request #49 from mihaiconstantin/mihaiconstantin/issue44
Browse files Browse the repository at this point in the history
Fix hanging progress bar on task errors in `ProgressTrackingContext`. Closes #44.
  • Loading branch information
mihaiconstantin committed Feb 5, 2024
2 parents 8bbeaab + e00c163 commit 22ff294
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 1 deletion.
6 changes: 6 additions & 0 deletions R/Options.R
Expand Up @@ -84,6 +84,12 @@ Options <- R6::R6Class("Options",
#' records. The default value is `0.001`.
progress_timeout = 0.001,

#' @field progress_wait A numeric value indicating the approximate
#' duration (i.e., in seconds) to wait between progress bar updates
#' before checking if the task has finished (i.e., possibly with an
#' error). The default value is `0.1`.
progress_wait = 0.1,

#' @field progress_bar_type A character string indicating the default
#' bar type to use with compatible backends. Possible values are
#' `"modern"` (the default) or `"basic"`.
Expand Down
29 changes: 28 additions & 1 deletion R/ProgressTrackingContext.R
Expand Up @@ -199,6 +199,9 @@ ProgressTrackingContext <- R6::R6Class("ProgressTrackingContext",
# Get the checking delay from options.
timeout <- Helper$get_option("progress_timeout")

# Get the waiting time between progress bar updates from options.
wait <- Helper$get_option("progress_wait")

# Initialize the bar at the initial starting point.
do.call(
private$.bar$create,
Expand All @@ -213,7 +216,13 @@ ProgressTrackingContext <- R6::R6Class("ProgressTrackingContext",
# Counter for the number of tasks processed.
tasks_processed <- 0

# While there are still tasks1 to be processed.
# Counter for the loop cycles without progress bar updates.
cycles_without_tasks_processed <- 0

# Maximum allowed loop cycles without progress bar updates.
allowed_cycles_without_tasks_processed <- ceiling(wait / timeout)

# While there are still tasks to be processed.
while (tasks_processed < total) {
# Get the current number of tasks processed.
current_tasks_processed <- length(readLines(log, warn = FALSE))
Expand All @@ -223,8 +232,26 @@ ProgressTrackingContext <- R6::R6Class("ProgressTrackingContext",
# Update the number of tasks processed.
tasks_processed <- current_tasks_processed

# Reset the cycles without progress bar updates.
cycles_without_tasks_processed <- 0

# Update the progress bar to completed state.
private$.bar$update(tasks_processed)

# Jump to next iteration.
next
}

# Otherwise, record a new cycle without progress bar update.
cycles_without_tasks_processed <- cycles_without_tasks_processed + 1

# If the number of cycles without progress bar updates exceeded the allowed number.
if (cycles_without_tasks_processed > allowed_cycles_without_tasks_processed &&
# And the session has results ready to be read (i.e., the task is completed).
private$.backend$task_state$task_is_completed
) {
# Break the loop to interrupt the progress bar updating.
break
}

# Wait a bit.
Expand Down
5 changes: 5 additions & 0 deletions man/Options.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

132 changes: 132 additions & 0 deletions tests/testthat/helpers.R
Expand Up @@ -628,6 +628,138 @@ tests_set_for_progress_tracking_context <- function(context, task) {
tests_set_for_task_execution_with_progress_tracking(operation, context, expected_output)
}

# Set of tests for executing tasks that throw errors in a progress tracking context.
tests_set_for_task_execution_with_progress_tracking_and_error <- function(operation, context, expected_error) {
# Clear the progress output on exit.
on.exit({
# Clear the output.
context$progress_bar_output <- NULL
})

# Create a bar factory.
bar_factory <- BarFactory$new()

# Get a basic bar instance.
bar <- bar_factory$get("basic")

# Register the bar with the context object.
context$set_bar(bar)

# Configure the bar.
context$configure_bar(
style = 3
)

# Run the task that throws an error in parallel.
eval(operation)

# Expect that the task interrupted the progress bar with an error message.
expect_error(context$get_output(wait = TRUE), expected_error)

# Get a modern bar instance.
bar <- bar_factory$get("modern")

# Register the bar with the same context object.
context$set_bar(bar)

# Configure the bar.
context$configure_bar(
show_after = 0,
format = ":bar| :percent",
clear = FALSE,
force = TRUE
)

# Run the task in parallel.
eval(operation)

# Expect that the task interrupted the progress bar with an error message.
expect_error(context$get_output(wait = TRUE), expected_error)
}

# Set of tests for progress bar interruptions in a tracking context.
tests_set_for_progress_tracking_context_with_error <- function(context) {
# Check the type.
Helper$check_object_type(context, "ProgressTrackingContextTester")

# Clean-up.
on.exit({
# Set default values for package options.
set_default_options()
})

# Reduce waiting time between progress bar updates.
set_option("progress_wait", 0.01)

# Define the task.
task <- function(x, error_x = 1, sleep = 0) {
# Sleep a bit or not.
Sys.sleep(sleep)

# Randomly sample when to throw an error.
if(any(x == error_x)) {
# Throw an error.
stop("Intentional task error.")
}

# Compute something.
output <- x + 1

# Return the result.
return(output)
}

# Construct the expected error message.
expected_error <- as_text(task(x = 1, error_x = 1))

# Select task arguments.
x <- sample(1:100, 100)
error_x <- sample(length(x), 1)
sleep <- sample(c(0, 0.001, 0.002), 1)

# Create the `sapply` operation.
operation <- bquote(context$sapply(.(x), .(task), error_x = .(error_x), sleep = .(sleep)))

# Tests for the `sapply` operation in a progress tracking context with error in the task.
tests_set_for_task_execution_with_progress_tracking_and_error(operation, context, expected_error)

# Create the `lapply` operation.
operation <- bquote(context$lapply(.(x), .(task), error_x = .(error_x), sleep = .(sleep)))

# Tests for the `lapply` operation in a progress tracking context with error in the task.
tests_set_for_task_execution_with_progress_tracking_and_error(operation, context, expected_error)

# Redefine `x` as a matrix for the `apply` operation.
x <- matrix(rnorm(100^2), nrow = 100, ncol = 100)

# Sample new error `x`.
error_x <- sample(x, 1)

# Define the `apply` operation over rows.
operation <- bquote(context$apply(.(x), 1, .(task), error_x = .(error_x), sleep = .(sleep)))

# Tests for the `apply` operation over rows in a progress tracking context with error in the task.
tests_set_for_task_execution_with_progress_tracking_and_error(operation, context, expected_error)

# Define the `apply` operation over columns.
operation <- bquote(context$apply(.(x), 2, .(task), error_x = .(error_x), sleep = .(sleep)))

# Tests for the `apply` operation over columns in a progress tracking context with error in the task.
tests_set_for_task_execution_with_progress_tracking_and_error(operation, context, expected_error)

# Redefine a smaller `x` matrix for the `apply` operation applied element-wise.
x <- matrix(rnorm(10^2), nrow = 10, ncol = 10)

# Sample new error `x`.
error_x <- sample(x, 1)

# Define the `apply` operation element-wise.
operation <- bquote(context$apply(.(x), c(1, 2), .(task), error_x = .(error_x), sleep = .(sleep)))

# Tests for the `apply` operation over all elements in a progress tracking context with error in the task.
tests_set_for_task_execution_with_progress_tracking_and_error(operation, context, expected_error)
}

#endregion


Expand Down
33 changes: 33 additions & 0 deletions tests/testthat/test-progress-tracking-context.R
Expand Up @@ -214,3 +214,36 @@ test_that("'ProgressTrackingContext' executes the task in parallel correctly", {
# Stop the backend.
context$stop()
})


test_that("'ProgressTrackingContext' interrupts progress tracking on task error correctly", {
# Create a specification.
specification <- Specification$new()

# Set the number of cores.
specification$set_cores(cores = 2)

# Determine the cluster type.
cluster_type <- pick_cluster_type(specification$types)

# Set the cluster type.
specification$set_type(type = cluster_type)

# Create an asynchronous backend object.
backend <- AsyncBackend$new()

# Create a progress tracking context object.
context <- ProgressTrackingContextTester$new()

# Register the backend with the context object.
context$set_backend(backend)

# Start the backend.
context$start(specification)

# Expect correctly interrupted progress bars.
tests_set_for_progress_tracking_context_with_error(context)

# Stop the backend.
context$stop()
})

0 comments on commit 22ff294

Please sign in to comment.