Skip to content

Commit

Permalink
feat: remove finalizer dependency improve svc performance
Browse files Browse the repository at this point in the history
  • Loading branch information
oilbeater committed May 13, 2019
1 parent cd62ada commit 278ccfe
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 65 deletions.
6 changes: 3 additions & 3 deletions dist/images/cleanup.sh
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#!/bin/bash
set -euo pipefail
set -eu

# Remove finalizers in svc
kubectl patch svc -n kube-ovn ovn-nb --type='json' -p '[{"op": "replace", "path": "/metadata/finalizers", "value": []}]'
kubectl patch svc -n kube-ovn ovn-sb --type='json' -p '[{"op": "replace", "path": "/metadata/finalizers", "value": []}]'
kubectl patch svc -n kube-ovn ovn-nb --type='json' -p '[{"op": "replace", "path": "/metadata/finalizers", "value": []}]' || true
kubectl patch svc -n kube-ovn ovn-sb --type='json' -p '[{"op": "replace", "path": "/metadata/finalizers", "value": []}]' || true

# Delete Kube-OVN components
kubectl delete ns kube-ovn
Expand Down
22 changes: 11 additions & 11 deletions dist/images/start-db.sh
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
#!/bin/bash
set -euo pipefail
set -eo pipefail

DB_NB_ADDR=${1:-0.0.0.0}
DB_NB_PORT=${1:-6641}
DB_SB_ADDR=${1:-0.0.0.0}
DB_SB_PORT=${1:-6642}
DB_NB_ADDR=${DB_NB_ADDR:-0.0.0.0}
DB_NB_PORT=${DB_NB_PORT:-6641}
DB_SB_ADDR=${DB_SB_ADDR:-0.0.0.0}
DB_SB_PORT=${DB_SB_PORT:-6642}

function gen_conn_str {
t=$(echo -n ${NODE_IPS} | sed 's/[[:space:]]//g' | sed 's/,/ /g')
x=$(for i in $t; do echo -n "tcp:$i:$1",; done| sed 's/,$//')
x=$(for i in ${t}; do echo -n "tcp:$i:$1",; done| sed 's/,$//')
echo "$x"
}

function get_first_node_ip {
t=$(echo -n ${NODE_IPS} | sed 's/[[:space:]]//g' | sed 's/,/ /g')
echo -n $t | cut -f 1 -d " "
echo -n ${t} | cut -f 1 -d " "
}

function quit {
Expand All @@ -23,13 +23,13 @@ function quit {
}
trap quit EXIT

if [ -z "$NODE_IPS" ]; then
if [[ -z "$NODE_IPS" ]]; then
/usr/share/openvswitch/scripts/ovn-ctl restart_northd
else
/usr/share/openvswitch/scripts/ovn-ctl stop_northd

first_node_ip=$(get_first_node_ip)
if [ "$first_node_ip" == "${POD_IP}" ]; then
if [[ "$first_node_ip" == "${POD_IP}" ]]; then
# Start ovn-northd, ovn-nb and ovn-sb
/usr/share/openvswitch/scripts/ovn-ctl \
--db-nb-create-insecure-remote=yes \
Expand All @@ -51,8 +51,8 @@ else
--db-sb-create-insecure-remote=yes \
--db-nb-cluster-local-addr=${POD_IP} \
--db-sb-cluster-local-addr=${POD_IP} \
--db-nb-cluster-remote-addr=$first_node_ip \
--db-sb-cluster-remote-addr=$first_node_ip \
--db-nb-cluster-remote-addr=${first_node_ip} \
--db-sb-cluster-remote-addr=${first_node_ip} \
--ovn-northd-nb-db=$(gen_conn_str 6641) \
--ovn-northd-sb-db=$(gen_conn_str 6642) \
start_northd
Expand Down
26 changes: 15 additions & 11 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ type Controller struct {
addNodeQueue workqueue.RateLimitingInterface
deleteNodeQueue workqueue.RateLimitingInterface

servicesLister v1.ServiceLister
serviceSynced cache.InformerSynced
addServiceQueue workqueue.RateLimitingInterface
updateServiceQueue workqueue.RateLimitingInterface
servicesLister v1.ServiceLister
serviceSynced cache.InformerSynced
deleteTcpServiceQueue workqueue.RateLimitingInterface
deleteUdpServiceQueue workqueue.RateLimitingInterface
updateServiceQueue workqueue.RateLimitingInterface

endpointsLister v1.EndpointsLister
endpointsSynced cache.InformerSynced
Expand Down Expand Up @@ -114,10 +115,11 @@ func NewController(config *Configuration) *Controller {
addNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddNode"),
deleteNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteNode"),

servicesLister: serviceInformer.Lister(),
serviceSynced: serviceInformer.Informer().HasSynced,
addServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddService"),
updateServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteService"),
servicesLister: serviceInformer.Lister(),
serviceSynced: serviceInformer.Informer().HasSynced,
deleteTcpServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteTcpService"),
deleteUdpServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteUdpService"),
updateServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateService"),

endpointsLister: endpointInformer.Lister(),
endpointsSynced: endpointInformer.Informer().HasSynced,
Expand Down Expand Up @@ -146,7 +148,7 @@ func NewController(config *Configuration) *Controller {
})

serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueAddService,
DeleteFunc: controller.enqueueDeleteService,
UpdateFunc: controller.enqueueUpdateService,
})

