Skip to content

Commit

Permalink
Revert "perf: use policy-route to replace src-ip route"
Browse files Browse the repository at this point in the history
  • Loading branch information
oilbeater committed Jul 6, 2020
1 parent d9074be commit 34b7cba
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 139 deletions.
24 changes: 12 additions & 12 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ type Controller struct {
podsLister v1.PodLister
podsSynced cache.InformerSynced

addPodQueue workqueue.RateLimitingInterface
deletePodQueue workqueue.RateLimitingInterface
updatePodRouteQueue workqueue.RateLimitingInterface
addPodQueue workqueue.RateLimitingInterface
deletePodQueue workqueue.RateLimitingInterface
updatePodQueue workqueue.RateLimitingInterface

subnetsLister kubeovnlister.SubnetLister
subnetSynced cache.InformerSynced
addOrUpdateSubnetQueue workqueue.RateLimitingInterface
deleteSubnetQueue workqueue.RateLimitingInterface
deleteSubnetRouteQueue workqueue.RateLimitingInterface
deleteRouteQueue workqueue.RateLimitingInterface
updateSubnetStatusQueue workqueue.RateLimitingInterface

ipsLister kubeovnlister.IPLister
Expand Down Expand Up @@ -128,7 +128,7 @@ func NewController(config *Configuration) *Controller {
subnetSynced: subnetInformer.Informer().HasSynced,
addOrUpdateSubnetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddSubnet"),
deleteSubnetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteSubnet"),
deleteSubnetRouteQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteRoute"),
deleteRouteQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteRoute"),
updateSubnetStatusQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSubnetStatus"),

ipsLister: ipInformer.Lister(),
Expand All @@ -140,11 +140,11 @@ func NewController(config *Configuration) *Controller {
delVlanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DelVlan"),
updateVlanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateVlan"),

podsLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
addPodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddPod"),
deletePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeletePod"),
updatePodRouteQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdatePod"),
podsLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
addPodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddPod"),
deletePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeletePod"),
updatePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdatePod"),

namespacesLister: namespaceInformer.Lister(),
namespacesSynced: namespaceInformer.Informer().HasSynced,
Expand Down Expand Up @@ -276,13 +276,13 @@ func (c *Controller) shutdown() {

c.addPodQueue.ShutDown()
c.deletePodQueue.ShutDown()
c.updatePodRouteQueue.ShutDown()
c.updatePodQueue.ShutDown()

c.addNamespaceQueue.ShutDown()

c.addOrUpdateSubnetQueue.ShutDown()
c.deleteSubnetQueue.ShutDown()
c.deleteSubnetRouteQueue.ShutDown()
c.deleteRouteQueue.ShutDown()
c.updateSubnetStatusQueue.ShutDown()

c.addNodeQueue.ShutDown()
Expand Down
28 changes: 28 additions & 0 deletions pkg/controller/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"fmt"
"github.com/alauda/kube-ovn/pkg/ovs"
"github.com/alauda/kube-ovn/pkg/util"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -19,6 +20,7 @@ func (c *Controller) gc() error {
c.gcLogicalSwitchPort,
c.gcLoadBalancer,
c.gcPortGroup,
c.gcStaticRoute,
}
for _, gcFunc := range gcFunctions {
if err := gcFunc(); err != nil {
Expand Down Expand Up @@ -224,3 +226,29 @@ func (c *Controller) gcPortGroup() error {
}
return nil
}

func (c *Controller) gcStaticRoute() error {
routes, err := c.ovnClient.ListStaticRoute()
if err != nil {
klog.Errorf("failed to list static route %v", err)
return err
}
for _, route := range routes {
if route.Policy == ovs.PolicyDstIP || route.Policy == "" {
if !c.ipam.ContainAddress(route.NextHop) {
klog.Infof("gc static route %s %s %s", route.Policy, route.CIDR, route.NextHop)
if err := c.ovnClient.DeleteStaticRouteByNextHop(route.NextHop); err != nil {
klog.Errorf("failed to delete stale nexthop route %s, %v", route.NextHop, err)
}
}
} else {
if !c.ipam.ContainAddress(route.CIDR) {
klog.Infof("gc static route %s %s %s", route.Policy, route.CIDR, route.NextHop)
if err := c.ovnClient.DeleteStaticRoute(route.CIDR, c.config.ClusterRouter); err != nil {
klog.Errorf("failed to delete stale route %s, %v", route.NextHop, err)
}
}
}
}
return nil
}
24 changes: 2 additions & 22 deletions pkg/controller/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@ func (c *Controller) InitOVN() error {
return err
}

if err := c.initSubnetAddressSet(); err != nil {
klog.Errorf("init subnet address set failed %v", err)
return err
}

if err := c.initDefaultVlan(); err != nil {
klog.Errorf("init default vlan failed %v", err)
return err
Expand Down Expand Up @@ -107,19 +102,8 @@ func (c *Controller) initNodeSwitch() error {
nodeSubnet.Spec.Vlan = c.config.DefaultVlanName
}

if _, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Create(&nodeSubnet); err != nil {
return err
}

allCidr := "0.0.0.0/0"
if util.CheckProtocol(c.config.NodeSwitchCIDR) == kubeovnv1.ProtocolIPv6 {
allCidr = "::/0"
}
if err := c.ovnClient.AddStaticRoute("", allCidr, c.config.NodeSwitchGateway, c.config.ClusterRouter); err != nil {
return err
}

return nil
_, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Create(&nodeSubnet)
return err
}

// InitClusterRouter init cluster router to connect different logical switches
Expand Down Expand Up @@ -171,10 +155,6 @@ func (c *Controller) initLoadBalancer() error {
return nil
}

func (c *Controller) initSubnetAddressSet() error {
return c.ovnClient.CreateAddressSet(util.SubnetAddressSet)
}

func (c *Controller) InitIPAM() error {
subnets, err := c.subnetsLister.List(labels.Everything())
if err != nil {
Expand Down
21 changes: 0 additions & 21 deletions pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,17 +226,6 @@ func (c *Controller) handleAddNode(key string) error {
return err
}
}
nodeLocalAddressSet := fmt.Sprintf(util.NodeLocalAddressSetTemplate, strings.Replace(key, "-", ".", -1))
if err := c.ovnClient.CreateAddressSet(nodeLocalAddressSet); err != nil {
klog.Errorf("failed to create node local address set for node %s, %v", key, err)
return err
}

match := fmt.Sprintf("ip4.src==$%s && ip4.dst!=$%s", nodeLocalAddressSet, util.SubnetAddressSet)
if err := c.ovnClient.CreatePolicyRoute(c.config.ClusterRouter, match, ip, util.DistributedGatewayPolicyRoutePriority); err != nil {
klog.Errorf("failed to create policy route for node %s, %v", key, err)
return err
}

patchPayloadTemplate :=
`[{
Expand Down Expand Up @@ -322,16 +311,6 @@ func (c *Controller) handleAddNode(key string) error {
}

func (c *Controller) handleDeleteNode(key string) error {
if _, err := c.nodesLister.Get(key); err != nil {
// node still exists
return nil
}

if err := c.ovnClient.DeleteAddressSet(fmt.Sprintf(util.NodeLocalAddressSetTemplate, strings.Replace(key, "-", ".", -1))); err != nil {
klog.Errorf("failed to delete node local address set for node %s, %v", key, err)
return err
}

portName := fmt.Sprintf("node-%s", key)
if err := c.ovnClient.DeletePort(portName); err != nil {
klog.Errorf("failed to delete node switch port node-%s %v", key, err)
Expand Down
107 changes: 71 additions & 36 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ func (c *Controller) enqueueAddPod(obj interface{}) {
// In case update event might lost during leader election
if p.Annotations != nil &&
p.Annotations[util.AllocatedAnnotation] == "true" &&
p.Annotations[util.RoutedAnnotation] != "true" &&
p.Status.HostIP != "" && p.Status.PodIP != "" {
c.updatePodRouteQueue.Add(p.Spec.NodeName)
c.updatePodQueue.Add(key)
return
}

Expand All @@ -95,8 +96,6 @@ func (c *Controller) enqueueDeletePod(obj interface{}) {
}

p := obj.(*v1.Pod)
c.updatePodRouteQueue.Add(p.Spec.NodeName)

for _, np := range c.podMatchNetworkPolicies(p) {
c.updateNpQueue.Add(np)
}
Expand Down Expand Up @@ -184,9 +183,10 @@ func (c *Controller) enqueueUpdatePod(oldObj, newObj interface{}) {

// pod assigned an ip
if newPod.Annotations[util.AllocatedAnnotation] == "true" &&
newPod.Annotations[util.RoutedAnnotation] != "true" &&
newPod.Spec.NodeName != "" {
klog.V(3).Infof("enqueue update pod %s", key)
c.updatePodRouteQueue.Add(newPod.Spec.NodeName)
c.updatePodQueue.Add(key)
}
}

Expand Down Expand Up @@ -274,26 +274,26 @@ func (c *Controller) processNextDeletePodWorkItem() bool {
}

func (c *Controller) processNextUpdatePodWorkItem() bool {
obj, shutdown := c.updatePodRouteQueue.Get()
obj, shutdown := c.updatePodQueue.Get()

if shutdown {
return false
}

err := func(obj interface{}) error {
defer c.updatePodRouteQueue.Done(obj)
defer c.updatePodQueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
c.updatePodRouteQueue.Forget(obj)
c.updatePodQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
if err := c.reconcileNodeLocalAddressSet(key); err != nil {
c.updatePodRouteQueue.AddRateLimited(key)
if err := c.handleUpdatePod(key); err != nil {
c.updatePodQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
c.updatePodRouteQueue.Forget(obj)
c.updatePodQueue.Forget(obj)
return nil
}(obj)

Expand Down Expand Up @@ -393,58 +393,93 @@ func (c *Controller) handleAddPod(key string) error {
return nil
}

func (c *Controller) reconcileNodeLocalAddressSet(node string) error {
if node == "" {
func (c *Controller) handleDeletePod(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}

pods, err := c.podsLister.List(labels.Everything())
if err != nil {
klog.Errorf("list pods failed %v", err)
return err
pod, err := c.podsLister.Pods(namespace).Get(name)
if pod != nil && isPodAlive(pod) {
// Pod with same name exists, just return here
return nil
}

addressSet := []string{}
for _, pod := range pods {
if pod.Annotations[util.IpAddressAnnotation] != "" &&
pod.Spec.NodeName == node {
addressSet = append(addressSet, pod.Annotations[util.IpAddressAnnotation])
ips, _ := c.ipam.GetPodAddress(key)
for _, ip := range ips {
if err := c.ovnClient.DeleteStaticRoute(ip, c.config.ClusterRouter); err != nil {
return err
}
}
if err := c.ovnClient.SetAddressesToAddressSet(addressSet,
fmt.Sprintf(util.NodeLocalAddressSetTemplate, strings.Replace(node, "-", ".", -1))); err != nil {
klog.Errorf("failed to set node local address set for %s, %v", node, err)

if err := c.ovnClient.DeletePort(ovs.PodNameToPortName(name, namespace)); err != nil {
klog.Errorf("failed to delete lsp %s, %v", ovs.PodNameToPortName(name, namespace), err)
return err
}

if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(ovs.PodNameToPortName(name, namespace), &metav1.DeleteOptions{}); err != nil {
if !k8serrors.IsNotFound(err) {
klog.Errorf("failed to delete ip %s, %v", ovs.PodNameToPortName(name, namespace), err)
return err
}
}

c.ipam.ReleaseAddressByPod(key)
return nil
}

func (c *Controller) handleDeletePod(key string) error {
func (c *Controller) handleUpdatePod(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
pod, err := c.podsLister.Pods(namespace).Get(name)
if pod != nil && isPodAlive(pod) {
// Pod with same name exists, just return here
return nil
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
return err
}

if err := c.ovnClient.DeletePort(ovs.PodNameToPortName(name, namespace)); err != nil {
klog.Errorf("failed to delete lsp %s, %v", ovs.PodNameToPortName(name, namespace), err)
klog.Infof("update pod %s/%s", namespace, name)
podIP := pod.Annotations[util.IpAddressAnnotation]

subnet, err := c.getPodDefaultSubnet(pod)
if err != nil {
klog.Errorf("failed to get subnet %v", err)
return err
}

if err := c.config.KubeOvnClient.KubeovnV1().IPs().Delete(ovs.PodNameToPortName(name, namespace), &metav1.DeleteOptions{}); err != nil {
if !k8serrors.IsNotFound(err) {
klog.Errorf("failed to delete ip %s, %v", ovs.PodNameToPortName(name, namespace), err)
return err
if !subnet.Spec.UnderlayGateway {
if subnet.Spec.GatewayType == kubeovnv1.GWDistributedType {
node, err := c.nodesLister.Get(pod.Spec.NodeName)
if err != nil {
klog.Errorf("get node %s failed %v", pod.Spec.NodeName, err)
return err
}
nodeTunlIPAddr, err := getNodeTunlIP(node)
if err != nil {
return err
}

if err := c.ovnClient.AddStaticRoute(ovs.PolicySrcIP, podIP, nodeTunlIPAddr.String(), c.config.ClusterRouter); err != nil {
return errors.Annotate(err, "add static route failed")
}
}
}

c.ipam.ReleaseAddressByPod(key)
pod.Annotations[util.RoutedAnnotation] = "true"
if _, err := c.config.KubeClient.CoreV1().Pods(namespace).Patch(name, types.JSONPatchType, generatePatchPayload(pod.Annotations, "replace")); err != nil {
if k8serrors.IsNotFound(err) {
// Sometimes pod is deleted between kube-ovn configure ovn-nb and patch pod.
// Then we need to recycle the resource again.
c.deletePodQueue.AddRateLimited(key)
return nil
}
klog.Errorf("patch pod %s/%s failed %v", name, namespace, err)
return err
}
return nil
}

Expand Down

0 comments on commit 34b7cba

Please sign in to comment.