Skip to content

Commit

Permalink
add key lock for more resources
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangzujian committed May 10, 2023
1 parent e1154ac commit cdca437
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 12 deletions.
21 changes: 16 additions & 5 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Controller struct {
addOrUpdateVpcQueue workqueue.RateLimitingInterface
delVpcQueue workqueue.RateLimitingInterface
updateVpcStatusQueue workqueue.RateLimitingInterface
vpcKeyMutex keymutex.KeyMutex

vpcNatGatewayLister kubeovnlister.VpcNatGatewayLister
vpcNatGatewaySynced cache.InformerSynced
Expand Down Expand Up @@ -101,7 +102,7 @@ type Controller struct {
deleteSubnetQueue workqueue.RateLimitingInterface
updateSubnetStatusQueue workqueue.RateLimitingInterface
syncVirtualPortsQueue workqueue.RateLimitingInterface
subnetStatusKeyMutex keymutex.KeyMutex
subnetKeyMutex keymutex.KeyMutex

ipsLister kubeovnlister.IPLister
ipSynced cache.InformerSynced
Expand Down Expand Up @@ -174,35 +175,39 @@ type Controller struct {
updateOvnDnatRuleQueue workqueue.RateLimitingInterface
delOvnDnatRuleQueue workqueue.RateLimitingInterface

vlansLister kubeovnlister.VlanLister
vlanSynced cache.InformerSynced

providerNetworksLister kubeovnlister.ProviderNetworkLister
providerNetworkSynced cache.InformerSynced

vlansLister kubeovnlister.VlanLister
vlanSynced cache.InformerSynced
addVlanQueue workqueue.RateLimitingInterface
delVlanQueue workqueue.RateLimitingInterface
updateVlanQueue workqueue.RateLimitingInterface
vlanKeyMutex keymutex.KeyMutex

namespacesLister v1.NamespaceLister
namespacesSynced cache.InformerSynced
addNamespaceQueue workqueue.RateLimitingInterface
nsKeyMutex keymutex.KeyMutex

nodesLister v1.NodeLister
nodesSynced cache.InformerSynced
addNodeQueue workqueue.RateLimitingInterface
updateNodeQueue workqueue.RateLimitingInterface
deleteNodeQueue workqueue.RateLimitingInterface
nodeKeyMutex keymutex.KeyMutex

servicesLister v1.ServiceLister
serviceSynced cache.InformerSynced
addServiceQueue workqueue.RateLimitingInterface
deleteServiceQueue workqueue.RateLimitingInterface
updateServiceQueue workqueue.RateLimitingInterface
svcKeyMutex keymutex.KeyMutex

endpointsLister v1.EndpointsLister
endpointsSynced cache.InformerSynced
updateEndpointQueue workqueue.RateLimitingInterface
epKeyMutex keymutex.KeyMutex

npsLister netv1.NetworkPolicyLister
npsSynced cache.InformerSynced
Expand Down Expand Up @@ -304,6 +309,7 @@ func Run(ctx context.Context, config *Configuration) {
addOrUpdateVpcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddOrUpdateVpc"),
delVpcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteVpc"),
updateVpcStatusQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateVpcStatus"),
vpcKeyMutex: keymutex.NewHashed(numKeyLocks),

vpcNatGatewayLister: vpcNatGatewayInformer.Lister(),
vpcNatGatewaySynced: vpcNatGatewayInformer.Informer().HasSynced,
Expand All @@ -323,7 +329,7 @@ func Run(ctx context.Context, config *Configuration) {
deleteSubnetQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteSubnet"),
updateSubnetStatusQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateSubnetStatus"),
syncVirtualPortsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "SyncVirtualPort"),
subnetStatusKeyMutex: keymutex.NewHashed(numKeyLocks),
subnetKeyMutex: keymutex.NewHashed(numKeyLocks),

ipsLister: ipInformer.Lister(),
ipSynced: ipInformer.Informer().HasSynced,
Expand Down Expand Up @@ -376,6 +382,7 @@ func Run(ctx context.Context, config *Configuration) {
addVlanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddVlan"),
delVlanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DelVlan"),
updateVlanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateVlan"),
vlanKeyMutex: keymutex.NewHashed(numKeyLocks),

providerNetworksLister: providerNetworkInformer.Lister(),
providerNetworkSynced: providerNetworkInformer.Informer().HasSynced,
Expand All @@ -393,22 +400,26 @@ func Run(ctx context.Context, config *Configuration) {
namespacesLister: namespaceInformer.Lister(),
namespacesSynced: namespaceInformer.Informer().HasSynced,
addNamespaceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddNamespace"),
nsKeyMutex: keymutex.NewHashed(numKeyLocks),

nodesLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
addNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddNode"),
updateNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateNode"),
deleteNodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteNode"),
nodeKeyMutex: keymutex.NewHashed(numKeyLocks),

servicesLister: serviceInformer.Lister(),
serviceSynced: serviceInformer.Informer().HasSynced,
addServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddService"),
deleteServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeleteService"),
updateServiceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateService"),
svcKeyMutex: keymutex.NewHashed(numKeyLocks),

endpointsLister: endpointInformer.Lister(),
endpointsSynced: endpointInformer.Informer().HasSynced,
updateEndpointQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdateEndpoint"),
epKeyMutex: keymutex.NewHashed(numKeyLocks),

qosPoliciesLister: qosPolicyInformer.Lister(),
qosPolicySynced: qosPolicyInformer.Informer().HasSynced,
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
klog.Infof("update endpoint %s/%s", namespace, name)

c.epKeyMutex.LockKey(key)
defer func() { _ = c.epKeyMutex.UnlockKey(key) }()
klog.Infof("update add/update endpoint %s/%s", namespace, name)

ep, err := c.endpointsLister.Endpoints(namespace).Get(name)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func (c *Controller) processNextAddNamespaceWorkItem() bool {
}

func (c *Controller) handleAddNamespace(key string) error {
c.nsKeyMutex.LockKey(key)
defer func() { _ = c.nsKeyMutex.UnlockKey(key) }()
klog.Infof("handle add/update namespace %s", key)

cachedNs, err := c.namespacesLister.Get(key)
if err != nil {
if errors.IsNotFound(err) {
Expand Down
11 changes: 11 additions & 0 deletions pkg/controller/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ func nodeUnderlayAddressSetName(node string, af int) string {
}

func (c *Controller) handleAddNode(key string) error {
c.nodeKeyMutex.LockKey(key)
defer func() { _ = c.nodeKeyMutex.UnlockKey(key) }()

cachedNode, err := c.nodesLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
Expand Down Expand Up @@ -454,6 +457,10 @@ func (c *Controller) handleNodeAnnotationsForProviderNetworks(node *v1.Node) err
}

func (c *Controller) handleDeleteNode(key string) error {
c.nodeKeyMutex.LockKey(key)
defer func() { _ = c.nodeKeyMutex.UnlockKey(key) }()
klog.Infof("handle delete node %s", key)

portName := fmt.Sprintf("node-%s", key)
klog.Infof("delete logical switch port %s", portName)
if err := c.ovnClient.DeleteLogicalSwitchPort(portName); err != nil {
Expand Down Expand Up @@ -579,6 +586,10 @@ func (c *Controller) updateProviderNetworkForNodeDeletion(pn *kubeovnv1.Provider
}

func (c *Controller) handleUpdateNode(key string) error {
c.nodeKeyMutex.LockKey(key)
defer func() { _ = c.nodeKeyMutex.UnlockKey(key) }()
klog.Infof("handle update node %s", key)

node, err := c.nodesLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
Expand Down
20 changes: 19 additions & 1 deletion pkg/controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,16 @@ func (c *Controller) processNextUpdateServiceWorkItem() bool {
}

func (c *Controller) handleDeleteService(service *vpcService) error {
key, err := cache.MetaNamespaceKeyFunc(service.Svc)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to get meta namespace key of %#v: %v", service.Svc, err))
return nil
}

c.svcKeyMutex.LockKey(key)
defer func() { _ = c.svcKeyMutex.UnlockKey(key) }()
klog.Infof("handle delete service %s", key)

svcs, err := c.servicesLister.Services(v1.NamespaceAll).List(labels.Everything())
if err != nil {
klog.Errorf("failed to list svc, %v", err)
Expand Down Expand Up @@ -297,7 +307,11 @@ func (c *Controller) handleUpdateService(key string) error {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
klog.Infof("update svc %s/%s", namespace, name)

c.svcKeyMutex.LockKey(key)
defer func() { _ = c.svcKeyMutex.UnlockKey(key) }()
klog.Infof("handle update service %s", key)

svc, err := c.servicesLister.Services(namespace).Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
Expand Down Expand Up @@ -428,6 +442,10 @@ func (c *Controller) handleAddService(key string) error {
return nil
}

c.svcKeyMutex.LockKey(key)
defer func() { _ = c.svcKeyMutex.UnlockKey(key) }()
klog.Infof("handle add service %s", key)

svc, err := c.servicesLister.Services(namespace).Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
Expand Down
11 changes: 7 additions & 4 deletions pkg/controller/subnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,8 +473,8 @@ func (c Controller) patchSubnetStatus(subnet *kubeovnv1.Subnet, reason string, e
}

func (c *Controller) handleAddOrUpdateSubnet(key string) error {
c.subnetStatusKeyMutex.LockKey(key)
defer func() { _ = c.subnetStatusKeyMutex.UnlockKey(key) }()
c.subnetKeyMutex.LockKey(key)
defer func() { _ = c.subnetKeyMutex.UnlockKey(key) }()

cachedSubnet, err := c.subnetsLister.Get(key)
if err != nil {
Expand Down Expand Up @@ -746,8 +746,8 @@ func (c *Controller) handleAddOrUpdateSubnet(key string) error {
}

func (c *Controller) handleUpdateSubnetStatus(key string) error {
c.subnetStatusKeyMutex.LockKey(key)
defer func() { _ = c.subnetStatusKeyMutex.UnlockKey(key) }()
c.subnetKeyMutex.LockKey(key)
defer func() { _ = c.subnetKeyMutex.UnlockKey(key) }()

cachedSubnet, err := c.subnetsLister.Get(key)
subnet := cachedSubnet.DeepCopy()
Expand Down Expand Up @@ -816,6 +816,9 @@ func (c *Controller) handleDeleteLogicalSwitch(key string) (err error) {
}

func (c *Controller) handleDeleteSubnet(subnet *kubeovnv1.Subnet) error {
c.subnetKeyMutex.LockKey(subnet.Name)
defer func() { _ = c.subnetKeyMutex.UnlockKey(subnet.Name) }()

c.updateVpcStatusQueue.Add(subnet.Spec.Vpc)
klog.Infof("delete u2o interconnection policy route for subnet %s", subnet.Name)
if err := c.deletePolicyRouteForU2OInterconn(subnet); err != nil {
Expand Down
13 changes: 12 additions & 1 deletion pkg/controller/vlan.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,15 @@ func (c *Controller) processNextDelVlanWorkItem() bool {
}

func (c *Controller) handleAddVlan(key string) error {
c.vlanKeyMutex.LockKey(key)
defer func() { _ = c.vlanKeyMutex.UnlockKey(key) }()
klog.Infof("handle add vlan %s", key)

cachedVlan, err := c.vlansLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}

return err
}

Expand Down Expand Up @@ -229,6 +232,10 @@ func (c *Controller) handleAddVlan(key string) error {
}

func (c *Controller) handleUpdateVlan(key string) error {
c.vlanKeyMutex.LockKey(key)
defer func() { _ = c.vlanKeyMutex.UnlockKey(key) }()
klog.Infof("handle update vlan %s", key)

vlan, err := c.vlansLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
Expand Down Expand Up @@ -263,6 +270,10 @@ func (c *Controller) handleUpdateVlan(key string) error {
}

func (c *Controller) handleDelVlan(key string) error {
c.vlanKeyMutex.LockKey(key)
defer func() { _ = c.vlanKeyMutex.UnlockKey(key) }()
klog.Infof("handle delete vlan %s", key)

subnet, err := c.subnetsLister.List(labels.Everything())
if err != nil {
klog.Errorf("failed to list subnets: %v", err)
Expand Down
12 changes: 12 additions & 0 deletions pkg/controller/vpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func (c *Controller) runDelVpcWorker() {
}

func (c *Controller) handleDelVpc(vpc *kubeovnv1.Vpc) error {
c.vpcKeyMutex.LockKey(vpc.Name)
defer func() { _ = c.vpcKeyMutex.UnlockKey(vpc.Name) }()
klog.Infof("handle delete vpc %s", vpc.Name)

if err := c.deleteVpcLb(vpc); err != nil {
return err
}
Expand All @@ -119,6 +123,10 @@ func (c *Controller) handleDelVpc(vpc *kubeovnv1.Vpc) error {
}

func (c *Controller) handleUpdateVpcStatus(key string) error {
c.vpcKeyMutex.LockKey(key)
defer func() { _ = c.vpcKeyMutex.UnlockKey(key) }()
klog.Infof("handle status update for vpc %s", key)

cachedVpc, err := c.vpcsLister.Get(key)
if err != nil {
if k8serrors.IsNotFound(err) {
Expand Down Expand Up @@ -223,6 +231,10 @@ func (c *Controller) addLoadBalancer(vpc string) (*VpcLoadBalancer, error) {
}

func (c *Controller) handleAddOrUpdateVpc(key string) error {
c.vpcKeyMutex.LockKey(key)
defer func() { _ = c.vpcKeyMutex.UnlockKey(key) }()
klog.Infof("handle add/update vpc %s", key)

// get latest vpc info
cachedVpc, err := c.vpcsLister.Get(key)
if err != nil {
Expand Down

0 comments on commit cdca437

Please sign in to comment.