From 5cc879bc3a9b2258ca569b5cb971231d9de0ff9b Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Thu, 4 Jun 2020 14:38:35 -0700 Subject: [PATCH] Fix aggressive poller for non-retriable error (#977) --- internal/internal_retry.go | 3 ++- internal/internal_utils.go | 2 +- internal/internal_worker_base.go | 20 +++++++++++++++++++- internal/internal_worker_test.go | 28 ++++++++++++++++++++++++++++ 4 files changed, 50 insertions(+), 3 deletions(-) diff --git a/internal/internal_retry.go b/internal/internal_retry.go index c64fdb815..56918e2c6 100644 --- a/internal/internal_retry.go +++ b/internal/internal_retry.go @@ -70,7 +70,8 @@ func isServiceTransientError(err error) bool { *s.DomainAlreadyExistsError, *s.QueryFailedError, *s.DomainNotActiveError, - *s.CancellationAlreadyRequestedError: + *s.CancellationAlreadyRequestedError, + *s.ClientVersionNotSupportedError: return false } diff --git a/internal/internal_utils.go b/internal/internal_utils.go index b0a430448..9613983b8 100644 --- a/internal/internal_utils.go +++ b/internal/internal_utils.go @@ -275,7 +275,7 @@ func awaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool { func getKillSignal() <-chan os.Signal { c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) return c } diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index 13b727dd8..bf0b86f7c 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -28,6 +28,7 @@ import ( "errors" "fmt" "sync" + "syscall" "time" "github.com/uber-go/tally" @@ -261,7 +262,12 @@ func (bw *baseWorker) pollTask() { if err != nil && enableVerboseLogging { bw.logger.Debug("Failed to poll for task.", zap.Error(err)) } - if err != nil && isServiceTransientError(err) { + if err != nil { + if isNonRetriableError(err) { + bw.logger.Error("Worker received non-retriable error. Shutting down.", zap.Error(err)) + syscall.Kill(syscall.Getpid(), syscall.SIGINT) + return + } bw.retrier.Failed() } else { bw.retrier.Succeeded() @@ -278,6 +284,18 @@ func (bw *baseWorker) pollTask() { } } +func isNonRetriableError(err error) bool { + if err == nil { + return false + } + switch err.(type) { + case *shared.BadRequestError, + *shared.ClientVersionNotSupportedError: + return true + } + return false +} + func (bw *baseWorker) processTask(task interface{}) { defer bw.shutdownWG.Done() // If the task is from poller, after processing it we would need to request a new poll. Otherwise, the task is from diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index f82c5a52b..e068ed081 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -1320,3 +1320,31 @@ func testEncodeFunctionArgs(dataConverter DataConverter, args ...interface{}) [] } return input } + +func TestIsNonRetriableError(t *testing.T) { + tests := []struct { + err error + expected bool + }{ + { + err: nil, + expected: false, + }, + { + err: &shared.ServiceBusyError{}, + expected: false, + }, + { + err: &shared.BadRequestError{}, + expected: true, + }, + { + err: &shared.ClientVersionNotSupportedError{}, + expected: true, + }, + } + + for _, test := range tests { + require.Equal(t, test.expected, isNonRetriableError(test.err)) + } +}