-
Notifications
You must be signed in to change notification settings - Fork 333
/
egressip.go
1516 lines (1432 loc) · 56.2 KB
/
egressip.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package egressip
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"net/netip"
"strings"
"sync"
"time"
ovnconfig "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config"
eipv1 "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1"
egressipinformer "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1/apis/informers/externalversions/egressip/v1"
egressiplisters "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/crd/egressip/v1/apis/listers/egressip/v1"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/kube"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/node/iprulemanager"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/node/iptables"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/node/linkmanager"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/node/routemanager"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/syncmap"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
ktypes "k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilnet "k8s.io/utils/net"
"github.com/gaissmai/cidrtree"
"github.com/vishvananda/netlink"
)
const (
	// rulePriority is the priority of the ip routing rules created by the controller.
	// Egress Service priority is 5000, so EIP rules evaluate after Egress Service rules.
	rulePriority = 6000
	// ruleFwMarkPriority is the priority of the ip routing rules for LGW mode when we want to skip
	// processing eip ip rules because dst is a node ip. Pkt will be fw marked with 1008.
	ruleFwMarkPriority = 5999
	// routingTableIDStart is the base for per-link routing table IDs (link index is added to it).
	routingTableIDStart = 1000
	// chainName is the iptables chain owned by this controller for EIP SNAT rules.
	chainName    = "OVN-KUBE-EGRESS-IP-MULTI-NIC"
	iptChainName = utiliptables.Chain(chainName)
	// maxRetries is how many times a key is requeued before being dropped from a work queue.
	maxRetries = 15
)
var (
	// default (any-destination) CIDRs per IP family, used when synthesizing default routes.
	_, defaultV4AnyCIDR, _ = net.ParseCIDR("0.0.0.0/0")
	_, defaultV6AnyCIDR, _ = net.ParseCIDR("::/0")
	// IPv6 link-local range.
	_, linkLocalCIDR, _ = net.ParseCIDR("fe80::/64")
	// iptJumpRule jumps from POSTROUTING into the controller-owned chain.
	iptJumpRule = iptables.RuleArg{Args: []string{"-j", chainName}}
	// iptSaveMarkRule / iptRestoreMarkRule persist pkt mark 1008 (node IP traffic) in conntrack
	// so reverse-path filtering works for return packets in local gateway mode.
	iptSaveMarkRule    = iptables.RuleArg{Args: []string{"-m", "mark", "--mark", "1008", "-j", "CONNMARK", "--save-mark"}} // 1008 is pkt mark for node ip
	iptRestoreMarkRule = iptables.RuleArg{Args: []string{"-m", "mark", "--mark", "0", "-j", "CONNMARK", "--restore-mark"}}
)
// eIPConfig represents exactly one EgressIP IP. It contains non-pod related EIP configuration information only.
type eIPConfig struct {
	// addr is the EgressIP IP as a netlink address (includes the hosting link index).
	addr *netlink.Addr
	// routes are the routes required on the hosting link for this EgressIP.
	routes []netlink.Route
}
// newEIPConfig returns a zero-valued eIPConfig ready to be populated.
func newEIPConfig() *eIPConfig {
	return new(eIPConfig)
}
// state contains current state for an EgressIP as it was applied.
type state struct {
	// namespacesWithPodIPConfigs maps namespaceName -> pod ns/name -> pod IP configuration
	namespacesWithPodIPConfigs map[string]map[ktypes.NamespacedName]*podIPConfigList
	// eIPConfig contains all applied configuration for a given EgressIP IP. It does not contain any pod specific config
	eIPConfig *eIPConfig
}
// newState returns an initialized, empty state for one EgressIP.
func newState() *state {
	s := &state{
		namespacesWithPodIPConfigs: make(map[string]map[ktypes.NamespacedName]*podIPConfigList),
	}
	s.eIPConfig = newEIPConfig()
	return s
}
// config is used to update an EIP to the latest state, it stores all required information for an
// update.
type config struct {
	// namespacesWithPodIPConfigs maps namespaceName -> pod ns/name -> pod IP configuration
	namespacesWithPodIPConfigs map[string]map[ktypes.NamespacedName]*podIPConfigList
	// eIPConfig contains all applied configuration for a given EgressIP IP. It does not contain any pod specific config
	eIPConfig *eIPConfig
}
// referencedObjects is used by pod and namespace handlers to find what is selected for an EgressIP
type referencedObjects struct {
	// eIPNamespaces are the namespaces selected by the EgressIP's namespace selector.
	eIPNamespaces sets.Set[string]
	// eIPPods are the pods (ns/name) selected by the EgressIP's pod selector.
	eIPPods sets.Set[ktypes.NamespacedName]
}
// Controller implement Egress IP for secondary host networks
type Controller struct {
	eIPLister   egressiplisters.EgressIPLister
	eIPInformer cache.SharedIndexInformer
	eIPQueue    workqueue.RateLimitingInterface

	nodeLister corelisters.NodeLister

	namespaceLister   corelisters.NamespaceLister
	namespaceInformer cache.SharedIndexInformer
	namespaceQueue    workqueue.RateLimitingInterface

	podLister   corelisters.PodLister
	podInformer cache.SharedIndexInformer
	podQueue    workqueue.RateLimitingInterface

	// cache is a cache of configuration states for EIPs, key is EgressIP Name.
	cache *syncmap.SyncMap[*state]

	// referencedObjects should only be accessed with referencedObjectsLock
	referencedObjectsLock sync.RWMutex
	// referencedObjects is a cache of objects that every EIP has selected for its config.
	// With this cache namespace and pod handlers may fetch affected EIP config.
	// key is EIP name.
	referencedObjects map[string]*referencedObjects

	// managers that own the actual host-network state (routes, links, ip rules, iptables).
	routeManager    *routemanager.Controller
	linkManager     *linkmanager.Controller
	ruleManager     *iprulemanager.Controller
	iptablesManager *iptables.Controller

	kube     kube.Interface
	nodeName string
	// v4/v6 indicate which IP families this controller manages.
	v4 bool
	v6 bool
}
// NewController builds an egress IP controller for secondary host networks. It wires the
// provided informers/listers, creates the three rate-limited work queues and instantiates
// the rule and iptables managers. It does not start anything; call Run for that.
func NewController(k kube.Interface, eIPInformer egressipinformer.EgressIPInformer, nodeInformer cache.SharedIndexInformer, namespaceInformer coreinformers.NamespaceInformer,
	podInformer coreinformers.PodInformer, routeManager *routemanager.Controller, v4, v6 bool, nodeName string, linkManager *linkmanager.Controller) (*Controller, error) {
	// All three work queues share the same fast/slow rate-limiting behavior.
	newQueue := func(name string) workqueue.RateLimitingInterface {
		return workqueue.NewNamedRateLimitingQueue(
			workqueue.NewItemFastSlowRateLimiter(time.Second, 5*time.Second, 5),
			name,
		)
	}
	c := &Controller{
		eIPLister:             eIPInformer.Lister(),
		eIPInformer:           eIPInformer.Informer(),
		eIPQueue:              newQueue("eipeip"),
		nodeLister:            corelisters.NewNodeLister(nodeInformer.GetIndexer()),
		namespaceLister:       namespaceInformer.Lister(),
		namespaceInformer:     namespaceInformer.Informer(),
		namespaceQueue:        newQueue("eipnamespace"),
		podLister:             podInformer.Lister(),
		podInformer:           podInformer.Informer(),
		podQueue:              newQueue("eippods"),
		cache:                 syncmap.NewSyncMap[*state](),
		referencedObjectsLock: sync.RWMutex{},
		referencedObjects:     map[string]*referencedObjects{},
		routeManager:          routeManager,
		linkManager:           linkManager,
		ruleManager:           iprulemanager.NewController(v4, v6),
		iptablesManager:       iptables.NewController(),
		kube:                  k,
		nodeName:              nodeName,
		v4:                    v4,
		v6:                    v6,
	}
	return c, nil
}
// Run starts the Egress IP that is hosted in secondary host networks. Changes to this function
// need to be mirrored in test function setupFakeTestNode.
// It registers event handlers, waits for informer caches to sync, installs the base ip-rule and
// iptables plumbing, performs one-time migration and repair, then starts `threads` workers per
// resource type. Workers stop when stopCh closes; wg tracks all spawned goroutines.
func (c *Controller) Run(stopCh <-chan struct{}, wg *sync.WaitGroup, threads int) error {
	klog.Info("Starting Egress IP Controller")
	_, err := c.namespaceInformer.AddEventHandler(
		factory.WithUpdateHandlingForObjReplace(cache.ResourceEventHandlerFuncs{
			AddFunc:    c.onNamespaceAdd,
			UpdateFunc: c.onNamespaceUpdate,
			DeleteFunc: c.onNamespaceDelete,
		}))
	if err != nil {
		return err
	}
	_, err = c.podInformer.AddEventHandler(
		factory.WithUpdateHandlingForObjReplace(cache.ResourceEventHandlerFuncs{
			AddFunc:    c.onPodAdd,
			UpdateFunc: c.onPodUpdate,
			DeleteFunc: c.onPodDelete,
		}))
	if err != nil {
		return err
	}
	_, err = c.eIPInformer.AddEventHandler(
		factory.WithUpdateHandlingForObjReplace(cache.ResourceEventHandlerFuncs{
			AddFunc:    c.onEIPAdd,
			UpdateFunc: c.onEIPUpdate,
			DeleteFunc: c.onEIPDelete,
		}))
	if err != nil {
		return err
	}
	syncWg := &sync.WaitGroup{}
	// syncErrs is appended to concurrently by the cache-sync goroutines below;
	// syncErrsMu guards it to prevent a data race on the shared slice.
	var (
		syncErrsMu sync.Mutex
		syncErrs   []error
	)
	for _, se := range []struct {
		resourceName string
		syncFn       cache.InformerSynced
	}{
		{"eipeip", c.eIPInformer.HasSynced},
		{"eipnamespace", c.namespaceInformer.HasSynced},
		{"eippod", c.podInformer.HasSynced},
	} {
		syncWg.Add(1)
		go func(resourceName string, syncFn cache.InformerSynced) {
			defer syncWg.Done()
			if !util.WaitForInformerCacheSyncWithTimeout(resourceName, stopCh, syncFn) {
				syncErrsMu.Lock()
				syncErrs = append(syncErrs, fmt.Errorf("timed out waiting for %q caches to sync", resourceName))
				syncErrsMu.Unlock()
			}
		}(se.resourceName, se.syncFn)
	}
	syncWg.Wait()
	if len(syncErrs) != 0 {
		return kerrors.NewAggregate(syncErrs)
	}
	wg.Add(1)
	go func() {
		c.iptablesManager.Run(stopCh, 6*time.Minute)
		wg.Done()
	}()
	wg.Add(1)
	go func() {
		c.ruleManager.Run(stopCh, 5*time.Minute)
		wg.Done()
	}()
	// Tell rule manager and IPTable manager that we want to fully own all rules at a particular priority/table.
	// Any rules created with this priority or in that particular IPTables chain, that we do not recognize it, will be
	// removed by relevant manager.
	if err := c.ruleManager.OwnPriority(rulePriority); err != nil {
		return fmt.Errorf("failed to own priority %d for IP rules: %v", rulePriority, err)
	}
	if c.v4 {
		if err := c.iptablesManager.OwnChain(utiliptables.TableNAT, iptChainName, utiliptables.ProtocolIPv4); err != nil {
			return fmt.Errorf("unable to own chain %s: %v", iptChainName, err)
		}
		if err = c.iptablesManager.EnsureRule(utiliptables.TableNAT, utiliptables.ChainPostrouting, utiliptables.ProtocolIPv4, iptJumpRule); err != nil {
			return fmt.Errorf("failed to create rule in chain %s to jump to chain %s: %v", utiliptables.ChainPostrouting, iptChainName, err)
		}
		// for LGW mode, we need to restore pkt mark from conntrack in-order for RP filtering not to fail for return packets from cluster nodes
		if ovnconfig.Gateway.Mode == ovnconfig.GatewayModeLocal {
			if err = c.iptablesManager.EnsureRule(utiliptables.TableMangle, utiliptables.ChainPrerouting, utiliptables.ProtocolIPv4, iptRestoreMarkRule); err != nil {
				return fmt.Errorf("failed to create rule in chain %s to restore pkt marking: %v", utiliptables.ChainPrerouting, err)
			}
			if err = c.iptablesManager.EnsureRule(utiliptables.TableMangle, utiliptables.ChainPrerouting, utiliptables.ProtocolIPv4, iptSaveMarkRule); err != nil {
				return fmt.Errorf("failed to create rule in chain %s to save pkt marking: %v", utiliptables.ChainPrerouting, err)
			}
			// If dst is a node IP, use main routing table and skip EIP routing tables
			if err = c.ruleManager.Add(getNodeIPFwMarkIPRule(netlink.FAMILY_V4)); err != nil {
				return fmt.Errorf("failed to create IPv4 rule for node IPs: %v", err)
			}
			// The fwmark of the packet is included in reverse path route lookup. This permits rp_filter to function when the fwmark is
			// used for routing traffic in both directions.
			stdout, _, err := util.RunSysctl("-w", "net.ipv4.conf.all.src_valid_mark=1")
			if err != nil || stdout != "net.ipv4.conf.all.src_valid_mark = 1" {
				return fmt.Errorf("failed to set sysctl net.ipv4.conf.all.src_valid_mark to 1")
			}
		}
	}
	if c.v6 {
		if err := c.iptablesManager.OwnChain(utiliptables.TableNAT, iptChainName, utiliptables.ProtocolIPv6); err != nil {
			return fmt.Errorf("unable to own chain %s: %v", iptChainName, err)
		}
		if err = c.iptablesManager.EnsureRule(utiliptables.TableNAT, utiliptables.ChainPostrouting, utiliptables.ProtocolIPv6, iptJumpRule); err != nil {
			return fmt.Errorf("unable to ensure iptables rules for jump rule: %v", err)
		}
		// for LGW mode, we need to restore pkt mark from conntrack in-order for RP filtering not to fail for return packets from cluster nodes
		if ovnconfig.Gateway.Mode == ovnconfig.GatewayModeLocal {
			if err = c.iptablesManager.EnsureRule(utiliptables.TableMangle, utiliptables.ChainPrerouting, utiliptables.ProtocolIPv6, iptRestoreMarkRule); err != nil {
				return fmt.Errorf("failed to create rule in chain %s to restore pkt marking: %v", utiliptables.ChainPrerouting, err)
			}
			if err = c.iptablesManager.EnsureRule(utiliptables.TableMangle, utiliptables.ChainPrerouting, utiliptables.ProtocolIPv6, iptSaveMarkRule); err != nil {
				return fmt.Errorf("failed to create rule in chain %s to save pkt marking: %v", utiliptables.ChainPrerouting, err)
			}
			// If dst is a node IP, use main routing table and skip EIP routing tables
			// src_valid_mark is not applicable to ipv6
			if err = c.ruleManager.Add(getNodeIPFwMarkIPRule(netlink.FAMILY_V6)); err != nil {
				return fmt.Errorf("failed to create IPv6 rule for node IPs: %v", err)
			}
		}
	}
	// one-time migration from address-label based tracking to the node annotation; retried until success or stop.
	err = wait.PollUntilContextTimeout(wait.ContextForChannel(stopCh), 1*time.Second, 10*time.Second, true,
		func(ctx context.Context) (done bool, err error) {
			if err := c.migrateFromAddrLabelToAnnotation(); err != nil {
				klog.Errorf("Failed to migrate from managing EgressIP addresses using address labels to a node annotation - Retrying: %v", err)
				return false, err
			}
			return true, nil
		})
	if err != nil {
		return fmt.Errorf("failed to run EgressIP controller because migration from using address labels to a node annotation failed: %v", err)
	}
	// repair any stale host-network state left over from a previous run before processing events.
	err = wait.PollUntilContextTimeout(wait.ContextForChannel(stopCh), 1*time.Second, 10*time.Second, true,
		func(ctx context.Context) (done bool, err error) {
			if err := c.repairNode(); err != nil {
				klog.Errorf("Failed to repair node: '%v' - Retrying", err)
				return false, err
			}
			return true, nil
		})
	if err != nil {
		return fmt.Errorf("failed to run EgressIP controller because repairing node failed: %v", err)
	}
	for i := 0; i < threads; i++ {
		for _, workerFn := range []func(*sync.WaitGroup){
			c.runEIPWorker,
			c.runPodWorker,
			c.runNamespaceWorker,
		} {
			wg.Add(1)
			go func(fn func(*sync.WaitGroup)) {
				defer wg.Done()
				wait.Until(func() {
					fn(wg)
				}, time.Second, stopCh)
			}(workerFn)
		}
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		// wait until we're told to stop
		<-stopCh
		c.eIPQueue.ShutDown()
		c.podQueue.ShutDown()
		c.namespaceQueue.ShutDown()
	}()
	return nil
}
// onEIPAdd enqueues the key of a newly added EgressIP for processing.
func (c *Controller) onEIPAdd(obj interface{}) {
	if _, ok := obj.(*eipv1.EgressIP); !ok {
		utilruntime.HandleError(fmt.Errorf("expecting %T but received %T", &eipv1.EgressIP{}, obj))
		return
	}
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
		return
	}
	klog.V(4).Infof("Adding Egress IP %s", key)
	c.eIPQueue.Add(key)
}
// onEIPUpdate enqueues an EgressIP key when its spec/status generation changed.
// No-op when the generation is unchanged or the object is being deleted (delete handler covers that).
func (c *Controller) onEIPUpdate(oldObj, newObj interface{}) {
	oldEIP, ok := oldObj.(*eipv1.EgressIP)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("expecting %T but received %T", &eipv1.EgressIP{}, oldObj))
		return
	}
	newEIP, ok := newObj.(*eipv1.EgressIP)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("expecting %T but received %T", &eipv1.EgressIP{}, newObj))
		return
	}
	if oldEIP == nil || newEIP == nil {
		utilruntime.HandleError(errors.New("invalid Egress IP policy to onEIPUpdate()"))
		return
	}
	if oldEIP.Generation == newEIP.Generation ||
		!newEIP.GetDeletionTimestamp().IsZero() {
		return
	}
	key, err := cache.MetaNamespaceKeyFunc(newObj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", newObj, err))
		// fix: previously fell through and enqueued an empty key; return like the add/delete handlers.
		return
	}
	c.eIPQueue.Add(key)
}
// onEIPDelete enqueues the key of a deleted EgressIP, unwrapping a possible
// DeletedFinalStateUnknown tombstone delivered after an informer resync.
func (c *Controller) onEIPDelete(obj interface{}) {
	_, ok := obj.(*eipv1.EgressIP)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			// fix: corrected "tomstone" typo in the error message
			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
			return
		}
		_, ok = tombstone.Obj.(*eipv1.EgressIP)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not an Egress IP object %#v", tombstone.Obj))
			return
		}
	}
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
		return
	}
	c.eIPQueue.Add(key)
}
// runEIPWorker drains the EgressIP work queue until it is shut down.
func (c *Controller) runEIPWorker(wg *sync.WaitGroup) {
	for {
		if !c.processNextEIPWorkItem(wg) {
			return
		}
	}
}
// processNextEIPWorkItem takes one key off the EgressIP queue and syncs it.
// Returns false only when the queue has been shut down. Failed keys are
// rate-limit requeued up to maxRetries before being dropped.
func (c *Controller) processNextEIPWorkItem(wg *sync.WaitGroup) bool {
	wg.Add(1)
	defer wg.Done()
	key, shutdown := c.eIPQueue.Get()
	if shutdown {
		return false
	}
	defer c.eIPQueue.Done(key)
	klog.V(4).Infof("Processing Egress IP %s", key)
	err := c.syncEIP(key.(string))
	if err == nil {
		c.eIPQueue.Forget(key)
		return true
	}
	if c.eIPQueue.NumRequeues(key) < maxRetries {
		klog.V(4).Infof("Error found while processing Egress IP %s: %v", key, err)
		c.eIPQueue.AddRateLimited(key)
		return true
	}
	klog.Errorf("Dropping Egress IP %q out of the queue: %v", key, err)
	utilruntime.HandleError(err)
	c.eIPQueue.Forget(key)
	return true
}
// syncEIP reconciles a single EgressIP (by name) with the host-network state.
// 1. Lock on the existing 'state', as we are going to use it for cleanup and update.
// 2. Build latest 'config'. This includes listing referenced namespaces and pods.
//    To make sure there is no race with pod and namespace handlers, referencedObjects is acquired
//    before listing objects, and released when the 'config' is built. At this point namespace and pod
//    handler can use referencedObjects to see which objects were considered as related by the handler last time.
// 3. With existing state and newly generated config, we can clean up and apply.
func (c *Controller) syncEIP(eIPName string) error {
	return c.cache.DoWithLock(eIPName, func(eIPName string) error {
		informerEIP, err := c.eIPLister.Get(eIPName)
		if err != nil && !apierrors.IsNotFound(err) {
			return fmt.Errorf("failed to get Egress IP before sync: %w", err)
		}
		var update *config
		// get updated policy and update policy refs
		if apierrors.IsNotFound(err) || (informerEIP != nil && !informerEIP.DeletionTimestamp.IsZero()) {
			// EIP deleted (or being deleted): a nil update signals full cleanup to updateEIP.
			update = nil
			c.deleteRefObjects(eIPName)
		} else {
			update, err = c.getConfigAndUpdateRefs(informerEIP, true)
			if err != nil {
				return fmt.Errorf("failed to get config and update references for Egress IP %s: %w", eIPName, err)
			}
		}
		existing, found := c.cache.Load(eIPName)
		if !found {
			if update == nil {
				// nothing to do: no prior state and nothing to apply
				return nil
			}
			existing = newState()
			c.cache.Store(eIPName, existing)
		}
		if err = c.updateEIP(existing, update); err != nil {
			return fmt.Errorf("failed to update policy from %+v to %+v: %w", existing, update, err)
		}
		if update == nil {
			// fully cleaned up; drop the cached state entry
			c.cache.Delete(eIPName)
		}
		return nil
	})
}
// getConfigAndUpdateRefs lists and updates all referenced objects for a given EIP and returns
// config to perform an update.
// This function should be the only one that lists referenced objects, and updates referencedObjects atomically.
func (c *Controller) getConfigAndUpdateRefs(eIP *eipv1.EgressIP, updateRefs bool) (*config, error) {
	c.referencedObjectsLock.Lock()
	defer c.referencedObjectsLock.Unlock()
	eIPConfig, selectedNamespaces, selectedPods, namespacesWithPodIPConfigs, err := c.processEIP(eIP)
	if err != nil {
		return nil, err
	}
	if updateRefs {
		// record what this EIP selected so the pod/namespace handlers can map objects back to EIPs
		c.referencedObjects[eIP.Name] = &referencedObjects{
			eIPNamespaces: selectedNamespaces,
			eIPPods:       selectedPods,
		}
	}
	// no hostable EIP IP or no selected pods means there is nothing to apply
	if eIPConfig == nil || len(namespacesWithPodIPConfigs) == 0 {
		return nil, nil
	}
	return &config{
		namespacesWithPodIPConfigs: namespacesWithPodIPConfigs,
		eIPConfig:                  eIPConfig,
	}, nil
}
// processEIP attempts to find namespaces and pods that match the EIP selectors and then attempts to find a network
// that can host one of the EIP IPs returning egress IP configuration, selected namespaces and pods
func (c *Controller) processEIP(eip *eipv1.EgressIP) (*eIPConfig, sets.Set[string], sets.Set[ktypes.NamespacedName],
	map[string]map[ktypes.NamespacedName]*podIPConfigList, error) {
	selectedNamespaces := sets.Set[string]{}
	selectedPods := sets.Set[ktypes.NamespacedName]{}
	selectedNamespacesPodIPs := map[string]map[ktypes.NamespacedName]*podIPConfigList{}
	var eipSpecificConfig *eIPConfig
	parsedNodeEIPConfig, err := c.getNodeEgressIPConfig()
	if err != nil {
		return nil, selectedNamespaces, selectedPods, selectedNamespacesPodIPs,
			fmt.Errorf("failed to determine egress IP config for node %s: %w", c.nodeName, err)
	}
	// max of 1 EIP IP is selected. Return when 1 is found.
	for _, status := range eip.Status.Items {
		// only consider status items assigned to this node with a parseable IP
		if isValid := isEIPStatusItemValid(status, c.nodeName); !isValid {
			continue
		}
		eIPNet, err := util.GetIPNetFullMask(status.EgressIP)
		if err != nil {
			return nil, selectedNamespaces, selectedPods, selectedNamespacesPodIPs,
				fmt.Errorf("failed to generate mask for EgressIP %s IP %s: %v", eip.Name, status.EgressIP, err)
		}
		// EIPs hosted by the OVN network are handled elsewhere; this controller only manages
		// EIPs on secondary host networks
		if util.IsOVNNetwork(parsedNodeEIPConfig, eIPNet.IP) {
			continue
		}
		found, link, err := findLinkOnSameNetworkAsIP(eIPNet.IP, c.v4, c.v6)
		if err != nil {
			return nil, selectedNamespaces, selectedPods, selectedNamespacesPodIPs,
				fmt.Errorf("failed to find a network to host EgressIP %s IP %s: %v", eip.Name, status.EgressIP, err)
		}
		if !found {
			continue
		}
		// namespace selector is mandatory for EIP
		namespaces, err := c.listNamespacesBySelector(&eip.Spec.NamespaceSelector)
		if err != nil {
			return nil, selectedNamespaces, selectedPods, selectedNamespacesPodIPs, fmt.Errorf("failed to list namespaces: %w", err)
		}
		isEIPV6 := utilnet.IsIPv6(eIPNet.IP)
		for _, namespace := range namespaces {
			selectedNamespaces.Insert(namespace.Name)
			pods, err := c.listPodsByNamespaceAndSelector(namespace.Name, &eip.Spec.PodSelector)
			if err != nil {
				return nil, selectedNamespaces, selectedPods, selectedNamespacesPodIPs, fmt.Errorf("failed to list pods in namespace %s: %w",
					namespace.Name, err)
			}
			for _, pod := range pods {
				// Ignore completed pods, host networked pods, pods not scheduled
				if util.PodWantsHostNetwork(pod) || util.PodCompleted(pod) || !util.PodScheduled(pod) {
					continue
				}
				ips, err := util.DefaultNetworkPodIPs(pod)
				if err != nil {
					return nil, selectedNamespaces, selectedPods, selectedNamespacesPodIPs, fmt.Errorf("failed to get pod ips: %w", err)
				}
				if len(ips) == 0 {
					continue
				}
				podNamespaceName := ktypes.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
				// generate pod specific configuration
				if selectedNamespacesPodIPs[namespace.Name] == nil {
					selectedNamespacesPodIPs[namespace.Name] = make(map[ktypes.NamespacedName]*podIPConfigList)
				}
				selectedNamespacesPodIPs[namespace.Name][podNamespaceName] = generatePodConfig(ips, link, eIPNet, isEIPV6)
				selectedPods.Insert(podNamespaceName)
			}
		}
		// ensure at least one pod is selected before generating config
		if len(selectedNamespacesPodIPs) > 0 {
			eipSpecificConfig, err = generateEIPConfig(link, eIPNet, isEIPV6)
			if err != nil {
				return nil, selectedNamespaces, selectedPods, selectedNamespacesPodIPs,
					fmt.Errorf("failed to generate EIP configuration for EgressIP %s IP %s: %v", eip.Name, status.EgressIP, err)
			}
		}
		// a hostable EIP IP was found; never configure more than one per EgressIP
		break
	}
	return eipSpecificConfig, selectedNamespaces, selectedPods, selectedNamespacesPodIPs, nil
}
// generatePodConfig builds the per-pod-IP configuration (SNAT iptables rule + ip rule)
// for every pod IP whose family matches the EgressIP's family.
func generatePodConfig(podIPs []net.IP, link netlink.Link, eIPNet *net.IPNet, isEIPV6 bool) *podIPConfigList {
	configs := newPodIPConfigList()
	linkName := link.Attrs().Name
	linkIndex := link.Attrs().Index
	eIPStr := eIPNet.IP.String()
	for _, podIP := range podIPs {
		isPodIPv6 := utilnet.IsIPv6(podIP)
		// skip pod IPs whose family differs from the EgressIP's
		if isPodIPv6 != isEIPV6 {
			continue
		}
		cfg := newPodIPConfig()
		cfg.ipTableRule = generateIPTablesSNATRuleArg(podIP, isPodIPv6, linkName, eIPStr)
		cfg.ipRule = generateIPRule(podIP, isPodIPv6, linkIndex)
		cfg.v6 = isPodIPv6
		configs.elems = append(configs.elems, cfg)
	}
	return configs
}
// generateEIPConfig generates configuration that isn't related to any pod EIPs to support config of a single EIP:
// the EgressIP address bound to the hosting link plus the routes required on that link.
func generateEIPConfig(link netlink.Link, eIPNet *net.IPNet, isEIPV6 bool) (*eIPConfig, error) {
	routes, err := generateRoutesForLink(link, isEIPV6)
	if err != nil {
		return nil, err
	}
	return &eIPConfig{
		addr:   getNetlinkAddress(eIPNet, link.Attrs().Index),
		routes: routes,
	}, nil
}
// generateRoutesForLink snapshots the link's routes for the given family, guarantees at
// least one default route exists, and retargets them all to the link's EIP routing table.
func generateRoutesForLink(link netlink.Link, isV6 bool) ([]netlink.Route, error) {
	attrs := link.Attrs()
	routes, err := netlink.RouteList(link, util.GetIPFamily(isV6))
	if err != nil {
		return nil, fmt.Errorf("failed to get routes for link %s: %v", attrs.Name, err)
	}
	routes = ensureAtLeastOneDefaultRoute(routes, attrs.Index, isV6)
	overwriteRoutesTableID(routes, getRouteTableID(attrs.Index))
	return routes, nil
}
// deleteRefObjects removes the cached namespace/pod references for the named EgressIP.
func (c *Controller) deleteRefObjects(name string) {
	c.referencedObjectsLock.Lock()
	defer c.referencedObjectsLock.Unlock()
	delete(c.referencedObjects, name)
}
// updateEIP reconciles existing state towards update config. If update is nil, delete existing state.
// Order matters: pod-specific config (ip rules, iptables) is cleaned up first, then the
// pod-independent address/routes, and only then are new changes applied.
func (c *Controller) updateEIP(existing *state, update *config) error {
	// cleanup first
	// cleanup pod specific configuration - aka ip rules and iptables
	if len(existing.namespacesWithPodIPConfigs) > 0 {
		// track which namespaces should be removed from targetNamespaces
		var namespacesToDelete []string
		for targetNamespace, targetPods := range existing.namespacesWithPodIPConfigs {
			// track which pods should be removed from targetPods
			var podsToDelete []ktypes.NamespacedName
			for podNamespacedName, existingPodConfig := range targetPods {
				podIPConfigsToDelete := newPodIPConfigList()
				// each pod IP will have its own configuration that needs to be tracked and possibly removed
				for _, existingPodIPConfig := range existingPodConfig.elems {
					// delete EIP config if:
					// 1. EIP deleted or no EIP found
					// 2. Is not present in update
					// 3. Target pod is not listed in update.targetNamespaces
					// 4. Pod IP config has changed
					if update == nil || update.namespacesWithPodIPConfigs[targetNamespace][podNamespacedName] == nil ||
						// delete if IPs dont match
						(update.namespacesWithPodIPConfigs[targetNamespace][podNamespacedName] != nil &&
							!update.namespacesWithPodIPConfigs[targetNamespace][podNamespacedName].has(existingPodIPConfig)) {
						podIPConfigsToDelete.insert(*existingPodIPConfig)
					}
				}
				if podIPConfigsToDelete.len() > 0 {
					for _, podIPConfigToDelete := range podIPConfigsToDelete.elems {
						if err := c.deleteIPConfig(podIPConfigToDelete); err != nil {
							// remember the failed delete so a subsequent sync retries it
							existingPodConfig.insertOverwriteFailed(*podIPConfigToDelete)
							return err
						}
						existingPodConfig.delete(*podIPConfigToDelete)
					}
				}
				if update == nil || update.namespacesWithPodIPConfigs[targetNamespace][podNamespacedName] == nil {
					podsToDelete = append(podsToDelete, podNamespacedName)
				}
			}
			// deletions are deferred to avoid mutating the maps while ranging over them
			for _, podToDelete := range podsToDelete {
				delete(targetPods, podToDelete)
			}
			if update == nil || update.namespacesWithPodIPConfigs[targetNamespace] == nil {
				namespacesToDelete = append(namespacesToDelete, targetNamespace)
			}
		}
		for _, nsToDelete := range namespacesToDelete {
			delete(existing.namespacesWithPodIPConfigs, nsToDelete)
		}
	}
	// clean up pod independent configuration first
	// if EIP IP has changed and therefore could be hosted by a different interface, remove old EIP
	// Delete addresses and routes under the following conditions
	// 1. existing contains a non nil IP and update is nil
	// 2. existing and update both contain an IP, and the update's IP differs from the existing one
	if (update == nil && existing.eIPConfig != nil && existing.eIPConfig.addr != nil) ||
		(update != nil && update.eIPConfig != nil && update.eIPConfig.addr != nil &&
			existing.eIPConfig != nil && existing.eIPConfig.addr != nil && !existing.eIPConfig.addr.Equal(*update.eIPConfig.addr)) {
		if err := c.linkManager.DelAddress(*existing.eIPConfig.addr); err != nil {
			// TODO(mk): if we fail to delete address, handle it
			return fmt.Errorf("failed to delete egress IP address %s: %w", existing.eIPConfig.addr.String(), err)
		}
		if err := c.deleteIPFromAnnotation(existing.eIPConfig.addr.IP.String()); err != nil {
			return fmt.Errorf("failed to delete egress IP address %s from annotation: %v", existing.eIPConfig.addr.String(), err)
		}
	}
	// delete stale routes
	// existing routes need to be deleted if there's no update and if there's no other active egress IP on this link.
	if update == nil && existing.eIPConfig != nil && len(existing.eIPConfig.routes) > 0 && existing.eIPConfig.addr != nil {
		// Egress IP for this config and link should already be deleted in steps previously.
		// If there is different Egress IP active on this link, we do not want to delete the routes needed for that other egress IP.
		ipFamily := util.GetIPFamily(utilnet.IsIPv6(existing.eIPConfig.addr.IP))
		assignedAddresses, err := c.getAnnotation()
		if err != nil {
			return fmt.Errorf("failed to get assigned addresses: %v", err)
		}
		isEIPOnLink, err := isEgressIPOnLink(existing.eIPConfig.addr.LinkIndex, ipFamily, assignedAddresses)
		if err != nil {
			return fmt.Errorf("failed to determine if link with index %d hosts an existing Egress IP: %v",
				existing.eIPConfig.addr.LinkIndex, err)
		}
		if !isEIPOnLink {
			for _, routeToDelete := range existing.eIPConfig.routes {
				c.routeManager.Del(routeToDelete)
			}
		}
	} else if update != nil && update.eIPConfig != nil && len(update.eIPConfig.routes) > 0 &&
		existing.eIPConfig != nil && len(existing.eIPConfig.routes) > 0 {
		// delete delta between existing and update
		routesToDelete := routeDifference(existing.eIPConfig.routes, update.eIPConfig.routes)
		for _, routeToDelete := range routesToDelete {
			c.routeManager.Del(routeToDelete)
		}
	}
	// apply new changes
	if update != nil && update.eIPConfig != nil && update.eIPConfig.addr != nil && len(update.eIPConfig.routes) > 0 {
		for updatedTargetNS, updatedTargetPod := range update.namespacesWithPodIPConfigs {
			existingNs, found := existing.namespacesWithPodIPConfigs[updatedTargetNS]
			if !found {
				existingNs = map[ktypes.NamespacedName]*podIPConfigList{}
				existing.namespacesWithPodIPConfigs[updatedTargetNS] = existingNs
			}
			for updatedPodNamespacedName, updatedPodIPConfig := range updatedTargetPod {
				existingTargetPodIPConfig, found := existingNs[updatedPodNamespacedName]
				if !found {
					existingTargetPodIPConfig = newPodIPConfigList()
					existingNs[updatedPodNamespacedName] = existingTargetPodIPConfig
				}
				// applyPodConfig will apply pod specific configuration - ip rules and iptables rules
				err := c.applyPodConfig(existingTargetPodIPConfig, updatedPodIPConfig)
				if err != nil {
					return fmt.Errorf("failed to apply pod %s configuration: %v", updatedPodNamespacedName.String(), err)
				}
			}
		}
		if err := c.addIPToAnnotation(update.eIPConfig.addr.IP.String()); err != nil {
			return fmt.Errorf("failed to add egress IP address to annotation: %v", err)
		}
		// TODO(mk): only apply the follow when its new config or when it failed to apply
		// Ok to repeat requests to route manager and link manager
		if err := c.linkManager.AddAddress(*update.eIPConfig.addr); err != nil {
			return fmt.Errorf("failed to add address to link: %v", err)
		}
		existing.eIPConfig.addr = update.eIPConfig.addr
		// route manager manages retry
		for _, routeToAdd := range update.eIPConfig.routes {
			c.routeManager.Add(routeToAdd)
		}
		existing.eIPConfig.routes = update.eIPConfig.routes
	}
	return nil
}
// deleteIPConfig removes the node-level configuration for a single pod IP:
// first the IP rule, then the SNAT iptables rule in the egress IP chain.
// Returns the first error encountered; on error, remaining config is left in
// place for a later retry.
func (c *Controller) deleteIPConfig(podIPConfigToDelete *podIPConfig) error {
	if err := c.ruleManager.Delete(podIPConfigToDelete.ipRule); err != nil {
		return err
	}
	// The iptables rule lives in the NAT table of exactly one IP family.
	proto := utiliptables.ProtocolIPv4
	if podIPConfigToDelete.v6 {
		proto = utiliptables.ProtocolIPv6
	}
	return c.iptablesManager.DeleteRule(utiliptables.TableNAT, iptChainName, proto, podIPConfigToDelete.ipTableRule)
}
// applyPodConfig applies pod-specific configuration - IP rules and iptables
// rules - for each pod IP present in updatedPodIPsConfig but not yet applied
// according to existingPodIPsConfig. Successfully applied configs are recorded
// in existingPodIPsConfig; a config that fails to apply is recorded as failed
// (so it is retried on the next sync) and the error is returned immediately.
func (c *Controller) applyPodConfig(existingPodIPsConfig *podIPConfigList, updatedPodIPsConfig *podIPConfigList) error {
	if existingPodIPsConfig == nil {
		return fmt.Errorf("unexpected nil existing config")
	}
	if updatedPodIPsConfig == nil {
		return fmt.Errorf("unexpected nil updated config")
	}
	// Compute the delta: only configs not already applied (or previously
	// failed - hasWithoutError treats failed entries as missing) need work.
	newPodIPConfigs := newPodIPConfigList()
	for _, newConfig := range updatedPodIPsConfig.elems {
		if !existingPodIPsConfig.hasWithoutError(newConfig) {
			newPodIPConfigs.insert(*newConfig)
		}
	}
	for _, newPodIPConfig := range newPodIPConfigs.elems {
		if err := c.ruleManager.Add(newPodIPConfig.ipRule); err != nil {
			// Record the failure so the next sync retries this config.
			existingPodIPsConfig.insertOverwriteFailed(*newPodIPConfig)
			return err
		}
		proto := utiliptables.ProtocolIPv4
		if newPodIPConfig.v6 {
			proto = utiliptables.ProtocolIPv6
		}
		if err := c.iptablesManager.EnsureRule(utiliptables.TableNAT, iptChainName, proto, newPodIPConfig.ipTableRule); err != nil {
			existingPodIPsConfig.insertOverwriteFailed(*newPodIPConfig)
			return fmt.Errorf("failed to ensure rules (%+v) in chain %s: %v", newPodIPConfig.ipTableRule, iptChainName, err)
		}
		existingPodIPsConfig.insertOverwrite(*newPodIPConfig)
	}
	return nil
}
// getAllEIPs returns every EgressIP object known to the informer cache.
func (c *Controller) getAllEIPs() ([]*eipv1.EgressIP, error) {
	egressIPs, err := c.eIPLister.List(labels.Everything())
	if err != nil {
		return nil, fmt.Errorf("failed to list EgressIPs: %v", err)
	}
	return egressIPs, nil
}
// addrLink stores an IP address (with mask) together with the index of the
// link it is assigned to. It exists only because netlink.Addr is not a
// comparable type, so it cannot be used as a set element or map key; this
// string/int pair is comparable and can.
type addrLink struct {
	addr      string // IP + mask, e.g. the netlink.Addr.IPNet string form
	linkIndex int    // netlink link index the address is assigned to
}
// repairNode computes what is expected versus what is actually present on the node and removes any stale
// configuration. This should be called at Controller startup.
func (c *Controller) repairNode() error {
// get address map for each interface -> addresses/mask
// also map address/mask -> interface name
assignedAddr := sets.New[addrLink]()
assignedAddrStrToAddrs := make(map[string]netlink.Addr)
assignedIPRoutes := sets.New[string]()
assignedIPRouteStrToRoutes := make(map[string]netlink.Route)
assignedIPRules := sets.New[string]()
assignedIPRulesStrToRules := make(map[string]netlink.Rule)
assignedIPTableV4Rules := sets.New[string]()
assignedIPTableV6Rules := sets.New[string]()
assignedIPTablesV4StrToRules := make(map[string]iptables.RuleArg)
assignedIPTablesV6StrToRules := make(map[string]iptables.RuleArg)
existingAddrsFromAnnot, err := c.getAnnotation()
if err != nil {
return fmt.Errorf("failed to get annotation: %v", err)
}
links, err := util.GetNetLinkOps().LinkList()
if err != nil {
return fmt.Errorf("failed to list links: %v", err)
}
for _, link := range links {
link := link
linkName := link.Attrs().Name
linkIdx := link.Attrs().Index
addresses, err := util.GetFilteredInterfaceAddrs(link, c.v4, c.v6)
if err != nil {
return fmt.Errorf("unable to get link addresses for link %s: %v", linkName, err)
}
for _, address := range addresses {
if existingAddrsFromAnnot.Has(address.IP.String()) {
addressStr := address.IPNet.String()
assignedAddr.Insert(addrLink{address.IPNet.String(), address.LinkIndex})
assignedAddrStrToAddrs[addressStr] = address
}
}
filter, mask := filterRouteByLinkTable(linkIdx, getRouteTableID(linkIdx))
existingRoutes, err := util.GetNetLinkOps().RouteListFiltered(netlink.FAMILY_ALL, filter, mask)
if err != nil {
return fmt.Errorf("unable to get route list using filter (%s): %v", filter.String(), err)
}
for _, existingRoute := range existingRoutes {
routeStr := existingRoute.String()
assignedIPRoutes.Insert(routeStr)
assignedIPRouteStrToRoutes[routeStr] = existingRoute
}
}
filter, mask := filterRuleByPriority(rulePriority)
existingRules, err := util.GetNetLinkOps().RuleListFiltered(netlink.FAMILY_ALL, filter, mask)
if err != nil {
return fmt.Errorf("failed to list IP rules: %v", err)
}
for _, existingRule := range existingRules {
ruleStr := existingRule.String()
assignedIPRules.Insert(ruleStr)
assignedIPRulesStrToRules[ruleStr] = existingRule
}
// gather IPv4 and IPv6 IPTable rules and ignore what IP family we currently support because we may have converted from
// dual to single or vice versa
ipTableV4Rules, err := c.iptablesManager.GetIPv4ChainRuleArgs(utiliptables.TableNAT, chainName)
if err != nil {
return fmt.Errorf("failed to list IPTable IPv4 rules: %v", err)
}
for _, rule := range ipTableV4Rules {
ruleStr := strings.Join(rule.Args, " ")
assignedIPTableV4Rules.Insert(ruleStr)
assignedIPTablesV4StrToRules[ruleStr] = rule
}
ipTableV6Rules, err := c.iptablesManager.GetIPv6ChainRuleArgs(utiliptables.TableNAT, chainName)
if err != nil {
// IPv6 NAT table may not be available by default on some distributions.
ipTableV6Rules = make([]iptables.RuleArg, 0)
klog.Warningf("Failed to list IPTable IPv6 rules: %v", err)
}
for _, rule := range ipTableV6Rules {
ruleStr := strings.Join(rule.Args, " ")
assignedIPTableV6Rules.Insert(ruleStr)
assignedIPTablesV6StrToRules[ruleStr] = rule
}
expectedAddrs := sets.New[addrLink]()
expectedIPRoutes := sets.New[string]()
expectedIPRules := sets.New[string]()
expectedIPTableV4Rules := sets.New[string]()
expectedIPTableV6Rules := sets.New[string]()
egressIPs, err := c.getAllEIPs()
if err != nil {
return err
}
parsedNodeEIPConfig, err := c.getNodeEgressIPConfig()
if err != nil {
return fmt.Errorf("failed to get node egress IP config: %v", err)
}
for _, egressIP := range egressIPs {
if len(egressIP.Status.Items) == 0 {
continue
}
for _, status := range egressIP.Status.Items {
if isValid := isEIPStatusItemValid(status, c.nodeName); !isValid {
continue
}
eIPNet, err := util.GetIPNetFullMask(status.EgressIP)
if err != nil {
return err
}
if util.IsOVNNetwork(parsedNodeEIPConfig, eIPNet.IP) {
continue
}
isEIPV6 := utilnet.IsIPv6(eIPNet.IP)
found, link, err := findLinkOnSameNetworkAsIP(eIPNet.IP, c.v4, c.v6)
if err != nil {
return fmt.Errorf("failed to find a network to host EgressIP %s IP %s: %v", egressIP.Name,
eIPNet.IP.String(), err)
}
if !found {
continue
}
linkIdx := link.Attrs().Index
linkName := link.Attrs().Name
// copy routes associated with link to new route table
linkRoutes, err := generateRoutesForLink(link, isEIPV6)
if err != nil {
return fmt.Errorf("failed to generate IP routes for link %s for EgressIP %s IP %s: %v", linkName,
egressIP.Name, eIPNet.IP.String(), err)
}
for _, route := range linkRoutes {
expectedIPRoutes.Insert(route.String())
}
expectedAddrs.Insert(addrLink{eIPNet.String(), linkIdx})
namespaceSelector, err := metav1.LabelSelectorAsSelector(&egressIP.Spec.NamespaceSelector)
if err != nil {
return fmt.Errorf("invalid namespaceSelector for egress IP %s: %v", egressIP.Name, err)
}
podSelector, err := metav1.LabelSelectorAsSelector(&egressIP.Spec.PodSelector)
if err != nil {
return fmt.Errorf("invalid podSelector for egress IP %s: %v", egressIP.Name, err)
}
namespaces, err := c.namespaceLister.List(namespaceSelector)
if err != nil {
return fmt.Errorf("failed to list namespaces using selector %s to configure egress IP %s: %v",
namespaceSelector.String(), egressIP.Name, err)
}
for _, namespace := range namespaces {
namespaceLabels := labels.Set(namespace.Labels)
if namespaceSelector.Matches(namespaceLabels) {
pods, err := c.podLister.Pods(namespace.Name).List(podSelector)
if err != nil {
return fmt.Errorf("failed to list pods using selector %s to configure egress IP %s: %v",
podSelector.String(), egressIP.Name, err)
}
for _, pod := range pods {
if util.PodCompleted(pod) || util.PodWantsHostNetwork(pod) || len(pod.Status.PodIPs) == 0 {
continue
}
podIPs, err := util.DefaultNetworkPodIPs(pod)
if err != nil {
return err