From 7d4cf467b5910a5d80613de4d7fca365902d9ac6 Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Thu, 28 May 2020 17:26:17 -0700 Subject: [PATCH 1/2] Fix aggressive poller for non-retriable error --- internal/internal_retry.go | 3 ++- internal/internal_worker_base.go | 18 ++++++++++++- internal/internal_worker_test.go | 28 ++++++++++++++++++++ internal/internal_workflow_client_test.go | 2 +- internal/internal_workflow_testsuite_test.go | 2 +- 5 files changed, 49 insertions(+), 4 deletions(-) diff --git a/internal/internal_retry.go b/internal/internal_retry.go index c64fdb815..56918e2c6 100644 --- a/internal/internal_retry.go +++ b/internal/internal_retry.go @@ -70,7 +70,8 @@ func isServiceTransientError(err error) bool { *s.DomainAlreadyExistsError, *s.QueryFailedError, *s.DomainNotActiveError, - *s.CancellationAlreadyRequestedError: + *s.CancellationAlreadyRequestedError, + *s.ClientVersionNotSupportedError: return false } diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index 43bc7ab46..972309a3f 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -260,7 +260,11 @@ func (bw *baseWorker) pollTask() { if err != nil && enableVerboseLogging { bw.logger.Debug("Failed to poll for task.", zap.Error(err)) } - if err != nil && isServiceTransientError(err) { + if err != nil { + if isNonRetriableError(err) { + bw.logger.Error("Worker stop polling task.", zap.Error(err)) + return + } bw.retrier.Failed() } else { bw.retrier.Succeeded() @@ -277,6 +281,18 @@ func (bw *baseWorker) pollTask() { } } +func isNonRetriableError(err error) bool { + if err == nil { + return false + } + switch err.(type) { + case *shared.BadRequestError, + *shared.ClientVersionNotSupportedError: + return true + } + return false +} + func (bw *baseWorker) processTask(task interface{}) { defer bw.shutdownWG.Done() // If the task is from poller, after processing it we would need to request a new poll. Otherwise, the task is from diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 9ab4e117a..4968510ce 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -1308,3 +1308,31 @@ func testEncodeFunctionArgs(dataConverter DataConverter, workflowFunc interface{ } return input } + +func TestIsNonRetriableError(t *testing.T) { + tests := []struct { + err error + expected bool + }{ + { + err: nil, + expected: false, + }, + { + err: &shared.ServiceBusyError{}, + expected: false, + }, + { + err: &shared.BadRequestError{}, + expected: true, + }, + { + err: &shared.ClientVersionNotSupportedError{}, + expected: true, + }, + } + + for _, test := range tests { + require.Equal(t, test.expected, isNonRetriableError(test.err)) + } +} diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 1cba876cf..dc9dcae06 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -934,7 +934,7 @@ func getGetWorkflowExecutionHistoryRequest(filterType shared.HistoryEventFilterT }, WaitForNewEvent: common.BoolPtr(isLongPoll), HistoryEventFilterType: &filterType, - SkipArchival: common.BoolPtr(true), + SkipArchival: common.BoolPtr(true), } return request diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index 271618ddf..9384d50d2 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -2841,7 +2841,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_AwaitWithTimeout() { workflowFn := func(ctx Context) (bool, error) { t := NewTimer(ctx, time.Second) value := false - err := Await(ctx, func() bool { return t.IsReady() || value}) + err := Await(ctx, func() bool { return t.IsReady() || value }) return value, err } From 18a05c92e91cd6aaae721114842e1fe6ca7e7186 Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Wed, 3 Jun 2020 11:00:17 -0700 Subject: [PATCH 2/2] terminate worker for non-retriable error --- internal/internal_utils.go | 2 +- internal/internal_worker_base.go | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/internal_utils.go b/internal/internal_utils.go index b0a430448..9613983b8 100644 --- a/internal/internal_utils.go +++ b/internal/internal_utils.go @@ -275,7 +275,7 @@ func awaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool { func getKillSignal() <-chan os.Signal { c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) return c } diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index 972309a3f..1bd72b55a 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -28,6 +28,7 @@ import ( "errors" "fmt" "sync" + "syscall" "time" "github.com/uber-go/tally" @@ -262,7 +263,8 @@ func (bw *baseWorker) pollTask() { } if err != nil { if isNonRetriableError(err) { - bw.logger.Error("Worker stop polling task.", zap.Error(err)) + bw.logger.Error("Worker received non-retriable error. Shutting down.", zap.Error(err)) + syscall.Kill(syscall.Getpid(), syscall.SIGINT) return } bw.retrier.Failed()