From 41e1a3891337a2d187b2537b60ffaaaca9601b82 Mon Sep 17 00:00:00 2001
From: Alex Shtin
Date: Wed, 23 Sep 2020 19:52:48 -0700
Subject: [PATCH] Start queue processor before failover callback registration
 (#3494)

---
 service/history/historyEngine.go       |  9 +++++++--
 service/history/timerQueueProcessor.go | 21 +++++++++++++++++----
 2 files changed, 24 insertions(+), 6 deletions(-)

diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go
index 9b11da327ae..5f622177648 100644
--- a/service/history/historyEngine.go
+++ b/service/history/historyEngine.go
@@ -335,11 +335,16 @@ func (e *historyEngineImpl) Start() {
 	e.logger.Info("", tag.LifeCycleStarting)
 	defer e.logger.Info("", tag.LifeCycleStarted)
 
-	e.registerNamespaceFailoverCallback()
-
 	e.txProcessor.Start()
 	e.timerProcessor.Start()
 
+	// The failover callback will try to create a failover queue processor to scan all
+	// inflight tasks if a namespace needs to be failed over. However, in the multi-cursor
+	// queue logic, the scan range can't be retrieved before the processor is started. If the
+	// failover callback is registered before the queue processor is started, it may result
+	// in a deadlock: to create the failover queue, the queue processor must be started.
+	e.registerNamespaceFailoverCallback()
+
 	clusterMetadata := e.shard.GetClusterMetadata()
 	if e.replicatorProcessor != nil &&
 		(clusterMetadata.GetReplicationConsumerConfig().Type != config.ReplicationConsumerTypeRPC ||
diff --git a/service/history/timerQueueProcessor.go b/service/history/timerQueueProcessor.go
index 1baca2bef9d..eb3e247a716 100644
--- a/service/history/timerQueueProcessor.go
+++ b/service/history/timerQueueProcessor.go
@@ -30,6 +30,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -70,9 +71,9 @@ type (
 		ackLevel                timerKey
 		logger                  log.Logger
 		matchingClient          matching.Client
-		isStarted               int32
-		isStopped               int32
+		status                  int32
 		shutdownChan            chan struct{}
+		shutdownWG              sync.WaitGroup
 		queueTaskProcessor      queueTaskProcessor
 		activeTimerProcessor    *timerQueueActiveProcessorImpl
 		standbyTimerProcessors  map[string]*timerQueueStandbyProcessorImpl
@@ -132,6 +133,7 @@ func newTimerQueueProcessor(
 		ackLevel:           timerKey{VisibilityTimestamp: shard.GetTimerAckLevel()},
 		logger:             logger,
 		matchingClient:     matchingClient,
+		status:             common.DaemonStatusInitialized,
 		shutdownChan:       make(chan struct{}),
 		queueTaskProcessor: queueTaskProcessor,
 		activeTimerProcessor: newTimerQueueActiveProcessor(
@@ -147,7 +149,7 @@ func newTimerQueueProcessor(
 }
 
 func (t *timerQueueProcessorImpl) Start() {
-	if !atomic.CompareAndSwapInt32(&t.isStarted, 0, 1) {
+	if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
 		return
 	}
 	t.activeTimerProcessor.Start()
@@ -156,11 +158,13 @@ func (t *timerQueueProcessorImpl) Start() {
 			standbyTimerProcessor.Start()
 		}
 	}
+
+	t.shutdownWG.Add(1)
 	go t.completeTimersLoop()
 }
 
 func (t *timerQueueProcessorImpl) Stop() {
-	if !atomic.CompareAndSwapInt32(&t.isStopped, 0, 1) {
+	if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
 		return
 	}
 	t.activeTimerProcessor.Stop()
@@ -170,6 +174,7 @@ func (t *timerQueueProcessorImpl) Stop() {
 		}
 	}
 	close(t.shutdownChan)
+	common.AwaitWaitGroup(&t.shutdownWG, time.Minute)
 }
 
 // NotifyNewTimers - Notify the processor about the new active / standby timer arrival.
@@ -196,6 +201,12 @@ func (t *timerQueueProcessorImpl) NotifyNewTimers(
 
 func (t *timerQueueProcessorImpl) FailoverNamespace(
 	namespaceIDs map[string]struct{},
 ) {
+	// The failover queue is used to scan all inflight tasks. If the queue processor is not
+	// started, there are no inflight tasks and no failover processor needs to be created.
+	// Also, HandleAction would block if the queue processor's processing loop is not running.
+	if atomic.LoadInt32(&t.status) != common.DaemonStatusStarted {
+		return
+	}
 	minLevel := t.shard.GetTimerClusterAckLevel(t.currentClusterName)
 	standbyClusterName := t.currentClusterName
@@ -252,6 +263,8 @@ func (t *timerQueueProcessorImpl) UnlockTaskProcessing() {
 }
 
 func (t *timerQueueProcessorImpl) completeTimersLoop() {
+	defer t.shutdownWG.Done()
+
 	timer := time.NewTimer(t.config.TimerProcessorCompleteTimerInterval())
 	defer timer.Stop()
 	for {
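
Reviewer note: the patch replaces the paired isStarted/isStopped flags with a single
status word driven by compare-and-swap, and ties the background loop to a WaitGroup so
Stop() can wait for it to drain. Below is a minimal, self-contained sketch of that
lifecycle pattern, not the Temporal source: the status constants and the plain Wait()
stand in for common.DaemonStatus* and common.AwaitWaitGroup's bounded wait.

	package main

	import (
		"fmt"
		"sync"
		"sync/atomic"
		"time"
	)

	// Illustrative stand-ins for common.DaemonStatusInitialized/Started/Stopped.
	const (
		statusInitialized int32 = iota
		statusStarted
		statusStopped
	)

	type processor struct {
		status       int32
		shutdownChan chan struct{}
		shutdownWG   sync.WaitGroup
	}

	func newProcessor() *processor {
		return &processor{
			status:       statusInitialized,
			shutdownChan: make(chan struct{}),
		}
	}

	func (p *processor) Start() {
		// CAS ensures the loop is spawned exactly once, even if Start races.
		if !atomic.CompareAndSwapInt32(&p.status, statusInitialized, statusStarted) {
			return
		}
		p.shutdownWG.Add(1) // counted before the goroutine exists, so Stop can't miss it
		go p.loop()
	}

	func (p *processor) Stop() {
		// Started -> Stopped: Stop is a no-op unless Start actually ran.
		if !atomic.CompareAndSwapInt32(&p.status, statusStarted, statusStopped) {
			return
		}
		close(p.shutdownChan)
		p.shutdownWG.Wait() // stand-in for common.AwaitWaitGroup(&p.shutdownWG, time.Minute)
	}

	func (p *processor) loop() {
		defer p.shutdownWG.Done()
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-p.shutdownChan:
				return
			case <-ticker.C:
				// complete timer tasks here
			}
		}
	}

	func main() {
		p := newProcessor()
		p.Start()
		time.Sleep(50 * time.Millisecond)
		p.Stop()
		fmt.Println("loop drained before Stop returned")
	}

A side effect worth noting: with two independent flags, a Stop() issued before Start()
still allowed a later Start() to spawn the loop; the single three-state word makes such
an early Stop() a harmless no-op.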
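The ordering half of the fix pairs with the status gate added to FailoverNamespace: a
failover callback may fire as soon as it is registered, so it must observe a started
processor. A hypothetical sketch of that interplay, reusing the processor type above;
registerFailoverCallback and its immediate invocation are illustrative, not the Temporal
API.

	// FailoverNamespace scans inflight tasks. Before Start() there are none, and the
	// processing loop that would serve HandleAction isn't running yet, so the guard
	// turns an early callback into a no-op instead of a deadlock.
	func (p *processor) FailoverNamespace(namespaceIDs map[string]struct{}) {
		if atomic.LoadInt32(&p.status) != statusStarted {
			return
		}
		// create the failover queue / dispatch HandleAction here
	}

	// registerFailoverCallback mimics a registry that can invoke the callback
	// immediately for already-pending failovers (illustrative).
	func registerFailoverCallback(cb func(map[string]struct{})) {
		cb(map[string]struct{}{"pending-namespace-id": {}})
	}

	func startEngine(p *processor) {
		p.Start()                                     // queue processors first...
		registerFailoverCallback(p.FailoverNamespace) // ...then the failover callback
	}

The guard and the reordering are complementary: reordering prevents the deadlock in the
normal startup path, while the guard keeps a racing or early callback safe regardless.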