Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
Version: 2.0.1.9006
Version: 2.0.1.9007
Description: Designed for simplicity, a 'mirai' evaluates an R expression
asynchronously in a parallel process, locally or distributed over the
network. The result is automatically available upon completion. Modern
Expand Down
1 change: 0 additions & 1 deletion NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ export(nextget)
export(nextstream)
export(register_cluster)
export(remote_config)
export(saisei)
export(serial_config)
export(ssh_config)
export(status)
Expand Down
8 changes: 6 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
# mirai 2.0.1.9006 (development)
# mirai 2.0.1.9007 (development)

#### Behavioural Changes

* `daemons()` now requires an explicit reset before providing revised settings for a compute profile, and will error otherwise.
* `mirai_map()` now errors if daemons have not yet been set (rather than warn and launch one local daemon).
* Removal of mirai v1 compatibility features:
+ `saisei()` is now removed as no longer required.
+ `daemons()` dispatcher argument "thread" is removed.
+ `daemons()` dispatcher arguments "process" and "thread" are formally deprecated and will be removed in a future version.

#### Updates

Expand All @@ -13,7 +17,7 @@
+ Fixes language objects being evaluated before the map function is applied (#194).
+ Fixes classes of objects in a dataframe being dropped during a multiple map (#196).
+ Better `cli` errors when collecting a 'mirai_map'.
* `status()` call failures when using dispatcher now return the appropriate 'errorValue'.
* Fixes `daemons(NULL)` not causing all daemons started with `autoexit = FALSE` to quit, regression introduced in mirai v2.0.0.
* Requires nanonext >= 1.5.0.

# mirai 2.0.1
Expand Down
81 changes: 0 additions & 81 deletions R/daemon.R
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,6 @@ daemon <- function(url, dispatcher = FALSE, ..., asyncdial = FALSE, autoexit = T
cleanup = TRUE, output = FALSE, idletime = Inf, walltime = Inf,
maxtasks = Inf, id = NULL, tls = NULL, rs = NULL) {

missing(dispatcher) && return(
v1_daemon(url = url, asyncdial = asyncdial, autoexit = autoexit,
cleanup = cleanup, output = output, idletime = idletime,
walltime = walltime, maxtasks = maxtasks, ..., tls = tls, rs = rs)
)

cv <- cv()
sock <- socket(if (dispatcher) "poly" else "rep")
on.exit(reap(sock))
Expand Down Expand Up @@ -243,85 +237,10 @@ dial_and_sync_socket <- function(sock, url, asyncdial = FALSE, tls = NULL) {
pipe_notify(sock, cv = NULL, add = TRUE)
}

parse_cleanup <- function(cleanup)
if (is.logical(cleanup))
c(cleanup, cleanup, cleanup, FALSE) else
c(as.integer(cleanup) %% 2L, (clr <- as.raw(cleanup)) & as.raw(2L), clr & as.raw(4L), clr & as.raw(8L))

perform_cleanup <- function(cleanup) {
if (cleanup[1L]) rm(list = (vars <- names(.GlobalEnv))[!vars %in% .[["vars"]]], envir = .GlobalEnv)
if (cleanup[2L]) lapply((new <- search())[!new %in% .[["se"]]], detach, character.only = TRUE)
if (cleanup[3L]) options(.[["op"]])
if (cleanup[4L]) gc(verbose = FALSE)
}

do_cleanup <- function() {
rm(list = (vars <- names(.GlobalEnv))[!vars %in% .[["vars"]]], envir = .GlobalEnv)
lapply((new <- search())[!new %in% .[["se"]]], detach, character.only = TRUE)
options(.[["op"]])
}

snapshot <- function() `[[<-`(`[[<-`(`[[<-`(., "op", .Options), "se", search()), "vars", names(.GlobalEnv))

# Legacy compatibility functions ----------------------------------------------

v1_daemon <- function(url, asyncdial = FALSE, autoexit = TRUE, cleanup = TRUE,
output = FALSE, maxtasks = Inf, idletime = Inf, walltime = Inf,
timerstart = 0L, ..., tls = NULL, rs = NULL) {

cv <- cv()
sock <- socket("rep")
on.exit(reap(sock))
`[[<-`(., "sock", sock)
autoexit && pipe_notify(sock, cv = cv, remove = TRUE, flag = autoexit)
if (length(tls)) tls <- tls_config(client = tls)
dial_and_sync_socket(sock, url, asyncdial = asyncdial, tls = tls)

if (is.numeric(rs)) `[[<-`(.GlobalEnv, ".Random.seed", as.integer(rs))
idletime <- if (idletime > walltime) walltime else if (is.finite(idletime)) idletime
cleanup <- parse_cleanup(cleanup)
if (!output) {
devnull <- file(nullfile(), open = "w", blocking = FALSE)
sink(file = devnull)
sink(file = devnull, type = "message")
on.exit({
sink(type = "message")
sink()
close(devnull)
}, add = TRUE)
}
snapshot()
count <- 0L
start <- mclock()

repeat {

ctx <- .context(sock)
aio <- recv_aio(ctx, mode = 1L, timeout = idletime, cv = cv)
wait(cv) || break
m <- collect_aio(aio)
is.object(m) && {
count < timerstart && {
start <- mclock()
next
}
break
}
data <- eval_mirai(m)
count <- count + 1L

(count >= maxtasks || count > timerstart && mclock() - start >= walltime) && {
.mark()
send(ctx, data, mode = 1L, block = TRUE)
aio <- recv_aio(ctx, mode = 8L, cv = cv)
wait(cv)
break
}

send(ctx, data, mode = 1L, block = TRUE)
perform_cleanup(cleanup)
if (count <= timerstart) start <- mclock()

}

}
82 changes: 21 additions & 61 deletions R/daemons.R
Original file line number Diff line number Diff line change
Expand Up @@ -260,24 +260,11 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ...,
cv <- cv()
urld <- local_url()
sock <- req_socket(urld)
res <- launch_sync_dispatcher(sock, sock, wa52(urld, dots, url), output, tls, pass, serial)
res <- launch_sync_dispatcher(sock, wa5(urld, dots, url), output, tls, pass, serial)
is.object(res) && stop(._[["sync_dispatcher"]])
store_dispatcher(sock, res, cv, envir)
`[[<-`(envir, "msgid", 0L)
},
{
n <- if (missing(n)) length(url) else if (is.numeric(n) && n >= 1L) as.integer(n) else stop(._[["n_one"]])
tls <- configure_tls(url, tls, pass, envir, returnconfig = FALSE)
cv <- cv()
urld <- local_url()
urlc <- sprintf("%s%s", urld, "c")
sock <- req_socket(urld)
sockc <- req_socket(urlc)
res <- launch_sync_dispatcher(sock, sockc, wa5(urld, dots, n, urlc, url), output, tls, pass)
is.object(res) && stop(._[["sync_dispatcher"]])
store_dispatcher(sockc, res, cv, envir)
launches <- n
},
stop(._[["dispatcher_args"]])
)
`[[<-`(.., .compute, `[[<-`(`[[<-`(`[[<-`(envir, "sock", sock), "n", launches), "dots", dots))
Expand All @@ -299,7 +286,6 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ...,

if (signal) send_signal(envir)
reap(envir[["sock"]])
is.null(envir[["sockc"]]) || reap(envir[["sockc"]])
..[[.compute]] <- NULL -> envir

} else if (is.null(envir)) {
Expand All @@ -319,22 +305,12 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ...,
{
cv <- cv()
sock <- req_socket(urld)
res <- launch_sync_dispatcher(sock, sock, wa42(urld, dots, envir[["stream"]], n), output, serial = serial)
res <- launch_sync_dispatcher(sock, wa4(urld, dots, envir[["stream"]], n), output, serial = serial)
is.object(res) && stop(._[["sync_dispatcher"]])
store_dispatcher(sock, res, cv, envir)
for (i in seq_len(n)) next_stream(envir)
`[[<-`(envir, "msgid", 0L)
},
{
cv <- cv()
sock <- req_socket(urld)
urlc <- sprintf("%s%s", urld, "c")
sockc <- req_socket(urlc)
res <- launch_sync_dispatcher(sock, sockc, wa4(urld, dots, envir[["stream"]], n, urlc), output)
is.object(res) && stop(._[["sync_dispatcher"]])
store_dispatcher(sockc, res, cv, envir)
for (i in seq_len(n)) next_stream(envir)
},
stop(._[["dispatcher_args"]])
)
`[[<-`(.., .compute, `[[<-`(`[[<-`(`[[<-`(envir, "sock", sock), "n", n), "dots", dots))
Expand Down Expand Up @@ -443,8 +419,7 @@ status <- function(.compute = "default") {
envir <- ..[[.compute]]
is.null(envir) && return(list(connections = 0L, daemons = 0L))
length(envir[["msgid"]]) && return(dispatcher_status(envir))
list(connections = as.integer(stat(envir[["sock"]], "pipes")),
daemons = if (is.null(envir[["sockc"]])) envir[["urls"]] else query_status(envir))
list(connections = as.integer(stat(envir[["sock"]], "pipes")), daemons = envir[["urls"]])

}