Expand Down Expand Up @@ -177,7 +179,8 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {
defer c.addNodeQueue.ShutDown()
defer c.deleteNodeQueue.ShutDown()

defer c.addServiceQueue.ShutDown()
defer c.deleteTcpServiceQueue.ShutDown()
defer c.deleteUdpServiceQueue.ShutDown()
defer c.updateServiceQueue.ShutDown()

defer c.updateEndpointQueue.ShutDown()
Expand Down Expand Up @@ -226,7 +229,8 @@ func (c *Controller) Run(stopCh <-chan struct{}) error {
go wait.Until(c.runDeleteNodeWorker, time.Second, stopCh)

go wait.Until(c.runUpdateServiceWorker, time.Second, stopCh)
go wait.Until(c.runAddServiceWorker, time.Second, stopCh)
go wait.Until(c.runDeleteTcpServiceWorker, time.Second, stopCh)
go wait.Until(c.runDeleteUdpServiceWorker, time.Second, stopCh)

go wait.Until(c.runUpdateEndpointWorker, time.Second, stopCh)
}
Expand Down
112 changes: 72 additions & 40 deletions pkg/controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"fmt"
"k8s.io/apimachinery/pkg/labels"
"strings"

"github.com/alauda/kube-ovn/pkg/util"
Expand All @@ -12,18 +13,21 @@ import (
"k8s.io/klog"
)

func (c *Controller) enqueueAddService(obj interface{}) {
func (c *Controller) enqueueDeleteService(obj interface{}) {
if !c.isLeader() {
return
}
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
svc := obj.(*v1.Service)
klog.V(3).Infof("enqueue delete service %s/%s", svc.Namespace, svc.Name)
if svc.Spec.ClusterIP != v1.ClusterIPNone && svc.Spec.ClusterIP != "" {
for _, port := range svc.Spec.Ports {
if port.Protocol == v1.ProtocolTCP {
c.deleteTcpServiceQueue.AddRateLimited(fmt.Sprintf("%s:%d", svc.Spec.ClusterIP, port.Port))
} else if port.Protocol == v1.ProtocolUDP {
c.deleteUdpServiceQueue.AddRateLimited(fmt.Sprintf("%s:%d", svc.Spec.ClusterIP, port.Port))
}
}
}
klog.V(3).Infof("enqueue add service %s", key)
c.addServiceQueue.AddRateLimited(key)
}

func (c *Controller) enqueueUpdateService(old, new interface{}) {
Expand All @@ -46,8 +50,13 @@ func (c *Controller) enqueueUpdateService(old, new interface{}) {
c.updateServiceQueue.AddRateLimited(key)
}

func (c *Controller) runAddServiceWorker() {
for c.processNextAddServiceWorkItem() {
func (c *Controller) runDeleteTcpServiceWorker() {
for c.processNextDeleteTcpServiceWorkItem() {
}
}

func (c *Controller) runDeleteUdpServiceWorker() {
for c.processNextDeleteUdpServiceWorkItem() {
}
}

Expand All @@ -56,27 +65,58 @@ func (c *Controller) runUpdateServiceWorker() {
}
}

func (c *Controller) processNextAddServiceWorkItem() bool {
obj, shutdown := c.addServiceQueue.Get()
func (c *Controller) processNextDeleteTcpServiceWorkItem() bool {
obj, shutdown := c.deleteTcpServiceQueue.Get()

if shutdown {
return false
}

err := func(obj interface{}) error {
defer c.deleteTcpServiceQueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
c.deleteTcpServiceQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
if err := c.handleDeleteService(key, v1.ProtocolTCP); err != nil {
c.deleteTcpServiceQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
c.deleteTcpServiceQueue.Forget(obj)
return nil
}(obj)

if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}

func (c *Controller) processNextDeleteUdpServiceWorkItem() bool {
obj, shutdown := c.deleteUdpServiceQueue.Get()

if shutdown {
return false
}

err := func(obj interface{}) error {
defer c.addServiceQueue.Done(obj)
defer c.deleteUdpServiceQueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
c.addServiceQueue.Forget(obj)
c.deleteUdpServiceQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
if err := c.handleAddService(key); err != nil {
c.addServiceQueue.AddRateLimited(key)
if err := c.handleDeleteService(key, v1.ProtocolUDP); err != nil {
c.deleteUdpServiceQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
c.addServiceQueue.Forget(obj)
c.deleteUdpServiceQueue.Forget(obj)
return nil
}(obj)

Expand Down Expand Up @@ -118,41 +158,33 @@ func (c *Controller) processNextUpdateServiceWorkItem() bool {
return true
}

func (c *Controller) handleAddService(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
func (c *Controller) handleDeleteService(vip string, protocol v1.Protocol) error {
svcs, err := c.servicesLister.Services(v1.NamespaceAll).List(labels.Everything())
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
klog.Errorf("failed to list svc, %v", err)
return err
}
svc, err := c.servicesLister.Services(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
for _, svc := range svcs {
if svc.Spec.ClusterIP == strings.Split(vip, ":")[0] {
return nil
}
return err
}

if !containsString(svc.Finalizers, util.ServiceAnnotation) {
svc.SetFinalizers(append(svc.Finalizers, util.ServiceAnnotation))
_, err = c.config.KubeClient.CoreV1().Services(namespace).Update(svc)
if protocol == v1.ProtocolTCP {
err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterTcpLoadBalancer)
if err != nil {
klog.Errorf("failed to delete vip %s from tcp lb, %v", vip, err)
return err
}
}

ip := svc.Spec.ClusterIP
if ip == "" || ip == v1.ClusterIPNone {
return nil
}

if !svc.DeletionTimestamp.IsZero() {
if containsString(svc.Finalizers, util.ServiceAnnotation) {
svc.SetFinalizers(removeString(svc.Finalizers, util.ServiceAnnotation))
_, err = c.config.KubeClient.CoreV1().Services(namespace).Update(svc)
} else {
err := c.ovnClient.DeleteLoadBalancerVip(vip, c.config.ClusterUdpLoadBalancer)
if err != nil {
klog.Errorf("failed to delete vip %s from udp lb, %v", vip, err)
return err
}
}

return err
return nil
}

func (c *Controller) handleUpdateService(key string) error {
Expand Down

0 comments on commit 278ccfe

Please sign in to comment.