Skip to content

Commit

Permalink
Fix queue rate limiting busy loop (#3111)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored and alexshtin committed Jul 29, 2022
1 parent a1d1650 commit 4d7772d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 14 deletions.
24 changes: 15 additions & 9 deletions service/history/queueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,19 +218,19 @@ processorPumpLoop:
}

func (p *queueProcessorBase) processBatch() {

if !p.verifyReschedulerSize() {
return
}

ctx, cancel := context.WithTimeout(context.Background(), loadQueueTaskThrottleRetryDelay)
if err := p.rateLimiter.Wait(ctx); err != nil {
deadline, _ := ctx.Deadline()
p.throttle(deadline.Sub(p.timeSource.Now()))
cancel()
p.notifyNewTask() // re-enqueue the event
return
}
cancel()

if !p.verifyReschedulerSize() {
return
}

p.lastPollTime = p.timeSource.Now()
tasks, more, err := p.ackMgr.readQueueTasks()

Expand Down Expand Up @@ -274,14 +274,20 @@ func (p *queueProcessorBase) verifyReschedulerSize() bool {
p.backoffTimer = nil
}
if !passed && p.backoffTimer == nil {
p.backoffTimer = time.AfterFunc(p.options.PollBackoffInterval(), func() {
p.notifyNewTask() // re-enqueue the event
})
p.throttle(p.options.PollBackoffInterval())
}

return passed
}

func (p *queueProcessorBase) throttle(duration time.Duration) {
if p.backoffTimer == nil {
p.backoffTimer = time.AfterFunc(duration, func() {
p.notifyNewTask() // re-enqueue the event
})
}
}

func (p *queueProcessorBase) submitTask(
executable queues.Executable,
) {
Expand Down
11 changes: 6 additions & 5 deletions service/history/timerQueueProcessorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,18 +297,19 @@ func (t *timerQueueProcessorBase) internalProcessor() error {
}

func (t *timerQueueProcessorBase) readAndFanoutTimerTasks() (*time.Time, error) {
if !t.verifyReschedulerSize() {
return nil, nil
}

ctx, cancel := context.WithTimeout(context.Background(), loadTimerTaskThrottleRetryDelay)
if err := t.rateLimiter.Wait(ctx); err != nil {
deadline, _ := ctx.Deadline()
t.notifyNewTimer(deadline) // re-enqueue the event
cancel()
t.notifyNewTimer(time.Time{}) // re-enqueue the event
return nil, nil
}
cancel()

if !t.verifyReschedulerSize() {
return nil, nil
}

t.lastPollTime = t.timeSource.Now()
timerTasks, nextFireTime, moreTasks, err := t.timerQueueAckMgr.readTimerTasks()
if err != nil {
Expand Down

0 comments on commit 4d7772d

Please sign in to comment.