diff --git a/service/history/timerQueueProcessor.go b/service/history/timerQueueProcessor.go index 482221b7df0..3c573d8238a 100644 --- a/service/history/timerQueueProcessor.go +++ b/service/history/timerQueueProcessor.go @@ -303,7 +303,8 @@ func (t *timerQueueProcessorImpl) completeTimersLoop() { } return case <-timer.C: - if err := backoff.ThrottleRetry(func() error { + // TODO: We should have a better approach to handle shard and its component lifecycle + _ = backoff.ThrottleRetry(func() error { err := t.completeTimers() if err != nil { t.logger.Info("Failed to complete timer task", tag.Error(err)) @@ -316,11 +317,7 @@ func (t *timerQueueProcessorImpl) completeTimersLoop() { default: } return !shard.IsShardOwnershipLostError(err) - }); shard.IsShardOwnershipLostError(err) { - // shard is unloaded, timer processor should quit as well - go t.Stop() - return - } + }) timer.Reset(t.config.TimerProcessorCompleteTimerInterval()) } diff --git a/service/history/transferQueueProcessor.go b/service/history/transferQueueProcessor.go index 10670011de6..754b8337be7 100644 --- a/service/history/transferQueueProcessor.go +++ b/service/history/transferQueueProcessor.go @@ -297,7 +297,8 @@ func (t *transferQueueProcessorImpl) completeTransferLoop() { } return case <-timer.C: - if err := backoff.ThrottleRetry(func() error { + // TODO: We should have a better approach to handle shard and its component lifecycle + _ = backoff.ThrottleRetry(func() error { err := t.completeTransfer() if err != nil { t.logger.Info("Failed to complete transfer task", tag.Error(err)) @@ -310,11 +311,7 @@ func (t *transferQueueProcessorImpl) completeTransferLoop() { default: } return !shard.IsShardOwnershipLostError(err) - }); shard.IsShardOwnershipLostError(err) { - // shard is unloaded, transfer processor should quit as well - t.Stop() - return - } + }) timer.Reset(t.config.TransferProcessorCompleteTransferInterval()) } diff --git a/service/history/visibilityQueueProcessor.go b/service/history/visibilityQueueProcessor.go index e1b39674de8..b9a5fcc82a1 100644 --- a/service/history/visibilityQueueProcessor.go +++ b/service/history/visibilityQueueProcessor.go @@ -269,7 +269,8 @@ func (t *visibilityQueueProcessorImpl) completeTaskLoop() { } return case <-timer.C: - if err := backoff.ThrottleRetry(func() error { + // TODO: We should have a better approach to handle shard and its component lifecycle + _ = backoff.ThrottleRetry(func() error { err := t.completeTask() if err != nil { t.logger.Info("Failed to complete transfer task", tag.Error(err)) @@ -282,11 +283,7 @@ func (t *visibilityQueueProcessorImpl) completeTaskLoop() { default: } return !shard.IsShardOwnershipLostError(err) - }); shard.IsShardOwnershipLostError(err) { - // shard closed, trigger shutdown and bail out - t.Stop() - return - } + }) timer.Reset(t.config.VisibilityProcessorCompleteTaskInterval()) }