-
Notifications
You must be signed in to change notification settings - Fork 876
/
monitor.go
235 lines (196 loc) · 7.81 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
package webhookconfig
import (
"context"
"fmt"
"sync"
"time"
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/config"
"github.com/kyverno/kyverno/pkg/event"
"github.com/kyverno/kyverno/pkg/tls"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1"
)
// Timing parameters of the webhook monitor.
const (
	// tickerInterval is the period of the monitor's reconciliation loop.
	tickerInterval = 30 * time.Second
	// idleCheckInterval is the idle time after which the last-request
	// timestamps are considered stale and are reconciled.
	idleCheckInterval = time.Minute
	// idleDeadline is the idle time after which the webhook check is
	// reported as failed.
	idleDeadline = 5 * idleCheckInterval
)
// Monitor stores the time of the last webhook request seen by this instance
// and monitors the registered webhooks.
//
// Two timestamps are involved:
//   - lastSeenRequestTime, an in-memory value holding the time of the last
//     admission request received by the current instance;
//   - the timestamp stored in the annLastRequestTime annotation of the
//     "kyverno" Lease, which is shared between instances.
//
// On every tickerInterval the monitor (see Run):
//   - re-registers the webhooks when the registration check fails, unless
//     Kyverno is in a rolling update;
//   - initializes the shared timestamp from the in-memory one when the
//     annotation cannot be read;
//   - reports a failed webhook check when the in-memory timestamp is older
//     than idleDeadline;
//   - moves the in-memory timestamp forward to the annotated one when it is
//     older than 2*idleCheckInterval and behind the annotation;
//   - pushes the in-memory timestamp to the annotation when the annotated
//     value is older than idleCheckInterval and behind the in-memory one.
//
// Currently the registration check only queries for the expected resource
// name, and does not compare other details like the webhook settings.
type Monitor struct {
// leaseClient is used to read and manage the Kyverno lease
leaseClient coordinationv1.LeaseInterface
// lastSeenRequestTime records the timestamp
// of the latest received admission request;
// it is read and written under mu (see Time and SetTime)
lastSeenRequestTime time.Time
// mu guards lastSeenRequestTime
mu sync.RWMutex
// log is the base logger; Run derives named loggers from it
log logr.Logger
}
// NewMonitor builds a webhook Monitor whose lease client is scoped to the
// Kyverno namespace and whose last-seen request time starts at the current
// time. The returned error is always nil.
func NewMonitor(kubeClient kubernetes.Interface, log logr.Logger) (*Monitor, error) {
	leases := kubeClient.CoordinationV1().Leases(config.KyvernoNamespace)
	createdAt := time.Now()

	return &Monitor{
		leaseClient:         leases,
		lastSeenRequestTime: createdAt,
		log:                 log,
	}, nil
}
// Time reports when this instance last saw an admission request.
// The value is read under the read lock.
func (t *Monitor) Time() time.Time {
	t.mu.RLock()
	lastSeen := t.lastSeenRequestTime
	t.mu.RUnlock()

	return lastSeen
}
// SetTime records tm as the time of the last seen admission request.
// The value is written under the write lock.
func (t *Monitor) SetTime(tm time.Time) {
	t.mu.Lock()
	t.lastSeenRequestTime = tm
	t.mu.Unlock()
}
// Run runs the webhook monitor loop and blocks until stopCh is closed.
//
// The loop reacts to three kinds of events:
//   - a value received on register.createDefaultWebhook: the default resource
//     mutating or validating webhook configuration is recreated;
//   - a tick every tickerInterval: missing webhooks are re-registered, a
//     webhook update is requested when register.autoUpdateWebhooks is set,
//     and the in-memory and annotated last-request timestamps are reconciled;
//   - stopCh: the monitor stops and Run returns.
//
// certRenewer is not used by this method; it is kept so that existing callers
// keep compiling.
func (t *Monitor) Run(register *Register, certRenewer *tls.CertRenewer, eventGen event.Interface, stopCh <-chan struct{}) {
	logger := t.log.WithName("webhookMonitor")

	logger.V(3).Info("starting webhook monitor", "interval", idleCheckInterval.String())
	status := newStatusControl(t.leaseClient, eventGen, logger.WithName("WebhookStatusControl"))

	ticker := time.NewTicker(tickerInterval)
	defer ticker.Stop()

	createDefaultWebhook := register.createDefaultWebhook
	for {
		select {
		case webhookKind := <-createDefaultWebhook:
			logger.Info("received recreation request for resource webhook")
			if webhookKind == kindMutating {
				err := register.createResourceMutatingWebhookConfiguration(register.readCaData())
				if err != nil {
					logger.Error(err, "failed to create default MutatingWebhookConfiguration for resources, the webhook will be reconciled", "interval", tickerInterval)
				}
			} else if webhookKind == kindValidating {
				err := register.createResourceValidatingWebhookConfiguration(register.readCaData())
				if err != nil {
					logger.Error(err, "failed to create default ValidatingWebhookConfiguration for resources, the webhook will be reconciled", "interval", tickerInterval)
				}
			}

		case <-ticker.C:
			if err := registerWebhookIfNotPresent(register, t.log.WithName("registerWebhookIfNotPresent")); err != nil {
				logger.Error(err, "failed to ensure webhooks are registered")
			}

			// Request a webhook configuration update (namespaceSelector) on
			// every tick. The send happens in its own goroutine so a busy
			// receiver cannot stall the monitor loop; selecting on stopCh lets
			// the goroutine exit instead of leaking once the monitor stops.
			if register.autoUpdateWebhooks {
				go func() {
					logger.V(4).Info("updating webhook configurations for namespaceSelector with latest kyverno ConfigMap")
					select {
					case register.UpdateWebhookChan <- true:
					case <-stopCh:
					}
				}()
			}

			// Idle time as seen by this instance only.
			timeDiff := time.Since(t.Time())
			lastRequestTimeFromAnn := lastRequestTimeFromAnnotation(t.leaseClient, t.log.WithName("lastRequestTimeFromAnnotation"))
			if lastRequestTimeFromAnn == nil {
				// No usable shared timestamp yet: seed it from the in-memory
				// value and skip the remaining checks for this tick.
				if err := status.UpdateLastRequestTimestmap(t.Time()); err != nil {
					logger.Error(err, "failed to annotate deployment for lastRequestTime")
				} else {
					logger.Info("initialized lastRequestTimestamp", "time", t.Time())
				}
				continue
			}

			switch {
			case timeDiff > idleDeadline:
				err := fmt.Errorf("webhook hasn't received requests in %v, updating Kyverno to verify webhook status", idleDeadline.String())
				logger.Error(err, "webhook check failed", "time", t.Time(), "lastRequestTimestamp", lastRequestTimeFromAnn)

				// update deployment to renew lastSeenRequestTime
				// NOTE(review): webhooks are re-registered only when recording
				// the failure status itself fails - confirm this nesting is
				// intended.
				if err := status.failure(); err != nil {
					logger.Error(err, "failed to annotate deployment webhook status to failure")

					if err := register.Register(); err != nil {
						logger.Error(err, "Failed to register webhooks")
					}
				}

				continue

			case timeDiff > 2*idleCheckInterval:
				if skipWebhookCheck(register, logger.WithName("skipWebhookCheck")) {
					logger.Info("skip validating webhook status, Kyverno is in rolling update")
					continue
				}

				// Another instance has seen a more recent request: adopt the
				// shared timestamp as the in-memory one.
				if t.Time().Before(*lastRequestTimeFromAnn) {
					t.SetTime(*lastRequestTimeFromAnn)
					logger.V(3).Info("updated in-memory timestamp", "time", lastRequestTimeFromAnn)
				}
			}

			// The shared timestamp is stale and this instance has a newer
			// one: publish the in-memory timestamp.
			idleT := time.Since(*lastRequestTimeFromAnn)
			if idleT > idleCheckInterval {
				if t.Time().After(*lastRequestTimeFromAnn) {
					logger.V(3).Info("updating annotation lastRequestTimestamp with the latest in-memory timestamp", "time", t.Time(), "lastRequestTimestamp", lastRequestTimeFromAnn)
					if err := status.UpdateLastRequestTimestmap(t.Time()); err != nil {
						logger.Error(err, "failed to update lastRequestTimestamp annotation")
					}
				}
			}

			// if the status was false before then we update it to true
			// send request to update the Kyverno deployment
			if err := status.success(); err != nil {
				logger.Error(err, "failed to annotate deployment webhook status to success")
			}

		case <-stopCh:
			// handler termination signal
			logger.V(2).Info("stopping webhook monitor")
			return
		}
	}
}
// registerWebhookIfNotPresent re-registers the webhooks when the registration
// check reports a problem. Nothing is done while Kyverno is in a rolling
// update. It returns an error only when the re-registration itself fails.
func registerWebhookIfNotPresent(register *Register, logger logr.Logger) error {
	inRollingUpdate := skipWebhookCheck(register, logger.WithName("skipWebhookCheck"))
	if inRollingUpdate {
		logger.Info("skip validating webhook status, Kyverno is in rolling update")
		return nil
	}

	checkErr := register.Check()
	if checkErr == nil {
		return nil
	}
	logger.Error(checkErr, "missing webhooks")

	if registerErr := register.Register(); registerErr != nil {
		return errors.Wrap(registerErr, "failed to register webhooks")
	}
	return nil
}
// lastRequestTimeFromAnnotation reads the last-request timestamp stored in the
// annLastRequestTime annotation of the "kyverno" Lease.
//
// It returns nil when the lease cannot be fetched, when the annotation is not
// set, or when its value is not a valid RFC 3339 timestamp. The reason is
// logged in each case.
func lastRequestTimeFromAnnotation(leaseClient coordinationv1.LeaseInterface, logger logr.Logger) *time.Time {
	lease, err := leaseClient.Get(context.TODO(), "kyverno", metav1.GetOptions{})
	if err != nil {
		// The lease must not be used after a failed Get: depending on the
		// client implementation it may be nil.
		logger.Info("unable to get lease 'kyverno'", "reason", err.Error())
		return nil
	}

	// Indexing a nil annotation map is safe and reports the key as absent.
	rawTime, found := lease.GetAnnotations()[annLastRequestTime]
	if !found {
		logger.Info("timestamp not set in the annotation, setting")
		return nil
	}

	annTime, err := time.Parse(time.RFC3339, rawTime)
	if err != nil {
		logger.Error(err, "failed to parse timestamp annotation", "timeStamp", rawTime)
		return nil
	}

	return &annTime
}
// skipWebhookCheck reports whether the webhook check should be skipped because
// Kyverno is in a rolling update. When the Kyverno deployment cannot be
// fetched the check is not skipped (false is returned).
func skipWebhookCheck(register *Register, logger logr.Logger) bool {
	deployment, getErr := register.GetKubePolicyDeployment()
	if getErr == nil {
		return tls.IsKyvernoInRollingUpdate(deployment, logger)
	}

	logger.Info("unable to get Kyverno deployment", "reason", getErr.Error())
	return false
}