-
Notifications
You must be signed in to change notification settings - Fork 15
/
sync.go
175 lines (157 loc) · 5.62 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package sync
import (
	"fmt"
	"os"
	"reflect"

	"github.com/kaskada-ai/kaskada/clients/cli/api"
	"github.com/kaskada-ai/kaskada/clients/cli/utils"
	"github.com/rs/zerolog/log"
	"github.com/sergi/go-diff/diffmatchpatch"
	"github.com/spf13/cobra"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/reflect/protoreflect"
)
// SyncCmd is the parent "sync" command. It carries no action of its own; it
// only groups the subcommands that manage kaskada resources as code.
var SyncCmd = &cobra.Command{
	Use:   "sync",
	Short: "A set of commands for interacting with kaskada resources as code",
	// TODO: add a Long description with usage examples for the subcommands.
}
// init attaches the export, plan and apply subcommands to SyncCmd, in that
// order.
func init() {
	SyncCmd.AddCommand(exportCmd, planCmd, applyCmd)
}
// planResult is the outcome of plan: every desired resource lands in exactly
// one of the three buckets below.
type planResult struct {
	// resourcesToCreate holds resources compared as Create,
	// resourcesToReplace holds resources compared as Replace, and
	// resourcesToSkip holds everything else (identical resources as well as
	// resources whose comparison failed).
	resourcesToCreate, resourcesToReplace, resourcesToSkip []protoreflect.ProtoMessage
}
// plan reads every spec file in files, merges the resources they declare, and
// compares each desired resource against what apiClient reports in order to
// decide what an apply would have to do.
//
// Resources are de-duplicated by name within each kind (materialization,
// table, view): the first definition encountered wins and later ones are
// logged as warnings and ignored. A resource whose comparison yields anything
// other than Create or Replace, including a failed comparison, is placed in
// resourcesToSkip.
//
// It returns a nil result and an error naming the offending file when a file
// cannot be read or parsed. The underlying error stays reachable through
// errors.Is / errors.As.
func plan(apiClient api.ApiClient, files []string) (*planResult, error) {
	// Names already seen, tracked separately per resource kind.
	materializations := map[string]struct{}{}
	tables := map[string]struct{}{}
	views := map[string]struct{}{}
	var desiredResources []protoreflect.ProtoMessage

	// get combined spec from all the files
	for _, file := range files {
		contents, err := os.ReadFile(file)
		if err != nil {
			return nil, fmt.Errorf("reading spec file %q: %w", file, err)
		}
		spec, err := utils.YamlToSpec(contents)
		if err != nil {
			return nil, fmt.Errorf("parsing spec file %q: %w", file, err)
		}

		// addOnce appends resource to the desired set unless a resource of the
		// same kind and name was already recorded, in which case it only warns.
		addOnce := func(seen map[string]struct{}, kind, name string, resource protoreflect.ProtoMessage) {
			if _, found := seen[name]; found {
				log.Warn().Str("name", name).Str("file", file).Msg("skipping duplicate " + kind)
				return
			}
			seen[name] = struct{}{}
			desiredResources = append(desiredResources, resource)
		}

		for _, materialization := range spec.Materializations {
			addOnce(materializations, "materialization", materialization.MaterializationName, materialization)
		}
		for _, table := range spec.Tables {
			addOnce(tables, "table", table.TableName, table)
		}
		for _, view := range spec.Views {
			addOnce(views, "view", view.ViewName, view)
		}
	}

	// The buckets are kept non-nil so callers see empty slices, as before.
	result := planResult{
		resourcesToCreate:  []protoreflect.ProtoMessage{},
		resourcesToReplace: []protoreflect.ProtoMessage{},
		resourcesToSkip:    []protoreflect.ProtoMessage{},
	}

	// diff combined spec with reality
	for _, desired := range desiredResources {
		switch compareResource(apiClient, desired) {
		case Create:
			result.resourcesToCreate = append(result.resourcesToCreate, desired)
		case Replace:
			result.resourcesToReplace = append(result.resourcesToReplace, desired)
		default:
			// Skip and Error both end up here.
			result.resourcesToSkip = append(result.resourcesToSkip, desired)
		}
	}
	return &result, nil
}
// apply executes a previously computed plan against apiClient.
//
// Every resource in plan.resourcesToCreate is created. Every resource in
// plan.resourcesToReplace is deleted and then created again from the provided
// spec. Resources in plan.resourcesToSkip are left untouched.
//
// Processing stops at the first failure and that error is returned; resources
// handled before the failure are not rolled back. Replacement is not atomic:
// if the re-create step fails, the resource has already been deleted.
func apply(apiClient api.ApiClient, plan planResult) error {
	for _, resource := range plan.resourcesToCreate {
		subLogger := log.With().Str("kind", reflect.TypeOf(resource).String()).Str("name", api.GetName(resource)).Logger()
		if _, err := apiClient.Create(resource); err != nil {
			// subLogger already carries "kind" and "name"; adding them again
			// would emit duplicate keys because zerolog does not de-duplicate.
			subLogger.Error().Err(err).Msg("issue creating resource")
			return err
		}
		subLogger.Info().Msg("created resource with provided spec")
	}

	for _, resource := range plan.resourcesToReplace {
		subLogger := log.With().Str("kind", reflect.TypeOf(resource).String()).Str("name", api.GetName(resource)).Logger()
		// NOTE(review): the second argument to Delete is presumably a "force"
		// flag; confirm against api.ApiClient.
		if err := apiClient.Delete(resource, true); err != nil {
			subLogger.Error().Err(err).Msg("issue deleting resource before re-create")
			return err
		}
		if _, err := apiClient.Create(resource); err != nil {
			subLogger.Error().Err(err).Msg("issue re-creating resource after delete")
			return err
		}
		subLogger.Info().Msg("updated resource with provided spec")
	}
	return nil
}
// CompareResult is the verdict of comparing a desired resource with the
// version currently on the system.
type CompareResult int

