Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a background R session and keep running code in it #56

Merged
merged 22 commits into from
Jun 4, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ BugReports: https://github.com/r-lib/callr/issues
RoxygenNote: 6.0.1.9000
Roxygen: list(markdown = TRUE)
Imports:
processx (>= 3.1.0),
processx (>= 3.1.0.9001),
R6,
utils
Suggests:
covr,
testthat,
withr
Remotes:
r-lib/processx@feature/pollable-connection
Encoding: UTF-8
6 changes: 6 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ export(r_copycat)
export(r_process)
export(r_process_options)
export(r_safe)
export(r_session)
export(r_session_options)
export(r_vanilla)
export(rcmd)
export(rcmd_bg)
Expand All @@ -18,6 +20,10 @@ export(rcmd_safe)
export(rcmd_safe_env)
export(run)
importFrom(R6,R6Class)
importFrom(processx,conn_create_fd)
importFrom(processx,conn_create_pipepair)
importFrom(processx,conn_read_lines)
importFrom(processx,conn_write)
importFrom(processx,poll)
importFrom(processx,process)
importFrom(processx,run)
Expand Down
3 changes: 2 additions & 1 deletion R/r-process.R
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
#' External R Process
#'
#' An R process that runs in the background. This is an R6 class that
#' extends the [process] class.
#' extends the [processx::process] class. The process starts in the
#' background, evaluates an R function call, and then quits.
#'
#' @section Usage:
#' ```
Expand Down
306 changes: 306 additions & 0 deletions R/r-session.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,306 @@

#' External R Session
#'
#' A permanent R session that runs in the background. This is an R6 class
#' that extends the [processx::process] class.
#'
#' The process is started at the creation of the object, and then it can
#' be used to evaluate R function calls, one at a time.
#'
#' @section Usage:
#' ```
#' rs <- r_session$new(options)
#' rs$run(func, args = list())
#' rs$call(func, args = list())
#' rs$wait_for_call(timeout = -1)
#' rs$get_result()
#' rs$get_running_time()
#' rs$get_state()
#' rs$finish()
#' ```
#'
#' @section Arguments:
#' * `options`: A list of options created via [r_session_options()].
#' * `func`: Function object to call in the background R process.
#' * Please read the notes for the similar argument of [r()]
#' * `args`: Arguments to pass to the function. Must be a list.
#' * `timeout`: Timeout in milliseconds.
#'
#' @section Details:
#' `r_session$new()` creates a new R background process. It returns
#' immediately, i.e. before the process is actually ready to run. You may
#' call `wait_for_call()` to make sure it is ready.
#'
#' `rs$run()` is similar to [r()], but runs the function in the `rs` R
#' session. Note that if a timeout happens, the session and the background
#' computation is not terminated. You can call `rs$finish()` to terminate
#' the R process. There is currently no way to terminate the computation
#' without terminating the background R process.
#'
#' `rs$call()` starts running a function in the background R session, and
#' returns immediately. To check if the function is done, call the
#' `wait_for_call()` method. To get the result call the `get_result()`
#' method.
#'
#' `rs$wait_for_call()` waits for an `rs$call()` computation, or the R
#' session startup to finish. This is essentially a poll operation.
#' If there is no computation running, it returns immediately.
#'
#' `rs$get_result()` returns the result of the last `rs$call()`
#' computation. (Or the result of the last `rs$run()`, if it was
#' interrupted.) If there is no result to return, because the computation
#' has not finished yet, or some other reason, it throws an error.
#'
#' `rs$get_running_time()` returns the elapsed time since the R process
#' has started, and the elapsed time since the current computation has
#' started. The latter is NA if there is no active computation.
#'
#' `rs$get_state()` return the state of the R session. Possible values:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a way to get the status code if the process has finished?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It inherits all methods from processx::process, so you have rs$get_exit_status().

