---
title: "mirai - Databases and Arrow"
vignette: >
%\VignetteIndexEntry{mirai - Databases and Arrow}
%\VignetteEngine{knitr::knitr}
%\VignetteEncoding{UTF-8}
---
### Database Hosting - The Basics
`mirai` supports the hosting of multiple database connections across processes on the local machine or a remote server. `everywhere()` easily sets up identical database connections in each daemon process.
The following simple example sets up two local daemons, then opens a connection to the same SQLite file database in each daemon.
``` r
file <- tempfile()
library(mirai)
daemons(2)
#> [1] 2
```
``` r
everywhere({
  library(DBI)
  con <<- dbConnect(RSQLite::SQLite(), file)
}, file = file)
```
`mirai()` calls may then be used to write to or query the database, and may be executed on either daemon.
``` r
m <- mirai(dbWriteTable(con, "iris", iris))
m[]
#> [1] TRUE
```
``` r
m <- mirai(dbListTables(con))
m[]
#> [1] "iris"
```
``` r
m <- mirai(dbGetQuery(con, 'SELECT * FROM iris WHERE "Sepal.Length" < 4.6'))
m[]
#> Sepal.Length Sepal.Width Petal.Length Petal.Width Species
#> 1 4.4 2.9 1.4 0.2 setosa
#> 2 4.3 3.0 1.1 0.1 setosa
#> 3 4.4 3.0 1.3 0.2 setosa
#> 4 4.5 2.3 1.3 0.3 setosa
#> 5 4.4 3.2 1.3 0.2 setosa
```
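As both daemons hold an identical connection, independent queries can also be dispatched concurrently. A minimal sketch, assuming the two daemons and the `con` connections created above are still active:

``` r
# dispatch two independent queries; with 2 daemons they may run concurrently
m1 <- mirai(dbGetQuery(con, 'SELECT COUNT(*) AS n FROM iris'))
m2 <- mirai(dbGetQuery(con, 'SELECT DISTINCT "Species" FROM iris'))

# collect each result (blocks until the respective mirai resolves)
m1[]
m2[]
```

Each `mirai()` call is routed to whichever daemon is available, so neither query waits on the other.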
`everywhere()` can be used again to cleanly tear down the database connections, before resetting daemons.
``` r
everywhere(dbDisconnect(con))
daemons(0)
#> [1] 0
```
### Database Hosting - Using Arrow Database Connectivity
Using the `DBI` interface, it is possible to access and manipulate data in the Apache Arrow data format efficiently through ADBC (Arrow Database Connectivity).
The example below creates an in-memory SQLite connection using the `adbcsqlite` backend.
Serialization is set up with the relevant serialization and deserialization functions from the `arrow` package. Note that the format class is 'nanoarrow_array_stream' as `nanoarrow` is the backend for all queries made by the DBI `db*Arrow()` functions.
``` r
library(mirai)
daemons(1)
#> [1] 1
```
``` r
everywhere({
  library(DBI) # `adbi` and `adbcsqlite` packages must also be installed
  con <<- dbConnect(adbi::adbi("adbcsqlite"), uri = ":memory:")
})
serialization(
  refhook = list(
    arrow::write_to_raw,
    function(x) arrow::read_ipc_stream(x, as_data_frame = FALSE)
  ),
  class = "nanoarrow_array_stream"
)
```
`mirai()` calls may then be used to write to or query the database, all in the Arrow format.
``` r
m <- mirai(dbWriteTableArrow(con, "iris", iris))
m[]
#> [1] TRUE
```
``` r
m <- mirai(dbReadTableArrow(con, "iris"))
m[]
#> Table
#> 150 rows x 5 columns
#> $Sepal.Length <double>
#> $Sepal.Width <double>
#> $Petal.Length <double>
#> $Petal.Width <double>
#> $Species <string>
```
``` r
m <- mirai(dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Sepal.Length" < 4.6'))
m[]
#> Table
#> 5 rows x 5 columns
#> $Sepal.Length <double>
#> $Sepal.Width <double>
#> $Petal.Length <double>
#> $Petal.Width <double>
#> $Species <string>
```
Due to the tight integration of the `mirai` serialization mechanism with R's 'refhook' system, we can easily return complex / nested objects containing multiple queries in the Arrow format:
``` r
m <- mirai({
  a <- dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Sepal.Length" < 4.6')
  b <- dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Sepal.Width" < 2.6')
  x <- dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Petal.Length" < 1.5')
  y <- dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Petal.Width" < 0.2')
  list(sepal = list(length = a, width = b), petal = list(length = x, width = y))
})
m[]
#> $sepal
#> $sepal$length
#> Table
#> 5 rows x 5 columns
#> $Sepal.Length <double>
#> $Sepal.Width <double>
#> $Petal.Length <double>
#> $Petal.Width <double>
#> $Species <string>
#>
#> $sepal$width
#> Table
#> 19 rows x 5 columns
#> $Sepal.Length <double>
#> $Sepal.Width <double>
#> $Petal.Length <double>
#> $Petal.Width <double>
#> $Species <string>
#>
#>
#> $petal
#> $petal$length
#> Table
#> 24 rows x 5 columns
#> $Sepal.Length <double>
#> $Sepal.Width <double>
#> $Petal.Length <double>
#> $Petal.Width <double>
#> $Species <string>
#>
#> $petal$width
#> Table
#> 5 rows x 5 columns
#> $Sepal.Length <double>
#> $Sepal.Width <double>
#> $Petal.Length <double>
#> $Petal.Width <double>
#> $Species <string>
```
As before, `everywhere()` can be used again to cleanly tear down the databases, before resetting daemons.
``` r
everywhere(dbDisconnect(con))
daemons(0)
#> [1] 0
```
### Shiny / mirai / DBI / ADBC Integrated Example
The following is an example of how database connections hosted in mirai daemons may be used to power a Shiny app.
The one-time `serialization()` setup ensures seamless transport of Apache Arrow data, and occurs in the global environment outside of `server()`.
A new database connection is created in a new daemon process for every new Shiny session, and the resources are freed when a session ends. This logic is all defined within `server()`. A unique ID is used to identify each session, and is specified as the 'compute profile' for daemons.
Non-dispatcher daemons are created as scheduling is not required (all queries are expected to take roughly the same time, and in this case each session uses only one daemon anyway).
Shiny ExtendedTask is then used to perform each query via a `mirai()` call, using the session-specific compute profile.
``` r
library(mirai)
library(secretbase)
library(shiny)
library(bslib)

# write 'iris' dataset to temp database file (for this demonstration)
file <- tempfile()
con <- DBI::dbConnect(adbi::adbi("adbcsqlite"), uri = file)
DBI::dbWriteTableArrow(con, "iris", iris)
DBI::dbDisconnect(con)

# common input parameters
slmin <- min(iris$Sepal.Length)
slmax <- max(iris$Sepal.Length)

ui <- page_fluid(
  p("The time is ", textOutput("current_time", inline = TRUE)),
  hr(),
  h3("Shiny / mirai / DBI / ADBC demonstration"),
  p("New daemon-hosted database connection is created for every Shiny session"),
  sliderInput(
    "sl", "Query iris dataset based on Sepal Length", min = slmin, max = slmax,
    value = c(slmin, slmax), width = "75%"
  ),
  input_task_button("btn", "Return query"),
  tableOutput("table")
)

# uses Shiny ExtendedTask with mirai
server <- function(input, output, session) {
  # create unique session id by hashing current time with a random key
  id <- secretbase::siphash13(Sys.time(), key = nanonext::random(4L))
  # create new daemon for each session (set dispatcher = FALSE)
  daemons(1L, dispatcher = FALSE, .compute = id)
  # tear down daemon when session ends
  session$onEnded(function() daemons(0L, .compute = id))
  # everywhere() loads DBI and creates ADBC connection in each daemon
  everywhere(
    {
      library(DBI) # `adbi` and `adbcsqlite` packages must also be installed
      con <<- dbConnect(adbi::adbi("adbcsqlite"), uri = file)
    },
    file = file,
    .compute = id
  )
  output$current_time <- renderText({
    invalidateLater(1000)
    format(Sys.time(), "%H:%M:%S %p")
  })
  task <- ExtendedTask$new(
    function(...) mirai(
      dbGetQueryArrow(
        con,
        sprintf(
          "SELECT * FROM iris WHERE \"Sepal.Length\" BETWEEN %.2f AND %.2f",
          sl[1L],
          sl[2L]
        )
      ),
      ...,
      .compute = id
    )
  ) |> bind_task_button("btn")
  observeEvent(input$btn, task$invoke(sl = input$sl))
  output$table <- renderTable(task$result())
}

# serialization() specifies the native Arrow serialization functions
serialization(
  refhook = list(arrow::write_to_raw, nanoarrow::read_nanoarrow),
  class = "nanoarrow_array_stream"
)

# run Shiny app
shinyApp(ui = ui, server = server)

# deletes temp database file (for this demonstration)
unlink(file)
```