// Possible CompareResult values. Error is the zero value.
const (
	// Error means the comparison could not be completed.
	Error CompareResult = iota
	// Skip means the resource is identical to the version on the system.
	Skip
	// Create means the resource was not found on the system.
	Create
	// Replace means the resource differs from the version on the system.
	Replace
)
// compareResource fetches the current version of desired through apiClient
// and reports what has to happen to bring the system in line with it.
//
// It returns:
//   - Create when the lookup fails with a gRPC NotFound status,
//   - Error when the lookup fails for any other reason, or when either
//     resource cannot be rendered to YAML,
//   - Skip when the YAML of the actual resource (with output-only fields
//     cleared) equals the YAML of the desired one,
//   - Replace otherwise, after logging a textual diff of the two.
//
// Failures are logged here and not returned as an error value.
func compareResource(apiClient api.ApiClient, desired protoreflect.ProtoMessage) CompareResult {
	logger := log.With().
		Str("kind", reflect.TypeOf(desired).String()).
		Str("name", api.GetName(desired)).
		Logger()

	actual, err := apiClient.Get(desired)
	if err != nil {
		if st, isStatus := status.FromError(err); isStatus && st.Code() == codes.NotFound {
			logger.Info().Msg("resource not found on system, will create it")
			return Create
		}
		logger.Error().Err(err).Msg("issue checking resource, skipping it")
		return Error
	}

	// Output-only fields are cleared first so they do not show up as a diff.
	actualYaml, err := utils.ProtoToYaml(api.ClearOutputOnlyFields(actual))
	if err != nil {
		logger.Error().Err(err).Msg("issue converting actual resource to yaml for diff, skipping it")
		return Error
	}
	desiredYaml, err := utils.ProtoToYaml(desired)
	if err != nil {
		logger.Error().Err(err).Msg("issue converting desired resource to yaml for diff, skipping it")
		return Error
	}

	current, wanted := string(actualYaml), string(desiredYaml)
	if current == wanted {
		logger.Info().Msg("resource identical to version on system, will skip it")
		return Skip
	}

	logger.Info().Msg("resource different than version on system, will replace it")
	differ := diffmatchpatch.New()
	changes := differ.DiffMain(current, wanted, true)
	logger.Info().Msgf("Diff: %s", differ.DiffPrettyText(changes))
	return Replace
}