Expand Down Expand Up @@ -507,13 +482,11 @@ init_envir_stream <- function(seed) {
envir
}

tokenized_url <- function(url) sprintf("%s/%s", url, random(12L))

req_socket <- function(url, tls = NULL, resend = 0L)
`opt<-`(socket("req", listen = url, tls = tls), "req:resend-time", resend)

parse_dispatcher <- function(x)
if (is.logical(x)) 1L + (!is.na(x) && x) else if (x == "process" || x == "thread") 3L else if (x == "none") 1L else 4L
if (is.logical(x)) 1L + (!is.na(x) && x) else if (x == "process" || x == "thread") 2L else if (x == "none") 1L else 3L

parse_dots <- function(...) {
...length() || return("")
Expand All @@ -531,50 +504,47 @@ parse_tls <- function(tls)

libp <- function(lp = .libPaths()) lp[file.exists(file.path(lp, "mirai"))][1L]

wa31 <- function(url, dots, rs, tls = NULL)
shQuote(sprintf("mirai::daemon(\"%s\"%s%s,rs=c(%s))", url, dots, parse_tls(tls), paste0(rs, collapse = ",")))

wa3 <- function(url, dots, rs, tls = NULL)
wa2 <- function(url, dots, rs, tls = NULL)
shQuote(sprintf("mirai::daemon(\"%s\"%s%s,rs=c(%s),dispatcher=FALSE)", url, dots, parse_tls(tls), paste0(rs, collapse = ",")))

wa32 <- function(url, dots, rs, tls = NULL)
wa3 <- function(url, dots, rs, tls = NULL)
shQuote(sprintf("mirai::daemon(\"%s\"%s%s,rs=c(%s),dispatcher=TRUE)", url, dots, parse_tls(tls), paste0(rs, collapse = ",")))

wa4 <- function(urld, dots, rs, n, urlc)
shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",n=%d,rs=c(%s),monitor=\"%s\"%s)", libp(), urld, n, paste0(rs, collapse= ","), urlc, dots))

