-
Notifications
You must be signed in to change notification settings - Fork 828
/
objectwatcher.go
324 lines (274 loc) · 14.5 KB
/
objectwatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
package objectwatcher
import (
"context"
"fmt"
"reflect"
"strings"
"sync"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/restmapper"
)
// Prefixes that tag a recorded version string with the metadata field the
// version was derived from (see objectVersion).
const (
// generationPrefix marks a version built from the object's generation.
generationPrefix = "gen:"
// resourceVersionPrefix marks a version built from the object's resourceVersion;
// objectVersion falls back to it when the generation is 0.
resourceVersionPrefix = "rv:"
)
// ObjectWatcher manages operations for objects dispatched to member clusters.
type ObjectWatcher interface {
// Create creates desireObj in the member cluster named clusterName.
Create(clusterName string, desireObj *unstructured.Unstructured) error
// Update updates the member cluster's copy (clusterObj) so that it matches desireObj.
Update(clusterName string, desireObj, clusterObj *unstructured.Unstructured) error
// Delete removes the object described by desireObj from the member cluster.
Delete(clusterName string, desireObj *unstructured.Unstructured) error
// NeedsUpdate reports whether clusterObj should be updated to match desiredObj.
// It returns an error when no version has been recorded for the object.
NeedsUpdate(clusterName string, desiredObj, clusterObj *unstructured.Unstructured) (bool, error)
}
// ClientSetFunc builds the dynamic client set of a member cluster.
// c is the member cluster name; client is the client handed over by the
// caller (objectWatcherImpl passes its KubeClientSet).
type ClientSetFunc func(c string, client client.Client) (*util.DynamicClusterClient, error)
// objectWatcherImpl is the ObjectWatcher implementation returned by NewObjectWatcher.
type objectWatcherImpl struct {
// Lock guards every access to VersionRecord.
Lock sync.RWMutex
// RESTMapper is used to map an object's GroupVersionKind to a GroupVersionResource.
RESTMapper meta.RESTMapper
// KubeClientSet is passed to ClusterClientSetFunc when building a member cluster client.
KubeClientSet client.Client
// VersionRecord holds the last recorded version of each object, keyed first by
// cluster name and then by object key (see genObjectKey).
VersionRecord map[string]map[string]string
// ClusterClientSetFunc builds the dynamic client for a member cluster.
ClusterClientSetFunc ClientSetFunc
// resourceInterpreter supplies the optional Retain hook used before updates.
resourceInterpreter resourceinterpreter.ResourceInterpreter
}
// NewObjectWatcher builds an ObjectWatcher backed by objectWatcherImpl.
// The version record starts out empty and the lock is left at its zero value.
func NewObjectWatcher(kubeClientSet client.Client, restMapper meta.RESTMapper, clusterClientSetFunc ClientSetFunc, interpreter resourceinterpreter.ResourceInterpreter) ObjectWatcher {
	watcher := &objectWatcherImpl{
		RESTMapper:           restMapper,
		KubeClientSet:        kubeClientSet,
		ClusterClientSetFunc: clusterClientSetFunc,
		resourceInterpreter:  interpreter,
	}
	watcher.VersionRecord = make(map[string]map[string]string)
	return watcher
}
// Create creates desireObj in the member cluster named clusterName and records
// the version of the created object.
//
// When the create call fails with AlreadyExists, the existing object is fetched
// and one of the following happens:
//   - the work-name label value of the existing object equals that of desireObj:
//     the existing object is updated via Update;
//   - otherwise, if desireObj carries the conflict-resolution annotation with the
//     "overwrite" value: the existing object is updated via Update;
//   - otherwise an error is returned and the existing object is left untouched.
//
// Any other create failure is logged and returned as is.
func (o *objectWatcherImpl) Create(clusterName string, desireObj *unstructured.Unstructured) error {
	dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet)
	if err != nil {
		klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterName)
		return err
	}

	kind, namespace, name := desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName()

	gvr, err := restmapper.GetGroupVersionResource(o.RESTMapper, desireObj.GroupVersionKind())
	if err != nil {
		klog.Errorf("Failed to create resource(kind=%s, %s/%s) in cluster %s as mapping GVK to GVR failed: %v", kind, namespace, name, clusterName, err)
		return err
	}
	resourceClient := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(namespace)

	// Try a plain create first: a resource that already exists in the member
	// cluster is never force-updated or deleted unless the checks below allow it.
	clusterObj, err := resourceClient.Create(context.TODO(), desireObj, metav1.CreateOptions{})
	if err == nil {
		klog.Infof("Created resource(kind=%s, %s/%s) on cluster: %s", kind, namespace, name, clusterName)
		// record version
		o.recordVersion(clusterObj, dynamicClusterClient.ClusterName)
		return nil
	}
	if !apierrors.IsAlreadyExists(err) {
		klog.Errorf("Failed to create resource(kind=%s, %s/%s) in cluster %s, err is %v ", kind, namespace, name, clusterName, err)
		return err
	}

	// Known causes of an 'AlreadyExists' conflict:
	//  1. A previous reconcile applied the resource to the member cluster but failed
	//     to update the work conditions (Applied=True), so this reconcile tries to
	//     create it again.
	//  2. The resource exists in the member cluster but was not created by karmada.
	existObj, getErr := resourceClient.Get(context.TODO(), name, metav1.GetOptions{})
	if getErr != nil {
		return fmt.Errorf("failed to get exist resource(kind=%s, %s/%s) in cluster %v: %v", kind, namespace, name, clusterName, getErr)
	}

	// Matching work-name labels mean the existing resource is treated as managed
	// by Karmada, so it is simply updated.
	desiredWork := util.GetLabelValue(desireObj.GetLabels(), workv1alpha1.WorkNameLabel)
	existingWork := util.GetLabelValue(existObj.GetLabels(), workv1alpha1.WorkNameLabel)
	if desiredWork == existingWork {
		return o.Update(clusterName, desireObj, existObj)
	}

	// Not managed by Karmada: consult the conflict resolution annotation and
	// refuse to touch the existing resource unless it asks for an overwrite.
	resolution := util.GetAnnotationValue(desireObj.GetAnnotations(), workv1alpha2.ResourceConflictResolutionAnnotation)
	if resolution != workv1alpha2.ResourceConflictResolutionOverwrite {
		return fmt.Errorf("resource(kind=%s, %s/%s) already exist in cluster %v and the %s strategy value is empty, karmada will not manage this resource",
			kind, namespace, name, clusterName, workv1alpha2.ResourceConflictResolutionAnnotation,
		)
	}

	klog.Infof("Overwriting the resource(kind=%s, %s/%s) as %s=%s", kind, namespace, name,
		workv1alpha2.ResourceConflictResolutionAnnotation, workv1alpha2.ResourceConflictResolutionOverwrite)
	return o.Update(clusterName, desireObj, existObj)
}
// retainClusterFields copies onto desired the fields that must be carried over
// from the object observed in the member cluster, then runs the resource
// interpreter's Retain hook when one is enabled for the object's kind.
// desired is modified in place; the returned object is the one to send to the cluster.
func (o *objectWatcherImpl) retainClusterFields(desired, observed *unstructured.Unstructured) (*unstructured.Unstructured, error) {
	// The update must carry the resourceVersion currently in the cluster,
	// otherwise the operation will fail.
	desired.SetResourceVersion(observed.GetResourceVersion())

	// Finalizers and ownerReferences are typically set by controllers in the
	// member cluster, so the observed values are kept.
	desired.SetFinalizers(observed.GetFinalizers())
	desired.SetOwnerReferences(observed.GetOwnerReferences())

	// Annotations may come from both sides (member cluster controllers and the
	// user on the karmada control plane), so they are merged rather than replaced.
	util.MergeAnnotations(desired, observed)

	if !o.resourceInterpreter.HookEnabled(desired.GroupVersionKind(), configv1alpha1.InterpreterOperationRetain) {
		return desired, nil
	}
	return o.resourceInterpreter.Retain(desired, observed)
}
// Update pushes desireObj to the member cluster named clusterName, replacing
// clusterObj. Cluster-owned fields are carried over from clusterObj first (see
// retainClusterFields), and the version of the object returned by the cluster
// is recorded on success. Every failure is logged and returned unchanged.
func (o *objectWatcherImpl) Update(clusterName string, desireObj, clusterObj *unstructured.Unstructured) error {
	dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet)
	if err != nil {
		klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterName)
		return err
	}

	gvr, err := restmapper.GetGroupVersionResource(o.RESTMapper, desireObj.GroupVersionKind())
	if err != nil {
		klog.Errorf("Failed to update resource(kind=%s, %s/%s) in cluster %s as mapping GVK to GVR failed: %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, err)
		return err
	}

	retained, err := o.retainClusterFields(desireObj, clusterObj)
	if err != nil {
		klog.Errorf("Failed to retain fields for resource(kind=%s, %s/%s) in cluster %s: %v", clusterObj.GetKind(), clusterObj.GetNamespace(), clusterObj.GetName(), clusterName, err)
		return err
	}

	// From here on only the retained object is used, including for log output.
	resourceClient := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(retained.GetNamespace())
	updated, err := resourceClient.Update(context.TODO(), retained, metav1.UpdateOptions{})
	if err != nil {
		klog.Errorf("Failed to update resource(kind=%s, %s/%s) in cluster %s, err is %v ", retained.GetKind(), retained.GetNamespace(), retained.GetName(), clusterName, err)
		return err
	}
	klog.Infof("Updated resource(kind=%s, %s/%s) on cluster: %s", retained.GetKind(), retained.GetNamespace(), retained.GetName(), clusterName)

	// record version
	o.recordVersion(updated, clusterName)
	return nil
}
// Delete removes the object described by desireObj from the member cluster
// named clusterName and drops its version record. A NotFound response from the
// cluster is treated as a successful deletion.
func (o *objectWatcherImpl) Delete(clusterName string, desireObj *unstructured.Unstructured) error {
	dynamicClusterClient, err := o.ClusterClientSetFunc(clusterName, o.KubeClientSet)
	if err != nil {
		klog.Errorf("Failed to build dynamic cluster client for cluster %s.", clusterName)
		return err
	}

	gvr, err := restmapper.GetGroupVersionResource(o.RESTMapper, desireObj.GroupVersionKind())
	if err != nil {
		klog.Errorf("Failed to delete resource(kind=%s, %s/%s) in cluster %s as mapping GVK to GVR failed: %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName, err)
		return err
	}

	// Background propagation is requested explicitly even though it is the default
	// for most resources. Job(batch/v1) is the exception: its default deletion
	// strategy is "Orphan", which would leave the Pods created by the Job behind
	// after the Job is deleted.
	// Refer to https://github.com/karmada-io/karmada/issues/969 for more details.
	propagation := metav1.DeletePropagationBackground
	resourceClient := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace())
	err = resourceClient.Delete(context.TODO(), desireObj.GetName(), metav1.DeleteOptions{PropagationPolicy: &propagation})
	// An object that is already gone counts as deleted.
	if err != nil && !apierrors.IsNotFound(err) {
		klog.Errorf("Failed to delete resource %v in cluster %s, err is %v ", desireObj.GetName(), clusterName, err)
		return err
	}
	klog.Infof("Deleted resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), clusterName)

	o.deleteVersionRecord(dynamicClusterClient.ClusterName, o.genObjectKey(desireObj))
	return nil
}
// genObjectKey builds the key under which an object's version is recorded:
// "<GroupVersionKind string>/<namespace>/<name>".
func (o *objectWatcherImpl) genObjectKey(obj *unstructured.Unstructured) string {
	gvk := obj.GroupVersionKind()
	parts := []string{gvk.String(), obj.GetNamespace(), obj.GetName()}
	return strings.Join(parts, "/")
}
// recordVersion stores the current version of clusterObj for the given member
// cluster, adding the record when the cluster has none yet and updating it otherwise.
//
// NOTE(review): the existence check and the subsequent write each take the
// lock separately, so the pair is not atomic.
func (o *objectWatcherImpl) recordVersion(clusterObj *unstructured.Unstructured, clusterName string) {
	version := objectVersion(clusterObj)
	key := o.genObjectKey(clusterObj)

	if !o.isClusterVersionRecordExist(clusterName) {
		o.addVersionRecord(clusterName, key, version)
		return
	}
	o.updateVersionRecord(clusterName, key, version)
}
// isClusterVersionRecordExist reports whether a version record map has been
// created for the given member cluster. It only takes the read lock.
func (o *objectWatcherImpl) isClusterVersionRecordExist(clusterName string) bool {
	o.Lock.RLock()
	_, found := o.VersionRecord[clusterName]
	o.Lock.RUnlock()
	return found
}
// getVersionRecord looks up the recorded version of the resource identified by
// resourceName in the given member cluster. The boolean is false when either
// the cluster or the resource has no record.
func (o *objectWatcherImpl) getVersionRecord(clusterName, resourceName string) (string, bool) {
	o.Lock.RLock()
	defer o.Lock.RUnlock()

	records, known := o.VersionRecord[clusterName]
	if !known {
		return "", false
	}
	recorded, found := records[resourceName]
	return recorded, found
}
// addVersionRecord adds the version record of the given resource for the given
// member cluster, creating the cluster's record map when it does not exist yet.
//
// Records already stored for the cluster are preserved. recordVersion decides
// between add and update with a separate, earlier existence check, so two
// goroutines recording different resources of a new cluster can both end up
// here; replacing the whole per-cluster map would silently drop the record
// written by the first one.
func (o *objectWatcherImpl) addVersionRecord(clusterName, resourceName, version string) {
	o.Lock.Lock()
	defer o.Lock.Unlock()

	// Writing to a nil map panics, so make sure both levels exist first.
	if o.VersionRecord == nil {
		o.VersionRecord = make(map[string]map[string]string)
	}
	records := o.VersionRecord[clusterName]
	if records == nil {
		records = make(map[string]string)
		o.VersionRecord[clusterName] = records
	}
	records[resourceName] = version
}
// updateVersionRecord sets the recorded version of the given resource for the
// given member cluster, leaving the cluster's other records untouched.
//
// If the cluster has no record map yet, one is created instead of panicking on
// a write to a nil map; the call then behaves like adding the first record.
func (o *objectWatcherImpl) updateVersionRecord(clusterName, resourceName, version string) {
	o.Lock.Lock()
	defer o.Lock.Unlock()

	if o.VersionRecord == nil {
		o.VersionRecord = make(map[string]map[string]string)
	}
	records := o.VersionRecord[clusterName]
	if records == nil {
		records = make(map[string]string)
		o.VersionRecord[clusterName] = records
	}
	records[resourceName] = version
}
// deleteVersionRecord removes the recorded version of the given resource from
// the given member cluster's records. It is a no-op when the cluster or the
// resource has no record; the cluster's (possibly empty) map is kept.
func (o *objectWatcherImpl) deleteVersionRecord(clusterName, resourceName string) {
	o.Lock.Lock()
	defer o.Lock.Unlock()

	if records, known := o.VersionRecord[clusterName]; known {
		delete(records, resourceName)
	}
}
// NeedsUpdate reports whether clusterObj, as observed in the member cluster
// named clusterName, should be updated to match desiredObj.
//
// The decision compares the version recorded for the object with clusterObj's
// current version (see objectNeedsUpdate). When no version has been recorded
// for the object, it returns false together with an error.
func (o *objectWatcherImpl) NeedsUpdate(clusterName string, desiredObj, clusterObj *unstructured.Unstructured) (bool, error) {
	// Build the lookup key with genObjectKey, the same helper recordVersion and
	// Delete use, so the key format cannot drift between writers and this reader.
	version, exist := o.getVersionRecord(clusterName, o.genObjectKey(desiredObj))
	if !exist {
		klog.Errorf("Failed to update resource(kind=%s, %s/%s) in cluster %s for the version record does not exist", desiredObj.GetKind(), desiredObj.GetNamespace(), desiredObj.GetName(), clusterName)
		return false, fmt.Errorf("failed to update resource(kind=%s, %s/%s) in cluster %s for the version record does not exist", desiredObj.GetKind(), desiredObj.GetNamespace(), desiredObj.GetName(), clusterName)
	}

	return objectNeedsUpdate(desiredObj, clusterObj, version), nil
}
/*
This code is lifted from the kubefed codebase. It's a list of functions to determine whether the provided cluster
object needs to be updated according to the desired object and the recorded version.
For reference: https://github.com/kubernetes-sigs/kubefed/blob/master/pkg/controller/util/propagatedversion.go#L30-L59
*/
// objectVersion returns the type-prefixed value used to decide whether the
// given cluster object is current: "gen:<generation>" when the object has a
// non-zero generation, otherwise "rv:<resourceVersion>".
func objectVersion(clusterObj *unstructured.Unstructured) string {
	if generation := clusterObj.GetGeneration(); generation != 0 {
		return generationPrefix + fmt.Sprint(generation)
	}
	return resourceVersionPrefix + clusterObj.GetResourceVersion()
}
// objectNeedsUpdate determines whether the provided cluster object needs to be
// updated, given the desired object and the version recorded for it.
func objectNeedsUpdate(desiredObj, clusterObj *unstructured.Unstructured, recordedVersion string) bool {
	currentVersion := objectVersion(clusterObj)

	switch {
	case currentVersion != recordedVersion:
		// The cluster object differs from the version that was recorded.
		return true
	case !strings.HasPrefix(currentVersion, generationPrefix):
		// Matching versions that are not generation-based need no further check.
		return false
	default:
		// Versions match and are sourced from the generation field, so metadata
		// equivalency still has to be checked.
		return !objectMetaObjEquivalent(desiredObj, clusterObj)
	}
}
// objectMetaObjEquivalent reports whether the cluster-independent, user-provided
// metadata of two objects is equal: name, namespace and labels. A nil label map
// and an empty one are considered equal. If ObjectMeta is expanded in the
// future, any field that is not populated by the api server should be compared
// here as well.
func objectMetaObjEquivalent(a, b metav1.Object) bool {
	if a.GetName() != b.GetName() || a.GetNamespace() != b.GetNamespace() {
		return false
	}

	labelsA, labelsB := a.GetLabels(), b.GetLabels()
	if len(labelsA) == 0 && len(labelsB) == 0 {
		// reflect.DeepEqual distinguishes nil from empty maps; this check does not.
		return true
	}
	return reflect.DeepEqual(labelsA, labelsB)
}