Skip to content

Commit

Permalink
Start queue processor before failover callback registration (#3494)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Sep 26, 2020
1 parent 95261ce commit 41e1a38
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 6 deletions.
9 changes: 7 additions & 2 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,11 +335,16 @@ func (e *historyEngineImpl) Start() {
e.logger.Info("", tag.LifeCycleStarting)
defer e.logger.Info("", tag.LifeCycleStarted)

e.registerNamespaceFailoverCallback()

e.txProcessor.Start()
e.timerProcessor.Start()

// failover callback will try to create a failover queue processor to scan all inflight tasks
// if domain needs to be failovered. However, in the multicursor queue logic, the scan range
// can't be retrieved before the processor is started. If failover callback is registered
// before queue processor is started, it may result in a deadline as to create the failover queue,
// queue processor need to be started.
e.registerNamespaceFailoverCallback()

clusterMetadata := e.shard.GetClusterMetadata()
if e.replicatorProcessor != nil &&
(clusterMetadata.GetReplicationConsumerConfig().Type != config.ReplicationConsumerTypeRPC ||
Expand Down
21 changes: 17 additions & 4 deletions service/history/timerQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -70,9 +71,9 @@ type (
ackLevel timerKey
logger log.Logger
matchingClient matching.Client
isStarted int32
isStopped int32
status int32
shutdownChan chan struct{}
shutdownWG sync.WaitGroup
queueTaskProcessor queueTaskProcessor
activeTimerProcessor *timerQueueActiveProcessorImpl
standbyTimerProcessors map[string]*timerQueueStandbyProcessorImpl
Expand Down Expand Up @@ -132,6 +133,7 @@ func newTimerQueueProcessor(
ackLevel: timerKey{VisibilityTimestamp: shard.GetTimerAckLevel()},
logger: logger,
matchingClient: matchingClient,
status: common.DaemonStatusInitialized,
shutdownChan: make(chan struct{}),
queueTaskProcessor: queueTaskProcessor,
activeTimerProcessor: newTimerQueueActiveProcessor(
Expand All @@ -147,7 +149,7 @@ func newTimerQueueProcessor(
}

func (t *timerQueueProcessorImpl) Start() {
if !atomic.CompareAndSwapInt32(&t.isStarted, 0, 1) {
if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
return
}
t.activeTimerProcessor.Start()
Expand All @@ -156,11 +158,13 @@ func (t *timerQueueProcessorImpl) Start() {
standbyTimerProcessor.Start()
}
}

t.shutdownWG.Add(1)
go t.completeTimersLoop()
}

func (t *timerQueueProcessorImpl) Stop() {
if !atomic.CompareAndSwapInt32(&t.isStopped, 0, 1) {
if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
return
}
t.activeTimerProcessor.Stop()
Expand All @@ -170,6 +174,7 @@ func (t *timerQueueProcessorImpl) Stop() {
}
}
close(t.shutdownChan)
common.AwaitWaitGroup(&t.shutdownWG, time.Minute)
}

// NotifyNewTimers - Notify the processor about the new active / standby timer arrival.
Expand All @@ -196,6 +201,12 @@ func (t *timerQueueProcessorImpl) NotifyNewTimers(
func (t *timerQueueProcessorImpl) FailoverNamespace(
namespaceIDs map[string]struct{},
) {
// Failover queue is used to scan all inflight tasks, if queue processor is not
// started, there's no inflight task and we don't need to create a failover processor.
// Also the HandleAction will be blocked if queue processor processing loop is not running.
if atomic.LoadInt32(&t.status) != common.DaemonStatusStarted {
return
}

minLevel := t.shard.GetTimerClusterAckLevel(t.currentClusterName)
standbyClusterName := t.currentClusterName
Expand Down Expand Up @@ -252,6 +263,8 @@ func (t *timerQueueProcessorImpl) UnlockTaskProcessing() {
}

func (t *timerQueueProcessorImpl) completeTimersLoop() {
defer t.shutdownWG.Done()

timer := time.NewTimer(t.config.TimerProcessorCompleteTimerInterval())
defer timer.Stop()
for {
Expand Down

0 comments on commit 41e1a38

Please sign in to comment.