/
connections.go
343 lines (294 loc) · 10 KB
/
connections.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
package runtime
import (
"context"
"fmt"
"maps"
"path/filepath"
"strconv"
"strings"
"github.com/rilldata/rill/runtime/drivers"
)
// Sentinel errors returned when an optional service connector is not set on an instance.
var (
	// ErrAdminNotConfigured is returned by Admin when the instance has no admin connector.
	ErrAdminNotConfigured = fmt.Errorf("an admin service is not configured for this instance")

	// ErrAINotConfigured is returned by AI when the instance has no AI connector.
	ErrAINotConfigured = fmt.Errorf("an AI service is not configured for this instance")
)
// AcquireSystemHandle returns a handle for the system connector with the given name.
// System connectors come from the runtime options rather than from an instance,
// so the connection is requested with an empty instance ID.
// It returns an error if no system connector with that name is configured.
func (r *Runtime) AcquireSystemHandle(ctx context.Context, connector string) (drivers.Handle, func(), error) {
	for _, sys := range r.opts.SystemConnectors {
		if sys.Name != connector {
			continue
		}

		// Normalize the config keys to lowercase. The extra slot is for allow_host_access.
		resolved := make(map[string]any, len(sys.Config)+1)
		for key, val := range sys.Config {
			resolved[strings.ToLower(key)] = val
		}
		// Set last so the runtime-wide setting always wins over the connector's own config.
		resolved["allow_host_access"] = r.opts.AllowHostAccess

		return r.getConnection(ctx, "", sys.Type, resolved)
	}

	return nil, nil, fmt.Errorf("connector %s doesn't exist", connector)
}
// AcquireHandle returns a handle for the named connector of the given instance.
// The connector's configuration is looked up with ConnectorConfig and resolved before
// the connection is requested. The returned func is the one handed back by getConnection;
// callers in this file invoke it once they are done with the handle.
func (r *Runtime) AcquireHandle(ctx context.Context, instanceID, connector string) (drivers.Handle, func(), error) {
	connCfg, err := r.ConnectorConfig(ctx, instanceID, connector)
	if err != nil {
		return nil, nil, err
	}

	// Many code paths around connection acquisition are served from caches that never touch the ctx,
	// so check for cancellation explicitly here to make sure it is observed at least occasionally.
	if ctxErr := ctx.Err(); ctxErr != nil {
		return nil, nil, ctxErr
	}

	driver := connCfg.Driver
	resolved := connCfg.Resolve()
	return r.getConnection(ctx, instanceID, driver, resolved)
}
// Repo returns the project file store for the given instance, backed by the instance's repo connector.
// On success, the returned func is the release function for the underlying handle.
// On any error, both the store and the release func are nil.
func (r *Runtime) Repo(ctx context.Context, instanceID string) (drivers.RepoStore, func(), error) {
	inst, err := r.Instance(ctx, instanceID)
	if err != nil {
		return nil, nil, err
	}

	conn, release, err := r.AcquireHandle(ctx, instanceID, inst.RepoConnector)
	if err != nil {
		return nil, nil, err
	}

	repo, ok := conn.AsRepoStore(instanceID)
	if !ok {
		// The handle is released right here, so the release func must not be handed back:
		// returning it would invite a second release by the caller. This matches the other
		// accessors in this file, which all return a nil release func on error.
		release()
		return nil, nil, fmt.Errorf("connector %q is not a valid project file store", inst.RepoConnector)
	}

	return repo, release, nil
}
// Admin returns the admin service for the given instance, along with the release func for its handle.
// The admin connector is optional: if the instance has none, ErrAdminNotConfigured is returned.
// An error is also returned if the configured connector cannot serve as an admin service.
func (r *Runtime) Admin(ctx context.Context, instanceID string) (drivers.AdminService, func(), error) {
	inst, err := r.Instance(ctx, instanceID)
	if err != nil {
		return nil, nil, err
	}

	if inst.AdminConnector == "" {
		return nil, nil, ErrAdminNotConfigured
	}

	handle, done, err := r.AcquireHandle(ctx, instanceID, inst.AdminConnector)
	if err != nil {
		return nil, nil, err
	}

	if svc, ok := handle.AsAdmin(instanceID); ok {
		return svc, done, nil
	}

	// The handle is not usable as an admin service, so give it back before reporting the error.
	done()
	return nil, nil, fmt.Errorf("connector %q is not a valid admin service", inst.AdminConnector)
}
// AI returns the AI service for the given instance, along with the release func for its handle.
// The AI connector is optional: if the instance has none, ErrAINotConfigured is returned.
// An error is also returned if the configured connector cannot serve as an AI service.
func (r *Runtime) AI(ctx context.Context, instanceID string) (drivers.AIService, func(), error) {
	inst, err := r.Instance(ctx, instanceID)
	if err != nil {
		return nil, nil, err
	}

	if inst.AIConnector == "" {
		return nil, nil, ErrAINotConfigured
	}

	handle, done, err := r.AcquireHandle(ctx, instanceID, inst.AIConnector)
	if err != nil {
		return nil, nil, err
	}

	if svc, ok := handle.AsAI(instanceID); ok {
		return svc, done, nil
	}

	// The handle is not usable as an AI service, so give it back before reporting the error.
	done()
	return nil, nil, fmt.Errorf("connector %q is not a valid AI service", inst.AIConnector)
}
// OLAP returns an OLAP data store for the given instance, along with the release func for its handle.
// The connector argument is optional. When it is empty, the instance's default OLAP connector
// (as reported by ResolveOLAPConnector) is used instead.
// An error is returned if the selected connector cannot serve as an OLAP data store.
func (r *Runtime) OLAP(ctx context.Context, instanceID, connector string) (drivers.OLAPStore, func(), error) {
	inst, err := r.Instance(ctx, instanceID)
	if err != nil {
		return nil, nil, err
	}

	// Fall back to the instance default when the caller did not name a connector.
	if connector == "" {
		connector = inst.ResolveOLAPConnector()
	}

	handle, done, err := r.AcquireHandle(ctx, instanceID, connector)
	if err != nil {
		return nil, nil, err
	}

	if store, ok := handle.AsOLAP(instanceID); ok {
		return store, done, nil
	}

	// The handle is not usable as an OLAP store, so give it back before reporting the error.
	done()
	return nil, nil, fmt.Errorf("connector %q is not a valid OLAP data store", connector)
}
// Catalog returns the catalog store for the given instance, along with a release func.
// The store is picked in this order:
//  1. If the instance embeds its catalog, the instance's OLAP connector is used.
//  2. If no catalog connector is set, the runtime's metastore is used (with a no-op release func).
//  3. Otherwise, the instance's catalog connector is used.
//
// An error is returned if the selected backend cannot serve as a catalog store.
func (r *Runtime) Catalog(ctx context.Context, instanceID string) (drivers.CatalogStore, func(), error) {
	inst, err := r.Instance(ctx, instanceID)
	if err != nil {
		return nil, nil, err
	}

	switch {
	case inst.EmbedCatalog:
		handle, done, err := r.AcquireHandle(ctx, instanceID, inst.ResolveOLAPConnector())
		if err != nil {
			return nil, nil, err
		}
		if catalog, ok := handle.AsCatalogStore(instanceID); ok {
			return catalog, done, nil
		}
		done()
		return nil, nil, fmt.Errorf("can't embed catalog because it is not supported by the connector %q", inst.ResolveOLAPConnector())

	case inst.CatalogConnector == "":
		catalog, ok := r.metastore.AsCatalogStore(instanceID)
		if !ok {
			return nil, nil, fmt.Errorf("metastore cannot serve as catalog")
		}
		// The metastore is not acquired per call here, so there is nothing to release.
		return catalog, func() {}, nil
	}

	handle, done, err := r.AcquireHandle(ctx, instanceID, inst.CatalogConnector)
	if err != nil {
		return nil, nil, err
	}
	if catalog, ok := handle.AsCatalogStore(instanceID); ok {
		return catalog, done, nil
	}
	done()
	return nil, nil, fmt.Errorf("connector %q is not a valid catalog store", inst.CatalogConnector)
}
// ConnectorConfig builds the layered configuration for the named connector of the given instance.
//
// The driver is taken from the instance's connector definitions, then from the project's
// connector definitions (which take precedence when both match), and finally from the set of
// registered drivers when the name itself is a driver name (an "implicit" connector).
// An error is returned if no driver can be determined.
//
// The Preset and Project layers are cloned from the matching definitions, the Env layer is
// built from instance variables named "connector.<name>.<key>", and a number of system-wide
// presets (allow_host_access, data_dir, temp_dir) are always applied.
func (r *Runtime) ConnectorConfig(ctx context.Context, instanceID, name string) (*ConnectorConfig, error) {
	inst, err := r.Instance(ctx, instanceID)
	if err != nil {
		return nil, err
	}

	cfg := &ConnectorConfig{}

	// Look for a connector definition on the instance.
	for _, def := range inst.Connectors {
		if def.Name != name {
			continue
		}
		cfg.Driver = def.Type
		// Clone, because Preset may be mutated later while the inst object is shared.
		cfg.Preset = maps.Clone(def.Config)
		break
	}

	// Look for a connector definition in rill.yaml.
	for _, def := range inst.ProjectConnectors {
		if def.Name != name {
			continue
		}
		cfg.Driver = def.Type
		// Clone, because Project may be mutated later while the inst object is shared.
		cfg.Project = maps.Clone(def.Config)
		break
	}

	// Fall back to an implicit connector, where the name matches a driver name.
	if cfg.Driver == "" {
		if _, known := drivers.Drivers[name]; known {
			cfg.Driver = name
		}
	}

	// Give up if no driver could be determined.
	if cfg.Driver == "" {
		return nil, fmt.Errorf("unknown connector %q", name)
	}

	// Populate the Env layer from instance variables of the form "connector.name.var".
	variables := inst.ResolveVariables()
	envPrefix := "connector." + name + "."
	for key, val := range variables {
		envKey, matched := strings.CutPrefix(key, envPrefix)
		if !matched {
			continue
		}
		if cfg.Env == nil {
			cfg.Env = make(map[string]string)
		}
		cfg.Env[envKey] = val
	}

	// For backwards compatibility, certain root-level variables apply to certain implicit connectors.
	// NOTE: This switches on the connector's name, not its type, because it only applies to implicit connectors.
	switch name {
	case "s3", "athena", "redshift":
		for _, key := range []string{"aws_access_key_id", "aws_secret_access_key", "aws_session_token"} {
			cfg.setPreset(key, variables[key], false)
		}
	case "azure":
		for _, key := range []string{"azure_storage_account", "azure_storage_key", "azure_storage_sas_token", "azure_storage_connection_string"} {
			cfg.setPreset(key, variables[key], false)
		}
	case "gcs", "bigquery":
		cfg.setPreset("google_application_credentials", variables["google_application_credentials"], false)
	case "motherduck":
		cfg.setPreset("token", variables["token"], false)
		cfg.setPreset("dsn", "", true)
	case "local_file":
		// The "local_file" connector needs to know the repo root.
		// TODO: This is an ugly hack. But how can we get rid of it?
		// The RepoConnector shouldn't be named "local_file", but guard against infinite recursion anyway.
		if inst.RepoConnector != "local_file" {
			repo, release, err := r.Repo(ctx, instanceID)
			if err != nil {
				return nil, err
			}
			cfg.setPreset("dsn", repo.Root(), true)
			release()
		}
	}

	// Built-in system-wide config.
	cfg.setPreset("allow_host_access", strconv.FormatBool(r.opts.AllowHostAccess), true)

	// data_dir stores persistent data.
	cfg.setPreset("data_dir", filepath.Join(r.opts.DataDir, instanceID, name), true)

	// temp_dir stores temporary data. The logic that creates any temporary file here should also delete it.
	// The contents will also be deleted on runtime restarts.
	cfg.setPreset("temp_dir", filepath.Join(r.opts.DataDir, instanceID, "tmp"), true)

	return cfg, nil
}
// ConnectorConfig holds and resolves connector configuration.
// We support three levels of configuration:
// 1. Preset: provided when creating the instance (or set by the system, such as allow_host_access). Cannot be overridden.
// 2. Project: defined in the rill.yaml file. Can be overridden by the env.
// 3. Env: defined in the instance's variables (in the format "connector.name.var").
//
// When resolved, Project is applied first, then Env, then Preset, so Preset always wins.
type ConnectorConfig struct {
	// Driver is the name of the driver that backs the connector.
	Driver string
	// Preset holds the highest-precedence values; it may be nil.
	Preset map[string]string
	// Project holds the lowest-precedence values; it may be nil.
	Project map[string]string
	// Env holds values that override Project but not Preset; it may be nil.
	Env map[string]string
}
// Resolve returns the final resolved connector configuration.
// All keys in the result are lowercase. Layers are merged in order of increasing
// precedence: Project, then Env, then Preset.
// It returns nil when all three layers are empty.
func (c *ConnectorConfig) Resolve() map[string]any {
	total := len(c.Preset) + len(c.Project) + len(c.Env)
	if total == 0 {
		return nil
	}

	merged := make(map[string]any, total)
	// Later layers overwrite earlier ones, so list them from lowest to highest precedence.
	for _, layer := range []map[string]string{c.Project, c.Env, c.Preset} {
		for key, val := range layer {
			merged[strings.ToLower(key)] = val
		}
	}
	return merged
}
// ResolveStrings is similar to Resolve, but it returns a map of strings.
// All keys in the result are lowercase. Layers are merged in order of increasing
// precedence: Project, then Env, then Preset.
// It returns nil when all three layers are empty.
func (c *ConnectorConfig) ResolveStrings() map[string]string {
	total := len(c.Preset) + len(c.Project) + len(c.Env)
	if total == 0 {
		return nil
	}

	merged := make(map[string]string, total)
	// Later layers overwrite earlier ones, so list them from lowest to highest precedence.
	for _, layer := range []map[string]string{c.Project, c.Env, c.Preset} {
		for key, val := range layer {
			merged[strings.ToLower(key)] = val
		}
	}
	return merged
}
// setPreset stores v under key k in the Preset layer, allocating the map on first use.
// An empty v is skipped unless force is true, in which case the empty value is stored.
func (c *ConnectorConfig) setPreset(k, v string, force bool) {
	if !force && v == "" {
		return
	}
	if c.Preset == nil {
		c.Preset = map[string]string{k: v}
		return
	}
	c.Preset[k] = v
}