Skip to content

Commit

Permalink
perf: use policy-route to replace src-ip route
Browse files Browse the repository at this point in the history
Reduce number of openflow need to be installed on node.
  • Loading branch information
oilbeater committed Jul 1, 2020
1 parent 32e6d57 commit b8f0324
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 181 deletions.
24 changes: 12 additions & 12 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ type Controller struct {
podsLister v1.PodLister
podsSynced cache.InformerSynced

addPodQueue workqueue.RateLimitingInterface
deletePodQueue workqueue.RateLimitingInterface
updatePodQueue workqueue.RateLimitingInterface
addPodQueue workqueue.RateLimitingInterface
deletePodQueue workqueue.RateLimitingInterface
updatePodRouteQueue workqueue.RateLimitingInterface

subnetsLister kubeovnlister.SubnetLister
subnetSynced cache.InformerSynced
addOrUpdateSubnetQueue workqueue.RateLimitingInterface
deleteSubnetQueue workqueue.RateLimitingInterface
deleteRouteQueue workqueue.RateLimitingInterface
deleteSubnetRouteQueue workqueue.RateLimitingInterface
updateSubnetStatusQueue workqueue.RateLimitingInterface

ipsLister kubeovnlister.IPLister
Expand Down Expand Up @@ -128,7 +128,7 @@ func NewController(config *Configuration) *Controller {
subnetSynced: subnetInformer.Informer().HasSynced,
addOrUpdateSubnetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddSubnet"),
deleteSubnetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteSubnet"),
deleteRouteQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteRoute"),
deleteSubnetRouteQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteRoute"),
updateSubnetStatusQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSubnetStatus"),

ipsLister: ipInformer.Lister(),
Expand All @@ -140,11 +140,11 @@ func NewController(config *Configuration) *Controller {
delVlanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DelVlan"),
updateVlanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateVlan"),

podsLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
addPodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddPod"),
deletePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeletePod"),
updatePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdatePod"),
podsLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
addPodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddPod"),
deletePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeletePod"),
updatePodRouteQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdatePod"),

namespacesLister: namespaceInformer.Lister(),
namespacesSynced: namespaceInformer.Informer().HasSynced,
Expand Down Expand Up @@ -276,13 +276,13 @@ func (c *Controller) shutdown() {

c.addPodQueue.ShutDown()
c.deletePodQueue.ShutDown()
c.updatePodQueue.ShutDown()
c.updatePodRouteQueue.ShutDown()

c.addNamespaceQueue.ShutDown()

c.addOrUpdateSubnetQueue.ShutDown()
c.deleteSubnetQueue.ShutDown()
c.deleteRouteQueue.ShutDown()
c.deleteSubnetRouteQueue.ShutDown()
c.updateSubnetStatusQueue.ShutDown()

c.addNodeQueue.ShutDown()
Expand Down
28 changes: 0 additions & 28 deletions pkg/controller/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package controller

import (
"fmt"
"github.com/alauda/kube-ovn/pkg/ovs"
"github.com/alauda/kube-ovn/pkg/util"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -20,7 +19,6 @@ func (c *Controller) gc() error {
c.gcLogicalSwitchPort,
c.gcLoadBalancer,
c.gcPortGroup,
c.gcStaticRoute,
}
for _, gcFunc := range gcFunctions {
if err := gcFunc(); err != nil {
Expand Down Expand Up @@ -226,29 +224,3 @@ func (c *Controller) gcPortGroup() error {
}
return nil
}

func (c *Controller) gcStaticRoute() error {
routes, err := c.ovnClient.ListStaticRoute()
if err != nil {
klog.Errorf("failed to list static route %v", err)
return err
}
for _, route := range routes {
if route.Policy == ovs.PolicyDstIP || route.Policy == "" {
if !c.ipam.ContainAddress(route.NextHop) {
klog.Infof("gc static route %s %s %s", route.Policy, route.CIDR, route.NextHop)
if err := c.ovnClient.DeleteStaticRouteByNextHop(route.NextHop); err != nil {
klog.Errorf("failed to delete stale nexthop route %s, %v", route.NextHop, err)
}
}
} else {
if !c.ipam.ContainAddress(route.CIDR) {
klog.Infof("gc static route %s %s %s", route.Policy, route.CIDR, route.NextHop)
if err := c.ovnClient.DeleteStaticRoute(route.CIDR, c.config.ClusterRouter); err != nil {
klog.Errorf("failed to delete stale route %s, %v", route.NextHop, err)
}
}
}
}
return nil
}
24 changes: 22 additions & 2 deletions pkg/controller/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ func (c *Controller) InitOVN() error {
return err
}

if err := c.initSubnetAddressSet(); err != nil {
klog.Errorf("init subnet address set failed %v", err)
return err
}

if err := c.initDefaultVlan(); err != nil {
klog.Errorf("init default vlan failed %v", err)
return err
Expand Down Expand Up @@ -102,8 +107,19 @@ func (c *Controller) initNodeSwitch() error {
nodeSubnet.Spec.Vlan = c.config.DefaultVlanName
}

_, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Create(&nodeSubnet)
return err
if _, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Create(&nodeSubnet); err != nil {
return err
}

allCidr := "0.0.0.0/0"
if util.CheckProtocol(c.config.NodeSwitchCIDR) == kubeovnv1.ProtocolIPv6 {
allCidr = "::/0"
}
if err := c.ovnClient.AddStaticRoute("", allCidr, c.config.NodeSwitchGateway, c.config.ClusterRouter); err != nil {
return err
}

return nil
}

// InitClusterRouter init cluster router to connect different logical switches
Expand Down Expand Up @@ -155,6 +171,10 @@ func (c *Controller) initLoadBalancer() error {
return nil
}

func (c *Controller) initSubnetAddressSet() error {
return c.ovnClient.CreateAddressSet(util.SubnetAddressSet)
}

func (c *Controller) InitIPAM() error {
subnets, err := c.subnetsLister.List(labels.Everything())
if err != nil {
Expand Down
21 changes: 21 additions & 0 deletions pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,17 @@ func (c *Controller) handleAddNode(key string) error {
return err
}
}
nodeLocalAddressSet := fmt.Sprintf(util.NodeLocalAddressSetTemplate, strings.Replace(key, "-", ".", -1))
if err := c.ovnClient.CreateAddressSet(nodeLocalAddressSet); err != nil {
klog.Errorf("failed to create node local address set for node %s, %v", key, err)
return err
}

match := fmt.Sprintf("ip4.src==$%s && ip4.dst!=$%s", nodeLocalAddressSet, util.SubnetAddressSet)
if err := c.ovnClient.CreatePolicyRoute(c.config.ClusterRouter, match, ip, util.DistributedGatewayPolicyRoutePriority); err != nil {
klog.Errorf("failed to create policy route for node %s, %v", key, err)
return err
}

patchPayloadTemplate :=
`[{
Expand Down Expand Up @@ -311,6 +322,16 @@ func (c *Controller) handleAddNode(key string) error {
}

func (c *Controller) handleDeleteNode(key string) error {
if _, err := c.nodesLister.Get(key); err != nil {
// node still exists
return nil
}

if err := c.ovnClient.DeleteAddressSet(fmt.Sprintf(util.NodeLocalAddressSetTemplate, strings.Replace(key, "-", ".", -1))); err != nil {
klog.Errorf("failed to delete node local address set for node %s, %v", key, err)
return err
}

portName := fmt.Sprintf("node-%s", key)
if err := c.ovnClient.DeletePort(portName); err != nil {
klog.Errorf("failed to delete node switch port node-%s %v", key, err)
Expand Down
107 changes: 36 additions & 71 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ func (c *Controller) enqueueAddPod(obj interface{}) {
// In case update event might lost during leader election
if p.Annotations != nil &&
p.Annotations[util.AllocatedAnnotation] == "true" &&
p.Annotations[util.RoutedAnnotation] != "true" &&
p.Status.HostIP != "" && p.Status.PodIP != "" {
c.updatePodQueue.Add(key)
c.updatePodRouteQueue.Add(p.Spec.NodeName)
return
}

Expand All @@ -96,6 +95,8 @@ func (c *Controller) enqueueDeletePod(obj interface{}) {
}

p := obj.(*v1.Pod)
c.updatePodRouteQueue.Add(p.Spec.NodeName)

for _, np := range c.podMatchNetworkPolicies(p) {
c.updateNpQueue.Add(np)
}
Expand Down Expand Up @@ -183,10 +184,9 @@ func (c *Controller) enqueueUpdatePod(oldObj, newObj interface{}) {

// pod assigned an ip
if newPod.Annotations[util.AllocatedAnnotation] == "true" &&
newPod.Annotations[util.RoutedAnnotation] != "true" &&
newPod.Spec.NodeName != "" {
klog.V(3).Infof("enqueue update pod %s", key)
c.updatePodQueue.Add(key)
c.updatePodRouteQueue.Add(newPod.Spec.NodeName)
}
}

Expand Down Expand Up @@ -274,26 +274,26 @@ func (c *Controller) processNextDeletePodWorkItem() bool {
}

func (c *Controller) processNextUpdatePodWorkItem() bool {
obj, shutdown := c.updatePodQueue.Get()
obj, shutdown := c.updatePodRouteQueue.Get()

if shutdown {
return false
}

err := func(obj interface{}) error {
defer c.updatePodQueue.Done(obj)
defer c.updatePodRouteQueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
c.updatePodQueue.Forget(obj)
c.updatePodRouteQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
if err := c.handleUpdatePod(key); err != nil {
c.updatePodQueue.AddRateLimited(key)
if err := c.reconcileNodeLocalAddressSet(key); err != nil {
c.updatePodRouteQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
c.updatePodQueue.Forget(obj)
c.updatePodRouteQueue.Forget(obj)
return nil
}(obj)

Expand Down Expand Up @@ -393,93 +393,58 @@ func (c *Controller) handleAddPod(key string) error {
return nil
}

func (c *Controller) handleDeletePod(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
pod, err := c.podsLister.Pods(namespace).Get(name)
if pod != nil && isPodAlive(pod) {
// Pod with same name exists, just return here
func (c *Controller) reconcileNodeLocalAddressSet(node string) error {
if node == "" {
return nil
}

ips, _ := c.ipam.GetPodAddress(key)
for _, ip := range ips {
if err := c.ovnClient.DeleteStaticRoute(ip, c.config.ClusterRouter); err != nil {
return err
}
}

if err := c.ovnClient.DeletePort(ovs.PodNameToPortName(name, namespace)); err != nil {
klog.Errorf("failed to delete lsp %s, %v", ovs.PodNameToPortName(name, namespace), err)
pods, err := c.podsLister.List(labels.Everything())
if err != nil {
klog.Errorf("list pods failed %v", err)
return err
}

if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(ovs.PodNameToPortName(name, namespace), &metav1.DeleteOptions{}); err != nil {
if !k8serrors.IsNotFound(err) {
klog.Errorf("failed to delete ip %s, %v", ovs.PodNameToPortName(name, namespace), err)
return err
addressSet := []string{}
for _, pod := range pods {
if pod.Annotations[util.IpAddressAnnotation] != "" &&
pod.Spec.NodeName == node {
addressSet = append(addressSet, pod.Annotations[util.IpAddressAnnotation])
}
}
if err := c.ovnClient.SetAddressesToAddressSet(addressSet,
fmt.Sprintf(util.NodeLocalAddressSetTemplate, strings.Replace(node, "-", ".", -1))); err != nil {
klog.Errorf("failed to set node local address set for %s, %v", node, err)
return err
}

c.ipam.ReleaseAddressByPod(key)
return nil
}

func (c *Controller) handleUpdatePod(key string) error {
func (c *Controller) handleDeletePod(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
pod, err := c.podsLister.Pods(namespace).Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
if pod != nil && isPodAlive(pod) {
// Pod with same name exists, just return here
return nil
}

klog.Infof("update pod %s/%s", namespace, name)
podIP := pod.Annotations[util.IpAddressAnnotation]

subnet, err := c.getPodDefaultSubnet(pod)
if err != nil {
klog.Errorf("failed to get subnet %v", err)
if err := c.ovnClient.DeletePort(ovs.PodNameToPortName(name, namespace)); err != nil {
klog.Errorf("failed to delete lsp %s, %v", ovs.PodNameToPortName(name, namespace), err)
return err
}

if !subnet.Spec.UnderlayGateway {
if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType {
node, err := c.nodesLister.Get(pod.Spec.NodeName)
if err != nil {
klog.Errorf("get node %s failed %v", pod.Spec.NodeName, err)
return err
}
nodeTunlIPAddr, err := getNodeTunlIP(node)
if err != nil {
return err
}

if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, podIP, nodeTunlIPAddr.String(), c.config.ClusterRouter); err != nil {
return errors.Annotate(err, "add static route failed")
}
if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(ovs.PodNameToPortName(name, namespace), &metav1.DeleteOptions{}); err != nil {
if !k8serrors.IsNotFound(err) {
klog.Errorf("failed to delete ip %s, %v", ovs.PodNameToPortName(name, namespace), err)
return err
}
}

pod.Annotations[util.RoutedAnnotation] = "true"
if _, err := c.config.KubeClient.CoreV1().Pods(namespace).Patch(name, types.JSONPatchType, generatePatchPayload(pod.Annotations, "replace")); err != nil {
if k8serrors.IsNotFound(err) {
// Sometimes pod is deleted between kube-ovn configure ovn-nb and patch pod.
// Then we need to recycle the resource again.
c.deletePodQueue.AddRateLimited(key)
return nil
}
klog.Errorf("patch pod %s/%s failed %v", name, namespace, err)
return err
}
c.ipam.ReleaseAddressByPod(key)
return nil
}

Expand Down

0 comments on commit b8f0324

Please sign in to comment.