-
Notifications
You must be signed in to change notification settings - Fork 177
Cleanup the way Pollers handle InterruptedExceptions #478
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cleanup the way Pollers handle InterruptedExceptions #478
Conversation
.build()); | ||
} | ||
|
||
if (taskQueueActivitiesPerSecond > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
copy-paste cleanup, Issue #204
e08d4fc
to
3330fa8
Compare
} | ||
return accepted; | ||
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here previously we potentially were completely loosing an interrupted flag of a worker thread. [WorkflowWorker -> ReplayWorkflowTaskHandler -> ReplayWorkflowRunTaskHandler -> LocalActivityPollTask#apply]
3330fa8
to
cc9dc09
Compare
|| !(e.getCause() instanceof InterruptedException) | ||
&& !(e instanceof RejectedExecutionException)) { | ||
// if we are terminating and getting rejected execution - it's normal | ||
if (!pollExecutor.isTerminating() || !(e instanceof RejectedExecutionException)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pollExecutor.isTerminating()
is covering both pollExecutor.isShutdown()
and pollExecutor.isTerminating()
.
|| !(e.getCause() instanceof InterruptedException)
was here for handling what LocalActtivityPollTask
was doing here:
catch (InterruptedException e) {
throw new RuntimeException("local activity poll task interrupted", e);
}
It's now replaced with a proper interrupted flag handling.
// flush the flag | ||
Thread.currentThread().interrupt(); | ||
} | ||
return pollExecutor.isTerminating() || threadIsInterrupted; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pollExecutor.isTerminating()
is covering both pollExecutor.isShutdown()
and pollExecutor.isTerminating()
cc9dc09
to
f4d2664
Compare
@Override | ||
public void run() { | ||
try { | ||
if (pollExecutor.isShutdown()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not needed here. This condition was just checked in Poller in finally
block AND by the thread pool itself on the task acceptance.
8dee28d
to
451747e
Compare
451747e
to
73dd89e
Compare
pollBackoffThrottler.success(); | ||
} catch (InterruptedException e) { | ||
// we restore the flag here, so it can be checked and processed (with exit) in finally | ||
Thread.currentThread().interrupt(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also do pollBackoffThrottler.failure()
to avoid leaking?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would say no. InterruptedException is not really something that should trigger a backoff, it's not really a "failure" of any sort, it's a sign that we should terminate the thread because something is canceled or shutting down.
But I don't think it's very important or critical here, we can add it if you think it looks cleaner this way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I agree, main use case for this is probably shutdown and it doesn't really matter what happens with counters at that point, I just didn't want to have a code path where we are not incrementing the counter back. Not a blocker if you feel strongly about it though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm actually not fully getting what do you mean by "leaking" and "incrementing the counter back". It sounds like you expect throttler
to have some counter that decreases before the execution and increases at the end, right? And you feel uneasy for it to stuck in a broken state when we decreased it but didn't increase back.
But it's actually not how the throttler works inside. It has .failure()
that counts the number of failures that happened in a row and .success()
that flushes the counter of failures. I don't think either of these methods should really be called here. And there is no anything that looks like a leak happening here or I don't see it (.throttle
doesn't really modify any counters inside).
So, yeah, let's maybe just leave it as it is.
@vitarb Thank you for the review! |
What was changed:
Code in Worker's
Poller
andPollerTask
s was cleaned up to one unified way of handlingInterruptedException
.Before
LocalActivityPollTask
andActivityPollTask
had a full mix of correct and incorrect handling ofInterruptedException
:See [the article](https://dzone.com/articles/how-to-handle-the-interruptedexception for more context.) for more context about appropriate ways of handling
InterruptedException
.Now
PollerTask
s are refactored to one approach (same as GRPC) - raising interrupted flag. AndPoller
was refactored to the more explicit way of handling interruptions.Closes
This PR also includes a small cleanup of ActivityPollTask and closes Issue #204