Skip to content

Commit

Permalink
add policy route for centralized subnet
Browse files Browse the repository at this point in the history
  • Loading branch information
hongzhen-ma committed Feb 14, 2022
1 parent 0fd564e commit a90b06a
Show file tree
Hide file tree
Showing 5 changed files with 458 additions and 147 deletions.
205 changes: 193 additions & 12 deletions pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ func (c *Controller) handleAddNode(key string) error {
return err
}

if err := c.addPolicyRouteForNode(node.Name, ipStr); err != nil {
klog.Errorf("failed to add policy route for node %s, %v", key, err)
return err
}

if err := c.RemoveRedundantChassis(node); err != nil {
return err
}
Expand Down Expand Up @@ -396,6 +401,10 @@ func (c *Controller) handleDeleteNode(key string) error {
klog.Errorf("failed to delete port group %s for node, %v", portName, err)
return err
}
if err := c.deletePolicyRouteForNode(key); err != nil {
klog.Errorf("failed to delete policy route for node %s: %v", key, err)
return err
}

addresses := c.ipam.GetPodAddress(portName)
for _, addr := range addresses {
Expand Down Expand Up @@ -608,15 +617,20 @@ func (c *Controller) checkGatewayReady() error {
for _, node := range nodes {
ipStr := node.Annotations[util.IpAddressAnnotation]
for _, ip := range strings.Split(ipStr, ",") {
var cidrBlock string
for _, cidrBlock = range strings.Split(subnet.Spec.CIDRBlock, ",") {
for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
if util.CheckProtocol(cidrBlock) != util.CheckProtocol(ip) {
continue
}

exist, err := c.checkRouteExist(ip, cidrBlock, ovs.PolicySrcIP)
exist, err := c.checkPolicyRouteExistForNode(node.Name, cidrBlock)
if err != nil {
klog.Errorf("get ecmp static route for subnet %v, error %v", subnet.Name, err)
klog.Errorf("check ecmp policy route exist for subnet %v, error %v", subnet.Name, err)
break
}

nextHops, nameIpMap, err := c.getPolicyRouteParas(cidrBlock)
if err != nil {
klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
break
}

Expand Down Expand Up @@ -644,27 +658,36 @@ func (c *Controller) checkGatewayReady() error {
}

if !success {
klog.Warningf("failed to ping ovn0 %s or node %v is not ready", ip, node.Name)
if exist {
if err := c.ovnClient.DeleteMatchedStaticRoute(cidrBlock, ip, c.config.ClusterRouter); err != nil {
klog.Errorf("failed to delete static route %s for node %s, %v", ip, node.Name, err)
klog.Warningf("failed to ping ovn0 %s or node %v is not ready, delete ecmp policy route for node", ip, node.Name)
nextHops = util.RemoveString(nextHops, ip)
delete(nameIpMap, node.Name)
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops, nameIpMap); err != nil {
klog.Errorf("failed to delete ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
return err
}
}
} else {
klog.V(3).Infof("succeed to ping gw %s", ip)
if !exist {
if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, subnet.Spec.CIDRBlock, ip, c.config.ClusterRouter, util.EcmpRouteType); err != nil {
klog.Errorf("failed to add static route for node %s, %v", node.Name, err)
nextHops = append(nextHops, ip)
if nameIpMap == nil {
nameIpMap = make(map[string]string, 1)
}
nameIpMap[node.Name] = ip
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops, nameIpMap); err != nil {
klog.Errorf("failed to add ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
return err
}
}
}
} else {
if exist {
klog.Infof("subnet %v gatewayNode does not contains node %v, should delete ecmp route for node ip %s", subnet.Name, node.Name, ip)
if err := c.ovnClient.DeleteMatchedStaticRoute(cidrBlock, ip, c.config.ClusterRouter); err != nil {
klog.Errorf("failed to delete static route %s for node %s, %v", ip, node.Name, err)
klog.Infof("subnet %v gatewayNode does not contains node %v, delete policy route for node ip %s", subnet.Name, node.Name, ip)
nextHops = util.RemoveString(nextHops, ip)
delete(nameIpMap, node.Name)
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops, nameIpMap); err != nil {
klog.Errorf("failed to delete ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
return err
}
}
Expand Down Expand Up @@ -936,3 +959,161 @@ func (c *Controller) addNodeGwStaticRoute() error {
}
return nil
}

func (c *Controller) getPolicyRouteParas(cidr string) ([]string, map[string]string, error) {
ipSuffix := "ip4"
subnetAsName := getOverlaySubnetsAddressSetName(c.config.ClusterRouter, kubeovnv1.ProtocolIPv4)
if util.CheckProtocol(cidr) == kubeovnv1.ProtocolIPv6 {
ipSuffix = "ip6"
subnetAsName = getOverlaySubnetsAddressSetName(c.config.ClusterRouter, kubeovnv1.ProtocolIPv6)
}
match := fmt.Sprintf("%s.src == %s && %s.dst != $%s", ipSuffix, cidr, ipSuffix, subnetAsName)

nextHops, nameIpMap, err := c.ovnClient.GetPolicyRouteParas(util.CentralSubnetPriority, match)
if err != nil {
klog.Errorf("failed to get policy route paras, %v", err)
return nextHops, nameIpMap, err
}
return nextHops, nameIpMap, nil
}

func (c *Controller) checkPolicyRouteExistForNode(nodeName, cidr string) (bool, error) {
_, nameIpMap, err := c.getPolicyRouteParas(cidr)
if err != nil {
klog.Errorf("failed to get policy route paras, %v", err)
return false, err
}

if _, ok := nameIpMap[nodeName]; ok {
return true, nil
}
return false, nil
}

func (c *Controller) deletePolicyRouteForNode(nodeName string) error {
subnets, err := c.subnetsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to get subnets %v", err)
return err
}

for _, subnet := range subnets {
if subnet.Spec.Vlan != "" || subnet.Spec.Vpc != util.DefaultVpc || subnet.Name == c.config.NodeSwitch {
continue
}

if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType {
pgName := getOverlaySubnetsPortGroupName(subnet.Name, nodeName)
if err = c.ovnClient.DeletePortGroup(pgName); err != nil {
klog.Errorf("failed to delete port group for subnet %s and node %s, %v", subnet.Name, nodeName, err)
return err
}

if err = c.deletePolicyRouteForDistributedSubnet(subnet, nodeName); err != nil {
klog.Errorf("failed to delete policy route for subnet %s and node %s, %v", subnet.Name, nodeName, err)
return err
}
}

if subnet.Spec.GatewayType == kubeovnv1.GWCentralizedType {
if c.config.EnableEcmp {
for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
exist, err := c.checkPolicyRouteExistForNode(nodeName, cidrBlock)
if err != nil {
klog.Errorf("check ecmp policy route exist for subnet %v, error %v", subnet.Name, err)
continue
}

nextHops, nameIpMap, err := c.getPolicyRouteParas(cidrBlock)
if err != nil {
klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
continue
}

if exist {
nextHops = util.RemoveString(nextHops, nameIpMap[nodeName])
delete(nameIpMap, nodeName)

if len(nextHops) == 0 {
if err := c.deletePolicyRouteForCentralizedSubnet(subnet); err != nil {
klog.Errorf("failed to delete policy route for centralized subnet %s, %v", subnet.Name, err)
return err
}
} else {
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops, nameIpMap); err != nil {
klog.Errorf("failed to update policy route for subnet %s on node %s, %v", subnet.Name, nodeName, err)
return err
}
}
}
}
} else {
if err := c.deletePolicyRouteForCentralizedSubnet(subnet); err != nil {
klog.Errorf("failed to delete policy route for centralized subnet %s, %v", subnet.Name, err)
return err
}
}
}
}
return nil
}

func (c *Controller) addPolicyRouteForNode(nodeName, nodeIP string) error {
subnets, err := c.subnetsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to get subnets %v", err)
return err
}

for _, subnet := range subnets {
if subnet.Spec.Vlan != "" || subnet.Spec.Vpc != util.DefaultVpc || subnet.Name == c.config.NodeSwitch || subnet.Spec.GatewayType != kubeovnv1.GWCentralizedType {
continue
}

if c.config.EnableEcmp {
if !util.GatewayContains(subnet.Spec.GatewayNode, nodeName) {
continue
}

for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
exist, err := c.checkPolicyRouteExistForNode(nodeName, cidrBlock)
if err != nil {
klog.Errorf("check ecmp policy route exist for subnet %v, error %v", subnet.Name, err)
continue
}
if exist {
continue
}

nextHops, nameIpMap, err := c.getPolicyRouteParas(cidrBlock)
if err != nil {
klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
continue
}

for _, nextHop := range strings.Split(nodeIP, ",") {
if util.CheckProtocol(cidrBlock) == util.CheckProtocol(nextHop) {
continue
}
nextHops = append(nextHops, nextHop)
nameIpMap[nodeName] = nextHop

if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops, nameIpMap); err != nil {
klog.Errorf("failed to update policy route for subnet %s on node %s, %v", subnet.Name, nodeName, err)
return err
}
}
}
} else {
if subnet.Status.ActivateGateway != nodeName {
continue
}

if err = c.addPolicyRouteForCentralizedSubnet(subnet, nodeName, nil, strings.Split(nodeIP, ",")); err != nil {
klog.Errorf("failed to add active-backup policy route for centralized subnet %s: %v", subnet.Name, err)
return err
}
}
}
return nil
}

0 comments on commit a90b06a

Please sign in to comment.