---
title: "Using Asynchronous Queries in RPostgres"
author: "Zeb Burke-Conte"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Using Asynchronous Queries in RPostgres}
  %\VignetteEngine{knitr::rmarkdown}
  \usepackage[utf8]{inputenc}
---
```{r, include = FALSE}
library(DBI)
knitr::opts_chunk$set(
  error = (Sys.getenv("IN_PKGDOWN") != "true"),
  collapse = TRUE,
  comment = "#>",
  eval = RPostgres::postgresHasDefault()
)
con <- con2 <- con3 <- NULL
rs <- rs1 <- rs2 <- rs3 <- NULL
```
Imagine that you want to run database queries without your R process idling while it waits for them.
For example, you might be running a Shiny app that needs to handle multiple users at a time.
If the main work of your app happens in the database, requests will queue up behind each
other even when R itself has nothing to do, because R is blocked waiting for `dbSendQuery`
to return.

In this situation, what you want is the ability to send a query without waiting
for the result, then check later whether it is done and fetch the result. This
is exactly what `RPostgres::dbSendQueryAsync` does.
## Send a query
```{r}
con <- dbConnect(RPostgres::Postgres())
dbExecute(con, "DROP TABLE IF EXISTS mtcars;")
dbWriteTable(con, "mtcars", mtcars)
rs <- RPostgres::dbSendQueryAsync(con, "SELECT * FROM mtcars")
dbFetch(rs)
dbClearResult(rs)
```
`dbSendQueryAsync` takes exactly the same arguments and has the same return type as `dbSendQuery`.
If you follow it immediately with `dbFetch`, it behaves the same too, since `dbFetch`
blocks until the query has completed.
The difference is that you can run R code between `dbSendQueryAsync` and
`dbFetch`, and that code executes *while the database is executing the query*.
```{r}
rs <- RPostgres::dbSendQueryAsync(con, "SELECT avg(a.disp), avg(d.disp)
  FROM mtcars as a
  JOIN mtcars as b ON 1 = 1
  JOIN mtcars as c ON 1 = 1
  JOIN mtcars as d ON 1 = 1
  WHERE d.cyl = a.cyl")
a <- 0
for (i in 1:10000) {
  if (i %% 3 == 0) {
    a <- a + 1
  } else {
    a <- round(a * 1.1)
  }
}
print(a)
dbFetch(rs)
dbClearResult(rs)
```
## Check the status
Of course, if you don't want sending the query to block, you probably don't want to
block waiting for the database at `dbFetch` either. That's why `RPostgres::dbHasExecutedQuery`
exists. (Don't confuse it with `dbHasCompleted`, a DBI method that reports
whether you have consumed all the rows of a result set.)

If `dbHasExecutedQuery` returns `TRUE`, you are guaranteed to be able to call `dbFetch`
without blocking, because the database has already sent back the result of your
query. If it returns `FALSE`, the query is still running and
`dbFetch` will block until it finishes.
For example, here's how to use `dbHasExecutedQuery` to send three queries in
parallel on three different database connections, but only move on when all three
results are available.
```{r}
con2 <- dbConnect(RPostgres::Postgres())
con3 <- dbConnect(RPostgres::Postgres())
rs1 <- RPostgres::dbSendQueryAsync(con, "SELECT avg(a.disp) FROM mtcars as a WHERE a.cyl = 4")
rs2 <- RPostgres::dbSendQueryAsync(con2, "SELECT avg(a.disp) FROM mtcars as a WHERE a.cyl = 6")
rs3 <- RPostgres::dbSendQueryAsync(con3, "SELECT avg(a.disp) FROM mtcars as a WHERE a.cyl = 8")
while (!all(
  RPostgres::dbHasExecutedQuery(rs1),
  RPostgres::dbHasExecutedQuery(rs2),
  RPostgres::dbHasExecutedQuery(rs3)
)) {
  Sys.sleep(0.1)
}
result <- rbind(dbFetch(rs1), dbFetch(rs2), dbFetch(rs3))
dbClearResult(rs1)
dbClearResult(rs2)
dbClearResult(rs3)
```
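The busy-wait loop above can be factored into a small reusable helper. This is just a sketch, not part of RPostgres: the name `poll_until` and its arguments are made up for illustration. It repeatedly evaluates a predicate, such as a call to `RPostgres::dbHasExecutedQuery`, until the predicate returns `TRUE` or a timeout elapses:

```r
# Hypothetical helper: poll `predicate` every `interval` seconds until it
# returns TRUE or `timeout` seconds have passed. Returns TRUE on success,
# FALSE if the deadline was reached first.
poll_until <- function(predicate, timeout = 30, interval = 0.1) {
  deadline <- Sys.time() + timeout
  repeat {
    if (isTRUE(predicate())) return(TRUE)
    if (Sys.time() >= deadline) return(FALSE)
    Sys.sleep(interval)
  }
}
```

With a helper like this, the waiting loop above could be written as `poll_until(function() all(RPostgres::dbHasExecutedQuery(rs1), RPostgres::dbHasExecutedQuery(rs2), RPostgres::dbHasExecutedQuery(rs3)))`, with the bonus that a hung query no longer blocks forever.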
## What about promises?
The `promises` package provides a set of tools that greatly simplify working with
concurrent/asynchronous operations in R.
RPostgres does not have built-in support for promises, but it is easy to build a
promise using only the functions above. You'll need to poll asynchronously, using
the `later` package, to see when a result set is done, like so:
```{r}
library(promises)

promise(function(resolve, reject) {
  rs <- RPostgres::dbSendQueryAsync(con, "SELECT avg(a.disp), avg(d.disp)
    FROM mtcars as a
    JOIN mtcars as b ON 1 = 1
    JOIN mtcars as c ON 1 = 1
    JOIN mtcars as d ON 1 = 1
    WHERE d.cyl = a.cyl")
  check <- function() {
    if (RPostgres::dbHasExecutedQuery(rs)) {
      result <- dbFetch(rs)
      dbClearResult(rs)
      resolve(result)
    } else {
      later::later(check, 0.1)
    }
  }
  later::later(check, 0.1)
}) %>%
  then(function(value) {
    cat("The operation completed!\n")
    print(value)
  })
cat("Code down here executes first!\n")
```
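The pattern above, poll with `later` and then resolve, can be generalized into a small wrapper that works for any check/fetch pair. Again, this is only a sketch: `promise_poll`, `check_fn`, and `fetch_fn` are illustrative names, not part of RPostgres or promises.

```r
# Hypothetical wrapper: build a promise that polls `check_fn` on the later
# event loop every `interval` seconds, then resolves with fetch_fn()'s value.
promise_poll <- function(check_fn, fetch_fn, interval = 0.1) {
  promises::promise(function(resolve, reject) {
    poll <- function() {
      if (isTRUE(check_fn())) {
        resolve(fetch_fn())
      } else {
        later::later(poll, interval)
      }
    }
    later::later(poll, 0)
  })
}
```

With this wrapper, the promise in the previous chunk could be built as `promise_poll(function() RPostgres::dbHasExecutedQuery(rs), function() { on.exit(dbClearResult(rs)); dbFetch(rs) })`.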
```{r, echo = FALSE}
if (exists("rs") && !is.null(rs)) { dbClearResult(rs); rs <- NULL }
if (exists("rs1") && !is.null(rs1)) { dbClearResult(rs1); rs1 <- NULL }
if (exists("rs2") && !is.null(rs2)) { dbClearResult(rs2); rs2 <- NULL }
if (exists("rs3") && !is.null(rs3)) { dbClearResult(rs3); rs3 <- NULL }
if (exists("con") && !is.null(con)) { dbDisconnect(con); con <- NULL }
if (exists("con2") && !is.null(con2)) { dbDisconnect(con2); con2 <- NULL }
if (exists("con3") && !is.null(con3)) { dbDisconnect(con3); con3 <- NULL }
```