Skip to content

Commit

Permalink
Multi-cursor: fix scheduled queue look-ahead
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Sep 10, 2022
1 parent 5389cd5 commit 5138a34
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 55 deletions.
17 changes: 0 additions & 17 deletions service/history/queues/queue_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ type (

checkpointRetrier backoff.Retrier
checkpointTimer *time.Timer
pollTimer *time.Timer

alertCh <-chan *Alert
}
Expand Down Expand Up @@ -263,10 +262,6 @@ func (p *queueBase) Start() {
p.rescheduler.Start()
p.readerGroup.Start()

p.pollTimer = time.NewTimer(backoff.JitDuration(
p.options.MaxPollInterval(),
p.options.MaxPollIntervalJitterCoefficient(),
))
p.checkpointTimer = time.NewTimer(backoff.JitDuration(
p.options.CheckpointInterval(),
p.options.CheckpointIntervalJitterCoefficient(),
Expand All @@ -277,7 +272,6 @@ func (p *queueBase) Stop() {
p.monitor.Close()
p.readerGroup.Stop()
p.rescheduler.Stop()
p.pollTimer.Stop()
p.checkpointTimer.Stop()
}

Expand All @@ -299,17 +293,6 @@ func (p *queueBase) UnlockTaskProcessing() {
// no-op
}

func (p *queueBase) processPollTimer() {
if p.lastPollTime.Add(p.options.MaxPollInterval()).Before(p.timeSource.Now()) {
p.processNewRange()
}

p.pollTimer.Reset(backoff.JitDuration(
p.options.MaxPollInterval(),
p.options.MaxPollIntervalJitterCoefficient(),
))
}

func (p *queueBase) processNewRange() {
newMaxKey := p.shard.GetQueueExclusiveHighReadWatermark(
p.category,
Expand Down
24 changes: 22 additions & 2 deletions service/history/queues/queue_immediate.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -145,14 +146,20 @@ func (p *immediateQueue) NotifyNewTasks(_ string, tasks []tasks.Task) {
func (p *immediateQueue) processEventLoop() {
defer p.shutdownWG.Done()

pollTimer := time.NewTimer(backoff.JitDuration(
p.options.MaxPollInterval(),
p.options.MaxPollIntervalJitterCoefficient(),
))
defer pollTimer.Stop()

for {
select {
case <-p.shutdownCh:
return
case <-p.notifyCh:
p.processNewRange()
case <-p.pollTimer.C:
p.processPollTimer()
case <-pollTimer.C:
p.processPollTimer(pollTimer)
case <-p.checkpointTimer.C:
p.checkpoint()
case alert := <-p.alertCh:
Expand All @@ -161,6 +168,19 @@ func (p *immediateQueue) processEventLoop() {
}
}

func (p *immediateQueue) processPollTimer(pollTimer *time.Timer) {
// NOTE: this check can actually make the maximum poll interval becomes
// 2 * configured maxPollInterval + jitter
if p.lastPollTime.Add(p.options.MaxPollInterval()).Before(p.timeSource.Now()) {
p.processNewRange()
}

pollTimer.Reset(backoff.JitDuration(
p.options.MaxPollInterval(),
p.options.MaxPollIntervalJitterCoefficient(),
))
}

func (p *immediateQueue) notify() {
select {
case p.notifyCh <- struct{}{}:
Expand Down
19 changes: 10 additions & 9 deletions service/history/queues/queue_scheduled.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -193,8 +194,6 @@ func (p *scheduledQueue) processEventLoop() {
p.processNewTime()
case <-p.timerGate.FireChan():
p.processNewRange()
case <-p.pollTimer.C:
p.processPollTimer()
case <-p.checkpointTimer.C:
p.checkpoint()
case alert := <-p.alertCh:
Expand Down Expand Up @@ -232,14 +231,12 @@ func (p *scheduledQueue) processNewRange() {
p.lookAheadTask()
}

func (p *scheduledQueue) processPollTimer() {
p.queueBase.processPollTimer()
p.lookAheadTask()
}

func (p *scheduledQueue) lookAheadTask() {
lookAheadMinTime := p.nonReadableScope.Range.InclusiveMin.FireTime
lookAheadMaxTime := lookAheadMinTime.Add(p.options.MaxPollInterval())
lookAheadMaxTime := lookAheadMinTime.Add(backoff.JitDuration(
p.options.MaxPollInterval(),
p.options.MaxPollIntervalJitterCoefficient(),
))

ctx, cancel := newQueueIOContext()
defer cancel()
Expand All @@ -264,7 +261,11 @@ func (p *scheduledQueue) lookAheadTask() {
p.timerGate.Update(response.Tasks[0].GetKey().FireTime)
}

// no look ahead task, wait for max poll interval or new task notification
// no look ahead task, next loading will be triggerred at the end of the current
// look ahead window or when new task notification comes
// NOTE: with this we don't need a separate max poll timer, loading will be triggerred
// every maxPollInterval + jitter.
p.timerGate.Update(lookAheadMaxTime)
}

// IsTimeExpired checks if the testing time is equal or before
Expand Down
52 changes: 25 additions & 27 deletions service/history/queues/queue_scheduled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package queues

import (
"context"
"errors"
"math/rand"
"testing"
Expand Down Expand Up @@ -196,11 +197,13 @@ func (s *scheduledQueueSuite) TestLookAheadTask_NoLookAheadTask() {
lookAheadRange, _ := s.setupLookAheadMock(false)
s.scheduledQueue.lookAheadTask()

timerGate.SetCurrentTime(lookAheadRange.InclusiveMin.FireTime.Add(testQueueOptions.MaxPollInterval()))
timerGate.SetCurrentTime(lookAheadRange.InclusiveMin.FireTime.Add(time.Duration(
(1 + testQueueOptions.MaxPollIntervalJitterCoefficient()) * float64(testQueueOptions.MaxPollInterval()),
)))
select {
case <-s.scheduledQueue.timerGate.FireChan():
s.Fail("timer gate should not fire")
default:
s.Fail("timer gate should fire at the end of look ahead window")
}
}

Expand Down Expand Up @@ -238,20 +241,6 @@ func (s *scheduledQueueSuite) TestProcessNewRange_LookAheadPerformed() {
s.scheduledQueue.processNewRange()
}

func (s *scheduledQueueSuite) TestProcessPollTimer_LookAheadPerformed() {
timerGate := timer.NewRemoteGate()
s.scheduledQueue.timerGate = timerGate
s.scheduledQueue.pollTimer = time.NewTimer(time.Second)

// test if look ahead if performed after processing poll timer
s.mockExecutionManager.EXPECT().GetHistoryTasks(gomock.Any(), gomock.Any()).Return(&persistence.GetHistoryTasksResponse{
Tasks: []tasks.Task{},
NextPageToken: nil,
}, nil).Times(1)

s.scheduledQueue.processPollTimer()
}

func (s *scheduledQueueSuite) setupLookAheadMock(
hasLookAheadTask bool,
) (lookAheadRange Range, lookAheadTask *tasks.MockTask) {
Expand All @@ -269,17 +258,26 @@ func (s *scheduledQueueSuite) setupLookAheadMock(
loadedTasks = append(loadedTasks, lookAheadTask)
}

s.mockExecutionManager.EXPECT().GetHistoryTasks(gomock.Any(), &persistence.GetHistoryTasksRequest{
ShardID: s.mockShard.GetShardID(),
TaskCategory: tasks.CategoryTimer,
InclusiveMinTaskKey: lookAheadRange.InclusiveMin,
ExclusiveMaxTaskKey: lookAheadRange.ExclusiveMax,
BatchSize: 1,
NextPageToken: nil,
}).Return(&persistence.GetHistoryTasksResponse{
Tasks: loadedTasks,
NextPageToken: nil,
}, nil).Times(1)
s.mockExecutionManager.EXPECT().GetHistoryTasks(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, request *persistence.GetHistoryTasksRequest) (*persistence.GetHistoryTasksResponse, error) {
s.Equal(s.mockShard.GetShardID(), request.ShardID)
s.Equal(tasks.CategoryTimer, request.TaskCategory)
s.Equal(lookAheadRange.InclusiveMin, request.InclusiveMinTaskKey)
s.Equal(1, request.BatchSize)
s.Nil(request.NextPageToken)

s.Equal(lookAheadRange.ExclusiveMax.TaskID, request.ExclusiveMaxTaskKey.TaskID)
fireTimeDifference := request.ExclusiveMaxTaskKey.FireTime.Sub(lookAheadRange.ExclusiveMax.FireTime)
if fireTimeDifference < 0 {
fireTimeDifference = -fireTimeDifference
}
maxAllowedFireTimeDifference := time.Duration(float64(testQueueOptions.MaxPollInterval()) * testQueueOptions.MaxPollIntervalJitterCoefficient())
s.LessOrEqual(fireTimeDifference, maxAllowedFireTimeDifference)

return &persistence.GetHistoryTasksResponse{
Tasks: loadedTasks,
NextPageToken: nil,
}, nil
}).Times(1)

return lookAheadRange, lookAheadTask
}

0 comments on commit 5138a34

Please sign in to comment.