/
task.R
134 lines (122 loc) · 3.8 KB
/
task.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
## Status labels for tasks. TASK_PENDING is the status that task_save()
## records in the "task_status" namespace when a task is first stored.
## NOTE(review): the remaining labels are presumably assigned elsewhere
## as a task moves through its life cycle; where each one is set is not
## visible from this file -- confirm before relying on their meaning.
TASK_PENDING <- "PENDING"
TASK_RUNNING <- "RUNNING"
TASK_COMPLETE <- "COMPLETE"
TASK_ERROR <- "ERROR"
TASK_ORPHAN <- "ORPHAN"
TASK_REDIRECT <- "REDIRECT"
TASK_MISSING <- "MISSING"
TASK_CANCELLED <- "CANCELLED"
##' Save and reload tasks. A task is a single expression (a call)
##' bound to a \code{context}.
##'
##' Saving a task records the prepared expression, an initial status
##' of \code{PENDING}, the id of the owning context and the submission
##' time in the context's database. If \code{depends_on} is given,
##' every listed task must already exist (otherwise an error is
##' thrown and nothing is saved) and the dependencies are recorded
##' alongside the task.
##'
##' @title Save and reload tasks
##'
##' @param expr An expression to save; must be a call.
##'
##' @param context A \code{context} object
##'
##' @param envir Passed through to \code{context_save} when locating
##'   local variables.
##'
##' @param depends_on Optional vector of task ids that this task
##'   depends on
##'
##' @return An identifier that can be used to retrieve or run the task
##'   later.  This is simply a short string.
##'
##' @export
##' @rdname task
task_save <- function(expr, context, envir = parent.frame(), depends_on = NULL) {
  ## NOTE: only a single expression is accepted for now; support for
  ## saving a series of related tasks may come in a later update.  With
  ## several tasks the write could be done as 3 mset updates, as n
  ## updates, or as one large update.
  assert_is(context, "context")
  assert_is(expr, "call")
  verify_dependencies_exist(depends_on, context)

  db <- context_db_get(context)

  task <- prepare_expression(expr, envir, db)
  task_id <- ids::random_id()
  task$id <- task_id
  task$context_id <- context$id
  class(task) <- "task"

  ## One value per namespace, all stored under the same key (the id).
  namespaces <- c("tasks", "task_status", "task_context", "task_time_sub")
  values <- list(task, TASK_PENDING, context$id, Sys.time())
  db$mset(task_id, values, namespaces)

  ## Dependencies are only written when the caller supplied some.
  if (!is.null(depends_on)) {
    db$set(task_id, depends_on, "task_deps")
  }

  task_id
}
## Check that every task id in `depends_on` is known to `context`
## (as reported by task_exists()).  Does nothing when `depends_on` is
## NULL or when all ids exist; otherwise stops with a message listing
## the unique ids that could not be found.
verify_dependencies_exist <- function(depends_on, context) {
  if (is.null(depends_on)) {
    return(invisible(NULL))
  }

  found <- task_exists(depends_on, context)
  if (all(found)) {
    return(invisible(NULL))
  }

  absent <- unique(depends_on[!found])
  ## Singular or plural wording depending on how many ids are absent.
  template <- ngettext(
    length(absent),
    "Failed to save as dependency %s does not exist.",
    "Failed to save as dependencies %s do not exist.")
  stop(sprintf(template, paste(absent, collapse = ", ")))
}
##' Delete a task, including its results.
##'
##' As well as the task itself, its status and its results, this
##' removes the bookkeeping that \code{task_save} writes for each
##' task (owning context, submission time and dependency list), so
##' that no records are left behind for a deleted id.
##' @title Delete a task
##' @inheritParams task_status
##' @inheritParams task_log
##' @export
##' @return Invisibly, a logical vector with one element per entry in
##'   \code{ids}: \code{TRUE} if a task was actually deleted.
task_delete <- function(ids, root) {
  root <- context_root_get(root)
  db <- root$db
  n <- length(ids)
  if (n == 0L) {
    ## Nothing to delete; avoid issuing empty requests to the database.
    return(invisible(logical(0)))
  }

  ## The return value is derived from these namespaces only, so that
  ## what callers see as "deleted" is unchanged.
  ns <- c("tasks", "task_status", "task_results")
  m <- length(ns)
  res <- db$del(rep(ids, each = m), rep(ns, n))
  ## One column per id, one row per namespace: an id counts as deleted
  ## if anything was removed for it.
  res <- apply(matrix(res, m, n), 2, any)

  ## task_save() also writes these per-task records; remove them too so
  ## that deleted tasks do not leave orphaned entries.  The outcome is
  ## deliberately ignored: a task need not have all of them (e.g.
  ## "task_deps" is only written when dependencies were given).
  ns_extra <- c("task_context", "task_time_sub", "task_deps")
  db$del(rep(ids, each = length(ns_extra)), rep(ns_extra, n))

  ## TODO: delete the log if it is present (for this reason, this
  ## function takes 'root' not 'db')
  invisible(res)
}
## Look up the context id recorded for each task in `ids`.  `db` is
## resolved through context_db_get(); ids with no "task_context" entry
## come back as NA_character_.
task_context <- function(ids, db) {
  store <- context_db_get(db)
  recorded <- store$mget(ids, "task_context", missing = NA_character_)
  ## Flatten the per-id results into a character vector.
  vcapply(recorded, identity)
}
## S3 print method for objects of class "task": hands the object
## straight to print_ad_hoc() and returns whatever that call returns.
## `...` is accepted but is not forwarded.
##' @export
print.task <- function(x, ...) {
print_ad_hoc(x)
}
## Fetch the stored task with identifier `id` from the "tasks"
## namespace and attach the resolved database handle to it as `$db`.
task_read <- function(id, db) {
  store <- context_db_get(db)
  task <- store$get(id, "tasks")
  task$db <- store
  task
}
##' Run a task in a separate process, using [callr::r].  Unlike
##' [context::task_run] this does not return the value, and is called
##' for the side effect of writing to the context.
##'
##' The directory that will hold \code{path_log} is created first if
##' needed; both the standard output and the standard error of the
##' child process are then written to \code{path_log}.
##'
##' @title Run a task in separate process
##'
##' @inheritParams context_read
##'
##' @param task_id A task identifier
##'
##' @param path_log Path to log file
##'
##' @export
task_run_external <- function(root, identifier, task_id, path_log) {
  ## Make sure the log's parent directory exists (silently if it
  ## already does, creating intermediate directories as required).
  log_dir <- dirname(path_log)
  dir.create(log_dir, showWarnings = FALSE, recursive = TRUE)

  callr::r(task_run_external_helper,
           args = list(root, identifier, task_id),
           package = "context",
           stdout = path_log,
           stderr = path_log)
}
## Body of the child process started by task_run_external(): read the
## context `identifier` from `root`, load it, and run `task_id` in it.
## Every call is namespace-qualified, and the task's value is
## discarded: the function always returns NULL invisibly.
task_run_external_helper <- function(root, identifier, task_id) {
  handle <- context::context_read(identifier, root)
  loaded <- context::context_load(handle)
  context::task_run(task_id, loaded)
  invisible(NULL)
}