-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.go
317 lines (272 loc) · 8.37 KB
/
app.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
package app
import (
"context"
"fmt"
"os"
"strings"
"time"
"github.com/jlewi/hydros/pkg/controllers"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/go-logr/zapr"
"github.com/jlewi/hydros/api/v1alpha1"
"github.com/jlewi/hydros/pkg/config"
"github.com/jlewi/hydros/pkg/ecrutil"
"github.com/jlewi/hydros/pkg/files"
"github.com/jlewi/hydros/pkg/github"
"github.com/jlewi/hydros/pkg/gitops"
"github.com/jlewi/hydros/pkg/images"
"github.com/jlewi/hydros/pkg/util"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"go.uber.org/zap"
)
// App is a struct to hold values needed across all commands.
// Intent is to simplify initialization across commands.
type App struct {
Config *config.Config
Registry *controllers.Registry
}
func NewApp() *App {
return &App{}
}
// LoadConfig loads the config. It takes an optional command. The command allows values to be overwritten from
// the CLI.
func (a *App) LoadConfig(cmd *cobra.Command) error {
// N.B. at this point we haven't configured any logging so zap just returns the default logger.
// TODO(jeremy): Should we just initialize the logger without cfg and then reinitialize it after we've read the config?
if err := config.InitViper(cmd); err != nil {
return err
}
cfg := config.GetConfig()
if problems := cfg.IsValid(); len(problems) > 0 {
fmt.Fprintf(os.Stdout, "Invalid configuration; %s\n", strings.Join(problems, "\n"))
return fmt.Errorf("invalid configuration; fix the problems and then try again")
}
a.Config = cfg
return nil
}
func (a *App) SetupLogging() error {
if a.Config == nil {
return errors.New("Config is nil; call LoadConfig first")
}
cfg := a.Config
// Use a non-json configuration configuration
c := zap.NewDevelopmentConfig()
// Use the keys used by cloud logging
// https://cloud.google.com/logging/docs/structured-logging
c.EncoderConfig.LevelKey = "severity"
c.EncoderConfig.TimeKey = "time"
c.EncoderConfig.MessageKey = "message"
lvl := cfg.GetLogLevel()
zapLvl := zap.NewAtomicLevel()
if err := zapLvl.UnmarshalText([]byte(lvl)); err != nil {
return errors.Wrapf(err, "Could not convert level %v to ZapLevel", lvl)
}
c.Level = zapLvl
newLogger, err := c.Build()
if err != nil {
panic(fmt.Sprintf("Failed to initialize zap logger; error %v", err))
}
zap.ReplaceGlobals(newLogger)
return nil
}
// SetupRegistry sets up the registry with a list of registered controllers
func (a *App) SetupRegistry() error {
if a.Config == nil {
return errors.New("Config is nil; call LoadConfig first")
}
a.Registry = &controllers.Registry{}
// Register controllers
image, err := images.NewController()
if err != nil {
return err
}
if err := a.Registry.Register(v1alpha1.ImageGVK, image); err != nil {
return err
}
replicator, err := images.NewReplicator()
if err != nil {
return err
}
if err := a.Registry.Register(v1alpha1.ReplicatedImageGVK, replicator); err != nil {
return err
}
releaser, err := github.NewReleaser(*a.Config)
if err != nil {
return err
}
if err := a.Registry.Register(v1alpha1.GitHubReleaserGVK, releaser); err != nil {
return err
}
return nil
}
// ApplyPaths applies the resources in the specified paths.
// Paths can be files or directories.
func (a *App) ApplyPaths(ctx context.Context, paths []string, period time.Duration, force bool) error {
log := util.LogFromContext(ctx)
if a.Config.GitHub == nil {
return errors.New("GitHub configuration is missing; You need to run hydros config set github.appID and hydros config set github.privateKey")
}
if a.Config.GitHub.PrivateKey == "" {
return errors.New("GitHub configuration is missing github.privateKey; You need to run hydros config set github.privateKey")
}
if a.Config.GitHub.AppID <= 0 {
return errors.New("GitHub configuration is missing github.appID; You need to run hydros config set github.appID")
}
for _, resourcePath := range paths {
newPaths, err := util.FindYamlFiles(resourcePath)
if err != nil {
log.Error(err, "Failed to find YAML files", "path", resourcePath)
return err
}
paths = append(paths, newPaths...)
}
syncNames := map[string]string{}
for _, path := range paths {
err := a.apply(ctx, path, syncNames, period, force)
if err != nil {
log.Error(err, "Apply failed", "path", path)
}
}
if len(syncNames) == 0 {
err := fmt.Errorf("No hydros resources found")
log.Error(err, "No hydros resources found", "paths", paths)
return err
}
// Wait for ever
if period > 0 {
select {}
}
return nil
}
func (a *App) apply(ctx context.Context, path string, syncNames map[string]string, period time.Duration, force bool) error {
if a.Registry == nil {
return errors.New("Registry is nil; call SetupRegistry first")
}
log := zapr.NewLogger(zap.L())
log.Info("Reading file", "path", path)
rNodes, err := util.ReadYaml(path)
if err != nil {
return err
}
allErrors := &util.ListOfErrors{
Causes: []error{},
}
for _, n := range rNodes {
m, err := n.GetMeta()
if err != nil {
log.Error(err, "Failed to get metadata", "n", n)
continue
}
log.Info("Read resource", "meta", m)
switch m.Kind {
case v1alpha1.ManifestSyncKind:
manifestSync := &v1alpha1.ManifestSync{}
err := n.Document().Decode(manifestSync)
if err != nil {
log.Error(err, "Failed to decode ManifestSync")
continue
}
name := manifestSync.Metadata.Name
if f, ok := syncNames[name]; ok {
err := fmt.Errorf("Multiple ManifestSync with name %v", name)
log.Error(err, "Multiple ManifestSync objects found with the same name", "name", name, "using", f, "new", path)
allErrors.AddCause(err)
continue
}
syncNames[name] = path
secret, err := files.Read(a.Config.GitHub.PrivateKey)
if err != nil {
return errors.Wrapf(err, "Could not read file: %v", a.Config.GitHub.PrivateKey)
}
manager, err := github.NewTransportManager(int64(a.Config.GitHub.AppID), secret, log)
if err != nil {
log.Error(err, "TransportManager creation failed")
return err
}
syncer, err := gitops.NewSyncer(manifestSync, manager, gitops.SyncWithWorkDir(a.Config.GetWorkDir()), gitops.SyncWithLogger(log))
if err != nil {
log.Error(err, "Failed to create syncer")
allErrors.AddCause(err)
continue
}
if period > 0 {
go syncer.RunPeriodically(period)
} else {
if err := syncer.RunOnce(force); err != nil {
log.Error(err, "Failed to run Sync")
allErrors.AddCause(err)
}
}
case v1alpha1.RepoGVK.Kind:
syncNames[m.Name] = path
repo := &v1alpha1.RepoConfig{}
if err := n.YNode().Decode(&repo); err != nil {
log.Error(err, "Failed to decode RepoConfig")
allErrors.AddCause(err)
continue
}
c, err := gitops.NewRepoController(*a.Config, a.Registry, repo)
if err != nil {
return err
}
if period > 0 {
go c.RunPeriodically(context.Background(), period)
} else {
if err := c.Reconcile(context.Background()); err != nil {
return err
}
}
case v1alpha1.EcrPolicySyncKind:
syncNames[m.Name] = path
region := "us-west-2"
log.Info("Creating a default AWS session", "region", region)
sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
})
if err != nil {
log.Error(err, "Failed to create AWS session")
allErrors.AddCause(err)
continue
}
c, err := ecrutil.NewEcrPolicySyncController(sess, ecrutil.EcrPolicySyncWithLogger(log))
if err != nil {
log.Error(err, "Failed to create EcrPolicySyncController")
allErrors.AddCause(err)
continue
}
for {
if err := c.Apply(n); err != nil {
log.Error(err, "Failed to create apply resource", "name", m.Name)
allErrors.AddCause(err)
continue
}
if period == 0 {
break
}
log.Info("Sleep", "duration", period)
time.Sleep(period)
}
default:
// Going forward we should be using the registry
gvk := schema.FromAPIVersionAndKind(m.APIVersion, m.Kind)
controller, err := a.Registry.GetController(gvk)
if err != nil {
log.Error(err, "Unsupported kind", "gvk", gvk)
allErrors.AddCause(err)
continue
}
if err := controller.ReconcileNode(ctx, n); err != nil {
log.Error(err, "Failed to reconcile resource", "name", m.Name, "namespace", m.Namespace, "gvk", gvk)
allErrors.AddCause(err)
}
}
}
if len(allErrors.Causes) == 0 {
return nil
}
allErrors.Final = fmt.Errorf("failed to apply one or more resources")
return allErrors
}