Skip to content

Commit

Permalink
kube-ovn-controller: fix memory growth caused by unused workqueue
Browse files Browse the repository at this point in the history
Signed-off-by: 张祖建 <zhangzujian.7@gmail.com>
  • Loading branch information
zhangzujian committed Nov 2, 2023
1 parent 3adf719 commit dabe599
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 34 deletions.
59 changes: 25 additions & 34 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,21 @@ type Controller struct {
resetIptablesEipQueue workqueue.RateLimitingInterface
delIptablesEipQueue workqueue.RateLimitingInterface

podAnnotatedIptablesEipLister v1.PodLister
podAnnotatedIptablesEipSynced cache.InformerSynced
addPodAnnotatedIptablesEipQueue workqueue.RateLimitingInterface
updatePodAnnotatedIptablesEipQueue workqueue.RateLimitingInterface
delPodAnnotatedIptablesEipQueue workqueue.RateLimitingInterface
podAnnotatedIptablesEipLister v1.PodLister
podAnnotatedIptablesEipSynced cache.InformerSynced
addPodAnnotatedIptablesEipQueue workqueue.RateLimitingInterface
delPodAnnotatedIptablesEipQueue workqueue.RateLimitingInterface

iptablesFipsLister kubeovnlister.IptablesFIPRuleLister
iptablesFipSynced cache.InformerSynced
addIptablesFipQueue workqueue.RateLimitingInterface
updateIptablesFipQueue workqueue.RateLimitingInterface
delIptablesFipQueue workqueue.RateLimitingInterface

podAnnotatedIptablesFipLister v1.PodLister
podAnnotatedIptablesFipSynced cache.InformerSynced
addPodAnnotatedIptablesFipQueue workqueue.RateLimitingInterface
updatePodAnnotatedIptablesFipQueue workqueue.RateLimitingInterface
delPodAnnotatedIptablesFipQueue workqueue.RateLimitingInterface
podAnnotatedIptablesFipLister v1.PodLister
podAnnotatedIptablesFipSynced cache.InformerSynced
addPodAnnotatedIptablesFipQueue workqueue.RateLimitingInterface
delPodAnnotatedIptablesFipQueue workqueue.RateLimitingInterface

iptablesDnatRulesLister kubeovnlister.IptablesDnatRuleLister
iptablesDnatRuleSynced cache.InformerSynced
Expand Down Expand Up @@ -306,23 +304,21 @@ func NewController(config *Configuration) *Controller {
resetIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "resetIptablesEip"),
delIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "delIptablesEip"),

podAnnotatedIptablesEipLister: podAnnotatedIptablesEipInformer.Lister(),
podAnnotatedIptablesEipSynced: podAnnotatedIptablesEipInformer.Informer().HasSynced,
addPodAnnotatedIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "addPodAnnotatedIptablesEip"),
updatePodAnnotatedIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "updatePodAnnotatedIptablesEip"),
delPodAnnotatedIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "delPodAnnotatedIptablesEip"),
podAnnotatedIptablesEipLister: podAnnotatedIptablesEipInformer.Lister(),
podAnnotatedIptablesEipSynced: podAnnotatedIptablesEipInformer.Informer().HasSynced,
addPodAnnotatedIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddPodAnnotatedIptablesEip"),
delPodAnnotatedIptablesEipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeletePodAnnotatedIptablesEip"),

iptablesFipsLister: iptablesFipInformer.Lister(),
iptablesFipSynced: iptablesFipInformer.Informer().HasSynced,
addIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "addIptablesFip"),
updateIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "updateIptablesFip"),
delIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "delIptablesFip"),

podAnnotatedIptablesFipLister: podAnnotatedIptablesFipInformer.Lister(),
podAnnotatedIptablesFipSynced: podAnnotatedIptablesFipInformer.Informer().HasSynced,
addPodAnnotatedIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "addPodAnnotatedIptablesFip"),
updatePodAnnotatedIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "updatePodAnnotatedIptablesFip"),
delPodAnnotatedIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "delPodAnnotatedIptablesFip"),
podAnnotatedIptablesFipLister: podAnnotatedIptablesFipInformer.Lister(),
podAnnotatedIptablesFipSynced: podAnnotatedIptablesFipInformer.Informer().HasSynced,
addPodAnnotatedIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "AddPodAnnotatedIptablesFip"),
delPodAnnotatedIptablesFipQueue: workqueue.NewNamedRateLimitingQueue(custCrdRateLimiter, "DeletePodAnnotatedIptablesFip"),

iptablesDnatRulesLister: iptablesDnatRuleInformer.Lister(),
iptablesDnatRuleSynced: iptablesDnatRuleInformer.Informer().HasSynced,
Expand Down Expand Up @@ -803,15 +799,12 @@ func (c *Controller) shutdown() {
c.delIptablesSnatRuleQueue.ShutDown()
}

