generated from SAP/repository-template
-
Notifications
You must be signed in to change notification settings - Fork 1
/
reconciler.go
1281 lines (1190 loc) · 56.3 KB
/
reconciler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
SPDX-FileCopyrightText: 2023 SAP SE or an SAP affiliate company and component-operator-runtime contributors
SPDX-License-Identifier: Apache-2.0
*/
package reconciler
import (
"context"
"encoding/json"
"fmt"
"math"
"strconv"
"github.com/iancoleman/strcase"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sap/go-generics/sets"
"github.com/sap/go-generics/slices"
corev1 "k8s.io/api/core/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
apitypes "k8s.io/apimachinery/pkg/types"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
"github.com/sap/component-operator-runtime/pkg/cluster"
"github.com/sap/component-operator-runtime/pkg/status"
"github.com/sap/component-operator-runtime/pkg/types"
)
// Event/condition reasons recorded for operations performed on dependent objects
// (presumably used when emitting Kubernetes events — usage is not visible in this chunk).
const (
	objectReasonCreated     = "Created"
	objectReasonUpdated     = "Updated"
	objectReasonUpdateError = "UpdateError"
	objectReasonDeleted     = "Deleted"
	objectReasonDeleteError = "DeleteError"
)
// Scope classification of an object's type, derived from the REST mapping,
// a CRD contained in the object set, or left unknown if neither yields a result.
const (
	scopeUnknown = iota // scope could not be determined
	scopeNamespaced     // type is namespaced; objects get the default namespace if none is set
	scopeCluster        // type is cluster-scoped; any namespace set on the object is cleared
)
// Bounds for the apply/purge/delete order values parsed from object annotations;
// orders therefore fit into an int16 range.
const (
	minOrder = math.MinInt16
	maxOrder = math.MaxInt16
)
// adoptionPolicyByAnnotation maps the string values allowed in the adoption-policy
// annotation to the typed AdoptionPolicy used internally.
var adoptionPolicyByAnnotation = map[string]AdoptionPolicy{
	types.AdoptionPolicyNever:     AdoptionPolicyNever,
	types.AdoptionPolicyIfUnowned: AdoptionPolicyIfUnowned,
	types.AdoptionPolicyAlways:    AdoptionPolicyAlways,
}

// reconcilePolicyByAnnotation maps the string values allowed in the reconcile-policy
// annotation to the typed ReconcilePolicy used internally.
var reconcilePolicyByAnnotation = map[string]ReconcilePolicy{
	types.ReconcilePolicyOnObjectChange:            ReconcilePolicyOnObjectChange,
	types.ReconcilePolicyOnObjectOrComponentChange: ReconcilePolicyOnObjectOrComponentChange,
	types.ReconcilePolicyOnce:                      ReconcilePolicyOnce,
}

// updatePolicyByAnnotation maps the string values allowed in the update-policy
// annotation to the typed UpdatePolicy used internally.
var updatePolicyByAnnotation = map[string]UpdatePolicy{
	types.UpdatePolicyRecreate:    UpdatePolicyRecreate,
	types.UpdatePolicyReplace:     UpdatePolicyReplace,
	types.UpdatePolicySsaMerge:    UpdatePolicySsaMerge,
	types.UpdatePolicySsaOverride: UpdatePolicySsaOverride,
}

