From 1612f1f7b8e2180b9d5f56ab6cbbe39859981241 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Fri, 25 May 2018 11:48:12 +0100 Subject: [PATCH 01/22] r_session, first working version on Unix --- DESCRIPTION | 4 +- NAMESPACE | 6 + R/r-session.R | 214 ++++++++++++++++++++++++++++++++ R/script.R | 9 +- R/setup.R | 8 +- R/utils.R | 2 + tests/testthat/helper.R | 8 ++ tests/testthat/test-r-session.R | 30 +++++ 8 files changed, 274 insertions(+), 7 deletions(-) create mode 100644 R/r-session.R create mode 100644 tests/testthat/test-r-session.R diff --git a/DESCRIPTION b/DESCRIPTION index f923ba02..7e4279f7 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -18,11 +18,13 @@ BugReports: https://github.com/r-lib/callr/issues RoxygenNote: 6.0.1.9000 Roxygen: list(markdown = TRUE) Imports: - processx (>= 3.1.0), + processx (>= 3.1.0.9001), R6, utils Suggests: covr, testthat, withr +Remotes: + r-lib/processx Encoding: UTF-8 diff --git a/NAMESPACE b/NAMESPACE index 738cac44..f9a91bad 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -8,6 +8,8 @@ export(r_copycat) export(r_process) export(r_process_options) export(r_safe) +export(r_session) +export(r_session_options) export(r_vanilla) export(rcmd) export(rcmd_bg) @@ -18,6 +20,10 @@ export(rcmd_safe) export(rcmd_safe_env) export(run) importFrom(R6,R6Class) +importFrom(processx,conn_create_fd) +importFrom(processx,conn_create_pipepair) +importFrom(processx,conn_read_lines) +importFrom(processx,conn_write) importFrom(processx,poll) importFrom(processx,process) importFrom(processx,run) diff --git a/R/r-session.R b/R/r-session.R new file mode 100644 index 00000000..d9a1f961 --- /dev/null +++ b/R/r-session.R @@ -0,0 +1,214 @@ + +#' @importFrom R6 R6Class +#' @export + +r_session <- R6Class( + "r_session", + inherit = process, + + public = list( + initialize = function(options) + rs_init(self, private, super, options), + run = function(func, args = list(), timeout = -1) + rs_run(self, private, func, args, timeout), + call = function(func, args = list()) + rs_call(self, private, func, args), + wait = function(timeout = -1) + rs_wait(self, private, timeout), + get_result = function() + rs_get_result(self, private), + get_running_time = function() + rs_get_running_time(self, private), + get_state = function() + rs_get_state(self, private), + finish = function(grace = 200) + rs_finish(self, private, grace) + ), + + private = list( + options = NULL, + state = NULL, + started_at = NULL, + fun_started_at = as.POSIXct(NA), + pipe = NULL, # Two connections, for me and child + + func_file = NULL, + res_file = NULL, + + update_state = function() + rs__update_state(self, private), + report_back = function(code, text = "") + rs__report_back(self, private, code, text), + write_for_sure = function(text) + rs__write_for_sure(self, private, text) + ) +) + +#' @importFrom processx conn_create_pipepair + +rs_init <- function(self, private, super, options) { + + options$func <- options$func %||% function() { } + options <- convert_and_check_my_args(options) + options <- setup_context(options) + options <- setup_r_binary_and_args(options, script_file = FALSE) + + private$options <- options + + private$pipe <- conn_create_pipepair() + + with_envvar( + options$env, + super$initialize(options$bin, options$real_cmdargs, stdin = "|", + stdout = options$stdout, stderr = options$stderr, + connections = list(private$pipe[[2]])) + ) + private$started_at <- Sys.time() + private$state <- "starting" + + ## Make child report back when ready + private$report_back(200, "ready to go") + + invisible(self) +} + +rs_run <- function(self, private, func, args, timeout) { + self$call(func, args) + self$wait(timeout) + self$get_result() +} + +rs_call <- function(self, private, func, args) { + private$update_state() + if (private$state != "idle") stop("R session busy") + + ## Save the function in a file + private$options$func <- func + private$options$args <- args + private$options$func_file <- save_function_to_temp(private$options) + private$options$result_file <- tempfile() + private$options$tmp_files <- + c(private$options$tmp_files, private$options$func_file, + private$options$result_file) + + ## Run an expr that loads it, in the child process, with error handlers + expr <- make_vanilla_script_expr(private$options$func_file, + private$options$result_file, + private$options$error) + cmd <- paste0(deparse(expr), "\n") + + ## Write this to stdin + private$write_for_sure(cmd) + + ## Report back when done + report_str <- paste0("DONE", basename(private$options$result_file)) + private$report_back(200, report_str) + + private$state <- "busy" +} + +rs_wait <- function(self, private, timeout) { + if (private$state %in% c("finished", "ready", "idle")) return() + + pr <- poll(list(private$pipe[[1]]), timeout)[[1]] + if (pr == "ready") { + if (private$state == "starting") { + private$state <- "idle" + } else { + private$state <- "ready" + } + invisible(conn_read_lines(private$pipe[[1]], 1)) + } else { + invisible() + } +} + +rs_get_result <- function(self, private) { + if (private$state != "ready") private$update_state() + + get_my_result <- function() { + ## This is artificial... + out <- list( + status = 0, + stdout = "", + stderr = "", + timeout = FALSE + ) + res <- get_result(out, private$options) + private$state <- "idle" + unlink(private$options$tmp_files, recursive = TRUE) + private$options$tmp_files <- NULL + res + } + + switch( + private$state, + "finished" = stop("R session already finished"), + "idle" = stop("R session is idle"), + "busy" = stop("R session still busy"), + "starting" = stop("R session still starting"), + "ready" = get_my_result() + ) +} + +rs_get_running_time <- function(self, private) { + now <- Sys.time() + c(total = now - private$started_at, + current = now - private$fun_started_at) +} + +rs_get_state <- function(self, private) { + private$update_state() + private$state +} + +rs_finish <- function(self, private, grace) { + close(self$get_input_connection()) + self$poll_io(grace) + self$kill() + private$state <- "finished" + private$fun_started_at <- as.POSIXct(NA) +} + +#' @importFrom processx conn_read_lines + +rs__update_state <- function(self, private) { + self$wait(timeout = 0) +} + +rs__report_back <- function(self, private, code, text) { + cmd <- paste0(deparse(rs__status_expr(code, text, fd = 3)), "\n") + private$write_for_sure(cmd) +} + +rs__write_for_sure <- function(self, private, text) { + while (1) { + text <- self$write_input(text) + if (!length(text)) break; + Sys.sleep(.1) + } +} + +#' @importFrom processx conn_create_fd conn_write + +rs__status_expr <- function(code, text = "", fd = 3) { + substitute( + { + code_ <- code; fd_ <- fd; text_ <- text + con <- processx::conn_create_fd(fd_, close = FALSE) + data <- paste0(code_, " ", text_, "\n") + while (1) { + data <- processx::conn_write(con, data) + if (!length(data)) break; + Sys.sleep(.1) + } + }, + list(code = code, fd = fd, text = text) + ) +} + +#' @export + +r_session_options <- function(...) { + r_process_options(...) +} diff --git a/R/script.R b/R/script.R index 52c279c7..9c55204c 100644 --- a/R/script.R +++ b/R/script.R @@ -1,5 +1,5 @@ -make_vanilla_script <- function(expr_file, res, error) { +make_vanilla_script_expr <- function(expr_file, res, error) { ## Code to handle errors in the child ## This will inserted into the main script @@ -37,7 +37,7 @@ make_vanilla_script <- function(expr_file, res, error) { ## ## It is important that we do not create any temporary variables, ## the function is called from an empty global environment. - script <- substitute( + substitute( { withCallingHandlers( # nocov start { @@ -56,8 +56,11 @@ make_vanilla_script <- function(expr_file, res, error) { list(`__error__` = err, `__expr_file__` = expr_file, `__res__` = res) ) +} - script <- deparse(script) +make_vanilla_script_file <- function(expr_file, res, error) { + expr <- make_vanilla_script_expr(expr_file, res, error) + script <- deparse(expr) tmp <- tempfile() cat(script, file = tmp, sep = "\n") diff --git a/R/setup.R b/R/setup.R index 0cd2974b..52f0b5fe 100644 --- a/R/setup.R +++ b/R/setup.R @@ -3,7 +3,8 @@ setup_script_files <- function(options) { within(options, { func_file <- save_function_to_temp(options) result_file <- tempfile() - script_file <- make_vanilla_script(func_file, result_file, options$error) + script_file <- make_vanilla_script_file( + func_file, result_file, options$error) tmp_files <- c(tmp_files, func_file, script_file, result_file) }) } @@ -119,10 +120,11 @@ setup_callbacks <- function(options) { options } -setup_r_binary_and_args <- function(options) { +setup_r_binary_and_args <- function(options, script_file = TRUE) { exec <- if (os_platform() == "windows") "Rterm" else "R" options$bin <- file.path(R.home("bin"), exec) - options$real_cmdargs <- c(options$cmdargs, "-f", options$script_file) + options$real_cmdargs <- + c(options$cmdargs, if (script_file) c("-f", options$script_file)) options } diff --git a/R/utils.R b/R/utils.R index 98495a74..c9683ea5 100644 --- a/R/utils.R +++ b/R/utils.R @@ -1,4 +1,6 @@ +`%||%` <- function(l, r) if (is.null(l)) r else l + is.named <- function(x) { length(names(x)) == length(x) && all(names(x) != "") } diff --git a/tests/testthat/helper.R b/tests/testthat/helper.R index c43ccc9c..0a8936d4 100644 --- a/tests/testthat/helper.R +++ b/tests/testthat/helper.R @@ -7,3 +7,11 @@ try_silently <- function(expr) { message = function(x) "message" ) } + +r_session_wait_or_kill <- function(x, state = "ready") { + x$wait(3000) + if (x$get_state() != state) { + x$kill() + stop("R session not ready...") + } +} diff --git a/tests/testthat/test-r-session.R b/tests/testthat/test-r-session.R new file mode 100644 index 00000000..84c14a6a --- /dev/null +++ b/tests/testthat/test-r-session.R @@ -0,0 +1,30 @@ + +context("r_session") + +test_that("regular use", { + opt <- r_session_options() + rs <- r_session$new(opt) + + ## Wait until ready, but max 3s + r_session_wait_or_kill(rs, "idle") + + ## Start a command + rs$call(function() 42) + r_session_wait_or_kill(rs, "ready") + + ## Get result + expect_equal(rs$get_result(), 42) + expect_equal(rs$get_state(), "idle") + + ## Run another command, with arguments + rs$call(function(x, y) x + y, list(x = 42, y = 42)) + r_session_wait_or_kill(rs, "ready") + + ## Get result + expect_equal(rs$get_result(), 84) + expect_equal(rs$get_state(), "idle") + + ## Finish + rs$finish() + expect_equal(rs$get_state(), "finished") +}) From 5ccdf1f0ae0f7af589b429e0b928cf26658941c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Fri, 25 May 2018 12:08:21 +0100 Subject: [PATCH 02/22] Need feature/pollable-connection branch from processx --- DESCRIPTION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DESCRIPTION b/DESCRIPTION index 7e4279f7..9994e1f7 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -26,5 +26,5 @@ Suggests: testthat, withr Remotes: - r-lib/processx + r-lib/processx@feature/pollable-connection Encoding: UTF-8 From 206f7548aa1317da2bba70d49bd54a986a458607 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Fri, 25 May 2018 12:57:46 +0100 Subject: [PATCH 03/22] r_session$wait -> wait_for_call, avoid process$wait clash processx::process already has a wait method... --- R/r-session.R | 10 +++++----- tests/testthat/helper.R | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/R/r-session.R b/R/r-session.R index d9a1f961..21bd9574 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -13,8 +13,8 @@ r_session <- R6Class( rs_run(self, private, func, args, timeout), call = function(func, args = list()) rs_call(self, private, func, args), - wait = function(timeout = -1) - rs_wait(self, private, timeout), + wait_for_call = function(timeout = -1) + rs_wait_for_call(self, private, timeout), get_result = function() rs_get_result(self, private), get_running_time = function() @@ -74,7 +74,7 @@ rs_init <- function(self, private, super, options) { rs_run <- function(self, private, func, args, timeout) { self$call(func, args) - self$wait(timeout) + self$wait_for_call(timeout) self$get_result() } @@ -107,7 +107,7 @@ rs_call <- function(self, private, func, args) { private$state <- "busy" } -rs_wait <- function(self, private, timeout) { +rs_wait_for_call <- function(self, private, timeout) { if (private$state %in% c("finished", "ready", "idle")) return() pr <- poll(list(private$pipe[[1]]), timeout)[[1]] @@ -173,7 +173,7 @@ rs_finish <- function(self, private, grace) { #' @importFrom processx conn_read_lines rs__update_state <- function(self, private) { - self$wait(timeout = 0) + self$wait_for_call(timeout = 0) } rs__report_back <- function(self, private, code, text) { diff --git a/tests/testthat/helper.R b/tests/testthat/helper.R index 0a8936d4..6f3e76a2 100644 --- a/tests/testthat/helper.R +++ b/tests/testthat/helper.R @@ -9,7 +9,7 @@ try_silently <- function(expr) { } r_session_wait_or_kill <- function(x, state = "ready") { - x$wait(3000) + x$wait_for_call(3000) if (x$get_state() != state) { x$kill() stop("R session not ready...") From c72475c3a1661c5edf5e82a83f9d284ad444c6e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Fri, 25 May 2018 13:00:10 +0100 Subject: [PATCH 04/22] Test case for r_session$run() --- tests/testthat/test-r-session.R | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/testthat/test-r-session.R b/tests/testthat/test-r-session.R index 84c14a6a..e6d85fec 100644 --- a/tests/testthat/test-r-session.R +++ b/tests/testthat/test-r-session.R @@ -28,3 +28,15 @@ test_that("regular use", { rs$finish() expect_equal(rs$get_state(), "finished") }) + +test_that("run", { + opt <- r_session_options() + rs <- r_session$new(opt) + + ## Wait until ready, but max 3s + r_session_wait_or_kill(rs, "idle") + + expect_equal(rs$run(function() 42), 42) + expect_equal(rs$run(function() 42), 42) + expect_equal(rs$run(function(x, y) x + y, list(x = 42, y = 42)), 84) +}) From 52eaf6235747cffc640bb593e43a73a98604cfb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Fri, 25 May 2018 16:21:49 +0100 Subject: [PATCH 05/22] Docs for r_session --- R/r-process.R | 3 +- R/r-session.R | 94 ++++++++++++++++++++++++++++++++++++++- man/r_process.Rd | 3 +- man/r_session.Rd | 95 ++++++++++++++++++++++++++++++++++++++++ man/r_session_options.Rd | 14 ++++++ 5 files changed, 206 insertions(+), 3 deletions(-) create mode 100644 man/r_session.Rd create mode 100644 man/r_session_options.Rd diff --git a/R/r-process.R b/R/r-process.R index e5b73056..f77f2263 100644 --- a/R/r-process.R +++ b/R/r-process.R @@ -2,7 +2,8 @@ #' External R Process #' #' An R process that runs in the background. This is an R6 class that -#' extends the [process] class. +#' extends the [processx::process] class. The process starts in the +#' background, evaluates an R function call, and then quits. #' #' @section Usage: #' ``` diff --git a/R/r-session.R b/R/r-session.R index 21bd9574..a4d4e52e 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -1,4 +1,89 @@ +#' External R Session +#' +#' A permanent R session that runs in the background. This is an R6 class +#' that extends the [processx::process] class. +#' +#' The process is started at the creation of the object, and then it can +#' be used to evaluate R function calls, one at a time. +#' +#' @section Usage: +#' ``` +#' rs <- r_session$new(options) +#' rs$run(func, args = list()) +#' rs$call(func, args = list()) +#' rs$wait_for_call(timeout = -1) +#' rs$get_result() +#' rs$get_running_time() +#' rs$get_state() +#' rs$finish() +#' ``` +#' +#' @section Arguments: +#' * `options`: A list of options created via [r_session_options()]. +#' * `func`: Function object to call in the background R process. +#' * Please read the notes for the similar argument of [r()] +#' * `args`: Arguments to pass to the function. Must be a list. +#' * `timeout`: Timeout in milliseconds. +#' +#' @section Details: +#' `r_session$new()` creates a new R background process. It returns +#' immediately, i.e. before the process is actually ready to run. You may +#' call `wait_for_call()` to make sure it is ready. +#' +#' `rs$run()` is similar to [r()], but runs the function in the `rs` R +#' session. Note that if a timeout happens, the session and the background +#' computation is not terminated. You can call `rs$finish()` to terminate +#' the R process. There is currently no way to terminate the computation +#' without terminating the background R process. +#' +#' `rs$call()` starts running a function in the background R session, and +#' returns immediately. To check if the function is done, call the +#' `wait_for_call()` method. To get the result call the `get_result()` +#' method. +#' +#' `rs$wait_for_call()` waits for an `rs$call()` computation, or the R +#' session startup to finish. This is essentially a poll operation. +#' If there is no computation running, it returns immediately. +#' +#' `rs$get_result()` returns the result of the last `rs$call()` +#' computation. (Or the result of the last `rs$run()`, if it was +#' interrupted.) If there is no result to return, because the computation +#' has not finished yet, or some other reason, it throws an error. +#' +#' `rs$get_running_time()` returns the elapsed time since the R process +#' has started, and the elapsed time since the current computation has +#' started. The latter is NA if there is no active computation. +#' +#' `rs$get_state()` return the state of the R session. Possible values: +#' * `"starting"`: starting up, +#' * `"idle"`: ready to compute, +#' * `"busy"`: computing right now, +#' * `"ready"`: computation finished, result can be read out, +#' * `"finished"`: the R process has finished. +#' `rs$get_state()` automatically updates the state, i.e. it performs a +#' quick `wait_for_call()`, if needed. +#' +#' `r$finish()` terminates the current computation and the R process. +#' The session object will be in `"finished"` state after this. +#' +#' @name r_session +#' @examples +#' \dontrun{ +#' opt <- r_session_options() +#' rs <- r_ression$new(opt) +#' +#' rs$run(function() 1 + 2) +#' +#' rs$call(function() Sys.sleep(1)) +#' rs$get_state() +#' rs$wait_for_call() +#' +#' rs$get_result() +#' } +NULL + + #' @importFrom R6 R6Class #' @export @@ -49,6 +134,7 @@ r_session <- R6Class( rs_init <- function(self, private, super, options) { options$func <- options$func %||% function() { } + options$args <- list() options <- convert_and_check_my_args(options) options <- setup_context(options) options <- setup_r_binary_and_args(options, script_file = FALSE) @@ -207,8 +293,14 @@ rs__status_expr <- function(code, text = "", fd = 3) { ) } +#' Create options for an [r_session] object +#' +#' @param ... Options to override, named arguments. +#' #' @export r_session_options <- function(...) { - r_process_options(...) + opt <- r_process_options(...) + opt$func <- opt$args <- NULL + opt } diff --git a/man/r_process.Rd b/man/r_process.Rd index 00939b0b..047bd01e 100644 --- a/man/r_process.Rd +++ b/man/r_process.Rd @@ -5,7 +5,8 @@ \title{External R Process} \description{ An R process that runs in the background. This is an R6 class that -extends the \link{process} class. +extends the \link[processx:process]{processx::process} class. The process starts in the +background, evaluates an R function call, and then quits. } \section{Usage}{ \preformatted{rp <- r_process$new(options) diff --git a/man/r_session.Rd b/man/r_session.Rd new file mode 100644 index 00000000..7ae46877 --- /dev/null +++ b/man/r_session.Rd @@ -0,0 +1,95 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/r-session.R +\name{r_session} +\alias{r_session} +\title{External R Session} +\description{ +A permanent R session that runs in the background. This is an R6 class +that extends the \link[processx:process]{processx::process} class. +} +\details{ +The process is started at the creation of the object, and then it can +be used to evaluate R function calls, one at a time. +} +\section{Usage}{ +\preformatted{rs <- r_session$new(options) +rs$run(func, args = list()) +rs$call(func, args = list()) +rs$wait_for_call(timeout = -1) +rs$get_result() +rs$get_running_time() +rs$get_state() +rs$finish() +} +} + +\section{Arguments}{ + +\itemize{ +\item \code{options}: A list of options created via \code{\link[=r_session_options]{r_session_options()}}. +\item \code{func}: Function object to call in the background R process. +\item Please read the notes for the similar argument of \code{\link[=r]{r()}} +\item \code{args}: Arguments to pass to the function. Must be a list. +\item \code{timeout}: Timeout in milliseconds. +} +} + +\section{Details}{ + +\code{r_session$new()} creates a new R background process. It returns +immediately, i.e. before the process is actually ready to run. You may +call \code{wait_for_call()} to make sure it is ready. + +\code{rs$run()} is similar to \code{\link[=r]{r()}}, but runs the function in the \code{rs} R +session. Note that if a timeout happens, the session and the background +computation is not terminated. You can call \code{rs$finish()} to terminate +the R process. There is currently no way to terminate the computation +without terminating the background R process. + +\code{rs$call()} starts running a function in the background R session, and +returns immediately. To check if the function is done, call the +\code{wait_for_call()} method. To get the result call the \code{get_result()} +method. + +\code{rs$wait_for_call()} waits for an \code{rs$call()} computation, or the R +session startup to finish. This is essentially a poll operation. +If there is no computation running, it returns immediately. + +\code{rs$get_result()} returns the result of the last \code{rs$call()} +computation. (Or the result of the last \code{rs$run()}, if it was +interrupted.) If there is no result to return, because the computation +has not finished yet, or some other reason, it throws an error. + +\code{rs$get_running_time()} returns the elapsed time since the R process +has started, and the elapsed time since the current computation has +started. The latter is NA if there is no active computation. + +\code{rs$get_state()} return the state of the R session. Possible values: +\itemize{ +\item \code{"starting"}: starting up, +\item \code{"idle"}: ready to compute, +\item \code{"busy"}: computing right now, +\item \code{"ready"}: computation finished, result can be read out, +\item \code{"finished"}: the R process has finished. +\code{rs$get_state()} automatically updates the state, i.e. it performs a +quick \code{wait_for_call()}, if needed. +} + +\code{r$finish()} terminates the current computation and the R process. +The session object will be in \code{"finished"} state after this. +} + +\examples{ +\dontrun{ +opt <- r_session_options() +rs <- r_ression$new(opt) + +rs$run(function() 1 + 2) + +rs$call(function() Sys.sleep(1)) +rs$get_state() +rs$wait_for_call() + +rs$get_result() +} +} diff --git a/man/r_session_options.Rd b/man/r_session_options.Rd new file mode 100644 index 00000000..1ae8f653 --- /dev/null +++ b/man/r_session_options.Rd @@ -0,0 +1,14 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/r-session.R +\name{r_session_options} +\alias{r_session_options} +\title{Create options for an \link{r_session} object} +\usage{ +r_session_options(...) +} +\arguments{ +\item{...}{Options to override, named arguments.} +} +\description{ +Create options for an \link{r_session} object +} From f5a223f9e141dd02addd23146484779fe3b64e7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Fri, 25 May 2018 16:59:39 +0100 Subject: [PATCH 06/22] Better cleanup in tests --- tests/testthat/test-r-session.R | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/testthat/test-r-session.R b/tests/testthat/test-r-session.R index e6d85fec..7c39310c 100644 --- a/tests/testthat/test-r-session.R +++ b/tests/testthat/test-r-session.R @@ -4,6 +4,7 @@ context("r_session") test_that("regular use", { opt <- r_session_options() rs <- r_session$new(opt) + on.exit(rs$kill()) ## Wait until ready, but max 3s r_session_wait_or_kill(rs, "idle") @@ -27,11 +28,13 @@ test_that("regular use", { ## Finish rs$finish() expect_equal(rs$get_state(), "finished") + expect_false(rs$is_alive()) }) test_that("run", { opt <- r_session_options() rs <- r_session$new(opt) + on.exit(rs$kill()) ## Wait until ready, but max 3s r_session_wait_or_kill(rs, "idle") @@ -39,4 +42,9 @@ test_that("run", { expect_equal(rs$run(function() 42), 42) expect_equal(rs$run(function() 42), 42) expect_equal(rs$run(function(x, y) x + y, list(x = 42, y = 42)), 84) + + ## Finish + rs$finish() + expect_equal(rs$get_state(), "finished") + expect_false(rs$is_alive()) }) From f60a493f7c767f264c2612db0ac3a34905db401f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Sat, 26 May 2018 10:44:37 +0100 Subject: [PATCH 07/22] r_session tests for stdout and stderr --- tests/testthat/test-r-session.R | 34 +++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/testthat/test-r-session.R b/tests/testthat/test-r-session.R index 7c39310c..69a57060 100644 --- a/tests/testthat/test-r-session.R +++ b/tests/testthat/test-r-session.R @@ -48,3 +48,37 @@ test_that("run", { expect_equal(rs$get_state(), "finished") expect_false(rs$is_alive()) }) + +test_that("get stdout", { + opt <- r_session_options() + rs <- r_session$new(opt) + on.exit(rs$kill()) + + ## Wait until ready, but max 3s + r_session_wait_or_kill(rs, "idle") + + rs$call(function(x) cat("foobar\n")) + r_session_wait_or_kill(rs, "ready") + + expect_null(rs$get_result()) + expect_equal(rs$read_output_lines(), "foobar") + + rs$finish() +}) + +test_that("get stderr", { + opt <- r_session_options() + rs <- r_session$new(opt) + on.exit(rs$kill()) + + ## Wait until ready, but max 3s + r_session_wait_or_kill(rs, "idle") + + rs$call(function(x) message("message me!")) + r_session_wait_or_kill(rs, "ready") + + expect_null(rs$get_result()) + expect_equal(rs$read_error_lines(), "message me!") + + rs$finish() +}) From 1fe02ca1864cc96cfa8f79384e65c38d8998a018 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Sat, 26 May 2018 21:54:08 +0100 Subject: [PATCH 08/22] Simplify r_session with processx poll connection --- DESCRIPTION | 4 ++-- R/r-session.R | 38 +++++++++++++++------------------ tests/testthat/helper.R | 6 +++--- tests/testthat/test-r-session.R | 16 +++++++------- 4 files changed, 30 insertions(+), 34 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 9994e1f7..2e19c1f9 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -18,7 +18,7 @@ BugReports: https://github.com/r-lib/callr/issues RoxygenNote: 6.0.1.9000 Roxygen: list(markdown = TRUE) Imports: - processx (>= 3.1.0.9001), + processx (>= 3.1.0.9003), R6, utils Suggests: @@ -26,5 +26,5 @@ Suggests: testthat, withr Remotes: - r-lib/processx@feature/pollable-connection + r-lib/processx@feature/poll-anything Encoding: UTF-8 diff --git a/R/r-session.R b/R/r-session.R index a4d4e52e..fa0369ad 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -12,7 +12,6 @@ #' rs <- r_session$new(options) #' rs$run(func, args = list()) #' rs$call(func, args = list()) -#' rs$wait_for_call(timeout = -1) #' rs$get_result() #' rs$get_running_time() #' rs$get_state() @@ -29,7 +28,7 @@ #' @section Details: #' `r_session$new()` creates a new R background process. It returns #' immediately, i.e. before the process is actually ready to run. You may -#' call `wait_for_call()` to make sure it is ready. +#' call `poll_io()` to make sure it is ready. #' #' `rs$run()` is similar to [r()], but runs the function in the `rs` R #' session. Note that if a timeout happens, the session and the background @@ -39,13 +38,9 @@ #' #' `rs$call()` starts running a function in the background R session, and #' returns immediately. To check if the function is done, call the -#' `wait_for_call()` method. To get the result call the `get_result()` +#' `poll_io()` method. To get the result call the `get_result()` #' method. #' -#' `rs$wait_for_call()` waits for an `rs$call()` computation, or the R -#' session startup to finish. This is essentially a poll operation. -#' If there is no computation running, it returns immediately. -#' #' `rs$get_result()` returns the result of the last `rs$call()` #' computation. (Or the result of the last `rs$run()`, if it was #' interrupted.) If there is no result to return, because the computation @@ -62,7 +57,7 @@ #' * `"ready"`: computation finished, result can be read out, #' * `"finished"`: the R process has finished. #' `rs$get_state()` automatically updates the state, i.e. it performs a -#' quick `wait_for_call()`, if needed. +#' quick `poll_io()`, if needed. #' #' `r$finish()` terminates the current computation and the R process. #' The session object will be in `"finished"` state after this. @@ -77,7 +72,7 @@ #' #' rs$call(function() Sys.sleep(1)) #' rs$get_state() -#' rs$wait_for_call() +#' rs$poll_io(-1) #' #' rs$get_result() #' } @@ -98,8 +93,6 @@ r_session <- R6Class( rs_run(self, private, func, args, timeout), call = function(func, args = list()) rs_call(self, private, func, args), - wait_for_call = function(timeout = -1) - rs_wait_for_call(self, private, timeout), get_result = function() rs_get_result(self, private), get_running_time = function() @@ -115,11 +108,13 @@ r_session <- R6Class( state = NULL, started_at = NULL, fun_started_at = as.POSIXct(NA), - pipe = NULL, # Two connections, for me and child + pipe = NULL, func_file = NULL, res_file = NULL, + wait_for_call = function(timeout = -1) + rs__wait_for_call(self, private, timeout), update_state = function() rs__update_state(self, private), report_back = function(code, text = "") @@ -141,14 +136,15 @@ rs_init <- function(self, private, super, options) { private$options <- options - private$pipe <- conn_create_pipepair() - with_envvar( options$env, super$initialize(options$bin, options$real_cmdargs, stdin = "|", stdout = options$stdout, stderr = options$stderr, - connections = list(private$pipe[[2]])) + poll_connection = TRUE) ) + + private$pipe <- self$get_poll_connection() + private$started_at <- Sys.time() private$state <- "starting" @@ -160,7 +156,7 @@ rs_init <- function(self, private, super, options) { rs_run <- function(self, private, func, args, timeout) { self$call(func, args) - self$wait_for_call(timeout) + self$poll_io(timeout) self$get_result() } @@ -193,17 +189,17 @@ rs_call <- function(self, private, func, args) { private$state <- "busy" } -rs_wait_for_call <- function(self, private, timeout) { +rs__wait_for_call <- function(self, private, timeout) { if (private$state %in% c("finished", "ready", "idle")) return() - pr <- poll(list(private$pipe[[1]]), timeout)[[1]] - if (pr == "ready") { + pr <- self$poll_io(timeout) + if (pr[["process"]] == "ready") { if (private$state == "starting") { private$state <- "idle" } else { private$state <- "ready" } - invisible(conn_read_lines(private$pipe[[1]], 1)) + invisible(conn_read_lines(private$pipe, 1)) } else { invisible() } @@ -259,7 +255,7 @@ rs_finish <- function(self, private, grace) { #' @importFrom processx conn_read_lines rs__update_state <- function(self, private) { - self$wait_for_call(timeout = 0) + private$wait_for_call(timeout = 0) } rs__report_back <- function(self, private, code, text) { diff --git a/tests/testthat/helper.R b/tests/testthat/helper.R index 6f3e76a2..88e96453 100644 --- a/tests/testthat/helper.R +++ b/tests/testthat/helper.R @@ -8,9 +8,9 @@ try_silently <- function(expr) { ) } -r_session_wait_or_kill <- function(x, state = "ready") { - x$wait_for_call(3000) - if (x$get_state() != state) { +r_session_wait_or_kill <- function(x) { + pr <- poll(list(x$get_poll_connection()), 3000)[[1]] + if (pr != "ready") { x$kill() stop("R session not ready...") } diff --git a/tests/testthat/test-r-session.R b/tests/testthat/test-r-session.R index 69a57060..ac4fc9d3 100644 --- a/tests/testthat/test-r-session.R +++ b/tests/testthat/test-r-session.R @@ -7,11 +7,11 @@ test_that("regular use", { on.exit(rs$kill()) ## Wait until ready, but max 3s - r_session_wait_or_kill(rs, "idle") + r_session_wait_or_kill(rs) ## Start a command rs$call(function() 42) - r_session_wait_or_kill(rs, "ready") + r_session_wait_or_kill(rs) ## Get result expect_equal(rs$get_result(), 42) @@ -19,7 +19,7 @@ test_that("regular use", { ## Run another command, with arguments rs$call(function(x, y) x + y, list(x = 42, y = 42)) - r_session_wait_or_kill(rs, "ready") + r_session_wait_or_kill(rs) ## Get result expect_equal(rs$get_result(), 84) @@ -37,7 +37,7 @@ test_that("run", { on.exit(rs$kill()) ## Wait until ready, but max 3s - r_session_wait_or_kill(rs, "idle") + r_session_wait_or_kill(rs) expect_equal(rs$run(function() 42), 42) expect_equal(rs$run(function() 42), 42) @@ -55,10 +55,10 @@ test_that("get stdout", { on.exit(rs$kill()) ## Wait until ready, but max 3s - r_session_wait_or_kill(rs, "idle") + r_session_wait_or_kill(rs) rs$call(function(x) cat("foobar\n")) - r_session_wait_or_kill(rs, "ready") + r_session_wait_or_kill(rs) expect_null(rs$get_result()) expect_equal(rs$read_output_lines(), "foobar") @@ -72,10 +72,10 @@ test_that("get stderr", { on.exit(rs$kill()) ## Wait until ready, but max 3s - r_session_wait_or_kill(rs, "idle") + r_session_wait_or_kill(rs) rs$call(function(x) message("message me!")) - r_session_wait_or_kill(rs, "ready") + r_session_wait_or_kill(rs) expect_null(rs$get_result()) expect_equal(rs$read_error_lines(), "message me!") From 24cc24279322f8670db973c0a601e7b14d81e1e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Mon, 28 May 2018 01:40:07 +0200 Subject: [PATCH 09/22] Catch errors, interrupts as well This does not really matter for the one-shot functions (except that their exit code is now different), but it does matter for r_session. --- R/result.R | 7 ++----- R/script.R | 31 ++++++++++++++++++------------- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/R/result.R b/R/result.R index b03405de..4550386a 100644 --- a/R/result.R +++ b/R/result.R @@ -89,12 +89,9 @@ get_result <- function(output, options) { #' @importFrom utils head tail clean_stack <- function(stack) { - ## We remove the first 4 calls (withCallingHandlers, - ## saveRDS, do.call and do.call) and the last two - ## (.handleSimpleError and h(simpleerror). att <- attributes(stack) - att$names <- head(tail(att$names, -4), -2) - res <- head(tail(stack, -4), -2) + att$names <- head(tail(att$names, -11), -2) + res <- head(tail(stack, -11), -2) attributes(res) <- att res diff --git a/R/script.R b/R/script.R index 9c55204c..5c16e5a6 100644 --- a/R/script.R +++ b/R/script.R @@ -38,19 +38,24 @@ make_vanilla_script_expr <- function(expr_file, res, error) { ## It is important that we do not create any temporary variables, ## the function is called from an empty global environment. substitute( - { - withCallingHandlers( # nocov start - { - saveRDS( - do.call( - do.call, - c(readRDS(`__expr_file__`), list(envir = .GlobalEnv)), - envir = .GlobalEnv - ), - file = `__res__` - ) - }, - error = function(e) { `__error__`; stop(e) } + { + tryCatch( # nocov start + withCallingHandlers( + { + saveRDS( + do.call( + do.call, + c(readRDS(`__expr_file__`), list(envir = .GlobalEnv)), + envir = .GlobalEnv + ), + file = `__res__` + ) + }, + error = function(e) { `__error__` }, + interrupt = function(e) { `__error__` } + ), + error = function(e) e, + interrupt = function(e) e ) # nocov end }, From 12a4706ab58b6d08159a929b6cfd37863d91f912 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Mon, 28 May 2018 08:07:45 +0200 Subject: [PATCH 10/22] Need newer processx --- DESCRIPTION | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 2e19c1f9..50e85d06 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -18,7 +18,7 @@ BugReports: https://github.com/r-lib/callr/issues RoxygenNote: 6.0.1.9000 Roxygen: list(markdown = TRUE) Imports: - processx (>= 3.1.0.9003), + processx (>= 3.1.0.9004), R6, utils Suggests: @@ -26,5 +26,5 @@ Suggests: testthat, withr Remotes: - r-lib/processx@feature/poll-anything + r-lib/processx@feature/interrupt Encoding: UTF-8 From c0e6b7f69e4c81034b876e7bf26adba633e49789 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Thu, 31 May 2018 11:38:48 +0100 Subject: [PATCH 11/22] r_session: use temporary files to get stdout/stderr There is one file for each call. --- DESCRIPTION | 2 +- R/r-session.R | 88 +++++++++++++++++++++++++++------ R/script.R | 48 ++++++++++++++++-- R/utils.R | 10 ++++ tests/testthat/test-r-session.R | 41 +++++---------- 5 files changed, 140 insertions(+), 49 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 50e85d06..83ed85e4 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -26,5 +26,5 @@ Suggests: testthat, withr Remotes: - r-lib/processx@feature/interrupt + r-lib/processx@feature/set-std Encoding: UTF-8 diff --git a/R/r-session.R b/R/r-session.R index fa0369ad..179d5b0a 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -13,6 +13,7 @@ #' rs$run(func, args = list()) #' rs$call(func, args = list()) #' rs$get_result() +#' rs$get_result_and_output() #' rs$get_running_time() #' rs$get_state() #' rs$finish() @@ -23,7 +24,6 @@ #' * `func`: Function object to call in the background R process. #' * Please read the notes for the similar argument of [r()] #' * `args`: Arguments to pass to the function. Must be a list. -#' * `timeout`: Timeout in milliseconds. #' #' @section Details: #' `r_session$new()` creates a new R background process. It returns @@ -87,7 +87,7 @@ r_session <- R6Class( inherit = process, public = list( - initialize = function(options) + initialize = function(options = r_session_options()) rs_init(self, private, super, options), run = function(func, args = list(), timeout = -1) rs_run(self, private, func, args, timeout), @@ -95,12 +95,19 @@ r_session <- R6Class( rs_call(self, private, func, args), get_result = function() rs_get_result(self, private), + get_result_and_output = function() + rs_get_result_and_output(self, private), get_running_time = function() rs_get_running_time(self, private), get_state = function() rs_get_state(self, private), finish = function(grace = 200) - rs_finish(self, private, grace) + rs_finish(self, private, grace), + finalize = function() { + unlink(private$tmp_output_file) + unlink(private$tmp_error_file) + if ("finalize" %in% ls(super)) super$finalize() + } ), private = list( @@ -110,6 +117,9 @@ r_session <- R6Class( fun_started_at = as.POSIXct(NA), pipe = NULL, + tmp_output_file = character(), + tmp_error_file = character(), + func_file = NULL, res_file = NULL, @@ -130,6 +140,7 @@ rs_init <- function(self, private, super, options) { options$func <- options$func %||% function() { } options$args <- list() + options <- convert_and_check_my_args(options) options <- setup_context(options) options <- setup_r_binary_and_args(options, script_file = FALSE) @@ -143,13 +154,20 @@ rs_init <- function(self, private, super, options) { poll_connection = TRUE) ) + ## Make child report back when ready + private$report_back(200, "ready to go") + private$pipe <- self$get_poll_connection() private$started_at <- Sys.time() private$state <- "starting" - ## Make child report back when ready - private$report_back(200, "ready to go") + if (!is.null(f <- self$get_output_file()) && f != "|") { + private$stdout_file_con <- file(f, open = "rb", blocking = TRUE) + } + if (!is.null(f <- self$get_error_file()) && f != "|") { + private$stderr_file_con <- file(f, open = "rb", blocking = TRUE) + } invisible(self) } @@ -157,7 +175,7 @@ rs_init <- function(self, private, super, options) { rs_run <- function(self, private, func, args, timeout) { self$call(func, args) self$poll_io(timeout) - self$get_result() + self$get_result_and_output() } rs_call <- function(self, private, func, args) { @@ -173,10 +191,20 @@ rs_call <- function(self, private, func, args) { c(private$options$tmp_files, private$options$func_file, private$options$result_file) + ## Maybe we need to redirect stdout / stderr + re_stdout <- if (is.null(private$options$stdout)) { + private$tmp_output_file <- tempfile() + } + re_stderr <- if (is.null(private$options$stderr)) { + private$tmp_error_file <- tempfile() + } + ## Run an expr that loads it, in the child process, with error handlers expr <- make_vanilla_script_expr(private$options$func_file, private$options$result_file, - private$options$error) + private$options$error, + re_stdout = re_stdout, + re_stderr = re_stderr) cmd <- paste0(deparse(expr), "\n") ## Write this to stdin @@ -206,21 +234,33 @@ rs__wait_for_call <- function(self, private, timeout) { } rs_get_result <- function(self, private) { + rs_get_result_and_output(self, private)$result +} + +rs_get_result_and_output <- function(self, private) { if (private$state != "ready") private$update_state() get_my_result <- function() { - ## This is artificial... - out <- list( + out <- if (!is.null(private$tmp_output_file)) { + read_all(private$tmp_output_file) + } + err <- if (!is.null(private$tmp_error_file)) { + read_all(private$tmp_error_file) + } + unlink(c(private$tmp_output_file, private$tmp_error_file)) + private$tmp_output_file <- private$tmp_error_file <- NULL + outp <- list( status = 0, - stdout = "", - stderr = "", + stdout = out %||% "", + stderr = err %||% "", timeout = FALSE ) - res <- get_result(out, private$options) + res <- get_result(outp, private$options) private$state <- "idle" unlink(private$options$tmp_files, recursive = TRUE) private$options$tmp_files <- NULL - res + + list(result = res, output = out, error = err) } switch( @@ -296,7 +336,23 @@ rs__status_expr <- function(code, text = "", fd = 3) { #' @export r_session_options <- function(...) { - opt <- r_process_options(...) - opt$func <- opt$args <- NULL - opt + update_options(r_session_options_default(), ...) +} + +r_session_options_default <- function() { + list( + func = NULL, + args = NULL, + libpath = .libPaths(), + repos = c(getOption("repos"), CRAN = "https://cloud.r-project.org"), + stdout = NULL, + stderr = NULL, + error = getOption("callr.error", "error"), + cmdargs = c("--no-site-file", "--slave", + "--no-save", "--no-restore"), + system_profile = FALSE, + user_profile = FALSE, + env = character(), + supervise = FALSE + ) } diff --git a/R/script.R b/R/script.R index 5c16e5a6..e393ed12 100644 --- a/R/script.R +++ b/R/script.R @@ -1,5 +1,6 @@ -make_vanilla_script_expr <- function(expr_file, res, error) { +make_vanilla_script_expr <- function(expr_file, res, error, + re_stdout = NULL, re_stderr = NULL) { ## Code to handle errors in the child ## This will inserted into the main script @@ -27,6 +28,37 @@ make_vanilla_script_expr <- function(expr_file, res, error) { stop("Unknown `error` argument: `", error, "`") } + ## stdout / stderr redirection + if (!is.null(re_stdout)) { + xstdout <- substitute( + processx::conn_set_stdout( + .__ocon__ <- processx::conn_create_file(`__fn__`, write = TRUE)), + list(`__fn__` = re_stdout) + ) + xstdout2 <- substitute({ + processx::conn_set_stdout( + processx::conn_create_file(tempfile(), write = TRUE)) + close(.__ocon__); + }) + } else { + xstdout <- xstdout2 <- substitute(invisible()) + } + + if (!is.null(re_stderr)) { + xstderr <- substitute( + processx::conn_set_stderr( + .__econ__ <- processx::conn_create_file(`__fn__`, write = TRUE)), + list(`__fn__` = re_stderr) + ) + xstderr2 <- substitute({ + processx::conn_set_stderr( + processx::conn_create_file(tempfile(), write = TRUE)) + close(.__econ__); + }) + } else { + xstderr <- xstderr2 <- substitute(invisible()) + } + ## The function to run and its arguments are saved as a list: ## list(fun, args). args itself is a list. ## So the first do.call will create the call: do.call(fun, args) @@ -42,14 +74,20 @@ make_vanilla_script_expr <- function(expr_file, res, error) { tryCatch( # nocov start withCallingHandlers( { - saveRDS( - do.call( + `__stdout__` + `__stderr__` + saveRDS( + do.call( do.call, c(readRDS(`__expr_file__`), list(envir = .GlobalEnv)), envir = .GlobalEnv ), file = `__res__` ) + flush(stdout()) + flush(stderr()) + `__stdout2__` + `__stderr2__` }, error = function(e) { `__error__` }, interrupt = function(e) { `__error__` } @@ -59,7 +97,9 @@ make_vanilla_script_expr <- function(expr_file, res, error) { ) # nocov end }, - list(`__error__` = err, `__expr_file__` = expr_file, `__res__` = res) + list(`__error__` = err, `__expr_file__` = expr_file, `__res__` = res, + `__stdout__` = xstdout, `__stderr__` = xstderr, + `__stdout2__` = xstdout2, `__stderr2__` = xstderr2) ) } diff --git a/R/utils.R b/R/utils.R index c9683ea5..44bd951a 100644 --- a/R/utils.R +++ b/R/utils.R @@ -57,3 +57,13 @@ is_string <- function(x) { length(x) == 1 && !is.na(x) } + +read_all <- function(filename) { + con <- file(filename, open = "rb") + on.exit(close(con), add = TRUE) + res <- raw(0) + while (length(more <- readBin(con, what = "raw", 10000)) && length(more)) { + res <- c(res, more) + } + rawToChar(res) +} diff --git a/tests/testthat/test-r-session.R b/tests/testthat/test-r-session.R index ac4fc9d3..f3840599 100644 --- a/tests/testthat/test-r-session.R +++ b/tests/testthat/test-r-session.R @@ -39,9 +39,11 @@ test_that("run", { ## Wait until ready, but max 3s r_session_wait_or_kill(rs) - expect_equal(rs$run(function() 42), 42) - expect_equal(rs$run(function() 42), 42) - expect_equal(rs$run(function(x, y) x + y, list(x = 42, y = 42)), 84) + expect_equal(rs$run(function() 42)$result, 42) + expect_equal(rs$run(function() 42)$result, 42) + expect_equal( + rs$run(function(x, y) x + y, list(x = 42, y = 42))$result, + 84) ## Finish rs$finish() @@ -49,36 +51,19 @@ test_that("run", { expect_false(rs$is_alive()) }) -test_that("get stdout", { - opt <- r_session_options() - rs <- r_session$new(opt) - on.exit(rs$kill()) - - ## Wait until ready, but max 3s - r_session_wait_or_kill(rs) - - rs$call(function(x) cat("foobar\n")) - r_session_wait_or_kill(rs) - - expect_null(rs$get_result()) - expect_equal(rs$read_output_lines(), "foobar") - rs$finish() -}) - -test_that("get stderr", { - opt <- r_session_options() - rs <- r_session$new(opt) +test_that("get stdout/stderr from file", { + rs <- r_session$new() on.exit(rs$kill()) ## Wait until ready, but max 3s r_session_wait_or_kill(rs) - rs$call(function(x) message("message me!")) - r_session_wait_or_kill(rs) - - expect_null(rs$get_result()) - expect_equal(rs$read_error_lines(), "message me!") + for (i in 1:10) { + res <- rs$run(function() { cat("foo\n"); message("bar"); 42 }) + expect_equal(res, list(result = 42, output = "foo\n", error = "bar\n")) - rs$finish() + res <- rs$run(function() { cat("bar\n"); message("foo"); 43 }) + expect_equal(res, list(result = 43, output = "bar\n", error = "foo\n")) + } }) From b2f1c312b31af4f27baf8bc9a9251e63477e8fed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Thu, 31 May 2018 11:57:31 +0100 Subject: [PATCH 12/22] Fix and test for $interrupt() --- R/result.R | 1 + tests/testthat/test-r-session.R | 15 +++++++++++++++ 2 files changed, 16 insertions(+) diff --git a/R/result.R b/R/result.R index 4550386a..f7914337 100644 --- a/R/result.R +++ b/R/result.R @@ -65,6 +65,7 @@ get_result <- function(output, options) { ) if (err[[1]] == "error") { + err[[2]]$message <- err[[2]]$message %||% "interrupt" stop(err[[2]]) } else if (err[[1]] == "stack") { diff --git a/tests/testthat/test-r-session.R b/tests/testthat/test-r-session.R index f3840599..d7bef587 100644 --- a/tests/testthat/test-r-session.R +++ b/tests/testthat/test-r-session.R @@ -67,3 +67,18 @@ test_that("get stdout/stderr from file", { expect_equal(res, list(result = 43, output = "bar\n", error = "foo\n")) } }) + +test_that("interrupt", { + rs <- r_session$new() + on.exit(rs$kill()) + + ## Wait until ready, but max 3s + r_session_wait_or_kill(rs) + + rs$call(function() Sys.sleep(5)) + Sys.sleep(0.5) + rs$interrupt() + rs$poll_io(1000) + err <- tryCatch(rs$get_result(), interrupt = function(e) e) + expect_s3_class(err, "interrupt") +}) From 9a73004a8ee718a47bbc2a278a08ad363ae34ef4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Sat, 2 Jun 2018 09:47:33 +0100 Subject: [PATCH 13/22] Use processx master It has everything merged now. --- DESCRIPTION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DESCRIPTION b/DESCRIPTION index 83ed85e4..b396707f 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -26,5 +26,5 @@ Suggests: testthat, withr Remotes: - r-lib/processx@feature/set-std + r-lib/processx Encoding: UTF-8 From e3e84338db94898252cd7941be7d8f14b621dafd Mon Sep 17 00:00:00 2001 From: Gabor Csardi Date: Sun, 3 Jun 2018 03:05:30 -0700 Subject: [PATCH 14/22] Fix r_session interrupts --- R/r-session.R | 6 ++++-- R/script.R | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/R/r-session.R b/R/r-session.R index 179d5b0a..8c00ea11 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -241,10 +241,12 @@ rs_get_result_and_output <- function(self, private) { if (private$state != "ready") private$update_state() get_my_result <- function() { - out <- if (!is.null(private$tmp_output_file)) { + out <- if (!is.null(private$tmp_output_file) && + file.exists(private$tmp_output_file)) { read_all(private$tmp_output_file) } - err <- if (!is.null(private$tmp_error_file)) { + err <- if (!is.null(private$tmp_error_file) && + file.exists(private$tmp_error_file)) { read_all(private$tmp_error_file) } unlink(c(private$tmp_output_file, private$tmp_error_file)) diff --git a/R/script.R b/R/script.R index e393ed12..b7d6489e 100644 --- a/R/script.R +++ b/R/script.R @@ -92,8 +92,8 @@ make_vanilla_script_expr <- function(expr_file, res, error, error = function(e) { `__error__` }, interrupt = function(e) { `__error__` } ), - error = function(e) e, - interrupt = function(e) e + error = function(e) { `__stdout2__`; `__stderr2__`; e }, + interrupt = function(e) { `__stdout2__`; `__stderr2__`; e } ) # nocov end }, From 550c533319c1e5ef7c0317829913533a5add658d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Sun, 3 Jun 2018 18:57:16 +0100 Subject: [PATCH 15/22] Session: make sure finish() kills process --- R/r-session.R | 2 ++ 1 file changed, 2 insertions(+) diff --git a/R/r-session.R b/R/r-session.R index 8c00ea11..ca7ffb3d 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -290,6 +290,8 @@ rs_finish <- function(self, private, grace) { close(self$get_input_connection()) self$poll_io(grace) self$kill() + self$wait(1000) + if (self$is_alive()) stop("Could not kill background R session") private$state <- "finished" private$fun_started_at <- as.POSIXct(NA) } From 38282276435a93d82f6f29919bb95a09725db4ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Sun, 3 Jun 2018 18:57:42 +0100 Subject: [PATCH 16/22] Make session work with stdout/stderr pipes --- R/r-session.R | 6 +++--- tests/testthat/test-r-session.R | 19 +++++++++++++++++++ 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/R/r-session.R b/R/r-session.R index ca7ffb3d..5e6a187d 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -117,8 +117,8 @@ r_session <- R6Class( fun_started_at = as.POSIXct(NA), pipe = NULL, - tmp_output_file = character(), - tmp_error_file = character(), + tmp_output_file = NULL, + tmp_error_file = NULL, func_file = NULL, res_file = NULL, @@ -174,7 +174,7 @@ rs_init <- function(self, private, super, options) { rs_run <- function(self, private, func, args, timeout) { self$call(func, args) - self$poll_io(timeout) + poll(list(private$pipe), timeout) self$get_result_and_output() } diff --git a/tests/testthat/test-r-session.R b/tests/testthat/test-r-session.R index d7bef587..e24b0f4b 100644 --- a/tests/testthat/test-r-session.R +++ b/tests/testthat/test-r-session.R @@ -68,6 +68,25 @@ test_that("get stdout/stderr from file", { } }) +test_that("stdout/stderr from pipe", { + opt <- r_session_options(stdout = "|", stderr = "|") + rs <- r_session$new(opt) + on.exit(rs$kill()) + + ## Wait until ready, but max 3s + r_session_wait_or_kill(rs) + + res <- rs$run(function() { cat("foo\n"); message("bar"); 42 }) + expect_equal(res, list(result = 42, output = NULL, error = NULL)) + + res <- rs$run(function() { cat("bar\n"); message("foo"); 43 }) + expect_equal(res, list(result = 43, output = NULL, error = NULL)) + + rs$finish() + expect_equal(rs$read_all_output_lines(), c("foo", "bar")) + expect_equal(rs$read_all_error_lines(), c("bar", "foo")) +}) + test_that("interrupt", { rs <- r_session$new() on.exit(rs$kill()) From b6ac7469f02c63eb90331315574e301b29538975 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Sun, 3 Jun 2018 19:57:18 +0100 Subject: [PATCH 17/22] Remove some dead code --- R/r-session.R | 7 ------- 1 file changed, 7 deletions(-) diff --git a/R/r-session.R b/R/r-session.R index 5e6a187d..28e6d801 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -162,13 +162,6 @@ rs_init <- function(self, private, super, options) { private$started_at <- Sys.time() private$state <- "starting" - if (!is.null(f <- self$get_output_file()) && f != "|") { - private$stdout_file_con <- file(f, open = "rb", blocking = TRUE) - } - if (!is.null(f <- self$get_error_file()) && f != "|") { - private$stderr_file_con <- file(f, open = "rb", blocking = TRUE) - } - invisible(self) } From 8fc630a529b1314bb45952e565b083b3172b4c67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Sun, 3 Jun 2018 20:15:30 +0100 Subject: [PATCH 18/22] Session: set process to idle on error When we read out the result. --- R/r-session.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/r-session.R b/R/r-session.R index 28e6d801..6ec1934d 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -250,8 +250,8 @@ rs_get_result_and_output <- function(self, private) { stderr = err %||% "", timeout = FALSE ) - res <- get_result(outp, private$options) private$state <- "idle" + res <- get_result(outp, private$options) unlink(private$options$tmp_files, recursive = TRUE) private$options$tmp_files <- NULL From 0592cb32d2ae02c9306d546e8c92e8a3c7ef8777 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Sun, 3 Jun 2018 21:14:32 +0100 Subject: [PATCH 19/22] r_session$run() handles interrupts --- R/r-session.R | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/R/r-session.R b/R/r-session.R index 6ec1934d..9314026d 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -167,8 +167,26 @@ rs_init <- function(self, private, super, options) { rs_run <- function(self, private, func, args, timeout) { self$call(func, args) - poll(list(private$pipe), timeout) - self$get_result_and_output() + + tryCatch( + { + poll(list(private$pipe), timeout) + self$get_result_and_output() + }, + interrupt = function(e) { + self$interrupt() + ## The R process will catch the interrupt, and then save the + ## error object to a file, but this might still take some time, + ## so we need to poll here. If the bg process ignores + ## interrupts, then we throw an error. + ps <- poll(list(private$pipe), 1000)[[1]] + if (ps == "timeout") { + stop("Background process ignores interrupt, still running") + } else { + self$get_result_and_output() + } + } + ) } rs_call <- function(self, private, func, args) { From 59ea0cab46d3291a2d33012c6e1900530a35ddd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Sun, 3 Jun 2018 21:20:19 +0100 Subject: [PATCH 20/22] r_session: print method --- R/r-session.R | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/R/r-session.R b/R/r-session.R index 9314026d..707c1c99 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -107,6 +107,15 @@ r_session <- R6Class( unlink(private$tmp_output_file) unlink(private$tmp_error_file) if ("finalize" %in% ls(super)) super$finalize() + }, + print = function(...) { + cat( + sep = "", + "R SESSION, ", + if (self$is_alive()) "alive, ", + self$get_state(), ", ", + "pid ", self$get_pid(), ".") + invisible(self) } ), From 778055f0441eabdb6e55603fd784a8eee4a86964 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Sun, 3 Jun 2018 21:25:48 +0100 Subject: [PATCH 21/22] r_session: better error messages When session is not in the right state for an operation. --- R/r-session.R | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/R/r-session.R b/R/r-session.R index 707c1c99..91f0f5e6 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -200,6 +200,8 @@ rs_run <- function(self, private, func, args, timeout) { rs_call <- function(self, private, func, args) { private$update_state() + if (private$state == "finished") stop("R session finished") + if (private$state == "ready") stop("Need to read out R session result") if (private$state != "idle") stop("R session busy") ## Save the function in a file @@ -288,7 +290,7 @@ rs_get_result_and_output <- function(self, private) { switch( private$state, "finished" = stop("R session already finished"), - "idle" = stop("R session is idle"), + "idle" = stop("No result in R session"), "busy" = stop("R session still busy"), "starting" = stop("R session still starting"), "ready" = get_my_result() From 5244bea039c4777d59000723292f29b0ffd85254 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=A1bor=20Cs=C3=A1rdi?= Date: Sun, 3 Jun 2018 21:26:20 +0100 Subject: [PATCH 22/22] r_session: get_state() checks if alive In case it crashed, or was $kill()-ed. --- R/r-session.R | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/R/r-session.R b/R/r-session.R index 91f0f5e6..f706319c 100644 --- a/R/r-session.R +++ b/R/r-session.R @@ -321,7 +321,11 @@ rs_finish <- function(self, private, grace) { #' @importFrom processx conn_read_lines rs__update_state <- function(self, private) { - private$wait_for_call(timeout = 0) + if (!self$is_alive()) { + self$finish() + } else { + private$wait_for_call(timeout = 0) + } } rs__report_back <- function(self, private, code, text) {