Skip to content

Commit

Permalink
Add logging
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau-lilly committed Jan 4, 2024
1 parent b3e76e5 commit 4dba22a
Show file tree
Hide file tree
Showing 10 changed files with 438 additions and 14 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Description: In computationally demanding analysis projects,
'clustermq' by Schubert (2019) <doi:10.1093/bioinformatics/btz284>),
and 'batchtools' by Lang, Bischel, and Surmann (2017)
<doi:10.21105/joss.00135>.
Version: 0.7.0.9004
Version: 0.7.0.9005
License: MIT + file LICENSE
URL: https://wlandau.github.io/crew/, https://github.com/wlandau/crew
BugReports: https://github.com/wlandau/crew/issues
Expand Down
4 changes: 3 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# crew 0.7.0.9004 (development)
# crew 0.7.0.9005 (development)

* Configure workers to send themselves a termination signal if the connection to the dispatcher is broken (#141, @psychelzh). Huge thanks to @shikokuchuo for the support through https://github.com/shikokuchuo/mirai/issues/87, https://github.com/shikokuchuo/mirai/pull/88, and https://github.com/shikokuchuo/nanonext/pull/25!
* Throw a warning from `controller$map()` if at least one task threw one. `warnings = FALSE` suppresses this behavior.
* Set `output = TRUE` in `daemon()` so `stdout` and `stderr` streams print.
* Add new arguments `local_log_directory` and `local_log_join` to write to local log files.

# crew 0.7.0

Expand Down
8 changes: 6 additions & 2 deletions R/crew_controller_local.R
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ crew_controller_local <- function(
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE,
launch_max = 5L
launch_max = 5L,
local_log_directory = NULL,
local_log_join = TRUE
) {
crew_deprecate(
name = "seconds_exit",
Expand Down Expand Up @@ -70,7 +72,9 @@ crew_controller_local <- function(
reset_options = reset_options,
garbage_collection = garbage_collection,
launch_max = launch_max,
tls = tls
tls = tls,
local_log_directory = local_log_directory,
local_log_join = local_log_join
)
controller <- crew_controller(client = client, launcher = launcher)
controller$validate()
Expand Down
2 changes: 2 additions & 0 deletions R/crew_eval.R
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,14 @@ crew_eval <- function(
list2env(x = globals, envir = globalenv())
envir <- list2env(x = data, parent = globalenv())
capture_error <- function(condition) {
message(paste("Error:", conditionMessage(condition))) # for log files
state$error <- crew_eval_message(condition)
state$error_class <- class(condition)
state$trace <- paste(as.character(sys.calls()), collapse = "\n")
NULL
}
capture_warning <- function(condition) {
message(paste("Warning:", conditionMessage(condition))) # for log files
state$count_warnings <- (state$count_warnings %||% 0L) + 1L
should_store_warning <- (state$count_warnings < crew_eval_max_warnings) &&
(nchar(state$warnings %||% "") < crew_eval_max_nchar)
Expand Down
3 changes: 2 additions & 1 deletion R/crew_launcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -445,11 +445,12 @@ crew_class_launcher <- R6::R6Class(
list(
url = socket,
autoexit = signal_disconnect,
cleanup = cleanup,
output = TRUE,
maxtasks = private$.tasks_max,
idletime = private$.seconds_idle * 1000,
walltime = private$.seconds_wall * 1000,
timerstart = private$.tasks_timers,
cleanup = cleanup,
tls = private$.tls$worker(name = private$.name),
rs = mirai::nextstream(private$.name)
)
Expand Down
146 changes: 142 additions & 4 deletions R/crew_launcher_local.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@
#' @description Create an `R6` object to launch and maintain
#' local process workers.
#' @inheritParams crew_launcher
#' @param local_log_directory Either `NULL` or a character of length 1
#' with the file path to a directory to write worker-specific log files
#' with standard output and standard error messages.
#' Each log file represents a single *instance* of a running worker,
#' so there will be more log files
#' if a given worker starts and terminates a lot. Set to `NULL` to suppress
#' log files (default).
#' @param local_log_join Logical of length 1. If `TRUE`, `crew` will write
#' standard output and standard error to the same log file for
#' each worker instance. If `FALSE`, then they these two streams
#' will go to different log files with informative suffixes.
#' @examples
#' if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
#' client <- crew_client()
Expand Down Expand Up @@ -31,7 +42,9 @@ crew_launcher_local <- function(
reset_options = FALSE,
garbage_collection = FALSE,
launch_max = 5L,
tls = crew::crew_tls()
tls = crew::crew_tls(),
local_log_directory = NULL,
local_log_join = TRUE
) {
crew_deprecate(
name = "seconds_exit",
Expand All @@ -56,7 +69,9 @@ crew_launcher_local <- function(
reset_options = reset_options,
garbage_collection = garbage_collection,
launch_max = launch_max,
tls = tls
tls = tls,
local_log_directory = local_log_directory,
local_log_join = local_log_join
)
launcher$validate()
launcher
Expand All @@ -83,7 +98,125 @@ crew_class_launcher_local <- R6::R6Class(
classname = "crew_class_launcher_local",
inherit = crew_class_launcher,
cloneable = FALSE,
private = list(
.local_log_directory = NULL,
.local_log_join = NULL,
.log_prepare = function() {
if (!is.null(private$.local_log_directory)) {
dir_create(private$.local_log_directory)
}
},
.log_path = function(name, type) {
directory <- private$.local_log_directory
if (is.null(directory)) {
return(NULL)
}
suffix <- if_any(private$.local_log_join, "", paste0("-", type))
file.path(directory, sprintf("%s%s.log", name, suffix))
}
),
active = list(
#' @field local_log_directory See [crew_launcher_local()].
local_log_directory = function() {
.subset2(private, ".local_log_directory")
},
#' @field local_log_join See [crew_launcher_local()].
local_log_join = function() {
.subset2(private, ".local_log_join")
}
),
public = list(
#' @description Local launcher constructor.
#' @return An `R6` object with the local launcher.
#' @param name See [crew_launcher()].
#' @param seconds_interval See [crew_launcher()].
#' @param seconds_timeout See [crew_launcher()].
#' @param seconds_launch See [crew_launcher()].
#' @param seconds_idle See [crew_launcher()].
#' @param seconds_wall See [crew_launcher()].
#' @param seconds_exit See [crew_launcher()].
#' @param tasks_max See [crew_launcher()].
#' @param tasks_timers See [crew_launcher()].
#' @param reset_globals See [crew_launcher()].
#' @param reset_packages See [crew_launcher()].
#' @param reset_options See [crew_launcher()].
#' @param garbage_collection See [crew_launcher()].
#' @param launch_max See [crew_launcher()].
#' @param tls See [crew_launcher()].
#' @param processes See [crew_launcher()].
#' @param local_log_directory See [crew_launcher_local()].
#' @param local_log_join See [crew_launcher_local()].
#' @examples
#' if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
#' client <- crew_client()
#' client$start()
#' launcher <- crew_launcher_local(name = client$name)
#' launcher$start(workers = client$workers)
#' launcher$launch(index = 1L)
#' m <- mirai::mirai("result", .compute = client$name)
#' Sys.sleep(0.25)
#' m$data
#' client$terminate()
#' }
initialize = function(
name = NULL,
seconds_interval = NULL,
seconds_timeout = NULL,
seconds_launch = NULL,
seconds_idle = NULL,
seconds_wall = NULL,
seconds_exit = NULL,
tasks_max = NULL,
tasks_timers = NULL,
reset_globals = NULL,
reset_packages = NULL,
reset_options = NULL,
garbage_collection = NULL,
launch_max = NULL,
tls = NULL,
processes = NULL,
local_log_directory = NULL,
local_log_join = NULL
) {
super$initialize(
name = name,
seconds_interval = seconds_interval,
seconds_timeout = seconds_timeout,
seconds_launch = seconds_launch,
seconds_idle = seconds_idle,
seconds_wall = seconds_wall,
seconds_exit = seconds_exit,
tasks_max = tasks_max,
tasks_timers = tasks_timers,
reset_globals = reset_globals,
reset_packages = reset_packages,
reset_options = reset_options,
garbage_collection = garbage_collection,
launch_max = launch_max,
tls = tls,
processes = processes
)
private$.local_log_directory <- local_log_directory
private$.local_log_join <- local_log_join
},
#' @description Validate the local launcher.
#' @return `NULL` (invisibly).
validate = function() {
super$validate()
crew_assert(
private$.local_log_directory %|||% "x",
is.character(.),
length(.) == 1L,
!anyNA(.),
nzchar(.),
message = "local_log_directory must be NULL or a valid directory path."
)
crew_assert(
private$.local_log_join,
isTRUE(.) || isFALSE(.),
message = "local_log_join must be TRUE or FALSE."
)
},
#' @description Launch a local process worker which will
#' dial into a socket.
#' @details The `call` argument is R code that will run to
Expand All @@ -94,7 +227,9 @@ crew_class_launcher_local <- R6::R6Class(
#' later on.
#' @param call Character of length 1 with a namespaced call to
#' [crew_worker()] which will run in the worker and accept tasks.
#' @param name Character of length 1 with an informative worker name.
#' @param name Character of length 1 with a long informative worker name
#' which contains the `launcher`, `worker`, and `instance` arguments
#' described below.
#' @param launcher Character of length 1, name of the launcher.
#' @param worker Positive integer of length 1, index of the worker.
#' This worker index remains the same even when the current instance
Expand All @@ -105,10 +240,13 @@ crew_class_launcher_local <- R6::R6Class(
launch_worker = function(call, name, launcher, worker, instance) {
bin <- if_any(tolower(Sys.info()[["sysname"]]) == "windows", "R.exe", "R")
path <- file.path(R.home("bin"), bin)
private$.log_prepare()
processx::process$new(
command = path,
args = c("-e", call),
cleanup = TRUE
cleanup = TRUE,
stdout = private$.log_path(name = name, type = "stdout"),
stderr = private$.log_path(name = name, type = "stderr")
)
},
#' @description Terminate a local process worker.
Expand Down
Loading

0 comments on commit 4dba22a

Please sign in to comment.