generated from kubernetes/kubernetes-template-project
-
Notifications
You must be signed in to change notification settings - Fork 69
/
agent.go
437 lines (414 loc) · 13.6 KB
/
agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"context"
"fmt"
"os"
"path/filepath"
"sync"
"time"
"github.com/sirupsen/logrus"
"gopkg.in/fsnotify.v1"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/prow/pkg/interrupts"
)
// Delta represents the before and after states of a Config change detected by the Agent.
// Both fields hold Config values rather than pointers: Set fills them by
// dereferencing the previous and the new *Config.
type Delta struct {
// Before is the config that was in effect prior to the change (the zero
// Config when Set found no previous config); After is the config that
// replaced it.
Before, After Config
}
// DeltaChan is the channel type through which config delta events are
// delivered when the config changes. It is an alias for a send-only channel:
// the Agent is the sender (see Set), and a subscriber hands the Agent a
// channel via Subscribe while receiving from its own end of that channel.
type DeltaChan = chan<- Delta
// Agent watches a path and automatically loads the config stored
// therein. It also keeps the channels registered through Subscribe and
// sends a Delta to each of them whenever Set replaces the config.
type Agent struct {
// mut guards c and subscriptions.
mut sync.RWMutex // do not export Lock, etc methods
// c is the most recently stored config. It may be nil before the first
// call to Set or SetWithoutBroadcast (Set explicitly checks for that).
c *Config
// subscriptions holds every channel registered via Subscribe; nothing in
// this file ever removes an entry.
subscriptions []DeltaChan
}
// IsConfigMapMount reports whether the directory at path looks like a mounted
// configmap, which is recognised by an entry named "..data" directly inside
// the directory. It returns an error when the directory cannot be listed.
func IsConfigMapMount(path string) (bool, error) {
	entries, err := os.ReadDir(path)
	if err != nil {
		return false, fmt.Errorf("Could not read provided directory %s: %w", path, err)
	}
	// Stop scanning as soon as the marker entry has been seen.
	found := false
	for i := 0; i < len(entries) && !found; i++ {
		found = entries[i].Name() == "..data"
	}
	return found, nil
}
// GetCMMountWatcher returns a function that watches a configmap mounted directory and runs the provided "eventFunc" every time
// the directory gets updated and the provided "errFunc" every time it encounters an error.
//
// An update is detected as a Create event for the "..data" entry inside path.
// The returned function blocks until ctx is cancelled, at which point it closes
// the underlying watcher and returns.
//
// GetCMMountWatcher itself fails when path cannot be read, is not a configmap
// mounted directory, or cannot be watched; in the latter case the watcher that
// was already created is closed before returning.
//
// Example of a possible eventFunc:
//
//	func() error {
//		value, err := RunUpdate()
//		if err != nil {
//			return err
//		}
//		globalValue = value
//		return nil
//	}
//
// Example of errFunc:
//
//	func(err error, msg string) {
//		logrus.WithError(err).Error(msg)
//	}
func GetCMMountWatcher(eventFunc func() error, errFunc func(error, string), path string) (func(ctx context.Context), error) {
	isCMMount, err := IsConfigMapMount(path)
	if err != nil {
		return nil, err
	}
	if !isCMMount {
		return nil, fmt.Errorf("Provided directory %s is not a configmap directory", path)
	}
	w, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}
	if err := w.Add(path); err != nil {
		// The watcher never reaches the caller on this path, so it has to be
		// released here. The Add error is the one worth reporting; a failure
		// to close is deliberately ignored.
		_ = w.Close()
		return nil, err
	}
	logrus.Debugf("Watching %s", path)
	dataPath := filepath.Join(path, "..data")
	return func(ctx context.Context) {
		for {
			select {
			case <-ctx.Done():
				if err := w.Close(); err != nil {
					errFunc(err, fmt.Sprintf("failed to close fsnotify watcher for directory %s", path))
				}
				return
			case event := <-w.Events:
				// Op is a bitmask, so test for the Create bit instead of
				// comparing the whole value for equality.
				if event.Name == dataPath && event.Op&fsnotify.Create == fsnotify.Create {
					if err := eventFunc(); err != nil {
						errFunc(err, fmt.Sprintf("event function for watch directory %s failed", path))
					}
				}
			case err := <-w.Errors:
				errFunc(err, fmt.Sprintf("received fsnotify error for directory %s", path))
			}
		}
	}, nil
}
// GetFileWatcher returns a function that watches the specified file(s), running the "eventFunc" whenever an event for the file(s) occurs
// and the "errFunc" whenever an error is encountered. In this function, the eventFunc has access to the watcher, allowing the eventFunc
// to add new files/directories to be watched as needed.
//
// The returned function blocks until ctx is cancelled, at which point it closes
// the underlying watcher and returns. Every event, regardless of its operation
// or file name, triggers eventFunc.
//
// GetFileWatcher itself fails when the watcher cannot be created or when any of
// the files cannot be added to it; in the latter case the watcher is closed
// before returning.
//
// Example of a possible eventFunc:
//
//	func(w *fsnotify.Watcher) error {
//		value, err := RunUpdate()
//		if err != nil {
//			return err
//		}
//		globalValue = value
//		newFiles := getNewFiles()
//		for _, file := range newFiles {
//			if err := w.Add(file); err != nil {
//				return err
//			}
//		}
//		return nil
//	}
//
// Example of errFunc:
//
//	func(err error, msg string) {
//		logrus.WithError(err).Error(msg)
//	}
func GetFileWatcher(eventFunc func(*fsnotify.Watcher) error, errFunc func(error, string), files ...string) (func(ctx context.Context), error) {
	w, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}
	for _, file := range files {
		if err := w.Add(file); err != nil {
			// The watcher never reaches the caller on this path, so it has
			// to be released here. The Add error is the one worth reporting;
			// a failure to close is deliberately ignored.
			_ = w.Close()
			return nil, err
		}
	}
	logrus.Debugf("Watching %d files", len(files))
	logrus.Tracef("Watching files: %v", files)
	return func(ctx context.Context) {
		for {
			select {
			case <-ctx.Done():
				if err := w.Close(); err != nil {
					errFunc(err, fmt.Sprintf("failed to close fsnotify watcher for files: %v", files))
				}
				return
			case <-w.Events:
				if err := eventFunc(w); err != nil {
					errFunc(err, fmt.Sprintf("event function failed watching files: %v", files))
				}
			case err := <-w.Errors:
				errFunc(err, fmt.Sprintf("received fsnotify error watching files: %v", files))
			}
		}
	}, nil
}
// ListCMsAndDirs walks the tree rooted at path and returns two sets of
// directory paths: cms holds the directories that are configmap mounts and
// dirs holds every other directory. This can be used to watch a large number
// of files, some of which may be populated via configmaps.
//
// Configmap mounted directories are not descended into. Errors that the walk
// reports for individual entries are ignored; only a failure of the configmap
// check itself aborts the walk and is returned.
func ListCMsAndDirs(path string) (cms sets.Set[string], dirs sets.Set[string], err error) {
	cms, dirs = sets.New[string](), sets.New[string]()
	visit := func(current string, info os.FileInfo, _ error) error {
		// Only directories need watching: creation, deletion and writes of
		// files inside a directory trigger events for the directory itself.
		if info == nil || !info.IsDir() {
			return nil
		}
		mounted, checkErr := IsConfigMapMount(current)
		switch {
		case checkErr != nil:
			return fmt.Errorf("Failed to check is path %s is configmap mounted: %w", current, checkErr)
		case mounted:
			cms.Insert(current)
			// configmaps can't have nested directories
			return filepath.SkipDir
		default:
			dirs.Insert(current)
			return nil
		}
	}
	err = filepath.Walk(path, visit)
	return cms, dirs, err
}
// watchConfigs sets up filesystem watchers that reload the config into ca
// whenever the prow config or the job config changes.
//
// Paths are split into two groups: configmap mounted directories, each of which
// gets its own GetCMMountWatcher, and plain files/directories, which share a
// single GetFileWatcher. All watchers are created first; only when every one of
// them was created successfully are they handed to interrupts.Run. Any error
// while inspecting the paths or creating a watcher is returned.
func watchConfigs(ca *Agent, prowConfig, jobConfig string, supplementalProwConfigDirs []string, supplementalProwConfigsFileNameSuffix string, additionals ...func(*Config) error) error {
	// reload re-reads the whole configuration and publishes it through ca.
	reload := func() error {
		loaded, err := Load(prowConfig, jobConfig, supplementalProwConfigDirs, supplementalProwConfigsFileNameSuffix, additionals...)
		if err != nil {
			return err
		}
		ca.Set(loaded)
		return nil
	}

	// For plain directories a reload is followed by a re-scan, because we may
	// need to add more directories to be watched.
	onDirEvent := func(w *fsnotify.Watcher) error {
		if err := reload(); err != nil {
			return err
		}
		// TODO(AlexNPavel): Is there a chance that a ConfigMap mounted directory may appear without making a new pod? If yes, handle that.
		_, toWatch, err := ListCMsAndDirs(jobConfig)
		if err != nil {
			return err
		}
		for _, supplementalDir := range supplementalProwConfigDirs {
			_, extra, err := ListCMsAndDirs(supplementalDir)
			if err != nil {
				return err
			}
			toWatch.Insert(extra.UnsortedList()...)
		}
		// Adding a file or directory that already exists in fsnotify is a no-op, so it is safe to always run Add
		for dir := range toWatch {
			if err := w.Add(dir); err != nil {
				return err
			}
		}
		return nil
	}

	reportErr := func(err error, msg string) {
		logrus.WithField("prowConfig", prowConfig).
			WithField("jobConfig", jobConfig).
			WithError(err).Error(msg)
	}

	cms := sets.New[string]()
	dirs := sets.New[string]()

	// classifyFile files a single config file under cms (by its parent
	// directory) when that parent is a configmap mount, and under dirs (by the
	// file path itself) otherwise.
	classifyFile := func(file string) error {
		parent := filepath.Dir(file)
		mounted, err := IsConfigMapMount(parent)
		if err != nil {
			return err
		}
		if mounted {
			cms.Insert(parent)
		} else {
			dirs.Insert(file)
		}
		return nil
	}

	// TODO(AlexNPavel): allow empty jobConfig till fully migrate config to subdirs
	if jobConfig != "" {
		info, err := os.Stat(jobConfig)
		if err != nil {
			return err
		}
		// TODO(AlexNPavel): allow single file jobConfig till fully migrate config to subdirs
		if info.IsDir() {
			// jobConfig points to directories of configs that may be nested
			cms, dirs, err = ListCMsAndDirs(jobConfig)
			if err != nil {
				return err
			}
		} else if err := classifyFile(jobConfig); err != nil {
			// If jobConfig is a single file, we handle it identically to how prowConfig is handled
			return err
		}
	}

	// The prow config is always a single file
	if err := classifyFile(prowConfig); err != nil {
		return err
	}

	var watchers []func(context.Context)
	for cm := range cms {
		watcher, err := GetCMMountWatcher(reload, reportErr, cm)
		if err != nil {
			return err
		}
		watchers = append(watchers, watcher)
	}
	if len(dirs) > 0 {
		watcher, err := GetFileWatcher(onDirEvent, reportErr, dirs.UnsortedList()...)
		if err != nil {
			return err
		}
		watchers = append(watchers, watcher)
	}

	for i := range watchers {
		interrupts.Run(watchers[i])
	}
	return nil
}
// StartWatch will begin watching the config files at the provided paths. If the
// first load fails, StartWatch will return the error and abort. The same applies
// when the filesystem watchers cannot be set up: that error is returned rather
// than silently leaving the Agent without any reload mechanism (the initial
// config has already been stored in the Agent by then). Once the watchers are
// running, future load failures will log the failure message but continue
// attempting to load.
// This function will replace Start in a future release.
func (ca *Agent) StartWatch(prowConfig, jobConfig string, supplementalProwConfigDirs []string, supplementalProwConfigsFileNameSuffix string, additionals ...func(*Config) error) error {
	c, err := Load(prowConfig, jobConfig, supplementalProwConfigDirs, supplementalProwConfigsFileNameSuffix, additionals...)
	if err != nil {
		return err
	}
	ca.Set(c)
	// Propagate watcher setup failures instead of discarding them.
	return watchConfigs(ca, prowConfig, jobConfig, supplementalProwConfigDirs, supplementalProwConfigsFileNameSuffix, additionals...)
}
// lastConfigModTime returns the more recent of the modification times of
// prowConfig and jobConfig. An empty jobConfig is skipped. When either path
// cannot be stat'ed the failure is logged and returned together with the zero
// time.
func lastConfigModTime(prowConfig, jobConfig string) (time.Time, error) {
	// The modification time tells whether the file needs to be re-read.
	// os.Stat follows symbolic links, which is how ConfigMaps work.
	prowInfo, err := os.Stat(prowConfig)
	if err != nil {
		logrus.WithField("prowConfig", prowConfig).WithError(err).Error("Error loading prow config.")
		return time.Time{}, err
	}
	latest := prowInfo.ModTime()

	// TODO(krzyzacy): allow empty jobConfig till fully migrate config to subdirs
	if jobConfig == "" {
		return latest, nil
	}

	jobInfo, err := os.Stat(jobConfig)
	if err != nil {
		logrus.WithField("jobConfig", jobConfig).WithError(err).Error("Error loading job configs.")
		return time.Time{}, err
	}
	if jobModTime := jobInfo.ModTime(); jobModTime.After(latest) {
		latest = jobModTime
	}
	return latest, nil
}
// Start loads the config once and then keeps polling the config paths from a
// background goroutine. If the first load fails, Start returns the error and
// no polling is started. Later load failures are only logged and polling
// continues.
//
// The poller ticks once per second and reloads when the newest modification
// time of prowConfig/jobConfig has advanced. Once 600 ticks without an advance
// have accumulated since the last successful reload, the modification time
// check is bypassed and a reload is attempted on every tick until one succeeds.
func (ca *Agent) Start(prowConfig, jobConfig string, additionalProwConfigDirs []string, supplementalProwConfigsFileNameSuffix string, additionals ...func(*Config) error) error {
	lastModTime, err := lastConfigModTime(prowConfig, jobConfig)
	if err != nil {
		lastModTime = time.Time{}
	}

	initial, err := Load(prowConfig, jobConfig, additionalProwConfigDirs, supplementalProwConfigsFileNameSuffix, additionals...)
	if err != nil {
		return err
	}
	ca.Set(initial)

	poll := func() {
		// Rarely, if two changes happen in the same second, mtime will
		// be the same for the second change, and an mtime-based check would
		// fail. Reload periodically just in case.
		const maxSkips = 600
		skipped := 0
		for range time.Tick(1 * time.Second) {
			if skipped < maxSkips {
				modTime, err := lastConfigModTime(prowConfig, jobConfig)
				if err != nil {
					continue
				}
				if !modTime.After(lastModTime) {
					// file hasn't been modified
					skipped++
					continue
				}
				lastModTime = modTime
			}

			reloaded, err := Load(prowConfig, jobConfig, additionalProwConfigDirs, supplementalProwConfigsFileNameSuffix, additionals...)
			if err != nil {
				logrus.WithField("prowConfig", prowConfig).
					WithField("jobConfig", jobConfig).
					WithError(err).Error("Error loading config.")
				continue
			}
			skipped = 0
			ca.Set(reloaded)
		}
	}
	go poll()

	return nil
}
// Subscribe adds subscription to the channels that are notified on config
// reload. Every later call to Set attempts to send a Delta, carrying a copy of
// the previous and of the new config, down each registered channel.
func (ca *Agent) Subscribe(subscription DeltaChan) {
	ca.mut.Lock()
	ca.subscriptions = append(ca.subscriptions, subscription)
	ca.mut.Unlock()
}
// Getter is the signature of a function that returns the current Config.
// (*Agent).Config has this signature and reads the config under the Agent's
// read lock, so ca.Config can be passed wherever a Getter is expected.
// NOTE(review): other implementations are presumably expected to be safe for
// concurrent use as well; nothing in the type itself enforces that.
type Getter func() *Config
// Config returns the config most recently stored in the Agent, read under the
// Agent's read lock. Do not modify the config.
func (ca *Agent) Config() *Config {
	ca.mut.RLock()
	current := ca.c
	ca.mut.RUnlock()
	return current
}
// Set sets the config and notifies every subscriber. Useful for testing.
// Also used by statusreconciler to load last known config.
//
// A Delta holding a copy of the previous config (the zero Config if none was
// set) and a copy of c is sent to each subscribed channel from its own
// goroutine, so Set never blocks on a slow subscriber. Each send is abandoned
// after one minute. c must not be nil, because it is dereferenced to build the
// Delta.
func (ca *Agent) Set(c *Config) {
	ca.mut.Lock()
	defer ca.mut.Unlock()
	var oldConfig Config
	if ca.c != nil {
		oldConfig = *ca.c
	}
	delta := Delta{Before: oldConfig, After: *c}
	ca.c = c
	for _, subscription := range ca.subscriptions {
		go func(sub DeltaChan) { // wait a minute to send each event
			timeout := time.NewTimer(time.Minute)
			// The timer is never reused, so stopping it is all the cleanup
			// that is needed. Draining timeout.C after the timeout case has
			// already consumed its value would block this goroutine forever.
			defer timeout.Stop()
			select {
			case sub <- delta:
			case <-timeout.C:
			}
		}(subscription)
	}
}
// SetWithoutBroadcast stores c as the Agent's config without sending a Delta
// to the subscribed channels. It is meant for a subscriber that wants to
// adjust the Config held by the Agent in response to a Delta it has just
// received: calling Set() there would make the subscriber both the consumer
// and the producer of the DeltaChan and end up clogging it.
func (ca *Agent) SetWithoutBroadcast(c *Config) {
	ca.mut.Lock()
	ca.c = c
	ca.mut.Unlock()
}