diff --git a/R/Options.R b/R/Options.R index c0d802d..c185f9c 100644 --- a/R/Options.R +++ b/R/Options.R @@ -84,6 +84,12 @@ Options <- R6::R6Class("Options", #' records. The default value is `0.001`. progress_timeout = 0.001, + #' @field progress_wait A numeric value indicating the approximate + #' duration (i.e., in seconds) to wait between progress bar updates + #' before checking if the task has finished (i.e., possibly with an + #' error). The default value is `0.1`. + progress_wait = 0.1, + #' @field progress_bar_type A character string indicating the default #' bar type to use with compatible backends. Possible values are #' `"modern"` (the default) or `"basic"`. diff --git a/R/ProgressTrackingContext.R b/R/ProgressTrackingContext.R index cdc000b..3b7dda4 100644 --- a/R/ProgressTrackingContext.R +++ b/R/ProgressTrackingContext.R @@ -199,6 +199,9 @@ ProgressTrackingContext <- R6::R6Class("ProgressTrackingContext", # Get the checking delay from options. timeout <- Helper$get_option("progress_timeout") + # Get the waiting time between progress bar updates from options. + wait <- Helper$get_option("progress_wait") + # Initialize the bar at the initial starting point. do.call( private$.bar$create, @@ -213,7 +216,13 @@ ProgressTrackingContext <- R6::R6Class("ProgressTrackingContext", # Counter for the number of tasks processed. tasks_processed <- 0 - # While there are still tasks1 to be processed. + # Counter for the loop cycles without progress bar updates. + cycles_without_tasks_processed <- 0 + + # Maximum allowed loop cycles without progress bar updates. + allowed_cycles_without_tasks_processed <- ceiling(wait / timeout) + + # While there are still tasks to be processed. while (tasks_processed < total) { # Get the current number of tasks processed. current_tasks_processed <- length(readLines(log, warn = FALSE)) @@ -223,8 +232,26 @@ ProgressTrackingContext <- R6::R6Class("ProgressTrackingContext", # Update the number of tasks processed. tasks_processed <- current_tasks_processed + # Reset the cycles without progress bar updates. + cycles_without_tasks_processed <- 0 + # Update the progress bar to completed state. private$.bar$update(tasks_processed) + + # Jump to next iteration. + next + } + + # Otherwise, record a new cycle without progress bar update. + cycles_without_tasks_processed <- cycles_without_tasks_processed + 1 + + # If the number of cycles without progress bar updates exceeded the allowed number. + if (cycles_without_tasks_processed > allowed_cycles_without_tasks_processed && + # And the session has results ready to be read (i.e., the task is completed). + private$.backend$task_state$task_is_completed + ) { + # Break the loop to interrupt the progress bar updating. + break } # Wait a bit. diff --git a/man/Options.Rd b/man/Options.Rd index 7abd07b..4801ad9 100644 --- a/man/Options.Rd +++ b/man/Options.Rd @@ -80,6 +80,11 @@ tracking should be enabled (i.e., \code{TRUE}) or disabled (i.e., in seconds) between subsequent checks of the log file for new progress records. The default value is \code{0.001}.} +\item{\code{progress_wait}}{A numeric value indicating the approximate +duration (i.e., in seconds) to wait between progress bar updates +before checking if the task has finished (i.e., possibly with an +error). The default value is \code{0.1}.} + \item{\code{progress_bar_type}}{A character string indicating the default bar type to use with compatible backends. Possible values are \code{"modern"} (the default) or \code{"basic"}.} diff --git a/tests/testthat/helpers.R b/tests/testthat/helpers.R index 338ed49..82143d8 100644 --- a/tests/testthat/helpers.R +++ b/tests/testthat/helpers.R @@ -628,6 +628,138 @@ tests_set_for_progress_tracking_context <- function(context, task) { tests_set_for_task_execution_with_progress_tracking(operation, context, expected_output) } +# Set of tests for executing tasks that throw errors in a progress tracking context. +tests_set_for_task_execution_with_progress_tracking_and_error <- function(operation, context, expected_error) { + # Clear the progress output on exit. + on.exit({ + # Clear the output. + context$progress_bar_output <- NULL + }) + + # Create a bar factory. + bar_factory <- BarFactory$new() + + # Get a basic bar instance. + bar <- bar_factory$get("basic") + + # Register the bar with the context object. + context$set_bar(bar) + + # Configure the bar. + context$configure_bar( + style = 3 + ) + + # Run the task that throws an error in parallel. + eval(operation) + + # Expect that the task interrupted the progress bar with an error message. + expect_error(context$get_output(wait = TRUE), expected_error) + + # Get a modern bar instance. + bar <- bar_factory$get("modern") + + # Register the bar with the same context object. + context$set_bar(bar) + + # Configure the bar. + context$configure_bar( + show_after = 0, + format = ":bar| :percent", + clear = FALSE, + force = TRUE + ) + + # Run the task in parallel. + eval(operation) + + # Expect that the task interrupted the progress bar with an error message. + expect_error(context$get_output(wait = TRUE), expected_error) +} + +# Set of tests for progress bar interruptions in a tracking context. +tests_set_for_progress_tracking_context_with_error <- function(context) { + # Check the type. + Helper$check_object_type(context, "ProgressTrackingContextTester") + + # Clean-up. + on.exit({ + # Set default values for package options. + set_default_options() + }) + + # Reduce waiting time between progress bar updates. + set_option("progress_wait", 0.01) + + # Define the task. + task <- function(x, error_x = 1, sleep = 0) { + # Sleep a bit or not. + Sys.sleep(sleep) + + # Randomly sample when to throw an error. + if(any(x == error_x)) { + # Throw an error. + stop("Intentional task error.") + } + + # Compute something. + output <- x + 1 + + # Return the result. + return(output) + } + + # Construct the expected error message. + expected_error <- as_text(task(x = 1, error_x = 1)) + + # Select task arguments. + x <- sample(1:100, 100) + error_x <- sample(length(x), 1) + sleep <- sample(c(0, 0.001, 0.002), 1) + + # Create the `sapply` operation. + operation <- bquote(context$sapply(.(x), .(task), error_x = .(error_x), sleep = .(sleep))) + + # Tests for the `sapply` operation in a progress tracking context with error in the task. + tests_set_for_task_execution_with_progress_tracking_and_error(operation, context, expected_error) + + # Create the `lapply` operation. + operation <- bquote(context$lapply(.(x), .(task), error_x = .(error_x), sleep = .(sleep))) + + # Tests for the `lapply` operation in a progress tracking context with error in the task. + tests_set_for_task_execution_with_progress_tracking_and_error(operation, context, expected_error) + + # Redefine `x` as a matrix for the `apply` operation. + x <- matrix(rnorm(100^2), nrow = 100, ncol = 100) + + # Sample new error `x`. + error_x <- sample(x, 1) + + # Define the `apply` operation over rows. + operation <- bquote(context$apply(.(x), 1, .(task), error_x = .(error_x), sleep = .(sleep))) + + # Tests for the `apply` operation over rows in a progress tracking context with error in the task. + tests_set_for_task_execution_with_progress_tracking_and_error(operation, context, expected_error) + + # Define the `apply` operation over columns. + operation <- bquote(context$apply(.(x), 2, .(task), error_x = .(error_x), sleep = .(sleep))) + + # Tests for the `apply` operation over columns in a progress tracking context with error in the task. + tests_set_for_task_execution_with_progress_tracking_and_error(operation, context, expected_error) + + # Redefine a smaller `x` matrix for the `apply` operation applied element-wise. + x <- matrix(rnorm(10^2), nrow = 10, ncol = 10) + + # Sample new error `x`. + error_x <- sample(x, 1) + + # Define the `apply` operation element-wise. + operation <- bquote(context$apply(.(x), c(1, 2), .(task), error_x = .(error_x), sleep = .(sleep))) + + # Tests for the `apply` operation over all elements in a progress tracking context with error in the task. + tests_set_for_task_execution_with_progress_tracking_and_error(operation, context, expected_error) +} + #endregion diff --git a/tests/testthat/test-progress-tracking-context.R b/tests/testthat/test-progress-tracking-context.R index a6e4be4..85de458 100644 --- a/tests/testthat/test-progress-tracking-context.R +++ b/tests/testthat/test-progress-tracking-context.R @@ -214,3 +214,36 @@ test_that("'ProgressTrackingContext' executes the task in parallel correctly", { # Stop the backend. context$stop() }) + + +test_that("'ProgressTrackingContext' interrupts progress tracking on task error correctly", { + # Create a specification. + specification <- Specification$new() + + # Set the number of cores. + specification$set_cores(cores = 2) + + # Determine the cluster type. + cluster_type <- pick_cluster_type(specification$types) + + # Set the cluster type. + specification$set_type(type = cluster_type) + + # Create an asynchronous backend object. + backend <- AsyncBackend$new() + + # Create a progress tracking context object. + context <- ProgressTrackingContextTester$new() + + # Register the backend with the context object. + context$set_backend(backend) + + # Start the backend. + context$start(specification) + + # Expect correctly interrupted progress bars. + tests_set_for_progress_tracking_context_with_error(context) + + # Stop the backend. + context$stop() +})