Skip to content

Commit

Permalink
manage ovn lr policy with libovsdb (#2788)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangzujian committed May 12, 2023
1 parent 7350db5 commit 6ddd03b
Show file tree
Hide file tree
Showing 26 changed files with 536 additions and 1,044 deletions.
1 change: 0 additions & 1 deletion go.mod
Expand Up @@ -35,7 +35,6 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.2
github.com/vishvananda/netlink v1.2.1-beta.2
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
golang.org/x/sys v0.8.0
golang.org/x/time v0.3.0
google.golang.org/grpc v1.55.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Expand Up @@ -1533,8 +1533,6 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down
150 changes: 104 additions & 46 deletions mocks/pkg/ovs/interface.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/apis/kubeovn/v1/types.go
Expand Up @@ -413,7 +413,7 @@ var (
)

type PolicyRoute struct {
Priority int32 `json:"priority,omitempty"`
Priority int `json:"priority,omitempty"`
Match string `json:"match,omitempty"`
Action PolicyRouteAction `json:"action,omitempty"`
// NextHopIP is an optional parameter. It needs to be provided only when 'action' is 'reroute'.
Expand Down
44 changes: 23 additions & 21 deletions pkg/controller/external_vpc.go
Expand Up @@ -2,13 +2,13 @@ package controller

import (
"context"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"

v1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/ovsdb/ovnnb"
"github.com/kubeovn/kube-ovn/pkg/util"
)

Expand Down Expand Up @@ -83,7 +83,9 @@ func (c *Controller) syncExternalVpc() {

func (c *Controller) getRouterStatus() (logicalRouters map[string]util.LogicalRouter, err error) {
logicalRouters = make(map[string]util.LogicalRouter)
externalOvnRouters, err := c.ovnLegacyClient.CustomFindEntity("logical_router", []string{"name", "port"}, fmt.Sprintf("external_ids{!=}vendor=%s", util.CniTypeName))
externalOvnRouters, err := c.ovnClient.ListLogicalRouter(false, func(lr *ovnnb.LogicalRouter) bool {
return len(lr.ExternalIDs) == 0 || lr.ExternalIDs["vendor"] != util.CniTypeName
})
if err != nil {
klog.Errorf("failed to list external logical router, %v", err)
return logicalRouters, err
Expand All @@ -93,44 +95,44 @@ func (c *Controller) getRouterStatus() (logicalRouters map[string]util.LogicalRo
return logicalRouters, nil
}

for _, aExternalRouter := range externalOvnRouters {
var aLogicalRouter util.LogicalRouter
aLogicalRouter.Name = aExternalRouter["name"][0]
var ports []util.Port
for _, portUUId := range aExternalRouter["port"] {
portName, err := c.ovnLegacyClient.GetEntityInfo("logical_router_port", portUUId, []string{"name"})
for _, externalLR := range externalOvnRouters {
lr := util.LogicalRouter{
Name: externalLR.Name,
Ports: make([]util.Port, 0, len(externalLR.Ports)),
}
for _, uuid := range externalLR.Ports {
lrp, err := c.ovnClient.GetLogicalRouterPortByUUID(uuid)
if err != nil {
klog.Info("get port error")
klog.Warningf("failed to get LRP by UUID %s: %v", uuid, err)
continue
}
aPort := util.Port{
Name: portName["name"],
Subnet: "",
}
ports = append(ports, aPort)
lr.Ports = append(lr.Ports, util.Port{Name: lrp.Name})
}
aLogicalRouter.Ports = ports
logicalRouters[aLogicalRouter.Name] = aLogicalRouter
logicalRouters[lr.Name] = lr
}
UUID := "_uuid"
for routerName, logicalRouter := range logicalRouters {
tmpRouter := logicalRouter
for _, port := range logicalRouter.Ports {
peerPorts, err := c.ovnLegacyClient.CustomFindEntity("logical_switch_port", []string{UUID}, fmt.Sprintf("options:router-port=%s", port.Name))
peerPorts, err := c.ovnClient.ListLogicalSwitchPorts(false, nil, func(lsp *ovnnb.LogicalSwitchPort) bool {
return len(lsp.Options) != 0 && lsp.Options["router-port"] == port.Name
})
if err != nil || len(peerPorts) > 1 {
klog.Errorf("failed to list peer port of %s, %v", port, err)
continue
}
if len(peerPorts) == 0 {
continue
}
switches, err := c.ovnLegacyClient.CustomFindEntity("logical_switch", []string{"name"}, fmt.Sprintf("ports{>=}%s", peerPorts[0][UUID][0]))
lsp := peerPorts[0]
switches, err := c.ovnClient.ListLogicalSwitch(false, func(ls *ovnnb.LogicalSwitch) bool {
return util.ContainsString(ls.Ports, lsp.UUID)
})
if err != nil || len(switches) > 1 {
klog.Errorf("failed to list peer switch of %s, %v", peerPorts, err)
klog.Errorf("failed to get logical switch of LSP %s: %v", lsp.Name, err)
continue
}
var aLogicalSwitch util.LogicalSwitch
aLogicalSwitch.Name = switches[0]["name"][0]
aLogicalSwitch.Name = switches[0].Name
tmpRouter.LogicalSwitches = append(tmpRouter.LogicalSwitches, aLogicalSwitch)
}
logicalRouters[routerName] = tmpRouter
Expand Down
30 changes: 10 additions & 20 deletions pkg/controller/init.go
Expand Up @@ -744,15 +744,16 @@ func (c *Controller) initSyncCrdVlans() error {
}

func (c *Controller) migrateNodeRoute(af int, node, ip, nexthop string) error {
// migrate from old version static route to policy route
match := fmt.Sprintf("ip%d.dst == %s", af, ip)
consistent, err := c.ovnLegacyClient.CheckPolicyRouteNexthopConsistent(match, nexthop, util.NodeRouterPolicyPriority)
if err != nil {
return err
externalIDs := map[string]string{
"vendor": util.CniTypeName,
"node": node,
}
if consistent {
klog.V(3).Infof("node policy route migrated")
return nil
klog.V(3).Infof("add policy route for router: %s, priority: %d, match %s, action %s, nexthop %s, extrenalID %v",
c.config.ClusterRouter, util.NodeRouterPolicyPriority, match, "reroute", nexthop, externalIDs)
if err := c.ovnClient.AddLogicalRouterPolicy(c.config.ClusterRouter, util.NodeRouterPolicyPriority, match, "reroute", []string{nexthop}, externalIDs); err != nil {
klog.Errorf("failed to add logical router policy for node %s: %v", node, err)
return err
}

routeTables, err := c.ovnLegacyClient.GetRouteTables(c.config.ClusterRouter)
Expand All @@ -769,8 +770,8 @@ func (c *Controller) migrateNodeRoute(af int, node, ip, nexthop string) error {

asName := nodeUnderlayAddressSetName(node, af)
obsoleteMatch := fmt.Sprintf("ip%d.dst == %s && ip%d.src != $%s", af, ip, af, asName)
klog.Infof("delete policy route for router: %s, priority: %d, match %s", c.config.ClusterRouter, util.NodeRouterPolicyPriority, obsoleteMatch)
if err := c.ovnLegacyClient.DeletePolicyRoute(c.config.ClusterRouter, util.NodeRouterPolicyPriority, obsoleteMatch); err != nil {
klog.V(3).Infof("delete policy route for router: %s, priority: %d, match %s", c.config.ClusterRouter, util.NodeRouterPolicyPriority, obsoleteMatch)
if err := c.ovnClient.DeleteLogicalRouterPolicy(c.config.ClusterRouter, util.NodeRouterPolicyPriority, obsoleteMatch); err != nil {
klog.Errorf("failed to delete obsolete logical router policy for node %s: %v", node, err)
return err
}
Expand All @@ -780,17 +781,6 @@ func (c *Controller) migrateNodeRoute(af int, node, ip, nexthop string) error {
return err
}

externalIDs := map[string]string{
"vendor": util.CniTypeName,
"node": node,
}
klog.Infof("add policy route for router: %s, priority: %d, match %s, action %s, nexthop %s, extrenalID %v",
c.config.ClusterRouter, util.NodeRouterPolicyPriority, match, "reroute", nexthop, externalIDs)
if err := c.ovnLegacyClient.AddPolicyRoute(c.config.ClusterRouter, util.NodeRouterPolicyPriority, match, "reroute", nexthop, externalIDs); err != nil {
klog.Errorf("failed to add logical router policy for node %s: %v", node, err)
return err
}

return nil
}

Expand Down
78 changes: 39 additions & 39 deletions pkg/controller/node.go
Expand Up @@ -11,6 +11,7 @@ import (
"time"

goping "github.com/oilbeater/go-ping"
"github.com/scylladb/go-set/strset"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -277,12 +278,12 @@ func (c *Controller) handleAddNode(key string) error {
"address-family": strconv.Itoa(af),
}
klog.Infof("add policy route for router: %s, match %s, action %s, nexthop %s, externalID %v", c.config.ClusterRouter, match, "reroute", ip, externalIDs)
if err = c.ovnLegacyClient.AddPolicyRoute(c.config.ClusterRouter, util.NodeRouterPolicyPriority, match, "reroute", ip, externalIDs); err != nil {
if err = c.ovnClient.AddLogicalRouterPolicy(c.config.ClusterRouter, util.NodeRouterPolicyPriority, match, "reroute", []string{ip}, externalIDs); err != nil {
klog.Errorf("failed to add logical router policy for node %s: %v", node.Name, err)
return err
}

if err = c.deletePolicyRouteForLocalDnsCacheOnNode(portName, node.Name, af); err != nil {
if err = c.deletePolicyRouteForLocalDnsCacheOnNode(node.Name, af); err != nil {
return err
}

Expand Down Expand Up @@ -478,7 +479,7 @@ func (c *Controller) handleDeleteNode(key string) error {

afs := []int{4, 6}
for _, af := range afs {
if err := c.deletePolicyRouteForLocalDnsCacheOnNode(portName, key, af); err != nil {
if err := c.deletePolicyRouteForLocalDnsCacheOnNode(key, af); err != nil {
return err
}
}
Expand All @@ -500,7 +501,7 @@ func (c *Controller) handleDeleteNode(key string) error {
if addr.Ip == "" {
continue
}
if err := c.ovnLegacyClient.DeletePolicyRouteByNexthop(c.config.ClusterRouter, util.NodeRouterPolicyPriority, addr.Ip); err != nil {
if err := c.ovnClient.DeleteLogicalRouterPolicyByNexthop(c.config.ClusterRouter, util.NodeRouterPolicyPriority, addr.Ip); err != nil {
klog.Errorf("failed to delete router policy for node %s: %v", key, err)
return err
}
Expand Down Expand Up @@ -815,24 +816,24 @@ func (c *Controller) checkGatewayReady() error {
if !success {
if exist {
klog.Warningf("failed to ping ovn0 %s or node %s is not ready, delete ecmp policy route for node", ip, node.Name)
nextHops = util.RemoveString(nextHops, ip)
nextHops.Remove(ip)
delete(nameIpMap, node.Name)
klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops, nameIpMap); err != nil {
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIpMap); err != nil {
klog.Errorf("failed to delete ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
return err
}
}
} else {
klog.V(3).Infof("succeed to ping gw %s", ip)
if !exist {
nextHops = append(nextHops, ip)
nextHops.Add(ip)
if nameIpMap == nil {
nameIpMap = make(map[string]string, 1)
}
nameIpMap[node.Name] = ip
klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops, nameIpMap); err != nil {
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIpMap); err != nil {
klog.Errorf("failed to add ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
return err
}
Expand All @@ -841,10 +842,10 @@ func (c *Controller) checkGatewayReady() error {
} else {
if exist {
klog.Infof("subnet %s gatewayNode does not contains node %v, delete policy route for node ip %s", subnet.Name, node.Name, ip)
nextHops = util.RemoveString(nextHops, ip)
nextHops.Remove(ip)
delete(nameIpMap, node.Name)
klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops, nameIpMap); err != nil {
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIpMap); err != nil {
klog.Errorf("failed to delete ecmp policy route for subnet %s on node %s, %v", subnet.Name, node.Name, err)
return err
}
Expand Down Expand Up @@ -1083,22 +1084,24 @@ func (c *Controller) addNodeGwStaticRoute() error {
return nil
}

func (c *Controller) getPolicyRouteParas(cidr string, priority int32) ([]string, map[string]string, error) {
// TODO:// support get policy route by vpc
func (c *Controller) getPolicyRouteParas(cidr string, priority int) (*strset.Set, map[string]string, error) {
ipSuffix := "ip4"
if util.CheckProtocol(cidr) == kubeovnv1.ProtocolIPv6 {
ipSuffix = "ip6"
}
match := fmt.Sprintf("%s.src == %s", ipSuffix, cidr)
nextHops, nameIpMap, err := c.ovnLegacyClient.GetPolicyRouteParas(priority, match)
policy, err := c.ovnClient.GetLogicalRouterPolicy(c.config.ClusterRouter, priority, match, true)
if err != nil {
klog.Errorf("failed to get policy route paras, %v", err)
return nextHops, nameIpMap, err
klog.Errorf("failed to get logical router policy: %v", err)
return nil, nil, err
}
if policy == nil {
return nil, nil, err
}
return nextHops, nameIpMap, nil
return strset.New(policy.Nexthops...), policy.ExternalIDs, nil
}

func (c *Controller) checkPolicyRouteExistForNode(nodeName, cidr, nexthop string, priority int32) (bool, error) {
func (c *Controller) checkPolicyRouteExistForNode(nodeName, cidr, nexthop string, priority int) (bool, error) {
_, nameIpMap, err := c.getPolicyRouteParas(cidr, priority)
if err != nil {
klog.Errorf("failed to get policy route paras, %v", err)
Expand Down Expand Up @@ -1151,18 +1154,18 @@ func (c *Controller) deletePolicyRouteForNode(nodeName string) error {
}

if exist {
nextHops = util.RemoveString(nextHops, nameIpMap[nodeName])
nextHops.Remove(nameIpMap[nodeName])
delete(nameIpMap, nodeName)

if len(nextHops) == 0 {
if nextHops.Size() == 0 {
klog.Infof("delete policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
if err := c.deletePolicyRouteForCentralizedSubnet(subnet); err != nil {
klog.Errorf("failed to delete policy route for centralized subnet %s, %v", subnet.Name, err)
return err
}
} else {
klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops, nameIpMap); err != nil {
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIpMap); err != nil {
klog.Errorf("failed to update policy route for subnet %s on node %s, %v", subnet.Name, nodeName, err)
return err
}
Expand Down Expand Up @@ -1217,13 +1220,13 @@ func (c *Controller) addPolicyRouteForCentralizedSubnetOnNode(nodeName, nodeIP s
klog.Errorf("get ecmp policy route paras for subnet %v, error %v", subnet.Name, err)
continue
}
nextHops = append(nextHops, nextHop)
nextHops.Add(nextHop)
if nameIpMap == nil {
nameIpMap = make(map[string]string, 1)
}
nameIpMap[nodeName] = nextHop
klog.Infof("update policy route for centralized subnet %s, nextHops %s", subnet.Name, nextHops)
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops, nameIpMap); err != nil {
if err = c.updatePolicyRouteForCentralizedSubnet(subnet.Name, cidrBlock, nextHops.List(), nameIpMap); err != nil {
klog.Errorf("failed to update policy route for subnet %s on node %s, %v", subnet.Name, nodeName, err)
return err
}
Expand Down Expand Up @@ -1254,36 +1257,33 @@ func (c *Controller) addPolicyRouteForLocalDnsCacheOnNode(nodePortName, nodeIP,
pgAs := strings.Replace(fmt.Sprintf("%s_ip%d", nodePortName, af), "-", ".", -1)
match := fmt.Sprintf("ip%d.src == $%s && ip%d.dst == %s", af, pgAs, af, c.config.NodeLocalDnsIP)
klog.Infof("add node local dns cache policy route for router: %s, match %s, action %s, nexthop %s, externalID %v", c.config.ClusterRouter, match, "reroute", nodeIP, externalIDs)
if err := c.ovnLegacyClient.AddPolicyRoute(c.config.ClusterRouter, util.NodeLocalDnsPolicyPriority, match, "reroute", nodeIP, externalIDs); err != nil {
if err := c.ovnClient.AddLogicalRouterPolicy(c.config.ClusterRouter, util.NodeLocalDnsPolicyPriority, match, "reroute", []string{nodeIP}, externalIDs); err != nil {
klog.Errorf("failed to add logical router policy for node %s: %v", nodeName, err)
return err
}
return nil
}

func (c *Controller) deletePolicyRouteForLocalDnsCacheOnNode(nodePortName, nodeName string, af int) error {
results, err := c.ovnLegacyClient.CustomFindEntity("Logical_Router_Policy", []string{"_uuid", "match", "priority"},
fmt.Sprintf("external_ids:vendor=\"%s\"", util.CniTypeName),
fmt.Sprintf("external_ids:node=\"%s\"", nodeName),
fmt.Sprintf("external_ids:address-family=\"%s\"", strconv.Itoa(af)),
"external_ids:isLocalDnsCache=\"true\"",
)
func (c *Controller) deletePolicyRouteForLocalDnsCacheOnNode(nodeName string, af int) error {
policies, err := c.ovnClient.ListLogicalRouterPolicies(c.config.ClusterRouter, -1, map[string]string{
"vendor": util.CniTypeName,
"node": nodeName,
"address-family": strconv.Itoa(af),
"isLocalDnsCache": "true",
})
if err != nil {
klog.Errorf("customFindEntity failed, %v", err)
klog.Errorf("failed to list logical router policies: %v", err)
return err
}

if len(results) == 0 {
if len(policies) == 0 {
return nil
}

var uuids []string
for _, result := range results {
uuids = append(uuids, result["_uuid"][0])
klog.Infof("delete node local dns cache policy route for router %s with match %s ", c.config.ClusterRouter, result["match"])
for _, policy := range policies {
klog.Infof("delete node local dns cache policy route for router %s with match %s", c.config.ClusterRouter, policy.Match)

if err := c.ovnLegacyClient.DeletePolicyRouteByUUID(c.config.ClusterRouter, uuids); err != nil {
klog.Errorf("failed to delete policy route for node local dns in router %s with match %s : %v", c.config.ClusterRouter, result["match"], err)
if err := c.ovnClient.DeleteLogicalRouterPolicyByUUID(c.config.ClusterRouter, policy.UUID); err != nil {
klog.Errorf("failed to delete policy route for node local dns in router %s with match %s: %v", c.config.ClusterRouter, policy.Match, err)
return err
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/ovn-ic.go
Expand Up @@ -459,7 +459,7 @@ func (c *Controller) syncOneRouteToPolicy(key, value string) {
klog.Errorf("logical router does not exist %v at %v", err, time.Now())
return
}
lrRouteList, err := c.ovnClient.GetLogicalRouterRouteByOpts(key, value)
lrRouteList, err := c.ovnClient.ListLogicalRouterStaticRoutesByOption(lr.Name, key, value)
if err != nil {
klog.Errorf("failed to list lr ovn-ic route %v", err)
return
Expand All @@ -468,14 +468,14 @@ func (c *Controller) syncOneRouteToPolicy(key, value string) {
klog.V(5).Info("lr ovn-ic route does not exist")
err := c.ovnClient.DeleteLogicalRouterPolicies(lr.Name, util.OvnICPolicyPriority, map[string]string{key: value})
if err != nil {
klog.Errorf("delete ovn-ic lr policy", err)
klog.Errorf("failed to delete ovn-ic lr policy: %v", err)
return
}
return
}

policyMap := map[string]string{}
lrPolicyList, err := c.ovnClient.ListLogicalRouterPolicies(util.OvnICPolicyPriority, map[string]string{key: value})
lrPolicyList, err := c.ovnClient.ListLogicalRouterPolicies(lr.Name, util.OvnICPolicyPriority, map[string]string{key: value})
if err != nil {
klog.Errorf("failed to list ovn-ic lr policy ", err)
return
Expand Down

0 comments on commit 6ddd03b

Please sign in to comment.