-
Notifications
You must be signed in to change notification settings - Fork 111
/
models.go
144 lines (126 loc) · 4.94 KB
/
models.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package models
import (
"context"
"errors"
"fmt"
"strings"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/activity"
"github.com/rilldata/rill/runtime/services/catalog/migrator"
"github.com/rilldata/rill/runtime/services/catalog/migrator/sources"
"go.uber.org/zap"
)
// init registers a modelMigrator with the migrator registry under
// drivers.ObjectTypeModel when this package is loaded.
func init() {
	migrator.Register(drivers.ObjectTypeModel, new(modelMigrator))
}
// modelMigrator is the field-less receiver for the model migration methods
// defined in this file. An instance is registered for drivers.ObjectTypeModel
// in init.
type modelMigrator struct{}
// Create builds the OLAP object for a model by handing the model's SQL to
// olap.CreateTableAsSelect under the catalog entry's name.
//
// The negated Materialize flag is passed as the third argument (presumably the
// "create as view" switch of CreateTableAsSelect; confirm against the driver).
// repo, opts, logger and ac are accepted but not used by this implementation.
func (m *modelMigrator) Create(ctx context.Context, olap drivers.OLAPStore, repo drivers.RepoStore, opts migrator.Options, catalogObj *drivers.CatalogEntry, logger *zap.Logger, ac activity.Client) error {
	model := catalogObj.GetModel()
	asView := !model.Materialize
	return olap.CreateTableAsSelect(ctx, catalogObj.Name, asView, model.Sql)
}
// Update re-creates the OLAP object of a model after its definition changed.
//
// Both catalog entries must carry the same name; if they do not, an error is
// returned and nothing is executed. When the materialization kind (TABLE vs
// VIEW) differs between the old and new entry, the old object is dropped first
// via Delete, and any error from that drop is returned as-is. Finally the new
// entry is created through Create, whose error (if any) is the result.
//
// The caller's activity client is forwarded to Create instead of being
// replaced by nil, so Create sees the same client the caller supplied.
func (m *modelMigrator) Update(ctx context.Context, olap drivers.OLAPStore, repo drivers.RepoStore, opts migrator.Options, oldCatalogObj, newCatalogObj *drivers.CatalogEntry, logger *zap.Logger, ac activity.Client) error {
	if oldCatalogObj.Name != newCatalogObj.Name {
		// should not happen but just to be sure
		return errors.New("update is called but model name has changed")
	}

	oldMaterializeType := getMaterializeType(oldCatalogObj.GetModel().Materialize)
	newMaterializeType := getMaterializeType(newCatalogObj.GetModel().Materialize)

	// if materialize type changed, drop the old type before creating the new
	// (since "CREATE OR REPLACE" doesn't handle changes from VIEW to TABLE or vice versa)
	if oldMaterializeType != newMaterializeType {
		if err := m.Delete(ctx, olap, oldCatalogObj); err != nil {
			return err
		}
	}

	// re-create to ensure column checks and/or re-materialization in case underlying source changed
	return m.Create(ctx, olap, repo, opts, newCatalogObj, logger, ac)
}
// getMaterializeType maps the materialize flag to the SQL object keyword used
// when building statements in this file: "TABLE" for true, "VIEW" for false.
func getMaterializeType(materialize bool) string {
	switch materialize {
	case true:
		return "TABLE"
	default:
		return "VIEW"
	}
}
// Rename renames the model's OLAP object from `from` to catalogObj.Name with
// an "ALTER <TABLE|VIEW> ... RENAME TO ..." statement, where the keyword is
// chosen from the model's Materialize flag.
//
// When the old and new names are equal ignoring case, the object is first
// renamed to a temporary "__rill_temp_"-prefixed name and then to the final
// name (presumably because the OLAP engine cannot rename directly between
// names that differ only by case; confirm). The first error from olap.Exec is
// returned.
func (m *modelMigrator) Rename(ctx context.Context, olap drivers.OLAPStore, from string, catalogObj *drivers.CatalogEntry) error {
	kind := getMaterializeType(catalogObj.GetModel().Materialize)

	// alter issues a single rename statement from src to dst.
	alter := func(src, dst string) error {
		stmt := &drivers.Statement{
			Query:    fmt.Sprintf("ALTER %s %q RENAME TO %q", kind, src, dst),
			Priority: 100,
		}
		return olap.Exec(ctx, stmt)
	}

	if strings.EqualFold(from, catalogObj.Name) {
		// Two-step rename through a temporary name.
		intermediate := fmt.Sprintf("__rill_temp_%s", from)
		if err := alter(from, intermediate); err != nil {
			return err
		}
		from = intermediate
	}

	return alter(from, catalogObj.Name)
}
// Delete drops the model's OLAP object with a "DROP <TABLE|VIEW> IF EXISTS"
// statement. The keyword is chosen from the Materialize flag of the given
// catalog entry, and the error from olap.Exec is returned unchanged.
func (m *modelMigrator) Delete(ctx context.Context, olap drivers.OLAPStore, catalogObj *drivers.CatalogEntry) error {
	kind := getMaterializeType(catalogObj.GetModel().Materialize)
	drop := &drivers.Statement{
		Query:    fmt.Sprintf("DROP %s IF EXISTS %q", kind, catalogObj.Name),
		Priority: 100,
	}
	return olap.Exec(ctx, drop)
}
// GetDependencies returns the names the model's SQL depends on, together with
// catalog entries for any embedded sources referenced in that SQL.
//
// Table names are extracted from the model SQL with ExtractTableNames. Each
// name that sources.ParseEmbeddedSource recognises is treated as an embedded
// source: the entry in the returned dependency list is replaced by the parsed
// source's name, and every occurrence of the original reference in model.Sql
// is replaced by that name as well. Note that this mutates the model held by
// the catalog entry.
//
// One CatalogEntry (Type drivers.ObjectTypeSource, Embedded true) is returned
// per distinct source name, in order of first appearance in the dependency
// list. The returned slice of embedded sources is never nil.
func (m *modelMigrator) GetDependencies(ctx context.Context, olap drivers.OLAPStore, catalog *drivers.CatalogEntry) ([]string, []*drivers.CatalogEntry) {
	model := catalog.GetModel()
	dependencies := ExtractTableNames(model.Sql)

	seen := make(map[string]struct{})
	embeddedSources := make([]*drivers.CatalogEntry, 0)
	for i, dependency := range dependencies {
		source, ok := sources.ParseEmbeddedSource(dependency)
		if !ok {
			continue
		}

		// Rewrite the dependency and the SQL before the duplicate check, so a
		// repeated reference to the same embedded source is rewritten too
		// instead of being left as the raw embedded reference.
		dependencies[i] = source.Name
		model.Sql = strings.ReplaceAll(model.Sql, dependency, source.Name)

		if _, dup := seen[source.Name]; dup {
			continue
		}
		seen[source.Name] = struct{}{}

		// Comma-ok assertion: a missing or non-string "path" property yields
		// an empty Path rather than a panic.
		path, _ := source.Properties.AsMap()["path"].(string)

		// Appending on first sight keeps the result order deterministic
		// (map iteration order is not).
		embeddedSources = append(embeddedSources, &drivers.CatalogEntry{
			Name:     source.Name,
			Type:     drivers.ObjectTypeSource,
			Object:   source,
			Path:     path,
			Embedded: true,
		})
	}

	return dependencies, embeddedSources
}
// Validate executes the model's SQL against the OLAP store as a dry-run
// statement (DryRun set to true). If that execution fails, the error message
// is turned into reconcile errors for the entry's path through
// migrator.CreateValidationError; otherwise nil is returned.
func (m *modelMigrator) Validate(ctx context.Context, olap drivers.OLAPStore, catalog *drivers.CatalogEntry) []*runtimev1.ReconcileError {
	dryRun := &drivers.Statement{
		Query:    catalog.GetModel().Sql,
		Priority: 100,
		DryRun:   true,
	}
	if err := olap.Exec(ctx, dryRun); err != nil {
		return migrator.CreateValidationError(catalog.Path, err.Error())
	}
	return nil
}
// IsEqual reports whether two model catalog entries describe the same model:
// identical Dialect, identical Materialize flag, and SQL text that matches
// under strings.EqualFold.
//
// NOTE(review): the SQL comparison is case-insensitive, so two models whose
// SQL differs only in letter case (including inside string literals) compare
// as equal. Confirm this is intended.
func (m *modelMigrator) IsEqual(ctx context.Context, cat1, cat2 *drivers.CatalogEntry) bool {
	left, right := cat1.GetModel(), cat2.GetModel()
	if left.Dialect != right.Dialect {
		return false
	}
	if !strings.EqualFold(left.Sql, right.Sql) {
		return false
	}
	return left.Materialize == right.Materialize
}
// ExistsInOlap reports whether the OLAP store's information schema has an
// entry for the catalog entry's name. A lookup failing with
// drivers.ErrNotFound yields (false, nil); any other lookup error is returned
// alongside false.
func (m *modelMigrator) ExistsInOlap(ctx context.Context, olap drivers.OLAPStore, catalog *drivers.CatalogEntry) (bool, error) {
	_, err := olap.InformationSchema().Lookup(ctx, catalog.Name)
	switch {
	case err == nil:
		return true, nil
	case errors.Is(err, drivers.ErrNotFound):
		return false, nil
	default:
		return false, err
	}
}