// deletePolicyByAnnotation maps the string values allowed in the delete-policy
// annotation to the typed DeletePolicy used internally.
var deletePolicyByAnnotation = map[string]DeletePolicy{
	types.DeletePolicyDelete: DeletePolicyDelete,
	types.DeletePolicyOrphan: DeletePolicyOrphan,
}
// ReconcilerOptions are creation options for a Reconciler.
// All pointer fields are optional; nil selects the documented default.
type ReconcilerOptions struct {
	// Whether namespaces are auto-created if missing.
	// If unspecified, true is assumed.
	CreateMissingNamespaces *bool
	// How to react if a dependent object exists but has no or a different owner.
	// If unspecified, AdoptionPolicyIfUnowned is assumed.
	// Can be overridden by annotation on object level.
	AdoptionPolicy *AdoptionPolicy
	// How to perform updates to dependent objects.
	// If unspecified, UpdatePolicyReplace is assumed.
	// Can be overridden by annotation on object level.
	UpdatePolicy *UpdatePolicy
	// How to analyze the state of the dependent objects.
	// If unspecified, an optimized kstatus based implementation is used.
	StatusAnalyzer status.StatusAnalyzer
	// Prometheus metrics to be populated by the reconciler.
	Metrics ReconcilerMetrics
}
// ReconcilerMetrics defines metrics that the reconciler can populate.
// Metrics specified as nil will be ignored.
type ReconcilerMetrics struct {
	// Counter for read operations on dependent objects (incremented by the
	// reconciler's read path — not visible in this chunk; presumably readObject).
	ReadCounter prometheus.Counter
	// Counter for create operations on dependent objects.
	CreateCounter prometheus.Counter
	// Counter for update operations on dependent objects.
	UpdateCounter prometheus.Counter
	// Counter for delete operations on dependent objects.
	DeleteCounter prometheus.Counter
}
// Reconciler manages specified objects in the given target cluster.
type Reconciler struct {
	// name of the reconciler; used as prefix for all owner/policy label and annotation keys below
	name string
	// client for the target cluster
	client cluster.Client
	// analyzer used to compute the status of dependent objects
	statusAnalyzer status.StatusAnalyzer
	// optional Prometheus counters (nil entries are ignored)
	metrics ReconcilerMetrics
	// whether missing namespaces are auto-created during Apply()
	createMissingNamespaces bool
	// default policies; can be overridden per object by the annotations below
	adoptionPolicy  AdoptionPolicy
	reconcilePolicy ReconcilePolicy
	updatePolicy    UpdatePolicy
	deletePolicy    DeletePolicy
	// label key carrying the hashed owner id on dependent objects
	labelKeyOwnerId string
	// annotation key carrying the plain owner id on dependent objects
	annotationKeyOwnerId string
	// annotation key carrying the last applied object digest
	annotationKeyDigest string
	// annotation keys for per-object policy/order overrides
	annotationKeyAdoptionPolicy  string
	annotationKeyReconcilePolicy string
	annotationKeyUpdatePolicy    string
	annotationKeyDeletePolicy    string
	annotationKeyApplyOrder      string
	annotationKeyPurgeOrder      string
	annotationKeyDeleteOrder     string
}
// NewReconciler creates a new Reconciler for the given target cluster client.
// The name is used as prefix for all owner/policy label and annotation keys;
// unset options are replaced by their documented defaults.
func NewReconciler(name string, clnt cluster.Client, options ReconcilerOptions) *Reconciler {
	// TODO: validate options
	createMissingNamespaces := true
	if options.CreateMissingNamespaces != nil {
		createMissingNamespaces = *options.CreateMissingNamespaces
	}
	adoptionPolicy := AdoptionPolicyIfUnowned
	if options.AdoptionPolicy != nil {
		adoptionPolicy = *options.AdoptionPolicy
	}
	updatePolicy := UpdatePolicyReplace
	if options.UpdatePolicy != nil {
		updatePolicy = *options.UpdatePolicy
	}
	statusAnalyzer := options.StatusAnalyzer
	if statusAnalyzer == nil {
		statusAnalyzer = status.NewStatusAnalyzer(name)
	}
	keyPrefix := name + "/"
	return &Reconciler{
		name:                         name,
		client:                       clnt,
		statusAnalyzer:               statusAnalyzer,
		metrics:                      options.Metrics,
		createMissingNamespaces:      createMissingNamespaces,
		adoptionPolicy:               adoptionPolicy,
		reconcilePolicy:              ReconcilePolicyOnObjectChange,
		updatePolicy:                 updatePolicy,
		deletePolicy:                 DeletePolicyDelete,
		labelKeyOwnerId:              keyPrefix + types.LabelKeySuffixOwnerId,
		annotationKeyOwnerId:         keyPrefix + types.AnnotationKeySuffixOwnerId,
		annotationKeyDigest:          keyPrefix + types.AnnotationKeySuffixDigest,
		annotationKeyAdoptionPolicy:  keyPrefix + types.AnnotationKeySuffixAdoptionPolicy,
		annotationKeyReconcilePolicy: keyPrefix + types.AnnotationKeySuffixReconcilePolicy,
		annotationKeyUpdatePolicy:    keyPrefix + types.AnnotationKeySuffixUpdatePolicy,
		annotationKeyDeletePolicy:    keyPrefix + types.AnnotationKeySuffixDeletePolicy,
		annotationKeyApplyOrder:      keyPrefix + types.AnnotationKeySuffixApplyOrder,
		annotationKeyPurgeOrder:      keyPrefix + types.AnnotationKeySuffixPurgeOrder,
		annotationKeyDeleteOrder:     keyPrefix + types.AnnotationKeySuffixDeleteOrder,
	}
}
// Apply given object manifests to the target cluster and maintain inventory. That means:
// - non-existent objects will be created
// - existing objects will be updated if there is a drift (see below)
// - redundant objects will be removed.
//
// Existing objects will only be updated or deleted if the owner id check is successful; that means:
// - the object's owner id matches the specified ownerId or
// - the object's owner id does not match the specified ownerId, and the effective adoption policy is AdoptionPolicyAlways or
// - the object has no or empty owner id set, and the effective adoption policy is AdoptionPolicyAlways or AdoptionPolicyIfUnowned.
//
// Objects which are instances of namespaced types will be placed into the namespace passed to Apply(), if they have no namespace defined in their manifest.
// An update of an existing object will be performed if it is considered to be out of sync; that means:
// - the object's manifest has changed, and the effective reconcile policy is ReconcilePolicyOnObjectChange or ReconcilePolicyOnObjectOrComponentChange or
// - the specified component revision has changed and the effective reconcile policy is ReconcilePolicyOnObjectOrComponentChange.
//
// The update itself will be done as follows:
// - if the effective update policy is UpdatePolicyReplace, a http PUT request will be sent to the Kubernetes API
// - if the effective update policy is UpdatePolicySsaMerge or UpdatePolicySsaOverride, a server-side-apply http PATCH request will be sent;
// while UpdatePolicySsaMerge just implements the Kubernetes standard behavior (leaving foreign non-conflicting fields untouched), UpdatePolicySsaOverride
// will re-claim (and therefore potentially drop) fields owned by certain field managers, such as kubectl and helm
// - if the effective update policy is UpdatePolicyRecreate, the object will be deleted and recreated.
//
// Redundant objects will be removed; that means, in the regular case, a http DELETE request will be sent to the Kubernetes API; if the object specifies
// its delete policy as DeletePolicyOrphan, no physcial deletion will be performed, and the object will be left around in the cluster; however it will be no
// longer be part of the inventory.
//
// Objects will be applied and deleted in waves, according to their apply/delete order. Objects which specify a purge order will be deleted from the cluster at the
// end of the wave specified as purge order; other than redundant objects, a purged object will remain as Completed in the inventory;
// and it might be re-applied/re-purged in case it runs out of sync. Within a wave, objects are processed following a certain internal order;
// in particular, instances of types which are part of the wave are processed only if all other objects in that wave have a ready state.
//
// This method will change the passed inventory (add or remove elements, change elements). If Apply() returns true, then all objects are successfully reconciled;
// otherwise, if it returns false, the caller should recall it timely, until it returns true. In any case, the passed inventory should match the state of the
// inventory after the previous invocation of Apply(); usually, the caller saves the inventory after calling Apply(), and loads it before calling Apply().
// The namespace and ownerId arguments should not be changed across subsequent invocations of Apply(); the componentRevision should be incremented only.
func (r *Reconciler) Apply(ctx context.Context, inventory *[]*InventoryItem, objects []client.Object, namespace string, ownerId string, componentRevision int64) (bool, error) {
var err error
log := log.FromContext(ctx)
hashedOwnerId := sha256base32([]byte(ownerId))
// perform some initial validation
for _, object := range objects {
if object.GetGenerateName() != "" {
// TODO: the object key string representation below will probably be incomplete because of missing metadata.name
return false, fmt.Errorf("object %s specifies metadata.generateName (but dependent objects are not allowed to do so)", types.ObjectKeyToString(object))
}
}
// normalize objects; that means:
// - check that unstructured objects have valid type information set, and convert them to their concrete type if known to the scheme
// - check that non-unstructured types are known to the scheme, and validate/set their type information
objects, err = normalizeObjects(objects, r.client.Scheme())
if err != nil {
return false, errors.Wrap(err, "error normalizing objects")
}
// perform cleanup on object manifests
for _, object := range objects {
removeLabel(object, r.labelKeyOwnerId)
removeAnnotation(object, r.annotationKeyOwnerId)
removeAnnotation(object, r.annotationKeyDigest)
}
// validate type and set namespace for namespaced objects which have no namespace set
for _, object := range objects {
// note: due to the normalization done before, every object will now have a valid object kind set
gvk := object.GetObjectKind().GroupVersionKind()
// TODO: client now has a method IsObjectNamespaced(); can we use this instead?
scope := scopeUnknown
restMapping, err := r.client.RESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version)
if err == nil {
scope = scopeFromRestMapping(restMapping)
} else if !apimeta.IsNoMatchError(err) {
return false, errors.Wrapf(err, "error getting rest mapping for object %s", types.ObjectKeyToString(object))
}
for _, crd := range getCrds(objects) {
if crd.Spec.Group == gvk.Group && crd.Spec.Names.Kind == gvk.Kind {
// TODO: validate that scope obtained from crd matches scope from rest mapping (if one was found there)
scope = scopeFromCrd(crd)
err = nil
break
}
}
for _, apiService := range getApiServices(objects) {
if apiService.Spec.Group == gvk.Group && apiService.Spec.Version == gvk.Version {
err = nil
break
}
}
if err != nil {
return false, errors.Wrapf(err, "error getting rest mapping for object %s", types.ObjectKeyToString(object))
}
if object.GetNamespace() == "" && scope == scopeNamespaced {
object.SetNamespace(namespace)
}
if object.GetNamespace() != "" && scope == scopeCluster {
object.SetNamespace("")
}
}
// note: after this point there still can be objects in the list which
// - have a namespace set although they are not namespaced
// - do not have a namespace set although they are namespaced
// which exactly happens if
// 1. the object is incorrectly specified and
// 2. calling RESTMapping() above returned a NoMatchError (i.e. the type is currently not known to the api server) and
// 3. the type belongs to a (new) api service which is part of the inventory
// such entries can cause trouble, e.g. because the duplicate check, or InventoryItem.Match() might not work reliably ...
// TODO: should we allow at all that api services and according instances are deployed together?
// check that there are no duplicate objects
objectKeys := sets.New[string]()
for _, object := range objects {
objectKey := types.ObjectKeyToString(object)
if sets.Contains(objectKeys, objectKey) {
return false, fmt.Errorf("duplicate object %s", objectKey)
}
sets.Add(objectKeys, objectKey)
}
// validate annotations
for _, object := range objects {
if _, err := r.getAdoptionPolicy(object); err != nil {
return false, errors.Wrapf(err, "error validating object %s", types.ObjectKeyToString(object))
}
if _, err := r.getReconcilePolicy(object); err != nil {
return false, errors.Wrapf(err, "error validating object %s", types.ObjectKeyToString(object))
}
if _, err := r.getUpdatePolicy(object); err != nil {
return false, errors.Wrapf(err, "error validating object %s", types.ObjectKeyToString(object))
}
if _, err := r.getDeletePolicy(object); err != nil {
return false, errors.Wrapf(err, "error validating object %s", types.ObjectKeyToString(object))
}
if _, err := r.getApplyOrder(object); err != nil {
return false, errors.Wrapf(err, "error validating object %s", types.ObjectKeyToString(object))
}
if _, err := r.getPurgeOrder(object); err != nil {
return false, errors.Wrapf(err, "error validating object %s", types.ObjectKeyToString(object))
}
if _, err := r.getDeleteOrder(object); err != nil {
return false, errors.Wrapf(err, "error validating object %s", types.ObjectKeyToString(object))
}
// TODO: should status-hint be validated here as well?
}
// define getter functions for later usage
getAdoptionPolicy := func(object client.Object) AdoptionPolicy {
// note: this must() is ok because we checked the generated objects above, and this function will be called for these objects only
return must(r.getAdoptionPolicy(object))
}
getReconcilePolicy := func(object client.Object) ReconcilePolicy {
// note: this must() is ok because we checked the generated objects above, and this function will be called for these objects only
return must(r.getReconcilePolicy(object))
}
getUpdatePolicy := func(object client.Object) UpdatePolicy {
// note: this must() is ok because we checked the generated objects above, and this function will be called for these objects only
return must(r.getUpdatePolicy(object))
}
getDeletePolicy := func(object client.Object) DeletePolicy {
// note: this must() is ok because we checked the generated objects above, and this function will be called for these objects only
return must(r.getDeletePolicy(object))
}
getApplyOrder := func(object client.Object) int {
// note: this must() is ok because we checked the generated objects above, and this function will be called for these objects only
return must(r.getApplyOrder(object))
}
getPurgeOrder := func(object client.Object) int {
// note: this must() is ok because we checked the generated objects above, and this function will be called for these objects only
return must(r.getPurgeOrder(object))
}
getDeleteOrder := func(object client.Object) int {
// note: this must() is ok because we checked the generated objects above, and this function will be called for these objects only
return must(r.getDeleteOrder(object))
}
// perform further validations of object set
for _, object := range objects {
switch {
case isNamespace(object):
if getPurgeOrder(object) <= maxOrder {
return false, errors.Wrapf(fmt.Errorf("namespaces must not define a purge order"), "error validating object %s", types.ObjectKeyToString(object))
}
case isCrd(object):
if getPurgeOrder(object) <= maxOrder {
return false, errors.Wrapf(fmt.Errorf("custom resource definitions must not define a purge order"), "error validating object %s", types.ObjectKeyToString(object))
}
case isApiService(object):
if getPurgeOrder(object) <= maxOrder {
return false, errors.Wrapf(fmt.Errorf("api services must not define a purge order"), "error validating object %s", types.ObjectKeyToString(object))
}
}
}
// add/update inventory with target objects
// TODO: review this; it would be cleaner to use a DeepCopy method for a []*InventoryItem type (if there would be such a type)
newInventory := slices.Collect(*inventory, func(item *InventoryItem) *InventoryItem { return item.DeepCopy() })
numAdded := 0
for _, object := range objects {
// retrieve inventory item belonging to this object (if existing)
item := getItem(newInventory, object)
// calculate object digest
// note: if the effective reconcile policy of an object changes, it will always be reconciled at least one more time;
// this is in particular the case if the policy changes from or to ReconcilePolicyOnce.
digest, err := calculateObjectDigest(object, item, componentRevision, getReconcilePolicy(object))
if err != nil {
return false, errors.Wrapf(err, "error calculating digest for object %s", types.ObjectKeyToString(object))
}
// if item was not found, append an empty item
if item == nil {
// TODO: should the owner id check happen always (not only if the object is unknown to the inventory)?
// fetch object (if existing)
existingObject, err := r.readObject(ctx, object)
if err != nil {
return false, errors.Wrapf(err, "error reading object %s", types.ObjectKeyToString(object))
}
// check ownership
// note: failing already here in case of a conflict prevents problems during apply and, in particular, during deletion
if existingObject != nil {
adoptionPolicy := getAdoptionPolicy(object)
existingOwnerId := existingObject.GetLabels()[r.labelKeyOwnerId]
if existingOwnerId == "" {
if adoptionPolicy != AdoptionPolicyIfUnowned && adoptionPolicy != AdoptionPolicyAlways {
return false, fmt.Errorf("found existing object %s without owner", types.ObjectKeyToString(object))
}
} else if existingOwnerId != hashedOwnerId {
if adoptionPolicy != AdoptionPolicyAlways {
return false, fmt.Errorf("owner conflict; object %s is owned by %s", types.ObjectKeyToString(object), existingObject.GetAnnotations()[r.annotationKeyOwnerId])
}
}
}
newInventory = append(newInventory, &InventoryItem{})
item = newInventory[len(newInventory)-1]
numAdded++
}
// update item
gvk := object.GetObjectKind().GroupVersionKind()
item.Group = gvk.Group
item.Version = gvk.Version
item.Kind = gvk.Kind
item.Namespace = object.GetNamespace()
item.Name = object.GetName()
item.AdoptionPolicy = getAdoptionPolicy(object)
item.ReconcilePolicy = getReconcilePolicy(object)
item.UpdatePolicy = getUpdatePolicy(object)
item.DeletePolicy = getDeletePolicy(object)
item.ApplyOrder = getApplyOrder(object)
item.DeleteOrder = getDeleteOrder(object)
item.ManagedTypes = getManagedTypes(object)
if digest != item.Digest {
item.Digest = digest
item.Phase = PhaseScheduledForApplication
item.Status = status.InProgressStatus
}
}
// mark obsolete inventory items (clear digest)
for _, item := range newInventory {
found := false
for _, object := range objects {
if item.Matches(object) {
found = true
break
}
}
if !found && item.Digest != "" {
item.Digest = ""
item.Phase = PhaseScheduledForDeletion
item.Status = status.TerminatingStatus
}
}
// validate object set:
// - check that all managed instances have apply-order greater than or equal to the according managed type
// - check that all managed instances have delete-order less than or equal to the according managed type
// - check that no managed types are about to be deleted (empty digest) unless all related managed instances are as well
// - check that all contained objects have apply-order greater than or equal to the according namespace
// - check that all contained objects have delete-order less than or equal to the according namespace
// - check that no namespaces are about to be deleted (empty digest) unless all contained objects are as well
for _, item := range newInventory {
if isCrd(item) || isApiService(item) {
for _, _item := range newInventory {
if isManagedBy(item, _item) {
if _item.ApplyOrder < item.ApplyOrder {
return false, fmt.Errorf("error valdidating object set (%s): managed instance must not have an apply order lesser than the one of its type", _item)
}
if _item.DeleteOrder > item.DeleteOrder {
return false, fmt.Errorf("error valdidating object set (%s): managed instance must not have a delete order greater than the one of its type", _item)
}
if _item.Digest != "" && item.Digest == "" {
return false, fmt.Errorf("error valdidating object set (%s): managed instance is not being deleted, but the managing type is", _item)
}
}
}
}
if isNamespace(item) {
for _, _item := range newInventory {
if _item.Namespace == item.Name {
if _item.ApplyOrder < item.ApplyOrder {
return false, fmt.Errorf("error valdidating object set (%s): namespaced object must not have an apply order lesser than the one of its namespace", _item)
}
if _item.DeleteOrder > item.DeleteOrder {
return false, fmt.Errorf("error valdidating object set (%s): namespaced object must not have a delete order greater than the one of its namespace", _item)
}
if _item.Digest != "" && item.Digest == "" {
return false, fmt.Errorf("error valdidating object set (%s): namespaced object is not being deleted, but the namespace is", _item)
}
}
}
}
}
// accept inventory for further processing, put into right order for future deletion
*inventory = sortObjectsForDelete(newInventory)
// trigger another reconcile if something was added (to be sure that it is persisted)
if numAdded > 0 {
return false, nil
}
// note: after this point it is guaranteed that
// - the in-memory inventory reflects the target state
// - the persisted inventory at least has the same object keys as the in-memory inventory
// now it is about to synchronize the cluster state with the inventory
// delete redundant objects and maintain inventory;
// objects are deleted in waves according to their delete order;
// that means, only if all redundant objects of a wave are gone or comppleted, the next
// wave will be processed; within each wave, objects which are instances of managed
// types are deleted before all other objects, and namespaces will only be deleted
// if they are not used by any object in the inventory (note that this may cause deadlocks)
numManagedToBeDeleted := 0
numToBeDeleted := 0
for k, item := range *inventory {
// if this is the first object of an order, then
// count instances of managed types in this wave which are about to be deleted
if k == 0 || (*inventory)[k-1].DeleteOrder < item.DeleteOrder {
log.V(2).Info("begin of deletion wave", "order", item.DeleteOrder)
numManagedToBeDeleted = 0
for j := k; j < len(*inventory) && (*inventory)[j].DeleteOrder == item.DeleteOrder; j++ {
_item := (*inventory)[j]
if (_item.Phase == PhaseScheduledForDeletion || _item.Phase == PhaseScheduledForCompletion || _item.Phase == PhaseDeleting || _item.Phase == PhaseCompleting) && isInstanceOfManagedType(*inventory, _item) {
numManagedToBeDeleted++
}
}
}
if item.Phase == PhaseScheduledForDeletion || item.Phase == PhaseScheduledForCompletion || item.Phase == PhaseDeleting || item.Phase == PhaseCompleting {
// fetch object (if existing)
existingObject, err := r.readObject(ctx, item)
if err != nil {
return false, errors.Wrapf(err, "error reading object %s", item)
}
orphan := item.DeletePolicy == DeletePolicyOrphan
switch item.Phase {
case PhaseScheduledForDeletion:
// delete namespaces after all contained inventory items
// delete all instances of managed types before remaining objects; this ensures that no objects are prematurely
// deleted which are needed for the deletion of the managed instances, such as webhook servers, api servers, ...
if (!isNamespace(item) || !isNamespaceUsed(*inventory, item.Name)) && (numManagedToBeDeleted == 0 || isInstanceOfManagedType(*inventory, item)) {
if orphan {
item.Phase = ""
} else {
// note: here is a theoretical risk that we delete an existing foreign object, because informers are not yet synced
// however not sending the delete request is also not an option, because this might lead to orphaned own dependents
// TODO: perform an additional owner id check
if err := r.deleteObject(ctx, item, existingObject); err != nil {
return false, errors.Wrapf(err, "error deleting object %s", item)
}
item.Phase = PhaseDeleting
item.Status = status.TerminatingStatus
numToBeDeleted++
}
} else {
numToBeDeleted++
}
case PhaseScheduledForCompletion:
// delete namespaces after all contained inventory items
// delete all instances of managed types before remaining objects; this ensures that no objects are prematurely
// deleted which are needed for the deletion of the managed instances, such as webhook servers, api servers, ...
if (!isNamespace(item) || !isNamespaceUsed(*inventory, item.Name)) && (numManagedToBeDeleted == 0 || isInstanceOfManagedType(*inventory, item)) {
if orphan {
return false, fmt.Errorf("invalid usage of deletion policy: object %s is scheduled for completion and therefore cannot be orphaned", item)
} else {
// note: here is a theoretical risk that we delete an existing foreign object, because informers are not yet synced
// however not sending the delete request is also not an option, because this might lead to orphaned own dependents
// TODO: perform an additional owner id check
if err := r.deleteObject(ctx, item, existingObject); err != nil {
return false, errors.Wrapf(err, "error deleting object %s", item)
}
item.Phase = PhaseCompleting
item.Status = status.TerminatingStatus
numToBeDeleted++
}
} else {
numToBeDeleted++
}
case PhaseDeleting:
// if object is gone, we can remove it from inventory
if existingObject == nil {
item.Phase = ""
} else {
numToBeDeleted++
}
case PhaseCompleting:
// if object is gone, it is set to completed, and kept in inventory
if existingObject == nil {
item.Phase = PhaseCompleted
item.Status = ""
} else {
numToBeDeleted++
}
default:
// note: any other phase value would indicate a severe code problem, so we want to see the panic in that case
panic("this cannot happen")
}
}
// trigger another reconcile if this is the last object of the wave, and some deletions are not yet completed
if k == len(*inventory)-1 || (*inventory)[k+1].DeleteOrder > item.DeleteOrder {
log.V(2).Info("end of deletion wave", "order", item.DeleteOrder)
if numToBeDeleted > 0 {
break
}
}
}
*inventory = slices.Select(*inventory, func(item *InventoryItem) bool { return item.Phase != "" })
// trigger another reconcile
if numToBeDeleted > 0 {
return false, nil
}
// note: after this point, PhaseScheduledForDeletion, PhaseScheduledForCompletion, PhaseDeleting, PhaseCompleting cannot occur anymore in inventory
// in other words: the inventory and objects contains the same resources
// create missing namespaces
if r.createMissingNamespaces {
for _, namespace := range findMissingNamespaces(objects) {
if err := r.client.Get(ctx, apitypes.NamespacedName{Name: namespace}, &corev1.Namespace{}); err != nil {
if !apierrors.IsNotFound(err) {
return false, errors.Wrapf(err, "error reading namespace %s", namespace)
}
if err := r.client.Create(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, client.FieldOwner(r.name)); err != nil {
return false, errors.Wrapf(err, "error creating namespace %s", namespace)
}
}
}
}
// put objects into right order for applying
objects = sortObjectsForApply(objects, getApplyOrder)
// apply objects and maintain inventory;
// objects are applied (i.e. created/updated) in waves according to their apply order;
// that means, only if all objects of a wave are ready or completed, the next wave
// will be procesed; within each wave, objects which are instances of managed types
// will be applied after all other objects
numNotManagedToBeApplied := 0
numUnready := 0
for k, object := range objects {
// retrieve inventory item corresponding to this object
item := mustGetItem(*inventory, object)
// retrieve object order
applyOrder := getApplyOrder(object)
// if this is the first object of an order, then
// count instances of managed types in this order which are about to be applied
if k == 0 || getApplyOrder(objects[k-1]) < applyOrder {
log.V(2).Info("begin of apply wave", "order", applyOrder)
numNotManagedToBeApplied = 0
for j := k; j < len(objects) && getApplyOrder(objects[j]) == applyOrder; j++ {
_object := objects[j]
_item := mustGetItem(*inventory, _object)
if _item.Phase != PhaseReady && _item.Phase != PhaseCompleted && !isInstanceOfManagedType(*inventory, _object) {
// that means: _item.Phase is one of PhaseScheduledForApplication, PhaseCreating, PhaseUpdating
numNotManagedToBeApplied++
}
}
}
// for non-completed objects, compute and update status, and apply (create or update) the object if necessary
if item.Phase != PhaseCompleted {
// reconcile all instances of managed types after remaining objects
// this ensures that everything is running what is needed for the reconciliation of the managed instances,
// such as webhook servers, api servers, ...
if numNotManagedToBeApplied == 0 || !isInstanceOfManagedType(*inventory, object) {
// fetch object (if existing)
existingObject, err := r.readObject(ctx, item)
if err != nil {
return false, errors.Wrapf(err, "error reading object %s", item)
}
setLabel(object, r.labelKeyOwnerId, hashedOwnerId)
setAnnotation(object, r.annotationKeyOwnerId, ownerId)
setAnnotation(object, r.annotationKeyDigest, item.Digest)
updatePolicy := getUpdatePolicy(object)
if existingObject == nil {
if err := r.createObject(ctx, object, nil, updatePolicy); err != nil {
return false, errors.Wrapf(err, "error creating object %s", item)
}
item.Phase = PhaseCreating
item.Status = status.InProgressStatus
numUnready++
} else if existingObject.GetDeletionTimestamp().IsZero() && existingObject.GetAnnotations()[r.annotationKeyDigest] != item.Digest {
switch updatePolicy {
case UpdatePolicyRecreate:
// TODO: perform an additional owner id check
if err := r.deleteObject(ctx, object, existingObject); err != nil {
return false, errors.Wrapf(err, "error deleting (while recreating) object %s", item)
}
default:
// TODO: perform an additional owner id check
if err := r.updateObject(ctx, object, existingObject, nil, updatePolicy); err != nil {
return false, errors.Wrapf(err, "error updating object %s", item)
}
}
item.Phase = PhaseUpdating
item.Status = status.InProgressStatus
numUnready++
} else {
existingStatus, err := r.statusAnalyzer.ComputeStatus(existingObject)
if err != nil {
return false, errors.Wrapf(err, "error checking status of object %s", item)
}
if existingObject.GetDeletionTimestamp().IsZero() && existingStatus == status.CurrentStatus {
item.Phase = PhaseReady
} else {
numUnready++
}
item.Status = existingStatus
}
} else {
numUnready++
}
}
// note: after this point, when numUnready is zero, then this and all previous objects are either in PhaseReady or PhaseCompleted
// if this is the last object of an order, then
// - if everything so far is ready, trigger due completions and trigger another reconcile if any completion was triggered
// - otherwise trigger another reconcile
if k == len(objects)-1 || getApplyOrder(objects[k+1]) > applyOrder {
log.V(2).Info("end of apply wave", "order", applyOrder)
if numUnready == 0 {
numPurged := 0
for j := 0; j <= k; j++ {
_object := objects[j]
_item := mustGetItem(*inventory, _object)
_purgeOrder := getPurgeOrder(_object)
if (k == len(objects)-1 && _purgeOrder <= maxOrder || _purgeOrder <= applyOrder) && _item.Phase != PhaseCompleted {
_item.Phase = PhaseScheduledForCompletion
numPurged++
}
}
if numPurged > 0 {
return false, nil
}
} else {
return false, nil
}
}
}
return numUnready == 0, nil
}
// Delete objects stored in the inventory from the target cluster and maintain inventory.
// Objects will be deleted in waves, according to their delete order (as stored in the inventory); that means, the deletion of
// objects having a certain delete order will only start if all objects with lower delete order are gone. Within a wave, objects are
// deleted following a certain internal ordering; in particular, if there are instances of types which are part of the wave, then these
// instances will be deleted first; only if all such instances are gone, the remaining objects of the wave will be deleted.
//
// This method will change the passed inventory (remove elements, change elements). If Delete() returns true, then all objects are gone; otherwise,
// if it returns false, the caller should recall it timely, until it returns true. In any case, the passed inventory should match the state of the
// inventory after the previous invocation of Delete(); usually, the caller saves the inventory after calling Delete(), and loads it before calling Delete().
func (r *Reconciler) Delete(ctx context.Context, inventory *[]*InventoryItem) (bool, error) {
	log := log.FromContext(ctx)
	// delete objects and maintain inventory;
	// objects are deleted in waves according to their delete order;
	// that means, only if all objects of a wave are gone, the next wave will be processed;
	// within each wave, objects which are instances of managed types are deleted before all
	// other objects, and namespaces will only be deleted if they are not used by any
	// object in the inventory (note that this may cause deadlocks)
	numManagedToBeDeleted := 0
	numToBeDeleted := 0
	for k, item := range *inventory {
		// if this is the first object of an order, then
		// count instances of managed types in this wave which are about to be deleted
		// (the loops below rely on the inventory being sorted by ascending delete order)
		if k == 0 || (*inventory)[k-1].DeleteOrder < item.DeleteOrder {
			log.V(2).Info("begin of deletion wave", "order", item.DeleteOrder)
			numManagedToBeDeleted = 0
			for j := k; j < len(*inventory) && (*inventory)[j].DeleteOrder == item.DeleteOrder; j++ {
				_item := (*inventory)[j]
				if isInstanceOfManagedType(*inventory, _item) {
					numManagedToBeDeleted++
				}
			}
		}
		// fetch object (if existing)
		existingObject, err := r.readObject(ctx, item)
		if err != nil {
			return false, errors.Wrapf(err, "error reading object %s", item)
		}
		// orphaned items are released from the inventory without sending a delete request
		orphan := item.DeletePolicy == DeletePolicyOrphan
		switch item.Phase {
		case PhaseDeleting:
			// deletion was already requested in a previous invocation;
			// if object is gone, we can remove it from inventory (by clearing its phase; see the Select below)
			if existingObject == nil {
				item.Phase = ""
			} else {
				// object still exists (e.g. terminating); keep it and retry later
				numToBeDeleted++
			}
		default:
			// delete namespaces after all contained inventory items
			// delete all instances of managed types before remaining objects; this ensures that no objects are prematurely
			// deleted which are needed for the deletion of the managed instances, such as webhook servers, api servers, ...
			if (!isNamespace(item) || !isNamespaceUsed(*inventory, item.Name)) && (numManagedToBeDeleted == 0 || isInstanceOfManagedType(*inventory, item)) {
				if orphan {
					// keep the object in the cluster, but drop it from the inventory
					item.Phase = ""
				} else {
					// delete the object
					// note: here is a theoretical risk that we delete an existing (foreign) object, because informers are not yet synced
					// however not sending the delete request is also not an option, because this might lead to orphaned own dependents
					// TODO: perform an additional owner id check
					if err := r.deleteObject(ctx, item, existingObject); err != nil {
						return false, errors.Wrapf(err, "error deleting object %s", item)
					}
					item.Phase = PhaseDeleting
					item.Status = status.TerminatingStatus
					numToBeDeleted++
				}
			} else {
				// object is blocked for now (namespace still in use, or waiting for managed instances); retry later
				numToBeDeleted++
			}
		}
		// trigger another reconcile if this is the last object of the wave, and some deletions are not yet completed
		if k == len(*inventory)-1 || (*inventory)[k+1].DeleteOrder > item.DeleteOrder {
			log.V(2).Info("end of deletion wave", "order", item.DeleteOrder)
			if numToBeDeleted > 0 {
				// do not start the next wave while this wave still has pending deletions
				break
			}
		}
	}
	// drop all items whose phase was cleared above (i.e. items that are gone or orphaned)
	*inventory = slices.Select(*inventory, func(item *InventoryItem) bool { return item.Phase != "" })
	// deletion is complete exactly when the inventory is empty
	return len(*inventory) == 0, nil
}
// Check if the object set defined by inventory is ready for deletion; that means: check if the inventory contains
// types (as custom resource definition or from an api service), while there exist instances of these types in the cluster,
// which are not contained in the inventory.
//
// Returns (true, "", nil) if deletion is allowed, (false, reason, nil) with a human-readable reason if it is not,
// and a non-nil error if the check itself could not be performed.
func (r *Reconciler) IsDeletionAllowed(ctx context.Context, inventory *[]*InventoryItem) (bool, string, error) {
	for _, item := range *inventory {
		switch {
		case isCrd(item):
			crd := &apiextensionsv1.CustomResourceDefinition{}
			if err := r.client.Get(ctx, apitypes.NamespacedName{Name: item.GetName()}, crd); err != nil {
				// a crd which is already gone cannot block deletion
				if apierrors.IsNotFound(err) {
					continue
				}
				return false, "", errors.Wrapf(err, "error retrieving crd %s", item.GetName())
			}
			// NOTE(review): the boolean argument presumably restricts the check to instances not owned by this
			// inventory (cf. the method comment above) — confirm against isCrdUsed
			used, err := r.isCrdUsed(ctx, crd, true)
			if err != nil {
				return false, "", errors.Wrapf(err, "error checking usage of crd %s", item.GetName())
			}
			if used {
				return false, fmt.Sprintf("crd %s is still in use (instances exist)", item.GetName()), nil
			}
		case isApiService(item):
			apiService := &apiregistrationv1.APIService{}
			if err := r.client.Get(ctx, apitypes.NamespacedName{Name: item.GetName()}, apiService); err != nil {
				// an api service which is already gone cannot block deletion
				if apierrors.IsNotFound(err) {
					continue
				}
				return false, "", errors.Wrapf(err, "error retrieving api service %s", item.GetName())
			}
			used, err := r.isApiServiceUsed(ctx, apiService, true)
			if err != nil {
				return false, "", errors.Wrapf(err, "error checking usage of api service %s", item.GetName())
			}
			if used {
				// TODO: other than with CRDs it is not clear for which types there are instances existing
				// we should improve the error message somehow
				return false, fmt.Sprintf("api service %s is still in use (instances exist)", item.GetName()), nil
			}
		}
	}
	return true, "", nil
}
// read object identified by key from the cluster and return it as unstructured;
// returns a nil object (and no error) if the object does not exist, or if its kind is not known to the api server
func (r *Reconciler) readObject(ctx context.Context, key types.ObjectKey) (*unstructured.Unstructured, error) {
	// maintain the read metric (if configured)
	if counter := r.metrics.ReadCounter; counter != nil {
		counter.Inc()
	}
	obj := &unstructured.Unstructured{}
	obj.SetGroupVersionKind(key.GetObjectKind().GroupVersionKind())
	err := r.client.Get(ctx, apitypes.NamespacedName{Namespace: key.GetNamespace(), Name: key.GetName()}, obj)
	if err == nil {
		return obj, nil
	}
	// a missing object or an unknown kind is not an error; report it as a nil object
	if apimeta.IsNoMatchError(err) || apierrors.IsNotFound(err) {
		return nil, nil
	}
	return nil, err
}
// create object; object may be a concrete type or unstructured; in any case, type meta must be populated;
// createdObject is optional; if non-nil, it will be populated with the created object; the same variable can be supplied as object and createdObject;
// if object is a crd or an api service, the reconciler's name will be added as finalizer
func (r *Reconciler) createObject(ctx context.Context, object client.Object, createdObject any, updatePolicy UpdatePolicy) (err error) {
	// maintain the create metric (if configured)
	if counter := r.metrics.CreateCounter; counter != nil {
		counter.Inc()
	}
	// note: the deferred closure captures the variable object, which is reassigned to an unstructured copy below;
	// on success it therefore converts/reports the unstructured object used for the api call
	defer func() {
		if err == nil && createdObject != nil {
			// populate the caller-supplied createdObject from the unstructured state
			err = runtime.DefaultUnstructuredConverter.FromUnstructured(object.(*unstructured.Unstructured).Object, createdObject)
		}
		if err == nil {
			r.client.EventRecorder().Event(object, corev1.EventTypeNormal, objectReasonCreated, "Object successfully created")
		}
	}()
	// log := log.FromContext(ctx).WithValues("object", types.ObjectKeyToString(object))
	// convert the (possibly typed) object into unstructured form for the remainder of this method
	data, err := runtime.DefaultUnstructuredConverter.ToUnstructured(object)
	if err != nil {
		return err
	}
	object = &unstructured.Unstructured{Object: data}
	if isCrd(object) || isApiService(object) {
		// protect crds/api services with our finalizer, so that dependent instances can be cleaned up first
		controllerutil.AddFinalizer(object, r.name)
	}
	// note: clearing managedFields is anyway required for ssa; but also in the create (post) case it does not harm
	object.SetManagedFields(nil)
	// create the object right from the start with the right managed fields operation (Apply or Update), in order to avoid
	// having to patch the managed fields during future update calls
	switch updatePolicy {
	case UpdatePolicySsaMerge, UpdatePolicySsaOverride:
		// set the target resource version to an impossible value; this will produce a 409 conflict in case the object already exists
		object.SetResourceVersion("1")
		return r.client.Patch(ctx, object, client.Apply, client.FieldOwner(r.name))
	default:
		return r.client.Create(ctx, object, client.FieldOwner(r.name))
	}
}
// update object; object may be a concrete type or unstructured; in any case, type meta must be populated;
// existingObject is required, and should represent the last-read state of the object; it must not have a deletionTimestamp set;
// updatedObject is optional; if non-nil, it will be populated with the updated object; the same variable can be supplied as object and updatedObject;
// if object is a crd or an api services, the reconciler's name will be added as finalizer;
// object may have a resourceVersion; if it does not, the resourceVersion of existingObject will be used for conflict checks during put/patch;
// if updatePolicy equals UpdatePolicyReplace, an update (put) will be performed; finalizers of existingObject will be copied;
// if updatePolicy equals UpdatePolicySsaMerge, a conflict-forcing server-side-apply patch will be performed;
// if updatePolicy equals UpdatePolicySsaOverride, then in addition, a preparation patch request will be performed before doing the conflict-forcing
// server-side-apply patch; this preparation patch will adjust managedFields, reclaiming fields/values previously owned by kubectl or helm
func (r *Reconciler) updateObject(ctx context.Context, object client.Object, existingObject *unstructured.Unstructured, updatedObject any, updatePolicy UpdatePolicy) (err error) {
if counter := r.metrics.UpdateCounter; counter != nil {
counter.Inc()
}
defer func() {
if err == nil && updatedObject != nil {
err = runtime.DefaultUnstructuredConverter.FromUnstructured(object.(*unstructured.Unstructured).Object, updatedObject)
}
if err == nil {
r.client.EventRecorder().Event(object, corev1.EventTypeNormal, objectReasonUpdated, "Object successfully updated")
} else {
r.client.EventRecorder().Eventf(existingObject, corev1.EventTypeWarning, objectReasonUpdateError, "Error updating object: %s", err)
}
}()
log := log.FromContext(ctx).WithValues("object", types.ObjectKeyToString(object))
// TODO: validate (by panic) that existingObject fits to object (i.e. have same object key)
if !existingObject.GetDeletionTimestamp().IsZero() {
// note: we must not update objects which are in deletion (e.g. to avoid unintentionally clearing finalizers), so we want to see the panic in that case
panic("this cannot happen")
}
data, err := runtime.DefaultUnstructuredConverter.ToUnstructured(object)
if err != nil {
return err
}
object = &unstructured.Unstructured{Object: data}
if isCrd(object) || isApiService(object) {
controllerutil.AddFinalizer(object, r.name)
}
// it is allowed that target object contains a resource version; otherwise, we set the resource version to the one of the existing object,
// in order to ensure that we do not unintentionally overwrite a state different from the one we have read;
// note that the api server performs a resource version conflict check not only in case of update (put), but also for ssa (patch)
if object.GetResourceVersion() == "" {
object.SetResourceVersion((existingObject.GetResourceVersion()))
}
// note: clearing managedFields is anyway required for ssa; but also in the replace (put, update) case it does not harm;
// because replace will only claim fields which are new or which have changed; the field owner of declared (but unmodified)
// fields will not be touched
object.SetManagedFields(nil)
switch updatePolicy {
case UpdatePolicySsaMerge, UpdatePolicySsaOverride:
var replacedFieldManagerPrefixes []string
if updatePolicy == UpdatePolicySsaOverride {
// TODO: add ways (per reconciler, per component, per object) to configure the list of field manager (prefixes) which are reclaimed
replacedFieldManagerPrefixes = []string{"kubectl", "helm"}