Commit 35dcba8

Improve scheduled queue lookahead logic (#3982)

yycptt committed Feb 27, 2023
1 parent edd92bb
Showing 7 changed files with 79 additions and 60 deletions.
12 changes: 7 additions & 5 deletions service/history/queues/queue_base.go
@@ -130,6 +130,7 @@ func newQueueBase(
 	executor Executor,
 	options *Options,
 	hostReaderRateLimiter quotas.RequestRateLimiter,
+	completionFn ReaderCompletionFn,
 	logger log.Logger,
 	metricsHandler metrics.Handler,
 ) *queueBase {
@@ -196,6 +197,7 @@ func newQueueBase(
 		timeSource,
 		readerRateLimiter,
 		monitor,
+		completionFn,
 		logger,
 		metricsHandler,
 	)
@@ -281,22 +283,23 @@ func (p *queueBase) FailoverNamespace(
 	p.rescheduler.Reschedule(namespaceID)
 }
 
-func (p *queueBase) processNewRange() error {
+func (p *queueBase) processNewRange() {
 	var newMaxKey tasks.Key
 	switch categoryType := p.category.Type(); categoryType {
 	case tasks.CategoryTypeImmediate:
 		newMaxKey = p.shard.GetImmediateQueueExclusiveHighReadWatermark()
 	case tasks.CategoryTypeScheduled:
 		var err error
 		if newMaxKey, err = p.shard.UpdateScheduledQueueExclusiveHighReadWatermark(); err != nil {
-			return err
+			p.logger.Error("Unable to process new range", tag.Error(err))
+			return
 		}
 	default:
 		panic(fmt.Sprintf("Unknown task category type: %v", categoryType.String()))
 	}
 
 	if !p.nonReadableScope.CanSplitByRange(newMaxKey) {
-		return nil
+		return
 	}
 
 	var newReadScope Scope
@@ -311,7 +314,7 @@ func (p *queueBase) processNewRange() error {
 	reader, ok := p.readerGroup.ReaderByID(DefaultReaderId)
 	if !ok {
 		p.readerGroup.NewReader(DefaultReaderId, newSlice)
-		return nil
+		return
 	}
 
 	if now := p.timeSource.Now(); now.After(p.nextForceNewSliceTime) {
@@ -320,7 +323,6 @@ func (p *queueBase) processNewRange() error {
 	} else {
 		reader.MergeSlices(newSlice)
 	}
-	return nil
 }
 
 func (p *queueBase) checkpoint() {
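Note on the processNewRange change above: the method no longer returns an error; it logs persistence failures at the source and simply returns, so every call site shrinks from an if-err-log block to a bare call. A minimal runnable sketch of the pattern, with updateWatermark and extendReadRange as hypothetical stand-ins for the real shard and reader-group calls:

package main

import (
	"errors"
	"log"
)

// queue is a stand-in for queueBase; updateWatermark and extendReadRange
// are hypothetical placeholders for the shard watermark lookup and the
// reader-slice bookkeeping in the real code.
type queue struct{ failWatermark bool }

func (q *queue) updateWatermark() (int64, error) {
	if q.failWatermark {
		return 0, errors.New("shard state invalid")
	}
	return 42, nil
}

func (q *queue) extendReadRange(maxKey int64) {
	log.Printf("extend read range to %d", maxKey)
}

// processNewRange now owns its error handling: log and bail out, so a
// caller writes q.processNewRange() instead of wrapping it in an error check.
func (q *queue) processNewRange() {
	newMaxKey, err := q.updateWatermark()
	if err != nil {
		log.Printf("Unable to process new range: %v", err)
		return
	}
	q.extendReadRange(newMaxKey)
}

func main() {
	(&queue{}).processNewRange()
	(&queue{failWatermark: true}).processNewRange()
}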
11 changes: 9 additions & 2 deletions service/history/queues/queue_base_test.go
@@ -142,6 +142,7 @@ func (s *queueBaseSuite) TestNewProcessBase_NoPreviousState() {
 		nil,
 		s.options,
 		s.rateLimiter,
+		NoopReaderCompletionFn,
 		s.logger,
 		s.metricsHandler,
 	)
@@ -225,6 +226,7 @@ func (s *queueBaseSuite) TestNewProcessBase_WithPreviousState() {
 		nil,
 		s.options,
 		s.rateLimiter,
+		NoopReaderCompletionFn,
 		s.logger,
 		s.metricsHandler,
 	)
@@ -284,13 +286,14 @@ func (s *queueBaseSuite) TestStartStop() {
 		nil,
 		s.options,
 		s.rateLimiter,
+		NoopReaderCompletionFn,
 		s.logger,
 		s.metricsHandler,
 	)
 
 	s.mockRescheduler.EXPECT().Start().Times(1)
 	base.Start()
-	s.NoError(base.processNewRange())
+	base.processNewRange()
 
 	<-doneCh
 	<-base.checkpointTimer.C
@@ -333,12 +336,13 @@ func (s *queueBaseSuite) TestProcessNewRange() {
 		nil,
 		s.options,
 		s.rateLimiter,
+		NoopReaderCompletionFn,
 		s.logger,
 		s.metricsHandler,
 	)
 	s.True(base.nonReadableScope.Range.Equals(NewRange(tasks.MinimumKey, tasks.MaximumKey)))
 
-	s.NoError(base.processNewRange())
+	base.processNewRange()
 	defaultReader, ok := base.readerGroup.ReaderByID(DefaultReaderId)
 	s.True(ok)
 	scopes := defaultReader.Scopes()
@@ -389,6 +393,7 @@ func (s *queueBaseSuite) TestCheckPoint_WithPendingTasks() {
 		nil,
 		s.options,
 		s.rateLimiter,
+		NoopReaderCompletionFn,
 		s.logger,
 		s.metricsHandler,
 	)
@@ -461,6 +466,7 @@ func (s *queueBaseSuite) TestCheckPoint_NoPendingTasks() {
 		nil,
 		s.options,
 		s.rateLimiter,
+		NoopReaderCompletionFn,
 		s.logger,
 		s.metricsHandler,
 	)
@@ -548,6 +554,7 @@ func (s *queueBaseSuite) TestCheckPoint_MoveSlices() {
 		nil,
 		s.options,
 		s.rateLimiter,
+		NoopReaderCompletionFn,
 		s.logger,
 		s.metricsHandler,
 	)
10 changes: 3 additions & 7 deletions service/history/queues/queue_immediate.go
@@ -96,6 +96,7 @@ func NewImmediateQueue(
 			executor,
 			options,
 			hostRateLimiter,
+			NoopReaderCompletionFn,
 			logger,
 			metricsHandler,
 		),
@@ -159,9 +160,7 @@ func (p *immediateQueue) processEventLoop() {
 		case <-p.shutdownCh:
 			return
 		case <-p.notifyCh:
-			if err := p.processNewRange(); err != nil {
-				p.logger.Error("Unable to process new range", tag.Error(err))
-			}
+			p.processNewRange()
 		case <-pollTimer.C:
 			p.processPollTimer(pollTimer)
 		case <-p.checkpointTimer.C:
@@ -173,10 +172,7 @@
 }
 
 func (p *immediateQueue) processPollTimer(pollTimer *time.Timer) {
-	if err := p.processNewRange(); err != nil {
-		p.logger.Error("Unable to process new range", tag.Error(err))
-	}
-
+	p.processNewRange()
 	pollTimer.Reset(backoff.Jitter(
 		p.options.MaxPollInterval(),
 		p.options.MaxPollIntervalJitterCoefficient(),
37 changes: 17 additions & 20 deletions service/history/queues/queue_scheduled.go
@@ -54,6 +54,7 @@ type (
 		newTimeLock sync.Mutex
 		newTime     time.Time
 
+		lookAheadCh               chan struct{}
 		lookAheadRateLimitRequest quotas.Request
 	}
 )
@@ -106,6 +107,18 @@ func NewScheduledQueue(
 		}
 	}
 
+	lookAheadCh := make(chan struct{}, 1)
+	readerCompletionFn := func(readerID int32) {
+		if readerID != DefaultReaderId {
+			return
+		}
+
+		select {
+		case lookAheadCh <- struct{}{}:
+		default:
+		}
+	}
+
 	return &scheduledQueue{
 		queueBase: newQueueBase(
 			shard,
@@ -117,13 +130,15 @@ func NewScheduledQueue(
 			executor,
 			options,
 			hostRateLimiter,
+			readerCompletionFn,
 			logger,
 			metricsHandler,
 		),
 
 		timerGate:  timer.NewLocalGate(shard.GetTimeSource()),
 		newTimerCh: make(chan struct{}, 1),
 
+		lookAheadCh:               lookAheadCh,
 		lookAheadRateLimitRequest: newReaderRequest(DefaultReaderId),
 	}
 }
@@ -188,6 +203,8 @@ func (p *scheduledQueue) processEventLoop() {
 		case <-p.newTimerCh:
 			p.metricsHandler.Counter(metrics.NewTimerNotifyCounter.GetMetricName()).Record(1)
 			p.processNewTime()
+		case <-p.lookAheadCh:
+			p.lookAheadTask()
 		case <-p.timerGate.FireChan():
 			p.processNewRange()
 		case <-p.checkpointTimer.C:
@@ -222,26 +239,6 @@ func (p *scheduledQueue) processNewTime() {
 	p.timerGate.Update(newTime)
 }
 
-func (p *scheduledQueue) processNewRange() {
-	if err := p.queueBase.processNewRange(); err != nil {
-		// This only happens when shard state is invalid,
-		// in which case no look ahead is needed.
-		// Notification will be sent when shard is reacquired, but
-		// still set a max poll timer here as a catch all case.
-		p.timerGate.Update(p.timeSource.Now().Add(backoff.Jitter(
-			p.options.MaxPollInterval(),
-			p.options.MaxPollIntervalJitterCoefficient(),
-		)))
-		return
-	}
-
-	// Only do look ahead when shard state is valid.
-	// When shard is invalid, even look ahead task is found,
-	// it can't be loaded as scheduled queue max read level can't move
-	// forward.
-	p.lookAheadTask()
-}
-
 func (p *scheduledQueue) lookAheadTask() {
 	rateLimitCtx, rateLimitCancel := context.WithTimeout(context.Background(), lookAheadRateLimitDelay)
 	rateLimitErr := p.readerRateLimiter.Wait(rateLimitCtx, p.lookAheadRateLimitRequest)
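The readerCompletionFn/lookAheadCh pairing above relies on a standard Go idiom: a buffered channel of capacity 1 combined with a non-blocking send, so any number of reader-completion signals coalesce into at most one pending wake-up for the event loop. A runnable sketch of just that idiom (channel and helper names are illustrative):

package main

import "fmt"

func main() {
	// Capacity 1: at most one wake-up can be pending.
	lookAheadCh := make(chan struct{}, 1)

	notify := func() {
		select {
		case lookAheadCh <- struct{}{}: // first signal is buffered
		default: // a wake-up is already pending; drop the extra signal
		}
	}

	// Three completions in a row...
	for i := 0; i < 3; i++ {
		notify()
	}

	// ...but the consumer wakes exactly once.
	<-lookAheadCh
	fmt.Println("woke once; pending signals left:", len(lookAheadCh))
}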
13 changes: 0 additions & 13 deletions service/history/queues/queue_scheduled_test.go
@@ -245,19 +245,6 @@ func (s *scheduledQueueSuite) TestLookAheadTask_ErrorLookAhead() {
 	}
 }
 
-func (s *scheduledQueueSuite) TestProcessNewRange_LookAheadPerformed() {
-	timerGate := timer.NewRemoteGate()
-	s.scheduledQueue.timerGate = timerGate
-
-	// test if look ahead if performed after processing new range
-	s.mockExecutionManager.EXPECT().GetHistoryTasks(gomock.Any(), gomock.Any()).Return(&persistence.GetHistoryTasksResponse{
-		Tasks:         []tasks.Task{},
-		NextPageToken: nil,
-	}, nil).Times(1)
-
-	s.scheduledQueue.processNewRange()
-}
-
 func (s *scheduledQueueSuite) setupLookAheadMock(
 	hasLookAheadTask bool,
 ) (lookAheadRange Range, lookAheadTask *tasks.MockTask) {
18 changes: 18 additions & 0 deletions service/history/queues/reader.go
@@ -76,6 +76,8 @@ type (
 
 	SlicePredicate func(s Slice) bool
 
+	ReaderCompletionFn func(readerID int32)
+
 	ReaderImpl struct {
 		sync.Mutex
 
@@ -86,6 +88,7 @@ type (
 		timeSource     clock.TimeSource
 		ratelimiter    quotas.RequestRateLimiter
 		monitor        Monitor
+		completionFn   ReaderCompletionFn
 		logger         log.Logger
 		metricsHandler metrics.Handler
 
@@ -106,6 +109,10 @@ type (
 	}
 )
 
+var (
+	NoopReaderCompletionFn = func(_ int32) {}
+)
+
 func NewReader(
 	readerID int32,
 	slices []Slice,
@@ -115,6 +122,7 @@ func NewReader(
 	timeSource clock.TimeSource,
 	ratelimiter quotas.RequestRateLimiter,
 	monitor Monitor,
+	completionFn ReaderCompletionFn,
 	logger log.Logger,
 	metricsHandler metrics.Handler,
 ) *ReaderImpl {
@@ -134,6 +142,7 @@ func NewReader(
 		timeSource:     timeSource,
 		ratelimiter:    ratelimiter,
 		monitor:        monitor,
+		completionFn:   completionFn,
 		logger:         log.With(logger, tag.QueueReaderID(readerID)),
 		metricsHandler: metricsHandler,
 
@@ -427,6 +436,7 @@ func (r *ReaderImpl) loadAndSubmitTasks() {
 	}
 
 	if r.nextReadSlice == nil {
+		r.completionFn(r.readerID)
 		return
 	}
 
@@ -457,7 +467,11 @@ func (r *ReaderImpl) loadAndSubmitTasks() {
 
 	if r.nextReadSlice = r.nextReadSlice.Next(); r.nextReadSlice != nil {
 		r.notify()
+		return
 	}
+
+	// no more task to load, trigger completion callback
+	r.completionFn(r.readerID)
 }
 
 func (r *ReaderImpl) resetNextReadSliceLocked() {
@@ -471,7 +485,11 @@ func (r *ReaderImpl) resetNextReadSliceLocked() {
 
 	if r.nextReadSlice != nil {
 		r.notify()
+		return
 	}
+
+	// no more task to load, trigger completion callback
+	r.completionFn(r.readerID)
}
 
 func (r *ReaderImpl) notify() {
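With these changes the reader reports idleness on every path where nothing is left to load: no next slice at entry, the last slice just submitted, or a reset that leaves the slice list empty. A toy sketch of that contract, modeling the slice list as a plain queue and omitting the locking, rate limiting, and metrics of the real ReaderImpl:

package main

import "fmt"

// ReaderCompletionFn mirrors the callback type added in reader.go.
type ReaderCompletionFn func(readerID int32)

// miniReader is a toy stand-in for ReaderImpl: a queue of "slices" plus
// the completion callback.
type miniReader struct {
	readerID     int32
	slices       []string
	completionFn ReaderCompletionFn
}

// loadAndSubmit mimics loadAndSubmitTasks: fire the callback whenever
// there is nothing (left) to read.
func (r *miniReader) loadAndSubmit() {
	if len(r.slices) == 0 {
		r.completionFn(r.readerID) // nothing to load at all
		return
	}
	fmt.Println("submitting tasks from", r.slices[0])
	r.slices = r.slices[1:]
	if len(r.slices) > 0 {
		return // more to do; the real code re-notifies itself here
	}
	r.completionFn(r.readerID) // just drained the last slice
}

func main() {
	done := func(id int32) { fmt.Println("reader", id, "caught up") }
	r := &miniReader{readerID: 0, slices: []string{"slice-a", "slice-b"}, completionFn: done}
	r.loadAndSubmit() // submits slice-a
	r.loadAndSubmit() // submits slice-b, then fires the callback
}

In the scheduled queue, that callback is exactly the readerCompletionFn shown earlier, so "reader 0 caught up" translates into a lookAheadCh signal and, in turn, a lookAheadTask call from the event loop.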