Skip to content

Commit

Permalink
Fix aggressive poller for non-retriable error (#977)
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu committed Jun 4, 2020
1 parent 44428ce commit 5cc879b
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 3 deletions.
3 changes: 2 additions & 1 deletion internal/internal_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ func isServiceTransientError(err error) bool {
*s.DomainAlreadyExistsError,
*s.QueryFailedError,
*s.DomainNotActiveError,
*s.CancellationAlreadyRequestedError:
*s.CancellationAlreadyRequestedError,
*s.ClientVersionNotSupportedError:
return false
}

Expand Down
2 changes: 1 addition & 1 deletion internal/internal_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func awaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool {

func getKillSignal() <-chan os.Signal {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
return c
}

Expand Down
20 changes: 19 additions & 1 deletion internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"errors"
"fmt"
"sync"
"syscall"
"time"

"github.com/uber-go/tally"
Expand Down Expand Up @@ -261,7 +262,12 @@ func (bw *baseWorker) pollTask() {
if err != nil && enableVerboseLogging {
bw.logger.Debug("Failed to poll for task.", zap.Error(err))
}
if err != nil && isServiceTransientError(err) {
if err != nil {
if isNonRetriableError(err) {
bw.logger.Error("Worker received non-retriable error. Shutting down.", zap.Error(err))
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
return
}
bw.retrier.Failed()
} else {
bw.retrier.Succeeded()
Expand All @@ -278,6 +284,18 @@ func (bw *baseWorker) pollTask() {
}
}

func isNonRetriableError(err error) bool {
if err == nil {
return false
}
switch err.(type) {
case *shared.BadRequestError,
*shared.ClientVersionNotSupportedError:
return true
}
return false
}

func (bw *baseWorker) processTask(task interface{}) {
defer bw.shutdownWG.Done()
// If the task is from poller, after processing it we would need to request a new poll. Otherwise, the task is from
Expand Down
28 changes: 28 additions & 0 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1320,3 +1320,31 @@ func testEncodeFunctionArgs(dataConverter DataConverter, args ...interface{}) []
}
return input
}

func TestIsNonRetriableError(t *testing.T) {
tests := []struct {
err error
expected bool
}{
{
err: nil,
expected: false,
},
{
err: &shared.ServiceBusyError{},
expected: false,
},
{
err: &shared.BadRequestError{},
expected: true,
},
{
err: &shared.ClientVersionNotSupportedError{},
expected: true,
},
}

for _, test := range tests {
require.Equal(t, test.expected, isNonRetriableError(test.err))
}
}

0 comments on commit 5cc879b

Please sign in to comment.