if c.config.PodDefaultFipType == util.IptablesFip {
c.addPodAnnotatedIptablesEipQueue.ShutDown()
c.updatePodAnnotatedIptablesEipQueue.ShutDown()
c.delPodAnnotatedIptablesEipQueue.ShutDown()
c.addPodAnnotatedIptablesEipQueue.ShutDown()
c.delPodAnnotatedIptablesEipQueue.ShutDown()

c.addPodAnnotatedIptablesFipQueue.ShutDown()
c.delPodAnnotatedIptablesFipQueue.ShutDown()

c.addPodAnnotatedIptablesFipQueue.ShutDown()
c.updatePodAnnotatedIptablesFipQueue.ShutDown()
c.delPodAnnotatedIptablesFipQueue.ShutDown()
}
if c.config.EnableNP {
c.updateNpQueue.ShutDown()
c.deleteNpQueue.ShutDown()
Expand Down Expand Up @@ -1005,13 +998,11 @@ func (c *Controller) startWorkers(ctx context.Context) {
go wait.Until(c.runUpdateIptablesSnatRuleWorker, time.Second, ctx.Done())
go wait.Until(c.runDelIptablesSnatRuleWorker, time.Second, ctx.Done())

if c.config.PodDefaultFipType == util.IptablesFip {
go wait.Until(c.runAddPodAnnotatedIptablesEipWorker, time.Second, ctx.Done())
go wait.Until(c.runDelPodAnnotatedIptablesEipWorker, time.Second, ctx.Done())
go wait.Until(c.runAddPodAnnotatedIptablesEipWorker, time.Second, ctx.Done())
go wait.Until(c.runDelPodAnnotatedIptablesEipWorker, time.Second, ctx.Done())

go wait.Until(c.runAddPodAnnotatedIptablesFipWorker, time.Second, ctx.Done())
go wait.Until(c.runDelPodAnnotatedIptablesFipWorker, time.Second, ctx.Done())
}
go wait.Until(c.runAddPodAnnotatedIptablesFipWorker, time.Second, ctx.Done())
go wait.Until(c.runDelPodAnnotatedIptablesFipWorker, time.Second, ctx.Done())
}

func (c *Controller) initResourceOnce() {
Expand Down
14 changes: 14 additions & 0 deletions pkg/controller/pod_iptables_eip.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ func (c *Controller) processNextAddPodAnnotatedIptablesEipWorkItem() bool {
if shutdown {
return false
}

if c.config.PodDefaultFipType != util.IptablesFip {
c.addPodAnnotatedIptablesEipQueue.Forget(obj)
c.addPodAnnotatedIptablesEipQueue.Done(obj)
return true
}

err := func(obj interface{}) error {
defer c.addPodAnnotatedIptablesEipQueue.Done(obj)
var key string
Expand Down Expand Up @@ -195,6 +202,13 @@ func (c *Controller) processNextDeletePodAnnotatedIptablesEipWorkItem() bool {
if shutdown {
return false
}

if c.config.PodDefaultFipType != util.IptablesFip {
c.delPodAnnotatedIptablesEipQueue.Forget(obj)
c.delPodAnnotatedIptablesEipQueue.Done(obj)
return true
}

err := func(obj interface{}) error {
defer c.delPodAnnotatedIptablesEipQueue.Done(obj)
var pod *v1.Pod
Expand Down
14 changes: 14 additions & 0 deletions pkg/controller/pod_iptables_fip.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@ func (c *Controller) processNextAddPodAnnotatedIptablesFipWorkItem() bool {
if shutdown {
return false
}

if c.config.PodDefaultFipType != util.IptablesFip {
c.addPodAnnotatedIptablesFipQueue.Forget(obj)
c.addPodAnnotatedIptablesFipQueue.Done(obj)
return true
}

err := func(obj interface{}) error {
defer c.addPodAnnotatedIptablesFipQueue.Done(obj)
var key string
Expand Down Expand Up @@ -182,6 +189,13 @@ func (c *Controller) processNextDeletePodAnnotatedIptablesFipWorkItem() bool {
if shutdown {
return false
}

if c.config.PodDefaultFipType != util.IptablesFip {
c.delPodAnnotatedIptablesFipQueue.Forget(obj)
c.delPodAnnotatedIptablesFipQueue.Done(obj)
return true
}

err := func(obj interface{}) error {
defer c.delPodAnnotatedIptablesFipQueue.Done(obj)
var pod *v1.Pod
Expand Down

0 comments on commit dabe599

Please sign in to comment.