/
gateway.go
549 lines (495 loc) · 17.3 KB
/
gateway.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
package node
import (
"fmt"
"net"
"sync"
"time"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/config"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/factory"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/informer"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/kube"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/retry"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
util "github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
"github.com/pkg/errors"
"github.com/safchain/ethtool"
kapi "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
apierrors "k8s.io/apimachinery/pkg/util/errors"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/klog/v2"
)
// Gateway responds to Service and Endpoint K8s events
// and programs OVN gateway functionality.
// It may also spawn threads to ensure the flow tables
// are kept in sync
type Gateway interface {
	informer.ServiceAndEndpointsEventHandler
	// Init runs gateway initialization and starts watching Service and
	// EndpointSlice events; the stop channel and wait group bound here
	// govern goroutines launched later by Start.
	Init(<-chan struct{}, *sync.WaitGroup) error
	// Start launches the gateway's background threads (node IP manager
	// and OpenFlow manager).
	Start()
	// GetGatewayBridgeIface returns the name of the default gateway OVS bridge.
	GetGatewayBridgeIface() string
	// SetDefaultGatewayBridgeMAC updates the MAC address used by the
	// OpenFlow manager when rendering flows.
	SetDefaultGatewayBridgeMAC(addr net.HardwareAddr)
	// Reconcile re-renders the bridge OpenFlow cache and re-queues all
	// services after a gateway-relevant update.
	Reconcile() error
}
// gateway is the concrete Gateway implementation. It fans Service and
// EndpointSlice events out to a set of optional (possibly nil) watchers and
// owns the OpenFlow and node-IP managers.
type gateway struct {
	// loadBalancerHealthChecker is a health check server for load-balancer type services
	loadBalancerHealthChecker informer.ServiceAndEndpointsEventHandler
	// portClaimWatcher is for reserving ports for virtual IPs allocated by the cluster on the host
	portClaimWatcher informer.ServiceEventHandler
	// nodePortWatcherIptables is used in Shared GW mode to handle nodePort IPTable rules
	nodePortWatcherIptables informer.ServiceEventHandler
	// nodePortWatcher is used in Local+Shared GW modes to handle nodePort flows in shared OVS bridge
	nodePortWatcher informer.ServiceAndEndpointsEventHandler
	// openflowManager renders and maintains the gateway bridge flow cache
	openflowManager *openflowManager
	// nodeIPManager tracks this node's addresses (ListAddresses feeds the flow cache)
	nodeIPManager *addressManager
	// initFunc runs mode-specific setup during Init
	initFunc func() error
	// readyFunc reports gateway readiness; presumably polled by callers
	// outside this file — not invoked here
	readyFunc func() (bool, error)
	// servicesRetryFramework is retained so Reconcile can re-queue all services
	servicesRetryFramework *retry.RetryFramework

	watchFactory *factory.WatchFactory // used for retry
	// stopChan and wg are bound in Init and passed to goroutines in Start
	stopChan <-chan struct{}
	wg       *sync.WaitGroup
}
// AddService fans a service-add event out to every configured watcher
// (port claims, LB health checks, nodePort flows, nodePort iptables) and
// returns an aggregate of any errors they reported.
func (g *gateway) AddService(svc *kapi.Service) error {
	var allErrors []error
	if w := g.portClaimWatcher; w != nil {
		if err := w.AddService(svc); err != nil {
			allErrors = append(allErrors, err)
		}
	}
	if w := g.loadBalancerHealthChecker; w != nil {
		if err := w.AddService(svc); err != nil {
			allErrors = append(allErrors, err)
		}
	}
	if w := g.nodePortWatcher; w != nil {
		if err := w.AddService(svc); err != nil {
			allErrors = append(allErrors, err)
		}
	}
	if w := g.nodePortWatcherIptables; w != nil {
		if err := w.AddService(svc); err != nil {
			allErrors = append(allErrors, err)
		}
	}
	return apierrors.NewAggregate(allErrors)
}
// UpdateService fans a service-update event out to every configured watcher
// and returns an aggregate of any errors they reported.
func (g *gateway) UpdateService(old, new *kapi.Service) error {
	var allErrors []error
	if w := g.portClaimWatcher; w != nil {
		if err := w.UpdateService(old, new); err != nil {
			allErrors = append(allErrors, err)
		}
	}
	if w := g.loadBalancerHealthChecker; w != nil {
		if err := w.UpdateService(old, new); err != nil {
			allErrors = append(allErrors, err)
		}
	}
	if w := g.nodePortWatcher; w != nil {
		if err := w.UpdateService(old, new); err != nil {
			allErrors = append(allErrors, err)
		}
	}
	if w := g.nodePortWatcherIptables; w != nil {
		if err := w.UpdateService(old, new); err != nil {
			allErrors = append(allErrors, err)
		}
	}
	return apierrors.NewAggregate(allErrors)
}
// DeleteService fans a service-delete event out to every configured watcher
// and returns an aggregate of any errors they reported.
func (g *gateway) DeleteService(svc *kapi.Service) error {
	var allErrors []error
	if w := g.portClaimWatcher; w != nil {
		if err := w.DeleteService(svc); err != nil {
			allErrors = append(allErrors, err)
		}
	}
	if w := g.loadBalancerHealthChecker; w != nil {
		if err := w.DeleteService(svc); err != nil {
			allErrors = append(allErrors, err)
		}
	}
	if w := g.nodePortWatcher; w != nil {
		if err := w.DeleteService(svc); err != nil {
			allErrors = append(allErrors, err)
		}
	}
	if w := g.nodePortWatcherIptables; w != nil {
		if err := w.DeleteService(svc); err != nil {
			allErrors = append(allErrors, err)
		}
	}
	return apierrors.NewAggregate(allErrors)
}
// SyncServices performs the initial full sync of services against each
// configured watcher, in order, stopping at the first failure.
//
// Unlike the Add/Update/Delete handlers (which aggregate errors from all
// watchers), a sync failure aborts immediately because later watchers
// depend on a consistent initial state.
func (g *gateway) SyncServices(objs []interface{}) error {
	var err error
	klog.Infof("Starting gateway service sync")
	start := time.Now()
	if g.portClaimWatcher != nil {
		err = g.portClaimWatcher.SyncServices(objs)
	}
	if err == nil && g.loadBalancerHealthChecker != nil {
		err = g.loadBalancerHealthChecker.SyncServices(objs)
	}
	if err == nil && g.nodePortWatcher != nil {
		err = g.nodePortWatcher.SyncServices(objs)
	}
	if err == nil && g.nodePortWatcherIptables != nil {
		err = g.nodePortWatcherIptables.SyncServices(objs)
	}
	if err != nil {
		// %w (not %v) so callers can unwrap with errors.Is/As.
		return fmt.Errorf("gateway sync services failed: %w", err)
	}
	klog.Infof("Gateway service sync done. Time taken: %s", time.Since(start))
	return nil
}
// AddEndpointSlice fans an endpoint-slice add event out to the watchers that
// consume endpoint updates and returns an aggregate of any errors reported.
func (g *gateway) AddEndpointSlice(epSlice *discovery.EndpointSlice) error {
	var allErrors []error
	if w := g.loadBalancerHealthChecker; w != nil {
		if err := w.AddEndpointSlice(epSlice); err != nil {
			allErrors = append(allErrors, err)
		}
	}
	if w := g.nodePortWatcher; w != nil {
		if err := w.AddEndpointSlice(epSlice); err != nil {
			allErrors = append(allErrors, err)
		}
	}
	return apierrors.NewAggregate(allErrors)
}
// UpdateEndpointSlice fans an endpoint-slice update event out to the watchers
// that consume endpoint updates and returns an aggregate of any errors reported.
func (g *gateway) UpdateEndpointSlice(oldEpSlice, newEpSlice *discovery.EndpointSlice) error {
	var allErrors []error
	if w := g.loadBalancerHealthChecker; w != nil {
		if err := w.UpdateEndpointSlice(oldEpSlice, newEpSlice); err != nil {
			allErrors = append(allErrors, err)
		}
	}
	if w := g.nodePortWatcher; w != nil {
		if err := w.UpdateEndpointSlice(oldEpSlice, newEpSlice); err != nil {
			allErrors = append(allErrors, err)
		}
	}
	return apierrors.NewAggregate(allErrors)
}
// DeleteEndpointSlice fans an endpoint-slice delete event out to the watchers
// that consume endpoint updates and returns an aggregate of any errors reported.
func (g *gateway) DeleteEndpointSlice(epSlice *discovery.EndpointSlice) error {
	var allErrors []error
	if w := g.loadBalancerHealthChecker; w != nil {
		if err := w.DeleteEndpointSlice(epSlice); err != nil {
			allErrors = append(allErrors, err)
		}
	}
	if w := g.nodePortWatcher; w != nil {
		if err := w.DeleteEndpointSlice(epSlice); err != nil {
			allErrors = append(allErrors, err)
		}
	}
	return apierrors.NewAggregate(allErrors)
}
// Init initializes the gateway: it binds the stop channel and wait group
// (used by goroutines launched in Start), runs the mode-specific initFunc,
// and starts watching Service and EndpointSlice events through retry
// frameworks.
//
// The services retry framework is retained on the struct so Reconcile can
// re-queue all services later; the endpoint-slice framework is only needed
// for its initial watch.
func (g *gateway) Init(stopChan <-chan struct{}, wg *sync.WaitGroup) error {
	g.stopChan = stopChan
	g.wg = wg
	if err := g.initFunc(); err != nil {
		return err
	}
	g.servicesRetryFramework = g.newRetryFrameworkNode(factory.ServiceForGatewayType)
	if _, err := g.servicesRetryFramework.WatchResource(); err != nil {
		// %w (not %v) preserves the error chain for errors.Is/As.
		return fmt.Errorf("gateway init failed to start watching services: %w", err)
	}
	endpointSlicesRetryFramework := g.newRetryFrameworkNode(factory.EndpointSliceForGatewayType)
	if _, err := endpointSlicesRetryFramework.WatchResource(); err != nil {
		return fmt.Errorf("gateway init failed to start watching endpointslices: %w", err)
	}
	return nil
}
// Start launches the gateway's background goroutines: the node IP address
// manager and the OpenFlow manager. Both are tied to the stop channel and
// wait group that were bound in Init.
func (g *gateway) Start() {
	if mgr := g.nodeIPManager; mgr != nil {
		mgr.Run(g.stopChan, g.wg)
	}
	if ofm := g.openflowManager; ofm != nil {
		klog.Info("Spawning Conntrack Rule Check Thread")
		ofm.Run(g.stopChan, g.wg)
	}
}
// setupUDPAggregationUplink sets up an uplink interface for UDP Generic
// Receive Offload forwarding as part of the EnableUDPAggregation feature,
// by enabling the rx-udp-gro-forwarding ethtool feature on the interface.
func setupUDPAggregationUplink(ifname string) error {
	ethTool, err := ethtool.NewEthtool()
	if err != nil {
		return fmt.Errorf("failed to initialize ethtool: %v", err)
	}
	defer ethTool.Close()

	features := map[string]bool{
		"rx-udp-gro-forwarding": true,
	}
	if err := ethTool.Change(ifname, features); err != nil {
		return fmt.Errorf("could not enable UDP offload features on %q: %v", ifname, err)
	}
	return nil
}
// gatewayInitInternal performs the mode-independent part of gateway setup:
// it ensures OVS bridges exist for the gateway interface (and the optional
// egress-gateway interface), decides whether options:gateway_mtu should be
// enabled on gateway routers, optionally enables UDP aggregation on the
// uplink(s), and publishes the resulting L3 gateway configuration through
// the node annotator. It returns the gateway bridge configuration and the
// egress-gateway bridge configuration (nil when egressGatewayIntf is "").
func gatewayInitInternal(nodeName, gwIntf, egressGatewayIntf string, gwNextHops []net.IP, gwIPs []*net.IPNet, nodeAnnotator kube.Annotator) (
	*bridgeConfiguration, *bridgeConfiguration, error) {
	gatewayBridge, err := bridgeForInterface(gwIntf, nodeName, types.PhysicalNetworkName, gwIPs)
	if err != nil {
		return nil, nil, errors.Wrapf(err, "Bridge for interface failed for %s", gwIntf)
	}
	// The egress-gateway bridge is optional; set it up only when an
	// interface name was supplied. Note gwIPs are not applied to it.
	var egressGWBridge *bridgeConfiguration
	if egressGatewayIntf != "" {
		egressGWBridge, err = bridgeForInterface(egressGatewayIntf, nodeName, types.PhysicalNetworkExGwName, nil)
		if err != nil {
			return nil, nil, errors.Wrapf(err, "Bridge for interface failed for %s", egressGatewayIntf)
		}
	}

	chassisID, err := util.GetNodeChassisID()
	if err != nil {
		return nil, nil, err
	}

	// Set annotation that determines if options:gateway_mtu shall be set for this node.
	enableGatewayMTU := true
	if config.Gateway.DisablePacketMTUCheck {
		klog.Warningf("Config option disable-pkt-mtu-check is set to true. " +
			"options:gateway_mtu will be disabled on gateway routers. " +
			"IP fragmentation or large TCP/UDP payloads may not be forwarded correctly.")
		enableGatewayMTU = false
	} else {
		// gateway_mtu relies on the OVS check_packet_length action; probe
		// the bridge for support before enabling it.
		chkPktLengthSupported, err := util.DetectCheckPktLengthSupport(gatewayBridge.bridgeName)
		if err != nil {
			return nil, nil, err
		}
		if !chkPktLengthSupported {
			klog.Warningf("OVS does not support check_packet_length action. " +
				"options:gateway_mtu will be disabled on gateway routers. " +
				"IP fragmentation or large TCP/UDP payloads may not be forwarded correctly.")
			enableGatewayMTU = false
		} else {
			/* This is a work around. In order to have the most optimal performance, the packet MTU check should be
			 * disabled when OVS HW Offload is enabled on the node. The reason is that OVS HW Offload does not support
			 * packet MTU checks properly without the offload support for sFlow.
			 * The patches for sFlow in OvS: https://patchwork.ozlabs.org/project/openvswitch/list/?series=290804
			 * As of writing these offload support patches for sFlow are in review.
			 * TODO: This workaround should be removed once the offload support for sFlow patches are merged upstream OvS.
			 */
			ovsHardwareOffloadEnabled, err := util.IsOvsHwOffloadEnabled()
			if err != nil {
				return nil, nil, err
			}
			if ovsHardwareOffloadEnabled {
				klog.Warningf("OVS hardware offloading is enabled. " +
					"options:gateway_mtu will be disabled on gateway routers for performance reasons. " +
					"IP fragmentation or large TCP/UDP payloads may not be forwarded correctly.")
				enableGatewayMTU = false
			}
		}
	}
	if err := util.SetGatewayMTUSupport(nodeAnnotator, enableGatewayMTU); err != nil {
		return nil, nil, err
	}

	if config.Default.EnableUDPAggregation {
		// Best effort: if UDP GRO forwarding cannot be enabled on either
		// uplink, disable the feature globally instead of failing init.
		err = setupUDPAggregationUplink(gatewayBridge.uplinkName)
		if err == nil && egressGWBridge != nil {
			err = setupUDPAggregationUplink(egressGWBridge.uplinkName)
		}
		if err != nil {
			klog.Warningf("Could not enable UDP packet aggregation on uplink interface (aggregation will be disabled): %v", err)
			config.Default.EnableUDPAggregation = false
		}
	}

	// Publish the assembled L3 gateway configuration as node annotations.
	l3GwConfig := util.L3GatewayConfig{
		Mode:           config.Gateway.Mode,
		ChassisID:      chassisID,
		InterfaceID:    gatewayBridge.interfaceID,
		MACAddress:     gatewayBridge.macAddress,
		IPAddresses:    gatewayBridge.ips,
		NextHops:       gwNextHops,
		NodePortEnable: config.Gateway.NodeportEnable,
		VLANID:         &config.Gateway.VLANID,
	}
	if egressGWBridge != nil {
		l3GwConfig.EgressGWInterfaceID = egressGWBridge.interfaceID
		l3GwConfig.EgressGWMACAddress = egressGWBridge.macAddress
		l3GwConfig.EgressGWIPAddresses = egressGWBridge.ips
	}
	err = util.SetL3GatewayConfig(nodeAnnotator, &l3GwConfig)
	return gatewayBridge, egressGWBridge, err
}
// gatewayReady reports whether the gateway patch port has been created in
// OVS and assigned an ofport. OVS lookup failures are deliberately treated
// as "not ready yet" (false, nil) so callers can keep polling.
func gatewayReady(patchPort string) (bool, error) {
	// Get ofport of patchPort
	ofport, _, err := util.GetOVSOfPort("--if-exists", "get", "interface", patchPort, "ofport")
	if err == nil && len(ofport) != 0 {
		klog.Info("Gateway is ready")
		return true, nil
	}
	return false, nil
}
// GetGatewayBridgeIface returns the name of the default gateway OVS bridge,
// as tracked by the OpenFlow manager.
func (g *gateway) GetGatewayBridgeIface() string {
	return g.openflowManager.getDefaultBridgeName()
}
// getMaxFrameLength returns the maximum frame size (ignoring VLAN header) that a gateway can handle
func getMaxFrameLength() int {
	// An Ethernet II header is 14 bytes: 6 (dst MAC) + 6 (src MAC) + 2 (ethertype).
	const ethernetHeaderLen = 14
	return config.Default.MTU + ethernetHeaderLen
}
// SetDefaultGatewayBridgeMAC updates the mac address for the OFM used to render flows with
func (g *gateway) SetDefaultGatewayBridgeMAC(macAddr net.HardwareAddr) {
	g.openflowManager.setDefaultBridgeMAC(macAddr)
	klog.Infof("Default gateway bridge MAC address updated to %s", macAddr)
}
// Reconcile handles triggering updates to different components of a gateway,
// like OFM and Services: it re-renders the bridge OpenFlow cache from the
// node's current host subnets and addresses, then re-queues every service
// (services program OpenFlow flows too) through the retry framework.
func (g *gateway) Reconcile() error {
	klog.Info("Reconciling gateway with updates")
	node, err := g.watchFactory.GetNode(g.nodeIPManager.nodeName)
	if err != nil {
		return err
	}
	subnets, err := util.ParseNodeHostSubnetAnnotation(node, types.DefaultNetworkName)
	if err != nil {
		// Wrap the underlying error (it was previously dropped entirely).
		return fmt.Errorf("failed to get subnets for node: %s for OpenFlow cache update: %w", node.Name, err)
	}
	if err := g.openflowManager.updateBridgeFlowCache(subnets, g.nodeIPManager.ListAddresses()); err != nil {
		return err
	}
	// Services create OpenFlow flows as well, need to update them all
	if g.servicesRetryFramework != nil {
		if errs := g.addAllServices(); errs != nil {
			return kerrors.NewAggregate(errs)
		}
	}
	return nil
}
// addAllServices queues every known service into the services retry
// framework (with no backoff) and triggers an immediate retry pass,
// returning any errors hit while listing or queueing. The returned slice is
// non-nil even when empty.
func (g *gateway) addAllServices() []error {
	errs := []error{}
	svcs, listErr := g.watchFactory.GetServices()
	if listErr != nil {
		errs = append(errs, listErr)
	} else {
		for _, svc := range svcs {
			// Copy the object so the retry framework does not hold a
			// pointer into the informer cache.
			svcCopy := *svc
			klog.V(5).Infof("Adding service %s/%s to retryServices", svcCopy.Namespace, svcCopy.Name)
			if addErr := g.servicesRetryFramework.AddRetryObjWithAddNoBackoff(&svcCopy); addErr != nil {
				errs = append(errs, fmt.Errorf("failed to add service %s/%s to retry framework: %w", svcCopy.Namespace, svcCopy.Name, addErr))
			}
		}
	}
	g.servicesRetryFramework.RequestRetryObjs()
	return errs
}
// bridgeConfiguration captures the state of a gateway OVS bridge. The
// embedded mutex guards concurrent mutation (see updateInterfaceIPAddresses).
type bridgeConfiguration struct {
	sync.Mutex
	bridgeName  string           // name of the OVS bridge
	uplinkName  string           // physical NIC attached to the bridge ("" if no uplink)
	ips         []*net.IPNet     // IP addresses associated with the bridge
	interfaceID string           // OVN interface ID set up via bridgedGatewayNodeSetup
	macAddress  net.HardwareAddr // bridge MAC (DPU mode: the host representor's peer MAC)
	patchPort   string           // ovn-controller patch port name: patch-<bridge>_<node>-to-br-int
	ofPortPatch string           // presumably the OpenFlow port no. of patchPort — set outside this file
	ofPortPhys  string           // presumably the OpenFlow port no. of the uplink — set outside this file
	ofPortHost  string           // presumably the OpenFlow port no. of the host port — set outside this file
}
// updateInterfaceIPAddresses sets and returns the bridge's current ips.
// It re-reads the addresses from the network interface under the bridge's
// lock; in DPU mode the DPU host's primary IP is substituted for the
// bridge's own address (see below).
func (b *bridgeConfiguration) updateInterfaceIPAddresses(node *kapi.Node) ([]*net.IPNet, error) {
	b.Lock()
	defer b.Unlock()
	ifAddrs, err := getNetworkInterfaceIPAddresses(b.bridgeName)
	if err != nil {
		return nil, err
	}

	// For DPU, here we need to use the DPU host's IP address which is the tenant cluster's
	// host internal IP address instead of the DPU's external bridge IP address.
	if config.OvnKubeNode.Mode == types.NodeModeDPU {
		nodeAddrStr, err := util.GetNodePrimaryIP(node)
		if err != nil {
			return nil, err
		}
		nodeAddr := net.ParseIP(nodeAddrStr)
		if nodeAddr == nil {
			return nil, fmt.Errorf("failed to parse node IP address. %v", nodeAddrStr)
		}
		ifAddrs, err = getDPUHostPrimaryIPAddresses(nodeAddr, ifAddrs)
		if err != nil {
			return nil, err
		}
	}

	// Cache the result on the struct before returning it.
	b.ips = ifAddrs
	return ifAddrs, nil
}
// bridgeForInterface builds a bridgeConfiguration for the given gateway
// interface, handling three cases:
//  1. intfName is an OVS bridge's internal port — reuse its bridge;
//  2. intfName is a plain NIC — create a bridge around it (NicToBridge);
//  3. intfName is itself an OVS bridge — use it directly and look up its uplink.
//
// It then resolves the bridge's IPs (gwIPs override discovery), MAC address,
// OVN interface ID, and the expected ovn-controller patch port name.
// In DPU mode the MAC is replaced by the DPU host interface's peer MAC.
func bridgeForInterface(intfName, nodeName, physicalNetworkName string, gwIPs []*net.IPNet) (*bridgeConfiguration, error) {
	res := bridgeConfiguration{}
	gwIntf := intfName

	if bridgeName, _, err := util.RunOVSVsctl("port-to-br", intfName); err == nil {
		// This is an OVS bridge's internal port
		uplinkName, err := util.GetNicName(bridgeName)
		if err != nil {
			return nil, errors.Wrapf(err, "Failed to find nic name for bridge %s", bridgeName)
		}
		res.bridgeName = bridgeName
		res.uplinkName = uplinkName
		gwIntf = bridgeName
	} else if _, _, err := util.RunOVSVsctl("br-exists", intfName); err != nil {
		// This is not a OVS bridge. We need to create a OVS bridge
		// and add cluster.GatewayIntf as a port of that bridge.
		bridgeName, err := util.NicToBridge(intfName)
		if err != nil {
			return nil, errors.Wrapf(err, "NicToBridge failed for %s", intfName)
		}
		res.bridgeName = bridgeName
		res.uplinkName = intfName
		gwIntf = bridgeName
	} else {
		// gateway interface is an OVS bridge
		uplinkName, err := getIntfName(intfName)
		if err != nil {
			// A missing uplink is tolerated only in local-gateway mode
			// when explicitly allowed by configuration.
			if config.Gateway.Mode == config.GatewayModeLocal && config.Gateway.AllowNoUplink {
				klog.Infof("Could not find uplink for %s, setup gateway bridge with no uplink port, egress IP and egress GW will not work", intfName)
			} else {
				return nil, errors.Wrapf(err, "Failed to find intfName for %s", intfName)
			}
		} else {
			res.uplinkName = uplinkName
		}
		res.bridgeName = intfName
	}

	var err error
	// Now, we get IP addresses for the bridge
	if len(gwIPs) > 0 {
		// use gwIPs if provided
		res.ips = gwIPs
	} else {
		// get IP addresses from OVS bridge. If IP does not exist,
		// error out.
		res.ips, err = getNetworkInterfaceIPAddresses(gwIntf)
		if err != nil {
			return nil, errors.Wrapf(err, "failed to get interface details for %s", gwIntf)
		}
	}

	res.macAddress, err = util.GetOVSPortMACAddress(gwIntf)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to get MAC address for ovs port %s", gwIntf)
	}
	res.interfaceID, err = bridgedGatewayNodeSetup(nodeName, res.bridgeName, physicalNetworkName)
	if err != nil {
		return nil, fmt.Errorf("failed to set up shared interface gateway: %v", err)
	}

	// the name of the patch port created by ovn-controller is of the form
	// patch-<logical_port_name_of_localnet_port>-to-br-int
	res.patchPort = "patch-" + res.bridgeName + "_" + nodeName + "-to-br-int"

	// for DPU we use the host MAC address for the Gateway configuration
	if config.OvnKubeNode.Mode == types.NodeModeDPU {
		hostRep, err := util.GetDPUHostInterface(res.bridgeName)
		if err != nil {
			return nil, err
		}
		res.macAddress, err = util.GetSriovnetOps().GetRepresentorPeerMacAddress(hostRep)
		if err != nil {
			return nil, err
		}
	}
	return &res, nil
}