Skip to content

Commit

Permalink
update policy route when join subnet cidr changed (#1638)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongzhen-ma committed Jun 23, 2022
1 parent bcf057d commit 83c9e84
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 77 deletions.
3 changes: 3 additions & 0 deletions pkg/controller/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,9 @@ func (c *Controller) initNodeRoutes() error {
return err
}
for _, node := range nodes {
if node.Annotations[util.AllocatedAnnotation] != "true" {
continue
}
nodeIPv4, nodeIPv6 := util.GetNodeInternalIP(*node)
joinAddrV4, joinAddrV6 := util.SplitStringIP(node.Annotations[util.IpAddressAnnotation])
if nodeIPv4 != "" && joinAddrV4 != "" {
Expand Down
60 changes: 32 additions & 28 deletions pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func (c *Controller) handleAddNode(key string) error {
return err
}
node := orinode.DeepCopy()
klog.Infof("handle add node %v", node.Name)

subnets, err := c.subnetsLister.List(labels.Everything())
if err != nil {
Expand Down Expand Up @@ -320,6 +321,9 @@ func (c *Controller) handleAddNode(key string) error {
}

for _, subnet := range subnets {
if subnet.Spec.Vlan != "" || subnet.Spec.Vpc != util.DefaultVpc || subnet.Name == c.config.NodeSwitch || subnet.Spec.GatewayType != kubeovnv1.GWDistributedType {
continue
}
if err = c.createPortGroupForDistributedSubnet(node, subnet); err != nil {
klog.Errorf("failed to create port group for node %s and subnet %s: %v", node.Name, subnet.Name, err)
return err
Expand All @@ -328,6 +332,8 @@ func (c *Controller) handleAddNode(key string) error {
klog.Errorf("failed to add policy router for node %s and subnet %s: %v", node.Name, subnet.Name, err)
return err
}
// policy route for overlay distributed subnet should be reconciled when node ip changed
c.addOrUpdateSubnetQueue.Add(subnet.Name)
}

// ovn acl doesn't support address_set name with '-', so replace '-' by '.'
Expand All @@ -337,7 +343,7 @@ func (c *Controller) handleAddNode(key string) error {
return err
}

if err := c.addPolicyRouteForNode(node.Name, ipStr); err != nil {
if err := c.addPolicyRouteForCentralizedSubnetOnNode(node.Name, ipStr); err != nil {
klog.Errorf("failed to add policy route for node %s, %v", key, err)
return err
}
Expand Down Expand Up @@ -721,7 +727,7 @@ func (c *Controller) checkGatewayReady() error {
continue
}

exist, err := c.checkPolicyRouteExistForNode(node.Name, cidrBlock)
exist, err := c.checkPolicyRouteExistForNode(node.Name, cidrBlock, ip)
if err != nil {
klog.Errorf("check ecmp policy route exist for subnet %v, error %v", subnet.Name, err)
break
Expand Down Expand Up @@ -1063,14 +1069,14 @@ func (c *Controller) getPolicyRouteParas(cidr string) ([]string, map[string]stri
return nextHops, nameIpMap, nil
}

func (c *Controller) checkPolicyRouteExistForNode(nodeName, cidr string) (bool, error) {
func (c *Controller) checkPolicyRouteExistForNode(nodeName, cidr, nexthop string) (bool, error) {
_, nameIpMap, err := c.getPolicyRouteParas(cidr)
if err != nil {
klog.Errorf("failed to get policy route paras, %v", err)
return false, err
}

if _, ok := nameIpMap[nodeName]; ok {
if nodeIp, ok := nameIpMap[nodeName]; ok && nodeIp == nexthop {
return true, nil
}
return false, nil
Expand Down Expand Up @@ -1104,18 +1110,17 @@ func (c *Controller) deletePolicyRouteForNode(nodeName string) error {
if subnet.Spec.GatewayType == kubeovnv1.GWCentralizedType {
if c.config.EnableEcmp {
for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
exist, err := c.checkPolicyRouteExistForNode(nodeName, cidrBlock)
if err != nil {
klog.Errorf("check ecmp policy route exist for subnet %v, error %v", subnet.Name, err)
continue
}

nextHops, nameIpMap, err := c.getPolicyRouteParas(cidrBlock)
if err != nil {
klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
continue
}

exist := false
if _, ok := nameIpMap[nodeName]; ok {
exist = true
}

if exist {
nextHops = util.RemoveString(nextHops, nameIpMap[nodeName])
delete(nameIpMap, nodeName)
Expand Down Expand Up @@ -1144,7 +1149,7 @@ func (c *Controller) deletePolicyRouteForNode(nodeName string) error {
return nil
}

func (c *Controller) addPolicyRouteForNode(nodeName, nodeIP string) error {
func (c *Controller) addPolicyRouteForCentralizedSubnetOnNode(nodeName, nodeIP string) error {
subnets, err := c.subnetsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to get subnets %v", err)
Expand All @@ -1161,24 +1166,23 @@ func (c *Controller) addPolicyRouteForNode(nodeName, nodeIP string) error {
continue
}

for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
exist, err := c.checkPolicyRouteExistForNode(nodeName, cidrBlock)
if err != nil {
klog.Errorf("check ecmp policy route exist for subnet %v, error %v", subnet.Name, err)
continue
}
if exist {
continue
}

nextHops, nameIpMap, err := c.getPolicyRouteParas(cidrBlock)
if err != nil {
klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
continue
}
for _, nextHop := range strings.Split(nodeIP, ",") {
for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
if util.CheckProtocol(cidrBlock) != util.CheckProtocol(nextHop) {
continue
}
exist, err := c.checkPolicyRouteExistForNode(nodeName, cidrBlock, nextHop)
if err != nil {
klog.Errorf("check ecmp policy route exist for subnet %v, error %v", subnet.Name, err)
continue
}
if exist {
continue
}

for _, nextHop := range strings.Split(nodeIP, ",") {
if util.CheckProtocol(cidrBlock) == util.CheckProtocol(nextHop) {
nextHops, nameIpMap, err := c.getPolicyRouteParas(cidrBlock)
if err != nil {
klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
continue
}
nextHops = append(nextHops, nextHop)
Expand Down
104 changes: 55 additions & 49 deletions pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ func (c *Controller) handleDeleteLogicalSwitch(key string) (err error) {
func (c *Controller) handleDeleteSubnet(subnet *kubeovnv1.Subnet) error {
c.updateVpcStatusQueue.Add(subnet.Spec.Vpc)

if err := c.deletePolicyRouteByGatewayType(subnet, subnet.Spec.GatewayType); err != nil {
if err := c.deletePolicyRouteByGatewayType(subnet, subnet.Spec.GatewayType, true); err != nil {
klog.Errorf("failed to delete policy route for overlay subnet %s, %v", subnet.Name, err)
return err
}
Expand Down Expand Up @@ -1071,14 +1071,16 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
return err
}
}
if node.Annotations[util.AllocatedAnnotation] != "true" {
continue
}
nodeIP, err := getNodeTunlIP(node)
if err != nil {
klog.Errorf("failed to get node %s tunnel ip, %v", node.Name, err)
return err
}
nextHop := getNextHopByTunnelIP(nodeIP)

changed := false
podNets, err := c.getPodKubeovnNets(pod)
if err != nil {
klog.Errorf("failed to get pod nets %v", err)
Expand Down Expand Up @@ -1134,7 +1136,6 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
}

if len(portsToAdd) != 0 {
changed = true
klog.Infof("new port %v should be added to port group %s", portsToAdd, pgName)
newPgPorts := make([]string, len(portsToAdd), len(portsToAdd)+len(pgPorts))
copy(newPgPorts, portsToAdd)
Expand All @@ -1152,29 +1153,30 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
if pod.Annotations[util.NorthGatewayAnnotation] != "" {
continue
}
if changed {
for _, nextHopIp := range strings.Split(nextHop, ",") {
if nextHopIp == "" {
continue
}

ipSuffix := "ip4"
if util.CheckProtocol(nextHopIp) == kubeovnv1.ProtocolIPv6 {
ipSuffix = "ip6"
}
pgAs := fmt.Sprintf("%s_%s", pgName, ipSuffix)
match := fmt.Sprintf("%s.src == $%s", ipSuffix, pgAs)
externalIDs := map[string]string{
"vendor": util.CniTypeName,
"subnet": subnet.Name,
"node": node.Name,
}
// in case the node name is "vendor", "subnet" or "node"
externalIDs[node.Name] = nextHopIp
if err = c.ovnLegacyClient.AddPolicyRoute(c.config.ClusterRouter, util.GatewayRouterPolicyPriority, match, "reroute", nextHopIp, externalIDs); err != nil {
klog.Errorf("failed to add policy route for port-group address-set %s: %v", pgAs, err)
return err
}
// policy route should be flushed when cidr changed for join subnet
for _, nextHopIp := range strings.Split(nextHop, ",") {
if nextHopIp == "" {
continue
}

ipSuffix := "ip4"
if util.CheckProtocol(nextHopIp) == kubeovnv1.ProtocolIPv6 {
ipSuffix = "ip6"
}
pgAs := fmt.Sprintf("%s_%s", pgName, ipSuffix)
match := fmt.Sprintf("%s.src == $%s", ipSuffix, pgAs)
externalIDs := map[string]string{
"vendor": util.CniTypeName,
"subnet": subnet.Name,
"node": node.Name,
}
// in case the node name is "vendor", "subnet" or "node"
externalIDs[node.Name] = nextHopIp

if err = c.ovnLegacyClient.AddPolicyRoute(c.config.ClusterRouter, util.GatewayRouterPolicyPriority, match, "reroute", nextHopIp, externalIDs); err != nil {
klog.Errorf("failed to add policy route for port-group address-set %s: %v", pgAs, err)
return err
}
}
}
Expand Down Expand Up @@ -1295,7 +1297,7 @@ func (c *Controller) reconcileOvnRoute(subnet *kubeovnv1.Subnet) error {
}
}

if err := c.deletePolicyRouteByGatewayType(subnet, kubeovnv1.GWDistributedType); err != nil {
if err := c.deletePolicyRouteByGatewayType(subnet, kubeovnv1.GWDistributedType, false); err != nil {
klog.Errorf("failed to delete policy route for overlay subnet %s, %v", subnet.Name, err)
return err
}
Expand Down Expand Up @@ -1703,33 +1705,33 @@ func (c *Controller) updatePolicyRouteForCentralizedSubnet(subnetName, cidr stri
}

func (c *Controller) addPolicyRouteForCentralizedSubnet(subnet *kubeovnv1.Subnet, nodeName string, ipNameMap map[string]string, nodeIPs []string) error {
for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
exist, err := c.checkPolicyRouteExistForNode(nodeName, cidrBlock)
if err != nil {
klog.Errorf("check ecmp policy route exist for subnet %v, error %v", subnet.Name, err)
continue
}
if exist {
continue
}

var nextHops []string
nameIpMap := map[string]string{}
for _, nodeIp := range nodeIPs {
if util.CheckProtocol(nodeIp) != util.CheckProtocol(cidrBlock) {
for _, nodeIP := range nodeIPs {
for _, cidrBlock := range strings.Split(subnet.Spec.CIDRBlock, ",") {
if util.CheckProtocol(cidrBlock) != util.CheckProtocol(nodeIP) {
continue
}
exist, err := c.checkPolicyRouteExistForNode(nodeName, cidrBlock, nodeIP)
if err != nil {
klog.Errorf("check ecmp policy route exist for subnet %v, error %v", subnet.Name, err)
continue
}
if exist {
continue
}
nextHops = append(nextHops, nodeIp)

var nextHops []string
nameIpMap := map[string]string{}

nextHops = append(nextHops, nodeIP)
tmpName := nodeName
if nodeName == "" {
tmpName = ipNameMap[nodeIp]
tmpName = ipNameMap[nodeIP]
}
nameIpMap[tmpName] = nodeIp
}
nameIpMap[tmpName] = nodeIP

if err := c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops, nameIpMap); err != nil {
return err
if err := c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops, nameIpMap); err != nil {
return err
}
}
}
return nil
Expand Down Expand Up @@ -1808,13 +1810,13 @@ func (c *Controller) deletePolicyRouteForDistributedSubnet(subnet *kubeovnv1.Sub
return nil
}

func (c *Controller) deletePolicyRouteByGatewayType(subnet *kubeovnv1.Subnet, gatewayType string) error {
if subnet.Spec.Vlan != "" || subnet.Spec.Vpc != util.DefaultVpc || subnet.Name == c.config.NodeSwitch {
func (c *Controller) deletePolicyRouteByGatewayType(subnet *kubeovnv1.Subnet, gatewayType string, isDelete bool) error {
if subnet.Spec.Vlan != "" || subnet.Spec.Vpc != util.DefaultVpc {
return nil
}

for _, cidr := range strings.Split(subnet.Spec.CIDRBlock, ",") {
if cidr == "" {
if cidr == "" || !isDelete {
continue
}

Expand All @@ -1823,11 +1825,15 @@ func (c *Controller) deletePolicyRouteByGatewayType(subnet *kubeovnv1.Subnet, ga
af = 6
}
match := fmt.Sprintf("ip%d.dst == %s", af, cidr)
klog.Infof("delete policy route for subnet %s, match %s", subnet.Name, match)
if err := c.ovnLegacyClient.DeletePolicyRoute(c.config.ClusterRouter, util.SubnetRouterPolicyPriority, match); err != nil {
klog.Errorf("failed to delete logical router policy for CIDR %s of subnet %s: %v", cidr, subnet.Name, err)
return err
}
}
if subnet.Name == c.config.NodeSwitch {
return nil
}

if gatewayType == kubeovnv1.GWDistributedType {
nodes, err := c.nodesLister.List(labels.Everything())
Expand Down
33 changes: 33 additions & 0 deletions pkg/ovs/ovn-nbctl-legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,17 @@ func (c LegacyClient) AddStaticRoute(policy, cidr, nextHop, router string, route

// AddPolicyRoute add a policy route rule in ovn
func (c LegacyClient) AddPolicyRoute(router string, priority int32, match, action, nextHop string, externalIDs map[string]string) error {
consistent, err := c.CheckPolicyRouteNexthopConsistent(router, match, nextHop, priority)
if err != nil {
return err
}
if !consistent {
if err := c.DeletePolicyRoute(router, priority, match); err != nil {
klog.Errorf("failed to delete policy route: %v", err)
return err
}
}

// lr-policy-add ROUTER PRIORITY MATCH ACTION [NEXTHOP]
args := []string{MayExist, "lr-policy-add", router, strconv.Itoa(int(priority)), match, action}
if nextHop != "" {
Expand Down Expand Up @@ -2263,6 +2274,28 @@ func (c LegacyClient) SetPolicyRouteExternalIds(priority int32, match string, na
return nil
}

func (c LegacyClient) CheckPolicyRouteNexthopConsistent(router, match, nexthop string, priority int32) (bool, error) {
exist, err := c.PolicyRouteExists(priority, match)
if err != nil {
return false, err
}
if !exist {
return false, nil
}

nextHops, _, err := c.GetPolicyRouteParas(priority, match)
if err != nil {
klog.Errorf("failed to get policy route paras, %v", err)
return false, err
}
for _, next := range nextHops {
if next == nexthop {
return true, nil
}
}
return false, nil
}

type DHCPOptionsUUIDs struct {
DHCPv4OptionsUUID string
DHCPv6OptionsUUID string
Expand Down

0 comments on commit 83c9e84

Please sign in to comment.