Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public ActivityTask poll() {
ProtobufTimeUtils.toM3Duration(
response.getStartedTime(), response.getCurrentAttemptScheduledTime()));
isSuccessful = true;
return new ActivityTask(response, pollSemaphore::release);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Paired with

if (!isSuccessful) pollSemaphore.release();

in finally, this location of return is cleaner for the reader.

} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE
&& e.getMessage().startsWith("UNAVAILABLE: Channel shutdown")) {
Expand All @@ -119,6 +120,5 @@ public ActivityTask poll() {
} finally {
if (!isSuccessful) pollSemaphore.release();
}
return new ActivityTask(response, pollSemaphore::release);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,18 @@ public void awaitTermination(long timeout, TimeUnit unit) {

@Override
public void suspendPolling() {
log.info("suspendPolling: {}", this);
suspendLatch.set(new CountDownLatch(1));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absense of CAS here may replace and lose the latch which is already used by Pollers to block on. This way they will never be unblocked if suspentPolling was called twice.

if (suspendLatch.compareAndSet(null, new CountDownLatch(1))) {
log.info("Suspend Polling: {}", this);
} else {
log.info("Polling is already suspended: {}", this);
}
}

@Override
public void resumePolling() {
log.info("resumePolling {}", this);
CountDownLatch existing = suspendLatch.getAndSet(null);
if (existing != null) {
log.info("Resume Polling {}", this);
existing.countDown();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,35 +126,33 @@ public Builder setDefaultHeartbeatThrottleInterval(Duration defaultHeartbeatThro
}

public SingleWorkerOptions build() {
PollerOptions pollerOptions = this.pollerOptions;
if (pollerOptions == null) {
pollerOptions =
PollerOptions.newBuilder()
.setPollBackoffInitialInterval(Duration.ofMillis(200))
Copy link
Contributor Author

@Spikhalskiy Spikhalskiy Mar 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code path is dead. We come here only in some tests where we do

        SingleWorkerOptions.newBuilder().build()

and these options and not important for these tests. As a result, these magic values 200 and 20 here are never used and better be removed from the production code paths to don't mislead the reader.
After this cleanup, it's actually clear that we never call PollerOptions#setPollBackoffMaximumInterval anywhere.

.setPollBackoffMaximumInterval(Duration.ofSeconds(20))
.setPollThreadCount(1)
.build();
pollerOptions = PollerOptions.newBuilder().build();
}

DataConverter dataConverter = this.dataConverter;
if (dataConverter == null) {
dataConverter = DataConverter.getDefaultInstance();
}

Scope metricsScope = this.metricsScope;
if (metricsScope == null) {
metricsScope = new NoopScope();
}

return new SingleWorkerOptions(
identity,
binaryChecksum,
this.identity,
this.binaryChecksum,
dataConverter,
taskExecutorThreadPoolSize,
this.taskExecutorThreadPoolSize,
pollerOptions,
metricsScope,
enableLoggingInReplay,
contextPropagators,
defaultDeadlockDetectionTimeout,
maxHeartbeatThrottleInterval,
defaultHeartbeatThrottleInterval);
this.enableLoggingInReplay,
this.contextPropagators,
this.defaultDeadlockDetectionTimeout,
this.maxHeartbeatThrottleInterval,
this.defaultHeartbeatThrottleInterval);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private static SingleWorkerOptions toLocalActivityOptions(
List<ContextPropagator> contextPropagators,
Scope metricsScope) {
return toSingleWorkerOptions(factoryOptions, options, clientOptions, contextPropagators)
.setPollerOptions(PollerOptions.newBuilder().build())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ActivityWorker and WorkflowWorker PollerOptions specify this parameter explicitly.
To simplify the untangling for the reader, let's specify it for LocalActivityWorker explicitly too. 1 is the default value for pollThreadCount used inside PollerOptions.

.setPollerOptions(PollerOptions.newBuilder().setPollThreadCount(1).build())
.setTaskExecutorThreadPoolSize(options.getMaxConcurrentLocalActivityExecutionSize())
.setMetricsScope(metricsScope)
.build();
Expand Down