Skip to content

Commit

Permalink
optimize kube-ovn-controller logic (#2771)
Browse files Browse the repository at this point in the history
1. get resources using lister interfaces;
2. replace keymutex with the k8s implementation.
  • Loading branch information
zhangzujian committed May 9, 2023
1 parent 3b2b071 commit a2b789c
Show file tree
Hide file tree
Showing 21 changed files with 213 additions and 182 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ require (
github.com/kubeovn/gonetworkmanager/v2 v2.0.0-20230327064018-0b27f88874f7
github.com/mdlayher/arp v0.0.0-20220512170110-6706a2966875
github.com/moby/sys/mountinfo v0.6.2
github.com/neverlee/keymutex v0.0.0-20171121013845-f593aa834bf9
github.com/oilbeater/go-ping v0.0.0-20200413021620-332b7197c5b5
github.com/onsi/ginkgo/v2 v2.9.2
github.com/onsi/gomega v1.27.6
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1052,8 +1052,6 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/ncw/swift v1.0.47/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
github.com/neverlee/keymutex v0.0.0-20171121013845-f593aa834bf9 h1:UfW5pM66x0MWE72ySrpd2Ymrn+b62kNHirozKkY3ojE=
github.com/neverlee/keymutex v0.0.0-20171121013845-f593aa834bf9/go.mod h1:3hf2IoUXDKjCg/EuqSLUB5TY8StGS3haWYJiqzP907c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
Expand Down
27 changes: 16 additions & 11 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package controller
import (
"context"
"fmt"
"runtime"
"sync"
"time"

"github.com/neverlee/keymutex"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -22,6 +22,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/keymutex"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
kubeovninformer "github.com/kubeovn/kube-ovn/pkg/client/informers/externalversions"
Expand Down Expand Up @@ -63,7 +64,7 @@ type Controller struct {
addOrUpdatePodQueue workqueue.RateLimitingInterface
deletePodQueue workqueue.RateLimitingInterface
updatePodSecurityQueue workqueue.RateLimitingInterface
podKeyMutex *keymutex.KeyMutex
podKeyMutex keymutex.KeyMutex

vpcsLister kubeovnlister.VpcLister
vpcSynced cache.InformerSynced
Expand All @@ -81,7 +82,7 @@ type Controller struct {
updateVpcDnatQueue workqueue.RateLimitingInterface
updateVpcSnatQueue workqueue.RateLimitingInterface
updateVpcSubnetQueue workqueue.RateLimitingInterface
vpcNatGwKeyMutex *keymutex.KeyMutex
vpcNatGwKeyMutex keymutex.KeyMutex

switchLBRuleLister kubeovnlister.SwitchLBRuleLister
switchLBRuleSynced cache.InformerSynced
Expand All @@ -101,7 +102,7 @@ type Controller struct {
deleteRouteQueue workqueue.RateLimitingInterface
updateSubnetStatusQueue workqueue.RateLimitingInterface
syncVirtualPortsQueue workqueue.RateLimitingInterface
subnetStatusKeyMutex *keymutex.KeyMutex
subnetStatusKeyMutex keymutex.KeyMutex

ipsLister kubeovnlister.IPLister
ipSynced cache.InformerSynced
Expand Down Expand Up @@ -208,14 +209,14 @@ type Controller struct {
npsSynced cache.InformerSynced
updateNpQueue workqueue.RateLimitingInterface
deleteNpQueue workqueue.RateLimitingInterface
npKeyMutex *keymutex.KeyMutex
npKeyMutex keymutex.KeyMutex

sgsLister kubeovnlister.SecurityGroupLister
sgSynced cache.InformerSynced
addOrUpdateSgQueue workqueue.RateLimitingInterface
delSgQueue workqueue.RateLimitingInterface
syncSgPortsQueue workqueue.RateLimitingInterface
sgKeyMutex *keymutex.KeyMutex
sgKeyMutex keymutex.KeyMutex

qosPoliciesLister kubeovnlister.QoSPolicyLister
qosPolicySynced cache.InformerSynced
Expand Down Expand Up @@ -287,6 +288,10 @@ func Run(ctx context.Context, config *Configuration) {
ovnSnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnSnatRules()
ovnDnatRuleInformer := kubeovnInformerFactory.Kubeovn().V1().OvnDnatRules()

numKeyLocks := runtime.NumCPU() * 2
if numKeyLocks < config.WorkerNum*2 {
numKeyLocks = config.WorkerNum * 2
}
controller := &Controller{
config: config,
vpcs: &sync.Map{},
Expand All @@ -311,7 +316,7 @@ func Run(ctx context.Context, config *Configuration) {
updateVpcDnatQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateVpcDnat"),
updateVpcSnatQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateVpcSnat"),
updateVpcSubnetQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "UpdateVpcSubnet"),
vpcNatGwKeyMutex: keymutex.New(97),
vpcNatGwKeyMutex: keymutex.NewHashed(numKeyLocks),

subnetsLister: subnetInformer.Lister(),
subnetSynced: subnetInformer.Informer().HasSynced,
Expand All @@ -320,7 +325,7 @@ func Run(ctx context.Context, config *Configuration) {
deleteRouteQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteRoute"),
updateSubnetStatusQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSubnetStatus"),
syncVirtualPortsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "SyncVirtualPort"),
subnetStatusKeyMutex: keymutex.New(97),
subnetStatusKeyMutex: keymutex.NewHashed(numKeyLocks),

ipsLister: ipInformer.Lister(),
ipSynced: ipInformer.Informer().HasSynced,
Expand Down Expand Up @@ -385,7 +390,7 @@ func Run(ctx context.Context, config *Configuration) {
workqueue.DefaultControllerRateLimiter(),
),
updatePodSecurityQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdatePodSecurity"),
podKeyMutex: keymutex.New(97),
podKeyMutex: keymutex.NewHashed(numKeyLocks),

namespacesLister: namespaceInformer.Lister(),
namespacesSynced: namespaceInformer.Informer().HasSynced,
Expand Down Expand Up @@ -416,7 +421,7 @@ func Run(ctx context.Context, config *Configuration) {
configMapsLister: configMapInformer.Lister(),
configMapsSynced: configMapInformer.Informer().HasSynced,

sgKeyMutex: keymutex.New(97),
sgKeyMutex: keymutex.NewHashed(numKeyLocks),
sgsLister: sgInformer.Lister(),
sgSynced: sgInformer.Informer().HasSynced,
addOrUpdateSgQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSg"),
Expand Down Expand Up @@ -477,7 +482,7 @@ func Run(ctx context.Context, config *Configuration) {
controller.npsSynced = npInformer.Informer().HasSynced
controller.updateNpQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateNp")
controller.deleteNpQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteNp")
controller.npKeyMutex = keymutex.New(97)
controller.npKeyMutex = keymutex.NewHashed(128)
}

defer controller.shutdown()
Expand Down
20 changes: 10 additions & 10 deletions pkg/controller/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -247,14 +246,17 @@ func (c *Controller) gcNode() error {

func (c *Controller) gcVip() error {
klog.Infof("start to gc vips")
vips, err := c.config.KubeOvnClient.KubeovnV1().Vips().List(context.Background(), metav1.ListOptions{
LabelSelector: fields.OneTermNotEqualSelector(util.IpReservedLabel, "").String()},
)
selector, err := util.LabelSelectorNotEmpty(util.IpReservedLabel)
if err != nil {
klog.Errorf("failed to generate selector for label %s: %v", util.IpReservedLabel, err)
return err
}
vips, err := c.virtualIpsLister.List(selector)
if err != nil {
klog.Errorf("failed to list VIPs: %v", err)
return err
}
for _, vip := range vips.Items {
for _, vip := range vips {
portName := vip.Labels[util.IpReservedLabel]
portNameSplits := strings.Split(portName, ".")
if len(portNameSplits) >= 2 {
Expand Down Expand Up @@ -681,7 +683,7 @@ func (c *Controller) gcChassis() error {

func (c *Controller) isOVNProvided(providerName string, pod *corev1.Pod) (bool, error) {
ls := pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, providerName)]
subnet, err := c.config.KubeOvnClient.KubeovnV1().Subnets().Get(context.Background(), ls, metav1.GetOptions{})
subnet, err := c.subnetsLister.Get(ls)
if err != nil {
klog.Errorf("parse annotation logical switch %s error %v", ls, err)
return false, err
Expand Down Expand Up @@ -815,15 +817,13 @@ func (c *Controller) gcVpcDns() error {
}
}

slrs, err := c.config.KubeOvnClient.KubeovnV1().SwitchLBRules().List(context.Background(), metav1.ListOptions{
LabelSelector: sel.String(),
})
slrs, err := c.switchLBRuleLister.List(sel)
if err != nil {
klog.Errorf("failed to list vpc-dns SwitchLBRules, %s", err)
return err
}

for _, slr := range slrs.Items {
for _, slr := range slrs {
canFind := false
for _, vd := range vds {
name := genVpcDnsDpName(vd.Name)
Expand Down
14 changes: 7 additions & 7 deletions pkg/controller/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (c *Controller) InitDefaultVpc() error {

// InitDefaultLogicalSwitch init the default logical switch for ovn network
func (c *Controller) initDefaultLogicalSwitch() error {
subnet, err := c.config.KubeOvnClient.KubeovnV1().Subnets().Get(context.Background(), c.config.DefaultLogicalSwitch, metav1.GetOptions{})
subnet, err := c.subnetsLister.Get(c.config.DefaultLogicalSwitch)
if err == nil {
if subnet != nil && util.CheckProtocol(c.config.DefaultCIDR) != util.CheckProtocol(subnet.Spec.CIDRBlock) {
// single-stack upgrade to dual-stack
Expand Down Expand Up @@ -152,7 +152,7 @@ func (c *Controller) initDefaultLogicalSwitch() error {

// InitNodeSwitch init node switch to connect host and pod
func (c *Controller) initNodeSwitch() error {
subnet, err := c.config.KubeOvnClient.KubeovnV1().Subnets().Get(context.Background(), c.config.NodeSwitch, metav1.GetOptions{})
subnet, err := c.subnetsLister.Get(c.config.NodeSwitch)
if err == nil {
if util.CheckProtocol(c.config.NodeSwitchCIDR) == kubeovnv1.ProtocolDual && util.CheckProtocol(subnet.Spec.CIDRBlock) != kubeovnv1.ProtocolDual {
// single-stack upgrade to dual-stack
Expand Down Expand Up @@ -226,13 +226,13 @@ func (c *Controller) initLB(name, protocol string, sessionAffinity bool) error {

// InitLoadBalancer init the default tcp and udp cluster loadbalancer
func (c *Controller) initLoadBalancer() error {
vpcs, err := c.config.KubeOvnClient.KubeovnV1().Vpcs().List(context.Background(), metav1.ListOptions{})
vpcs, err := c.vpcsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list vpc: %v", err)
return err
}

for _, cachedVpc := range vpcs.Items {
for _, cachedVpc := range vpcs {
vpc := cachedVpc.DeepCopy()
vpcLb := c.GenVpcLoadBalancer(vpc.Name)
if err = c.initLB(vpcLb.TcpLoadBalancer, string(v1.ProtocolTCP), false); err != nil {
Expand Down Expand Up @@ -612,7 +612,7 @@ func (c *Controller) initDefaultVlan() error {

func (c *Controller) initSyncCrdIPs() error {
klog.Info("start to sync ips")
ips, err := c.config.KubeOvnClient.KubeovnV1().IPs().List(context.Background(), metav1.ListOptions{})
ips, err := c.ipsLister.List(labels.Everything())
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
Expand All @@ -622,7 +622,7 @@ func (c *Controller) initSyncCrdIPs() error {

ipMap := strset.New(c.getVmLsps()...)

for _, ipCr := range ips.Items {
for _, ipCr := range ips {
ip := ipCr.DeepCopy()
changed := false
if ipMap.Has(ip.Name) && ip.Spec.PodType == "" {
Expand Down Expand Up @@ -669,7 +669,7 @@ func (c *Controller) initSyncCrdSubnets() error {

// only sync subnet spec enableEcmp when subnet.Spec.EnableEcmp is false and c.config.EnableEcmp is true
if subnet.Spec.GatewayType == kubeovnv1.GWCentralizedType && !subnet.Spec.EnableEcmp && subnet.Spec.EnableEcmp != c.config.EnableEcmp {
subnet, err = c.config.KubeOvnClient.KubeovnV1().Subnets().Get(context.Background(), subnet.Name, metav1.GetOptions{})
subnet, err = c.subnetsLister.Get(subnet.Name)
if err != nil {
klog.Errorf("failed to get subnet %s: %v", subnet.Name, err)
return err
Expand Down
11 changes: 5 additions & 6 deletions pkg/controller/network_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
"unicode"

"github.com/ovn-org/libovsdb/ovsdb"
corev1 "k8s.io/api/core/v1"
netv1 "k8s.io/api/networking/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -16,8 +17,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"github.com/ovn-org/libovsdb/ovsdb"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/ovs"
"github.com/kubeovn/kube-ovn/pkg/util"
Expand Down Expand Up @@ -141,8 +140,8 @@ func (c *Controller) handleUpdateNp(key string) error {
return nil
}

c.npKeyMutex.Lock(key)
defer c.npKeyMutex.Unlock(key)
c.npKeyMutex.LockKey(key)
defer func() { _ = c.npKeyMutex.UnlockKey(key) }()
klog.Infof("handle add/update network policy %s", key)

np, err := c.npsLister.NetworkPolicies(namespace).Get(name)
Expand Down Expand Up @@ -564,8 +563,8 @@ func (c *Controller) handleDeleteNp(key string) error {
return nil
}

c.npKeyMutex.Lock(key)
defer c.npKeyMutex.Unlock(key)
c.npKeyMutex.LockKey(key)
defer func() { _ = c.npKeyMutex.UnlockKey(key) }()
klog.Infof("handle delete network policy %s", key)

npName := name
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ func (c *Controller) createOrUpdateCrdIPs(podName, ip, mac, subnetName, ns, node
if existingCR != nil {
ipCr = *existingCR
} else {
ipCr, err = c.config.KubeOvnClient.KubeovnV1().IPs().Get(context.Background(), ipName, metav1.GetOptions{})
ipCr, err = c.ipsLister.Get(ipName)
if err != nil {
if !k8serrors.IsNotFound(err) {
errMsg := fmt.Errorf("failed to get ip CR %s: %v", ipName, err)
Expand Down
14 changes: 8 additions & 6 deletions pkg/controller/ovn_eip.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ import (
"strconv"
"time"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/util"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
"github.com/kubeovn/kube-ovn/pkg/util"
)

func (c *Controller) enqueueAddOvnEip(obj interface{}) {
Expand Down Expand Up @@ -711,24 +713,24 @@ func (c *Controller) isOvnEipNotUse(cachedEip *kubeovnv1.OvnEip) (bool, error) {
switch cachedEip.Status.Type {
case util.DnatUsingEip:
// nat change eip not that fast
dnats, err := c.config.KubeOvnClient.KubeovnV1().OvnDnatRules().List(context.Background(), metav1.ListOptions{})
dnats, err := c.ovnDnatRulesLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to get ovn dnat list, %v", err)
return false, err
}
for _, item := range dnats.Items {
for _, item := range dnats {
if item.Annotations[util.VpcEipAnnotation] == cachedEip.Name {
return false, nil
}
}
case util.SnatUsingEip:
// nat change eip not that fast
snats, err := c.config.KubeOvnClient.KubeovnV1().OvnSnatRules().List(context.Background(), metav1.ListOptions{})
snats, err := c.ovnSnatRulesLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to get ovn snat, %v", err)
return false, err
}
for _, item := range snats.Items {
for _, item := range snats {
if item.Annotations[util.VpcEipAnnotation] == cachedEip.Name {
return false, nil
}
Expand Down
Loading

0 comments on commit a2b789c

Please sign in to comment.