diff --git a/DESCRIPTION b/DESCRIPTION index f776ee0ba..e7c971aab 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: mirai Type: Package Title: Minimalist Async Evaluation Framework for R -Version: 2.0.1.9006 +Version: 2.0.1.9007 Description: Designed for simplicity, a 'mirai' evaluates an R expression asynchronously in a parallel process, locally or distributed over the network. The result is automatically available upon completion. Modern diff --git a/NAMESPACE b/NAMESPACE index 880be417d..02d790d1c 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -47,7 +47,6 @@ export(nextget) export(nextstream) export(register_cluster) export(remote_config) -export(saisei) export(serial_config) export(ssh_config) export(status) diff --git a/NEWS.md b/NEWS.md index bfe851b99..3a9b97be4 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,9 +1,13 @@ -# mirai 2.0.1.9006 (development) +# mirai 2.0.1.9007 (development) #### Behavioural Changes * `daemons()` now requires an explicit reset before providing revised settings for a compute profile, and will error otherwise. * `mirai_map()` now errors if daemons have not yet been set (rather than warn and launch one local daemon). +* Removal of mirai v1 compatibility features: + + `saisei()` is now removed as no longer required. + + `daemons()` dispatcher argument "thread" is removed. + + `daemons()` dispatcher arguments "process" and "thread" are formally deprecated and will be removed in a future version. #### Updates @@ -13,7 +17,7 @@ + Fixes language objects being evaluated before the map function is applied (#194). + Fixes classes of objects in a dataframe being dropped during a multiple map (#196). + Better `cli` errors when collecting a 'mirai_map'. -* `status()` call failures when using dispatcher now return the appropriate 'errorValue'. +* Fixes `daemons(NULL)` not causing all daemons started with `autoexit = FALSE` to quit, regression introduced in mirai v2.0.0. * Requires nanonext >= 1.5.0. # mirai 2.0.1 diff --git a/R/daemon.R b/R/daemon.R index 10aa3f685..ff038f993 100644 --- a/R/daemon.R +++ b/R/daemon.R @@ -101,12 +101,6 @@ daemon <- function(url, dispatcher = FALSE, ..., asyncdial = FALSE, autoexit = T cleanup = TRUE, output = FALSE, idletime = Inf, walltime = Inf, maxtasks = Inf, id = NULL, tls = NULL, rs = NULL) { - missing(dispatcher) && return( - v1_daemon(url = url, asyncdial = asyncdial, autoexit = autoexit, - cleanup = cleanup, output = output, idletime = idletime, - walltime = walltime, maxtasks = maxtasks, ..., tls = tls, rs = rs) - ) - cv <- cv() sock <- socket(if (dispatcher) "poly" else "rep") on.exit(reap(sock)) @@ -243,18 +237,6 @@ dial_and_sync_socket <- function(sock, url, asyncdial = FALSE, tls = NULL) { pipe_notify(sock, cv = NULL, add = TRUE) } -parse_cleanup <- function(cleanup) - if (is.logical(cleanup)) - c(cleanup, cleanup, cleanup, FALSE) else - c(as.integer(cleanup) %% 2L, (clr <- as.raw(cleanup)) & as.raw(2L), clr & as.raw(4L), clr & as.raw(8L)) - -perform_cleanup <- function(cleanup) { - if (cleanup[1L]) rm(list = (vars <- names(.GlobalEnv))[!vars %in% .[["vars"]]], envir = .GlobalEnv) - if (cleanup[2L]) lapply((new <- search())[!new %in% .[["se"]]], detach, character.only = TRUE) - if (cleanup[3L]) options(.[["op"]]) - if (cleanup[4L]) gc(verbose = FALSE) -} - do_cleanup <- function() { rm(list = (vars <- names(.GlobalEnv))[!vars %in% .[["vars"]]], envir = .GlobalEnv) lapply((new <- search())[!new %in% .[["se"]]], detach, character.only = TRUE) @@ -262,66 +244,3 @@ do_cleanup <- function() { } snapshot <- function() `[[<-`(`[[<-`(`[[<-`(., "op", .Options), "se", search()), "vars", names(.GlobalEnv)) - -# Legacy compatibility functions ---------------------------------------------- - -v1_daemon <- function(url, asyncdial = FALSE, autoexit = TRUE, cleanup = TRUE, - output = FALSE, maxtasks = Inf, idletime = Inf, walltime = Inf, - timerstart = 0L, ..., tls = NULL, rs = NULL) { - - cv <- cv() - sock <- socket("rep") - on.exit(reap(sock)) - `[[<-`(., "sock", sock) - autoexit && pipe_notify(sock, cv = cv, remove = TRUE, flag = autoexit) - if (length(tls)) tls <- tls_config(client = tls) - dial_and_sync_socket(sock, url, asyncdial = asyncdial, tls = tls) - - if (is.numeric(rs)) `[[<-`(.GlobalEnv, ".Random.seed", as.integer(rs)) - idletime <- if (idletime > walltime) walltime else if (is.finite(idletime)) idletime - cleanup <- parse_cleanup(cleanup) - if (!output) { - devnull <- file(nullfile(), open = "w", blocking = FALSE) - sink(file = devnull) - sink(file = devnull, type = "message") - on.exit({ - sink(type = "message") - sink() - close(devnull) - }, add = TRUE) - } - snapshot() - count <- 0L - start <- mclock() - - repeat { - - ctx <- .context(sock) - aio <- recv_aio(ctx, mode = 1L, timeout = idletime, cv = cv) - wait(cv) || break - m <- collect_aio(aio) - is.object(m) && { - count < timerstart && { - start <- mclock() - next - } - break - } - data <- eval_mirai(m) - count <- count + 1L - - (count >= maxtasks || count > timerstart && mclock() - start >= walltime) && { - .mark() - send(ctx, data, mode = 1L, block = TRUE) - aio <- recv_aio(ctx, mode = 8L, cv = cv) - wait(cv) - break - } - - send(ctx, data, mode = 1L, block = TRUE) - perform_cleanup(cleanup) - if (count <= timerstart) start <- mclock() - - } - -} diff --git a/R/daemons.R b/R/daemons.R index 8d4411855..f10a8e200 100644 --- a/R/daemons.R +++ b/R/daemons.R @@ -260,24 +260,11 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ..., cv <- cv() urld <- local_url() sock <- req_socket(urld) - res <- launch_sync_dispatcher(sock, sock, wa52(urld, dots, url), output, tls, pass, serial) + res <- launch_sync_dispatcher(sock, wa5(urld, dots, url), output, tls, pass, serial) is.object(res) && stop(._[["sync_dispatcher"]]) store_dispatcher(sock, res, cv, envir) `[[<-`(envir, "msgid", 0L) }, - { - n <- if (missing(n)) length(url) else if (is.numeric(n) && n >= 1L) as.integer(n) else stop(._[["n_one"]]) - tls <- configure_tls(url, tls, pass, envir, returnconfig = FALSE) - cv <- cv() - urld <- local_url() - urlc <- sprintf("%s%s", urld, "c") - sock <- req_socket(urld) - sockc <- req_socket(urlc) - res <- launch_sync_dispatcher(sock, sockc, wa5(urld, dots, n, urlc, url), output, tls, pass) - is.object(res) && stop(._[["sync_dispatcher"]]) - store_dispatcher(sockc, res, cv, envir) - launches <- n - }, stop(._[["dispatcher_args"]]) ) `[[<-`(.., .compute, `[[<-`(`[[<-`(`[[<-`(envir, "sock", sock), "n", launches), "dots", dots)) @@ -299,7 +286,6 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ..., if (signal) send_signal(envir) reap(envir[["sock"]]) - is.null(envir[["sockc"]]) || reap(envir[["sockc"]]) ..[[.compute]] <- NULL -> envir } else if (is.null(envir)) { @@ -319,22 +305,12 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ..., { cv <- cv() sock <- req_socket(urld) - res <- launch_sync_dispatcher(sock, sock, wa42(urld, dots, envir[["stream"]], n), output, serial = serial) + res <- launch_sync_dispatcher(sock, wa4(urld, dots, envir[["stream"]], n), output, serial = serial) is.object(res) && stop(._[["sync_dispatcher"]]) store_dispatcher(sock, res, cv, envir) for (i in seq_len(n)) next_stream(envir) `[[<-`(envir, "msgid", 0L) }, - { - cv <- cv() - sock <- req_socket(urld) - urlc <- sprintf("%s%s", urld, "c") - sockc <- req_socket(urlc) - res <- launch_sync_dispatcher(sock, sockc, wa4(urld, dots, envir[["stream"]], n, urlc), output) - is.object(res) && stop(._[["sync_dispatcher"]]) - store_dispatcher(sockc, res, cv, envir) - for (i in seq_len(n)) next_stream(envir) - }, stop(._[["dispatcher_args"]]) ) `[[<-`(.., .compute, `[[<-`(`[[<-`(`[[<-`(envir, "sock", sock), "n", n), "dots", dots)) @@ -443,8 +419,7 @@ status <- function(.compute = "default") { envir <- ..[[.compute]] is.null(envir) && return(list(connections = 0L, daemons = 0L)) length(envir[["msgid"]]) && return(dispatcher_status(envir)) - list(connections = as.integer(stat(envir[["sock"]], "pipes")), - daemons = if (is.null(envir[["sockc"]])) envir[["urls"]] else query_status(envir)) + list(connections = as.integer(stat(envir[["sock"]], "pipes")), daemons = envir[["urls"]]) } @@ -507,13 +482,11 @@ init_envir_stream <- function(seed) { envir } -tokenized_url <- function(url) sprintf("%s/%s", url, random(12L)) - req_socket <- function(url, tls = NULL, resend = 0L) `opt<-`(socket("req", listen = url, tls = tls), "req:resend-time", resend) parse_dispatcher <- function(x) - if (is.logical(x)) 1L + (!is.na(x) && x) else if (x == "process" || x == "thread") 3L else if (x == "none") 1L else 4L + if (is.logical(x)) 1L + (!is.na(x) && x) else if (x == "process" || x == "thread") 2L else if (x == "none") 1L else 3L parse_dots <- function(...) { ...length() || return("") @@ -531,50 +504,47 @@ parse_tls <- function(tls) libp <- function(lp = .libPaths()) lp[file.exists(file.path(lp, "mirai"))][1L] -wa31 <- function(url, dots, rs, tls = NULL) - shQuote(sprintf("mirai::daemon(\"%s\"%s%s,rs=c(%s))", url, dots, parse_tls(tls), paste0(rs, collapse = ","))) - -wa3 <- function(url, dots, rs, tls = NULL) +wa2 <- function(url, dots, rs, tls = NULL) shQuote(sprintf("mirai::daemon(\"%s\"%s%s,rs=c(%s),dispatcher=FALSE)", url, dots, parse_tls(tls), paste0(rs, collapse = ","))) -wa32 <- function(url, dots, rs, tls = NULL) +wa3 <- function(url, dots, rs, tls = NULL) shQuote(sprintf("mirai::daemon(\"%s\"%s%s,rs=c(%s),dispatcher=TRUE)", url, dots, parse_tls(tls), paste0(rs, collapse = ","))) -wa4 <- function(urld, dots, rs, n, urlc) - shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",n=%d,rs=c(%s),monitor=\"%s\"%s)", libp(), urld, n, paste0(rs, collapse= ","), urlc, dots)) - -wa42 <- function(urld, dots, rs, n) +wa4 <- function(urld, dots, rs, n) shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",n=%d,rs=c(%s)%s)", libp(), urld, n, paste0(rs, collapse= ","), dots)) -wa5 <- function(urld, dots, n, urlc, url) - shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",c(\"%s\"),n=%d,monitor=\"%s\"%s)", libp(), urld, paste0(url, collapse = "\",\""), n, urlc, dots)) - -wa52 <- function(urld, dots, url) +wa5 <- function(urld, dots, url) shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",url=\"%s\"%s)", libp(), urld, url, dots)) launch_daemon <- function(args, output) system2(.command, args = c("-e", args), stdout = output, stderr = output, wait = FALSE) -launch_sync_dispatcher <- function(sock, sockc, args, output, tls = NULL, pass = NULL, serial = NULL) { +query_dispatcher <- function(sock, command, send_mode = 2L, recv_mode = 5L, block = .limit_short) + if (r <- send(sock, command, mode = send_mode, block = block)) r else + recv(sock, mode = recv_mode, block = block) + +launch_sync_dispatcher <- function(sock, args, output, tls = NULL, pass = NULL, serial = NULL) { pkgs <- Sys.getenv("R_DEFAULT_PACKAGES") system2(.command, args = c("--default-packages=NULL", "--vanilla", "-e", args), stdout = output, stderr = output, wait = FALSE) if (is.list(serial)) `opt<-`(sock, "serial", serial) - query_dispatcher(sockc, list(pkgs, tls, pass, serial), send_mode = 1L, recv_mode = 2L, block = .limit_long) + query_dispatcher(sock, list(pkgs, tls, pass, serial), send_mode = 1L, recv_mode = 2L, block = .limit_long) } launch_sync_daemons <- function(seq, sock, urld, dots, envir, output) { cv <- cv() pipe_notify(sock, cv = cv, add = TRUE) for (i in seq) - launch_daemon(wa3(urld, dots, next_stream(envir)), output) + launch_daemon(wa2(urld, dots, next_stream(envir)), output) for (i in seq) until(cv, .limit_long) || return(pipe_notify(sock, cv = NULL, add = TRUE)) !pipe_notify(sock, cv = NULL, add = TRUE) } -store_dispatcher <- function(sockc, res, cv, envir) - `[[<-`(`[[<-`(`[[<-`(`[[<-`(envir, "sockc", sockc), "urls", res[-1L]), "pid", as.integer(res[1L])), "cv", cv) +store_dispatcher <- function(sock, res, cv, envir) + `[[<-`(`[[<-`(`[[<-`(`[[<-`(envir, "sock", sock), "urls", res[-1L]), "pid", as.integer(res[1L])), "cv", cv) + +sub_real_port <- function(port, url) sub("(?<=:)0(?![^/])", port, url, perl = TRUE) check_store_url <- function(sock, envir) { listener <- attr(sock, "listener")[[1L]] @@ -585,24 +555,14 @@ check_store_url <- function(sock, envir) { } send_signal <- function(envir) { - signals <- max(length(envir[["urls"]]), stat(envir[["sock"]], "pipes")) + signals <- if (is.null(envir[["msgid"]])) stat(envir[["sock"]], "pipes") else + query_dispatcher(envir[["sock"]], c(0L, 0L))[1L] for (i in seq_len(signals)) { send(envir[["sock"]], ._scm_., mode = 2L) msleep(10L) } } -query_status <- function(envir) { - res <- query_dispatcher(envir[["sockc"]], 0L) - `attributes<-`( - res, - list( - dim = c(envir[["n"]], 5L), - dimnames = list(envir[["urls"]], c("i", "online", "instance", "assigned", "complete")) - ) - ) -} - dispatcher_status <- function(envir) { status <- query_dispatcher(envir[["sock"]], c(0L, 0L)) is.object(status) && return(status) diff --git a/R/dispatcher.R b/R/dispatcher.R index 4a6e11154..1a1071dc4 100644 --- a/R/dispatcher.R +++ b/R/dispatcher.R @@ -51,18 +51,13 @@ #' @param pass [default NULL] (required only if the private key supplied to #' \sQuote{tls} is encrypted with a password) For security, should be provided #' through a function that returns this value, rather than directly. -#' @param monitor unused legacy parameter - do not specify this value. #' #' @return Invisible NULL. #' #' @export #' dispatcher <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL, - rs = NULL, monitor = NULL) { - - missing(monitor) || - return(v1_dispatcher(host = host, url = url, n = n, ..., - tls = tls, pass = pass, rs = rs, monitor = monitor)) + rs = NULL) { n <- if (is.numeric(n)) as.integer(n) else length(url) n > 0L || stop(._[["missing_url"]]) @@ -108,7 +103,7 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL, dots <- parse_dots(...) output <- attr(dots, "output") for (i in seq_len(n)) - launch_daemon(wa32(url, dots, next_stream(envir)), output) + launch_daemon(wa3(url, dots, next_stream(envir)), output) for (i in seq_len(n)) until(cv, .limit_long) || stop(._[["sync_daemons"]]) @@ -120,7 +115,10 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL, } } else { - url <- check_url(psock) + listener <- attr(psock, "listener")[[1L]] + url <- opt(listener, "url") + if (parse_url(url)[["port"]] == "0") + url <- sub_real_port(opt(listener, "tcp-bound-port"), url) } send(ctx, c(Sys.getpid(), url), mode = 2L, block = TRUE) @@ -236,235 +234,3 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL, ) } - -# internals -------------------------------------------------------------------- - -get_ports <- function(url, n) { - baseurl <- parse_url(url) - if (startsWith(baseurl[["scheme"]], "t")) { - if (baseurl[["port"]] == "0") integer(n) else seq.int(from = baseurl[["port"]], length.out = n) - } -} - -sub_real_port <- function(port, url) sub("(?<=:)0(?![^/])", port, url, perl = TRUE) - -check_url <- function(sock) { - listener <- attr(sock, "listener")[[1L]] - url <- opt(listener, "url") - if (parse_url(url)[["port"]] == "0") - url <- sub_real_port(opt(listener, "tcp-bound-port"), url) - url -} - -query_dispatcher <- function(sock, command, send_mode = 2L, recv_mode = 5L, block = .limit_short) - if (r <- send(sock, command, mode = send_mode, block = block)) r else - recv(sock, mode = recv_mode, block = block) - -create_req <- function(ctx, cv) - list(ctx = ctx, req = recv_aio(ctx, mode = 8L, cv = cv)) - -# Legacy compatibility functions ----------------------------------------------- - -v1_dispatcher <- function(host, url = NULL, n = NULL, ..., retry = FALSE, - token = FALSE, tls = NULL, pass = NULL, rs = NULL, monitor = NULL) { - - n <- if (is.numeric(n)) as.integer(n) else length(url) - n > 0L || stop(._[["missing_url"]]) - - cv <- cv() - sock <- socket("rep") - on.exit(reap(sock)) - pipe_notify(sock, cv = cv, remove = TRUE, flag = TRUE) - dial_and_sync_socket(sock, host) - - ctrchannel <- is.character(monitor) - if (ctrchannel) { - sockc <- socket("rep") - on.exit(reap(sockc), add = TRUE, after = FALSE) - pipe_notify(sockc, cv = cv, remove = TRUE, flag = TRUE) - dial_and_sync_socket(sockc, monitor) - cmessage <- recv(sockc, mode = 1L, block = .limit_long) - is.object(cmessage) && stop(._[["sync_dispatcher"]]) - if (nzchar(cmessage[[1L]])) - Sys.setenv(R_DEFAULT_PACKAGES = cmessage[[1L]]) else - Sys.unsetenv("R_DEFAULT_PACKAGES") - } - - auto <- is.null(url) - vectorised <- length(url) == n - seq_n <- seq_len(n) - basenames <- servernames <- character(n) - activestore <- instance <- complete <- assigned <- integer(n) - serverfree <- !integer(n) - active <- servers <- queue <- vector(mode = "list", length = n) - if (auto) { - dots <- parse_dots(...) - output <- attr(dots, "output") - } else { - ports <- get_ports(url, n) - if (length(ports)) token <- FALSE - if (ctrchannel && is.character(cmessage[[2L]]) && is.null(tls)) { - tls <- cmessage[[2L]] - pass <- cmessage[[3L]] - } - if (length(tls)) - tls <- tls_config(server = tls, pass = pass) - } - pass <- NULL - - envir <- new.env(hash = FALSE) - if (is.numeric(rs)) `[[<-`(envir, "stream", as.integer(rs)) - - for (i in seq_n) { - burl <- if (auto) .urlscheme else - if (vectorised) url[i] else - if (is.null(ports)) sprintf(if (startsWith(url, "ipc")) "%s-%d" else "%s/%d", url, i) else - sub(ports[1L], ports[i], url, fixed = TRUE) - nurl <- if (auto) local_url() else if (token) tokenized_url(burl) else burl - ncv <- cv() - nsock <- req_socket(NULL, resend = retry * .intmax) - pipe_notify(nsock, cv = ncv, cv2 = cv, add = TRUE, remove = TRUE) - lock(nsock, cv = ncv) - listen(nsock, url = nurl, tls = tls, error = TRUE) - listener <- attr(nsock, "listener")[[1L]] - listurl <- opt(listener, "url") - if (i == 1L && !auto && parse_url(listurl)[["port"]] == "0") { - realport <- opt(listener, "tcp-bound-port") - listurl <- sub_real_port(realport, nurl) - if (!vectorised || n == 1L) { - url <- sub_real_port(realport, url) - burl <- sub_real_port(realport, burl) - } - } - - auto && launch_daemon(wa31(nurl, dots, next_stream(envir)), output) - - basenames[i] <- burl - servernames[i] <- listurl - servers[[i]] <- nsock - active[[i]] <- ncv - queue[[i]] <- create_req(.context(sock), cv) - } - - on.exit(lapply(servers, reap), add = TRUE, after = TRUE) - - if (auto) - for (i in seq_n) - until(cv, .limit_long) || stop(._[["sync_daemons"]]) - - if (ctrchannel) { - send(sockc, c(Sys.getpid(), servernames), mode = 2L) - cmessage <- recv_aio(sockc, mode = 5L, cv = cv) - } - - suspendInterrupts( - repeat { - - wait(cv) || break - - cv_values <- as.integer(lapply(active, cv_value)) - activevec <- cv_values %% 2L - changes <- (activevec - activestore) > 0L - activestore <- activevec - if (any(changes)) { - instance[changes] <- abs(instance[changes]) + 1L - serverfree <- serverfree | changes - } - - ctrchannel && !unresolved(cmessage) && { - i <- .subset2(cmessage, "value") - if (i) { - if (i > 0L && !activevec[[i]]) { - reap(attr(servers[[i]], "listener")[[1L]]) - attr(servers[[i]], "listener") <- NULL - data <- servernames[i] <- if (auto) local_url() else tokenized_url(basenames[i]) - instance[i] <- -abs(instance[i]) - listen(servers[[i]], url = data, tls = tls, error = TRUE) - - } else if (i < 0L) { - i <- -i - reap(servers[[i]]) - servers[[i]] <- nsock <- req_socket(NULL, resend = retry * .intmax) - pipe_notify(nsock, cv = active[[i]], cv2 = cv, add = TRUE, remove = TRUE) - lock(nsock, cv = active[[i]]) - data <- servernames[i] <- if (auto) local_url() else tokenized_url(basenames[i]) - instance[i] <- -abs(instance[i]) - listen(nsock, url = data, tls = tls, error = TRUE) - - } else { - data <- "" - - } - } else { - data <- as.integer(c(seq_n, activevec, instance, assigned, complete)) - } - send(sockc, data, mode = 2L) - cmessage <- recv_aio(sockc, mode = 5L, cv = cv) - next - } - - for (i in seq_n) - if (length(queue[[i]]) > 2L && !unresolved(queue[[i]][["req"]])) { - req <- .subset2(queue[[i]][["req"]], "value") - if (is.object(req)) req <- serialize(req, NULL, xdr = FALSE) - send(queue[[i]][["ctx"]], req, mode = 2L, block = TRUE) - q <- queue[[i]][["daemon"]] - if (req[4L]) { - send(queue[[i]][["rctx"]], NULL, mode = 2L, block = TRUE) - reap(queue[[i]][["rctx"]]) - } else { - serverfree[q] <- TRUE - } - complete[q] <- complete[q] + 1L - queue[[i]] <- create_req(.context(sock), cv) - } - - free <- which(serverfree & activevec) - - if (length(free)) - for (q in free) - for (i in seq_n) { - if (length(queue[[i]]) == 2L && !unresolved(queue[[i]][["req"]])) { - queue[[i]][["rctx"]] <- .context(servers[[q]]) - queue[[i]][["req"]] <- request(queue[[i]][["rctx"]], .subset2(queue[[i]][["req"]], "value"), - send_mode = 2L, recv_mode = 8L, cv = cv) - queue[[i]][["daemon"]] <- q - serverfree[q] <- FALSE - assigned[q] <- assigned[q] + 1L - break - } - serverfree[q] || break - } - - } - ) - -} - -#' Saisei (Regenerate Token) -#' -#' [DEPRECATED] This is a legacy function used only with the legacy v1 -#' dispatcher, and will be removed in a future release. -#' -#' @inheritParams mirai -#' @param i integer index number URL to regenerate at dispatcher. -#' @param force [default FALSE] logical value whether to regenerate the URL even -#' when there is an existing active connection. -#' -#' @return The regenerated character URL upon success, or else NULL. -#' -#' @keywords internal -#' @export -#' -saisei <- function(i, force = FALSE, .compute = "default") { - - envir <- ..[[.compute]] - length(envir[["msgid"]]) && return() - i <- as.integer(i[1L]) - length(envir[["sockc"]]) && i > 0L && i <= envir[["n"]] && !startsWith(envir[["urls"]][i], "t") || return() - r <- query_dispatcher(envir[["sockc"]], if (force) -i else i, recv_mode = 9L) - is.character(r) && nzchar(r) || return() - envir[["urls"]][i] <- r - r - -} diff --git a/R/launchers.R b/R/launchers.R index 158ac5aa4..b811baba4 100644 --- a/R/launchers.R +++ b/R/launchers.R @@ -79,7 +79,7 @@ launch_local <- function(n = 1L, ..., tls = NULL, .compute = "default") { envir <- ..[[.compute]] is.null(envir) && stop(._[["daemons_unset"]]) url <- envir[["urls"]][1L] - write_args <- if (length(envir[["msgid"]])) wa32 else wa3 + write_args <- if (length(envir[["msgid"]])) wa3 else wa2 dots <- if (missing(..1)) envir[["dots"]] else parse_dots(...) output <- attr(dots, "output") if (is.null(tls)) tls <- envir[["tls"]] @@ -118,7 +118,7 @@ launch_remote <- function(n = 1L, remote = remote_config(), ..., tls = NULL, .co envir <- ..[[.compute]] is.null(envir) && stop(._[["daemons_unset"]]) url <- envir[["urls"]][1L] - write_args <- if (length(envir[["msgid"]])) wa32 else wa3 + write_args <- if (length(envir[["msgid"]])) wa3 else wa2 dots <- if (missing(..1)) envir[["dots"]] else parse_dots(...) if (is.null(tls)) tls <- envir[["tls"]] diff --git a/R/mirai.R b/R/mirai.R index 36bd26b4f..9a1993b4e 100644 --- a/R/mirai.R +++ b/R/mirai.R @@ -254,7 +254,7 @@ everywhere <- function(.expr, ..., .args = list(), .compute = "default") { as.expression(if (is.symbol(expr) && exists(as.character(expr), envir = parent.frame()) && is.language(.expr)) .expr else expr) ) - if (is.null(envir[["sockc"]])) { + if (is.null(envir[["msgid"]])) { vec <- vector(mode = "list", length = max(stat(envir[["sock"]], "pipes"), envir[["n"]])) for (i in seq_along(vec)) vec[[i]] <- mirai(.expr, ..., .args = .args, .compute = .compute) diff --git a/README.Rmd b/README.Rmd index e6a89d481..9f7841fad 100644 --- a/README.Rmd +++ b/README.Rmd @@ -17,7 +17,6 @@ knitr::opts_chunk$set( [![CRAN status](https://www.r-pkg.org/badges/version/mirai)](https://CRAN.R-project.org/package=mirai) -[![R-multiverse status](https://img.shields.io/badge/dynamic/json?url=https%3A%2F%2Fcommunity.r-multiverse.org%2Fapi%2Fpackages%2Fmirai&query=%24.Version&label=r-multiverse)](https://community.r-multiverse.org/mirai) [![R-universe status](https://shikokuchuo.r-universe.dev/badges/mirai)](https://shikokuchuo.r-universe.dev/mirai) [![R-CMD-check](https://github.com/shikokuchuo/mirai/workflows/R-CMD-check/badge.svg)](https://github.com/shikokuchuo/mirai/actions) [![codecov](https://codecov.io/gh/shikokuchuo/mirai/graph/badge.svg)](https://app.codecov.io/gh/shikokuchuo/mirai) @@ -105,7 +104,7 @@ All errors are returned as 'errorValues', facilitating recovery from partial fai ### Design Concepts -`mirai` is designed from the ground up to provide a production-grade experience. +mirai is designed from the ground up to provide a production-grade experience. - Fast + 1,000x more responsive compared to common alternatives [[1]](https://github.com/shikokuchuo/mirai/pull/142#issuecomment-2457589563) @@ -126,9 +125,9 @@ All errors are returned as 'errorValues', facilitating recovery from partial fai ### Integrations -The following core integrations are documented, with usage examples in the linked vignettes: +mirai features the following core integrations, with usage examples in the linked vignettes: -[R parallel](https://shikokuchuo.net/mirai/articles/parallel.html)   Provides an alternative communications backend for R, implementing a new parallel cluster type, a feature request by R-Core at R Project Sprint 2023. 'miraiCluster' may also be used with `foreach` via `doParallel`. +[R parallel](https://shikokuchuo.net/mirai/articles/parallel.html)   Provides an alternative communications backend for R, implementing a new parallel cluster type, a feature request by R-Core at R Project Sprint 2023. 'miraiCluster' may also be used with 'foreach' via 'doParallel'. [promises](https://shikokuchuo.net/mirai/articles/promises.html)   Implements the next generation of completely event-driven, non-polling promises. 'mirai' may be used interchageably with 'promises', including with the promise pipe `%...>%`. @@ -142,31 +141,31 @@ The following core integrations are documented, with usage examples in the linke ### Powering Crew and Targets High Performance Computing -[targets](https://docs.ropensci.org/targets/)   Targets, a Make-like pipeline tool for statistics and data science, has integrated and adopted `crew` as its default high-performance computing backend. +[targets](https://docs.ropensci.org/targets/)   Targets, a Make-like pipeline tool for statistics and data science, has integrated and adopted the crew package as its default high-performance computing backend. -[crew](https://wlandau.github.io/crew/)   Crew is a distributed worker-launcher extending `mirai` to different distributed computing platforms, from traditional clusters to cloud services. +[crew](https://wlandau.github.io/crew/)   Crew is a distributed worker-launcher extending mirai to different distributed computing platforms, from traditional clusters to cloud services. -[crew.cluster](https://wlandau.github.io/crew.cluster/)   `crew.cluster` enables mirai-based workflows on traditional high-performance computing clusters using LFS, PBS/TORQUE, SGE and Slurm. +[crew.cluster](https://wlandau.github.io/crew.cluster/)   'crew.cluster' enables mirai-based workflows on traditional high-performance computing clusters using LFS, PBS/TORQUE, SGE and Slurm. -[crew.aws.batch](https://wlandau.github.io/crew.aws.batch/)   `crew.aws.batch` extends `mirai` to cloud computing using AWS Batch. +[crew.aws.batch](https://wlandau.github.io/crew.aws.batch/)   'crew.aws.batch' extends mirai to cloud computing using AWS Batch. ### Thanks We would like to thank in particular: -[Will Landau](https://github.com/wlandau/) for being instrumental in shaping development of the package, from initiating the original request for persistent daemons, through to orchestrating robustness testing for the high performance computing requirements of `crew` and `targets`. +[Will Landau](https://github.com/wlandau/) for being instrumental in shaping development of the package, from initiating the original request for persistent daemons, through to orchestrating robustness testing for the high performance computing requirements of crew and targets. -[Joe Cheng](https://github.com/jcheng5/) for integrating the `promises` method to work seamlessly within Shiny, and prototyping event-driven promises. +[Joe Cheng](https://github.com/jcheng5/) for integrating the 'promises' method to work seamlessly within Shiny, and prototyping event-driven promises. -[Luke Tierney](https://github.com/ltierney/) of R Core, for discussion on L'Ecuyer-CMRG streams to ensure statistical independence in parallel processing, and making it possible for `mirai` to be the first 'alternative communications backend for R'. +[Luke Tierney](https://github.com/ltierney/) of R Core, for discussion on L'Ecuyer-CMRG streams to ensure statistical independence in parallel processing, and making it possible for mirai to be the first 'alternative communications backend for R'. [Henrik Bengtsson](https://github.com/HenrikBengtsson/) for valuable insights leading to the interface accepting broader usage patterns. -[Daniel Falbel](https://github.com/dfalbel/) for discussion around an efficient solution to serialization and transmission of `torch` tensors. +[Daniel Falbel](https://github.com/dfalbel/) for discussion around an efficient solution to serialization and transmission of torch tensors. -[Kirill Müller](https://github.com/krlmlr/) for discussion on using 'daemons' to host Arrow database connections. +[Kirill Müller](https://github.com/krlmlr/) for discussion on using parallel processes to host Arrow database connections. -[R Consortium](https://r-consortium.org/)  for funding work on the TLS implementation in `nanonext`, used to provide secure connections in `mirai`. +[R Consortium](https://r-consortium.org/)  for funding work on the TLS implementation in nanonext, used to provide secure connections in mirai. ### Installation diff --git a/README.md b/README.md index 2cf762738..c31cb27f5 100644 --- a/README.md +++ b/README.md @@ -7,8 +7,6 @@ [![CRAN status](https://www.r-pkg.org/badges/version/mirai)](https://CRAN.R-project.org/package=mirai) -[![R-multiverse -status](https://img.shields.io/badge/dynamic/json?url=https%3A%2F%2Fcommunity.r-multiverse.org%2Fapi%2Fpackages%2Fmirai&query=%24.Version&label=r-multiverse)](https://community.r-multiverse.org/mirai) [![R-universe status](https://shikokuchuo.r-universe.dev/badges/mirai)](https://shikokuchuo.r-universe.dev/mirai) [![R-CMD-check](https://github.com/shikokuchuo/mirai/workflows/R-CMD-check/badge.svg)](https://github.com/shikokuchuo/mirai/actions) @@ -78,7 +76,7 @@ To wait for and collect the return value, use the mirai’s `[]` method: ``` r m[] -#> [1] 7.048859 7.102751 7.849110 9.330095 7.536337 +#> [1] 6.318813 6.082929 7.058028 7.869515 7.154402 ``` As a mirai represents an async operation, it is never necessary to wait @@ -92,7 +90,7 @@ while (unresolved(m)) { m #> < mirai [$data] > m$data -#> [1] 7.048859 7.102751 7.849110 9.330095 7.536337 +#> [1] 6.318813 6.082929 7.058028 7.869515 7.154402 ``` #### Daemons @@ -151,7 +149,7 @@ over alternative map implementations. ### Design Concepts -`mirai` is designed from the ground up to provide a production-grade +mirai is designed from the ground up to provide a production-grade experience. - Fast @@ -177,14 +175,14 @@ experience. ### Integrations -The following core integrations are documented, with usage examples in +mirai features the following core integrations, with usage examples in the linked vignettes: [R parallel](https://shikokuchuo.net/mirai/articles/parallel.html)   Provides an alternative communications backend for R, implementing a new parallel cluster type, a feature request by R-Core at R Project -Sprint 2023. ‘miraiCluster’ may also be used with `foreach` via -`doParallel`. +Sprint 2023. ‘miraiCluster’ may also be used with ‘foreach’ via +‘doParallel’. [promises](https://shikokuchuo.net/mirai/articles/promises.html)   Implements the next generation of completely event-driven, non-polling @@ -212,21 +210,21 @@ to be used seamlessly across parallel processes. [targets](https://docs.ropensci.org/targets/)   Targets, a Make-like pipeline tool for statistics and data science, -has integrated and adopted `crew` as its default high-performance -computing backend. +has integrated and adopted the crew package as its default +high-performance computing backend. [crew](https://wlandau.github.io/crew/) -  Crew is a distributed worker-launcher extending `mirai` to different +  Crew is a distributed worker-launcher extending mirai to different distributed computing platforms, from traditional clusters to cloud services. [crew.cluster](https://wlandau.github.io/crew.cluster/) -  `crew.cluster` enables mirai-based workflows on traditional +  ‘crew.cluster’ enables mirai-based workflows on traditional high-performance computing clusters using LFS, PBS/TORQUE, SGE and Slurm. [crew.aws.batch](https://wlandau.github.io/crew.aws.batch/) -  `crew.aws.batch` extends `mirai` to cloud computing using AWS Batch. +  ‘crew.aws.batch’ extends mirai to cloud computing using AWS Batch. ### Thanks @@ -235,29 +233,29 @@ We would like to thank in particular: [Will Landau](https://github.com/wlandau/) for being instrumental in shaping development of the package, from initiating the original request for persistent daemons, through to orchestrating robustness testing for -the high performance computing requirements of `crew` and `targets`. +the high performance computing requirements of crew and targets. -[Joe Cheng](https://github.com/jcheng5/) for integrating the `promises` +[Joe Cheng](https://github.com/jcheng5/) for integrating the ‘promises’ method to work seamlessly within Shiny, and prototyping event-driven promises. [Luke Tierney](https://github.com/ltierney/) of R Core, for discussion on L’Ecuyer-CMRG streams to ensure statistical independence in parallel -processing, and making it possible for `mirai` to be the first +processing, and making it possible for mirai to be the first ‘alternative communications backend for R’. [Henrik Bengtsson](https://github.com/HenrikBengtsson/) for valuable insights leading to the interface accepting broader usage patterns. [Daniel Falbel](https://github.com/dfalbel/) for discussion around an -efficient solution to serialization and transmission of `torch` tensors. +efficient solution to serialization and transmission of torch tensors. [Kirill Müller](https://github.com/krlmlr/) for discussion on using -‘daemons’ to host Arrow database connections. +parallel processes to host Arrow database connections. [R Consortium](https://r-consortium.org/)  -for funding work on the TLS implementation in `nanonext`, used to -provide secure connections in `mirai`. +for funding work on the TLS implementation in nanonext, used to provide +secure connections in mirai. ### Installation diff --git a/man/dispatcher.Rd b/man/dispatcher.Rd index 204b21895..64917ada2 100644 --- a/man/dispatcher.Rd +++ b/man/dispatcher.Rd @@ -4,16 +4,7 @@ \alias{dispatcher} \title{Dispatcher} \usage{ -dispatcher( - host, - url = NULL, - n = NULL, - ..., - tls = NULL, - pass = NULL, - rs = NULL, - monitor = NULL -) +dispatcher(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL, rs = NULL) } \arguments{ \item{host}{the character host URL to dial (where tasks are sent from), @@ -46,8 +37,6 @@ through a function that returns this value, rather than directly.} \item{rs}{[default NULL] the initial value of .Random.seed. This is set automatically using L'Ecuyer-CMRG RNG streams generated by the host process and should not be independently supplied.} - -\item{monitor}{unused legacy parameter - do not specify this value.} } \value{ Invisible NULL. diff --git a/man/saisei.Rd b/man/saisei.Rd deleted file mode 100644 index 6348f8d47..000000000 --- a/man/saisei.Rd +++ /dev/null @@ -1,25 +0,0 @@ -% Generated by roxygen2: do not edit by hand -% Please edit documentation in R/dispatcher.R -\name{saisei} -\alias{saisei} -\title{Saisei (Regenerate Token)} -\usage{ -saisei(i, force = FALSE, .compute = "default") -} -\arguments{ -\item{i}{integer index number URL to regenerate at dispatcher.} - -\item{force}{[default FALSE] logical value whether to regenerate the URL even -when there is an existing active connection.} - -\item{.compute}{[default 'default'] character value for the compute profile -to use (each compute profile has its own independent set of daemons).} -} -\value{ -The regenerated character URL upon success, or else NULL. -} -\description{ -[DEPRECATED] This is a legacy function used only with the legacy v1 -dispatcher, and will be removed in a future release. -} -\keyword{internal} diff --git a/tests/tests.R b/tests/tests.R index db389c1c5..1a454adf8 100644 --- a/tests/tests.R +++ b/tests/tests.R @@ -104,6 +104,7 @@ connection && { Sys.sleep(1L) test_type("integer", status(.compute = "new")[["connections"]]) test_error(mirai_map(1:2, "a function", .compute = "new"), "must be of type function, not character") + test_error(daemons(url = local_url(), .compute = "new"), "daemons already set") test_zero(daemons(0L, .compute = "new")) } # additional daemons tests @@ -123,7 +124,7 @@ connection && { # mirai_map tests connection && { Sys.sleep(1L) - m <- with(daemons(1, dispatcher = "none", .compute = "ml"), { + m <- with(daemons(1, dispatcher = FALSE, .compute = "ml"), { if (is.null(tryCatch(mirai_map(list(1, "a", 2), sum, .compute = "ml")[.stop], error = function(e) NULL))) mirai_map(1:3, rnorm, .args = list(mean = 20, 2), .compute = "ml")[] }) @@ -260,7 +261,6 @@ connection && Sys.getenv("NOT_CRAN") == "true" && { test_zero(collect_mirai(mm, ".flat")) m <- mirai(b, .timeout = 1000) if (!is_error_value(m[])) test_equal(m[], 2L) - test_null(saisei(1)) test_zero(daemons(0)) test_tls <- function(cert) { file <- tempfile() @@ -356,34 +356,5 @@ connection && Sys.getenv("NOT_CRAN") == "true" && { test_equal(daemons()[["mirai"]][["completed"]], 20000L) test_zero(daemons(0)) } -# legacy interface tests -connection && Sys.getenv("NOT_CRAN") == "true" && { - Sys.sleep(0.5) - option <- 15L - Sys.setenv(R_DEFAULT_PACKAGES = "stats,utils") - test_equal(1L, daemons(1, dispatcher = "process", maxtasks = 10L, timerstart = 1L, walltime = 500L, idletime = 500L, seed = 1546, cleanup = option, autoexit = tools::SIGCONT)) - Sys.unsetenv("R_DEFAULT_PACKAGES") - Sys.sleep(1L) - mq <- mirai(runif(1L), .timeout = 1000) - test_true(is.numeric(mq[])) - mq <- mirai(Sys.sleep(0.7), .timeout = 500) - test_class("matrix", status()[["daemons"]]) - test_null(saisei(i = 1L)) - Sys.sleep(1L) - test_zero(daemons(0)) - test_equal(daemons(url = "wss://127.0.0.1:0", dispatcher = "process", output = TRUE, token = TRUE, walltime = 500L, idletime = 505L), 1L) - test_equal(nextget("n"), 1L) - test_equal(length(nextget("urls")), 1L) - test_class("matrix", status()$daemons) - test_null(saisei(i = 0L)) - test_print(saisei(i = 1L)) - test_print(saisei(i = 1L, force = TRUE)) - Sys.sleep(0.1) - test_zero(daemons(0)) - test_equal(daemons(n = 2L, url = "tls+tcp://127.0.0.1:0", dispatcher = "thread", token = TRUE, idletime = Inf), 2L) - test_class("matrix", status()$daemons) - Sys.sleep(0.1) - test_zero(daemons(0)) -} test_zero(daemons(0)) Sys.sleep(1L)