Fix aggressive poller for non-retriable error #977
Changes from 4 commits
7d4cf46
258d644
18a05c9
c58ee6c
9892de4
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -275,7 +275,7 @@ func awaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool { | |
|
||
func getKillSignal() <-chan os.Signal { | ||
c := make(chan os.Signal, 1) | ||
signal.Notify(c, os.Interrupt, syscall.SIGTERM) | ||
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) | ||
I may lack some context here. But I think it's better to use
They are the same thing, so just make it unified, in os.
|
||
return c | ||
} | ||
|
||
|
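
The claim in the thread that the two signal names are the same thing can be checked directly. This is a minimal sketch, not part of the PR; `sameInterrupt` is a hypothetical helper name, and the comparison relies on `os.Interrupt` being declared in the standard library as `syscall.SIGINT`.

```go
package main

import (
	"fmt"
	"os"
	"syscall"
)

// sameInterrupt reports whether os.Interrupt and syscall.SIGINT compare equal.
// os.Interrupt is an os.Signal interface value; syscall.SIGINT is a
// syscall.Signal constant, which implements os.Signal, so == is well defined.
func sameInterrupt() bool {
	return os.Interrupt == syscall.SIGINT
}

func main() {
	fmt.Println("os.Interrupt == syscall.SIGINT:", sameInterrupt())
}
```

If the two compare equal, passing either to `signal.Notify` registers the same signal, so the choice between them is about consistency rather than behavior.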
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,7 @@ import ( | |
"errors" | ||
"fmt" | ||
"sync" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/uber-go/tally" | ||
|
@@ -261,7 +262,12 @@ func (bw *baseWorker) pollTask() { | |
if err != nil && enableVerboseLogging { | ||
bw.logger.Debug("Failed to poll for task.", zap.Error(err)) | ||
} | ||
if err != nil && isServiceTransientError(err) { | ||
if err != nil { | ||
if isNonRetriableError(err) { | ||
bw.logger.Error("Worker received non-retriable error. Shutting down.", zap.Error(err)) | ||
syscall.Kill(syscall.Getpid(), syscall.SIGINT) | ||
return | ||
yycptt marked this conversation as resolved.
|
||
} | ||
bw.retrier.Failed() | ||
} else { | ||
bw.retrier.Succeeded() | ||
|
@@ -278,6 +284,18 @@ func (bw *baseWorker) pollTask() { | |
} | ||
} | ||
|
||
func isNonRetriableError(err error) bool { | ||
if err == nil { | ||
return false | ||
} | ||
switch err.(type) { | ||
case *shared.BadRequestError, | ||
*shared.ClientVersionNotSupportedError: | ||
ClientVersionNotSupportedError will not happen in prod. What is the BadRequestError that you want to protect against?
This is the one that helped me find the bug. When a bad binary is set for a domain, all workers with that bad binary should stop working, but without this fix they end up in a dead loop polling the same error. I checked and believe all other BadRequest that poller can get from
Is this a bug in the auto reset? The auto reset happens when a workflow (marked with a bad binary checksum) tries to make progress. But the BadRequestError is returned from the frontend, so the workflow will never make progress. If we do this, how can it be auto reset?
Not a bug in auto reset. As discussed, when a worker with a different binary (either older or newer) is rolled out, the workflow will be reset.
See if you can add a unit test case to test this logic (i.e. mock returning a bad request and verify that the worker is shut down).
||
return true | ||
} | ||
return false | ||
} | ||
|
||
func (bw *baseWorker) processTask(task interface{}) { | ||
defer bw.shutdownWG.Done() | ||
// If the task is from poller, after processing it we would need to request a new poll. Otherwise, the task is from | ||
|
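
The shutdown path in the hunk above works by having the process signal itself, so that the channel returned by `getKillSignal` fires. The following is a rough sketch of that mechanism in isolation, assuming a Unix-like platform (`syscall.Kill` is not available on Windows). `receivedOwnSignal` and its fixed two-second timeout are hypothetical and exist only for illustration.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// receivedOwnSignal registers a channel for SIGINT and SIGTERM, sends SIGINT
// to the current process, and reports whether the signal was delivered to the
// channel before a fixed timeout (an arbitrary value chosen for this sketch).
func receivedOwnSignal() (bool, error) {
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(c)

	// Same call as in the diff: signal our own PID.
	if err := syscall.Kill(syscall.Getpid(), syscall.SIGINT); err != nil {
		return false, err
	}

	select {
	case <-c:
		return true, nil
	case <-time.After(2 * time.Second):
		return false, nil
	}
}

func main() {
	ok, err := receivedOwnSignal()
	fmt.Println("signal received:", ok, "error:", err)
}
```

Because the channel is registered before the signal is sent, the SIGINT is routed to the channel instead of terminating the process, which is what lets the worker run its normal shutdown logic.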
Why is this transient?
it is not transient, so return false
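
One possible shape for the unit test requested in the review is to separate the decision (continue, back off, or shut down) from the side effect of signalling the process. The sketch below is not the PR's code: `badRequestError` and `serviceBusyError` are hypothetical stand-ins for the client's generated error types, and `decide` with its `action` values is an invented helper that mirrors the branch added to `pollTask`.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the client's error types.
type badRequestError struct{ msg string }

func (e *badRequestError) Error() string { return e.msg }

type serviceBusyError struct{ msg string }

func (e *serviceBusyError) Error() string { return e.msg }

// isNonRetriable mirrors the type switch in the diff, using the stand-in type.
func isNonRetriable(err error) bool {
	if err == nil {
		return false
	}
	switch err.(type) {
	case *badRequestError:
		return true
	}
	return false
}

// action is what the poller should do after one poll attempt.
type action int

const (
	actionContinue action = iota // poll succeeded, reset the retrier
	actionBackoff                // retriable failure, back off and poll again
	actionShutdown               // non-retriable failure, stop the worker
)

// decide maps a poll error to an action without performing any side effect,
// so a test can assert on the result instead of catching a signal.
func decide(err error) action {
	switch {
	case err == nil:
		return actionContinue
	case isNonRetriable(err):
		return actionShutdown
	default:
		return actionBackoff
	}
}

func main() {
	fmt.Println(decide(nil) == actionContinue)
	fmt.Println(decide(&badRequestError{msg: "bad binary"}) == actionShutdown)
	fmt.Println(decide(errors.New("connection reset")) == actionBackoff)
}
```

With this split, a test can feed a mocked bad request into `decide` and check for `actionShutdown`, while the actual `syscall.Kill` call stays in one small, separately exercised place.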