forked from istio/istio
-
Notifications
You must be signed in to change notification settings - Fork 1
/
watcher.go
151 lines (132 loc) · 3.98 KB
/
watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Copyright 2017 Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package envoy
import (
"context"
"crypto/sha256"
"hash"
"io/ioutil"
"os"
"time"
"github.com/howeyc/fsnotify"
"istio.io/istio/pkg/log"
)
// defaultMinDelay is the shortest interval allowed between two consecutive
// update notifications triggered by certificate file changes.
const defaultMinDelay = 10 * time.Second
// Watcher observes the proxy configuration inputs and triggers a reload
// whenever they change.
type Watcher interface {
	// Run executes the watch loop. It is a blocking call; the supplied
	// context controls how long the loop runs.
	Run(ctx context.Context)
}
// watcher is the package's Watcher implementation: it hashes a set of
// certificate files and publishes the digest on a channel.
type watcher struct {
// certs lists the paths of the certificate files that are watched for
// changes and whose contents are hashed by SendConfig.
certs []string
// updates is the send-only channel on which SendConfig publishes the
// SHA-256 digest (a []byte) of the certificate contents.
updates chan<- interface{}
}
// NewWatcher builds a Watcher that monitors the certificate files listed in
// certs and publishes a digest of their contents on updates.
//
// Both arguments are stored as given; the certs slice is not copied.
func NewWatcher(certs []string, updates chan<- interface{}) Watcher {
	w := new(watcher)
	w.certs = certs
	w.updates = updates
	return w
}
// Run publishes an initial update, starts the certificate watcher in a
// background goroutine, and then blocks until ctx is done.
func (w *watcher) Run(ctx context.Context) {
	notify := w.SendConfig

	// Publish the current (possibly partial) state right away, in case no
	// file notification ever arrives.
	notify()

	// Certificate monitoring runs in the background and re-publishes on change.
	go watchCerts(ctx, w.certs, watchFileEvents, defaultMinDelay, notify)

	// Block here for the lifetime of the context.
	<-ctx.Done()
	log.Info("Watcher has successfully terminated")
}
// SendConfig computes a SHA-256 digest over the contents of the watched
// certificate files and sends it on the updates channel. The send blocks
// until the channel accepts the value.
func (w *watcher) SendConfig() {
	digest := sha256.New()
	generateCertHash(digest, w.certs)
	sum := digest.Sum(nil)
	w.updates <- sum
}
// watchFileEventsFn is the signature of the event-processing loop that
// watchCerts invokes once all files are being watched. It receives the
// context, the file-event channel, the minimum delay between notifications,
// and the callback to invoke. watchFileEvents is the implementation that
// Run passes in.
type watchFileEventsFn func(ctx context.Context, wch <-chan *fsnotify.FileEvent,
minDelay time.Duration, notifyFn func())
// watchFileEvents watches for changes on a channel and notifies via notifyFn().
//
// Changes are batched so that related changes are processed together: the
// first event of a batch starts a timer of length minDelay, further events
// arriving before it fires are only logged, and notifyFn() is called once when
// the timer fires. notifyFn() is therefore called at most once per minDelay.
// notifyFn() runs synchronously on this goroutine, so no events are consumed
// while it executes.
//
// If wch is closed, the function stops reading from it but still delivers any
// pending notification. The function does not return until the context is
// canceled; a timer that is still pending at that point is stopped.
func watchFileEvents(ctx context.Context, wch <-chan *fsnotify.FileEvent, minDelay time.Duration, notifyFn func()) {
	// timer is non-nil only while a batch is waiting to be delivered.
	// timeChan mirrors timer.C and is nil otherwise, which keeps its select
	// case from ever being chosen.
	var timeChan <-chan time.Time
	var timer *time.Timer

	// Release a still-pending timer when we exit on cancellation.
	defer func() {
		if timer != nil {
			timer.Stop()
		}
	}()

	for {
		select {
		case ev, ok := <-wch:
			if !ok {
				// A closed channel would otherwise yield nil events forever,
				// spinning this loop and dereferencing nil in ev.String().
				// A nil channel is never ready, so this case is disabled.
				log.Warnf("watchFileEvents: event channel closed")
				wch = nil
				continue
			}
			log.Infof("watchFileEvents: %s", ev.String())
			if timer != nil {
				// A batch is already pending; this event is folded into it.
				continue
			}
			// First event of a new batch: start the delay timer.
			timer = time.NewTimer(minDelay)
			timeChan = timer.C
		case <-timeChan:
			// The timer has fired, so there is nothing left to stop; just
			// clear the batch state before notifying.
			timeChan = nil
			timer = nil
			log.Info("watchFileEvents: notifying")
			notifyFn()
		case <-ctx.Done():
			log.Info("watchFileEvents has successfully terminated")
			return
		}
	}
}
// watchCerts registers a file-system watch on every path in certs and then
// hands the resulting event stream to watchFileEventsFn, which calls
// updateFunc when changes are detected (no more than once per minDelay).
//
// The call blocks for as long as watchFileEventsFn runs, so it should be
// started as a goroutine. If the watcher cannot be created, or any single
// path cannot be watched, a warning is logged and the function returns
// without watching anything. The underlying watcher is closed on return.
func watchCerts(ctx context.Context, certs []string, watchFileEventsFn watchFileEventsFn,
	minDelay time.Duration, updateFunc func()) {
	fileWatcher, newErr := fsnotify.NewWatcher()
	if newErr != nil {
		log.Warnf("failed to create a watcher for certificate files: %v", newErr)
		return
	}
	defer func() {
		closeErr := fileWatcher.Close()
		if closeErr != nil {
			log.Warnf("closing watcher encounters an error %v", closeErr)
		}
	}()

	// Register every certificate path; give up entirely on the first failure.
	for i := range certs {
		path := certs[i]
		watchErr := fileWatcher.Watch(path)
		if watchErr != nil {
			log.Warnf("watching %s encounters an error %v", path, watchErr)
			return
		}
		log.Infof("watching %s for changes", path)
	}

	watchFileEventsFn(ctx, fileWatcher.Event, minDelay, updateFunc)
}
// generateCertHash writes the contents of each file in certs into h, in
// slice order. The caller reads the resulting digest from h.
//
// Files that do not exist are skipped silently. Any other read failure is
// logged and the file is skipped, so the digest covers only the files that
// could be read. A failure to write into h is logged and processing continues.
func generateCertHash(h hash.Hash, certs []string) {
	for _, cert := range certs {
		// Read directly rather than stat-then-read: ReadFile already reports
		// a missing file, and a separate Stat only adds a syscall and a window
		// in which the file can disappear between the two calls.
		bs, err := ioutil.ReadFile(cert)
		if err != nil {
			if !os.IsNotExist(err) {
				// A missing file is expected and stays silent; anything else
				// (permissions, I/O errors) is worth surfacing.
				log.Warnf("failed to read certificate file %s: %v", cert, err)
			}
			continue
		}
		if _, err := h.Write(bs); err != nil {
			log.Warna(err)
		}
	}
}