From 4246cb74de513fb1267b512e19eedcb2e8b969e4 Mon Sep 17 00:00:00 2001 From: Mengxin Liu Date: Mon, 23 Sep 2019 23:43:59 +0800 Subject: [PATCH] feat: gateway ha --- docs/kubectl-plugin.md | 2 +- docs/subnet.md | 2 +- pkg/apis/kubeovn/v1/condition.go | 8 +- pkg/apis/kubeovn/v1/types.go | 5 +- pkg/controller/controller.go | 9 ++ pkg/controller/node.go | 110 +++++++++++++++++++ pkg/controller/pod.go | 19 +--- pkg/controller/subnet.go | 176 ++++++++++++++++++++++++++++--- pkg/daemon/gateway.go | 95 +++++++++++++---- 9 files changed, 365 insertions(+), 61 deletions(-) diff --git a/docs/kubectl-plugin.md b/docs/kubectl-plugin.md index 8598f63c11b..832e723b1b8 100644 --- a/docs/kubectl-plugin.md +++ b/docs/kubectl-plugin.md @@ -8,7 +8,7 @@ To enable kubectl plugin, kubectl version of 1.12 or later is recommended. You c 1. Get the `kubectl-ko` file ```bash -wget https://github.com/alauda/kube-ovn/blob/master/dist/images/kubectl-ko +wget https://raw.githubusercontent.com/alauda/kube-ovn/master/dist/images/kubectl-ko ``` 2. Move the file to one of $PATH directories diff --git a/docs/subnet.md b/docs/subnet.md index 9a43c6414cd..a222ac06eab 100644 --- a/docs/subnet.md +++ b/docs/subnet.md @@ -56,5 +56,5 @@ For a distributed Gateway, outgoing traffic from Pods within the OVN network to For a centralized gateway, outgoing traffic from Pods within the OVN network to external destinations will go through Gateway Node for the Namespace. - `gatewayType`: `distributed` or `centralized`, default is `distributed`. -- `gatewayNode`: when `gatewayType` is `centralized` used this field to specify which node act as the namespace gateway. +- `gatewayNode`: when `gatewayType` is `centralized` used this field to specify which node act as the namespace gateway. This field can be a comma separated string, like `node1,node2` and kube-ovn will automatically apply an active-backup failover strategy. - `natOutgoing`: `true` or `false`, whether pod ip need to be masqueraded when go through gateway. When `false`, pod ip will be exposed to external network directly, default `false`. diff --git a/pkg/apis/kubeovn/v1/condition.go b/pkg/apis/kubeovn/v1/condition.go index 8f6798b868c..cf3d68309fd 100644 --- a/pkg/apis/kubeovn/v1/condition.go +++ b/pkg/apis/kubeovn/v1/condition.go @@ -87,22 +87,22 @@ func (m *SubnetStatus) ConditionReason(ctype ConditionType) string { return "" } -// Ready - shortcut to set ready contition to true +// Ready - shortcut to set ready condition to true func (m *SubnetStatus) Ready(reason, message string) { m.SetCondition(Ready, reason, message) } -// NotReady - shortcut to set ready contition to false +// NotReady - shortcut to set ready condition to false func (m *SubnetStatus) NotReady(reason, message string) { m.ClearCondition(Ready, reason, message) } -// Validated - shortcut to set validated contition to true +// Validated - shortcut to set validated condition to true func (m *SubnetStatus) Validated(reason, message string) { m.SetCondition(Validated, reason, message) } -// NotValidated - shortcut to set validated contition to false +// NotValidated - shortcut to set validated condition to false func (m *SubnetStatus) NotValidated(reason, message string) { m.ClearCondition(Validated, reason, message) } diff --git a/pkg/apis/kubeovn/v1/types.go b/pkg/apis/kubeovn/v1/types.go index a9fe4082312..824f37989cf 100644 --- a/pkg/apis/kubeovn/v1/types.go +++ b/pkg/apis/kubeovn/v1/types.go @@ -116,8 +116,9 @@ type SubnetStatus struct { // +patchStrategy=merge Conditions []SubnetCondition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"` - AvailableIPs uint64 `json:"availableIPs"` - UsingIPs uint64 `json:"usingIPs"` + AvailableIPs uint64 `json:"availableIPs"` + UsingIPs uint64 `json:"usingIPs"` + ActivateGateway string `json:"activateGateway"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 0c91dc68052..2d657baf90e 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -47,6 +47,7 @@ type Controller struct { subnetSynced cache.InformerSynced addSubnetQueue workqueue.RateLimitingInterface deleteSubnetQueue workqueue.RateLimitingInterface + deleteRouteQueue workqueue.RateLimitingInterface updateSubnetQueue workqueue.RateLimitingInterface updateSubnetStatusQueue workqueue.RateLimitingInterface @@ -59,6 +60,7 @@ type Controller struct { nodesLister v1.NodeLister nodesSynced cache.InformerSynced addNodeQueue workqueue.RateLimitingInterface + updateNodeQueue workqueue.RateLimitingInterface deleteNodeQueue workqueue.RateLimitingInterface servicesLister v1.ServiceLister @@ -117,6 +119,7 @@ func NewController(config *Configuration) *Controller { subnetSynced: subnetInformer.Informer().HasSynced, addSubnetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddSubnet"), deleteSubnetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteSubnet"), + deleteRouteQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteRoute"), updateSubnetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSubnet"), updateSubnetStatusQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSubnetStatus"), @@ -136,6 +139,7 @@ func NewController(config *Configuration) *Controller { nodesLister: nodeInformer.Lister(), nodesSynced: nodeInformer.Informer().HasSynced, addNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddNode"), + updateNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateNode"), deleteNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteNode"), servicesLister: serviceInformer.Lister(), @@ -173,6 +177,7 @@ func NewController(config *Configuration) *Controller { nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueAddNode, + UpdateFunc: controller.enqueueUpdateNode, DeleteFunc: controller.enqueueDeleteNode, }) @@ -223,9 +228,11 @@ func (c *Controller) Run(stopCh <-chan struct{}) error { defer c.addSubnetQueue.ShutDown() defer c.updateSubnetQueue.ShutDown() defer c.deleteSubnetQueue.ShutDown() + defer c.deleteRouteQueue.ShutDown() defer c.updateSubnetStatusQueue.ShutDown() defer c.addNodeQueue.ShutDown() + defer c.updateNodeQueue.ShutDown() defer c.deleteNodeQueue.ShutDown() defer c.deleteTcpServiceQueue.ShutDown() @@ -279,9 +286,11 @@ func (c *Controller) Run(stopCh <-chan struct{}) error { go wait.Until(c.runDeleteSubnetWorker, time.Second, stopCh) go wait.Until(c.runUpdateSubnetWorker, time.Second, stopCh) + go wait.Until(c.runDeleteRouteWorker, time.Second, stopCh) go wait.Until(c.runUpdateSubnetStatusWorker, time.Second, stopCh) go wait.Until(c.runAddNodeWorker, time.Second, stopCh) + go wait.Until(c.runUpdateNodeWorker, time.Second, stopCh) go wait.Until(c.runDeleteNodeWorker, time.Second, stopCh) go wait.Until(c.runUpdateServiceWorker, time.Second, stopCh) diff --git a/pkg/controller/node.go b/pkg/controller/node.go index 0deb14e9903..c2d5e5dbca9 100644 --- a/pkg/controller/node.go +++ b/pkg/controller/node.go @@ -11,6 +11,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" @@ -31,6 +32,35 @@ func (c *Controller) enqueueAddNode(obj interface{}) { c.addNodeQueue.AddRateLimited(key) } +func nodeReady(node *v1.Node) bool { + for _, con := range node.Status.Conditions { + if con.Type == v1.NodeReady && con.Status == v1.ConditionTrue { + return true + } + } + return false +} + +func (c *Controller) enqueueUpdateNode(oldObj, newObj interface{}) { + if !c.isLeader() { + return + } + + oldNode := oldObj.(*v1.Node) + newNode := newObj.(*v1.Node) + + if nodeReady(oldNode) != nodeReady(newNode) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(newObj); err != nil { + utilruntime.HandleError(err) + return + } + klog.V(3).Infof("enqueue update node %s", key) + c.updateNodeQueue.AddRateLimited(key) + } +} + func (c *Controller) enqueueDeleteNode(obj interface{}) { if !c.isLeader() { return @@ -50,6 +80,11 @@ func (c *Controller) runAddNodeWorker() { } } +func (c *Controller) runUpdateNodeWorker() { + for c.processNextUpdateNodeWorkItem() { + } +} + func (c *Controller) runDeleteNodeWorker() { for c.processNextDeleteNodeWorkItem() { } @@ -86,6 +121,37 @@ func (c *Controller) processNextAddNodeWorkItem() bool { return true } +func (c *Controller) processNextUpdateNodeWorkItem() bool { + obj, shutdown := c.updateNodeQueue.Get() + + if shutdown { + return false + } + + err := func(obj interface{}) error { + defer c.updateNodeQueue.Done(obj) + var key string + var ok bool + if key, ok = obj.(string); !ok { + c.updateNodeQueue.Forget(obj) + utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + if err := c.handleUpdateNode(key); err != nil { + c.updateNodeQueue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) + } + c.updateNodeQueue.Forget(obj) + return nil + }(obj) + + if err != nil { + utilruntime.HandleError(err) + return true + } + return true +} + func (c *Controller) processNextDeleteNodeWorkItem() bool { obj, shutdown := c.deleteNodeQueue.Get() @@ -259,6 +325,50 @@ func (c *Controller) handleDeleteNode(key string) error { return nil } +func (c *Controller) handleUpdateNode(key string) error { + node, err := c.nodesLister.Get(key) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + subnets, err := c.subnetsLister.List(labels.Everything()) + if err != nil { + klog.Errorf("failed to get subnets %v", err) + return err + } + + if nodeReady(node) { + for _, subnet := range subnets { + if subnet.Status.ActivateGateway == "" && gatewayContains(subnet.Spec.GatewayNode, node.Name) { + if err := c.reconcileCentralizedGateway(subnet); err != nil { + return err + } + } + } + } else { + for _, subnet := range subnets { + if subnet.Status.ActivateGateway == node.Name { + if err := c.reconcileCentralizedGateway(subnet); err != nil { + return err + } + } + } + } + return nil +} + +func gatewayContains(gatewayNodeStr, gateway string) bool { + for _, gw := range strings.Split(gatewayNodeStr, ",") { + gw = strings.TrimSpace(gw) + if gw == gateway { + return true + } + } + return false +} + func getNodeInternalIP(node *v1.Node) string { var nodeAddr string for _, addr := range node.Status.Addresses { diff --git a/pkg/controller/pod.go b/pkg/controller/pod.go index 8105139845a..aa3bd62a40a 100644 --- a/pkg/controller/pod.go +++ b/pkg/controller/pod.go @@ -649,8 +649,7 @@ func (c *Controller) handleUpdatePod(key string) error { } } - switch subnet.Spec.GatewayType { - case "", kubeovnv1.GWDistributedType: + if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType { node, err := c.nodesLister.Get(pod.Spec.NodeName) if err != nil { klog.Errorf("get node %s failed %v", pod.Spec.NodeName, err) @@ -666,22 +665,6 @@ func (c *Controller) handleUpdatePod(key string) error { if err := c.ovnClient.AddStaticRouter(ovs.PolicySrcIP, pod.Status.PodIP, nodeTunlIPAddr.String(), c.config.ClusterRouter); err != nil { return errors.Annotate(err, "add static route failed") } - case kubeovnv1.GWCentralizedType: - node, err := c.nodesLister.Get(subnet.Spec.GatewayNode) - if err != nil { - klog.Errorf("get node %s failed %v", pod.Spec.NodeName, err) - return err - } - nodeTunlIPAddr, err := getNodeTunlIP(node) - if err != nil { - return err - } - if err := c.ovnClient.DeleteStaticRouter(pod.Status.PodIP, c.config.ClusterRouter); err != nil { - return errors.Annotate(err, "del static route failed") - } - if err := c.ovnClient.AddStaticRouter(ovs.PolicySrcIP, pod.Status.PodIP, nodeTunlIPAddr.String(), c.config.ClusterRouter); err != nil { - return errors.Annotate(err, "add static route failed") - } } return nil } diff --git a/pkg/controller/subnet.go b/pkg/controller/subnet.go index 740d225dc4c..9006c2d030c 100644 --- a/pkg/controller/subnet.go +++ b/pkg/controller/subnet.go @@ -2,14 +2,17 @@ package controller import ( "fmt" + "net" + "reflect" + "strings" + kubeovnv1 "github.com/alauda/kube-ovn/pkg/apis/kubeovn/v1" "github.com/alauda/kube-ovn/pkg/ovs" "github.com/alauda/kube-ovn/pkg/util" - "net" - "reflect" + "github.com/juju/errors" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -45,6 +48,10 @@ func (c *Controller) enqueueDeleteSubnet(obj interface{}) { } klog.V(3).Infof("enqueue delete subnet %s", key) c.deleteSubnetQueue.AddRateLimited(key) + subnet := obj.(*kubeovnv1.Subnet) + if subnet.Spec.GatewayType == kubeovnv1.GWCentralizedType { + c.deleteRouteQueue.AddRateLimited(subnet.Spec.CIDRBlock) + } } func (c *Controller) enqueueUpdateSubnet(old, new interface{}) { @@ -56,7 +63,9 @@ func (c *Controller) enqueueUpdateSubnet(old, new interface{}) { if oldSubnet.Spec.Private != newSubnet.Spec.Private || !reflect.DeepEqual(oldSubnet.Spec.AllowSubnets, newSubnet.Spec.AllowSubnets) || - !reflect.DeepEqual(oldSubnet.Spec.Namespaces, newSubnet.Spec.Namespaces) { + !reflect.DeepEqual(oldSubnet.Spec.Namespaces, newSubnet.Spec.Namespaces) || + oldSubnet.Spec.GatewayType != newSubnet.Spec.GatewayType || + oldSubnet.Spec.GatewayNode != newSubnet.Spec.GatewayNode { var key string var err error if key, err = cache.MetaNamespaceKeyFunc(new); err != nil { @@ -83,6 +92,12 @@ func (c *Controller) runUpdateSubnetStatusWorker() { } } +func (c *Controller) runDeleteRouteWorker() { + for c.processNextDeleteRoutePodWorkItem() { + + } +} + func (c *Controller) runDeleteSubnetWorker() { for c.processNextDeleteSubnetWorkItem() { } @@ -148,6 +163,36 @@ func (c *Controller) processNextUpdateSubnetWorkItem() bool { return true } +func (c *Controller) processNextDeleteRoutePodWorkItem() bool { + obj, shutdown := c.deleteRouteQueue.Get() + if shutdown { + return false + } + + err := func(obj interface{}) error { + defer c.deleteRouteQueue.Done(obj) + var key string + var ok bool + if key, ok = obj.(string); !ok { + c.deleteRouteQueue.Forget(obj) + utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + if err := c.handleDeleteRoute(key); err != nil { + c.deleteRouteQueue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) + } + c.deleteRouteQueue.Forget(obj) + return nil + }(obj) + + if err != nil { + utilruntime.HandleError(err) + return true + } + return true +} + func (c *Controller) processNextUpdateSubnetStatusWorkItem() bool { obj, shutdown := c.updateSubnetStatusQueue.Get() if shutdown { @@ -251,7 +296,7 @@ func formatSubnet(subnet *kubeovnv1.Subnet, c *Controller) error { } if changed { - subnet, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Update(subnet) + _, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Update(subnet) if err != nil { klog.Errorf("failed to update subnet %s, %v", subnet.Name, err) return err @@ -263,7 +308,7 @@ func formatSubnet(subnet *kubeovnv1.Subnet, c *Controller) error { func (c *Controller) handleAddSubnet(key string) error { subnet, err := c.subnetsLister.Get(key) if err != nil { - if errors.IsNotFound(err) { + if k8serrors.IsNotFound(err) { return nil } return err @@ -364,6 +409,20 @@ func (c *Controller) handleAddSubnet(key string) error { return err } + if err := c.reconcileCentralizedGateway(subnet); err != nil { + klog.Errorf("failed to reconcile gateway %s, %v", subnet.Name, err) + subnet.Status.SetError("ReconcileGatewayFailed", err.Error()) + bytes, err := subnet.Status.Bytes() + if err != nil { + klog.Error(err) + } else { + if _, err := c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(subnet.Name, types.MergePatchType, bytes, "status"); err != nil { + klog.Error("patch subnet status failed", err) + } + } + return err + } + if subnet.Spec.Private { err = c.ovnClient.SetPrivateLogicalSwitch(subnet.Name, subnet.Spec.Protocol, subnet.Spec.CIDRBlock, subnet.Spec.AllowSubnets) if err != nil { @@ -396,13 +455,85 @@ func (c *Controller) handleAddSubnet(key string) error { klog.Error("patch subnet status failed", err) } } + + return err +} + +func (c *Controller) reconcileCentralizedGateway(subnet *kubeovnv1.Subnet) error { + // if gw is distributed remove activateGateway field + if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType { + if subnet.Status.ActivateGateway == "" { + return nil + } + subnet.Status.ActivateGateway = "" + bytes, err := subnet.Status.Bytes() + if err != nil { + return err + } + _, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(subnet.Name, types.MergePatchType, bytes, "status") + return err + } + klog.Infof("start to init centralized gateway for subnet %s", subnet.Name) + + // check if activateGateway still ready + if subnet.Status.ActivateGateway != "" { + node, err := c.nodesLister.Get(subnet.Status.ActivateGateway) + if err == nil && nodeReady(node) { + klog.Infof("subnet %s uses the old activate gw %s", subnet.Name, node.Name) + return nil + } + } + + klog.Info("find a new activate node") + // need a new activate gateway + newActivateNode := "" + var nodeTunlIPAddr net.IP + for _, gw := range strings.Split(subnet.Spec.GatewayNode, ",") { + gw = strings.TrimSpace(gw) + node, err := c.nodesLister.Get(gw) + if err == nil && nodeReady(node) { + newActivateNode = node.Name + nodeTunlIPAddr, err = getNodeTunlIP(node) + if err != nil { + return err + } + klog.Infof("subnet %s uses a new activate gw %s", subnet.Name, node.Name) + break + } + } + if newActivateNode == "" { + klog.Warningf("all subnet %s gws are not ready", subnet.Name) + subnet.Status.ActivateGateway = newActivateNode + subnet.Status.NotReady("NoReadyGateway", "") + bytes, err := subnet.Status.Bytes() + if err != nil { + return err + } + _, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(subnet.Name, types.MergePatchType, bytes, "status") + return err + } + + if err := c.ovnClient.DeleteStaticRouter(subnet.Spec.CIDRBlock, c.config.ClusterRouter); err != nil { + return errors.Annotate(err, "del static route failed") + } + if err := c.ovnClient.AddStaticRouter(ovs.PolicySrcIP, subnet.Spec.CIDRBlock, nodeTunlIPAddr.String(), c.config.ClusterRouter); err != nil { + return errors.Annotate(err, "add static route failed") + } + + subnet.Status.ActivateGateway = newActivateNode + bytes, err := subnet.Status.Bytes() + subnet.Status.Ready("ReconcileCentralizedGatewaySuccess", "") + if err != nil { + return err + } + _, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(subnet.Name, types.MergePatchType, bytes, "status") return err } func (c *Controller) handleUpdateSubnet(key string) error { subnet, err := c.subnetsLister.Get(key) if err != nil { - if errors.IsNotFound(err) { + if k8serrors.IsNotFound(err) { return nil } return err @@ -474,6 +605,20 @@ func (c *Controller) handleUpdateSubnet(key string) error { return err } + if err := c.reconcileCentralizedGateway(subnet); err != nil { + klog.Errorf("failed to reconcile gateway %s, %v", subnet.Name, err) + subnet.Status.SetError("ReconcileGatewayFailed", err.Error()) + bytes, err := subnet.Status.Bytes() + if err != nil { + klog.Error(err) + } else { + if _, err := c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(subnet.Name, types.MergePatchType, bytes, "status"); err != nil { + klog.Error("patch subnet status failed", err) + } + } + return err + } + if subnet.Spec.Private { err = c.ovnClient.SetPrivateLogicalSwitch(subnet.Name, subnet.Spec.Protocol, subnet.Spec.CIDRBlock, subnet.Spec.AllowSubnets) if err != nil { @@ -511,7 +656,7 @@ func (c *Controller) handleUpdateSubnet(key string) error { func (c *Controller) handleUpdateSubnetStatus(key string) error { subnet, err := c.subnetsLister.Get(key) if err != nil { - if errors.IsNotFound(err) { + if k8serrors.IsNotFound(err) { return nil } return err @@ -519,6 +664,14 @@ func (c *Controller) handleUpdateSubnetStatus(key string) error { return calcSubnetStatusIP(subnet, c) } +func (c *Controller) handleDeleteRoute(key string) error { + if _, _, err := net.ParseCIDR(key); err != nil { + return nil + } + + return c.ovnClient.DeleteStaticRouter(key, c.config.ClusterRouter) +} + func (c *Controller) handleDeleteSubnet(key string) error { exist, err := c.ovnClient.LogicalSwitchExists(key) if err != nil { @@ -529,16 +682,15 @@ func (c *Controller) handleDeleteSubnet(key string) error { return nil } - err = c.ovnClient.CleanLogicalSwitchAcl(key) - if err != nil { + if err = c.ovnClient.CleanLogicalSwitchAcl(key); err != nil { klog.Errorf("failed to delete acl of logical switch %s %v", key, err) return err } - err = c.ovnClient.DeleteLogicalSwitch(key) - if err != nil { + if err = c.ovnClient.DeleteLogicalSwitch(key); err != nil { klog.Errorf("failed to delete logical switch %s %v", key, err) return err } + return nil } diff --git a/pkg/daemon/gateway.go b/pkg/daemon/gateway.go index d9bea006f80..4f6557847cb 100644 --- a/pkg/daemon/gateway.go +++ b/pkg/daemon/gateway.go @@ -13,22 +13,33 @@ import ( ) const ( - SubnetSet = "subnets" - LocalPodSet = "local-pod-ip-nat" - IPSetPrefix = "ovn" + SubnetSet = "subnets" + SubnetNatSet = "subnets-nat" + LocalPodSet = "local-pod-ip-nat" + IPSetPrefix = "ovn" ) var ( - natV4Rule = util.IPTableRule{ + podNatV4Rule = util.IPTableRule{ Table: "nat", Chain: "POSTROUTING", Rule: strings.Split("-m set --match-set ovn40local-pod-ip-nat src -m set ! --match-set ovn40subnets dst -j MASQUERADE", " "), } - natV6Rule = util.IPTableRule{ + subnetNatV4Rule = util.IPTableRule{ + Table: "nat", + Chain: "POSTROUTING", + Rule: strings.Split("-m set --match-set ovn40subnets-nat src -m set ! --match-set ovn40subnets dst -j MASQUERADE", " "), + } + podNatV6Rule = util.IPTableRule{ Table: "nat", Chain: "POSTROUTING", Rule: strings.Split("-m set --match-set ovn60local-pod-ip-nat src -m set ! --match-set ovn60subnets dst -j MASQUERADE", " "), } + subnetNatV6Rule = util.IPTableRule{ + Table: "nat", + Chain: "POSTROUTING", + Rule: strings.Split("-m set --match-set ovn60subnets-nat src -m set ! --match-set ovn60subnets dst -j MASQUERADE", " "), + } forwardAcceptRule1 = util.IPTableRule{ Table: "filter", Chain: "FORWARD", @@ -53,7 +64,11 @@ func (c *Controller) runGateway(protocol string, stopCh <-chan struct{}) error { klog.Errorf("get local pod ips failed, %+v", err) return err } - + subnetsNeedNat, err := c.getSubnetsNeedNAT(protocol) + if err != nil { + klog.Errorf("get need nat subnets failed, %+v", err) + return err + } if protocol == kubeovnv1.ProtocolIPv4 { c.ipSetsV4Mgr.AddOrReplaceIPSet(ipsets.IPSetMetadata{ MaxSize: 1048576, @@ -65,9 +80,14 @@ func (c *Controller) runGateway(protocol string, stopCh <-chan struct{}) error { SetID: LocalPodSet, Type: ipsets.IPSetTypeHashIP, }, localPodIPs) + c.ipSetsV4Mgr.AddOrReplaceIPSet(ipsets.IPSetMetadata{ + MaxSize: 1048576, + SetID: SubnetNatSet, + Type: ipsets.IPSetTypeHashNet, + }, subnetsNeedNat) c.ipSetsV4Mgr.ApplyUpdates() - for _, iptRule := range []util.IPTableRule{forwardAcceptRule1, forwardAcceptRule2, natV4Rule} { + for _, iptRule := range []util.IPTableRule{forwardAcceptRule1, forwardAcceptRule2, podNatV4Rule, subnetNatV4Rule} { exists, err := c.iptablesV4Mgr.Exists(iptRule.Table, iptRule.Chain, iptRule.Rule...) if err != nil { klog.Errorf("check iptable rule exist failed, %+v", err) @@ -92,9 +112,14 @@ func (c *Controller) runGateway(protocol string, stopCh <-chan struct{}) error { SetID: LocalPodSet, Type: ipsets.IPSetTypeHashIP, }, localPodIPs) + c.ipSetsV6Mgr.AddOrReplaceIPSet(ipsets.IPSetMetadata{ + MaxSize: 1048576, + SetID: SubnetNatSet, + Type: ipsets.IPSetTypeHashNet, + }, subnetsNeedNat) c.ipSetsV6Mgr.ApplyUpdates() - for _, iptRule := range []util.IPTableRule{forwardAcceptRule1, forwardAcceptRule2, natV6Rule} { + for _, iptRule := range []util.IPTableRule{forwardAcceptRule1, forwardAcceptRule2, podNatV6Rule, subnetNatV6Rule} { exists, err := c.iptablesV6Mgr.Exists(iptRule.Table, iptRule.Chain, iptRule.Rule...) if err != nil { klog.Errorf("check iptable rule exist failed, %+v", err) @@ -130,6 +155,11 @@ LOOP: klog.Errorf("get local pod ips failed, %+v", err) continue } + subnetsNeedNat, err := c.getSubnetsNeedNAT(protocol) + if err != nil { + klog.Errorf("get need nat subnets failed, %+v", err) + continue + } if protocol == kubeovnv1.ProtocolIPv4 { c.ipSetsV4Mgr.AddOrReplaceIPSet(ipsets.IPSetMetadata{ MaxSize: 1048576, @@ -141,6 +171,11 @@ LOOP: SetID: LocalPodSet, Type: ipsets.IPSetTypeHashIP, }, localPodIPs) + c.ipSetsV4Mgr.AddOrReplaceIPSet(ipsets.IPSetMetadata{ + MaxSize: 1048576, + SetID: SubnetNatSet, + Type: ipsets.IPSetTypeHashNet, + }, subnetsNeedNat) c.ipSetsV4Mgr.ApplyUpdates() } else { c.ipSetsV6Mgr.AddOrReplaceIPSet(ipsets.IPSetMetadata{ @@ -153,9 +188,13 @@ LOOP: SetID: LocalPodSet, Type: ipsets.IPSetTypeHashIP, }, localPodIPs) + c.ipSetsV6Mgr.AddOrReplaceIPSet(ipsets.IPSetMetadata{ + MaxSize: 1048576, + SetID: SubnetNatSet, + Type: ipsets.IPSetTypeHashNet, + }, subnetsNeedNat) c.ipSetsV6Mgr.ApplyUpdates() } - } return nil } @@ -180,26 +219,36 @@ func (c *Controller) getLocalPodIPsNeedNAT(protocol string) ([]string, error) { nsGWType := subnet.Spec.GatewayType nsGWNat := subnet.Spec.NatOutgoing - if nsGWNat { - switch nsGWType { - case "", kubeovnv1.GWDistributedType: - if pod.Spec.NodeName == hostname { - if util.CheckProtocol(pod.Status.PodIP) == protocol { - localPodIPs = append(localPodIPs, pod.Status.PodIP) - } - } - case kubeovnv1.GWCentralizedType: - gwNode := subnet.Spec.GatewayNode - if gwNode == hostname && util.CheckProtocol(pod.Status.PodIP) == protocol { - localPodIPs = append(localPodIPs, pod.Status.PodIP) - } - } + if nsGWNat && + nsGWType == kubeovnv1.GWDistributedType && + pod.Spec.NodeName == hostname && + util.CheckProtocol(pod.Status.PodIP) == protocol { + localPodIPs = append(localPodIPs, pod.Status.PodIP) } } + klog.V(3).Infof("local pod ips %v", localPodIPs) return localPodIPs, nil } +func (c *Controller) getSubnetsNeedNAT(protocol string) ([]string, error) { + var subnetsNeedNat []string + subnets, err := c.subnetsLister.List(labels.Everything()) + if err != nil { + klog.Errorf("list subnets failed, %v", err) + return nil, err + } + for _, subnet := range subnets { + if subnet.Spec.GatewayType == kubeovnv1.GWCentralizedType && + subnet.Status.ActivateGateway == c.config.NodeName && + subnet.Spec.Protocol == protocol && + subnet.Spec.NatOutgoing { + subnetsNeedNat = append(subnetsNeedNat, subnet.Spec.CIDRBlock) + } + } + return subnetsNeedNat, nil +} + func (c *Controller) getSubnetsCIDR(protocol string) ([]string, error) { var ret = []string{c.config.ServiceClusterIPRange} subnets, err := c.subnetsLister.List(labels.Everything())