-
Notifications
You must be signed in to change notification settings - Fork 117
/
migrator.go
173 lines (151 loc) · 5.63 KB
/
migrator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package migrator
import (
"context"
"fmt"
"time"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/drivers"
)
/**
* Any entity specific actions when a catalog is touched will be here.
* EG: on delete sources will drop the table and models will drop the view
* Any future entity specific cache invalidation will go here as well.
*
* TODO: does migrator name fit this?
* TODO: is this in the right place?
*/
var Migrators = make(map[drivers.ObjectType]EntityMigrator)
func Register(t drivers.ObjectType, artifact EntityMigrator) {
if Migrators[t] != nil {
panic(fmt.Errorf("already registered migrator type with name '%v'", t))
}
Migrators[t] = artifact
}
type Options struct {
InstanceEnv map[string]string
IngestStorageLimitInBytes int64
}
type EntityMigrator interface {
Create(ctx context.Context, olap drivers.OLAPStore, repo drivers.RepoStore, opts Options, catalog *drivers.CatalogEntry) error
Update(ctx context.Context, olap drivers.OLAPStore, repo drivers.RepoStore, opts Options, oldCatalog *drivers.CatalogEntry, newCatalog *drivers.CatalogEntry) error
Rename(ctx context.Context, olap drivers.OLAPStore, from string, catalog *drivers.CatalogEntry) error
Delete(ctx context.Context, olap drivers.OLAPStore, catalog *drivers.CatalogEntry) error
GetDependencies(ctx context.Context, olap drivers.OLAPStore, catalog *drivers.CatalogEntry) ([]string, []*drivers.CatalogEntry)
Validate(ctx context.Context, olap drivers.OLAPStore, catalog *drivers.CatalogEntry) []*runtimev1.ReconcileError
// IsEqual checks everything but the name
IsEqual(ctx context.Context, cat1 *drivers.CatalogEntry, cat2 *drivers.CatalogEntry) bool
ExistsInOlap(ctx context.Context, olap drivers.OLAPStore, catalog *drivers.CatalogEntry) (bool, error)
}
func Create(ctx context.Context, olap drivers.OLAPStore, repo drivers.RepoStore, opts Options, catalog *drivers.CatalogEntry) error {
migrator, ok := getMigrator(catalog)
if !ok {
// no error here. not all migrators are needed
return nil
}
return migrator.Create(ctx, olap, repo, opts, catalog)
}
func Update(ctx context.Context, olap drivers.OLAPStore, repo drivers.RepoStore, opts Options, oldCatalog, newCatalog *drivers.CatalogEntry) error {
migrator, ok := getMigrator(newCatalog)
if !ok {
// no error here. not all migrators are needed
return nil
}
return migrator.Update(ctx, olap, repo, opts, oldCatalog, newCatalog)
}
func Rename(ctx context.Context, olap drivers.OLAPStore, from string, catalog *drivers.CatalogEntry) error {
migrator, ok := getMigrator(catalog)
if !ok {
// no error here. not all migrators are needed
return nil
}
return migrator.Rename(ctx, olap, from, catalog)
}
func Delete(ctx context.Context, olap drivers.OLAPStore, catalog *drivers.CatalogEntry) error {
migrator, ok := getMigrator(catalog)
if !ok {
// no error here. not all migrators are needed
return nil
}
return migrator.Delete(ctx, olap, catalog)
}
func GetDependencies(ctx context.Context, olap drivers.OLAPStore, catalog *drivers.CatalogEntry) ([]string, []*drivers.CatalogEntry) {
migrator, ok := getMigrator(catalog)
if !ok {
// no error here. not all migrators are needed
return []string{}, nil
}
return migrator.GetDependencies(ctx, olap, catalog)
}
// Validate also returns list of dependents.
func Validate(ctx context.Context, olap drivers.OLAPStore, catalog *drivers.CatalogEntry) []*runtimev1.ReconcileError {
migrator, ok := getMigrator(catalog)
if !ok {
// no error here. not all migrators are needed
return nil
}
return migrator.Validate(ctx, olap, catalog)
}
// IsEqual checks everything but the name.
func IsEqual(ctx context.Context, cat1, cat2 *drivers.CatalogEntry) bool {
if cat1.Type != cat2.Type {
return false
}
migrator, ok := getMigrator(cat1)
if !ok {
// no error here. not all migrators are needed
return false
}
return migrator.IsEqual(ctx, cat1, cat2)
}
func ExistsInOlap(ctx context.Context, olap drivers.OLAPStore, catalog *drivers.CatalogEntry) (bool, error) {
migrator, ok := getMigrator(catalog)
if !ok {
// no error here. not all migrators are needed
return false, nil
}
return migrator.ExistsInOlap(ctx, olap, catalog)
}
func SetSchema(ctx context.Context, olap drivers.OLAPStore, catalog *drivers.CatalogEntry) error {
// TODO: do we need to push this to individual implementations?
if catalog.Type == drivers.ObjectTypeMetricsView {
return nil
}
table, err := olap.InformationSchema().Lookup(ctx, catalog.Name)
if err != nil {
return err
}
switch catalog.Type {
case drivers.ObjectTypeTable:
catalog.GetTable().Schema = table.Schema
case drivers.ObjectTypeSource:
catalog.GetSource().Schema = table.Schema
case drivers.ObjectTypeModel:
catalog.GetModel().Schema = table.Schema
}
return nil
}
func LastUpdated(ctx context.Context, instID string, repo drivers.RepoStore, catalog *drivers.CatalogEntry) (time.Time, error) {
// TODO: do we need to push this to individual implementations to handle just local_file source?
if catalog.Type != drivers.ObjectTypeSource || catalog.GetSource().Connector != "local_file" {
// return a very old time
return time.Time{}, nil
}
stat, err := repo.Stat(ctx, instID, catalog.GetSource().Properties.Fields["path"].GetStringValue())
if err != nil {
return time.Time{}, err
}
return stat.LastUpdated, nil
}
func CreateValidationError(filePath, message string) []*runtimev1.ReconcileError {
return []*runtimev1.ReconcileError{
{
Code: runtimev1.ReconcileError_CODE_VALIDATION,
FilePath: filePath,
Message: message,
},
}
}
func getMigrator(catalog *drivers.CatalogEntry) (EntityMigrator, bool) {
m, ok := Migrators[catalog.Type]
return m, ok
}