/
verb-compute.R
138 lines (121 loc) · 4.05 KB
/
verb-compute.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#' Compute results of a query
#'
#' These are methods for the dplyr generics [collapse()], [compute()],
#' and [collect()]. `collapse()` creates a subquery, `compute()` stores
#' the results in a remote table, and `collect()` executes the query and
#' downloads the data into R.
#'
#' @export
#' @param x A lazy data frame backed by a database query.
#' @importFrom dplyr collapse
#' @examples
#' library(dplyr, warn.conflicts = FALSE)
#'
#' db <- memdb_frame(a = c(3, 4, 1, 2), b = c(5, 1, 2, NA))
#' db %>% filter(a <= 2) %>% collect()
collapse.tbl_sql <- function(x, ...) {
sql <- db_sql_render(x$src$con, x)
tbl_src_dbi(x$src, sql, colnames(x)) %>%
group_by(!!! syms(op_grps(x))) %>%
arrange.tbl_lazy(!!!op_sort(x))
}
# compute -----------------------------------------------------------------
#' @rdname collapse.tbl_sql
#' @param name Table name in remote database.
#' @param temporary Should the table be temporary (`TRUE`, the default) or
#' persistent (`FALSE`)?
#' @inheritParams copy_to.src_sql
#' @inheritParams collect.tbl_sql
#' @export
#' @importFrom dplyr compute
compute.tbl_sql <- function(x,
name = NULL,
temporary = TRUE,
unique_indexes = list(),
indexes = list(),
analyze = TRUE,
...,
cte = FALSE) {
check_bool(temporary)
if (is.null(name)) {
if (!temporary) {
lifecycle::deprecate_warn(
"2.3.3",
what = "compute(name = 'must be provided when `temporary = FALSE`')"
)
}
name <- unique_table_name()
}
name <- as_table_path(name, x$src$con)
vars <- op_vars(x)
compute_check_indexes(x, indexes)
compute_check_indexes(x, unique_indexes)
x_aliased <- select(x, !!! syms(vars)) # avoids problems with SQLite quoting (#1754)
sql <- db_sql_render(x$src$con, x_aliased$lazy_query, cte = cte)
name <- db_compute(x$src$con, name, sql,
temporary = temporary,
unique_indexes = unique_indexes,
indexes = indexes,
analyze = analyze,
...
)
tbl_src_dbi(x$src, name, colnames(x)) %>%
group_by(!!!syms(op_grps(x))) %>%
window_order(!!!op_sort(x))
}
compute_check_indexes <- function(x,
indexes,
...,
arg = caller_arg(indexes),
error_call = caller_env()) {
if (is.null(indexes)) {
return()
}
check_dots_empty()
force(arg)
if (!is.list(indexes) && !is.character(indexes)) {
stop_input_type(
indexes,
c("a character vector", "a list of characters"),
arg = arg,
call = error_call
)
}
if (is.list(indexes)) {
indexes <- unique(unlist(indexes))
}
x_nms <- colnames(x)
missing <- setdiff(indexes, x_nms)
if (!is_empty(missing)) {
message <- c(
"All columns specified through {.arg {arg}} must exist in {.arg x}.",
i = "The following columns are missing from {.arg {arg}}: {.field {missing}}."
)
cli_abort(message, call = error_call)
}
}
# collect -----------------------------------------------------------------
#' @rdname collapse.tbl_sql
#' @param n Number of rows to fetch. Defaults to `Inf`, meaning all rows.
#' @param warn_incomplete Warn if `n` is less than the number of result rows?
#' @param cte `r lifecycle::badge("experimental")`
#' Use common table expressions in the generated SQL?
#' @importFrom dplyr collect
#' @export
collect.tbl_sql <- function(x, ..., n = Inf, warn_incomplete = TRUE, cte = FALSE) {
if (identical(n, Inf)) {
n <- -1
} else {
# Gives the query planner information that it might be able to take
# advantage of
x <- head(x, n)
}
sql <- db_sql_render(x$src$con, x, cte = cte)
withCallingHandlers(
out <- db_collect(x$src$con, sql, n = n, warn_incomplete = warn_incomplete, ...),
error = function(cnd) {
cli_abort("Failed to collect lazy table.", parent = cnd)
}
)
dplyr::grouped_df(out, intersect(op_grps(x), names(out)))
}