Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,6 @@ public ActivityTask poll() {
.build());
}

if (taskQueueActivitiesPerSecond > 0) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy-paste cleanup, Issue #204

pollRequest.setTaskQueueMetadata(
TaskQueueMetadata.newBuilder()
.setMaxTasksPerSecond(
DoubleValue.newBuilder().setValue(taskQueueActivitiesPerSecond).build())
.build());
}

if (log.isTraceEnabled()) {
log.trace("poll request begin: " + pollRequest);
}
Expand All @@ -95,8 +87,10 @@ public ActivityTask poll() {
try {
pollSemaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}

try {
response =
service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public LocalActivityWorker.Task poll() {
log.trace("LocalActivity Task poll returned: " + task.getActivityId());
}
return task;

} catch (InterruptedException e) {
throw new RuntimeException("local activity poll task interrupted", e);
Thread.currentThread().interrupt();
return null;
}
}

Expand All @@ -67,6 +67,7 @@ public Boolean apply(LocalActivityWorker.Task task, Duration maxWaitAllowed) {
}
return accepted;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here previously we potentially were completely loosing an interrupted flag of a worker thread. [WorkflowWorker -> ReplayWorkflowTaskHandler -> ReplayWorkflowRunTaskHandler -> LocalActivityPollTask#apply]

return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@
public final class Poller<T> implements SuspendableWorker {

public interface PollTask<TT> {
/**
* Pollers should shade all {@code java.lang.InterruptedException}s and raise {@code
* Thread.interrupted()} flag if the exceptions happen. This follows GRPC stubs approach, see
* {@code io.grpc.stub.ClientCalls#blockingUnaryCall}. Because pollers use GRPC subs anyway,
* this implementation was chosen for consistency. The caller of the poll task is responsible
* for handling the flag
*
* @return result of the task
*/
TT poll();
}

Expand Down Expand Up @@ -93,7 +102,7 @@ public Poller(
@Override
public void start() {
if (log.isInfoEnabled()) {
log.info("start(): " + toString());
log.info("start(): " + this);
}
if (pollerOptions.getMaximumPollRatePerSecond() > 0.0) {
pollRateThrottler =
Expand Down Expand Up @@ -213,13 +222,7 @@ private class PollLoopTask implements Runnable {
@Override
public void run() {
try {
if (pollExecutor.isShutdown()) {
Copy link
Contributor Author

@Spikhalskiy Spikhalskiy May 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not needed here. This condition was just checked in Poller in finally block AND by the thread pool itself on the task acceptance.

return;
}
pollBackoffThrottler.throttle();
if (pollExecutor.isTerminating()) {
return;
}
if (pollRateThrottler != null) {
pollRateThrottler.throttle();
}
Expand All @@ -232,27 +235,45 @@ public void run() {
suspender.await();
}

if (pollExecutor.isShutdown()) {
if (shouldTerminate()) {
return;
}

task.run();
pollBackoffThrottler.success();
} catch (InterruptedException e) {
// we restore the flag here, so it can be checked and processed (with exit) in finally
Thread.currentThread().interrupt();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also do pollBackoffThrottler.failure() to avoid leaking?

Copy link
Contributor Author

@Spikhalskiy Spikhalskiy May 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say no. InterruptedException is not really something that should trigger a backoff, it's not really a "failure" of any sort, it's a sign that we should terminate the thread because something is canceled or shutting down.
But I don't think it's very important or critical here, we can add it if you think it looks cleaner this way.

Copy link
Contributor

@vitarb vitarb May 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree, main use case for this is probably shutdown and it doesn't really matter what happens with counters at that point, I just didn't want to have a code path where we are not incrementing the counter back. Not a blocker if you feel strongly about it though.

Copy link
Contributor Author

@Spikhalskiy Spikhalskiy May 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm actually not fully getting what do you mean by "leaking" and "incrementing the counter back". It sounds like you expect throttler to have some counter that decreases before the execution and increases at the end, right? And you feel uneasy for it to stuck in a broken state when we decreased it but didn't increase back.
But it's actually not how the throttler works inside. It has .failure() that counts the number of failures that happened in a row and .success() that flushes the counter of failures. I don't think either of these methods should really be called here. And there is no anything that looks like a leak happening here or I don't see it (.throttle doesn't really modify any counters inside).
So, yeah, let's maybe just leave it as it is.

} catch (Throwable e) {
pollBackoffThrottler.failure();
if (!pollExecutor.isShutdown() && !pollExecutor.isTerminating()
|| !(e.getCause() instanceof InterruptedException)
&& !(e instanceof RejectedExecutionException)) {
// if we are terminating and getting rejected execution - it's normal
if (!pollExecutor.isTerminating() || !(e instanceof RejectedExecutionException)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pollExecutor.isTerminating() is covering both pollExecutor.isShutdown() and pollExecutor.isTerminating().
|| !(e.getCause() instanceof InterruptedException) was here for handling what LocalActtivityPollTask was doing here:

catch (InterruptedException e) {
      throw new RuntimeException("local activity poll task interrupted", e);
}

It's now replaced with a proper interrupted flag handling.

uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
}
} finally {
// Resubmit itself back to pollExecutor
if (!pollExecutor.isTerminating() && !pollExecutor.isShutdown()) {
if (!shouldTerminate()) {
// Resubmit itself back to pollExecutor
pollExecutor.execute(this);
} else {
log.info("poll loop done");
log.info("poll loop is terminated");
}
}
}

/**
* Defines if the task should be terminated.
*
* This method preserves the interrupted flag of the current thread.
*
* @return true if pollExecutor is terminating, or the current thread is interrupted.
*/
private boolean shouldTerminate() {
boolean threadIsInterrupted = Thread.interrupted();
if (threadIsInterrupted) {
Thread.currentThread().interrupt();
}
return pollExecutor.isTerminating() || threadIsInterrupted;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pollExecutor.isTerminating() is covering both pollExecutor.isShutdown() and pollExecutor.isTerminating()

}
}

private class PollExecutionTask implements Poller.ThrowingRunnable {
Expand Down