From eef64c19b18cf25f3a13152889e051b5b457539a Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Sat, 9 May 2020 18:15:07 -0700 Subject: [PATCH 1/4] Added poll thread count options --- .../internal/metrics/ServiceMethod.java | 99 ------------------- .../internal/sync/SyncActivityWorker.java | 5 +- .../internal/worker/ActivityPollTask.java | 12 ++- .../internal/worker/ActivityWorker.java | 6 +- .../testing/TestEnvironmentOptions.java | 8 +- src/main/java/io/temporal/worker/Worker.java | 22 ++++- .../io/temporal/worker/WorkerFactory.java | 7 +- .../temporal/worker/WorkerFactoryOptions.java | 34 ++++++- .../io/temporal/worker/WorkerOptions.java | 82 ++++++++++++++- .../worker/WorkerPollerThreadCountTest.java | 98 ++++++++++++++++++ .../io/temporal/worker/WorkerStressTests.java | 9 +- 11 files changed, 257 insertions(+), 125 deletions(-) delete mode 100644 src/main/java/io/temporal/internal/metrics/ServiceMethod.java create mode 100644 src/test/java/io/temporal/worker/WorkerPollerThreadCountTest.java diff --git a/src/main/java/io/temporal/internal/metrics/ServiceMethod.java b/src/main/java/io/temporal/internal/metrics/ServiceMethod.java deleted file mode 100644 index 2991398415..0000000000 --- a/src/main/java/io/temporal/internal/metrics/ServiceMethod.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. - * - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Modifications copyright (C) 2017 Uber Technologies, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"). You may not - * use this file except in compliance with the License. A copy of the License is - * located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package io.temporal.internal.metrics; - -/** TODO(maxim): Add gRPC interceptor for metrics */ -public class ServiceMethod { - public static final String DEPRECATE_NAMESPACE = - MetricsType.TEMPORAL_METRICS_PREFIX + "DeprecateNamespace"; - public static final String DESCRIBE_NAMESPACE = - MetricsType.TEMPORAL_METRICS_PREFIX + "DescribeNamespace"; - public static final String LIST_NAMESPACES = - MetricsType.TEMPORAL_METRICS_PREFIX + "ListNamespaces"; - public static final String GET_WORKFLOW_EXECUTION_HISTORY = - MetricsType.TEMPORAL_METRICS_PREFIX + "GetWorkflowExecutionHistory"; - public static final String LIST_CLOSED_WORKFLOW_EXECUTIONS = - MetricsType.TEMPORAL_METRICS_PREFIX + "ListClosedWorkflowExecutions"; - public static final String LIST_OPEN_WORKFLOW_EXECUTIONS = - MetricsType.TEMPORAL_METRICS_PREFIX + "ListOpenWorkflowExecutions"; - public static final String LIST_WORKFLOW_EXECUTIONS = - MetricsType.TEMPORAL_METRICS_PREFIX + "ListWorkflowExecutions"; - public static final String LIST_ARCHIVED_WORKFLOW_EXECUTIONS = - MetricsType.TEMPORAL_METRICS_PREFIX + "ListArchivedWorkflowExecutions"; - public static final String LIST_TASK_LIST_PARTITIONS = - MetricsType.TEMPORAL_METRICS_PREFIX + "ListTaskListPartitions"; - public static final String SCAN_WORKFLOW_EXECUTIONS = - MetricsType.TEMPORAL_METRICS_PREFIX + "ScanWorkflowExecutions"; - public static final String COUNT_WORKFLOW_EXECUTIONS = - MetricsType.TEMPORAL_METRICS_PREFIX + "CountWorkflowExecutions"; - public static final String GET_SEARCH_ATTRIBUTES = - MetricsType.TEMPORAL_METRICS_PREFIX + "GetSearchAttributes"; - public static final String POLL_FOR_ACTIVITY_TASK = - MetricsType.TEMPORAL_METRICS_PREFIX + "PollForActivityTask"; - public static final String POLL_FOR_DECISION_TASK = - MetricsType.TEMPORAL_METRICS_PREFIX + "PollForDecisionTask"; - public static final String RECORD_ACTIVITY_TASK_HEARTBEAT = - MetricsType.TEMPORAL_METRICS_PREFIX + "RecordActivityTaskHeartbeat"; - public static final String RECORD_ACTIVITY_TASK_HEARTBEAT_BY_ID = - MetricsType.TEMPORAL_METRICS_PREFIX + "RecordActivityTaskHeartbeatById"; - public static final String REGISTER_NAMESPACE = - MetricsType.TEMPORAL_METRICS_PREFIX + "RegisterNamespace"; - public static final String REQUEST_CANCEL_WORKFLOW_EXECUTION = - MetricsType.TEMPORAL_METRICS_PREFIX + "RequestCancelWorkflowExecution"; - public static final String RESPOND_ACTIVITY_TASK_CANCELED = - MetricsType.TEMPORAL_METRICS_PREFIX + "RespondActivityTaskCanceled"; - public static final String RESPOND_ACTIVITY_TASK_COMPLETED = - MetricsType.TEMPORAL_METRICS_PREFIX + "RespondActivityTaskCompleted"; - public static final String RESPOND_ACTIVITY_TASK_FAILED = - MetricsType.TEMPORAL_METRICS_PREFIX + "RespondActivityTaskFailed"; - public static final String RESPOND_ACTIVITY_TASK_CANCELED_BY_ID = - MetricsType.TEMPORAL_METRICS_PREFIX + "RespondActivityTaskCanceledById"; - public static final String RESPOND_ACTIVITY_TASK_COMPLETED_BY_ID = - MetricsType.TEMPORAL_METRICS_PREFIX + "RespondActivityTaskCompletedById"; - public static final String RESPOND_ACTIVITY_TASK_FAILED_BY_ID = - MetricsType.TEMPORAL_METRICS_PREFIX + "RespondActivityTaskFailedById"; - public static final String RESPOND_DECISION_TASK_COMPLETED = - MetricsType.TEMPORAL_METRICS_PREFIX + "RespondDecisionTaskCompleted"; - public static final String RESPOND_DECISION_TASK_FAILED = - MetricsType.TEMPORAL_METRICS_PREFIX + "RespondDecisionTaskFailed"; - public static final String SIGNAL_WORKFLOW_EXECUTION = - MetricsType.TEMPORAL_METRICS_PREFIX + "SignalWorkflowExecution"; - public static final String SIGNAL_WITH_START_WORKFLOW_EXECUTION = - MetricsType.TEMPORAL_METRICS_PREFIX + "SignalWithStartWorkflowExecution"; - public static final String START_WORKFLOW_EXECUTION = - MetricsType.TEMPORAL_METRICS_PREFIX + "StartWorkflowExecution"; - public static final String TERMINATE_WORKFLOW_EXECUTION = - MetricsType.TEMPORAL_METRICS_PREFIX + "TerminateWorkflowExecution"; - public static final String UPDATE_NAMESPACE = - MetricsType.TEMPORAL_METRICS_PREFIX + "UpdateNamespace"; - public static final String QUERY_WORKFLOW = MetricsType.TEMPORAL_METRICS_PREFIX + "QueryWorkflow"; - public static final String DESCRIBE_TASK_LIST = - MetricsType.TEMPORAL_METRICS_PREFIX + "DescribeTaskList"; - public static final String GET_CLUSTER_INFO = - MetricsType.TEMPORAL_METRICS_PREFIX + "GetClusterInfo"; - public static final String RESPOND_QUERY_TASK_COMPLETED = - MetricsType.TEMPORAL_METRICS_PREFIX + "RespondQueryTaskCompleted"; - public static final String DESCRIBE_WORKFLOW_EXECUTION = - MetricsType.TEMPORAL_METRICS_PREFIX + "DescribeWorkflowExecution"; - public static final String RESET_STICKY_TASK_LIST = - MetricsType.TEMPORAL_METRICS_PREFIX + "ResetStickyTaskList"; - public static final String RESET_WORKFLOW_EXECUTION = - MetricsType.TEMPORAL_METRICS_PREFIX + "ResetWorkflowExecution"; -} diff --git a/src/main/java/io/temporal/internal/sync/SyncActivityWorker.java b/src/main/java/io/temporal/internal/sync/SyncActivityWorker.java index 11e80da83a..069ebd6689 100644 --- a/src/main/java/io/temporal/internal/sync/SyncActivityWorker.java +++ b/src/main/java/io/temporal/internal/sync/SyncActivityWorker.java @@ -39,11 +39,14 @@ public SyncActivityWorker( WorkflowServiceStubs service, String namespace, String taskList, + double taskListActivitiesPerSecond, SingleWorkerOptions options) { taskHandler = new POJOActivityTaskHandler( service, namespace, options.getDataConverter(), heartbeatExecutor); - worker = new ActivityWorker(service, namespace, taskList, options, taskHandler); + worker = + new ActivityWorker( + service, namespace, taskList, taskListActivitiesPerSecond, options, taskHandler); } public void setActivitiesImplementation(Object... activitiesImplementation) { diff --git a/src/main/java/io/temporal/internal/worker/ActivityPollTask.java b/src/main/java/io/temporal/internal/worker/ActivityPollTask.java index d5f526176f..b265297f11 100644 --- a/src/main/java/io/temporal/internal/worker/ActivityPollTask.java +++ b/src/main/java/io/temporal/internal/worker/ActivityPollTask.java @@ -40,17 +40,20 @@ final class ActivityPollTask implements Poller.PollTask 0) { + pollRequest.setTaskListMetadata( + TaskListMetadata.newBuilder() + .setMaxTasksPerSecond( + DoubleValue.newBuilder().setValue(taskListActivitiesPerSecond).build()) + .build()); + } if (options.getTaskListActivitiesPerSecond() > 0) { pollRequest.setTaskListMetadata( diff --git a/src/main/java/io/temporal/internal/worker/ActivityWorker.java b/src/main/java/io/temporal/internal/worker/ActivityWorker.java index 8d0545b51b..232e496bd8 100644 --- a/src/main/java/io/temporal/internal/worker/ActivityWorker.java +++ b/src/main/java/io/temporal/internal/worker/ActivityWorker.java @@ -56,16 +56,19 @@ public final class ActivityWorker implements SuspendableWorker { private final String namespace; private final String taskList; private final SingleWorkerOptions options; + private final double taskListActivitiesPerSecond; public ActivityWorker( WorkflowServiceStubs service, String namespace, String taskList, + double taskListActivitiesPerSecond, SingleWorkerOptions options, ActivityTaskHandler handler) { this.service = Objects.requireNonNull(service); this.namespace = Objects.requireNonNull(namespace); this.taskList = Objects.requireNonNull(taskList); + this.taskListActivitiesPerSecond = taskListActivitiesPerSecond; this.handler = handler; PollerOptions pollerOptions = options.getPollerOptions(); @@ -85,7 +88,8 @@ public void start() { poller = new Poller<>( options.getIdentity(), - new ActivityPollTask(service, namespace, taskList, options), + new ActivityPollTask( + service, namespace, taskList, options, taskListActivitiesPerSecond), new PollTaskExecutor<>(namespace, taskList, options, new TaskHandlerImpl(handler)), options.getPollerOptions(), options.getMetricsScope()); diff --git a/src/main/java/io/temporal/testing/TestEnvironmentOptions.java b/src/main/java/io/temporal/testing/TestEnvironmentOptions.java index 68994ea458..152c18d960 100644 --- a/src/main/java/io/temporal/testing/TestEnvironmentOptions.java +++ b/src/main/java/io/temporal/testing/TestEnvironmentOptions.java @@ -83,12 +83,8 @@ public TestEnvironmentOptions build() { public TestEnvironmentOptions validateAndBuildWithDefaults() { return new TestEnvironmentOptions( - workflowClientOptions == null - ? WorkflowClientOptions.newBuilder().validateAndBuildWithDefaults() - : workflowClientOptions, - workerFactoryOptions == null - ? WorkerFactoryOptions.newBuilder().validateAndBuildWithDefaults() - : workerFactoryOptions, + WorkflowClientOptions.newBuilder(workflowClientOptions).validateAndBuildWithDefaults(), + WorkerFactoryOptions.newBuilder(workerFactoryOptions).validateAndBuildWithDefaults(), metricsScope == null ? new NoopScope() : metricsScope); } } diff --git a/src/main/java/io/temporal/worker/Worker.java b/src/main/java/io/temporal/worker/Worker.java index 2b5905dd69..093353dd05 100644 --- a/src/main/java/io/temporal/worker/Worker.java +++ b/src/main/java/io/temporal/worker/Worker.java @@ -92,7 +92,8 @@ public final class Worker implements Suspendable { this.taskList = taskList; this.options = WorkerOptions.newBuilder(options).validateAndBuildWithDefaults(); - this.factoryOptions = factoryOptions; + this.factoryOptions = + WorkerFactoryOptions.newBuilder(factoryOptions).validateAndBuildWithDefaults(); WorkflowServiceStubs service = client.getWorkflowServiceStubs(); WorkflowClientOptions clientOptions = client.getOptions(); String namespace = clientOptions.getNamespace(); @@ -105,7 +106,13 @@ public final class Worker implements Suspendable { taskList, contextPropagators, metricsScope); - activityWorker = new SyncActivityWorker(service, namespace, taskList, activityOptions); + activityWorker = + new SyncActivityWorker( + service, + namespace, + taskList, + this.options.getTaskListActivitiesPerSecond(), + activityOptions); SingleWorkerOptions workflowOptions = toWorkflowOptions( @@ -128,12 +135,13 @@ public final class Worker implements Suspendable { service, namespace, taskList, - factoryOptions.getWorkflowInterceptor(), + this.factoryOptions.getWorkflowInterceptor(), workflowOptions, localActivityOptions, this.cache, this.stickyTaskListName, - Duration.ofSeconds(factoryOptions.getStickyDecisionScheduleToStartTimeoutInSeconds()), + Duration.ofSeconds( + this.factoryOptions.getStickyDecisionScheduleToStartTimeoutInSeconds()), this.threadPoolExecutor); } @@ -155,6 +163,7 @@ private static SingleWorkerOptions toActivityOptions( .setPollerOptions( PollerOptions.newBuilder() .setMaximumPollRatePerSecond(options.getMaxActivitiesPerSecond()) + .setPollThreadCount(options.getActivityPollThreadCount()) .build()) .setTaskExecutorThreadPoolSize(options.getMaxConcurrentActivityExecutionSize()) .setMetricsScope(metricsScope.tagged(tags)) @@ -178,7 +187,10 @@ private static SingleWorkerOptions toWorkflowOptions( return SingleWorkerOptions.newBuilder() .setDataConverter(clientOptions.getDataConverter()) .setIdentity(clientOptions.getIdentity()) - .setPollerOptions(PollerOptions.newBuilder().build()) + .setPollerOptions( + PollerOptions.newBuilder() + .setPollThreadCount(options.getWorkflowPollThreadCount()) + .build()) .setTaskExecutorThreadPoolSize(options.getMaxConcurrentWorkflowTaskExecutionSize()) .setMetricsScope(metricsScope.tagged(tags)) .setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay()) diff --git a/src/main/java/io/temporal/worker/WorkerFactory.java b/src/main/java/io/temporal/worker/WorkerFactory.java index 0403f87c3c..6c907678fa 100644 --- a/src/main/java/io/temporal/worker/WorkerFactory.java +++ b/src/main/java/io/temporal/worker/WorkerFactory.java @@ -57,6 +57,8 @@ public static WorkerFactory newInstance( return new WorkerFactory(workflowClient, options); } + private static final String POLL_THREAD_NAME = "Host Local Workflow Poller"; + private final List workers = new ArrayList<>(); private final WorkflowClient workflowClient; private final UUID id = @@ -122,7 +124,10 @@ private WorkerFactory(WorkflowClient workflowClient, WorkerFactoryOptions factor id.toString()) .get(), dispatcher, - PollerOptions.newBuilder().build(), + PollerOptions.newBuilder() + .setPollThreadNamePrefix(POLL_THREAD_NAME) + .setPollThreadCount(this.factoryOptions.getWorkflowHostLocalPollThreadCount()) + .build(), metricsScope); } diff --git a/src/main/java/io/temporal/worker/WorkerFactoryOptions.java b/src/main/java/io/temporal/worker/WorkerFactoryOptions.java index 3c984dfcca..31a4335595 100644 --- a/src/main/java/io/temporal/worker/WorkerFactoryOptions.java +++ b/src/main/java/io/temporal/worker/WorkerFactoryOptions.java @@ -19,6 +19,7 @@ package io.temporal.worker; +import com.google.common.base.Preconditions; import io.temporal.common.interceptors.NoopWorkflowInterceptor; import io.temporal.common.interceptors.WorkflowInterceptor; @@ -36,6 +37,8 @@ public static WorkerFactoryOptions getDefaultInstance() { return DEFAULT_INSTANCE; } + private static final int DEFAULT_HOST_LOCAL_WORKFLOW_POLL_THREAD_COUNT = 5; + private static final WorkerFactoryOptions DEFAULT_INSTANCE; static { @@ -48,6 +51,7 @@ public static class Builder { private int maxWorkflowThreadCount; private WorkflowInterceptor workflowInterceptor; private boolean enableLoggingInReplay; + private int workflowHostLocalPollThreadCount; private Builder() {} @@ -61,6 +65,7 @@ private Builder(WorkerFactoryOptions options) { this.maxWorkflowThreadCount = options.maxWorkflowThreadCount; this.workflowInterceptor = options.workflowInterceptor; this.enableLoggingInReplay = options.enableLoggingInReplay; + this.workflowHostLocalPollThreadCount = options.workflowHostLocalPollThreadCount; } /** @@ -103,6 +108,11 @@ public Builder setEnableLoggingInReplay(boolean enableLoggingInReplay) { return this; } + public Builder setWorkflowHostLocalPollThreadCount(int workflowHostLocalPollThreadCount) { + this.workflowHostLocalPollThreadCount = workflowHostLocalPollThreadCount; + return this; + } + public WorkerFactoryOptions build() { return new WorkerFactoryOptions( cacheMaximumSize, @@ -110,6 +120,7 @@ public WorkerFactoryOptions build() { stickyDecisionScheduleToStartTimeoutInSeconds, workflowInterceptor, enableLoggingInReplay, + workflowHostLocalPollThreadCount, false); } @@ -120,6 +131,7 @@ public WorkerFactoryOptions validateAndBuildWithDefaults() { stickyDecisionScheduleToStartTimeoutInSeconds, workflowInterceptor, enableLoggingInReplay, + workflowHostLocalPollThreadCount, true); } } @@ -129,6 +141,7 @@ public WorkerFactoryOptions validateAndBuildWithDefaults() { private final int stickyDecisionScheduleToStartTimeoutInSeconds; private final WorkflowInterceptor workflowInterceptor; private final boolean enableLoggingInReplay; + private final int workflowHostLocalPollThreadCount; private WorkerFactoryOptions( int cacheMaximumSize, @@ -136,6 +149,7 @@ private WorkerFactoryOptions( int stickyDecisionScheduleToStartTimeoutInSeconds, WorkflowInterceptor workflowInterceptor, boolean enableLoggingInReplay, + int workflowHostLocalPollThreadCount, boolean validate) { if (validate) { if (cacheMaximumSize <= 0) { @@ -144,12 +158,21 @@ private WorkerFactoryOptions( if (maxWorkflowThreadCount <= 0) { maxWorkflowThreadCount = 600; } - if (stickyDecisionScheduleToStartTimeoutInSeconds <= 0) { + Preconditions.checkState( + stickyDecisionScheduleToStartTimeoutInSeconds >= 0, + "negative stickyDecisionScheduleToStartTimeoutInSeconds"); + + if (stickyDecisionScheduleToStartTimeoutInSeconds == 0) { stickyDecisionScheduleToStartTimeoutInSeconds = 5; } if (workflowInterceptor == null) { workflowInterceptor = new NoopWorkflowInterceptor(); } + Preconditions.checkState( + workflowHostLocalPollThreadCount >= 0, "negative workflowHostLocalPollThreadCount"); + if (workflowHostLocalPollThreadCount == 0) { + workflowHostLocalPollThreadCount = DEFAULT_HOST_LOCAL_WORKFLOW_POLL_THREAD_COUNT; + } } this.cacheMaximumSize = cacheMaximumSize; this.maxWorkflowThreadCount = maxWorkflowThreadCount; @@ -157,6 +180,7 @@ private WorkerFactoryOptions( stickyDecisionScheduleToStartTimeoutInSeconds; this.workflowInterceptor = workflowInterceptor; this.enableLoggingInReplay = enableLoggingInReplay; + this.workflowHostLocalPollThreadCount = workflowHostLocalPollThreadCount; } public int getCacheMaximumSize() { @@ -178,4 +202,12 @@ public WorkflowInterceptor getWorkflowInterceptor() { public boolean isEnableLoggingInReplay() { return enableLoggingInReplay; } + + public int getWorkflowHostLocalPollThreadCount() { + return workflowHostLocalPollThreadCount; + } + + public Builder toBuilder() { + return new Builder(this); + } } diff --git a/src/main/java/io/temporal/worker/WorkerOptions.java b/src/main/java/io/temporal/worker/WorkerOptions.java index 4ffe5e7549..1456c2db2a 100644 --- a/src/main/java/io/temporal/worker/WorkerOptions.java +++ b/src/main/java/io/temporal/worker/WorkerOptions.java @@ -19,6 +19,7 @@ package io.temporal.worker; +import com.google.common.base.Objects; import com.google.common.base.Preconditions; public final class WorkerOptions { @@ -43,11 +44,16 @@ public static WorkerOptions getDefaultInstance() { public static final class Builder { + private static final int DEFAULT_WORKFLOW_POLL_THREAD_COUNT = 2; + private static final int DEFAULT_ACTIVITY_POLL_THREAD_COUNT = 5; + private double maxActivitiesPerSecond; private int maxConcurrentActivityExecutionSize = 100; private int maxConcurrentWorkflowTaskExecutionSize = 50; private int maxConcurrentLocalActivityExecutionSize = 100; private double taskListActivitiesPerSecond = 100000; + private int workflowPollThreadCount; + private int activityPollThreadCount; private Builder() {} @@ -60,6 +66,8 @@ private Builder(WorkerOptions o) { maxConcurrentWorkflowTaskExecutionSize = o.maxConcurrentWorkflowTaskExecutionSize; maxConcurrentLocalActivityExecutionSize = o.maxConcurrentLocalActivityExecutionSize; taskListActivitiesPerSecond = o.taskListActivitiesPerSecond; + workflowPollThreadCount = o.workflowPollThreadCount; + activityPollThreadCount = o.activityPollThreadCount; } /** @@ -126,13 +134,25 @@ public Builder setTaskListActivitiesPerSecond(double taskListActivitiesPerSecond return this; } + public Builder setWorkflowPollThreadCount(int workflowPollThreadCount) { + this.workflowPollThreadCount = workflowPollThreadCount; + return this; + } + + public Builder setActivityPollThreadCount(int activityPollThreadCount) { + this.activityPollThreadCount = activityPollThreadCount; + return this; + } + public WorkerOptions build() { return new WorkerOptions( maxActivitiesPerSecond, maxConcurrentActivityExecutionSize, maxConcurrentWorkflowTaskExecutionSize, maxConcurrentLocalActivityExecutionSize, - taskListActivitiesPerSecond); + taskListActivitiesPerSecond, + workflowPollThreadCount, + activityPollThreadCount); } public WorkerOptions validateAndBuildWithDefaults() { @@ -147,12 +167,20 @@ public WorkerOptions validateAndBuildWithDefaults() { "negative maxConcurrentLocalActivityExecutionSize"); Preconditions.checkState( taskListActivitiesPerSecond >= 0, "negative taskListActivitiesPerSecond"); + Preconditions.checkState(workflowPollThreadCount >= 0, "negative workflowPollThreadCount"); + Preconditions.checkState(activityPollThreadCount >= 0, "negative activityPollThreadCount"); return new WorkerOptions( maxActivitiesPerSecond, maxConcurrentActivityExecutionSize, maxConcurrentWorkflowTaskExecutionSize, maxConcurrentLocalActivityExecutionSize, - taskListActivitiesPerSecond); + taskListActivitiesPerSecond, + workflowPollThreadCount == 0 + ? DEFAULT_WORKFLOW_POLL_THREAD_COUNT + : workflowPollThreadCount, + activityPollThreadCount == 0 + ? DEFAULT_ACTIVITY_POLL_THREAD_COUNT + : activityPollThreadCount); } } @@ -161,18 +189,24 @@ public WorkerOptions validateAndBuildWithDefaults() { private final int maxConcurrentWorkflowTaskExecutionSize; private final int maxConcurrentLocalActivityExecutionSize; private final double taskListActivitiesPerSecond; + private final int workflowPollThreadCount; + private final int activityPollThreadCount; private WorkerOptions( double maxActivitiesPerSecond, int maxConcurrentActivityExecutionSize, int maxConcurrentWorkflowExecutionSize, int maxConcurrentLocalActivityExecutionSize, - double taskListActivitiesPerSecond) { + double taskListActivitiesPerSecond, + int workflowPollThreadCount, + int activityPollThreadCount) { this.maxActivitiesPerSecond = maxActivitiesPerSecond; this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize; this.maxConcurrentWorkflowTaskExecutionSize = maxConcurrentWorkflowExecutionSize; this.maxConcurrentLocalActivityExecutionSize = maxConcurrentLocalActivityExecutionSize; this.taskListActivitiesPerSecond = taskListActivitiesPerSecond; + this.workflowPollThreadCount = workflowPollThreadCount; + this.activityPollThreadCount = activityPollThreadCount; } public double getMaxActivitiesPerSecond() { @@ -191,6 +225,44 @@ public int getMaxConcurrentLocalActivityExecutionSize() { return maxConcurrentLocalActivityExecutionSize; } + public double getTaskListActivitiesPerSecond() { + return taskListActivitiesPerSecond; + } + + public int getWorkflowPollThreadCount() { + return workflowPollThreadCount; + } + + public int getActivityPollThreadCount() { + return activityPollThreadCount; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + WorkerOptions that = (WorkerOptions) o; + return Double.compare(that.maxActivitiesPerSecond, maxActivitiesPerSecond) == 0 + && maxConcurrentActivityExecutionSize == that.maxConcurrentActivityExecutionSize + && maxConcurrentWorkflowTaskExecutionSize == that.maxConcurrentWorkflowTaskExecutionSize + && maxConcurrentLocalActivityExecutionSize == that.maxConcurrentLocalActivityExecutionSize + && Double.compare(that.taskListActivitiesPerSecond, taskListActivitiesPerSecond) == 0 + && workflowPollThreadCount == that.workflowPollThreadCount + && activityPollThreadCount == that.activityPollThreadCount; + } + + @Override + public int hashCode() { + return Objects.hashCode( + maxActivitiesPerSecond, + maxConcurrentActivityExecutionSize, + maxConcurrentWorkflowTaskExecutionSize, + maxConcurrentLocalActivityExecutionSize, + taskListActivitiesPerSecond, + workflowPollThreadCount, + activityPollThreadCount); + } + @Override public String toString() { return "WorkerOptions{" @@ -204,6 +276,10 @@ public String toString() { + maxConcurrentLocalActivityExecutionSize + ", taskListActivitiesPerSecond=" + taskListActivitiesPerSecond + + ", workflowPollThreadCount=" + + workflowPollThreadCount + + ", activityPollThreadCount=" + + activityPollThreadCount + '}'; } } diff --git a/src/test/java/io/temporal/worker/WorkerPollerThreadCountTest.java b/src/test/java/io/temporal/worker/WorkerPollerThreadCountTest.java new file mode 100644 index 0000000000..1d6430ef47 --- /dev/null +++ b/src/test/java/io/temporal/worker/WorkerPollerThreadCountTest.java @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.worker; + +import static java.util.stream.Collectors.groupingBy; +import static org.junit.Assert.assertEquals; + +import io.temporal.activity.ActivityInterface; +import io.temporal.testing.TestEnvironmentOptions; +import io.temporal.testing.TestWorkflowEnvironment; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.junit.Test; + +public class WorkerPollerThreadCountTest { + + @ActivityInterface + public interface Activity { + void foo(); + } + + public static class ActivityImpl implements Activity { + @Override + public void foo() {} + } + + @WorkflowInterface + public interface Workflow { + @WorkflowMethod + public void bar(); + } + + public static class WorkflowImpl implements Workflow { + + @Override + public void bar() {} + } + + @Test + public void testPollThreadCount() throws InterruptedException { + String activityPollerThreadNamePrefix = "Activity Poller task"; + String workflowPollerThreadNamePrefix = "Workflow Poller task"; + String workflowHostLocalPollerThreadNamePrefix = "Host Local Workflow "; + int hostLocalThreadCount = 22; + int workflowPollCount = 11; + int activityPollCount = 18; + + TestEnvironmentOptions options = + TestEnvironmentOptions.newBuilder() + .setWorkerFactoryOptions( + WorkerFactoryOptions.newBuilder() + .setWorkflowHostLocalPollThreadCount(hostLocalThreadCount) + .build()) + .build(); + TestWorkflowEnvironment env = TestWorkflowEnvironment.newInstance(options); + Worker worker = + env.newWorker( + "tl1", + WorkerOptions.newBuilder() + .setWorkflowPollThreadCount(workflowPollCount) + .setActivityPollThreadCount(activityPollCount) + .build()); + // Need to register something for workers to start + worker.registerActivitiesImplementations(new ActivityImpl()); + worker.registerWorkflowImplementationTypes(WorkflowImpl.class); + env.start(); + Thread.sleep(1000); + Map threads = + Thread.getAllStackTraces() + .keySet() + .stream() + .map((t) -> t.getName().substring(0, Math.min(20, t.getName().length()))) + .collect(groupingBy(Function.identity(), Collectors.counting())); + assertEquals(hostLocalThreadCount, (long) threads.get(workflowHostLocalPollerThreadNamePrefix)); + assertEquals(workflowPollCount, (long) threads.get(workflowPollerThreadNamePrefix)); + assertEquals(activityPollCount, (long) threads.get(activityPollerThreadNamePrefix)); + } +} diff --git a/src/test/java/io/temporal/worker/WorkerStressTests.java b/src/test/java/io/temporal/worker/WorkerStressTests.java index 287d5d12b0..8852de7e4a 100644 --- a/src/test/java/io/temporal/worker/WorkerStressTests.java +++ b/src/test/java/io/temporal/worker/WorkerStressTests.java @@ -43,7 +43,6 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -75,8 +74,6 @@ public static Object[] data() { @Rule public TestName testName = new TestName(); - // Todo: Write a unit test specifically to test DecisionTaskWithHistoryIteratorImpl - @Ignore("Takes a long time to run") @Test public void longHistoryWorkflowsCompleteSuccessfully() throws InterruptedException { @@ -171,9 +168,7 @@ private class TestEnvironmentWrapper { private WorkerFactory factory; public TestEnvironmentWrapper(WorkerFactoryOptions options) { - if (options == null) { - options = WorkerFactoryOptions.newBuilder().build(); - } + options = WorkerFactoryOptions.newBuilder(options).validateAndBuildWithDefaults(); WorkflowClientOptions clientOptions = WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build(); if (useDockerService) { @@ -271,7 +266,7 @@ public static class ActivitiesImpl implements SleepActivity { @Override public void sleep(int chain, int concurrency, byte[] bytes) { - log.info("sleep called"); + log.trace("sleep called"); } } } From c17d41a7cb4610506315de2d1470f5ef51bfbda7 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Sat, 9 May 2020 19:38:06 -0700 Subject: [PATCH 2/4] Refactored Factory and Worker defaults --- .../internal/replay/DeciderCache.java | 6 +- src/main/java/io/temporal/worker/Worker.java | 2 +- .../io/temporal/worker/WorkerFactory.java | 2 +- .../temporal/worker/WorkerFactoryOptions.java | 97 +++++++++++-------- .../io/temporal/worker/WorkerOptions.java | 60 +++++++++--- .../io/temporal/worker/StickyWorkerTest.java | 2 +- 6 files changed, 110 insertions(+), 59 deletions(-) diff --git a/src/main/java/io/temporal/internal/replay/DeciderCache.java b/src/main/java/io/temporal/internal/replay/DeciderCache.java index 0b75d4577b..d07be4d37b 100644 --- a/src/main/java/io/temporal/internal/replay/DeciderCache.java +++ b/src/main/java/io/temporal/internal/replay/DeciderCache.java @@ -37,12 +37,12 @@ public final class DeciderCache { private Lock cacheLock = new ReentrantLock(); private Set inProcessing = new HashSet<>(); - public DeciderCache(int maxCacheSize, Scope scope) { - Preconditions.checkArgument(maxCacheSize > 0, "Max cache size must be greater than 0"); + public DeciderCache(int workflowCacheSize, Scope scope) { + Preconditions.checkArgument(workflowCacheSize > 0, "Max cache size must be greater than 0"); this.metricsScope = Objects.requireNonNull(scope); this.cache = CacheBuilder.newBuilder() - .maximumSize(maxCacheSize) + .maximumSize(workflowCacheSize) .removalListener( e -> { Decider entry = (Decider) e.getValue(); diff --git a/src/main/java/io/temporal/worker/Worker.java b/src/main/java/io/temporal/worker/Worker.java index 093353dd05..c7f3a40fbb 100644 --- a/src/main/java/io/temporal/worker/Worker.java +++ b/src/main/java/io/temporal/worker/Worker.java @@ -141,7 +141,7 @@ public final class Worker implements Suspendable { this.cache, this.stickyTaskListName, Duration.ofSeconds( - this.factoryOptions.getStickyDecisionScheduleToStartTimeoutInSeconds()), + this.factoryOptions.getWorkflowHostLocalTaskListScheduleToStartTimeoutSeconds()), this.threadPoolExecutor); } diff --git a/src/main/java/io/temporal/worker/WorkerFactory.java b/src/main/java/io/temporal/worker/WorkerFactory.java index 6c907678fa..c42756a157 100644 --- a/src/main/java/io/temporal/worker/WorkerFactory.java +++ b/src/main/java/io/temporal/worker/WorkerFactory.java @@ -110,7 +110,7 @@ private WorkerFactory(WorkflowClient workflowClient, WorkerFactoryOptions factor .put(MetricsTag.TASK_LIST, workflowClient.getOptions().getIdentity()) .build()); - this.cache = new DeciderCache(this.factoryOptions.getCacheMaximumSize(), metricsScope); + this.cache = new DeciderCache(this.factoryOptions.getWorkflowCacheSize(), metricsScope); dispatcher = new PollDecisionTaskDispatcher(workflowClient.getWorkflowServiceStubs()); stickyPoller = diff --git a/src/main/java/io/temporal/worker/WorkerFactoryOptions.java b/src/main/java/io/temporal/worker/WorkerFactoryOptions.java index 31a4335595..2cc06b1ada 100644 --- a/src/main/java/io/temporal/worker/WorkerFactoryOptions.java +++ b/src/main/java/io/temporal/worker/WorkerFactoryOptions.java @@ -38,6 +38,9 @@ public static WorkerFactoryOptions getDefaultInstance() { } private static final int DEFAULT_HOST_LOCAL_WORKFLOW_POLL_THREAD_COUNT = 5; + private static final int DEFAULT_WORKFLOW_CACHE_SIZE = 600; + private static final int DEFAULT_MAX_WORKFLOW_THREAD_COUNT = 600; + private static final int DEFAULT_WORKFLOW_HOST_LOCAL_TASK_LIST_SCHEDULE_TO_START_TIMEOUT = 10; private static final WorkerFactoryOptions DEFAULT_INSTANCE; @@ -46,8 +49,8 @@ public static WorkerFactoryOptions getDefaultInstance() { } public static class Builder { - private int stickyDecisionScheduleToStartTimeoutInSeconds; - private int cacheMaximumSize; + private int workflowHostLocalTaskListScheduleToStartTimeoutSeconds; + private int workflowCacheSize; private int maxWorkflowThreadCount; private WorkflowInterceptor workflowInterceptor; private boolean enableLoggingInReplay; @@ -59,9 +62,9 @@ private Builder(WorkerFactoryOptions options) { if (options == null) { return; } - this.stickyDecisionScheduleToStartTimeoutInSeconds = - options.stickyDecisionScheduleToStartTimeoutInSeconds; - this.cacheMaximumSize = options.cacheMaximumSize; + this.workflowHostLocalTaskListScheduleToStartTimeoutSeconds = + options.workflowHostLocalTaskListScheduleToStartTimeoutSeconds; + this.workflowCacheSize = options.workflowCacheSize; this.maxWorkflowThreadCount = options.maxWorkflowThreadCount; this.workflowInterceptor = options.workflowInterceptor; this.enableLoggingInReplay = options.enableLoggingInReplay; @@ -69,17 +72,23 @@ private Builder(WorkerFactoryOptions options) { } /** - * When Sticky execution is enabled this will set the maximum allowed number of workflows - * cached. This cache is shared by all workers created by the Factory. Default value is 600 + * To avoid constant replay of code the workflow objects are cached on a worker. This cache is + * shared by all workers created by the Factory. Note that in the majority of situations the + * number of cached workflows is limited not by this value, but by the number of the threads + * defined through {@link #setMaxWorkflowThreadCount(int)}. + * + *

Default value is 600 */ - public Builder setCacheMaximumSize(int cacheMaximumSize) { - this.cacheMaximumSize = cacheMaximumSize; + public Builder setWorkflowCacheSize(int workflowCacheSize) { + this.workflowCacheSize = workflowCacheSize; return this; } /** * Maximum number of threads available for workflow execution across all workers created by the - * Factory. + * Factory. This includes cached workflows. + * + *

Default is 600 */ public Builder setMaxWorkflowThreadCount(int maxWorkflowThreadCount) { this.maxWorkflowThreadCount = maxWorkflowThreadCount; @@ -87,13 +96,15 @@ public Builder setMaxWorkflowThreadCount(int maxWorkflowThreadCount) { } /** - * Timeout for sticky workflow decision to be picked up by the host assigned to it. Once it - * times out then it can be picked up by any worker. Default value is 5 seconds. + * Timeout for a workflow task routed to the the host that caches a workflow object. Once it + * times out then it can be picked up by any worker. + * + *

Default value is 10 seconds. */ - public Builder setStickyDecisionScheduleToStartTimeoutInSeconds( - int stickyDecisionScheduleToStartTimeoutInSeconds) { - this.stickyDecisionScheduleToStartTimeoutInSeconds = - stickyDecisionScheduleToStartTimeoutInSeconds; + public Builder setWorkflowHostLocalTaskListScheduleToStartTimeoutSeconds( + int workflowHostLocalTaskListScheduleToStartTimeoutSeconds) { + this.workflowHostLocalTaskListScheduleToStartTimeoutSeconds = + workflowHostLocalTaskListScheduleToStartTimeoutSeconds; return this; } @@ -115,9 +126,9 @@ public Builder setWorkflowHostLocalPollThreadCount(int workflowHostLocalPollThre public WorkerFactoryOptions build() { return new WorkerFactoryOptions( - cacheMaximumSize, + workflowCacheSize, maxWorkflowThreadCount, - stickyDecisionScheduleToStartTimeoutInSeconds, + workflowHostLocalTaskListScheduleToStartTimeoutSeconds, workflowInterceptor, enableLoggingInReplay, workflowHostLocalPollThreadCount, @@ -126,9 +137,9 @@ public WorkerFactoryOptions build() { public WorkerFactoryOptions validateAndBuildWithDefaults() { return new WorkerFactoryOptions( - cacheMaximumSize, + workflowCacheSize, maxWorkflowThreadCount, - stickyDecisionScheduleToStartTimeoutInSeconds, + workflowHostLocalTaskListScheduleToStartTimeoutSeconds, workflowInterceptor, enableLoggingInReplay, workflowHostLocalPollThreadCount, @@ -136,63 +147,69 @@ public WorkerFactoryOptions validateAndBuildWithDefaults() { } } - private final int cacheMaximumSize; + private final int workflowCacheSize; private final int maxWorkflowThreadCount; - private final int stickyDecisionScheduleToStartTimeoutInSeconds; + private final int workflowHostLocalTaskListScheduleToStartTimeoutSeconds; private final WorkflowInterceptor workflowInterceptor; private final boolean enableLoggingInReplay; private final int workflowHostLocalPollThreadCount; private WorkerFactoryOptions( - int cacheMaximumSize, + int workflowCacheSize, int maxWorkflowThreadCount, - int stickyDecisionScheduleToStartTimeoutInSeconds, + int workflowHostLocalTaskListScheduleToStartTimeoutSeconds, WorkflowInterceptor workflowInterceptor, boolean enableLoggingInReplay, int workflowHostLocalPollThreadCount, boolean validate) { if (validate) { - if (cacheMaximumSize <= 0) { - cacheMaximumSize = 600; + Preconditions.checkState(workflowCacheSize >= 0, "negative workflowCacheSize"); + if (workflowCacheSize <= 0) { + workflowCacheSize = DEFAULT_WORKFLOW_CACHE_SIZE; } - if (maxWorkflowThreadCount <= 0) { - maxWorkflowThreadCount = 600; + + Preconditions.checkState(maxWorkflowThreadCount >= 0, "negative maxWorkflowThreadCount"); + if (maxWorkflowThreadCount == 0) { + maxWorkflowThreadCount = DEFAULT_MAX_WORKFLOW_THREAD_COUNT; } - Preconditions.checkState( - stickyDecisionScheduleToStartTimeoutInSeconds >= 0, - "negative stickyDecisionScheduleToStartTimeoutInSeconds"); - if (stickyDecisionScheduleToStartTimeoutInSeconds == 0) { - stickyDecisionScheduleToStartTimeoutInSeconds = 5; + Preconditions.checkState( + workflowHostLocalTaskListScheduleToStartTimeoutSeconds >= 0, + "negative workflowHostLocalTaskListScheduleToStartTimeoutSeconds"); + if (workflowHostLocalTaskListScheduleToStartTimeoutSeconds == 0) { + workflowHostLocalTaskListScheduleToStartTimeoutSeconds = + DEFAULT_WORKFLOW_HOST_LOCAL_TASK_LIST_SCHEDULE_TO_START_TIMEOUT; } + if (workflowInterceptor == null) { workflowInterceptor = new NoopWorkflowInterceptor(); } + Preconditions.checkState( workflowHostLocalPollThreadCount >= 0, "negative workflowHostLocalPollThreadCount"); if (workflowHostLocalPollThreadCount == 0) { workflowHostLocalPollThreadCount = DEFAULT_HOST_LOCAL_WORKFLOW_POLL_THREAD_COUNT; } } - this.cacheMaximumSize = cacheMaximumSize; + this.workflowCacheSize = workflowCacheSize; this.maxWorkflowThreadCount = maxWorkflowThreadCount; - this.stickyDecisionScheduleToStartTimeoutInSeconds = - stickyDecisionScheduleToStartTimeoutInSeconds; + this.workflowHostLocalTaskListScheduleToStartTimeoutSeconds = + workflowHostLocalTaskListScheduleToStartTimeoutSeconds; this.workflowInterceptor = workflowInterceptor; this.enableLoggingInReplay = enableLoggingInReplay; this.workflowHostLocalPollThreadCount = workflowHostLocalPollThreadCount; } - public int getCacheMaximumSize() { - return cacheMaximumSize; + public int getWorkflowCacheSize() { + return workflowCacheSize; } public int getMaxWorkflowThreadCount() { return maxWorkflowThreadCount; } - public int getStickyDecisionScheduleToStartTimeoutInSeconds() { - return stickyDecisionScheduleToStartTimeoutInSeconds; + public int getWorkflowHostLocalTaskListScheduleToStartTimeoutSeconds() { + return workflowHostLocalTaskListScheduleToStartTimeoutSeconds; } public WorkflowInterceptor getWorkflowInterceptor() { diff --git a/src/main/java/io/temporal/worker/WorkerOptions.java b/src/main/java/io/temporal/worker/WorkerOptions.java index 1456c2db2a..0f27bd0699 100644 --- a/src/main/java/io/temporal/worker/WorkerOptions.java +++ b/src/main/java/io/temporal/worker/WorkerOptions.java @@ -46,12 +46,15 @@ public static final class Builder { private static final int DEFAULT_WORKFLOW_POLL_THREAD_COUNT = 2; private static final int DEFAULT_ACTIVITY_POLL_THREAD_COUNT = 5; + private static final int DEFAULT_MAX_CONCURRENT_ACTIVITY_EXECUTION_SIZE = 200; + private static final int DEFAULT_MAX_CONCURRENT_WORKFLOW_TASK_EXECUTION_SIZE = 200; + private static final int DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE = 200; private double maxActivitiesPerSecond; - private int maxConcurrentActivityExecutionSize = 100; - private int maxConcurrentWorkflowTaskExecutionSize = 50; - private int maxConcurrentLocalActivityExecutionSize = 100; - private double taskListActivitiesPerSecond = 100000; + private int maxConcurrentActivityExecutionSize; + private int maxConcurrentWorkflowTaskExecutionSize; + private int maxConcurrentLocalActivityExecutionSize; + private double taskListActivitiesPerSecond; private int workflowPollThreadCount; private int activityPollThreadCount; @@ -72,7 +75,8 @@ private Builder(WorkerOptions o) { /** * Maximum number of activities started per second by this worker. Default is 0 which means - * unlimited. + * unlimited. If worker is not fully loaded while tasks are backing up on the service consider + * increasing {@link #setActivityPollThreadCount(int)}. * *

Note that this is a per worker limit. Use {@link #setTaskListActivitiesPerSecond(double)} * to set per task list limit across multiple workers. @@ -85,7 +89,11 @@ public Builder setMaxActivitiesPerSecond(double maxActivitiesPerSecond) { return this; } - /** Maximum number of parallely executed activities. */ + /** + * Maximum number of parallely executed activities. + * + *

Default is 200. + */ public Builder setMaxConcurrentActivityExecutionSize(int maxConcurrentActivityExecutionSize) { if (maxConcurrentActivityExecutionSize <= 0) { throw new IllegalArgumentException( @@ -99,6 +107,8 @@ public Builder setMaxConcurrentActivityExecutionSize(int maxConcurrentActivityEx * Maximum number of simultaneously executed workflow tasks. Note that this is not related to * the total number of open workflows which do not need to be loaded in a worker when they are * not making state transitions. + * + *

Default is 200. */ public Builder setMaxConcurrentWorkflowTaskExecutionSize( int maxConcurrentWorkflowTaskExecutionSize) { @@ -110,7 +120,11 @@ public Builder setMaxConcurrentWorkflowTaskExecutionSize( return this; } - /** Maximum number of parallely executed local activities. */ + /** + * Maximum number of parallely executed local activities. + * + *

Default is 200. + */ public Builder setMaxConcurrentLocalActivityExecutionSize( int maxConcurrentLocalActivityExecutionSize) { if (maxConcurrentLocalActivityExecutionSize <= 0) { @@ -127,18 +141,32 @@ public Builder setMaxConcurrentLocalActivityExecutionSize( * Notice that the number is represented in double, so that you can set it to less than 1 if * needed. For example, set the number to 0.1 means you want your activity to be executed once * every 10 seconds. This can be used to protect down stream services from flooding. The zero - * value of this uses the default value. Default: 100k + * value of this uses the default value. Default is unlimited. */ public Builder setTaskListActivitiesPerSecond(double taskListActivitiesPerSecond) { this.taskListActivitiesPerSecond = taskListActivitiesPerSecond; return this; } + /** + * Number of simultaneous poll requests on workflow task list. Note that the majority of the + * workflow tasks will be using host local task list due to caching. So try incrementing {@link + * WorkerFactoryOptions.Builder#setWorkflowHostLocalPollThreadCount(int)} before this one. + * + *

Default is 2. + */ public Builder setWorkflowPollThreadCount(int workflowPollThreadCount) { this.workflowPollThreadCount = workflowPollThreadCount; return this; } + /** + * Number of simultaneous poll requests on activity task list. Consider incrementing if the + * worker is not throttled due to `MaxActivitiesPerSecond` or + * `MaxConcurrentActivityExecutionSize` options and still cannot keep up with the request rate. + * + *

Default is 5. + */ public Builder setActivityPollThreadCount(int activityPollThreadCount) { this.activityPollThreadCount = activityPollThreadCount; return this; @@ -148,8 +176,8 @@ public WorkerOptions build() { return new WorkerOptions( maxActivitiesPerSecond, maxConcurrentActivityExecutionSize, - maxConcurrentWorkflowTaskExecutionSize, - maxConcurrentLocalActivityExecutionSize, + DEFAULT_MAX_CONCURRENT_WORKFLOW_TASK_EXECUTION_SIZE, + DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE, taskListActivitiesPerSecond, workflowPollThreadCount, activityPollThreadCount); @@ -171,9 +199,15 @@ public WorkerOptions validateAndBuildWithDefaults() { Preconditions.checkState(activityPollThreadCount >= 0, "negative activityPollThreadCount"); return new WorkerOptions( maxActivitiesPerSecond, - maxConcurrentActivityExecutionSize, - maxConcurrentWorkflowTaskExecutionSize, - maxConcurrentLocalActivityExecutionSize, + maxConcurrentActivityExecutionSize == 0 + ? DEFAULT_MAX_CONCURRENT_ACTIVITY_EXECUTION_SIZE + : maxConcurrentActivityExecutionSize, + maxConcurrentWorkflowTaskExecutionSize == 0 + ? DEFAULT_MAX_CONCURRENT_WORKFLOW_TASK_EXECUTION_SIZE + : maxConcurrentWorkflowTaskExecutionSize, + maxConcurrentLocalActivityExecutionSize == 0 + ? DEFAULT_MAX_CONCURRENT_LOCAL_ACTIVITY_EXECUTION_SIZE + : maxConcurrentLocalActivityExecutionSize, taskListActivitiesPerSecond, workflowPollThreadCount == 0 ? DEFAULT_WORKFLOW_POLL_THREAD_COUNT diff --git a/src/test/java/io/temporal/worker/StickyWorkerTest.java b/src/test/java/io/temporal/worker/StickyWorkerTest.java index 7f1eb3d092..e97407c8fd 100644 --- a/src/test/java/io/temporal/worker/StickyWorkerTest.java +++ b/src/test/java/io/temporal/worker/StickyWorkerTest.java @@ -204,7 +204,7 @@ public void workflowCacheEvictionDueToThreads() { scope, WorkerFactoryOptions.newBuilder() .setMaxWorkflowThreadCount(10) - .setCacheMaximumSize(100) + .setWorkflowCacheSize(100) .build()); WorkerFactory factory = wrapper.getWorkerFactory(); Worker worker = From a24af89ecce75227b0a25c8c86e472b08512b287 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Tue, 12 May 2020 08:45:46 -0700 Subject: [PATCH 3/4] Removed taskListActivitiesPerSecond from SinglePollerOptions --- .../internal/worker/ActivityPollTask.java | 6 ++---- .../internal/worker/SingleWorkerOptions.java | 15 --------------- 2 files changed, 2 insertions(+), 19 deletions(-) diff --git a/src/main/java/io/temporal/internal/worker/ActivityPollTask.java b/src/main/java/io/temporal/internal/worker/ActivityPollTask.java index b265297f11..77b22d33a8 100644 --- a/src/main/java/io/temporal/internal/worker/ActivityPollTask.java +++ b/src/main/java/io/temporal/internal/worker/ActivityPollTask.java @@ -74,13 +74,11 @@ public PollForActivityTaskResponse poll() { .build()); } - if (options.getTaskListActivitiesPerSecond() > 0) { + if (taskListActivitiesPerSecond > 0) { pollRequest.setTaskListMetadata( TaskListMetadata.newBuilder() .setMaxTasksPerSecond( - DoubleValue.newBuilder() - .setValue(options.getTaskListActivitiesPerSecond()) - .build()) + DoubleValue.newBuilder().setValue(taskListActivitiesPerSecond).build()) .build()); } diff --git a/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java b/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java index 2e16596aa6..d348830f03 100644 --- a/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java +++ b/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java @@ -42,7 +42,6 @@ public static final class Builder { private String identity; private DataConverter dataConverter; private int taskExecutorThreadPoolSize = 100; - private double taskListActivitiesPerSecond; private PollerOptions pollerOptions; private Scope metricsScope; private boolean enableLoggingInReplay; @@ -57,7 +56,6 @@ private Builder(SingleWorkerOptions options) { this.identity = options.getIdentity(); this.dataConverter = options.getDataConverter(); this.pollerOptions = options.getPollerOptions(); - this.taskListActivitiesPerSecond = options.getTaskListActivitiesPerSecond(); this.taskExecutorThreadPoolSize = options.getTaskExecutorThreadPoolSize(); this.metricsScope = options.getMetricsScope(); this.enableLoggingInReplay = options.getEnableLoggingInReplay(); @@ -94,11 +92,6 @@ public Builder setEnableLoggingInReplay(boolean enableLoggingInReplay) { return this; } - public Builder setTaskListActivitiesPerSecond(double taskListActivitiesPerSecond) { - this.taskListActivitiesPerSecond = taskListActivitiesPerSecond; - return this; - } - /** Specifies the list of context propagators to use during this workflow. */ public Builder setContextPropagators(List contextPropagators) { this.contextPropagators = contextPropagators; @@ -127,7 +120,6 @@ public SingleWorkerOptions build() { identity, dataConverter, taskExecutorThreadPoolSize, - taskListActivitiesPerSecond, pollerOptions, metricsScope, enableLoggingInReplay, @@ -138,7 +130,6 @@ public SingleWorkerOptions build() { private final String identity; private final DataConverter dataConverter; private final int taskExecutorThreadPoolSize; - private final double taskListActivitiesPerSecond; private final PollerOptions pollerOptions; private final Scope metricsScope; private final boolean enableLoggingInReplay; @@ -148,7 +139,6 @@ private SingleWorkerOptions( String identity, DataConverter dataConverter, int taskExecutorThreadPoolSize, - double taskListActivitiesPerSecond, PollerOptions pollerOptions, Scope metricsScope, boolean enableLoggingInReplay, @@ -156,7 +146,6 @@ private SingleWorkerOptions( this.identity = identity; this.dataConverter = dataConverter; this.taskExecutorThreadPoolSize = taskExecutorThreadPoolSize; - this.taskListActivitiesPerSecond = taskListActivitiesPerSecond; this.pollerOptions = pollerOptions; this.metricsScope = metricsScope; this.enableLoggingInReplay = enableLoggingInReplay; @@ -179,10 +168,6 @@ PollerOptions getPollerOptions() { return pollerOptions; } - double getTaskListActivitiesPerSecond() { - return taskListActivitiesPerSecond; - } - public Scope getMetricsScope() { return metricsScope; } From 8842bf562c5b8c88b6c1f2f90c802286e4dccc27 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Tue, 12 May 2020 09:29:57 -0700 Subject: [PATCH 4/4] Removed unnecessary check --- .../java/io/temporal/internal/testservice/StateMachines.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/main/java/io/temporal/internal/testservice/StateMachines.java b/src/main/java/io/temporal/internal/testservice/StateMachines.java index fd4c1f45a6..f4bd1ab9c3 100644 --- a/src/main/java/io/temporal/internal/testservice/StateMachines.java +++ b/src/main/java/io/temporal/internal/testservice/StateMachines.java @@ -1174,10 +1174,6 @@ private static void startDecisionTaskImpl( .getEventsList(); long lastEventId = events.get(events.size() - 1).getEventId(); if (ctx.getWorkflowMutableState().getStickyExecutionAttributes() != null) { - if (data.lastSuccessfulStartedEventId <= 0) { - throw new IllegalStateException( - "Invalid previousStartedEventId: " + data.lastSuccessfulStartedEventId); - } events = events.subList((int) data.lastSuccessfulStartedEventId, events.size()); } if (queryOnly && !data.workflowCompleted) {