-
Notifications
You must be signed in to change notification settings - Fork 129
/
base_network_controller.go
850 lines (752 loc) · 30.7 KB
/
base_network_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
package ovn
import (
"fmt"
"math"
"net"
"strconv"
"sync"
"time"
libovsdbclient "github.com/ovn-org/libovsdb/client"
"github.com/ovn-org/libovsdb/ovsdb"
"github.com/pkg/errors"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/allocator/pod"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/kube"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/kubevirt"
libovsdbops "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/libovsdb/ops"
libovsdbutil "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/libovsdb/util"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/metrics"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb"
addressset "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/address_set"
lsm "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/logical_switch_manager"
zoneic "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/zone_interconnect"
ovnretry "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/retry"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/syncmap"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
kapi "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
ref "k8s.io/client-go/tools/reference"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"
)
// CommonNetworkControllerInfo structure is a placeholder for all fields shared among controllers.
type CommonNetworkControllerInfo struct {
	// Kubernetes API client
	client clientset.Interface
	// wrapper around the Kubernetes client used for OVN-specific operations
	kube *kube.KubeOVN
	// shared informer/watch factory for Kubernetes resources
	watchFactory *factory.WatchFactory
	// records pod setup latency metrics
	podRecorder *metrics.PodRecorder
	// event recorder used to post events to k8s
	recorder record.EventRecorder
	// libovsdb northbound client interface
	nbClient libovsdbclient.Client
	// libovsdb southbound client interface
	sbClient libovsdbclient.Client
	// has SCTP support
	SCTPSupport bool
	// has multicast support; set to false for secondary networks.
	// TBD: Changes need to be made to support multicast for secondary networks
	multicastSupport bool
	// Supports OVN Template Load Balancers?
	svcTemplateSupport bool
	// Northbound database zone name to which this Controller is connected to - aka local zone
	zone string
}
// BaseNetworkController structure holds per-network fields and network specific configuration
// Note that all the methods with NetworkControllerInfo pointer receivers will be called
// by more than one type of network controllers.
type BaseNetworkController struct {
	CommonNetworkControllerInfo
	// controllerName should be used to identify objects owned by given controller in the db
	controllerName string
	// network information
	util.NetInfo
	// retry framework for pods
	retryPods *ovnretry.RetryFramework
	// retry framework for nodes
	retryNodes *ovnretry.RetryFramework
	// retry framework for namespaces
	retryNamespaces *ovnretry.RetryFramework
	// retry framework for network policies
	retryNetworkPolicies *ovnretry.RetryFramework
	// pod events factory handler
	podHandler *factory.Handler
	// node events factory handler
	nodeHandler *factory.Handler
	// namespace events factory Handler
	namespaceHandler *factory.Handler
	// A cache of all logical switches seen by the watcher and their subnets
	lsManager *lsm.LogicalSwitchManager
	// A utility to allocate the PodAnnotation to pods
	podAnnotationAllocator *pod.PodAnnotationAllocator
	// A cache of all logical ports known to the controller
	logicalPortCache *portCache
	// Info about known namespaces. You must use oc.getNamespaceLocked() or
	// oc.waitForNamespaceLocked() to read this map, and oc.createNamespaceLocked()
	// or oc.deleteNamespaceLocked() to modify it. namespacesMutex is only held
	// from inside those functions.
	namespaces      map[string]*namespaceInfo
	namespacesMutex sync.Mutex
	// An address set factory that creates address sets
	addressSetFactory addressset.AddressSetFactory
	// topology version of this network. It is first retrieved from network logical entities,
	// and will eventually be updated to the latest version once topology upgrade is done.
	topologyVersion int
	// network policies map, key should be retrieved with getPolicyKey(policy *knet.NetworkPolicy).
	// network policies that failed to be created will also be added here, and can be retried or cleaned up later.
	// network policy is only deleted from this map after successful cleanup.
	// Allowed order of locking is namespace Lock -> oc.networkPolicies key Lock -> networkPolicy.Lock
	// Don't take namespace Lock while holding networkPolicy key lock to avoid deadlock.
	networkPolicies *syncmap.SyncMap[*networkPolicy]
	// map of existing shared port groups for network policies
	// port group exists in the db if and only if port group key is present in this map
	// key is namespace
	// allowed locking order is namespace Lock -> networkPolicy.Lock -> sharedNetpolPortGroups key Lock
	// make sure to keep this order to avoid deadlocks
	sharedNetpolPortGroups *syncmap.SyncMap[*defaultDenyPortGroups]
	// address sets used by pod-selector based policy peers
	podSelectorAddressSets *syncmap.SyncMap[*PodSelectorAddressSet]
	// stopChan per controller
	stopChan chan struct{}
	// waitGroup per-Controller
	wg *sync.WaitGroup
	// some downstream components need to stop on their own or when the network
	// controller is stopped
	// use a chain of cancelable contexts for this
	cancelableCtx util.CancelableContext
	// List of nodes which belong to the local zone (stored as a sync map)
	// If the map is nil, it means the controller is not tracking the node events
	// and all the nodes are considered as local zone nodes.
	localZoneNodes *sync.Map
	// zoneICHandler creates the interconnect resources for local nodes and remote nodes.
	// Interconnect resources are Transit switch and logical ports connecting this transit switch
	// to the cluster router. Please see zone_interconnect/interconnect_handler.go for more details.
	zoneICHandler *zoneic.ZoneInterconnectHandler
	// releasedPodsBeforeStartup tracks pods per NAD (map of NADs to pods UIDs)
	// that might have already been released on startup
	releasedPodsBeforeStartup map[string]sets.Set[string]
	// guards releasedPodsBeforeStartup
	releasedPodsOnStartupMutex sync.Mutex
}
// BaseSecondaryNetworkController structure holds per-network fields and network specific
// configuration for secondary network controller
type BaseSecondaryNetworkController struct {
	BaseNetworkController
	// multi-network policy events factory handler
	policyHandler *factory.Handler
}
// NewCommonNetworkControllerInfo creates CommonNetworkControllerInfo shared by controllers.
// It resolves the local NB zone from the northbound database before assembling the struct.
func NewCommonNetworkControllerInfo(client clientset.Interface, kube *kube.KubeOVN, wf *factory.WatchFactory,
	recorder record.EventRecorder, nbClient libovsdbclient.Client, sbClient libovsdbclient.Client,
	podRecorder *metrics.PodRecorder, SCTPSupport, multicastSupport, svcTemplateSupport bool) (*CommonNetworkControllerInfo, error) {
	nbZone, err := libovsdbutil.GetNBZone(nbClient)
	if err != nil {
		return nil, fmt.Errorf("error getting NB zone name : err - %w", err)
	}
	cnci := &CommonNetworkControllerInfo{
		client:             client,
		kube:               kube,
		watchFactory:       wf,
		recorder:           recorder,
		nbClient:           nbClient,
		sbClient:           sbClient,
		podRecorder:        podRecorder,
		SCTPSupport:        SCTPSupport,
		multicastSupport:   multicastSupport,
		svcTemplateSupport: svcTemplateSupport,
		zone:               nbZone,
	}
	return cnci, nil
}
// GetLogicalPortName returns the name of the logical switch port for the given
// pod on this network: the plain namespace/name-derived port name for the
// default network, or the NAD-scoped name for a secondary network.
func (bnc *BaseNetworkController) GetLogicalPortName(pod *kapi.Pod, nadName string) string {
	// Idiom fix: drop the redundant else after a terminating return.
	if bnc.IsSecondary() {
		return util.GetSecondaryNetworkLogicalPortName(pod.Namespace, pod.Name, nadName)
	}
	return util.GetLogicalPortName(pod.Namespace, pod.Name)
}
// AddConfigDurationRecord starts a config-duration measurement for the given
// object on the default network. For secondary networks it is currently a
// no-op and returns empty operations plus a no-op callback.
func (bnc *BaseNetworkController) AddConfigDurationRecord(kind, namespace, name string) (
	[]ovsdb.Operation, func(), time.Time, error) {
	if bnc.IsSecondary() {
		// TBD: no op for secondary network for now
		return []ovsdb.Operation{}, func() {}, time.Time{}, nil
	}
	return metrics.GetConfigDurationRecorder().AddOVN(bnc.nbClient, kind, namespace, name)
}
// createOvnClusterRouter creates the central router for the network.
// Returns the (possibly pre-existing) router definition that was written to NB.
func (bnc *BaseNetworkController) createOvnClusterRouter() (*nbdb.LogicalRouter, error) {
	// Create default Control Plane Protection (COPP) entry for routers
	defaultCOPPUUID, err := EnsureDefaultCOPP(bnc.nbClient)
	if err != nil {
		return nil, fmt.Errorf("unable to create router control plane protection: %w", err)
	}
	// Create a single common distributed router for the cluster.
	logicalRouterName := bnc.GetNetworkScopedName(types.OVNClusterRouter)
	logicalRouter := nbdb.LogicalRouter{
		Name: logicalRouterName,
		ExternalIDs: map[string]string{
			"k8s-cluster-router":            "yes",
			types.TopologyVersionExternalID: strconv.Itoa(bnc.topologyVersion),
		},
		Options: map[string]string{
			"always_learn_from_arp_request": "false",
		},
		Copp: &defaultCOPPUUID,
	}
	if bnc.IsSecondary() {
		logicalRouter.ExternalIDs[types.NetworkExternalID] = bnc.GetNetworkName()
		logicalRouter.ExternalIDs[types.TopologyExternalID] = bnc.TopologyType()
	}
	if bnc.multicastSupport {
		// BUGFIX: the previous code assigned a brand-new map here, which
		// silently dropped the "always_learn_from_arp_request" option set
		// above. Set only the multicast relay key so both options persist.
		logicalRouter.Options["mcast_relay"] = "true"
	}
	err = libovsdbops.CreateOrUpdateLogicalRouter(bnc.nbClient, &logicalRouter, &logicalRouter.Options,
		&logicalRouter.ExternalIDs, &logicalRouter.Copp)
	if err != nil {
		return nil, fmt.Errorf("failed to create distributed router %s, error: %v",
			logicalRouterName, err)
	}
	return &logicalRouter, nil
}
// syncNodeClusterRouterPort ensures a node's LS to the cluster router's LRP is created.
// NOTE: We could have created the router port in ensureNodeLogicalNetwork() instead of here,
// but chassis ID is not available at that moment. We need the chassis ID to set the
// gateway-chassis, which in effect pins the logical switch to the current node in OVN.
// Otherwise, ovn-controller will flood-fill unrelated datapaths unnecessarily, causing scale
// problems.
func (bnc *BaseNetworkController) syncNodeClusterRouterPort(node *kapi.Node, hostSubnets []*net.IPNet) error {
	chassisID, err := util.ParseNodeChassisIDAnnotation(node)
	if err != nil {
		return err
	}
	// Fall back to the node annotation when the caller did not supply subnets.
	if len(hostSubnets) == 0 {
		if hostSubnets, err = util.ParseNodeHostSubnetAnnotation(node, bnc.GetNetworkName()); err != nil {
			return err
		}
	}
	// Single pass: collect all LRP networks and derive the port MAC.
	// The MAC is based on the first IPv4 subnet's gateway IP if one exists,
	// otherwise on the last (IPv6) subnet's gateway IP.
	var (
		nodeLRPMAC net.HardwareAddr
		macFromV4  bool
	)
	lrpNetworks := make([]string, 0, len(hostSubnets))
	for _, hostSubnet := range hostSubnets {
		gwIfAddr := util.GetNodeGatewayIfAddr(hostSubnet)
		lrpNetworks = append(lrpNetworks, gwIfAddr.String())
		if !macFromV4 {
			nodeLRPMAC = util.IPAddrToHWAddr(gwIfAddr.IP)
			macFromV4 = !utilnet.IsIPv6CIDR(hostSubnet)
		}
	}
	switchName := bnc.GetNetworkScopedName(node.Name)
	logicalRouterName := bnc.GetNetworkScopedName(types.OVNClusterRouter)
	lrpName := types.RouterToSwitchPrefix + switchName
	logicalRouterPort := nbdb.LogicalRouterPort{
		Name:     lrpName,
		MAC:      nodeLRPMAC.String(),
		Networks: lrpNetworks,
	}
	logicalRouter := nbdb.LogicalRouter{Name: logicalRouterName}
	// Pin the port to this node's chassis to avoid flood-filling unrelated datapaths.
	gatewayChassis := nbdb.GatewayChassis{
		Name:        lrpName + "-" + chassisID,
		ChassisName: chassisID,
		Priority:    1,
	}
	if err = libovsdbops.CreateOrUpdateLogicalRouterPort(bnc.nbClient, &logicalRouter, &logicalRouterPort,
		&gatewayChassis, &logicalRouterPort.MAC, &logicalRouterPort.Networks); err != nil {
		klog.Errorf("Failed to add gateway chassis %s to logical router port %s, error: %v", chassisID, lrpName, err)
		return err
	}
	return nil
}
// createNodeLogicalSwitch creates (or updates) the per-node logical switch in
// OVN NB, wires it to the cluster router via a router-type port, configures
// subnet/exclude-IP/multicast other_config, optionally attaches load balancer
// groups, and finally registers the switch with the logical switch manager.
// The switch name is the network-scoped node name.
func (bnc *BaseNetworkController) createNodeLogicalSwitch(nodeName string, hostSubnets []*net.IPNet,
	clusterLoadBalancerGroupUUID, switchLoadBalancerGroupUUID string) error {
	// logical router port MAC is based on IPv4 subnet if there is one, else IPv6
	var nodeLRPMAC net.HardwareAddr
	switchName := bnc.GetNetworkScopedName(nodeName)
	for _, hostSubnet := range hostSubnets {
		gwIfAddr := util.GetNodeGatewayIfAddr(hostSubnet)
		nodeLRPMAC = util.IPAddrToHWAddr(gwIfAddr.IP)
		if !utilnet.IsIPv6CIDR(hostSubnet) {
			break
		}
	}
	logicalSwitch := nbdb.LogicalSwitch{
		Name: switchName,
	}
	// Secondary-network switches are tagged with the owning network and topology.
	if bnc.IsSecondary() {
		logicalSwitch.ExternalIDs = map[string]string{
			types.NetworkExternalID:  bnc.GetNetworkName(),
			types.TopologyExternalID: bnc.TopologyType(),
		}
	}
	var v4Gateway, v6Gateway net.IP
	logicalSwitch.OtherConfig = map[string]string{}
	for _, hostSubnet := range hostSubnets {
		gwIfAddr := util.GetNodeGatewayIfAddr(hostSubnet)
		mgmtIfAddr := util.GetNodeManagementIfAddr(hostSubnet)
		if utilnet.IsIPv6CIDR(hostSubnet) {
			v6Gateway = gwIfAddr.IP
			// IPv6 subnets are expressed via ipv6_prefix rather than "subnet".
			logicalSwitch.OtherConfig["ipv6_prefix"] =
				hostSubnet.IP.String()
		} else {
			v4Gateway = gwIfAddr.IP
			// Keep OVN IPAM from handing out the management (and, when
			// enabled, hybrid-overlay) interface addresses.
			excludeIPs := mgmtIfAddr.IP.String()
			if config.HybridOverlay.Enabled {
				hybridOverlayIfAddr := util.GetNodeHybridOverlayIfAddr(hostSubnet)
				excludeIPs += ".." + hybridOverlayIfAddr.IP.String()
			}
			logicalSwitch.OtherConfig["subnet"] = hostSubnet.String()
			logicalSwitch.OtherConfig["exclude_ips"] = excludeIPs
		}
	}
	if clusterLoadBalancerGroupUUID != "" && switchLoadBalancerGroupUUID != "" {
		logicalSwitch.LoadBalancerGroup = []string{clusterLoadBalancerGroupUUID, switchLoadBalancerGroupUUID}
	}
	// If supported, enable IGMP/MLD snooping and querier on the node.
	if bnc.multicastSupport {
		logicalSwitch.OtherConfig["mcast_snoop"] = "true"
		// Configure IGMP/MLD querier if the gateway IP address is known.
		// Otherwise disable it.
		if v4Gateway != nil || v6Gateway != nil {
			logicalSwitch.OtherConfig["mcast_querier"] = "true"
			logicalSwitch.OtherConfig["mcast_eth_src"] = nodeLRPMAC.String()
			if v4Gateway != nil {
				logicalSwitch.OtherConfig["mcast_ip4_src"] = v4Gateway.String()
			}
			if v6Gateway != nil {
				// The IPv6 querier source is the link-local address derived
				// from the router port MAC, not the gateway IP itself.
				logicalSwitch.OtherConfig["mcast_ip6_src"] = util.HWAddrToIPv6LLA(nodeLRPMAC).String()
			}
		} else {
			logicalSwitch.OtherConfig["mcast_querier"] = "false"
		}
	}
	err := libovsdbops.CreateOrUpdateLogicalSwitch(bnc.nbClient, &logicalSwitch, &logicalSwitch.OtherConfig,
		&logicalSwitch.LoadBalancerGroup)
	if err != nil {
		return fmt.Errorf("failed to add logical switch %+v: %v", logicalSwitch, err)
	}
	// Connect the switch to the router.
	logicalSwitchPort := nbdb.LogicalSwitchPort{
		Name:      types.SwitchToRouterPrefix + switchName,
		Type:      "router",
		Addresses: []string{"router"},
		Options: map[string]string{
			"router-port": types.RouterToSwitchPrefix + switchName,
			"arp_proxy":   kubevirt.ComposeARPProxyLSPOption(),
		},
	}
	sw := nbdb.LogicalSwitch{Name: switchName}
	err = libovsdbops.CreateOrUpdateLogicalSwitchPortsOnSwitch(bnc.nbClient, &sw, &logicalSwitchPort)
	if err != nil {
		klog.Errorf("Failed to add logical port %+v to switch %s: %v", logicalSwitchPort, switchName, err)
		return err
	}
	// Track the router port in the cluster router port group used for multicast.
	if bnc.multicastSupport {
		err = libovsdbops.AddPortsToPortGroup(bnc.nbClient, bnc.getClusterPortGroupName(types.ClusterRtrPortGroupNameBase), logicalSwitchPort.UUID)
		if err != nil {
			klog.Errorf(err.Error())
			return fmt.Errorf("failed adding port to portgroup for multicast: %v", err)
		}
	}
	// Add the switch to the logical switch cache
	migratableIPsByPod, err := bnc.findMigratablePodIPsForSubnets(hostSubnets)
	if err != nil {
		return fmt.Errorf("failed finding migratable pod IPs belonging to %s: %v", nodeName, err)
	}
	return bnc.lsManager.AddOrUpdateSwitch(logicalSwitch.Name, hostSubnets, migratableIPsByPod...)
}
// deleteNodeLogicalNetwork removes the logical switch and the cluster router
// port associated with the given node from the NB database.
func (bnc *BaseNetworkController) deleteNodeLogicalNetwork(nodeName string) error {
	switchName := bnc.GetNetworkScopedName(nodeName)
	// Remove the logical switch associated with the node
	if err := libovsdbops.DeleteLogicalSwitch(bnc.nbClient, switchName); err != nil {
		return fmt.Errorf("failed to delete logical switch %s: %v", switchName, err)
	}
	// Then remove the router port that connected the switch to the cluster router.
	router := nbdb.LogicalRouter{Name: bnc.GetNetworkScopedName(types.OVNClusterRouter)}
	routerPort := nbdb.LogicalRouterPort{Name: types.RouterToSwitchPrefix + switchName}
	if err := libovsdbops.DeleteLogicalRouterPorts(bnc.nbClient, &router, &routerPort); err != nil {
		return fmt.Errorf("failed to delete router port %s: %w", routerPort.Name, err)
	}
	return nil
}
// addAllPodsOnNode queues every non-completed pod scheduled on nodeName into
// the pod retry framework (with no backoff) and triggers a retry pass.
// Returns any errors encountered while listing or queueing; processing
// continues past individual failures.
func (bnc *BaseNetworkController) addAllPodsOnNode(nodeName string) []error {
	errs := []error{}
	options := metav1.ListOptions{
		FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(),
	}
	pods, err := bnc.kube.GetPods(metav1.NamespaceAll, options)
	switch {
	case err != nil:
		errs = append(errs, err)
		klog.Errorf("Unable to list existing pods on node: %s, existing pods on this node may not function",
			nodeName)
	default:
		klog.V(5).Infof("When adding node %s for network %s, found %d pods to add to retryPods", nodeName, bnc.GetNetworkName(), len(pods))
		for _, p := range pods {
			// Copy before taking the address: the retry framework keeps the pointer.
			podCopy := *p
			if util.PodCompleted(&podCopy) {
				continue
			}
			klog.V(5).Infof("Adding pod %s/%s to retryPods for network %s", podCopy.Namespace, podCopy.Name, bnc.GetNetworkName())
			if addErr := bnc.retryPods.AddRetryObjWithAddNoBackoff(&podCopy); addErr != nil {
				errs = append(errs, addErr)
				klog.Errorf("Failed to add pod %s/%s to retryPods for network %s: %v", podCopy.Namespace, podCopy.Name, bnc.GetNetworkName(), addErr)
			}
		}
	}
	bnc.retryPods.RequestRetryObjs()
	return errs
}
// updateL3TopologyVersion stamps the cluster router's external IDs with the
// current topology version and mirrors it into bnc.topologyVersion.
func (bnc *BaseNetworkController) updateL3TopologyVersion() error {
	versionString := strconv.Itoa(types.OvnCurrentTopologyVersion)
	routerName := bnc.GetNetworkScopedName(types.OVNClusterRouter)
	router := nbdb.LogicalRouter{
		Name:        routerName,
		ExternalIDs: map[string]string{types.TopologyVersionExternalID: versionString},
	}
	if err := libovsdbops.UpdateLogicalRouterSetExternalIDs(bnc.nbClient, &router); err != nil {
		return fmt.Errorf("failed to generate set topology version, err: %v", err)
	}
	bnc.topologyVersion = types.OvnCurrentTopologyVersion
	klog.Infof("Updated Logical_Router %s topology version to %s", routerName, versionString)
	return nil
}
// updateL2TopologyVersion stamps the network's layer2/localnet switch with the
// current topology version and mirrors it into bnc.topologyVersion.
// Returns an error for unsupported topology types.
func (bnc *BaseNetworkController) updateL2TopologyVersion() error {
	versionString := strconv.Itoa(types.OvnCurrentTopologyVersion)
	var switchName string
	switch topoType := bnc.TopologyType(); topoType {
	case types.Layer2Topology:
		switchName = bnc.GetNetworkScopedName(types.OVNLayer2Switch)
	case types.LocalnetTopology:
		switchName = bnc.GetNetworkScopedName(types.OVNLocalnetSwitch)
	default:
		return fmt.Errorf("topology type %s is not supported", topoType)
	}
	ls := nbdb.LogicalSwitch{
		Name:        switchName,
		ExternalIDs: map[string]string{types.TopologyVersionExternalID: versionString},
	}
	if err := libovsdbops.UpdateLogicalSwitchSetExternalIDs(bnc.nbClient, &ls); err != nil {
		return fmt.Errorf("failed to generate set topology version, err: %v", err)
	}
	bnc.topologyVersion = types.OvnCurrentTopologyVersion
	klog.Infof("Updated Logical_Switch %s topology version to %s", switchName, versionString)
	return nil
}
// determineOVNTopoVersionFromOVN determines what OVN Topology version is being used.
// For the default network the version lives on the cluster router; for secondary
// networks it lives on the topology-specific router or switch.
// If TopologyVersionExternalID key in external_ids column does not exist, it is prior
// to OVN topology versioning and therefore the version number is set accordingly.
func (bnc *BaseNetworkController) determineOVNTopoVersionFromOVN() error {
	var (
		version int
		err     error
	)
	switch {
	case !bnc.IsSecondary():
		version, err = bnc.getOVNTopoVersionFromLogicalRouter(types.OVNClusterRouter)
	case bnc.TopologyType() == types.Layer3Topology:
		version, err = bnc.getOVNTopoVersionFromLogicalRouter(bnc.GetNetworkScopedName(types.OVNClusterRouter))
	case bnc.TopologyType() == types.Layer2Topology:
		version, err = bnc.getOVNTopoVersionFromLogicalSwitch(bnc.GetNetworkScopedName(types.OVNLayer2Switch))
	case bnc.TopologyType() == types.LocalnetTopology:
		version, err = bnc.getOVNTopoVersionFromLogicalSwitch(bnc.GetNetworkScopedName(types.OVNLocalnetSwitch))
	default:
		return fmt.Errorf("topology type %s not supported", bnc.TopologyType())
	}
	bnc.topologyVersion = version
	return err
}
func (bnc *BaseNetworkController) getOVNTopoVersionFromLogicalRouter(clusterRouterName string) (int, error) {
logicalRouter := &nbdb.LogicalRouter{Name: clusterRouterName}
logicalRouter, err := libovsdbops.GetLogicalRouter(bnc.nbClient, logicalRouter)
if err != nil && !errors.Is(err, libovsdbclient.ErrNotFound) {
return 0, fmt.Errorf("error getting router %s: %v", clusterRouterName, err)
}
if errors.Is(err, libovsdbclient.ErrNotFound) {
// no OVNClusterRouter exists, DB is empty, nothing to upgrade
return math.MaxInt32, nil
}
v, exists := logicalRouter.ExternalIDs[types.TopologyVersionExternalID]
if !exists {
klog.Infof("No version string found. The OVN topology is before versioning is introduced. Upgrade needed")
return 0, nil
}
ver, err := strconv.Atoi(v)
if err != nil {
return 0, fmt.Errorf("invalid OVN topology version string for network %s, err: %v", bnc.GetNetworkName(), err)
}
return ver, nil
}
func (bnc *BaseNetworkController) getOVNTopoVersionFromLogicalSwitch(switchName string) (int, error) {
logicalSwitch := &nbdb.LogicalSwitch{Name: switchName}
logicalSwitch, err := libovsdbops.GetLogicalSwitch(bnc.nbClient, logicalSwitch)
if err != nil && !errors.Is(err, libovsdbclient.ErrNotFound) {
return 0, fmt.Errorf("error getting switch %s: %v", switchName, err)
}
if errors.Is(err, libovsdbclient.ErrNotFound) {
// no switch exists, DB is empty, nothing to upgrade
return math.MaxInt32, nil
}
v := logicalSwitch.ExternalIDs[types.TopologyVersionExternalID]
ver, err := strconv.Atoi(v)
if err != nil {
return 0, fmt.Errorf("invalid OVN topology version string for network %s, err: %v", bnc.GetNetworkName(), err)
}
return ver, nil
}
// getNamespaceLocked locks namespacesMutex, looks up ns, and (if found), returns it with
// its mutex locked (read-locked when readOnly is true) together with the matching
// unlock function. If ns is not known, nil will be returned.
func (bnc *BaseNetworkController) getNamespaceLocked(ns string, readOnly bool) (*namespaceInfo, func()) {
	// Only hold namespacesMutex while reading/modifying oc.namespaces. In particular,
	// we drop namespacesMutex while trying to claim nsInfo.Mutex, because something
	// else might have locked the nsInfo and be doing something slow with it, and we
	// don't want to block all access to oc.namespaces while that's happening.
	bnc.namespacesMutex.Lock()
	nsInfo := bnc.namespaces[ns]
	bnc.namespacesMutex.Unlock()
	if nsInfo == nil {
		return nil, nil
	}
	var unlockFunc func()
	if readOnly {
		unlockFunc = func() { nsInfo.RUnlock() }
		nsInfo.RLock()
	} else {
		unlockFunc = func() { nsInfo.Unlock() }
		nsInfo.Lock()
	}
	// Check that the namespace wasn't deleted while we were waiting for the lock.
	// If the map now holds a different (or no) entry for ns, release and report miss.
	bnc.namespacesMutex.Lock()
	defer bnc.namespacesMutex.Unlock()
	if nsInfo != bnc.namespaces[ns] {
		unlockFunc()
		return nil, nil
	}
	return nsInfo, unlockFunc
}
// deleteNamespaceLocked locks namespacesMutex, finds and deletes ns, and returns the
// namespace, locked. If error != nil, namespaceInfo is nil.
// The namespace's address set (if any) is emptied immediately and destroyed after a
// delay by a background goroutine, so in-flight NetworkPolicy handlers can converge.
func (bnc *BaseNetworkController) deleteNamespaceLocked(ns string) (*namespaceInfo, error) {
	// The locking here is the same as in getNamespaceLocked
	bnc.namespacesMutex.Lock()
	nsInfo := bnc.namespaces[ns]
	bnc.namespacesMutex.Unlock()
	if nsInfo == nil {
		return nil, nil
	}
	nsInfo.Lock()
	bnc.namespacesMutex.Lock()
	defer bnc.namespacesMutex.Unlock()
	// Re-check after acquiring the nsInfo lock: the entry may have been
	// replaced or removed while we were blocked.
	if nsInfo != bnc.namespaces[ns] {
		nsInfo.Unlock()
		return nil, nil
	}
	if nsInfo.addressSet != nil {
		// Empty the address set, then delete it after an interval.
		if err := nsInfo.addressSet.SetIPs(nil); err != nil {
			klog.Errorf("Warning: failed to empty address set for deleted NS %s: %v", ns, err)
		}
		// Delete the address set after a short delay.
		// This is so NetworkPolicy handlers can converge and stop referencing it.
		addressSet := nsInfo.addressSet
		go func() {
			select {
			case <-bnc.stopChan:
				// Controller is shutting down; skip the deferred cleanup.
				return
			case <-time.After(20 * time.Second):
				// Check to see if the NS was re-added in the meanwhile. If so,
				// only delete if the new NS's AddressSet shouldn't exist.
				nsInfo, nsUnlock := bnc.getNamespaceLocked(ns, true)
				if nsInfo != nil {
					defer nsUnlock()
					if nsInfo.addressSet != nil {
						klog.V(5).Infof("Skipping deferred deletion of AddressSet for NS %s: re-created", ns)
						return
					}
				}
				klog.V(5).Infof("Finishing deferred deletion of AddressSet for NS %s", ns)
				if err := addressSet.Destroy(); err != nil {
					klog.Errorf("Failed to delete AddressSet for NS %s: %v", ns, err.Error())
				}
			}
		}()
	}
	// Drop the namespace's dedicated port group, if one was created.
	if nsInfo.portGroupName != "" {
		err := libovsdbops.DeletePortGroups(bnc.nbClient, nsInfo.portGroupName)
		if err != nil {
			nsInfo.Unlock()
			return nil, err
		}
	}
	delete(bnc.namespaces, ns)
	return nsInfo, nil
}
// WatchNodes starts the watching of the nodes resource and calls back the
// appropriate handler logic. It is idempotent: once a node handler exists,
// subsequent calls are no-ops.
func (bnc *BaseNetworkController) WatchNodes() error {
	if bnc.nodeHandler != nil {
		return nil
	}
	handler, err := bnc.retryNodes.WatchResource()
	if err != nil {
		return err
	}
	bnc.nodeHandler = handler
	return nil
}
// recordNodeErrorEvent posts a warning event for the given node on the default
// network; secondary networks currently skip event recording.
func (bnc *BaseNetworkController) recordNodeErrorEvent(node *kapi.Node, nodeErr error) {
	if bnc.IsSecondary() {
		// TBD, no op for secondary network for now
		return
	}
	ref, refErr := ref.GetReference(scheme.Scheme, node)
	if refErr != nil {
		klog.Errorf("Couldn't get a reference to node %s to post an event: %v", node.Name, refErr)
		return
	}
	klog.V(5).Infof("Posting %s event for Node %s: %v", kapi.EventTypeWarning, node.Name, nodeErr)
	bnc.recorder.Eventf(ref, kapi.EventTypeWarning, "ErrorReconcilingNode", nodeErr.Error())
}
// doesNetworkRequireIPAM reports whether this controller's network needs IPAM,
// delegating the decision to util.DoesNetworkRequireIPAM on the network info.
func (bnc *BaseNetworkController) doesNetworkRequireIPAM() bool {
	return util.DoesNetworkRequireIPAM(bnc.NetInfo)
}
// buildPortGroup assembles an nbdb.PortGroup with the given hashed name, ports
// and ACLs. The raw name is stored under the "name" external ID; secondary
// networks additionally carry the owning network's external ID.
func (bnc *BaseNetworkController) buildPortGroup(hashName, name string, ports []*nbdb.LogicalSwitchPort, acls []*nbdb.ACL) *nbdb.PortGroup {
	ids := map[string]string{"name": name}
	if bnc.IsSecondary() {
		ids[types.NetworkExternalID] = bnc.GetNetworkName()
	}
	return libovsdbops.BuildPortGroup(hashName, ports, acls, ids)
}
// getPodNADNames returns the NAD names the pod attaches to on this network.
// The default network always maps to the single default network name.
func (bnc *BaseNetworkController) getPodNADNames(pod *kapi.Pod) []string {
	if bnc.IsSecondary() {
		// Best effort: the lookup error is intentionally ignored and an
		// empty list is returned when the pod has no matching NADs.
		nadNames, _ := util.PodNadNames(pod, bnc.NetInfo)
		return nadNames
	}
	return []string{types.DefaultNetworkName}
}
// getClusterPortGroupName gets network scoped port group hash name; base is either
// ClusterPortGroupNameBase or ClusterRtrPortGroupNameBase. Secondary networks
// prefix the base with the hashed network name; the default network uses it as-is.
func (bnc *BaseNetworkController) getClusterPortGroupName(base string) string {
	if !bnc.IsSecondary() {
		return base
	}
	return libovsdbutil.HashedPortGroup(bnc.GetNetworkName()) + "_" + base
}
// GetLocalZoneNodes returns the list of local zone nodes
// A node is considered a local zone node if the zone name
// set in the node's annotation matches with the zone name
// set in the OVN Northbound database (to which this controller is connected to).
func (bnc *BaseNetworkController) GetLocalZoneNodes() ([]*kapi.Node, error) {
	allNodes, err := bnc.watchFactory.GetNodes()
	if err != nil {
		return nil, fmt.Errorf("failed to get nodes: %v", err)
	}
	var localNodes []*kapi.Node
	for _, node := range allNodes {
		if !bnc.isLocalZoneNode(node) {
			continue
		}
		localNodes = append(localNodes, node)
	}
	return localNodes, nil
}
// isLocalZoneNode returns true if the node is part of the local zone.
func (bnc *BaseNetworkController) isLocalZoneNode(node *kapi.Node) bool {
	/** HACK BEGIN **/
	// TODO(tssurya): Remove this HACK a few months from now. This has been added only to
	// minimize disruption for upgrades when moving to interconnect=true.
	// We want the legacy ovnkube-master to wait for remote ovnkube-node to
	// signal it using "k8s.ovn.org/remote-zone-migrated" annotation before
	// considering a node as remote when we upgrade from "global" (1 zone IC)
	// zone to multi-zone. This is so that network disruption for the existing workloads
	// is negligible and until the point where ovnkube-node flips the switch to connect
	// to the new SBDB, it would continue talking to the legacy RAFT ovnkube-sbdb to ensure
	// OVN/OVS flows are intact.
	if bnc.zone == types.OvnDefaultZone {
		// In the default ("global") zone, a node counts as local until it has
		// announced its zone migration via the annotation above.
		return !util.HasNodeMigratedZone(node)
	}
	/** HACK END **/
	nodeZone := util.GetNodeZone(node)
	return nodeZone == bnc.zone
}
// isLayer2Interconnect reports whether this network is a layer2-topology
// network running with the interconnect feature enabled.
func (bnc *BaseNetworkController) isLayer2Interconnect() bool {
	return config.OVNKubernetesFeature.EnableInterconnect && bnc.NetInfo.TopologyType() == types.Layer2Topology
}
// nodeZoneClusterChanged reports whether any zone-cluster-relevant node
// annotations changed between oldNode and newNode. Node ID and transit switch
// port address changes matter for local and remote nodes alike; the gateway
// router LRP address only matters for remote nodes on the default network.
func (bnc *BaseNetworkController) nodeZoneClusterChanged(oldNode, newNode *kapi.Node, newNodeIsLocalZone bool) bool {
	switch {
	case util.NodeIDAnnotationChanged(oldNode, newNode):
		return true
	case util.NodeTransitSwitchPortAddrAnnotationChanged(oldNode, newNode):
		return true
	case !newNodeIsLocalZone && !bnc.IsSecondary() && util.NodeGatewayRouterLRPAddrAnnotationChanged(oldNode, newNode):
		// NodeGatewayRouterLRPAddrAnnotationChanged would not affect local, nor layer3 secondary network
		return true
	}
	return false
}
// findMigratablePodIPsForSubnets collects the deduplicated, full-mask IPs of
// live-migratable (KubeVirt) pods whose addresses fall inside the given
// subnets. Secondary networks return nil: live migration is not supported
// there. Completed pods and stale migration sources are skipped.
func (bnc *BaseNetworkController) findMigratablePodIPsForSubnets(subnets []*net.IPNet) ([]*net.IPNet, error) {
	// live migration is not supported in combination with secondary networks
	if bnc.IsSecondary() {
		return nil, nil
	}
	pods, err := kubevirt.FindLiveMigratablePods(bnc.watchFactory)
	if err != nil {
		return nil, err
	}
	seen := sets.New[string]()
	var result []*net.IPNet
	for _, migratablePod := range pods {
		if util.PodCompleted(migratablePod) {
			continue
		}
		stale, err := kubevirt.IsMigratedSourcePodStale(bnc.watchFactory, migratablePod)
		if err != nil {
			return nil, err
		}
		if stale {
			continue
		}
		annotation, err := util.UnmarshalPodAnnotation(migratablePod.Annotations, bnc.GetNetworkName())
		if err != nil {
			// even though it can be normal to not have an annotation now, live
			// migration is a sensible process that might be used when draining
			// nodes on upgrades, so log a warning in every case to have the
			// information available
			klog.Warningf("Could not get pod annotation of pod %s/%s for network %s: %v",
				migratablePod.Namespace,
				migratablePod.Name,
				bnc.GetNetworkName(),
				err)
			continue
		}
		for _, podIP := range annotation.IPs {
			if !util.IsContainedInAnyCIDR(podIP, subnets...) {
				continue
			}
			// Skip duplicate IPs
			key := podIP.String()
			if seen.Has(key) {
				continue
			}
			seen.Insert(key)
			result = append(result, &net.IPNet{
				IP:   podIP.IP,
				Mask: util.GetIPFullMask(podIP.IP),
			})
		}
	}
	return result, nil
}