-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
cluster_predicates.go
275 lines (234 loc) · 11.1 KB
/
cluster_predicates.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package predicates implements predicate utilities.
package predicates
import (
"fmt"
"github.com/go-logr/logr"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util/conditions"
)
// ClusterCreateInfraReady returns a predicate that accepts only create events for a Cluster whose
// Status.InfrastructureReady is true. Create events carrying any other object type are rejected,
// as are all update, delete and generic events.
func ClusterCreateInfraReady(logger logr.Logger) predicate.Funcs {
	onCreate := func(e event.CreateEvent) bool {
		log := logger.WithValues("predicate", "ClusterCreateInfraReady", "eventType", "create")

		cluster, isCluster := e.Object.(*clusterv1.Cluster)
		if !isCluster {
			log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.Object))
			return false
		}
		log = log.WithValues("Cluster", klog.KObj(cluster))

		// A reconcile is only worth triggering once the infrastructure is reported ready.
		if !cluster.Status.InfrastructureReady {
			log.V(4).Info("Cluster infrastructure is not ready, blocking further processing")
			return false
		}
		log.V(6).Info("Cluster infrastructure is ready, allowing further processing")
		return true
	}

	return predicate.Funcs{
		CreateFunc:  onCreate,
		UpdateFunc:  func(event.UpdateEvent) bool { return false },
		DeleteFunc:  func(event.DeleteEvent) bool { return false },
		GenericFunc: func(event.GenericEvent) bool { return false },
	}
}
// ClusterCreateNotPaused returns a predicate that accepts only create events for a Cluster whose
// Spec.Paused is false. Create events carrying any other object type are rejected, as are all
// update, delete and generic events.
func ClusterCreateNotPaused(logger logr.Logger) predicate.Funcs {
	onCreate := func(e event.CreateEvent) bool {
		log := logger.WithValues("predicate", "ClusterCreateNotPaused", "eventType", "create")

		cluster, isCluster := e.Object.(*clusterv1.Cluster)
		if !isCluster {
			log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.Object))
			return false
		}
		log = log.WithValues("Cluster", klog.KObj(cluster))

		// A paused Cluster must not trigger a reconcile.
		if cluster.Spec.Paused {
			log.V(4).Info("Cluster is paused, blocking further processing")
			return false
		}
		log.V(6).Info("Cluster is not paused, allowing further processing")
		return true
	}

	return predicate.Funcs{
		CreateFunc:  onCreate,
		UpdateFunc:  func(event.UpdateEvent) bool { return false },
		DeleteFunc:  func(event.DeleteEvent) bool { return false },
		GenericFunc: func(event.GenericEvent) bool { return false },
	}
}
// ClusterUpdateInfraReady returns a predicate that accepts only update events in which a Cluster's
// Status.InfrastructureReady changed from false to true. Update events whose old or new object is
// not a Cluster are rejected, as are all create, delete and generic events.
func ClusterUpdateInfraReady(logger logr.Logger) predicate.Funcs {
	return predicate.Funcs{
		UpdateFunc: func(e event.UpdateEvent) bool {
			log := logger.WithValues("predicate", "ClusterUpdateInfraReady", "eventType", "update")

			oldCluster, ok := e.ObjectOld.(*clusterv1.Cluster)
			if !ok {
				log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectOld))
				return false
			}
			log = log.WithValues("Cluster", klog.KObj(oldCluster))

			// Check the new object the same way as the old one: a single-value type assertion
			// would panic if ObjectNew were nil or of a different type.
			newCluster, ok := e.ObjectNew.(*clusterv1.Cluster)
			if !ok {
				log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectNew))
				return false
			}

			// Only the false -> true transition is of interest.
			if !oldCluster.Status.InfrastructureReady && newCluster.Status.InfrastructureReady {
				log.V(6).Info("Cluster infrastructure became ready, allowing further processing")
				return true
			}

			log.V(4).Info("Cluster infrastructure did not become ready, blocking further processing")
			return false
		},
		CreateFunc:  func(e event.CreateEvent) bool { return false },
		DeleteFunc:  func(e event.DeleteEvent) bool { return false },
		GenericFunc: func(e event.GenericEvent) bool { return false },
	}
}
// ClusterUpdateUnpaused returns a predicate that accepts only update events in which a Cluster's
// Spec.Paused changed from true to false. Update events whose old or new object is not a Cluster
// are rejected, as are all create, delete and generic events.
func ClusterUpdateUnpaused(logger logr.Logger) predicate.Funcs {
	return predicate.Funcs{
		UpdateFunc: func(e event.UpdateEvent) bool {
			log := logger.WithValues("predicate", "ClusterUpdateUnpaused", "eventType", "update")

			oldCluster, ok := e.ObjectOld.(*clusterv1.Cluster)
			if !ok {
				log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectOld))
				return false
			}
			log = log.WithValues("Cluster", klog.KObj(oldCluster))

			// Check the new object the same way as the old one: a single-value type assertion
			// would panic if ObjectNew were nil or of a different type.
			newCluster, ok := e.ObjectNew.(*clusterv1.Cluster)
			if !ok {
				log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectNew))
				return false
			}

			// Only the paused -> unpaused transition is of interest.
			if oldCluster.Spec.Paused && !newCluster.Spec.Paused {
				log.V(4).Info("Cluster was unpaused, allowing further processing")
				return true
			}

			// This predicate always works in "or" with Paused predicates,
			// so the logs are adjusted to not provide false negatives/verbosity at V<=5.
			log.V(6).Info("Cluster was not unpaused, blocking further processing")
			return false
		},
		CreateFunc:  func(e event.CreateEvent) bool { return false },
		DeleteFunc:  func(e event.DeleteEvent) bool { return false },
		GenericFunc: func(e event.GenericEvent) bool { return false },
	}
}
// ClusterUnpaused returns a Predicate that returns true on Cluster creation events where Cluster.Spec.Paused
// is false and on Update events when Cluster.Spec.Paused transitions to false.
// This implements a common requirement for many cluster-api and provider controllers (such as Cluster
// Infrastructure controllers) to resume reconciliation when the Cluster is unpaused.
// Example use:
//
//	err := controller.Watch(
//	    &source.Kind{Type: &clusterv1.Cluster{}},
//	    &handler.EnqueueRequestsFromMapFunc{
//	        ToRequests: clusterToMachines,
//	    },
//	    predicates.ClusterUnpaused(r.Log),
//	)
func ClusterUnpaused(logger logr.Logger) predicate.Funcs {
	scoped := logger.WithValues("predicate", "ClusterUnpaused")

	onCreate := ClusterCreateNotPaused(scoped)
	onUpdate := ClusterUpdateUnpaused(scoped)

	// Combine with Any so that either the create or the update predicate can let an event through.
	return Any(scoped, onCreate, onUpdate)
}
// ClusterControlPlaneInitialized returns a Predicate that returns true on Update events
// when ControlPlaneInitializedCondition on a Cluster changes to true. Update events whose old or
// new object is not a Cluster are rejected, as are all create, delete and generic events.
// Example use:
//
//	err := controller.Watch(
//	    &source.Kind{Type: &clusterv1.Cluster{}},
//	    &handler.EnqueueRequestsFromMapFunc{
//	        ToRequests: clusterToMachines,
//	    },
//	    predicates.ClusterControlPlaneInitialized(r.Log),
//	)
func ClusterControlPlaneInitialized(logger logr.Logger) predicate.Funcs {
	return predicate.Funcs{
		UpdateFunc: func(e event.UpdateEvent) bool {
			log := logger.WithValues("predicate", "ClusterControlPlaneInitialized", "eventType", "update")

			oldCluster, ok := e.ObjectOld.(*clusterv1.Cluster)
			if !ok {
				log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectOld))
				return false
			}
			log = log.WithValues("Cluster", klog.KObj(oldCluster))

			// Check the new object the same way as the old one: a single-value type assertion
			// would panic if ObjectNew were nil or of a different type.
			newCluster, ok := e.ObjectNew.(*clusterv1.Cluster)
			if !ok {
				log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectNew))
				return false
			}

			// Only the transition of the condition to true is of interest.
			if !conditions.IsTrue(oldCluster, clusterv1.ControlPlaneInitializedCondition) &&
				conditions.IsTrue(newCluster, clusterv1.ControlPlaneInitializedCondition) {
				log.V(6).Info("Cluster ControlPlaneInitialized was set, allow further processing")
				return true
			}

			log.V(6).Info("Cluster ControlPlaneInitialized hasn't changed, blocking further processing")
			return false
		},
		CreateFunc:  func(e event.CreateEvent) bool { return false },
		DeleteFunc:  func(e event.DeleteEvent) bool { return false },
		GenericFunc: func(e event.GenericEvent) bool { return false },
	}
}
// ClusterUnpausedAndInfrastructureReady returns a Predicate that returns true on Cluster creation events
// where both Cluster.Spec.Paused is false and Cluster.Status.InfrastructureReady is true, and on Update
// events when either Cluster.Spec.Paused transitions to false or Cluster.Status.InfrastructureReady
// transitions to true.
// This implements a common requirement for some cluster-api and provider controllers (such as Machine
// Infrastructure controllers) to resume reconciliation when the Cluster is unpaused and when the
// infrastructure becomes ready.
// Example use:
//
//	err := controller.Watch(
//	    &source.Kind{Type: &clusterv1.Cluster{}},
//	    &handler.EnqueueRequestsFromMapFunc{
//	        ToRequests: clusterToMachines,
//	    },
//	    predicates.ClusterUnpausedAndInfrastructureReady(r.Log),
//	)
func ClusterUnpausedAndInfrastructureReady(logger logr.Logger) predicate.Funcs {
	scoped := logger.WithValues("predicate", "ClusterUnpausedAndInfrastructureReady")

	// Create events: the Cluster must be both not paused and infrastructure-ready.
	onCreate := All(scoped, ClusterCreateNotPaused(scoped), ClusterCreateInfraReady(scoped))

	// Update events: either the Cluster was unpaused or its infrastructure became ready.
	onUpdate := Any(scoped, ClusterUpdateUnpaused(scoped), ClusterUpdateInfraReady(scoped))

	// Combine with Any so that either the create or the update predicate can let an event through.
	return Any(scoped, onCreate, onUpdate)
}
// ClusterHasTopology returns a Predicate that, for every event type, returns true when the
// event's object is a Cluster whose Spec.Topology is NOT nil, and false otherwise.
// For update events the new object is the one inspected.
func ClusterHasTopology(logger logr.Logger) predicate.Funcs {
	// logFor builds the logger for one event type; it is invoked each time an event is evaluated.
	logFor := func(eventType string) logr.Logger {
		return logger.WithValues("predicate", "ClusterHasTopology", "eventType", eventType)
	}

	return predicate.Funcs{
		CreateFunc: func(e event.CreateEvent) bool {
			return processIfTopologyManaged(logFor("create"), e.Object)
		},
		UpdateFunc: func(e event.UpdateEvent) bool {
			return processIfTopologyManaged(logFor("update"), e.ObjectNew)
		},
		DeleteFunc: func(e event.DeleteEvent) bool {
			return processIfTopologyManaged(logFor("delete"), e.Object)
		},
		GenericFunc: func(e event.GenericEvent) bool {
			return processIfTopologyManaged(logFor("generic"), e.Object)
		},
	}
}
// processIfTopologyManaged reports whether object is a Cluster with a non-nil Spec.Topology.
// When object is not a Cluster it logs the unexpected type and returns false.
func processIfTopologyManaged(logger logr.Logger, object client.Object) bool {
	c, isCluster := object.(*clusterv1.Cluster)
	if !isCluster {
		logger.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", object))
		return false
	}

	log := logger.WithValues("Cluster", klog.KObj(c))

	if c.Spec.Topology == nil {
		log.V(6).Info("Cluster does not have topology, blocking further processing")
		return false
	}

	log.V(6).Info("Cluster has topology, allowing further processing")
	return true
}