Skip to content

Commit

Permalink
add policy route for distributed subnet in default vpc
Browse files Browse the repository at this point in the history
  • Loading branch information
hongzhen-ma committed Jan 30, 2022
1 parent ab528ae commit 9365a62
Show file tree
Hide file tree
Showing 8 changed files with 436 additions and 16 deletions.
8 changes: 8 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,14 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
if err := c.initSyncCrdVlans(); err != nil {
klog.Errorf("failed to sync crd vlans: %v", err)
}
if err := c.initDeleteOverlayPodsStaticRoutes(); err != nil {
klog.Errorf("failed to delete pod's static route in default vpc: %v", err)
}
// The static route for node gw can be deleted when gc static route, so add it after gc process
dstIp := "0.0.0.0/0,::/0"
if err := c.ovnClient.AddStaticRoute("", dstIp, c.config.NodeSwitchGateway, c.config.ClusterRouter, util.NormalRouteType); err != nil {
klog.Errorf("failed to add static route for node gw: %v", err)
}

// start workers to do all the network operations
c.startWorkers(stopCh)
Expand Down
16 changes: 16 additions & 0 deletions pkg/controller/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/ovs"
"github.com/kubeovn/kube-ovn/pkg/util"
)
Expand Down Expand Up @@ -496,6 +497,21 @@ func (c *Controller) gcPortGroup() error {
for _, node := range nodes {
npNames = append(npNames, fmt.Sprintf("%s/%s", "node", node.Name))
}

// append overlay subnets port group to npNames to avoid gc distributed subnets port group
subnets, err := c.subnetsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list subnets %v", err)
return err
}
for _, subnet := range subnets {
if subnet.Spec.Vpc != util.DefaultVpc || subnet.Spec.Vlan != "" || subnet.Name == c.config.NodeSwitch || subnet.Spec.GatewayType != kubeovnv1.GWDistributedType {
continue
}
for _, node := range nodes {
npNames = append(npNames, fmt.Sprintf("%s/%s", subnet.Name, node.Name))
}
}
}

pgs, err := c.ovnClient.ListNpPortGroup()
Expand Down
37 changes: 37 additions & 0 deletions pkg/controller/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ func (c *Controller) InitOVN() error {
return err
}

if err := c.createOverlaySubnetsAddressSet(); err != nil {
klog.Errorf("failed to create overlay subnets address-set, %v", err)
return err
}

return nil
}

Expand Down Expand Up @@ -682,3 +687,35 @@ func (c *Controller) initHtbQos() error {
}
return err
}

func (c *Controller) initDeleteOverlayPodsStaticRoutes() error {
pods, err := c.podsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list pods: %v", err)
return err
}
for _, pod := range pods {
if pod.Spec.HostNetwork {
continue
}
podNets, err := c.getPodKubeovnNets(pod)
if err != nil {
klog.Errorf("failed to get pod kubeovn nets %s.%s address %s: %v", pod.Name, pod.Namespace, pod.Annotations[util.IpAddressAnnotation], err)
continue
}
for _, podNet := range podNets {
if !isOvnSubnet(podNet.Subnet) || podNet.Subnet.Spec.Vpc != util.DefaultVpc || podNet.Subnet.Spec.Vlan != "" || podNet.Subnet.Spec.GatewayType != kubeovnv1.GWDistributedType {
continue
}
if pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)] == "true" {
for _, podIP := range strings.Split(pod.Annotations[fmt.Sprintf(util.IpAddressAnnotationTemplate, podNet.ProviderName)], ",") {
if err := c.ovnClient.DeleteStaticRoute(podIP, podNet.Subnet.Spec.Vpc); err != nil {
return err
}
}
}
}
}

return nil
}
41 changes: 36 additions & 5 deletions pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,11 @@ func (c *Controller) handleAddNode(key string) error {
}
}

if err := c.addNodeGwStaticRoute(); err != nil {
klog.Errorf("failed to add static route for node gw: %v", err)
return err
}