#' * `"starting"`: starting up,
#' * `"idle"`: ready to compute,
#' * `"busy"`: computing right now,
#' * `"ready"`: computation finished, result can be read out,
#' * `"finished"`: the R process has finished.
#' `rs$get_state()` automatically updates the state, i.e. it performs a
#' quick `wait_for_call()`, if needed.
#'
#' `r$finish()` terminates the current computation and the R process.
#' The session object will be in `"finished"` state after this.
#'
#' @name r_session
#' @examples
#' \dontrun{
#' opt <- r_session_options()
#' rs <- r_ression$new(opt)
#'
#' rs$run(function() 1 + 2)
#'
#' rs$call(function() Sys.sleep(1))
#' rs$get_state()
#' rs$wait_for_call()
#'
#' rs$get_result()
#' }
NULL


#' @importFrom R6 R6Class
#' @export

r_session <- R6Class(
"r_session",
inherit = process,

public = list(
initialize = function(options)
rs_init(self, private, super, options),
run = function(func, args = list(), timeout = -1)
rs_run(self, private, func, args, timeout),
call = function(func, args = list())
rs_call(self, private, func, args),
wait_for_call = function(timeout = -1)
rs_wait_for_call(self, private, timeout),
get_result = function()
rs_get_result(self, private),
get_running_time = function()
rs_get_running_time(self, private),
get_state = function()
rs_get_state(self, private),
finish = function(grace = 200)
rs_finish(self, private, grace)
),

private = list(
options = NULL,
state = NULL,
started_at = NULL,
fun_started_at = as.POSIXct(NA),
pipe = NULL, # Two connections, for me and child

func_file = NULL,
res_file = NULL,

update_state = function()
rs__update_state(self, private),
report_back = function(code, text = "")
rs__report_back(self, private, code, text),
write_for_sure = function(text)
rs__write_for_sure(self, private, text)
)
)

#' @importFrom processx conn_create_pipepair

rs_init <- function(self, private, super, options) {

options$func <- options$func %||% function() { }
options$args <- list()
options <- convert_and_check_my_args(options)
options <- setup_context(options)
options <- setup_r_binary_and_args(options, script_file = FALSE)

private$options <- options

private$pipe <- conn_create_pipepair()

with_envvar(
options$env,
super$initialize(options$bin, options$real_cmdargs, stdin = "|",
stdout = options$stdout, stderr = options$stderr,
connections = list(private$pipe[[2]]))
)
private$started_at <- Sys.time()
private$state <- "starting"

## Make child report back when ready
private$report_back(200, "ready to go")

invisible(self)
}

rs_run <- function(self, private, func, args, timeout) {
self$call(func, args)
self$wait_for_call(timeout)
self$get_result()
}

rs_call <- function(self, private, func, args) {
private$update_state()
if (private$state != "idle") stop("R session busy")

## Save the function in a file
private$options$func <- func
private$options$args <- args
private$options$func_file <- save_function_to_temp(private$options)
private$options$result_file <- tempfile()
private$options$tmp_files <-
c(private$options$tmp_files, private$options$func_file,
private$options$result_file)

## Run an expr that loads it, in the child process, with error handlers
expr <- make_vanilla_script_expr(private$options$func_file,
private$options$result_file,
private$options$error)
cmd <- paste0(deparse(expr), "\n")

## Write this to stdin
private$write_for_sure(cmd)

## Report back when done
report_str <- paste0("DONE", basename(private$options$result_file))
private$report_back(200, report_str)

private$state <- "busy"
}

rs_wait_for_call <- function(self, private, timeout) {
if (private$state %in% c("finished", "ready", "idle")) return()

pr <- poll(list(private$pipe[[1]]), timeout)[[1]]
if (pr == "ready") {
if (private$state == "starting") {
private$state <- "idle"
} else {
private$state <- "ready"
}
invisible(conn_read_lines(private$pipe[[1]], 1))
} else {
invisible()
}
}

rs_get_result <- function(self, private) {
if (private$state != "ready") private$update_state()

get_my_result <- function() {
## This is artificial...
out <- list(
status = 0,
stdout = "",
stderr = "",
timeout = FALSE
)
res <- get_result(out, private$options)
private$state <- "idle"
unlink(private$options$tmp_files, recursive = TRUE)
private$options$tmp_files <- NULL
res
}

switch(
private$state,
"finished" = stop("R session already finished"),
"idle" = stop("R session is idle"),
"busy" = stop("R session still busy"),
"starting" = stop("R session still starting"),
"ready" = get_my_result()
)
}