wa42 <- function(urld, dots, rs, n)
wa4 <- function(urld, dots, rs, n)
shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",n=%d,rs=c(%s)%s)", libp(), urld, n, paste0(rs, collapse= ","), dots))

wa5 <- function(urld, dots, n, urlc, url)
shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",c(\"%s\"),n=%d,monitor=\"%s\"%s)", libp(), urld, paste0(url, collapse = "\",\""), n, urlc, dots))

wa52 <- function(urld, dots, url)
wa5 <- function(urld, dots, url)
shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",url=\"%s\"%s)", libp(), urld, url, dots))

launch_daemon <- function(args, output)
system2(.command, args = c("-e", args), stdout = output, stderr = output, wait = FALSE)

launch_sync_dispatcher <- function(sock, sockc, args, output, tls = NULL, pass = NULL, serial = NULL) {
query_dispatcher <- function(sock, command, send_mode = 2L, recv_mode = 5L, block = .limit_short)
if (r <- send(sock, command, mode = send_mode, block = block)) r else
recv(sock, mode = recv_mode, block = block)

launch_sync_dispatcher <- function(sock, args, output, tls = NULL, pass = NULL, serial = NULL) {
pkgs <- Sys.getenv("R_DEFAULT_PACKAGES")
system2(.command, args = c("--default-packages=NULL", "--vanilla", "-e", args), stdout = output, stderr = output, wait = FALSE)
if (is.list(serial))
`opt<-`(sock, "serial", serial)
query_dispatcher(sockc, list(pkgs, tls, pass, serial), send_mode = 1L, recv_mode = 2L, block = .limit_long)
query_dispatcher(sock, list(pkgs, tls, pass, serial), send_mode = 1L, recv_mode = 2L, block = .limit_long)
}

launch_sync_daemons <- function(seq, sock, urld, dots, envir, output) {
cv <- cv()
pipe_notify(sock, cv = cv, add = TRUE)
for (i in seq)
launch_daemon(wa3(urld, dots, next_stream(envir)), output)
launch_daemon(wa2(urld, dots, next_stream(envir)), output)
for (i in seq)
until(cv, .limit_long) || return(pipe_notify(sock, cv = NULL, add = TRUE))
!pipe_notify(sock, cv = NULL, add = TRUE)
}

store_dispatcher <- function(sockc, res, cv, envir)
`[[<-`(`[[<-`(`[[<-`(`[[<-`(envir, "sockc", sockc), "urls", res[-1L]), "pid", as.integer(res[1L])), "cv", cv)
store_dispatcher <- function(sock, res, cv, envir)
`[[<-`(`[[<-`(`[[<-`(`[[<-`(envir, "sock", sock), "urls", res[-1L]), "pid", as.integer(res[1L])), "cv", cv)

sub_real_port <- function(port, url) sub("(?<=:)0(?![^/])", port, url, perl = TRUE)

check_store_url <- function(sock, envir) {
listener <- attr(sock, "listener")[[1L]]
Expand All @@ -585,24 +555,14 @@ check_store_url <- function(sock, envir) {
}

send_signal <- function(envir) {
signals <- max(length(envir[["urls"]]), stat(envir[["sock"]], "pipes"))
signals <- if (is.null(envir[["msgid"]])) stat(envir[["sock"]], "pipes") else
query_dispatcher(envir[["sock"]], c(0L, 0L))[1L]
for (i in seq_len(signals)) {
send(envir[["sock"]], ._scm_., mode = 2L)
msleep(10L)
}
}

query_status <- function(envir) {
res <- query_dispatcher(envir[["sockc"]], 0L)
`attributes<-`(
res,
list(
dim = c(envir[["n"]], 5L),
dimnames = list(envir[["urls"]], c("i", "online", "instance", "assigned", "complete"))
)
)
}

dispatcher_status <- function(envir) {
status <- query_dispatcher(envir[["sock"]], c(0L, 0L))
is.object(status) && return(status)
Expand Down
Loading
Loading