patchPayloadTemplate :=
`[{
"op": "%s",
Expand Down Expand Up @@ -609,7 +614,7 @@ func (c *Controller) checkGatewayReady() error {
continue
}

exist, err := c.checkNodeEcmpRouteExist(ip, cidrBlock)
exist, err := c.checkRouteExist(ip, cidrBlock, ovs.PolicySrcIP)
if err != nil {
klog.Errorf("get ecmp static route for subnet %v, error %v", subnet.Name, err)
break
Expand Down Expand Up @@ -671,19 +676,20 @@ func (c *Controller) checkGatewayReady() error {
return nil
}

func (c *Controller) checkNodeEcmpRouteExist(nodeIp, cidrBlock string) (bool, error) {
func (c *Controller) checkRouteExist(nextHop, cidrBlock, routePolicy string) (bool, error) {
routes, err := c.ovnClient.GetStaticRouteList(c.config.ClusterRouter)
if err != nil {
klog.Errorf("failed to list static route %v", err)
return false, err
}

for _, route := range routes {
if route.Policy != ovs.PolicySrcIP {
if route.Policy != routePolicy {
continue
}
if route.CIDR == cidrBlock && route.NextHop == nodeIp {
klog.V(3).Infof("src-ip static route exist for cidr %s, nexthop %v", cidrBlock, nodeIp)

if route.CIDR == cidrBlock && route.NextHop == nextHop {
klog.V(3).Infof("static route exists for cidr %s, nexthop %v", cidrBlock, nextHop)
return true, nil
}
}
Expand Down Expand Up @@ -905,3 +911,28 @@ func (c *Controller) RemoveRedundantChassis(node *v1.Node) error {
}
return nil
}

func (c *Controller) addNodeGwStaticRoute() error {
dstCidr := "0.0.0.0/0,::/0"
for _, cidrBlock := range strings.Split(dstCidr, ",") {
for _, nextHop := range strings.Split(c.config.NodeSwitchGateway, ",") {
if util.CheckProtocol(cidrBlock) != util.CheckProtocol(nextHop) {
continue
}

exist, err := c.checkRouteExist(nextHop, cidrBlock, ovs.PolicyDstIP)
if err != nil {
klog.Errorf("get static route for node gw error %v", err)
return err
}

if !exist {
if err := c.ovnClient.AddStaticRoute("", cidrBlock, nextHop, c.config.ClusterRouter, util.NormalRouteType); err != nil {
klog.Errorf("failed to add static route for node gw: %v", err)
return err
}
}
}
}
return nil
}
56 changes: 54 additions & 2 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,7 @@ func (c *Controller) handleDeletePod(pod *v1.Pod) error {
if err != nil {
klog.Warningf("filed to get port '%s' sg, %v", portName, err)
}
// when lsp is deleted, the port of pod is deleted from any port-group automatically.
if err := c.ovnClient.DeleteLogicalSwitchPort(portName); err != nil {
klog.Errorf("failed to delete lsp %s, %v", portName, err)
return err
Expand Down Expand Up @@ -720,6 +721,12 @@ func (c *Controller) handleUpdatePod(key string) error {
return err
}

_, idNameMap, err := c.ovnClient.ListLspForNodePortgroup()
if err != nil {
klog.Errorf("failed to list lsp info, %v", err)
return err
}

for _, podNet := range podNets {
if !isOvnSubnet(podNet.Subnet) {
continue
Expand Down Expand Up @@ -764,6 +771,19 @@ func (c *Controller) handleUpdatePod(key string) error {
klog.Errorf("get node %s failed %v", pod.Spec.NodeName, err)
return err
}

pgName := getOverlaySubnetsPortGroupName(subnet.Name, node.Name)
exist, err := c.ovnClient.PortGroupExists(pgName)
if err != nil {
return err
}
if !exist {
if err = c.ovnClient.CreateNpPortGroup(pgName, subnet.Name, node.Name); err != nil {
klog.Errorf("failed to create port group for subnet %s and node %s, %v", subnet.Name, node.Name, err)
return err
}
}

nodeTunlIPAddr, err := getNodeTunlIP(node)
if err != nil {
return err
Expand All @@ -774,10 +794,42 @@ func (c *Controller) handleUpdatePod(key string) error {
if util.CheckProtocol(nodeAddr.String()) != util.CheckProtocol(podAddr) {
continue
}
if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, podAddr, nodeAddr.String(), c.config.ClusterRouter, util.NormalRouteType); err != nil {
klog.Errorf("failed to add static route, %v", err)

pgPorts, err := c.getPgPorts(idNameMap, pgName)
if err != nil {
klog.Errorf("failed to fetch ports for pg %v, %v", pgName, err)
return err
}

portName := ovs.PodNameToPortName(pod.Name, pod.Namespace, podNet.ProviderName)
if !util.IsStringIn(portName, pgPorts) {
pgPorts = append(pgPorts, portName)

if err = c.ovnClient.SetPortsToPortGroup(pgName, pgPorts); err != nil {
klog.Errorf("failed to set ports to port group %v, %v", pgName, err)
return err
}
}

ipSuffix := "ip4"
subnetAsName := getOverlaySubnetsAddressSetName(c.config.ClusterRouter, kubeovnv1.ProtocolIPv4)
if util.CheckProtocol(nodeAddr.String()) == kubeovnv1.ProtocolIPv6 {
ipSuffix = "ip6"
subnetAsName = getOverlaySubnetsAddressSetName(c.config.ClusterRouter, kubeovnv1.ProtocolIPv6)
}
pgAs := fmt.Sprintf("%s_%s", pgName, ipSuffix)
match := fmt.Sprintf("%s.src == $%s && %s.dst != $%s", ipSuffix, pgAs, ipSuffix, subnetAsName)

exist, err := c.ovnClient.PolicyRouteExists(util.PodRouterPolicyPriority, match)
if err != nil {
return err
}
if !exist {
if err = c.ovnClient.AddPolicyRoute(c.config.ClusterRouter, util.PodRouterPolicyPriority, match, "reroute", nodeAddr.String()); err != nil {
klog.Errorf("failed to add logical router policy for port-group address-set %s: %v", pgAs, err)
return err
}
}
}
}
}
Expand Down
Loading

0 comments on commit 9365a62

Please sign in to comment.