/
egressip.go
2354 lines (2240 loc) · 94.5 KB
/
egressip.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package ovn
import (
"encoding/hex"
"encoding/json"
"fmt"
"net"
"os"
"reflect"
"sort"
"strings"
"sync"
"syscall"
"time"
ocpcloudnetworkapi "github.com/openshift/api/cloudnetwork/v1"
libovsdbclient "github.com/ovn-org/libovsdb/client"
"github.com/ovn-org/libovsdb/ovsdb"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config"
egressipv1 "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/libovsdbops"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/metrics"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
kapi "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"
)
// egressIPDialer abstracts the connectivity probe used by the egress IP
// feature. Presumably this exists so tests can substitute a fake dialer for
// the concrete egressIPDial implementation (defined elsewhere in this
// package) — TODO confirm against the health-check call sites.
type egressIPDialer interface {
	// dial reports whether the given IP responded to a connection attempt.
	dial(ip net.IP) bool
}

// dialer is the package-level dialer instance; production code uses the
// concrete egressIPDial, tests may swap this out.
var dialer egressIPDialer = &egressIPDial{}
// reconcileEgressIP drives an EgressIP object towards its desired state. It
// handles ADD (old == nil), UPDATE (both non-nil) and DELETE (new == nil)
// events: it validates the spec and the recorded status, computes which
// egress IPs must be assigned or un-assigned, applies the corresponding OVN
// (bare-metal) or CloudPrivateIPConfig (cloud) changes, and finally
// reconciles pod/namespace selector changes when the assignments themselves
// did not change. Returns the first error encountered; the caller is
// expected to retry.
func (oc *Controller) reconcileEgressIP(old, new *egressipv1.EgressIP) (err error) {
	// Lock the assignment, this is needed because this function can end up
	// being called from WatchEgressNodes and WatchEgressIP, i.e: two different
	// go-routines and we need to make sure the assignment is safe.
	oc.eIPC.egressIPAssignmentMutex.Lock()
	defer oc.eIPC.egressIPAssignmentMutex.Unlock()
	// Initialize an empty name which is filled depending on the operation
	// (ADD/UPDATE/DELETE) we are performing. This is done as to be able to
	// delete the NB DB set up correctly when searching the DB based on the
	// name.
	name := ""
	// Initialize a status which will be used to compare against
	// new.spec.egressIPs and decide on what from the status should get deleted
	// or kept.
	status := []egressipv1.EgressIPStatusItem{}
	// Initialize two empty objects as to avoid SIGSEGV. The code should play
	// nicely with empty objects though.
	oldEIP, newEIP := &egressipv1.EgressIP{}, &egressipv1.EgressIP{}
	// Initialize two "nothing" selectors. Nothing selector are semantically
	// opposed to "empty" selectors, i.e: they select and match nothing, while
	// an empty one matches everything. If old/new are nil, and we don't do
	// this: we would have an empty EgressIP object which would result in two
	// empty selectors, matching everything, whereas we would mean the inverse
	newNamespaceSelector, _ := metav1.LabelSelectorAsSelector(nil)
	oldNamespaceSelector, _ := metav1.LabelSelectorAsSelector(nil)
	if old != nil {
		oldEIP = old
		oldNamespaceSelector, err = metav1.LabelSelectorAsSelector(&oldEIP.Spec.NamespaceSelector)
		if err != nil {
			return fmt.Errorf("invalid old namespaceSelector, err: %v", err)
		}
		name = oldEIP.Name
		status = oldEIP.Status.Items
	}
	if new != nil {
		newEIP = new
		newNamespaceSelector, err = metav1.LabelSelectorAsSelector(&newEIP.Spec.NamespaceSelector)
		if err != nil {
			return fmt.Errorf("invalid new namespaceSelector, err: %v", err)
		}
		// On UPDATE the new object wins for name/status; on ADD these were
		// never set from old.
		name = newEIP.Name
		status = newEIP.Status.Items
	}
	// We do not initialize a nothing selector for the podSelector, because
	// these are allowed to be empty (i.e: matching all pods in a namespace), as
	// supposed to the namespaceSelector
	newPodSelector, err := metav1.LabelSelectorAsSelector(&newEIP.Spec.PodSelector)
	if err != nil {
		return fmt.Errorf("invalid new podSelector, err: %v", err)
	}
	oldPodSelector, err := metav1.LabelSelectorAsSelector(&oldEIP.Spec.PodSelector)
	if err != nil {
		return fmt.Errorf("invalid old podSelector, err: %v", err)
	}
	// Validate the spec and use only the valid egress IPs when performing any
	// successive operations, theoretically: the user could specify invalid IP
	// addresses, which would break us.
	validSpecIPs, err := oc.validateEgressIPSpec(name, newEIP.Spec.EgressIPs)
	if err != nil {
		return fmt.Errorf("invalid EgressIP spec, err: %v", err)
	}
	// Validate the status, on restart it could be the case that what might have
	// been assigned when ovnkube-master last ran is not a valid assignment
	// anymore (specifically if ovnkube-master has been crashing for a while).
	// Any invalid status at this point in time needs to be removed and assigned
	// to a valid node.
	validStatus, invalidStatus := oc.validateEgressIPStatus(name, status)
	for status := range validStatus {
		// If the spec has changed and an egress IP has been removed by the
		// user: we need to un-assign that egress IP
		if !validSpecIPs.Has(status.EgressIP) {
			// Deleting from a map while ranging over it is safe in Go.
			invalidStatus[status] = ""
			delete(validStatus, status)
		}
	}
	// Add only the diff between what is requested and valid and that which
	// isn't already assigned.
	ipsToAssign := validSpecIPs
	ipsToRemove := sets.NewString()
	statusToAdd := make([]egressipv1.EgressIPStatusItem, 0, len(ipsToAssign))
	statusToKeep := make([]egressipv1.EgressIPStatusItem, 0, len(validStatus))
	for status := range validStatus {
		statusToKeep = append(statusToKeep, status)
		ipsToAssign.Delete(status.EgressIP)
	}
	statusToRemove := make([]egressipv1.EgressIPStatusItem, 0, len(invalidStatus))
	for status := range invalidStatus {
		statusToRemove = append(statusToRemove, status)
		ipsToRemove.Insert(status.EgressIP)
	}
	if ipsToRemove.Len() > 0 {
		// The following is added as to ensure that we only add after having
		// successfully removed egress IPs. This case is not very important on
		// bare-metal (since we execute the add after the remove below, and
		// hence have full control of the execution - barring its success), but
		// on a cloud: we don't execute anything below, we wait for the status
		// on the CloudPrivateIPConfig(s) we create to be set before executing
		// anything in the OVN DB. So, we need to make sure that we delete and
		// then add, mainly because if EIP1 is added to nodeX and then EIP2 is
		// removed from nodeX, we might remove the setup made for EIP1. The
		// add/delete ordering of events is not guaranteed on the cloud where we
		// depend on other controllers to execute the work for us however. By
		// comparing the spec to the status and applying the following truth
		// table we can ensure that order of events.
		// case ID | Egress IP to add | Egress IP to remove | ipsToAssign
		// 1       | e1               | e1                  | e1
		// 2       | e2               | e1                  | -
		// 3       | e2               | -                   | e2
		// 4       | -                | e1                  | -
		// Case 1 handles updates. Case 2 and 3 makes sure we don't add until we
		// successfully delete. Case 4 just shows an example of what would
		// happen if we don't have anything to add
		ipsToAssign = ipsToAssign.Intersection(ipsToRemove)
	}
	if !util.PlatformTypeIsEgressIPCloudProvider() {
		// Bare-metal path: we program OVN directly and synchronously.
		if len(statusToRemove) > 0 {
			// Delete the statusToRemove from the allocator cache. If we don't
			// do this we will occupy assignment positions for the ipsToAssign,
			// even though statusToRemove will be removed afterwards
			oc.deleteAllocatorEgressIPAssignments(statusToRemove)
			if err := oc.deleteEgressIPAssignments(name, statusToRemove); err != nil {
				return err
			}
		}
		if len(ipsToAssign) > 0 {
			statusToAdd = oc.assignEgressIPs(name, ipsToAssign.UnsortedList())
			statusToKeep = append(statusToKeep, statusToAdd...)
		}
		// Assign all statusToKeep, we need to warm up the podAssignment cache
		// on restart. We won't perform any additional transactions to the NB DB
		// for things which exists because the libovsdb operations use
		// modelClient which is idempotent.
		if err := oc.addEgressIPAssignments(name, statusToKeep, newEIP.Spec.NamespaceSelector, newEIP.Spec.PodSelector); err != nil {
			return err
		}
		// Add all assignments which are to be kept to the allocator cache,
		// allowing us to track all assignments which have been performed and
		// avoid incorrect future assignments due to a de-synchronized cache.
		oc.addAllocatorEgressIPAssignments(name, statusToKeep)
		// Update the object only on an ADD/UPDATE. If we are processing a
		// DELETE, new will be nil and we should not update the object.
		if len(statusToAdd) > 0 || (len(statusToRemove) > 0 && new != nil) {
			if err := oc.patchReplaceEgressIPStatus(name, statusToKeep); err != nil {
				return err
			}
		}
	} else {
		// Cloud path: OVN is only programmed later, from
		// reconcileCloudPrivateIPConfig, once the cloud confirms assignments.
		// Delete all assignments that are to be removed from the allocator
		// cache. If we don't do this we will occupy assignment positions for
		// the ipsToAdd, even though statusToRemove will be removed afterwards
		oc.deleteAllocatorEgressIPAssignments(statusToRemove)
		// If running on a public cloud we should not program OVN just yet, we
		// need confirmation from the cloud-network-config-controller that it
		// can assign the IPs. reconcileCloudPrivateIPConfig will take care of
		// processing the answer from the requests we make here, and update OVN
		// accordingly when we know what the outcome is.
		if len(ipsToAssign) > 0 {
			statusToAdd = oc.assignEgressIPs(name, ipsToAssign.UnsortedList())
			statusToKeep = append(statusToKeep, statusToAdd...)
		}
		// Same as above: Add all assignments which are to be kept to the
		// allocator cache, allowing us to track all assignments which have been
		// performed and avoid incorrect future assignments due to a
		// de-synchronized cache.
		oc.addAllocatorEgressIPAssignments(name, statusToKeep)
		// Execute CloudPrivateIPConfig changes for assignments which need to be
		// added/removed, assignments which don't change do not require any
		// further setup.
		if err := oc.executeCloudPrivateIPConfigChange(name, statusToAdd, statusToRemove); err != nil {
			return err
		}
	}
	// Record the egress IP allocator count
	metrics.RecordEgressIPCount(getEgressIPAllocationTotalCount(oc.eIPC.allocator))
	// If nothing has changed for what concerns the assignments, then check if
	// the namespaceSelector and podSelector have changed. If they have changed
	// then remove the setup for all pods which matched the old and add
	// everything for all pods which match the new.
	if len(ipsToAssign) == 0 &&
		len(statusToRemove) == 0 {
		// Only the namespace selector changed: remove the setup for all pods
		// matching the old and not matching the new, and add setup for the pod
		// matching the new and which didn't match the old.
		if !reflect.DeepEqual(newNamespaceSelector, oldNamespaceSelector) && reflect.DeepEqual(newPodSelector, oldPodSelector) {
			namespaces, err := oc.watchFactory.GetNamespaces()
			if err != nil {
				return err
			}
			for _, namespace := range namespaces {
				namespaceLabels := labels.Set(namespace.Labels)
				if !newNamespaceSelector.Matches(namespaceLabels) && oldNamespaceSelector.Matches(namespaceLabels) {
					if err := oc.deleteNamespaceEgressIPAssignment(name, oldEIP.Status.Items, namespace, oldEIP.Spec.PodSelector); err != nil {
						return err
					}
				}
				if newNamespaceSelector.Matches(namespaceLabels) && !oldNamespaceSelector.Matches(namespaceLabels) {
					if err := oc.addNamespaceEgressIPAssignments(name, newEIP.Status.Items, namespace, newEIP.Spec.PodSelector); err != nil {
						return err
					}
				}
			}
			// Only the pod selector changed: remove the setup for all pods
			// matching the old and not matching the new, and add setup for the pod
			// matching the new and which didn't match the old.
		} else if reflect.DeepEqual(newNamespaceSelector, oldNamespaceSelector) && !reflect.DeepEqual(newPodSelector, oldPodSelector) {
			namespaces, err := oc.watchFactory.GetNamespacesBySelector(newEIP.Spec.NamespaceSelector)
			if err != nil {
				return err
			}
			for _, namespace := range namespaces {
				pods, err := oc.watchFactory.GetPods(namespace.Name)
				if err != nil {
					return err
				}
				for _, pod := range pods {
					podLabels := labels.Set(pod.Labels)
					if !newPodSelector.Matches(podLabels) && oldPodSelector.Matches(podLabels) {
						if err := oc.deletePodEgressIPAssignments(name, oldEIP.Status.Items, pod); err != nil {
							return err
						}
					}
					if newPodSelector.Matches(podLabels) && !oldPodSelector.Matches(podLabels) {
						if err := oc.addPodEgressIPAssignments(name, newEIP.Status.Items, pod); err != nil {
							return err
						}
					}
				}
			}
			// Both selectors changed: remove the setup for pods matching the
			// old ones and not matching the new ones, and add setup for all
			// matching the new ones but which didn't match the old ones.
		} else if !reflect.DeepEqual(newNamespaceSelector, oldNamespaceSelector) && !reflect.DeepEqual(newPodSelector, oldPodSelector) {
			namespaces, err := oc.watchFactory.GetNamespaces()
			if err != nil {
				return err
			}
			for _, namespace := range namespaces {
				namespaceLabels := labels.Set(namespace.Labels)
				// If the namespace does not match anymore then there's no
				// reason to look at the pod selector.
				if !newNamespaceSelector.Matches(namespaceLabels) && oldNamespaceSelector.Matches(namespaceLabels) {
					if err := oc.deleteNamespaceEgressIPAssignment(name, oldEIP.Status.Items, namespace, oldEIP.Spec.PodSelector); err != nil {
						return err
					}
				}
				// If the namespace starts matching, look at the pods selector
				// and pods in that namespace and perform the setup for the pods
				// which match the new pod selector or if the podSelector is empty
				// then just perform the setup.
				if newNamespaceSelector.Matches(namespaceLabels) && !oldNamespaceSelector.Matches(namespaceLabels) {
					pods, err := oc.watchFactory.GetPods(namespace.Name)
					if err != nil {
						return err
					}
					for _, pod := range pods {
						podLabels := labels.Set(pod.Labels)
						if newPodSelector.Matches(podLabels) {
							if err := oc.addPodEgressIPAssignments(name, newEIP.Status.Items, pod); err != nil {
								return err
							}
						}
					}
				}
				// If the namespace continues to match, look at the pods
				// selector and pods in that namespace.
				if newNamespaceSelector.Matches(namespaceLabels) && oldNamespaceSelector.Matches(namespaceLabels) {
					pods, err := oc.watchFactory.GetPods(namespace.Name)
					if err != nil {
						return err
					}
					for _, pod := range pods {
						podLabels := labels.Set(pod.Labels)
						if !newPodSelector.Matches(podLabels) && oldPodSelector.Matches(podLabels) {
							if err := oc.deletePodEgressIPAssignments(name, oldEIP.Status.Items, pod); err != nil {
								return err
							}
						}
						if newPodSelector.Matches(podLabels) && !oldPodSelector.Matches(podLabels) {
							if err := oc.addPodEgressIPAssignments(name, newEIP.Status.Items, pod); err != nil {
								return err
							}
						}
					}
				}
			}
		}
	}
	return nil
}
// reconcileEgressIPNamespace reacts to namespace label changes: for every
// EgressIP object it removes the egress setup when this namespace stops
// matching the object's namespaceSelector, and adds it when the namespace
// starts matching. Called with old == nil on ADD and new == nil on DELETE.
func (oc *Controller) reconcileEgressIPNamespace(old, new *v1.Namespace) error {
	// Same as for reconcileEgressIP: work against empty (not nil) objects so
	// that label handling below is always safe.
	oldNs, newNs := &v1.Namespace{}, &v1.Namespace{}
	if old != nil {
		oldNs = old
	}
	if new != nil {
		newNs = new
	}
	// Labels are the only thing we react to here; bail out early if they are
	// unchanged.
	previousLabels := labels.Set(oldNs.Labels)
	currentLabels := labels.Set(newNs.Labels)
	if reflect.DeepEqual(currentLabels.AsSelector(), previousLabels.AsSelector()) {
		return nil
	}
	// A namespace can match several EgressIP objects (ex: one object matching
	// the "blue" pods of namespace A and another matching its "red" pods), so
	// every EgressIP has to be visited before we are done.
	eIPs, err := oc.watchFactory.GetEgressIPs()
	if err != nil {
		return err
	}
	for _, eIP := range eIPs {
		selector, _ := metav1.LabelSelectorAsSelector(&eIP.Spec.NamespaceSelector)
		matchedBefore := selector.Matches(previousLabels)
		matchesNow := selector.Matches(currentLabels)
		switch {
		case matchedBefore && !matchesNow:
			// The namespace stopped matching: tear the setup down.
			if err := oc.deleteNamespaceEgressIPAssignment(eIP.Name, eIP.Status.Items, oldNs, eIP.Spec.PodSelector); err != nil {
				return err
			}
		case !matchedBefore && matchesNow:
			// The namespace started matching: build the setup up.
			if err := oc.addNamespaceEgressIPAssignments(eIP.Name, eIP.Status.Items, newNs, eIP.Spec.PodSelector); err != nil {
				return err
			}
		}
	}
	return nil
}
// reconcileEgressIPPod reacts to pod label changes (and pod add/delete) by
// adding or removing the egress IP setup for this pod against every EgressIP
// object whose namespaceSelector matches the pod's namespace. Called with
// old == nil on ADD and new == nil on DELETE. Note that the add path is
// intentionally re-run on every matching update because pod IPs may not be
// assigned yet on earlier events (see comment inside).
func (oc *Controller) reconcileEgressIPPod(old, new *v1.Pod) (err error) {
	oldPod, newPod := &v1.Pod{}, &v1.Pod{}
	namespace := &v1.Namespace{}
	if old != nil {
		oldPod = old
		namespace, err = oc.watchFactory.GetNamespace(oldPod.Namespace)
		if err != nil {
			return err
		}
	}
	if new != nil {
		newPod = new
		namespace, err = oc.watchFactory.GetNamespace(newPod.Namespace)
		if err != nil {
			return err
		}
	}
	newPodLabels := labels.Set(newPod.Labels)
	oldPodLabels := labels.Set(oldPod.Labels)
	// If the namespace the pod belongs to does not have any labels, just return
	// it can't match any EgressIP object
	namespaceLabels := labels.Set(namespace.Labels)
	if namespaceLabels.AsSelector().Empty() {
		return nil
	}
	// Iterate all EgressIPs and check if this pod start/stops matching any and
	// add/remove the setup accordingly. Pods should not match multiple EgressIP
	// objects: that is considered a user error and is undefined. However, in
	// such events iterate all EgressIPs and clean up as much as possible. By
	// iterating all EgressIPs we also cover the case where a pod has its labels
	// changed from matching one EgressIP to another, ex: EgressIP1 matching
	// "blue pods" and EgressIP2 matching "red pods". If a pod with a red label
	// gets changed to a blue label: we need add and remove the set up for both
	// EgressIP obejcts - since we can't be sure of which EgressIP object we
	// process first, always iterate all.
	egressIPs, err := oc.watchFactory.GetEgressIPs()
	if err != nil {
		return err
	}
	for _, egressIP := range egressIPs {
		namespaceSelector, _ := metav1.LabelSelectorAsSelector(&egressIP.Spec.NamespaceSelector)
		if namespaceSelector.Matches(namespaceLabels) {
			// If the namespace the pod belongs to matches this object then
			// check the if there's a podSelector defined on the EgressIP
			// object. If there is one: the user intends the EgressIP object to
			// match only a subset of pods in the namespace, and we'll have to
			// check that. If there is no podSelector: the user intends it to
			// match all pods in the namespace.
			podSelector, _ := metav1.LabelSelectorAsSelector(&egressIP.Spec.PodSelector)
			if !podSelector.Empty() {
				newMatches := podSelector.Matches(newPodLabels)
				oldMatches := podSelector.Matches(oldPodLabels)
				// If the podSelector doesn't match the pod, then continue
				// because this EgressIP intends to match other pods in that
				// namespace and not this one. Other EgressIP objects might
				// match the pod though so we need to check that.
				if !newMatches && !oldMatches {
					continue
				}
				// Check if the pod stopped matching. If the pod was deleted,
				// "new" will be nil and newPodLabels will not match, so this is
				// should cover that case.
				if !newMatches && oldMatches {
					if err := oc.deletePodEgressIPAssignments(egressIP.Name, egressIP.Status.Items, oldPod); err != nil {
						return err
					}
					continue
				}
				// If the pod starts matching the podSelector or continues to
				// match: add the pod. The reason as to why we need to continue
				// adding it if it continues to match, as opposed to once when
				// it started matching, is because the pod might not have pod
				// IPs assigned at that point and we need to continue trying the
				// pod setup for every pod update as to make sure we process the
				// pod IP assignment.
				if err := oc.addPodEgressIPAssignments(egressIP.Name, egressIP.Status.Items, newPod); err != nil {
					return err
				}
				continue
			}
			// If the podSelector is empty (i.e: the EgressIP object is intended
			// to match all pods in the namespace) and the pod has been deleted:
			// "new" will be nil and we need to remove the setup
			if new == nil {
				if err := oc.deletePodEgressIPAssignments(egressIP.Name, egressIP.Status.Items, oldPod); err != nil {
					return err
				}
				continue
			}
			// For all else, perform a setup for the pod
			if err := oc.addPodEgressIPAssignments(egressIP.Name, egressIP.Status.Items, newPod); err != nil {
				return err
			}
		}
	}
	return nil
}
// reconcileCloudPrivateIPConfig processes status changes on a
// CloudPrivateIPConfig object, i.e: the answers from the
// cloud-network-config-controller to the requests we made in
// executeCloudPrivateIPConfigChange. On a confirmed assignment it programs
// the OVN setup and records the status item on the owning EgressIP; on a
// confirmed removal (or object DELETE) it tears the setup down and removes
// the status item.
//
// Fix: both condition checks previously indexed Status.Conditions[0]
// unconditionally, panicking on objects for which the
// cloud-network-config-controller had not yet populated any condition. We
// now guard with a length check; with no condition present neither add nor
// delete proceeds, which is correct since nothing has been confirmed yet.
func (oc *Controller) reconcileCloudPrivateIPConfig(old, new *ocpcloudnetworkapi.CloudPrivateIPConfig) error {
	oldCloudPrivateIPConfig, newCloudPrivateIPConfig := &ocpcloudnetworkapi.CloudPrivateIPConfig{}, &ocpcloudnetworkapi.CloudPrivateIPConfig{}
	shouldDelete, shouldAdd := false, false
	nodeToDelete := ""
	if old != nil {
		oldCloudPrivateIPConfig = old
		// We need to handle two types of deletes, A) object UPDATE where the
		// old egress IP <-> node assignment has been removed. This is indicated
		// by the old object having a .status.node set and the new object having
		// .status.node empty and the condition on the new being successful. B)
		// object DELETE, for which new is nil
		shouldDelete = oldCloudPrivateIPConfig.Status.Node != "" || new == nil
		// On DELETE we need to delete the .spec.node for the old object
		nodeToDelete = oldCloudPrivateIPConfig.Spec.Node
	}
	if new != nil {
		newCloudPrivateIPConfig = new
		// The cloud-network-config-controller indicates the outcome of our
		// request through the first condition on the object. Guard against
		// the condition not having been set yet: in that case nothing has
		// been confirmed and we must neither add nor delete.
		hasAssignedCondition := len(newCloudPrivateIPConfig.Status.Conditions) > 0 &&
			ocpcloudnetworkapi.CloudPrivateIPConfigConditionType(newCloudPrivateIPConfig.Status.Conditions[0].Type) == ocpcloudnetworkapi.Assigned &&
			kapi.ConditionStatus(newCloudPrivateIPConfig.Status.Conditions[0].Status) == kapi.ConditionTrue
		// We should only proceed to setting things up for objects where the new
		// object has the same .spec.node and .status.node, and assignment
		// condition being true. This is how the cloud-network-config-controller
		// indicates a successful cloud assignment.
		shouldAdd = newCloudPrivateIPConfig.Status.Node == newCloudPrivateIPConfig.Spec.Node && hasAssignedCondition
		// See above explanation for the delete
		shouldDelete = shouldDelete && newCloudPrivateIPConfig.Status.Node == "" && hasAssignedCondition
		// On UPDATE we need to delete the old .status.node
		if shouldDelete {
			nodeToDelete = oldCloudPrivateIPConfig.Status.Node
		}
	}
	// As opposed to reconcileEgressIP, here we are only interested in changes
	// made to the status (since we are the only ones performing the change made
	// to the spec). So don't process the object if there is no change made to
	// the status.
	if reflect.DeepEqual(oldCloudPrivateIPConfig.Status, newCloudPrivateIPConfig.Status) {
		return nil
	}
	if shouldDelete {
		// Get the EgressIP owner reference
		egressIPName, exists := oldCloudPrivateIPConfig.Annotations[util.OVNEgressIPOwnerRefLabel]
		if !exists {
			return fmt.Errorf("CloudPrivateIPConfig object: %s is missing the egress IP owner reference annotation", oldCloudPrivateIPConfig.Name)
		}
		// Check if the egress IP has been deleted or not, if we are processing
		// a CloudPrivateIPConfig delete because the EgressIP has been deleted
		// then we need to remove the setup made for it, but not update the
		// object.
		egressIP, err := oc.kube.GetEgressIP(egressIPName)
		if err != nil && !apierrors.IsNotFound(err) {
			return err
		}
		egressIPString := cloudPrivateIPConfigNameToIPString(oldCloudPrivateIPConfig.Name)
		statusItem := egressipv1.EgressIPStatusItem{
			Node:     nodeToDelete,
			EgressIP: egressIPString,
		}
		if err := oc.deleteEgressIPAssignments(egressIPName, []egressipv1.EgressIPStatusItem{statusItem}); err != nil {
			return err
		}
		// If the EgressIP has been deleted just return at this point.
		if apierrors.IsNotFound(err) {
			return nil
		}
		// Deleting a status here means updating the object with the statuses we
		// want to keep
		updatedStatus := []egressipv1.EgressIPStatusItem{}
		for _, status := range egressIP.Status.Items {
			if !reflect.DeepEqual(status, statusItem) {
				updatedStatus = append(updatedStatus, status)
			}
		}
		if err := oc.patchReplaceEgressIPStatus(egressIP.Name, updatedStatus); err != nil {
			return err
		}
	}
	if shouldAdd {
		// Get the EgressIP owner reference
		egressIPName, exists := newCloudPrivateIPConfig.Annotations[util.OVNEgressIPOwnerRefLabel]
		if !exists {
			return fmt.Errorf("CloudPrivateIPConfig object: %s is missing the egress IP owner reference annotation", newCloudPrivateIPConfig.Name)
		}
		egressIP, err := oc.kube.GetEgressIP(egressIPName)
		if err != nil {
			return err
		}
		egressIPString := cloudPrivateIPConfigNameToIPString(newCloudPrivateIPConfig.Name)
		statusItem := egressipv1.EgressIPStatusItem{
			Node:     newCloudPrivateIPConfig.Status.Node,
			EgressIP: egressIPString,
		}
		if err := oc.addEgressIPAssignments(egressIP.Name, []egressipv1.EgressIPStatusItem{statusItem}, egressIP.Spec.NamespaceSelector, egressIP.Spec.PodSelector); err != nil {
			return err
		}
		// Guard against performing the same assignment twice, which might
		// happen when multiple updates come in on the same object.
		hasStatus := false
		for _, status := range egressIP.Status.Items {
			if reflect.DeepEqual(status, statusItem) {
				hasStatus = true
				break
			}
		}
		if !hasStatus {
			statusToKeep := append(egressIP.Status.Items, statusItem)
			if err := oc.patchReplaceEgressIPStatus(egressIP.Name, statusToKeep); err != nil {
				return err
			}
		}
	}
	return nil
}
// cloudPrivateIPConfigOp describes the pending cloud operation for one egress
// IP, as computed by executeCloudPrivateIPConfigChange. Both fields set means
// UPDATE (move between nodes), only toAdd means CREATE, only toDelete means
// DELETE — see executeCloudPrivateIPConfigOps.
type cloudPrivateIPConfigOp struct {
	// toAdd is the node the egress IP should be assigned to ("" if none).
	toAdd string
	// toDelete is the node the egress IP should be removed from ("" if none).
	toDelete string
}
// executeCloudPrivateIPConfigChange computes a diff between what needs to be
// assigned/removed and executes the object modification afterwards.
// Specifically: if one egress IP is moved from nodeA to nodeB, we actually
// care about an update on the CloudPrivateIPConfig object represented by that
// egress IP; cloudPrivateIPConfigOp is the helper used to derive that kind of
// operation from toAssign/toRemove.
func (oc *Controller) executeCloudPrivateIPConfigChange(egressIPName string, toAssign, toRemove []egressipv1.EgressIPStatusItem) error {
	pending := make(map[string]*cloudPrivateIPConfigOp, len(toAssign)+len(toRemove))
	// First record every requested assignment...
	for _, item := range toAssign {
		pending[item.EgressIP] = &cloudPrivateIPConfigOp{toAdd: item.Node}
	}
	// ...then merge in the removals. An egress IP appearing in both lists
	// collapses into a single op with both fields set, i.e: an UPDATE.
	for _, item := range toRemove {
		op, found := pending[item.EgressIP]
		if !found {
			op = &cloudPrivateIPConfigOp{}
			pending[item.EgressIP] = op
		}
		op.toDelete = item.Node
	}
	return oc.executeCloudPrivateIPConfigOps(egressIPName, pending)
}
// executeCloudPrivateIPConfigOps performs the CloudPrivateIPConfig API calls
// (update/create/delete) derived in executeCloudPrivateIPConfigChange. Each
// failure is both recorded as a Kubernetes event against the owning EgressIP
// and returned as an error.
func (oc *Controller) executeCloudPrivateIPConfigOps(egressIPName string, ops map[string]*cloudPrivateIPConfigOp) error {
	for egressIP, op := range ops {
		cloudPrivateIPConfigName := ipStringToCloudPrivateIPConfigName(egressIP)
		cloudPrivateIPConfig, err := oc.watchFactory.GetCloudPrivateIPConfig(cloudPrivateIPConfigName)
		switch {
		case op.toAdd != "" && op.toDelete != "":
			// UPDATE: the object **must** already exist; a lookup failure is
			// an error.
			if err != nil {
				return fmt.Errorf("cloud update request failed for CloudPrivateIPConfig: %s, could not get item, err: %v", cloudPrivateIPConfigName, err)
			}
			cloudPrivateIPConfig.Spec.Node = op.toAdd
			if _, err := oc.kube.UpdateCloudPrivateIPConfig(cloudPrivateIPConfig); err != nil {
				eIPRef := kapi.ObjectReference{
					Kind: "EgressIP",
					Name: egressIPName,
				}
				oc.recorder.Eventf(&eIPRef, kapi.EventTypeWarning, "CloudUpdateFailed", "egress IP: %s for object EgressIP: %s could not be updated, err: %v", egressIP, egressIPName, err)
				return fmt.Errorf("cloud update request failed for CloudPrivateIPConfig: %s, err: %v", cloudPrivateIPConfigName, err)
			}
		case op.toAdd != "":
			// CREATE: the object **must not** exist yet; a successful lookup
			// is an error.
			if err == nil {
				return fmt.Errorf("cloud create request failed for CloudPrivateIPConfig: %s, err: item exists", cloudPrivateIPConfigName)
			}
			newConfig := ocpcloudnetworkapi.CloudPrivateIPConfig{
				ObjectMeta: metav1.ObjectMeta{
					Name: cloudPrivateIPConfigName,
					Annotations: map[string]string{
						util.OVNEgressIPOwnerRefLabel: egressIPName,
					},
				},
				Spec: ocpcloudnetworkapi.CloudPrivateIPConfigSpec{
					Node: op.toAdd,
				},
			}
			if _, err := oc.kube.CreateCloudPrivateIPConfig(&newConfig); err != nil {
				eIPRef := kapi.ObjectReference{
					Kind: "EgressIP",
					Name: egressIPName,
				}
				oc.recorder.Eventf(&eIPRef, kapi.EventTypeWarning, "CloudAssignmentFailed", "egress IP: %s for object EgressIP: %s could not be created, err: %v", egressIP, egressIPName, err)
				return fmt.Errorf("cloud add request failed for CloudPrivateIPConfig: %s, err: %v", cloudPrivateIPConfigName, err)
			}
		case op.toDelete != "":
			// DELETE: the object **must** exist; a lookup failure is an error.
			if err != nil {
				return fmt.Errorf("cloud deletion request failed for CloudPrivateIPConfig: %s, could not get item, err: %v", cloudPrivateIPConfigName, err)
			}
			if err := oc.kube.DeleteCloudPrivateIPConfig(cloudPrivateIPConfigName); err != nil {
				eIPRef := kapi.ObjectReference{
					Kind: "EgressIP",
					Name: egressIPName,
				}
				oc.recorder.Eventf(&eIPRef, kapi.EventTypeWarning, "CloudDeletionFailed", "egress IP: %s for object EgressIP: %s could not be deleted, err: %v", egressIP, egressIPName, err)
				return fmt.Errorf("cloud deletion request failed for CloudPrivateIPConfig: %s, err: %v", cloudPrivateIPConfigName, err)
			}
		}
	}
	return nil
}
// validateEgressIPSpec parses every address listed in the EgressIP object's
// spec and returns the set of addresses in canonical string form. If any entry
// is not a valid IP address a warning event is emitted against the EgressIP
// object and a non-nil error is returned (with a nil set).
func (oc *Controller) validateEgressIPSpec(name string, egressIPs []string) (sets.String, error) {
	parsed := sets.NewString()
	for _, candidate := range egressIPs {
		addr := net.ParseIP(candidate)
		if addr != nil {
			// Insert the normalized form so later comparisons are canonical.
			parsed.Insert(addr.String())
			continue
		}
		ref := kapi.ObjectReference{
			Kind: "EgressIP",
			Name: name,
		}
		oc.recorder.Eventf(&ref, kapi.EventTypeWarning, "InvalidEgressIP", "egress IP: %s for object EgressIP: %s is not a valid IP address", candidate, name)
		return nil, fmt.Errorf("unable to parse provided EgressIP: %s, invalid", candidate)
	}
	return parsed, nil
}
// validateEgressIPStatus validates if the statuses are valid given what the
// cache knows about all egress nodes. WatchEgressNodes is initialized before
// any other egress IP handler, so the cache should be warm and correct once we
// start doing this. It returns two maps (used as sets): the status items which
// are still correctly assigned, and the ones which need re-assignment.
func (oc *Controller) validateEgressIPStatus(name string, items []egressipv1.EgressIPStatusItem) (map[egressipv1.EgressIPStatusItem]string, map[egressipv1.EgressIPStatusItem]string) {
	oc.eIPC.allocator.Lock()
	defer oc.eIPC.allocator.Unlock()
	valid, invalid := make(map[egressipv1.EgressIPStatusItem]string), make(map[egressipv1.EgressIPStatusItem]string)
	for _, eIPStatus := range items {
		validAssignment := true
		eNode, exists := oc.eIPC.allocator.cache[eIPStatus.Node]
		if !exists {
			klog.Errorf("Allocator error: EgressIP: %s claims to have an allocation on a node which is unassignable for egress IP: %s", name, eIPStatus.Node)
			validAssignment = false
		} else {
			if eNode.getAllocationCountForEgressIP(name) > 1 {
				klog.Errorf("Allocator error: EgressIP: %s claims multiple egress IPs on same node: %s, will attempt rebalancing", name, eIPStatus.Node)
				validAssignment = false
			}
			if !eNode.isEgressAssignable {
				klog.Errorf("Allocator error: EgressIP: %s assigned to node: %s which does not have egress label, will attempt rebalancing", name, eIPStatus.Node)
				validAssignment = false
			}
			if !eNode.isReachable {
				klog.Errorf("Allocator error: EgressIP: %s assigned to node: %s which is not reachable, will attempt rebalancing", name, eIPStatus.Node)
				validAssignment = false
			}
			if !eNode.isReady {
				klog.Errorf("Allocator error: EgressIP: %s assigned to node: %s which is not ready, will attempt rebalancing", name, eIPStatus.Node)
				validAssignment = false
			}
			ip := net.ParseIP(eIPStatus.EgressIP)
			if ip == nil {
				klog.Errorf("Allocator error: EgressIP allocation contains unparsable IP address: %s", eIPStatus.EgressIP)
				// All remaining checks dereference ip: skip them for an
				// unparsable address, the assignment is invalid either way.
				invalid[eIPStatus] = ""
				continue
			}
			if node := oc.isAnyClusterNodeIP(ip); node != nil {
				klog.Errorf("Allocator error: EgressIP allocation: %s is the IP of node: %s ", ip.String(), node.name)
				validAssignment = false
			}
			if utilnet.IsIPv6(ip) && eNode.egressIPConfig.V6.Net != nil {
				if !eNode.egressIPConfig.V6.Net.Contains(ip) {
					// Log the V6 subnet here (previously this mistakenly
					// printed the V4 subnet for an IPv6 allocation).
					klog.Errorf("Allocator error: EgressIP allocation: %s on subnet: %s which cannot host it", ip.String(), eNode.egressIPConfig.V6.Net.String())
					validAssignment = false
				}
			} else if !utilnet.IsIPv6(ip) && eNode.egressIPConfig.V4.Net != nil {
				if !eNode.egressIPConfig.V4.Net.Contains(ip) {
					klog.Errorf("Allocator error: EgressIP allocation: %s on subnet: %s which cannot host it", ip.String(), eNode.egressIPConfig.V4.Net.String())
					validAssignment = false
				}
			} else {
				klog.Errorf("Allocator error: EgressIP allocation on node: %s which does not support its IP protocol version", eIPStatus.Node)
				validAssignment = false
			}
		}
		if validAssignment {
			valid[eIPStatus] = ""
		} else {
			invalid[eIPStatus] = ""
		}
	}
	return valid, invalid
}
// addAllocatorEgressIPAssignments adds the allocation to the cache, so that
// they are tracked during the life-cycle of ovnkube-master. Assignments on
// nodes not present in the cache are silently skipped.
func (oc *Controller) addAllocatorEgressIPAssignments(name string, statusAssignments []egressipv1.EgressIPStatusItem) {
	oc.eIPC.allocator.Lock()
	defer oc.eIPC.allocator.Unlock()
	for _, assignment := range statusAssignments {
		eNode, known := oc.eIPC.allocator.cache[assignment.Node]
		if !known {
			continue
		}
		eNode.allocations[assignment.EgressIP] = name
	}
}
// addEgressIPAssignments applies the given egress IP status assignments to
// every namespace matched by namespaceSelector, delegating per-namespace work
// (and pod filtering through podSelector) to addNamespaceEgressIPAssignments.
func (oc *Controller) addEgressIPAssignments(name string, statusAssignments []egressipv1.EgressIPStatusItem, namespaceSelector, podSelector metav1.LabelSelector) error {
	namespaces, err := oc.watchFactory.GetNamespacesBySelector(namespaceSelector)
	if err != nil {
		return err
	}
	for _, ns := range namespaces {
		if addErr := oc.addNamespaceEgressIPAssignments(name, statusAssignments, ns, podSelector); addErr != nil {
			return addErr
		}
	}
	return nil
}
// addNamespaceEgressIPAssignments applies the given egress IP status
// assignments to the pods in namespace matching podSelector, or to all pods in
// the namespace when the selector is empty.
func (oc *Controller) addNamespaceEgressIPAssignments(name string, statusAssignments []egressipv1.EgressIPStatusItem, namespace *kapi.Namespace, podSelector metav1.LabelSelector) error {
	var pods []*kapi.Pod
	// Previously the conversion error was discarded, and an invalid selector
	// would have caused a nil-interface panic on selector.Empty() below.
	selector, err := metav1.LabelSelectorAsSelector(&podSelector)
	if err != nil {
		return err
	}
	if !selector.Empty() {
		pods, err = oc.watchFactory.GetPodsBySelector(namespace.Name, podSelector)
		if err != nil {
			return err
		}
	} else {
		pods, err = oc.watchFactory.GetPods(namespace.Name)
		if err != nil {
			return err
		}
	}
	for _, pod := range pods {
		if err := oc.addPodEgressIPAssignments(name, statusAssignments, pod); err != nil {
			return err
		}
	}
	return nil
}
// addPodEgressIPAssignments tracks the setup made for each egress IP matching
// pod w.r.t to each status. This is mainly done to avoid a lot of duplicated
// work on ovnkube-master restarts when all egress IP handlers will most likely
// match and perform the setup for the same pod and status multiple times over.
func (oc *Controller) addPodEgressIPAssignments(name string, statusAssignments []egressipv1.EgressIPStatusItem, pod *kapi.Pod) error {
	oc.eIPC.podAssignmentMutex.Lock()
	defer oc.eIPC.podAssignmentMutex.Unlock()
	// If statusAssignments is empty just return, not doing this will delete the
	// external GW set up, even though there might be no egress IP set up to
	// perform.
	if len(statusAssignments) == 0 {
		return nil
	}
	var remainingAssignments []egressipv1.EgressIPStatusItem
	podKey := getPodKey(pod)
	podState, exists := oc.eIPC.podAssignment[podKey]
	if !exists {
		// First time we see this pod: every status still needs to be set up.
		remainingAssignments = statusAssignments
		// Retrieve the pod's networking configuration from the
		// logicalPortCache. The reason for doing this: a) only normal network
		// pods are placed in this cache, b) once the pod is placed here we know
		// addLogicalPort has finished successfully setting up networking for
		// the pod, so we can proceed with retrieving its IP and deleting the
		// external GW configuration created in addLogicalPort for the pod.
		logicalPort, err := oc.logicalPortCache.get(util.GetLogicalPortName(pod.Namespace, pod.Name))
		if err != nil {
			// Not in the cache yet: skip without error — presumably a later
			// pod event retries once addLogicalPort has completed (TODO:
			// confirm against the pod handlers).
			return nil
		}
		podState = &podAssignmentState{
			egressStatuses: make(map[egressipv1.EgressIPStatusItem]string),
			podIPs:         logicalPort.ips,
		}
		oc.eIPC.podAssignment[podKey] = podState
	} else {
		// Pod already tracked: only set up statuses not yet applied to it.
		for _, status := range statusAssignments {
			if _, exists := podState.egressStatuses[status]; !exists {
				remainingAssignments = append(remainingAssignments, status)
			}
		}
	}
	for _, status := range remainingAssignments {
		klog.V(5).Infof("Adding pod egress IP status: %v for EgressIP: %s and pod: %s/%s", status, name, pod.Name, pod.Namespace)
		if err := oc.eIPC.addPodEgressIPAssignment(name, status, pod, podState.podIPs); err != nil {
			return err
		}
		// Record the status as applied only after a successful setup, so a
		// failed status is retried on the next call.
		podState.egressStatuses[status] = ""
	}
	return nil
}
// deleteAllocatorEgressIPAssignments deletes the allocation as to keep the
// cache state correct, also see addAllocatorEgressIPAssignments.
func (oc *Controller) deleteAllocatorEgressIPAssignments(statusAssignments []egressipv1.EgressIPStatusItem) {
	oc.eIPC.allocator.Lock()
	defer oc.eIPC.allocator.Unlock()
	for _, assignment := range statusAssignments {
		node, known := oc.eIPC.allocator.cache[assignment.Node]
		if !known {
			continue
		}
		delete(node.allocations, assignment.EgressIP)
	}
}
// deleteEgressIPAssignments performs a full egress IP setup deletion on a per
// (egress IP name - status) basis. The idea is thus to list the full content of
// the NB DB for that egress IP object and delete everything which match the
// status. We also need to update the podAssignment cache and finally re-add the
// external GW setup in case the pod still exists.
func (oc *Controller) deleteEgressIPAssignments(name string, statusesToRemove []egressipv1.EgressIPStatusItem) error {
	oc.eIPC.podAssignmentMutex.Lock()
	defer oc.eIPC.podAssignmentMutex.Unlock()
	for _, statusToRemove := range statusesToRemove {
		klog.V(5).Infof("Deleting pod egress IP status: %v for EgressIP: %s", statusToRemove, name)
		if err := oc.eIPC.deleteEgressIPStatusSetup(name, statusToRemove); err != nil {
			return err
		}
		for podKey, podStatus := range oc.eIPC.podAssignment {
			delete(podStatus.egressStatuses, statusToRemove)
			// Only restore the external GW setup and drop the cache entry
			// once the pod has no egress IP statuses left. Previously this
			// happened unconditionally, clobbering state still needed when
			// the pod is matched by other, remaining statuses.
			if len(podStatus.egressStatuses) == 0 {
				podNamespace, podName := getPodNamespaceAndNameFromKey(podKey)
				if err := oc.eIPC.addPerPodGRSNAT(podNamespace, podName, podStatus.podIPs); err != nil {
					return err
				}
				delete(oc.eIPC.podAssignment, podKey)
			}
		}
	}
	return nil
}
// deleteNamespaceEgressIPAssignment removes the given egress IP status
// assignments from the pods in namespace matching podSelector, or from all
// pods in the namespace when the selector is empty.
func (oc *Controller) deleteNamespaceEgressIPAssignment(name string, statusAssignments []egressipv1.EgressIPStatusItem, namespace *kapi.Namespace, podSelector metav1.LabelSelector) error {
	var pods []*kapi.Pod
	// Previously the conversion error was discarded, and an invalid selector
	// would have caused a nil-interface panic on selector.Empty() below.
	selector, err := metav1.LabelSelectorAsSelector(&podSelector)
	if err != nil {
		return err
	}
	if !selector.Empty() {
		pods, err = oc.watchFactory.GetPodsBySelector(namespace.Name, podSelector)
		if err != nil {
			return err
		}
	} else {
		pods, err = oc.watchFactory.GetPods(namespace.Name)
		if err != nil {
			return err
		}
	}
	for _, pod := range pods {
		if err := oc.deletePodEgressIPAssignments(name, statusAssignments, pod); err != nil {
			return err
		}
	}
	return nil
}
// deletePodEgressIPAssignments tears down the given egress IP statuses for a
// single pod and updates the podAssignment cache accordingly. Pods which are
// not tracked in the cache are a no-op.
func (oc *Controller) deletePodEgressIPAssignments(name string, statusesToRemove []egressipv1.EgressIPStatusItem, pod *kapi.Pod) error {
	oc.eIPC.podAssignmentMutex.Lock()
	defer oc.eIPC.podAssignmentMutex.Unlock()
	podKey := getPodKey(pod)
	podStatus, exists := oc.eIPC.podAssignment[podKey]
	if !exists {
		return nil
	}
	for _, statusToRemove := range statusesToRemove {
		klog.V(5).Infof("Deleting pod egress IP status: %v for EgressIP: %s and pod: %s/%s", statusToRemove, name, pod.Name, pod.Namespace)
		if err := oc.eIPC.deletePodEgressIPAssignment(name, statusToRemove, podStatus.podIPs); err != nil {
			return err
		}
		delete(podStatus.egressStatuses, statusToRemove)
	}
	// Restore the per-pod GR SNAT and delete the cache key only if there are
	// no more status assignments to keep for the pod. The previous code did
	// both unconditionally, contradicting this stated intent and wiping state
	// still needed by the pod's remaining statuses.
	if len(podStatus.egressStatuses) == 0 {
		if err := oc.eIPC.addPerPodGRSNAT(pod.Namespace, pod.Name, podStatus.podIPs); err != nil {
			return err
		}
		delete(oc.eIPC.podAssignment, podKey)
	}
	return nil
}
// isEgressNodeReady reports whether the node's NodeReady condition is true.
// A node with no NodeReady condition at all is treated as not ready.
func (oc *Controller) isEgressNodeReady(egressNode *kapi.Node) bool {
	for i := range egressNode.Status.Conditions {
		cond := &egressNode.Status.Conditions[i]
		if cond.Type != v1.NodeReady {
			continue
		}
		return cond.Status == v1.ConditionTrue
	}
	return false
}
// isEgressNodeReachable reports whether the node is present in the allocator
// cache and is either already marked reachable or answers a reachability
// probe. Nodes unknown to the cache are considered unreachable.
func (oc *Controller) isEgressNodeReachable(egressNode *kapi.Node) bool {
	oc.eIPC.allocator.Lock()
	defer oc.eIPC.allocator.Unlock()
	eNode, known := oc.eIPC.allocator.cache[egressNode.Name]
	if !known {
		return false
	}
	// Short-circuit: only probe when the cached flag says unreachable.
	if eNode.isReachable {
		return true
	}
	return oc.isReachable(eNode)
}
// egressIPCacheEntry groups the per-egress-IP-object state gathered when
// syncing (see syncEgressIPs) — presumably used to detect stale OVN entries
// left behind while ovnkube-master was down; TODO confirm against the sync
// logic, which is not fully visible here.
type egressIPCacheEntry struct {
	// podIPs: IP addresses of pods associated with the egress IP object.
	podIPs sets.String
	// gatewayRouterIPs: gateway router IPs related to the assignments.
	gatewayRouterIPs sets.String
	// egressIPs: the egress IP addresses themselves.
	egressIPs sets.String
}
func (oc *Controller) syncEgressIPs(eIPs []interface{}) {
// This part will take of syncing stale data which we might have in OVN if
// there's no ovnkube-master running for a while, while there are changes to
// pods/egress IPs.
// It will sync:
// - Egress IPs which have been deleted while ovnkube-master was down
// - pods/namespaces which have stopped matching on egress IPs while
// ovnkube-master was down