/
configure.R
97 lines (91 loc) · 3.79 KB
/
configure.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
##' Configure rrq options. Call this before any controller or worker
##' connects to a queue, since the options apply to both. A given
##' queue may only be configured once; there is currently no facility
##' to update options afterwards. At present the options concern only
##' storage - specifically, how larger objects will be saved (using
##' [`rrq::object_store`]).
##'
##' @section Storage:
##'
##' Whenever a task is saved or completed, its data and results are
##' written into the Redis database. Redis is an in-memory database,
##' so saving very large objects into it is a bad idea (100 parallel
##' jobs each saving a 2GB object would likely take down your Redis
##' server). In addition, `redux` does not support directly saving
##' objects larger than `2^31 - 1` bytes into Redis. For some use
##' cases we therefore need to consider where to store larger
##' objects.
##'
##' The strategy is to "offload" the larger objects - those bigger
##' than some user-given size - onto another storage system.
##' Currently the only alternative supported is a disk store
##' ([`rrq::object_store_offload_disk`]), though we hope to expand
##' this later. So if your task returns a 3GB object it will be
##' spilled to disk rather than failing to be saved into Redis.
##'
##' How big is an object? We serialise it (`redux::object_to_bin`
##' just wraps [`serialize`]), which creates a vector of bytes that
##' is saved into the database. To get an idea of how large things
##' are you can do: `length(redux::object_to_bin(your_object))`. At
##' the time this documentation was written, `mtcars` was `3807`
##' bytes, and a million random numbers was `8,000,031` bytes. It's
##' unlikely that a `store_max_size` of less than 1MB will be
##' sensible.
##'
##' @title Configure rrq
##'
##' @param queue_id The queue id; the same as you would pass to
##'   [rrq::rrq_controller]
##'
##' @param con A redis connection
##'
##' @param ... Additional arguments - this must be empty. This
##'   argument exists so that all additional arguments must be passed
##'   by name.
##'
##' @param store_max_size The maximum object size, in bytes, before
##'   being moved to the offload store. If given, then larger data
##'   will be saved in `offload_path` (using
##'   [`rrq::object_store_offload_disk`])
##'
##' @param offload_path The path to create an offload store at (passed
##'   to [`rrq::object_store_offload_disk`]). The directory will be
##'   created if it does not exist. If not given (or `NULL`) but
##'   `store_max_size` is finite, then trying to save large objects
##'   will throw an error.
##'
##' @return Invisibly, a list with processed configuration information
##' @export
rrq_configure <- function(queue_id, con = redux::hiredis(), ...,
                          store_max_size = Inf, offload_path = NULL) {
  ## All optional arguments must be named; anything left in `...` is
  ## an error.
  if (length(list(...)) > 0) {
    ## rlang::check_dot_used would be an alternative here, but it
    ## errors at exit, which is slightly different.
    cli::cli_abort("Unconsumed dot arguments")
  }
  keys <- rrq_keys(queue_id)

  ## Validate inputs before touching the database.
  assert_scalar_numeric(store_max_size)
  if (!is.null(offload_path)) {
    assert_scalar_character(offload_path)
  }

  config <- list(store_max_size = store_max_size,
                 offload_path = offload_path)

  existing <- con$GET(keys$configuration)
  if (!is.null(existing)) {
    ## Configuration is write-once: re-running with identical options
    ## is a silent no-op, while different options are an error.
    if (!identical(unserialize(existing), config)) {
      cli::cli_abort(
        "Can't set configuration for queue '{queue_id}' as it already exists")
    }
  } else {
    con$SET(keys$configuration, object_to_bin(config))
  }
  invisible(config)
}
## Read the stored configuration for a queue. If no configuration has
## been written yet, write (and return) the defaults via
## rrq_configure(); otherwise deserialise and return what is stored.
rrq_configure_read <- function(con, keys) {
  raw <- con$GET(keys$configuration)
  if (is.null(raw)) {
    ## Nothing stored yet: persist and return the default options.
    rrq_configure(keys$queue_id, con)
  } else {
    bin_to_object(raw)
  }
}