diff --git a/pkg/controller/subnet.go b/pkg/controller/subnet.go index 571526ea3ca..6b11df0308f 100644 --- a/pkg/controller/subnet.go +++ b/pkg/controller/subnet.go @@ -472,96 +472,17 @@ func (c Controller) patchSubnetStatus(subnet *kubeovnv1.Subnet, reason string, e } } -func (c *Controller) handleAddOrUpdateSubnet(key string) error { - c.subnetKeyMutex.LockKey(key) - defer func() { _ = c.subnetKeyMutex.UnlockKey(key) }() - - cachedSubnet, err := c.subnetsLister.Get(key) - if err != nil { - if k8serrors.IsNotFound(err) { - return nil - } - return err - } - klog.V(4).Infof("handle add or update subnet %s", cachedSubnet.Name) - - subnet := cachedSubnet.DeepCopy() - if err = formatSubnet(subnet, c); err != nil { - return err - } - - subnet, err = c.subnetsLister.Get(key) - if err != nil { - if k8serrors.IsNotFound(err) { - return nil - } - klog.Errorf("failed to get subnet %s error %v", key, err) - return err - } - - deleted, err := c.handleSubnetFinalizer(subnet) - if err != nil { - klog.Errorf("handle subnet finalizer failed %v", err) - return err - } - if deleted { - return nil - } - - if subnet.Spec.LogicalGateway && subnet.Spec.U2OInterconnection { - err = fmt.Errorf("logicalGateway and u2oInterconnection can't be opened at the same time") - klog.Error(err) - return err - } - - if subnet.Spec.Protocol == kubeovnv1.ProtocolDual { - err = calcDualSubnetStatusIP(subnet, c) - } else { - err = calcSubnetStatusIP(subnet, c) - } - if err != nil { - klog.Errorf("calculate subnet %s used ip failed, %v", subnet.Name, err) - return err - } - - if err := c.ipam.AddOrUpdateSubnet(subnet.Name, subnet.Spec.CIDRBlock, subnet.Spec.Gateway, subnet.Spec.ExcludeIps); err != nil { - return err - } - - if err = util.ValidateSubnet(*subnet); err != nil { - klog.Errorf("failed to validate subnet %s, %v", subnet.Name, err) - c.patchSubnetStatus(subnet, "ValidateLogicalSwitchFailed", err.Error()) - return err - } else { - c.patchSubnetStatus(subnet, "ValidateLogicalSwitchSuccess", "") - } - - if !isOvnSubnet(subnet) { - // subnet provider is not ovn, and vpc is empty, should not reconcile - c.patchSubnetStatus(subnet, "SetNonOvnSubnetSuccess", "") - - subnet.Status.EnsureStandardConditions() - klog.Infof("non ovn subnet %s is ready", subnet.Name) - return nil - } - - if subnet.Spec.Vpc == "" { - // ovn subnet, but vpc is empty, should not reconcile - err := fmt.Errorf("subnet %s has no vpc, should not reconcile", subnet.Name) - klog.Error(err) - return err - } - +func (c *Controller) validateVpcBySubnet(subnet *kubeovnv1.Subnet) (*kubeovnv1.Vpc, error) { vpc, err := c.vpcsLister.Get(subnet.Spec.Vpc) if err != nil { klog.Errorf("failed to get subnet's vpc '%s', %v", subnet.Spec.Vpc, err) - return err + return vpc, err } if !vpc.Status.Standby { err = fmt.Errorf("the vpc '%s' not standby yet", vpc.Name) klog.Error(err) - return err + return vpc, err } if !vpc.Status.Default { @@ -569,31 +490,27 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error { if !util.ContainsString(vpc.Spec.Namespaces, ns) { err = fmt.Errorf("namespace '%s' is out of range to custom vpc '%s'", ns, vpc.Name) klog.Error(err) - return err + return vpc, err } } } else { vpcs, err := c.vpcsLister.List(labels.Everything()) if err != nil { klog.Errorf("failed to list vpc, %v", err) - return err + return vpc, err } for _, vpc := range vpcs { if subnet.Spec.Vpc != vpc.Name && !vpc.Status.Default && util.IsStringsOverlap(vpc.Spec.Namespaces, subnet.Spec.Namespaces) { err = fmt.Errorf("namespaces %v are overlap with vpc '%s'", subnet.Spec.Namespaces, vpc.Name) klog.Error(err) - return err + return vpc, err } } } + return vpc, nil +} - if subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway { - if err := c.reconcileU2OInterconnectionIP(subnet); err != nil { - klog.Errorf("failed to reconcile underlay subnet %s to overlay interconnection %v", subnet.Name, err) - return err - } - } - +func (c *Controller) checkSubnetConflict(subnet *kubeovnv1.Subnet) error { subnetList, err := c.subnetsLister.List(labels.Everything()) if err != nil { klog.Errorf("failed to list subnets %v", err) @@ -638,39 +555,24 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error { } } } + return nil +} - needRouter := subnet.Spec.Vlan == "" || subnet.Spec.LogicalGateway || - (subnet.Status.U2OInterconnectionIP != "" && subnet.Spec.U2OInterconnection) - // 1. overlay subnet, should add lrp, lrp ip is subnet gw - // 2. underlay subnet use logical gw, should add lrp, lrp ip is subnet gw - randomAllocateGW := !subnet.Spec.LogicalGateway && vpc.Spec.EnableExternal && subnet.Name == c.config.ExternalGatewaySwitch - // 3. underlay subnet use physical gw, vpc has eip, lrp managed in vpc process, lrp ip is random allocation, not subnet gw - - gateway := subnet.Spec.Gateway - if subnet.Status.U2OInterconnectionIP != "" && subnet.Spec.U2OInterconnection { - gateway = subnet.Status.U2OInterconnectionIP - } - - if err := c.clearOldU2OResource(subnet); err != nil { - klog.Errorf("clear subnet %s old u2o resource failed: %v", subnet.Name, err) - return err - } - - // create or update logical switch - if err := c.ovnClient.CreateLogicalSwitch(subnet.Name, vpc.Status.Router, subnet.Spec.CIDRBlock, gateway, needRouter, randomAllocateGW); err != nil { - klog.Errorf("create logical switch %s: %v", subnet.Name, err) - return err - } - - subnet.Status.EnsureStandardConditions() - +func (c *Controller) updateSubnetDHCPOption(subnet *kubeovnv1.Subnet, needRouter bool) error { var dhcpOptionsUUIDs *ovs.DHCPOptionsUUIDs + var err error dhcpOptionsUUIDs, err = c.ovnLegacyClient.UpdateDHCPOptions(subnet.Name, subnet.Spec.CIDRBlock, subnet.Spec.Gateway, subnet.Spec.DHCPv4Options, subnet.Spec.DHCPv6Options, subnet.Spec.EnableDHCP) if err != nil { klog.Errorf("failed to update dhcp options for switch %s, %v", subnet.Name, err) return err } + vpc, err := c.vpcsLister.Get(subnet.Spec.Vpc) + if err != nil { + klog.Errorf("failed to get subnet's vpc '%s', %v", subnet.Spec.Vpc, err) + return err + } + if needRouter { lrpName := fmt.Sprintf("%s-%s", vpc.Status.Router, subnet.Name) if err := c.ovnClient.UpdateLogicalRouterPortRA(lrpName, subnet.Spec.IPv6RAConfigs, subnet.Spec.EnableIPv6RA); err != nil { @@ -694,6 +596,125 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error { } } + return nil +} + +func (c *Controller) handleAddOrUpdateSubnet(key string) error { + c.subnetKeyMutex.LockKey(key) + defer func() { _ = c.subnetKeyMutex.UnlockKey(key) }() + + cachedSubnet, err := c.subnetsLister.Get(key) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + return err + } + klog.V(4).Infof("handle add or update subnet %s", cachedSubnet.Name) + + subnet := cachedSubnet.DeepCopy() + if err = formatSubnet(subnet, c); err != nil { + return err + } + + subnet, err = c.subnetsLister.Get(key) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + klog.Errorf("failed to get subnet %s error %v", key, err) + return err + } + + deleted, err := c.handleSubnetFinalizer(subnet) + if err != nil { + klog.Errorf("handle subnet finalizer failed %v", err) + return err + } + if deleted { + return nil + } + + if err = util.ValidateSubnet(*subnet); err != nil { + klog.Errorf("failed to validate subnet %s, %v", subnet.Name, err) + c.patchSubnetStatus(subnet, "ValidateLogicalSwitchFailed", err.Error()) + return err + } else { + c.patchSubnetStatus(subnet, "ValidateLogicalSwitchSuccess", "") + } + + if subnet.Spec.Protocol == kubeovnv1.ProtocolDual { + err = calcDualSubnetStatusIP(subnet, c) + } else { + err = calcSubnetStatusIP(subnet, c) + } + if err != nil { + klog.Errorf("calculate subnet %s used ip failed, %v", subnet.Name, err) + return err + } + + if err := c.ipam.AddOrUpdateSubnet(subnet.Name, subnet.Spec.CIDRBlock, subnet.Spec.Gateway, subnet.Spec.ExcludeIps); err != nil { + return err + } + + if !isOvnSubnet(subnet) { + // subnet provider is not ovn, and vpc is empty, should not reconcile + c.patchSubnetStatus(subnet, "SetNonOvnSubnetSuccess", "") + + subnet.Status.EnsureStandardConditions() + klog.Infof("non ovn subnet %s is ready", subnet.Name) + return nil + } + + // This validate should be processed after isOvnSubnet, since maybe there's no vpc for subnet not managed by kube-ovn + vpc, err := c.validateVpcBySubnet(subnet) + if err != nil { + klog.Errorf("failed to get subnet's vpc '%s', %v", subnet.Spec.Vpc, err) + return err + } + + if subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway { + if err := c.reconcileU2OInterconnectionIP(subnet); err != nil { + klog.Errorf("failed to reconcile underlay subnet %s to overlay interconnection %v", subnet.Name, err) + return err + } + } + + if err := c.checkSubnetConflict(subnet); err != nil { + klog.Errorf("failed to check subnet %s, %v", subnet.Name, err) + return err + } + + needRouter := subnet.Spec.Vlan == "" || subnet.Spec.LogicalGateway || + (subnet.Status.U2OInterconnectionIP != "" && subnet.Spec.U2OInterconnection) + // 1. overlay subnet, should add lrp, lrp ip is subnet gw + // 2. underlay subnet use logical gw, should add lrp, lrp ip is subnet gw + randomAllocateGW := !subnet.Spec.LogicalGateway && vpc.Spec.EnableExternal && subnet.Name == c.config.ExternalGatewaySwitch + // 3. underlay subnet use physical gw, vpc has eip, lrp managed in vpc process, lrp ip is random allocation, not subnet gw + + gateway := subnet.Spec.Gateway + if subnet.Status.U2OInterconnectionIP != "" && subnet.Spec.U2OInterconnection { + gateway = subnet.Status.U2OInterconnectionIP + } + + if err := c.clearOldU2OResource(subnet); err != nil { + klog.Errorf("clear subnet %s old u2o resource failed: %v", subnet.Name, err) + return err + } + + // create or update logical switch + if err := c.ovnClient.CreateLogicalSwitch(subnet.Name, vpc.Status.Router, subnet.Spec.CIDRBlock, gateway, needRouter, randomAllocateGW); err != nil { + klog.Errorf("create logical switch %s: %v", subnet.Name, err) + return err + } + + subnet.Status.EnsureStandardConditions() + + if err := c.updateSubnetDHCPOption(subnet, needRouter); err != nil { + klog.Errorf("failed to update subnet %s dhcpOptions: %v", subnet.Name, err) + return err + } + if c.config.EnableLb && subnet.Name != c.config.NodeSwitch { lbs := []string{ vpc.Status.TcpLoadBalancer, @@ -1316,13 +1337,47 @@ func (c *Controller) reconcileVpcDelNormalStaticRoute(vpcName string) error { return nil } -func (c *Controller) reconcileOvnDefaultVpcRoute(subnet *kubeovnv1.Subnet) error { - if subnet.Name == c.config.NodeSwitch { - if err := c.addCommonRoutesForSubnet(subnet); err != nil { - klog.Error(err) +func (c *Controller) reconcileDistributedSubnetRouteInDefaultVpc(subnet *kubeovnv1.Subnet) error { + if subnet.Spec.GatewayNode != "" || subnet.Status.ActivateGateway != "" { + klog.Infof("delete old centralized policy route for subnet %s", subnet.Name) + if err := c.deletePolicyRouteForCentralizedSubnet(subnet); err != nil { + klog.Errorf("failed to delete policy route for subnet %s, %v", subnet.Name, err) + return err + } + + subnet.Spec.GatewayNode = "" + if _, err := c.config.KubeOvnClient.KubeovnV1().Subnets().Update(context.Background(), subnet, metav1.UpdateOptions{}); err != nil { + klog.Errorf("failed to remove gatewayNode or activateGateway from subnet %s, %v", subnet.Name, err) + return err + } + subnet.Status.ActivateGateway = "" + c.patchSubnetStatus(subnet, "ChangeToDistributedGw", "") + } + + nodes, err := c.nodesLister.List(labels.Everything()) + if err != nil { + klog.Errorf("failed to list nodes: %v", err) + return err + } + for _, node := range nodes { + if err = c.createPortGroupForDistributedSubnet(node, subnet); err != nil { + klog.Errorf("failed to create port group %v", err) + return err + } + if node.Annotations[util.AllocatedAnnotation] != "true" { + continue + } + nodeIP, err := getNodeTunlIP(node) + if err != nil { + klog.Errorf("failed to get node %s tunnel ip, %v", node.Name, err) + return err + } + nextHop := getNextHopByTunnelIP(nodeIP) + v4IP, v6IP := util.SplitStringIP(nextHop) + if err = c.addPolicyRouteForDistributedSubnet(subnet, node.Name, v4IP, v6IP); err != nil { + klog.Errorf("failed to add policy router for node %s and subnet %s: %v", node.Name, subnet.Name, err) return err } - return nil } pods, err := c.podsLister.Pods(metav1.NamespaceAll).List(labels.Everything()) @@ -1330,9 +1385,225 @@ func (c *Controller) reconcileOvnDefaultVpcRoute(subnet *kubeovnv1.Subnet) error klog.Errorf("failed to list pods %v", err) return err } + for _, pod := range pods { + if !isPodAlive(pod) || c.config.EnableEipSnat && (pod.Annotations[util.EipAnnotation] != "" || pod.Annotations[util.SnatAnnotation] != "") || pod.Spec.NodeName == "" { + continue + } + + podNets, err := c.getPodKubeovnNets(pod) + if err != nil { + klog.Errorf("failed to get pod nets %v", err) + continue + } + + podPorts := make([]string, 0, 1) + for _, podNet := range podNets { + if !isOvnSubnet(podNet.Subnet) { + continue + } + + if pod.Annotations[fmt.Sprintf(util.IpAddressAnnotationTemplate, podNet.ProviderName)] == "" || pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, podNet.ProviderName)] != subnet.Name { + continue + } + + if pod.Annotations[util.NorthGatewayAnnotation] != "" { + nextHop := pod.Annotations[util.NorthGatewayAnnotation] + if err := c.ovnClient.AddLogicalRouterStaticRoute( + c.config.ClusterRouter, util.MainRouteTable, ovnnb.LogicalRouterStaticRoutePolicySrcIP, + pod.Annotations[fmt.Sprintf(util.IpAddressAnnotationTemplate, podNet.ProviderName)], nextHop, + ); err != nil { + klog.Errorf("add static route failed, %v", err) + return err + } + } else { + podName := c.getNameByPod(pod) + portName := ovs.PodNameToPortName(podName, pod.Namespace, podNet.ProviderName) + podPorts = append(podPorts, portName) + } + } + + if pod.Annotations[util.NorthGatewayAnnotation] != "" { + continue + } + + pgName := getOverlaySubnetsPortGroupName(subnet.Name, pod.Spec.NodeName) + portsToAdd := make([]string, 0, len(podPorts)) + for _, port := range podPorts { + exist, err := c.ovnClient.LogicalSwitchPortExists(port) + if err != nil { + return err + } + + if !exist { + klog.Errorf("lsp does not exist for pod %v, please delete the pod and retry", port) + continue + } + + portsToAdd = append(portsToAdd, port) + } + + if err = c.ovnClient.PortGroupAddPorts(pgName, portsToAdd...); err != nil { + klog.Errorf("add ports to port group %s: %v", pgName, err) + return err + } + } + return nil +} + +func (c *Controller) reconcileDefaultCentralizedSubnetRouteInDefaultVpc(subnet *kubeovnv1.Subnet) error { + // check if activateGateway still ready + if subnet.Status.ActivateGateway != "" && util.GatewayContains(subnet.Spec.GatewayNode, subnet.Status.ActivateGateway) { + node, err := c.nodesLister.Get(subnet.Status.ActivateGateway) + if err == nil && nodeReady(node) { + klog.Infof("subnet %s uses the old activate gw %s", subnet.Name, node.Name) + + nodeTunlIPAddr, err := getNodeTunlIP(node) + if err != nil { + klog.Errorf("failed to get gatewayNode tunnel ip for subnet %s", subnet.Name) + return err + } + nextHop := getNextHopByTunnelIP(nodeTunlIPAddr) + if err = c.addPolicyRouteForCentralizedSubnet(subnet, subnet.Status.ActivateGateway, nil, strings.Split(nextHop, ",")); err != nil { + klog.Errorf("failed to add active-backup policy route for centralized subnet %s: %v", subnet.Name, err) + return err + } + return nil + } + } + + klog.Info("find a new activate node") + // need a new activate gateway + newActivateNode := "" + var nodeTunlIPAddr []net.IP + for _, gw := range strings.Split(subnet.Spec.GatewayNode, ",") { + // the format of gatewayNodeStr can be like 'kube-ovn-worker:172.18.0.2, kube-ovn-control-plane:172.18.0.3', which consists of node name and designative egress ip + if strings.Contains(gw, ":") { + gw = strings.TrimSpace(strings.Split(gw, ":")[0]) + } else { + gw = strings.TrimSpace(gw) + } + node, err := c.nodesLister.Get(gw) + if err == nil && nodeReady(node) { + newActivateNode = node.Name + nodeTunlIPAddr, err = getNodeTunlIP(node) + if err != nil { + return err + } + klog.Infof("subnet %s uses a new activate gw %s", subnet.Name, node.Name) + break + } + } + if newActivateNode == "" { + klog.Warningf("all gateways of subnet %s are not ready", subnet.Name) + subnet.Status.ActivateGateway = newActivateNode + c.patchSubnetStatus(subnet, "NoActiveGatewayFound", fmt.Sprintf("subnet %s gws are not ready", subnet.Name)) + + return fmt.Errorf("subnet %s gws are not ready", subnet.Name) + } + + nextHop := getNextHopByTunnelIP(nodeTunlIPAddr) + klog.Infof("subnet %s configure new gateway node, nextHop %s", subnet.Name, nextHop) + if err := c.addPolicyRouteForCentralizedSubnet(subnet, newActivateNode, nil, strings.Split(nextHop, ",")); err != nil { + klog.Errorf("failed to add policy route for active-backup centralized subnet %s: %v", subnet.Name, err) + return err + } + subnet.Status.ActivateGateway = newActivateNode + c.patchSubnetStatus(subnet, "ReconcileCentralizedGatewaySuccess", "") + + klog.Infof("delete old distributed policy route for subnet %s", subnet.Name) + if err := c.deletePolicyRouteByGatewayType(subnet, kubeovnv1.GWDistributedType, false); err != nil { + klog.Errorf("failed to delete policy route for overlay subnet %s, %v", subnet.Name, err) + return err + } + return nil +} + +func (c *Controller) reconcileEcmpCentralizedSubnetRouteInDefaultVpc(subnet *kubeovnv1.Subnet) error { + // centralized subnet, enable ecmp, add ecmp policy route + gatewayNodes := strings.Split(subnet.Spec.GatewayNode, ",") + nodeV4Ips := make([]string, 0, len(gatewayNodes)) + nodeV6Ips := make([]string, 0, len(gatewayNodes)) + nameV4IpMap := make(map[string]string, len(gatewayNodes)) + nameV6IpMap := make(map[string]string, len(gatewayNodes)) + + for _, gw := range gatewayNodes { + // the format of gatewayNodeStr can be like 'kube-ovn-worker:172.18.0.2, kube-ovn-control-plane:172.18.0.3', which consists of node name and designative egress ip + if strings.Contains(gw, ":") { + gw = strings.TrimSpace(strings.Split(gw, ":")[0]) + } else { + gw = strings.TrimSpace(gw) + } + + node, err := c.nodesLister.Get(gw) + if err != nil { + klog.Errorf("failed to get gw node %s, %v", gw, err) + continue + } + + if nodeReady(node) { + nexthopNodeIP := strings.TrimSpace(node.Annotations[util.IpAddressAnnotation]) + if nexthopNodeIP == "" { + klog.Errorf("gateway node %v has no ip annotation", node.Name) + continue + } + nexthopV4, nexthopV6 := util.SplitStringIP(nexthopNodeIP) + if nexthopV4 != "" { + nameV4IpMap[node.Name] = nexthopV4 + nodeV4Ips = append(nodeV4Ips, nexthopV4) + } + if nexthopV6 != "" { + nameV6IpMap[node.Name] = nexthopV6 + nodeV6Ips = append(nodeV6Ips, nexthopV6) + } + } else { + klog.Errorf("gateway node %v is not ready", gw) + } + } + + v4Cidr, v6Cidr := util.SplitStringIP(subnet.Spec.CIDRBlock) + if nodeV4Ips != nil && v4Cidr != "" { + klog.V(3).Infof("delete old distributed policy route for subnet %s", subnet.Name) + if err := c.deletePolicyRouteByGatewayType(subnet, kubeovnv1.GWDistributedType, false); err != nil { + klog.Errorf("failed to delete policy route for overlay subnet %s, %v", subnet.Name, err) + return err + } + klog.V(3).Infof("subnet %s configure ecmp policy route, nexthops %v", subnet.Name, nodeV4Ips) + if err := c.updatePolicyRouteForCentralizedSubnet(subnet.Name, v4Cidr, nodeV4Ips, nameV4IpMap); err != nil { + klog.Errorf("failed to add v4 ecmp policy route for centralized subnet %s: %v", subnet.Name, err) + return err + } + } + if nodeV6Ips != nil && v6Cidr != "" { + klog.V(3).Infof("delete old distributed policy route for subnet %s", subnet.Name) + if err := c.deletePolicyRouteByGatewayType(subnet, kubeovnv1.GWDistributedType, false); err != nil { + klog.Errorf("failed to delete policy route for overlay subnet %s, %v", subnet.Name, err) + return err + } + klog.V(3).Infof("subnet %s configure ecmp policy route, nexthops %v", subnet.Name, nodeV6Ips) + if err := c.updatePolicyRouteForCentralizedSubnet(subnet.Name, v6Cidr, nodeV6Ips, nameV6IpMap); err != nil { + klog.Errorf("failed to add v6 ecmp policy route for centralized subnet %s: %v", subnet.Name, err) + return err + } + } + return nil +} + +func (c *Controller) reconcileOvnDefaultVpcRoute(subnet *kubeovnv1.Subnet) error { + if subnet.Name == c.config.NodeSwitch { + if err := c.addCommonRoutesForSubnet(subnet); err != nil { + klog.Error(err) + return err + } + return nil + } if subnet.Spec.Vlan != "" && !subnet.Spec.LogicalGateway { // physical switch provide gw for this underlay subnet + pods, err := c.podsLister.Pods(metav1.NamespaceAll).List(labels.Everything()) + if err != nil { + klog.Errorf("failed to list pods %v", err) + return err + } for _, pod := range pods { if pod.Annotations[util.LogicalSwitchAnnotation] == subnet.Name && pod.Annotations[util.IpAddressAnnotation] != "" { if err := c.deleteStaticRoute( @@ -1370,145 +1641,25 @@ func (c *Controller) reconcileOvnDefaultVpcRoute(subnet *kubeovnv1.Subnet) error } } } else { - if err = c.addCommonRoutesForSubnet(subnet); err != nil { + if err := c.addCommonRoutesForSubnet(subnet); err != nil { klog.Error(err) return err } - // if gw is distributed remove activateGateway field + // distributed subnet, only add distributed policy route if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType { - // distributed subnet, only add distributed policy route - if subnet.Spec.GatewayNode != "" || subnet.Status.ActivateGateway != "" { - klog.Infof("delete old centralized policy route for subnet %s", subnet.Name) - if err := c.deletePolicyRouteForCentralizedSubnet(subnet); err != nil { - klog.Errorf("failed to delete policy route for centralized subnet %s, %v", subnet.Name, err) - return err - } - subnet.Spec.GatewayNode = "" - subnet.Status.ActivateGateway = "" - c.recorder.Eventf(subnet, v1.EventTypeNormal, "ChangeToCentralizedGw", "") - - bytes, err := subnet.Status.Bytes() - if err != nil { - klog.Errorf("failed to marshal subnet status %v", err) - return err - } - _, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(context.Background(), subnet.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "") - if err != nil { - klog.Errorf("failed to patch subnet status %v", err) - return err - } - } - - nodes, err := c.nodesLister.List(labels.Everything()) - if err != nil { - klog.Errorf("failed to list nodes: %v", err) + if err := c.reconcileDistributedSubnetRouteInDefaultVpc(subnet); err != nil { + klog.Error(err) return err } - for _, node := range nodes { - if err = c.createPortGroupForDistributedSubnet(node, subnet); err != nil { - klog.Errorf("failed to create port group %v", err) - return err - } - if node.Annotations[util.AllocatedAnnotation] != "true" { - continue - } - nodeIP, err := getNodeTunlIP(node) - if err != nil { - klog.Errorf("failed to get node %s tunnel ip, %v", node.Name, err) - return err - } - nextHop := getNextHopByTunnelIP(nodeIP) - v4IP, v6IP := util.SplitStringIP(nextHop) - if err = c.addPolicyRouteForDistributedSubnet(subnet, node.Name, v4IP, v6IP); err != nil { - klog.Errorf("failed to add policy router for node %s and subnet %s: %v", node.Name, subnet.Name, err) - return err - } - } - - for _, pod := range pods { - if !isPodAlive(pod) { - continue - } - if c.config.EnableEipSnat && (pod.Annotations[util.EipAnnotation] != "" || pod.Annotations[util.SnatAnnotation] != "") { - continue - } - // Pod will add to port-group when pod get updated - if pod.Spec.NodeName == "" { - continue - } - - podNets, err := c.getPodKubeovnNets(pod) - if err != nil { - klog.Errorf("failed to get pod nets %v", err) - continue - } - - podPorts := make([]string, 0, 1) - for _, podNet := range podNets { - if !isOvnSubnet(podNet.Subnet) { - continue - } - - if pod.Annotations[fmt.Sprintf(util.IpAddressAnnotationTemplate, podNet.ProviderName)] == "" || pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, podNet.ProviderName)] != subnet.Name { - continue - } - - if pod.Annotations[util.NorthGatewayAnnotation] != "" { - nextHop := pod.Annotations[util.NorthGatewayAnnotation] - if err := c.ovnClient.AddLogicalRouterStaticRoute( - c.config.ClusterRouter, util.MainRouteTable, ovnnb.LogicalRouterStaticRoutePolicySrcIP, - pod.Annotations[fmt.Sprintf(util.IpAddressAnnotationTemplate, podNet.ProviderName)], nextHop, - ); err != nil { - klog.Errorf("add static route failed, %v", err) - return err - } - } else { - podName := c.getNameByPod(pod) - portName := ovs.PodNameToPortName(podName, pod.Namespace, podNet.ProviderName) - podPorts = append(podPorts, portName) - } - } - - if pod.Annotations[util.NorthGatewayAnnotation] != "" { - continue - } - - pgName := getOverlaySubnetsPortGroupName(subnet.Name, pod.Spec.NodeName) - portsToAdd := make([]string, 0, len(podPorts)) - for _, port := range podPorts { - exist, err := c.ovnClient.LogicalSwitchPortExists(port) - if err != nil { - return err - } - - if !exist { - klog.Errorf("lsp does not exist for pod %v, please delete the pod and retry", port) - continue - } - - portsToAdd = append(portsToAdd, port) - } - - if err = c.ovnClient.PortGroupAddPorts(pgName, portsToAdd...); err != nil { - klog.Errorf("add ports to port group %s: %v", pgName, err) - return err - } - } - return nil } else { // centralized subnet if subnet.Spec.GatewayNode == "" { - errMsg := fmt.Sprintf("subnet %s Spec.GatewayNode field must be specified for centralized gateway type", subnet.Name) - klog.Errorf(errMsg) - reason := "NoReadyGateway" subnet.Status.NotReady("NoReadyGateway", "") - c.recorder.Eventf(subnet, v1.EventTypeWarning, reason, errMsg) - bytes, err := subnet.Status.Bytes() - if err != nil { - return err - } - _, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(context.Background(), subnet.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "status") + c.patchSubnetStatus(subnet, "NoReadyGateway", "") + + err := fmt.Errorf("subnet %s Spec.GatewayNode field must be specified for centralized gateway type", subnet.Name) + klog.Error(err) return err } @@ -1519,143 +1670,15 @@ func (c *Controller) reconcileOvnDefaultVpcRoute(subnet *kubeovnv1.Subnet) error } if subnet.Spec.EnableEcmp { - // centralized subnet, enable ecmp, add ecmp policy route - gatewayNodes := strings.Split(subnet.Spec.GatewayNode, ",") - nodeV4Ips := make([]string, 0, len(gatewayNodes)) - nodeV6Ips := make([]string, 0, len(gatewayNodes)) - nameV4IpMap := make(map[string]string, len(gatewayNodes)) - nameV6IpMap := make(map[string]string, len(gatewayNodes)) - for _, gw := range gatewayNodes { - // the format of gatewayNodeStr can be like 'kube-ovn-worker:172.18.0.2, kube-ovn-control-plane:172.18.0.3', which consists of node name and designative egress ip - if strings.Contains(gw, ":") { - gw = strings.TrimSpace(strings.Split(gw, ":")[0]) - } else { - gw = strings.TrimSpace(gw) - } - - node, err := c.nodesLister.Get(gw) - if err != nil { - klog.Errorf("failed to get gw node %s, %v", gw, err) - continue - } - - if nodeReady(node) { - nexthopNodeIP := strings.TrimSpace(node.Annotations[util.IpAddressAnnotation]) - if nexthopNodeIP == "" { - klog.Errorf("gateway node %v has no ip annotation", node.Name) - continue - } - nexthopV4, nexthopV6 := util.SplitStringIP(nexthopNodeIP) - if nexthopV4 != "" { - nameV4IpMap[node.Name] = nexthopV4 - nodeV4Ips = append(nodeV4Ips, nexthopV4) - } - if nexthopV6 != "" { - nameV6IpMap[node.Name] = nexthopV6 - nodeV6Ips = append(nodeV6Ips, nexthopV6) - } - } else { - klog.Errorf("gateway node %v is not ready", gw) - } - } - v4Cidr, v6Cidr := util.SplitStringIP(subnet.Spec.CIDRBlock) - if nodeV4Ips != nil && v4Cidr != "" { - klog.V(3).Infof("delete old distributed policy route for subnet %s", subnet.Name) - if err := c.deletePolicyRouteByGatewayType(subnet, kubeovnv1.GWDistributedType, false); err != nil { - klog.Errorf("failed to delete policy route for overlay subnet %s, %v", subnet.Name, err) - return err - } - klog.V(3).Infof("subnet %s configure ecmp policy route, nexthops %v", subnet.Name, nodeV4Ips) - if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, v4Cidr, nodeV4Ips, nameV4IpMap); err != nil { - klog.Errorf("failed to add v4 ecmp policy route for centralized subnet %s: %v", subnet.Name, err) - return err - } - } - if nodeV6Ips != nil && v6Cidr != "" { - klog.V(3).Infof("delete old distributed policy route for subnet %s", subnet.Name) - if err := c.deletePolicyRouteByGatewayType(subnet, kubeovnv1.GWDistributedType, false); err != nil { - klog.Errorf("failed to delete policy route for overlay subnet %s, %v", subnet.Name, err) - return err - } - klog.V(3).Infof("subnet %s configure ecmp policy route, nexthops %v", subnet.Name, nodeV6Ips) - if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, v6Cidr, nodeV6Ips, nameV6IpMap); err != nil { - klog.Errorf("failed to add v6 ecmp policy route for centralized subnet %s: %v", subnet.Name, err) - return err - } - } - } else { - // centralized subnet, not enable ecmp, no ecmp and distributed policy route about this subnet - // use vpc spec policy route to control policy route diff update - - // check if activateGateway still ready - if subnet.Status.ActivateGateway != "" && util.GatewayContains(subnet.Spec.GatewayNode, subnet.Status.ActivateGateway) { - node, err := c.nodesLister.Get(subnet.Status.ActivateGateway) - if err == nil && nodeReady(node) { - klog.Infof("subnet %s uses the old activate gw %s", subnet.Name, node.Name) - - nodeTunlIPAddr, err := getNodeTunlIP(node) - if err != nil { - klog.Errorf("failed to get gatewayNode tunnel ip for subnet %s", subnet.Name) - return err - } - nextHop := getNextHopByTunnelIP(nodeTunlIPAddr) - if err = c.addPolicyRouteForCentralizedSubnet(subnet, subnet.Status.ActivateGateway, nil, strings.Split(nextHop, ",")); err != nil { - klog.Errorf("failed to add active-backup policy route for centralized subnet %s: %v", subnet.Name, err) - return err - } - return nil - } - } - - klog.Info("find a new activate node") - // need a new activate gateway - newActivateNode := "" - var nodeTunlIPAddr []net.IP - for _, gw := range strings.Split(subnet.Spec.GatewayNode, ",") { - // the format of gatewayNodeStr can be like 'kube-ovn-worker:172.18.0.2, kube-ovn-control-plane:172.18.0.3', which consists of node name and designative egress ip - if strings.Contains(gw, ":") { - gw = strings.TrimSpace(strings.Split(gw, ":")[0]) - } else { - gw = strings.TrimSpace(gw) - } - node, err := c.nodesLister.Get(gw) - if err == nil && nodeReady(node) { - newActivateNode = node.Name - nodeTunlIPAddr, err = getNodeTunlIP(node) - if err != nil { - return err - } - klog.Infof("subnet %s uses a new activate gw %s", subnet.Name, node.Name) - break - } - } - if newActivateNode == "" { - klog.Warningf("all subnet %s gws are not ready", subnet.Name) - c.recorder.Eventf(subnet, v1.EventTypeWarning, "NoActiveGatewayFound", fmt.Sprintf("subnet %s gws are not ready", subnet.Name)) - subnet.Status.ActivateGateway = newActivateNode - bytes, err := subnet.Status.Bytes() - if err != nil { - return err - } - if _, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Patch(context.Background(), subnet.Name, types.MergePatchType, bytes, metav1.PatchOptions{}, "status"); err != nil { - klog.Errorf("failed to patch subnet %s NoReadyGateway status: %v", subnet.Name, err) - } - return err - } - - nextHop := getNextHopByTunnelIP(nodeTunlIPAddr) - klog.Infof("subnet %s configure new gateway node, nextHop %s", subnet.Name, nextHop) - if err = c.addPolicyRouteForCentralizedSubnet(subnet, newActivateNode, nil, strings.Split(nextHop, ",")); err != nil { - klog.Errorf("failed to add active-backup policy route for centralized subnet %s: %v", subnet.Name, err) + if err := c.reconcileEcmpCentralizedSubnetRouteInDefaultVpc(subnet); err != nil { + klog.Error(err) return err } - klog.Infof("delete old distributed policy route for subnet %s", subnet.Name) - if err := c.deletePolicyRouteByGatewayType(subnet, kubeovnv1.GWDistributedType, false); err != nil { - klog.Errorf("failed to delete policy route for overlay subnet %s, %v", subnet.Name, err) + } else { + if err := c.reconcileDefaultCentralizedSubnetRouteInDefaultVpc(subnet); err != nil { + klog.Error(err) return err } - subnet.Status.ActivateGateway = newActivateNode - c.patchSubnetStatus(subnet, "ReconcileCentralizedGatewaySuccess", "") } } } @@ -2156,7 +2179,7 @@ func (c *Controller) updatePolicyRouteForCentralizedSubnet(subnetName, cidr stri for node, ip := range nameIpMap { externalIDs[node] = ip } - klog.Infof("add ecmp policy route for router: %s, match %s, action %s, nexthops %v, extrenalID %s", c.config.ClusterRouter, match, action, nextHops, externalIDs) + klog.Infof("add policy route for router: %s, match %s, action %s, nexthops %v, extrenalID %s", c.config.ClusterRouter, match, action, nextHops, externalIDs) if err := c.ovnClient.AddLogicalRouterPolicy(c.config.ClusterRouter, util.GatewayRouterPolicyPriority, match, action, nextHops, externalIDs); err != nil { klog.Errorf("failed to add policy route for centralized subnet %s: %v", subnetName, err) return err @@ -2480,8 +2503,6 @@ func (c *Controller) reconcileRouteTableForSubnet(subnet *kubeovnv1.Subnet) erro return nil } - klog.Infof("reconcile route table %q for subnet %s", subnet.Spec.RouteTable, subnet.Name) - routerPortName := ovs.LogicalRouterPortName(subnet.Spec.Vpc, subnet.Name) lrp, err := c.ovnClient.GetLogicalRouterPort(routerPortName, false) if err != nil { @@ -2495,6 +2516,7 @@ func (c *Controller) reconcileRouteTableForSubnet(subnet *kubeovnv1.Subnet) erro return nil } + klog.Infof("reconcile route table %q for subnet %s", subnet.Spec.RouteTable, subnet.Name) opt := map[string]string{"route_table": subnet.Spec.RouteTable} if err = c.ovnClient.UpdateLogicalRouterPortOptions(routerPortName, opt); err != nil { klog.Errorf("failed to set route table of logical router port %s to %s: %v", routerPortName, subnet.Spec.RouteTable, err) diff --git a/pkg/ovs/ovn-nbctl-legacy.go b/pkg/ovs/ovn-nbctl-legacy.go index b3a061d761b..00d50140f7d 100644 --- a/pkg/ovs/ovn-nbctl-legacy.go +++ b/pkg/ovs/ovn-nbctl-legacy.go @@ -562,7 +562,7 @@ func (c *LegacyClient) DeleteDHCPOptionsByUUIDs(uuidList []string) (err error) { } func (c *LegacyClient) DeleteDHCPOptions(ls string, protocol string) error { - klog.Infof("delete dhcp options for switch %s protocol %s", ls, protocol) + klog.V(4).Infof("delete dhcp options for switch %s protocol %s", ls, protocol) dhcpOptionsList, err := c.ListDHCPOptions(true, ls, protocol) if err != nil { klog.Errorf("find dhcp options failed, %v", err) diff --git a/pkg/util/validator.go b/pkg/util/validator.go index 8484ed83696..87bef877d69 100644 --- a/pkg/util/validator.go +++ b/pkg/util/validator.go @@ -105,6 +105,11 @@ func ValidateSubnet(subnet kubeovnv1.Subnet) error { } } } + + if subnet.Spec.LogicalGateway && subnet.Spec.U2OInterconnection { + return fmt.Errorf("logicalGateway and u2oInterconnection can't be opened at the same time") + } + return nil }