rs_get_running_time <- function(self, private) {
now <- Sys.time()
c(total = now - private$started_at,
current = now - private$fun_started_at)
}

rs_get_state <- function(self, private) {
private$update_state()
private$state
}

rs_finish <- function(self, private, grace) {
close(self$get_input_connection())
self$poll_io(grace)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 200 (ms?) enough time to ensure the everything closes gracefully?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

200ms, yes. Should be, unless the R session has .Last which takes longer to run. For this odd case, the grace period is user configurable.

self$kill()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What signal does this send to the child process? SIGKILL, SIGTERM, SIGINT or SIGQUIT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SIGKILL On windows there is nothing else, and then it is better to behave the same way across platforms.

private$state <- "finished"
private$fun_started_at <- as.POSIXct(NA)
}

#' @importFrom processx conn_read_lines

rs__update_state <- function(self, private) {
self$wait_for_call(timeout = 0)
}

rs__report_back <- function(self, private, code, text) {
cmd <- paste0(deparse(rs__status_expr(code, text, fd = 3)), "\n")
private$write_for_sure(cmd)
}

rs__write_for_sure <- function(self, private, text) {
while (1) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this could be

while(length(self$write_input(text)) > 0) {
  Sys.sleep(.1)
}

Although perhaps you feel that obscures what function is doing the work.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to save the return value of write_input(), though. That's how write_input() works: it writes as much as it can, and returns the leftover that it could not write.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see, so you would need to do something like

while(length(text <- self$write_input(text)) > 0) {
  Sys.sleep(1)
}

Which isn't much of an improvement, so it is fine as is.

text <- self$write_input(text)
if (!length(text)) break;
Sys.sleep(.1)
}
}

#' @importFrom processx conn_create_fd conn_write

rs__status_expr <- function(code, text = "", fd = 3) {
substitute(
{
code_ <- code; fd_ <- fd; text_ <- text
con <- processx::conn_create_fd(fd_, close = FALSE)
data <- paste0(code_, " ", text_, "\n")
while (1) {
data <- processx::conn_write(con, data)
if (!length(data)) break;
Sys.sleep(.1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the sleep time here and on L274 should be a parameter of the object?

Copy link
Member Author

@gaborcsardi gaborcsardi May 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be, but this sleep is actually not very important. It is a safety net that is almost never needed. Basically it is only needed if the write buffer is full, but the write buffer is at least 4k or 8k, and we don't send messages longer than that, and only ever keep at most one message in the buffer. So it is not really important to make this user configurable, I think.

}
},
list(code = code, fd = fd, text = text)
)
}

#' Create options for an [r_session] object
#'
#' @param ... Options to override, named arguments.
#'
#' @export

r_session_options <- function(...) {
opt <- r_process_options(...)
opt$func <- opt$args <- NULL
opt
}
9 changes: 6 additions & 3 deletions R/script.R
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

make_vanilla_script <- function(expr_file, res, error) {
make_vanilla_script_expr <- function(expr_file, res, error) {

## Code to handle errors in the child
## This will inserted into the main script
Expand Down Expand Up @@ -37,7 +37,7 @@ make_vanilla_script <- function(expr_file, res, error) {
##
## It is important that we do not create any temporary variables,
## the function is called from an empty global environment.
script <- substitute(
substitute(
{
withCallingHandlers( # nocov start
{
Expand All @@ -56,8 +56,11 @@ make_vanilla_script <- function(expr_file, res, error) {

list(`__error__` = err, `__expr_file__` = expr_file, `__res__` = res)
)
}

script <- deparse(script)
make_vanilla_script_file <- function(expr_file, res, error) {
expr <- make_vanilla_script_expr(expr_file, res, error)
script <- deparse(expr)

tmp <- tempfile()
cat(script, file = tmp, sep = "\n")
Expand Down
Loading