/
heartbeat_impl.R
256 lines (228 loc) · 7.85 KB
/
heartbeat_impl.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
##' Create a heartbeat instance. This can be used by running
##' `obj$start()` which will reset the TTL (Time To Live) on `key` every
##' `period` seconds (don't set this too high). If the R process
##' dies, then the key will expire after `3 * period` seconds (or
##' set `expire`) and another application can tell that this R
##' instance has died.
##'
##' @title Create a heartbeat instance
##'
##' @export
##' @examples
##'
##' if (redux::redis_available()) {
##' rand_str <- function() {
##' paste(sample(letters, 20, TRUE), collapse = "")
##' }
##' key <- sprintf("rrq:heartbeat:%s", rand_str())
##' h <- rrq::rrq_heartbeat$new(key, 1, expire = 2)
##' con <- redux::hiredis()
##'
##' # The heartbeat key exists
##' con$EXISTS(key)
##'
##' # And has an expiry of less than 2000ms
##' con$PTTL(key)
##'
##' # We can manually stop the heartbeat, and 2s later the key will
##' # stop existing
##' h$stop()
##'
##' Sys.sleep(2)
##' con$EXISTS(key) # 0
##'
##' # This is required to close any processes opened by this
##' # example, normally you would not need this.
##' processx:::supervisor_kill()
##' }
##' @importFrom R6 R6Class
##' @rdname heartbeat
rrq_heartbeat <- R6::R6Class(
"rrq_heartbeat",
cloneable = FALSE,
public = list(
##' @description Create a heartbeat object
##'
##' @param key Key to use. Once the heartbeat starts it will
##' create this key and set it to expire after `expiry` seconds.
##'
##' @param period Timeout period (in seconds)
##'
##' @param expire Key expiry time (in seconds)
##'
##' @param value Value to store in the key. By default it stores the
##' expiry time, so the time since last heartbeat can be computed.
##' This will be converted to character with `as.character`
##' before saving into Redis
##'
##' @param config Configuration parameters passed through to
##' `redux::redis_config`. Provide as either a named list or a
##' `redis_config` object. This allows host, port, password,
##' db, etc all to be set.
##'
##' @param start Should the heartbeat be started immediately?
##'
##' @param timeout Time, in seconds, to wait for the heartbeat to
##' appear. It should generally appear very quickly (within a
##' second unless your connection is very slow) so this can be
##' generally left alone.
initialize = function(key, period, expire = 3 * period,
value = expire, config = NULL,
start = TRUE, timeout = 10) {
assert_scalar_character(key)
assert_scalar(value) # will be converted to character
assert_scalar_positive_integer(expire)
assert_scalar_positive_integer(period)
assert_scalar_logical(start)
assert_valid_timeout(timeout)
if (expire <= period) {
cli::cli_abort("'expire' must be longer than 'period'")
}
private$config <- redux::redis_config(config = config)
private$key <- key
private$key_kill <- heartbeat_key_kill(key)
private$value <- as.character(value)
private$period <- as.integer(period)
private$expire <- as.integer(expire)
private$timeout <- timeout
if (start) {
self$start()
}
},
##' @description Report if heartbeat process is running. This will be
##' `TRUE` if the process has been started and has not stopped.
is_running = function() {
if (is.null(private$process)) {
FALSE
} else {
private$process$is_alive()
}
},
##' @description Start the heartbeat process. An error will be thrown
##' if it is already running.
start = function() {
if (self$is_running()) {
cli::cli_abort("Already running on key '{private$key}'")
}
private$process <- heartbeat_process(
private$config, private$key, private$value,
private$period, private$expire)
con <- redux::hiredis(private$config)
wait_timeout("Did not start in time", private$timeout, function() {
if (!private$process$is_alive()) {
## Will cause an error
private$process$get_result()
}
con$EXISTS(private$key) == 0
})
invisible(self)
},
##' @description Stop the heartbeat process
##' @param wait Logical, indicating if we should wait until the
##' heartbeat process terminates (should take only a fraction of a
##' second)
stop = function(wait = TRUE) {
assert_scalar_logical(wait)
if (!self$is_running()) {
cli::cli_abort("Heartbeat not running on key '{private$key}'")
}
con <- redux::hiredis(private$config)
con$RPUSH(private$key_kill, 0)
process <- private$process
private$process <- NULL
if (wait) {
wait_timeout("Did not stop in time", private$timeout,
function() process$is_alive())
}
invisible(self)
},
##' @description Format method, used by R6 to nicely print the object
##' @param ... Additional arguments, currently ignored
format = function(...) {
c("<heartbeat>\n",
sprintf(" - running: %s", tolower(self$is_running())),
sprintf(" - key: %s", private$key),
sprintf(" - period: %d", private$period),
sprintf(" - expire: %d", private$expire),
sprintf(" - redis:\n%s",
paste0(" ", capture.output(print(private$config))[-1],
collapse = "\n")))
}
),
private = list(
config = NULL,
process = NULL,
key = NULL,
key_kill = NULL,
period = NULL,
expire = NULL,
timeout = NULL,
value = NULL
))
##' Send a kill signal (typically `SIGTERM`) to terminate a process
##' that is running a heartbeat. This is used by
##' [`rrq::rrq_controller`] in order to tear down workers, even if
##' they are processing a task. When a heartbeat process is created,
##' in its main loop it will listen for requests to kill via this
##' function and will forward them to the worker. This is primarily
##' useful where workers are on a different physical machine to the
##' controller where [tools::pskill()] cannot be used.
##'
##' @title Kill a process running a heartbeat
##'
##' @param key The heartbeat key
##'
##' @param signal A signal to send (typically `tools::SIGTERM` for a
##' "polite" shutdown)
##'
##' @param con A hiredis object
##'
##' @export
##' @examples
##' if (redux::redis_available()) {
##' rand_str <- function() {
##' paste(sample(letters, 20, TRUE), collapse = "")
##' }
##' # Suppose we have a process that exposes a heartbeat running on
##' # this key:
##' key <- sprintf("rrq:heartbeat:%s", rand_str())
##'
##' # We can send it a SIGTERM signal over redis using:
##' con <- redux::hiredis()
##' rrq::rrq_heartbeat_kill(con, key, tools::SIGTERM)
##' }
rrq_heartbeat_kill <- function(con, key, signal = tools::SIGTERM) {
assert_scalar_character(key)
con$RPUSH(heartbeat_key_kill(key), signal)
invisible()
}
heartbeat_key_kill <- function(key) {
paste0(key, ":kill")
}
heartbeat_process <- function(config, key, value, period, expire) {
args <- list(config = config, key = key, value = value,
period = period, expire = expire, parent = Sys.getpid())
callr::r_bg(function(...) heartbeat_thread(...),
args = args, package = TRUE, supervise = TRUE)
}
heartbeat_thread <- function(config, key, value, period, expire, parent) {
con <- redux::hiredis(config)
con$SET(key, value)
on.exit(con$DEL(key))
key_kill <- heartbeat_key_kill(key)
con$DEL(key_kill)
repeat {
con$EXPIRE(key, expire)
ans <- con$BLPOP(key_kill, period)
if (!is.null(ans)) {
value <- ans[[2L]]
if (value %in% c(tools::SIGKILL, tools::SIGTERM)) {
con$DEL(c(key, key_kill))
tools::pskill(parent, value)
} else if (value == tools::SIGINT) {
tools::pskill(parent, value)
}
break
}
}
}