-
Notifications
You must be signed in to change notification settings - Fork 117
/
registry.go
223 lines (192 loc) · 6.04 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package runtime
import (
"context"
"errors"
"fmt"
"strconv"
"github.com/rilldata/rill/runtime/compilers/rillv1beta"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/observability"
"go.uber.org/zap"
)
func (r *Runtime) FindInstances(ctx context.Context) ([]*drivers.Instance, error) {
return r.Registry().FindInstances(ctx)
}
func (r *Runtime) FindInstance(ctx context.Context, instanceID string) (*drivers.Instance, error) {
return r.Registry().FindInstance(ctx, instanceID)
}
func (r *Runtime) CreateInstance(ctx context.Context, inst *drivers.Instance) error {
// Check OLAP connection
olap, _, err := r.checkOlapConnection(inst)
if err != nil {
return err
}
defer olap.Close()
// Check repo connection
repo, repoStore, err := r.checkRepoConnection(inst)
if err != nil {
return err
}
defer repo.Close()
// Check that it's a driver that supports embedded catalogs
if inst.EmbedCatalog {
_, ok := olap.CatalogStore()
if !ok {
return errors.New("driver does not support embedded catalogs")
}
}
// Prepare connections for use
err = olap.Migrate(ctx)
if err != nil {
return fmt.Errorf("failed to prepare instance: %w", err)
}
err = repo.Migrate(ctx)
if err != nil {
return fmt.Errorf("failed to prepare instance: %w", err)
}
c := rillv1beta.New(repoStore, inst.ID)
proj, err := c.ProjectConfig(ctx)
if err != nil {
return err
}
inst.ProjectVariables = proj.Variables
// this is a hack to set variables and pass to connectors
// ideally the runtime should propagate this flag to connectors.Env
if inst.Variables == nil {
inst.Variables = make(map[string]string)
}
inst.Variables["allow_host_access"] = strconv.FormatBool(r.opts.AllowHostAccess)
// Create instance
err = r.Registry().CreateInstance(ctx, inst)
if err != nil {
return err
}
return nil
}
func (r *Runtime) DeleteInstance(ctx context.Context, instanceID string, dropDB bool) error {
inst, err := r.Registry().FindInstance(ctx, instanceID)
if err != nil {
if errors.Is(err, drivers.ErrNotFound) {
return nil
}
return err
}
// delete instance related data if catalog is not embedded
if !inst.EmbedCatalog {
catalog, err := r.Catalog(ctx, instanceID)
if err != nil {
return err
}
err = catalog.DeleteEntries(ctx, instanceID)
if err != nil {
return err
}
}
if dropDB {
olap, err := r.OLAP(ctx, instanceID)
if err != nil {
return err
}
// ignoring the dropDB error since if db is already dropped it may not be possible to retry
err = olap.DropDB()
if err != nil {
r.logger.Error("could not drop database", zap.Error(err), zap.String("instance_id", instanceID), observability.ZapCtx(ctx))
}
}
r.evictCaches(ctx, inst)
// delete instance
return r.Registry().DeleteInstance(ctx, instanceID)
}
// EditInstance edits exisiting instance.
// Confirming to put api specs, it is expected to send entire existing instance data.
// The API compares and only evicts caches if drivers or dsn is changed.
// This is done to ensure that db handlers are not unnecessarily closed
func (r *Runtime) EditInstance(ctx context.Context, inst *drivers.Instance) error {
olderInstance, err := r.Registry().FindInstance(ctx, inst.ID)
if err != nil {
return err
}
// 1. changes in olap driver or olap dsn
olapChanged := olderInstance.OLAPDriver != inst.OLAPDriver || olderInstance.OLAPDSN != inst.OLAPDSN
if olapChanged {
// Check OLAP connection
olap, _, err := r.checkOlapConnection(inst)
if err != nil {
return err
}
defer olap.Close()
// Prepare connections for use
err = olap.Migrate(ctx)
if err != nil {
return fmt.Errorf("failed to prepare instance: %w", err)
}
}
// 2. Check that it's a driver that supports embedded catalogs
if inst.EmbedCatalog {
olapConn, err := r.connCache.get(ctx, inst.ID, inst.OLAPDriver, inst.OLAPDSN)
if err != nil {
return err
}
_, ok := olapConn.CatalogStore()
if !ok {
return errors.New("driver does not support embedded catalogs")
}
}
// 3. changes in repo driver or repo dsn
repoChanged := inst.RepoDriver != olderInstance.RepoDriver || inst.RepoDSN != olderInstance.RepoDSN
if repoChanged {
// Check repo connection
repo, _, err := r.checkRepoConnection(inst)
if err != nil {
return err
}
defer repo.Close()
// Prepare connections for use
err = repo.Migrate(ctx)
if err != nil {
return fmt.Errorf("failed to prepare instance: %w", err)
}
}
// evict caches if connections need to be updated
if olapChanged || repoChanged {
r.evictCaches(ctx, olderInstance)
}
// update variables
if inst.Variables == nil {
inst.Variables = make(map[string]string)
}
inst.Variables["allow_host_access"] = strconv.FormatBool(r.opts.AllowHostAccess)
// update the entire instance for now to avoid building queries in some complicated way
return r.Registry().EditInstance(ctx, inst)
}
// TODO :: this is a rudimentary solution and ideally should be done in some producer/consumer pattern
func (r *Runtime) evictCaches(ctx context.Context, inst *drivers.Instance) {
// evict and close exisiting connection
r.connCache.evict(ctx, inst.ID, inst.OLAPDriver, inst.OLAPDSN)
r.connCache.evict(ctx, inst.ID, inst.RepoDriver, inst.RepoDSN)
// evict catalog cache
r.migrationMetaCache.evict(ctx, inst.ID)
// query cache can't be evicted since key is a combination of instance ID and other parameters
}
func (r *Runtime) checkRepoConnection(inst *drivers.Instance) (drivers.Connection, drivers.RepoStore, error) {
repo, err := drivers.Open(inst.RepoDriver, inst.RepoDSN, r.logger)
if err != nil {
return nil, nil, err
}
repoStore, ok := repo.RepoStore()
if !ok {
return nil, nil, fmt.Errorf("not a valid repo driver: '%s'", inst.RepoDriver)
}
return repo, repoStore, nil
}
func (r *Runtime) checkOlapConnection(inst *drivers.Instance) (drivers.Connection, drivers.OLAPStore, error) {
olap, err := drivers.Open(inst.OLAPDriver, inst.OLAPDSN, r.logger)
if err != nil {
return nil, nil, err
}
olapStore, ok := olap.OLAPStore()
if !ok {
return nil, nil, fmt.Errorf("not a valid OLAP driver: '%s'", inst.OLAPDriver)
}
return olap, olapStore, nil
}