Skip to content

Commit

Permalink
Fix failover queue shutdown (#3232)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored and alexshtin committed Aug 16, 2022
1 parent 0bd7435 commit 4426702
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 59 deletions.
4 changes: 0 additions & 4 deletions service/history/historyEngineInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ type (
queueShutdown() error
}

timerProcessor interface {
notifyNewTimers(timerTask []tasks.Task)
}

timerQueueAckMgr interface {
getFinishedChan() <-chan struct{}
readTimerTasks() ([]queues.Executable, *time.Time, bool, error)
Expand Down
35 changes: 0 additions & 35 deletions service/history/historyEngineInterfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type (
shard shard.Context
timeSource clock.TimeSource
options *QueueProcessorOptions
processor processor
queueProcessor common.Daemon
logger log.Logger
metricsScope metrics.Scope
rateLimiter quotas.RateLimiter // Read rate limiter
Expand Down Expand Up @@ -89,7 +89,7 @@ func newQueueProcessorBase(
clusterName string,
shard shard.Context,
options *QueueProcessorOptions,
processor processor,
queueProcessor common.Daemon,
queueAckMgr queueAckMgr,
historyCache workflow.Cache,
scheduler queues.Scheduler,
Expand All @@ -100,19 +100,19 @@ func newQueueProcessorBase(
) *queueProcessorBase {

p := &queueProcessorBase{
clusterName: clusterName,
shard: shard,
timeSource: shard.GetTimeSource(),
options: options,
processor: processor,
rateLimiter: rateLimiter,
status: common.DaemonStatusInitialized,
notifyCh: make(chan struct{}, 1),
shutdownCh: make(chan struct{}),
logger: logger,
clusterName: clusterName,
shard: shard,
timeSource: shard.GetTimeSource(),
options: options,
queueProcessor: queueProcessor,
rateLimiter: rateLimiter,
status: common.DaemonStatusInitialized,
notifyCh: make(chan struct{}, 1),
shutdownCh: make(chan struct{}),
logger: logger,
metricsScope: metricsScope,
ackMgr: queueAckMgr,
lastPollTime: time.Time{},
ackMgr: queueAckMgr,
lastPollTime: time.Time{},
readTaskRetrier: backoff.NewRetrier(
common.CreateReadTaskRetryPolicy(),
backoff.SystemClock,
Expand Down Expand Up @@ -186,7 +186,8 @@ processorPumpLoop:
break processorPumpLoop
case <-p.ackMgr.getFinishedChan():
// use a separate goroutine since the caller holds the shutdownWG
go p.Stop()
// stop the entire queue processor, not just processor base.
go p.queueProcessor.Stop()
case <-p.notifyCh:
p.processBatch()
case <-pollTimer.C:
Expand All @@ -204,7 +205,8 @@ processorPumpLoop:
))
if err := p.ackMgr.updateQueueAckLevel(); shard.IsShardOwnershipLostError(err) {
// shard is no longer owned by this instance, bail out
go p.Stop()
// stop the entire queue processor, not just processor base.
go p.queueProcessor.Stop()
break processorPumpLoop
}
}
Expand Down
10 changes: 6 additions & 4 deletions service/history/timerQueueProcessorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type (
logger log.Logger
metricsClient metrics.Client
metricsScope metrics.Scope
timerProcessor timerProcessor
timerProcessor common.Daemon
timerQueueAckMgr timerQueueAckMgr
timerGate timer.Gate
timeSource clock.TimeSource
Expand All @@ -88,7 +88,7 @@ func newTimerQueueProcessorBase(
scope int,
shard shard.Context,
workflowCache workflow.Cache,
timerProcessor timerProcessor,
timerProcessor common.Daemon,
timerQueueAckMgr timerQueueAckMgr,
timerGate timer.Gate,
scheduler queues.Scheduler,
Expand Down Expand Up @@ -245,7 +245,8 @@ func (t *timerQueueProcessorBase) internalProcessor() error {
// timer queue ack manager indicates that all scanned tasks
// are finished and there are no more tasks
// use a separate goroutine since the caller holds the shutdownWG
go t.Stop()
// stop the entire timer queue processor, not just processor base.
go t.timerProcessor.Stop()
return nil
case <-t.timerGate.FireChan():
nextFireTime, err := t.readAndFanoutTimerTasks()
Expand Down Expand Up @@ -276,7 +277,8 @@ func (t *timerQueueProcessorBase) internalProcessor() error {
))
if err := t.timerQueueAckMgr.updateAckLevel(); shard.IsShardOwnershipLostError(err) {
// shard is closed, shut down timerQProcessor and bail out
go t.Stop()
// stop the entire timer queue processor, not just processor base.
go t.timerProcessor.Stop()
return err
}
case <-t.newTimerCh:
Expand Down

0 comments on commit 4426702

Please sign in to comment.