From 83f780048ee14dc6cf4e33f59e4bca40d13627a9 Mon Sep 17 00:00:00 2001 From: Andrew Stoycos Date: Fri, 2 Jul 2021 14:35:13 -0400 Subject: [PATCH] Simplify LGW IPtables Make Iptables rule for every ExternalIP or ingress IP, this will result in short-circuiting of node -> ExternalIP service traffic Remove code to manage host routes Ensure we clean up any stale routes on upgrade Signed-off-by: astoycos --- go-controller/pkg/node/gateway_localnet.go | 130 ++------------- .../pkg/node/gateway_localnet_linux_test.go | 156 +++++------------- 2 files changed, 57 insertions(+), 229 deletions(-) diff --git a/go-controller/pkg/node/gateway_localnet.go b/go-controller/pkg/node/gateway_localnet.go index 8c8e06a669..4a2b3ca2b1 100644 --- a/go-controller/pkg/node/gateway_localnet.go +++ b/go-controller/pkg/node/gateway_localnet.go @@ -73,31 +73,24 @@ func getLocalAddrs() (map[string]net.IPNet, error) { return localAddrSet, nil } -func (npw *localPortWatcherData) networkHasAddress(ip net.IP) bool { - for _, net := range npw.localAddrSet { - if net.Contains(ip) { - return true - } - } - return false -} - func (npw *localPortWatcherData) addService(svc *kapi.Service) error { + // don't process headless service or services that do not have NodePorts or ExternalIPs + if !util.ServiceTypeHasClusterIP(svc) || !util.IsClusterIPSet(svc) { + return nil + } + iptRules := []iptRule{} isIPv6Service := utilnet.IsIPv6String(svc.Spec.ClusterIP) gatewayIP := npw.gatewayIPv4 if isIPv6Service { gatewayIP = npw.gatewayIPv6 } - // holds map of external ips and if they are currently using routes - routeUsage := make(map[string]bool) for _, port := range svc.Spec.Ports { if util.ServiceTypeHasNodePort(svc) { if err := util.ValidatePort(port.Protocol, port.NodePort); err != nil { klog.Warningf("Invalid service node port %s, err: %v", port.Name, err) continue } - if gatewayIP != "" { // Fix Azure/GCP LoadBalancers. 
They will forward traffic directly to the node with the // dest address as the load-balancer ingress IP and port @@ -120,36 +113,8 @@ func (npw *localPortWatcherData) addService(svc *kapi.Service) error { externalIP, svc.Namespace, svc.Name, svc.Spec.ClusterIP) continue } - if _, exists := npw.localAddrSet[externalIP]; exists { - if !util.IsClusterIPSet(svc) { - serviceRef := kapi.ObjectReference{ - Kind: "Service", - Namespace: svc.Namespace, - Name: svc.Name, - } - npw.recorder.Eventf(&serviceRef, kapi.EventTypeWarning, "UnsupportedServiceDefinition", "Unsupported service definition, headless service: %s with a local ExternalIP is not supported by ovn-kubernetes in local gateway mode", svc.Name) - klog.Warningf("UnsupportedServiceDefinition event for service %s in namespace %s", svc.Name, svc.Namespace) - continue - } - iptRules = append(iptRules, getExternalIPTRules(port, externalIP, svc.Spec.ClusterIP)...) - klog.V(5).Infof("Will add iptables rule for ExternalIP: %s", externalIP) - } else if npw.networkHasAddress(net.ParseIP(externalIP)) { - klog.V(5).Infof("ExternalIP: %s is reachable through one of the interfaces on this node, will skip setup", externalIP) - } else { - if gatewayIP != "" { - routeUsage[externalIP] = true - } else { - klog.Warningf("No gateway of appropriate IP family for ExternalIP %s for Service %s/%s", - externalIP, svc.Namespace, svc.Name) - } - } - } - } - for externalIP := range routeUsage { - if stdout, stderr, err := util.RunIP("route", "replace", externalIP, "via", gatewayIP, "dev", util.K8sMgmtIntfName, "table", localnetGatewayExternalIDTable); err != nil { - klog.Errorf("Error adding routing table entry for ExternalIP %s, via gw: %s: stdout: %s, stderr: %s, err: %v", externalIP, gatewayIP, stdout, stderr, err) - } else { - klog.V(5).Infof("Successfully added route for ExternalIP: %s", externalIP) + iptRules = append(iptRules, getExternalIPTRules(port, externalIP, svc.Spec.ClusterIP)...) 
+ klog.V(5).Infof("Will add iptables rule for ExternalIP: %s", externalIP) } } klog.V(5).Infof("Adding iptables rules: %v for service: %v", iptRules, svc.Name) @@ -163,8 +128,6 @@ func (npw *localPortWatcherData) deleteService(svc *kapi.Service) error { if isIPv6Service { gatewayIP = npw.gatewayIPv6 } - // holds map of external ips and if they are currently using routes - routeUsage := make(map[string]bool) // Note that unlike with addService we just silently ignore IPv4/IPv6 mismatches here for _, port := range svc.Spec.Ports { if util.ServiceTypeHasNodePort(svc) { @@ -181,24 +144,10 @@ func (npw *localPortWatcherData) deleteService(svc *kapi.Service) error { if utilnet.IsIPv6String(externalIP) != isIPv6Service { continue } - if _, exists := npw.localAddrSet[externalIP]; exists { - iptRules = append(iptRules, getExternalIPTRules(port, externalIP, svc.Spec.ClusterIP)...) - klog.V(5).Infof("Will delete iptables rule for ExternalIP: %s", externalIP) - } else if npw.networkHasAddress(net.ParseIP(externalIP)) { - klog.V(5).Infof("ExternalIP: %s is reachable through one of the interfaces on this node, will skip cleanup", externalIP) - } else { - if gatewayIP != "" { - routeUsage[externalIP] = true - } - } - } - } + + iptRules = append(iptRules, getExternalIPTRules(port, externalIP, svc.Spec.ClusterIP)...) 
+ klog.V(5).Infof("Will delete iptables rule for ExternalIP: %s", externalIP) - for externalIP := range routeUsage { - if stdout, stderr, err := util.RunIP("route", "del", externalIP, "via", gatewayIP, "dev", util.K8sMgmtIntfName, "table", localnetGatewayExternalIDTable); err != nil { - klog.Errorf("Error delete routing table entry for ExternalIP %s: stdout: %s, stderr: %s, err: %v", externalIP, stdout, stderr, err) - } else { - klog.V(5).Infof("Successfully deleted route for ExternalIP: %s", externalIP) } } @@ -207,73 +156,24 @@ func (npw *localPortWatcherData) deleteService(svc *kapi.Service) error { } func (npw *localPortWatcherData) syncServices(serviceInterface []interface{}) { - removeStaleRoutes := func(keepRoutes []string) { - stdout, stderr, err := util.RunIP("route", "list", "table", localnetGatewayExternalIDTable) - if err != nil || stdout == "" { - klog.Infof("No routing table entries for ExternalIP table %s: stdout: %s, stderr: %s, err: %v", - localnetGatewayExternalIDTable, stdout, strings.Replace(stderr, "\n", "", -1), err) - return - } - for _, existingRoute := range strings.Split(stdout, "\n") { - isFound := false - for _, keepRoute := range keepRoutes { - if strings.Contains(existingRoute, keepRoute) { - isFound = true - break - } - } - if !isFound { - klog.V(5).Infof("Deleting stale routing rule: %s", existingRoute) - if _, stderr, err := util.RunIP("route", "del", existingRoute, "table", localnetGatewayExternalIDTable); err != nil { - klog.Errorf("Error deleting stale routing rule: stderr: %s, err: %v", stderr, err) - } - } - } - } keepIPTRules := []iptRule{} - keepRoutes := []string{} - for _, service := range serviceInterface { - svc, ok := service.(*kapi.Service) - if !ok { - klog.Errorf("Spurious object in syncServices: %v", serviceInterface) - continue - } - gatewayIP := npw.gatewayIPv4 - if utilnet.IsIPv6String(svc.Spec.ClusterIP) { - gatewayIP = npw.gatewayIPv6 - } - if gatewayIP != "" { - keepIPTRules = append(keepIPTRules, 
getGatewayIPTRules(svc, gatewayIP, nil)...) - } - keepRoutes = append(keepRoutes, svc.Spec.ExternalIPs...) - } for _, chain := range []string{iptableNodePortChain, iptableExternalIPChain} { recreateIPTRules("nat", chain, keepIPTRules) recreateIPTRules("filter", chain, keepIPTRules) } - removeStaleRoutes(keepRoutes) -} -func initRoutingRules() error { - stdout, stderr, err := util.RunIP("rule") - if err != nil { - return fmt.Errorf("error listing routing rules, stdout: %s, stderr: %s, err: %v", stdout, stderr, err) - } - if !strings.Contains(stdout, fmt.Sprintf("from all lookup %s", localnetGatewayExternalIDTable)) { - if stdout, stderr, err := util.RunIP("rule", "add", "from", "all", "table", localnetGatewayExternalIDTable); err != nil { - return fmt.Errorf("error adding routing rule for ExternalIP table (%s): stdout: %s, stderr: %s, err: %v", localnetGatewayExternalIDTable, stdout, stderr, err) - } + // Previously LGW used routes in the localnetGatewayExternalIDTable, to handle + // upgrades correctly make sure we flush this table of all routes + klog.Infof("Flushing host's routing table: %s", localnetGatewayExternalIDTable) + if _, stderr, err := util.RunIP("route", "flush", "table", localnetGatewayExternalIDTable); err != nil { + klog.Errorf("Error flushing host's routing table: %s stderr: %s err: %v", localnetGatewayExternalIDTable, stderr, err) } - return nil } func (n *OvnNode) watchLocalPorts(npw *localPortWatcherData) error { if err := initLocalGatewayIPTables(); err != nil { return err } - if err := initRoutingRules(); err != nil { - return err - } n.watchFactory.AddServiceHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { svc := obj.(*kapi.Service) diff --git a/go-controller/pkg/node/gateway_localnet_linux_test.go b/go-controller/pkg/node/gateway_localnet_linux_test.go index 6bccc8cad2..b7ce88003f 100644 --- a/go-controller/pkg/node/gateway_localnet_linux_test.go +++ b/go-controller/pkg/node/gateway_localnet_linux_test.go @@ -2,7 
+2,6 @@ package node import ( "fmt" - "net" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -19,17 +18,7 @@ const ( v4localnetGatewayIP = "10.244.0.1" ) -func getFakeLocalAddrs() map[string]net.IPNet { - localAddrSet := make(map[string]net.IPNet) - for _, network := range []string{"127.0.0.1/32", "10.10.10.1/24"} { - ip, ipNet, err := net.ParseCIDR(network) - Expect(err).NotTo(HaveOccurred()) - localAddrSet[ip.String()] = *ipNet - } - return localAddrSet -} - -func initFakeNodePortWatcher(fakeOvnNode *FakeOVNNode, iptV4, iptV6 util.IPTablesHelper) *localPortWatcherData { +func initFakeNodePortWatcher(fakeOvnNode *FakeOVNNode, iptV4, iptV6 util.IPTablesHelper) *localPortWatcher { initIPTable := map[string]util.FakeTable{ "filter": {}, "nat": {}, @@ -43,14 +32,36 @@ func initFakeNodePortWatcher(fakeOvnNode *FakeOVNNode, iptV4, iptV6 util.IPTable err = f6.MatchState(initIPTable) Expect(err).NotTo(HaveOccurred()) - fNPW := localPortWatcherData{ - recorder: fakeOvnNode.recorder, - gatewayIPv4: v4localnetGatewayIP, - localAddrSet: getFakeLocalAddrs(), + fNPW := localPortWatcher{ + recorder: fakeOvnNode.recorder, + gatewayIPv4: v4localnetGatewayIP, } return &fNPW } +func startLocalPortWatcher(l *localPortWatcher, wf factory.NodeWatchFactory) error { + if err := initLocalGatewayIPTables(); err != nil { + return err + } + + wf.AddServiceHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + svc := obj.(*kapi.Service) + l.AddService(svc) + }, + UpdateFunc: func(old, new interface{}) { + oldSvc := old.(*kapi.Service) + newSvc := new.(*kapi.Service) + l.UpdateService(oldSvc, newSvc) + }, + DeleteFunc: func(obj interface{}) { + svc := obj.(*kapi.Service) + l.DeleteService(svc) + }, + }, l.SyncServices) + return nil +} + func newServiceMeta(name, namespace string) metav1.ObjectMeta { return metav1.ObjectMeta{ UID: types.UID(namespace), @@ -95,94 +106,14 @@ var _ = Describe("Node Operations", func() { }) Context("on startup", func() { - - It("inits 
physical routing rules", func() { - app.Action = func(ctx *cli.Context) error { - - iptV4, iptV6 := util.SetFakeIPTablesHelpers() - fNPW := initFakeNodePortWatcher(fakeOvnNode, iptV4, iptV6) - - fakeOvnNode.fakeExec.AddFakeCmd(&ovntest.ExpectedCmd{ - Cmd: "ip rule", - Output: "0: from all lookup local\n32766: from all lookup main\n32767: from all lookup default\n", - }) - fakeOvnNode.fakeExec.AddFakeCmdsNoOutputNoError([]string{ - "ip rule add from all table " + localnetGatewayExternalIDTable, - }) - fakeOvnNode.fakeExec.AddFakeCmdsNoOutputNoError([]string{ - "ip route list table " + localnetGatewayExternalIDTable, - }) - - fakeOvnNode.start(ctx) - fakeOvnNode.node.watchLocalPorts(fNPW) - - expectedTables := map[string]util.FakeTable{ - "filter": { - "FORWARD": []string{ - "-j OVN-KUBE-EXTERNALIP", - "-j OVN-KUBE-NODEPORT", - }, - "OVN-KUBE-NODEPORT": []string{}, - "OVN-KUBE-EXTERNALIP": []string{}, - }, - "nat": { - "PREROUTING": []string{ - "-j OVN-KUBE-EXTERNALIP", - "-j OVN-KUBE-NODEPORT", - }, - "OUTPUT": []string{ - "-j OVN-KUBE-EXTERNALIP", - "-j OVN-KUBE-NODEPORT", - }, - "OVN-KUBE-NODEPORT": []string{}, - "OVN-KUBE-EXTERNALIP": []string{}, - }, - } - f4 := iptV4.(*util.FakeIPTables) - err := f4.MatchState(expectedTables) - Expect(err).NotTo(HaveOccurred()) - - expectedTables = map[string]util.FakeTable{ - "filter": {}, - "nat": {}, - } - f6 := iptV6.(*util.FakeIPTables) - err = f6.MatchState(expectedTables) - Expect(err).NotTo(HaveOccurred()) - - fakeOvnNode.shutdown() - return nil - } - err := app.Run([]string{app.Name}) - Expect(err).NotTo(HaveOccurred()) - }) - - It("removes stale physical routing rules while keeping remaining intact", func() { + It("removes stale iptables rules while keeping remaining intact", func() { app.Action = func(ctx *cli.Context) error { externalIP := "1.1.1.1" externalIPPort := int32(8032) - - iptV4, iptV6 := util.SetFakeIPTablesHelpers() - fNPW := initFakeNodePortWatcher(fakeOvnNode, iptV4, iptV6) - - // Create some fake 
routing and iptable rules - fakeOvnNode.fakeExec.AddFakeCmd(&ovntest.ExpectedCmd{ - Cmd: "ip rule", - Output: "0: from all lookup local\n32766: from all lookup main\n32767: from all lookup default\n", - }) + // create fake ip route commands fakeOvnNode.fakeExec.AddFakeCmdsNoOutputNoError([]string{ - "ip rule add from all table " + localnetGatewayExternalIDTable, - }) - fakeOvnNode.fakeExec.AddFakeCmd(&ovntest.ExpectedCmd{ - Cmd: "ip route list table " + localnetGatewayExternalIDTable, - Output: fmt.Sprintf("%s via %s dev %s\n9.9.9.9 via %s dev %s\n", externalIP, v4localnetGatewayIP, util.K8sMgmtIntfName, v4localnetGatewayIP, util.K8sMgmtIntfName), - }) - fakeOvnNode.fakeExec.AddFakeCmdsNoOutputNoError([]string{ - fmt.Sprintf("ip route del 9.9.9.9 via %s dev %s table %s", v4localnetGatewayIP, util.K8sMgmtIntfName, localnetGatewayExternalIDTable), - }) - fakeOvnNode.fakeExec.AddFakeCmdsNoOutputNoError([]string{ - fmt.Sprintf("ip route replace %s via %s dev %s table %s", externalIP, v4localnetGatewayIP, util.K8sMgmtIntfName, localnetGatewayExternalIDTable), + fmt.Sprintf("ip route flush table %s", localnetGatewayExternalIDTable), }) service := *newService("service1", "namespace1", "10.129.0.2", @@ -278,8 +209,7 @@ var _ = Describe("Node Operations", func() { }) Context("on add", func() { - - It("inits physical routing rules with ExternalIP outside any local network", func() { + It("inits iptables rules with ExternalIP outside any local network", func() { app.Action = func(ctx *cli.Context) error { externalIP := "1.1.1.1" @@ -305,25 +235,24 @@ var _ = Describe("Node Operations", func() { fNPW.addService(&service) expectedTables := map[string]util.FakeTable{ - "filter": {}, - "nat": {}, + "nat": { + "OVN-KUBE-EXTERNALIP": []string{ + fmt.Sprintf("-p %s -d %s --dport %v -j DNAT --to-destination %s:%v", service.Spec.Ports[0].Protocol, externalIP, service.Spec.Ports[0].Port, service.Spec.ClusterIP, service.Spec.Ports[0].Port), + }, + }, } f4 := iptV4.(*util.FakeIPTables) 
err := f4.MatchState(expectedTables) Expect(err).NotTo(HaveOccurred()) - f6 := iptV6.(*util.FakeIPTables) - err = f6.MatchState(expectedTables) - Expect(err).NotTo(HaveOccurred()) - return nil } err := app.Run([]string{app.Name}) Expect(err).NotTo(HaveOccurred()) }) - It("does nothing when ExternalIP on shared network", func() { + It("inits iptables rules with ExternalIP on shared network", func() { app.Action = func(ctx *cli.Context) error { externalIP := "10.10.10.2" @@ -345,18 +274,17 @@ var _ = Describe("Node Operations", func() { fNPW.addService(&service) expectedTables := map[string]util.FakeTable{ - "filter": {}, - "nat": {}, + "nat": { + "OVN-KUBE-EXTERNALIP": []string{ + fmt.Sprintf("-p %s -d %s --dport %v -j DNAT --to-destination %s:%v", service.Spec.Ports[0].Protocol, externalIP, service.Spec.Ports[0].Port, service.Spec.ClusterIP, service.Spec.Ports[0].Port), + }, + }, } f4 := iptV4.(*util.FakeIPTables) err := f4.MatchState(expectedTables) Expect(err).NotTo(HaveOccurred()) - f6 := iptV6.(*util.FakeIPTables) - err = f6.MatchState(expectedTables) - Expect(err).NotTo(HaveOccurred()) - return nil } err := app.Run([]string{app.Name})