-
Notifications
You must be signed in to change notification settings - Fork 398
/
dynamic_serving_content.go
233 lines (191 loc) · 6.85 KB
/
dynamic_serving_content.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamiccertificates
import (
"context"
"crypto/tls"
"fmt"
"io/ioutil"
"sync/atomic"
"time"
"github.com/fsnotify/fsnotify"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)
// DynamicCertKeyPairContent provides a CertKeyContentProvider that can dynamically react to new file content
type DynamicCertKeyPairContent struct {
name string
// certFile is the name of the certificate file to read.
certFile string
// keyFile is the name of the key file to read.
keyFile string
// certKeyPair is a certKeyContent that contains the last read, non-zero length content of the key and cert
certKeyPair atomic.Value
listeners []Listener
// queue only ever has one item, but it has nice error handling backoff/retry semantics
queue workqueue.RateLimitingInterface
}
var _ CertKeyContentProvider = &DynamicCertKeyPairContent{}
var _ ControllerRunner = &DynamicCertKeyPairContent{}
// NewDynamicServingContentFromFiles returns a dynamic CertKeyContentProvider based on a cert and key filename
func NewDynamicServingContentFromFiles(purpose, certFile, keyFile string) (*DynamicCertKeyPairContent, error) {
if len(certFile) == 0 || len(keyFile) == 0 {
return nil, fmt.Errorf("missing filename for serving cert")
}
name := fmt.Sprintf("%s::%s::%s", purpose, certFile, keyFile)
ret := &DynamicCertKeyPairContent{
name: name,
certFile: certFile,
keyFile: keyFile,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("DynamicCABundle-%s", purpose)),
}
if err := ret.loadCertKeyPair(); err != nil {
return nil, err
}
return ret, nil
}
// AddListener adds a listener to be notified when the serving cert content changes.
func (c *DynamicCertKeyPairContent) AddListener(listener Listener) {
c.listeners = append(c.listeners, listener)
}
// loadCertKeyPair determines the next set of content for the file.
func (c *DynamicCertKeyPairContent) loadCertKeyPair() error {
cert, err := ioutil.ReadFile(c.certFile)
if err != nil {
return err
}
key, err := ioutil.ReadFile(c.keyFile)
if err != nil {
return err
}
if len(cert) == 0 || len(key) == 0 {
return fmt.Errorf("missing content for serving cert %q", c.Name())
}
// Ensure that the key matches the cert and both are valid
_, err = tls.X509KeyPair(cert, key)
if err != nil {
return err
}
newCertKey := &certKeyContent{
cert: cert,
key: key,
}
// check to see if we have a change. If the values are the same, do nothing.
existing, ok := c.certKeyPair.Load().(*certKeyContent)
if ok && existing != nil && existing.Equal(newCertKey) {
return nil
}
c.certKeyPair.Store(newCertKey)
klog.V(2).InfoS("Loaded a new cert/key pair", "name", c.Name())
for _, listener := range c.listeners {
listener.Enqueue()
}
return nil
}
// RunOnce runs a single sync loop
func (c *DynamicCertKeyPairContent) RunOnce(ctx context.Context) error {
return c.loadCertKeyPair()
}
// Run starts the controller and blocks until context is killed.
func (c *DynamicCertKeyPairContent) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
klog.InfoS("Starting controller", "name", c.name)
defer klog.InfoS("Shutting down controller", "name", c.name)
// doesn't matter what workers say, only start one.
go wait.Until(c.runWorker, time.Second, ctx.Done())
// start the loop that watches the cert and key files until stopCh is closed.
go wait.Until(func() {
if err := c.watchCertKeyFile(ctx.Done()); err != nil {
klog.ErrorS(err, "Failed to watch cert and key file, will retry later")
}
}, time.Minute, ctx.Done())
<-ctx.Done()
}
func (c *DynamicCertKeyPairContent) watchCertKeyFile(stopCh <-chan struct{}) error {
// Trigger a check here to ensure the content will be checked periodically even if the following watch fails.
c.queue.Add(workItemKey)
w, err := fsnotify.NewWatcher()
if err != nil {
return fmt.Errorf("error creating fsnotify watcher: %v", err)
}
defer w.Close()
if err := w.Add(c.certFile); err != nil {
return fmt.Errorf("error adding watch for file %s: %v", c.certFile, err)
}
if err := w.Add(c.keyFile); err != nil {
return fmt.Errorf("error adding watch for file %s: %v", c.keyFile, err)
}
// Trigger a check in case the file is updated before the watch starts.
c.queue.Add(workItemKey)
for {
select {
case e := <-w.Events:
if err := c.handleWatchEvent(e, w); err != nil {
return err
}
case err := <-w.Errors:
return fmt.Errorf("received fsnotify error: %v", err)
case <-stopCh:
return nil
}
}
}
// handleWatchEvent triggers reloading the cert and key file, and restarts a new watch if it's a Remove or Rename event.
// If one file is updated before the other, the loadCertKeyPair method will catch the mismatch and will not apply the
// change. When an event of the other file is received, it will trigger reloading the files again and the new content
// will be loaded and used.
func (c *DynamicCertKeyPairContent) handleWatchEvent(e fsnotify.Event, w *fsnotify.Watcher) error {
// This should be executed after restarting the watch (if applicable) to ensure no file event will be missing.
defer c.queue.Add(workItemKey)
if !e.Has(fsnotify.Remove) && !e.Has(fsnotify.Rename) {
return nil
}
if err := w.Remove(e.Name); err != nil {
klog.InfoS("Failed to remove file watch, it may have been deleted", "file", e.Name, "err", err)
}
if err := w.Add(e.Name); err != nil {
return fmt.Errorf("error adding watch for file %s: %v", e.Name, err)
}
return nil
}
func (c *DynamicCertKeyPairContent) runWorker() {
for c.processNextWorkItem() {
}
}
func (c *DynamicCertKeyPairContent) processNextWorkItem() bool {
dsKey, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(dsKey)
err := c.loadCertKeyPair()
if err == nil {
c.queue.Forget(dsKey)
return true
}
utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
c.queue.AddRateLimited(dsKey)
return true
}
// Name is just an identifier
func (c *DynamicCertKeyPairContent) Name() string {
return c.name
}
// CurrentCertKeyContent provides cert and key byte content
func (c *DynamicCertKeyPairContent) CurrentCertKeyContent() ([]byte, []byte) {
certKeyContent := c.certKeyPair.Load().(*certKeyContent)
return certKeyContent.cert, certKeyContent.key
}