-
Notifications
You must be signed in to change notification settings - Fork 106
/
catalog.go
142 lines (117 loc) · 3.14 KB
/
catalog.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package sqlite
import (
"context"
"time"
"github.com/rilldata/rill/runtime/drivers"
)
// catalogStore pairs an embedded *connection with the ID of a single instance.
// Its methods pass instanceID as a query parameter so that every statement is
// scoped to that instance's rows.
type catalogStore struct {
// Embedded connection; the c.db handle used by the methods is presumably
// promoted from it (its definition is not in this file).
*connection
// instanceID is bound to the instance_id column in every query.
instanceID string
}
// NextControllerVersion increments the controller version stored for this
// instance and returns the new value.
//
// A row with version 0 is inserted first if the instance has none, so the
// first call yields 1. The insert, increment and read-back run inside one
// transaction so that the value returned is the one produced by this call's
// own increment, not one written by another caller in between.
//
// Any error from starting the transaction, executing a statement, scanning the
// result or committing is returned with a zero version.
func (c *catalogStore) NextControllerVersion(ctx context.Context) (int64, error) {
	tx, err := c.db.BeginTx(ctx, nil)
	if err != nil {
		return 0, err
	}
	// On early return this discards partial work; after a successful Commit
	// it only returns an "already done" error, which is safe to ignore.
	defer func() { _ = tx.Rollback() }()

	// Make sure a row exists so the UPDATE below always has something to bump.
	_, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO controller_version(instance_id, version) VALUES (?, 0)", c.instanceID)
	if err != nil {
		return 0, err
	}

	_, err = tx.ExecContext(ctx, "UPDATE controller_version SET version = version + 1 WHERE instance_id=?", c.instanceID)
	if err != nil {
		return 0, err
	}

	var version int64
	err = tx.QueryRowContext(ctx, "SELECT version FROM controller_version WHERE instance_id=?", c.instanceID).Scan(&version)
	if err != nil {
		return 0, err
	}

	if err := tx.Commit(); err != nil {
		return 0, err
	}
	return version, nil
}
// CheckControllerVersion reads the controller version stored for this instance
// and compares it with v.
//
// It returns nil when they match, drivers.ErrInconsistentControllerVersion
// when they differ, and the underlying query/scan error if the stored version
// cannot be read.
func (c *catalogStore) CheckControllerVersion(ctx context.Context, v int64) error {
	var stored int64
	row := c.db.QueryRowContext(ctx, "SELECT version FROM controller_version WHERE instance_id=?", c.instanceID)
	if scanErr := row.Scan(&stored); scanErr != nil {
		return scanErr
	}
	if stored == v {
		return nil
	}
	return drivers.ErrInconsistentControllerVersion
}
// FindResources returns every catalogv2 row belonging to this instance,
// ordered by kind and then by lower-cased name.
//
// Each row's kind, name and data columns are scanned into a drivers.Resource.
// The returned slice is nil when no rows match. Any error from running the
// query, scanning a row or iterating the result set is returned with a nil
// slice.
func (c *catalogStore) FindResources(ctx context.Context) ([]drivers.Resource, error) {
	rows, err := c.db.QueryxContext(ctx, "SELECT kind, name, data FROM catalogv2 WHERE instance_id=? ORDER BY kind, lower(name)", c.instanceID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var res []drivers.Resource
	for rows.Next() {
		var r drivers.Resource
		if err := rows.Scan(&r.Kind, &r.Name, &r.Data); err != nil {
			return nil, err
		}
		res = append(res, r)
	}
	// Next returning false can mean either "no more rows" or "iteration
	// failed"; Err distinguishes the two, so its value is what gets returned.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return res, nil
}
// CreateResource inserts r into catalogv2 for this instance after confirming
// that the stored controller version equals v.
//
// created_on and updated_on are both set to the same time.Now() value. The
// error from the version check, or from the INSERT itself, is returned as-is.
func (c *catalogStore) CreateResource(ctx context.Context, v int64, r drivers.Resource) error {
	// TODO: Do it transactionally
	if err := c.CheckControllerVersion(ctx, v); err != nil {
		return err
	}

	const stmt = "INSERT INTO catalogv2(instance_id, kind, name, data, created_on, updated_on) VALUES (?, ?, ?, ?, ?, ?)"
	ts := time.Now()
	_, err := c.db.ExecContext(ctx, stmt, c.instanceID, r.Kind, r.Name, r.Data, ts, ts)
	return err
}
// UpdateResource overwrites the name, data and updated_on columns of the
// catalogv2 row for this instance whose kind equals r.Kind and whose name
// matches r.Name case-insensitively, after confirming that the stored
// controller version equals v.
//
// The statement result is discarded, so this method does not report whether
// any row actually matched. The error from the version check, or from the
// UPDATE itself, is returned as-is.
func (c *catalogStore) UpdateResource(ctx context.Context, v int64, r drivers.Resource) error {
	// TODO: Do it transactionally
	if err := c.CheckControllerVersion(ctx, v); err != nil {
		return err
	}

	const stmt = "UPDATE catalogv2 SET name=?, data=?, updated_on=? WHERE instance_id=? AND kind=? AND lower(name)=lower(?)"
	_, err := c.db.ExecContext(ctx, stmt, r.Name, r.Data, time.Now(), c.instanceID, r.Kind, r.Name)
	return err
}
// DeleteResource removes the catalogv2 row for this instance whose kind equals
// k and whose name matches n case-insensitively, after confirming that the
// stored controller version equals v.
//
// The statement result is discarded, so this method does not report whether
// any row was removed. The error from the version check, or from the DELETE
// itself, is returned as-is.
func (c *catalogStore) DeleteResource(ctx context.Context, v int64, k, n string) error {
	// TODO: Do it transactionally
	if err := c.CheckControllerVersion(ctx, v); err != nil {
		return err
	}

	const stmt = "DELETE FROM catalogv2 WHERE instance_id=? AND kind=? AND lower(name)=lower(?)"
	_, err := c.db.ExecContext(ctx, stmt, c.instanceID, k, n)
	return err
}
// DeleteResources removes every catalogv2 row belonging to this instance.
// Unlike the single-resource mutators it performs no controller version check.
// The error from the DELETE, if any, is returned as-is.
func (c *catalogStore) DeleteResources(ctx context.Context) error {
	const stmt = "DELETE FROM catalogv2 WHERE instance_id=?"
	_, err := c.db.ExecContext(ctx, stmt, c.instanceID)
	return err
}