Skip to content

Commit

Permalink
refactor: the ipam now has lock itself no need for ippool queue
Browse files Browse the repository at this point in the history
  • Loading branch information
oilbeater committed Mar 5, 2020
1 parent 1bdb2a8 commit 3c786f5
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 57 deletions.
22 changes: 8 additions & 14 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,9 @@ type Controller struct {
podsLister v1.PodLister
podsSynced cache.InformerSynced

addPodQueue workqueue.RateLimitingInterface
addIpPoolPodQueue workqueue.RateLimitingInterface
deletePodQueue workqueue.RateLimitingInterface
updatePodQueue workqueue.RateLimitingInterface
addPodQueue workqueue.RateLimitingInterface
deletePodQueue workqueue.RateLimitingInterface
updatePodQueue workqueue.RateLimitingInterface

subnetsLister kubeovnlister.SubnetLister
subnetSynced cache.InformerSynced
Expand Down Expand Up @@ -127,12 +126,11 @@ func NewController(config *Configuration) *Controller {
ipsLister: ipInformer.Lister(),
ipSynced: ipInformer.Informer().HasSynced,

podsLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
addPodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddPod"),
addIpPoolPodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddIpPoolPod"),
deletePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeletePod"),
updatePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdatePod"),
podsLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
addPodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddPod"),
deletePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeletePod"),
updatePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdatePod"),

namespacesLister: namespaceInformer.Lister(),
namespacesSynced: namespaceInformer.Informer().HasSynced,
Expand Down Expand Up @@ -256,7 +254,6 @@ func (c *Controller) shutdown() {
utilruntime.HandleCrash()

c.addPodQueue.ShutDown()
c.addIpPoolPodQueue.ShutDown()
c.deletePodQueue.ShutDown()
c.updatePodQueue.ShutDown()

Expand Down Expand Up @@ -299,9 +296,6 @@ func (c *Controller) startWorkers(stopCh <-chan struct{}) {
}
}

// run in a single worker to avoid ip conflict
go wait.Until(c.runAddIpPoolPodWorker, time.Second, stopCh)

// run in a single worker to avoid subnet cidr conflict
go wait.Until(c.runAddNamespaceWorker, time.Second, stopCh)

Expand Down
43 changes: 0 additions & 43 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,6 @@ func (c *Controller) enqueueAddPod(obj interface{}) {
return
}

if p.Annotations[util.IpPoolAnnotation] != "" && p.Annotations[util.IpAddressAnnotation] == "" {
klog.V(3).Infof("enqueue add ip pool address pod %s", key)
c.addIpPoolPodQueue.Add(key)
return
}

klog.V(3).Infof("enqueue add pod %s", key)
c.addPodQueue.Add(key)
}
Expand Down Expand Up @@ -160,11 +154,6 @@ func (c *Controller) runAddPodWorker() {
}
}

func (c *Controller) runAddIpPoolPodWorker() {
for c.processNextAddIpPoolPodWorkItem() {
}
}

func (c *Controller) runDeletePodWorker() {
for c.processNextDeletePodWorkItem() {
}
Expand Down Expand Up @@ -209,38 +198,6 @@ func (c *Controller) processNextAddPodWorkItem() bool {
return true
}

func (c *Controller) processNextAddIpPoolPodWorkItem() bool {
obj, shutdown := c.addIpPoolPodQueue.Get()

if shutdown {
return false
}

err := func(obj interface{}) error {
defer c.addIpPoolPodQueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
c.addIpPoolPodQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
if err := c.handleAddPod(key); err != nil {
c.addIpPoolPodQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
c.addIpPoolPodQueue.Forget(obj)
return nil
}(obj)

if err != nil {
utilruntime.HandleError(err)
return true
}

return true
}

func (c *Controller) processNextDeletePodWorkItem() bool {
obj, shutdown := c.deletePodQueue.Get()

Expand Down

0 comments on commit 3c786f5

Please sign in to comment.