Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ steps:
- docker-compose#v3.0.0:
run: unit-test-test-service
config: docker/buildkite/docker-compose.yaml
- label: ":java: Unit test with docker service"
agents:
queue: "default"
docker: "*"
command: "./gradlew --no-daemon test"
timeout_in_minutes: 15
plugins:
- docker-compose#v3.0.0:
run: unit-test-docker
config: docker/buildkite/docker-compose.yaml
# - label: ":java: Unit test with docker service"
# agents:
# queue: "default"
# docker: "*"
# command: "./gradlew --no-daemon test"
# timeout_in_minutes: 15
# plugins:
# - docker-compose#v3.0.0:
# run: unit-test-docker
# config: docker/buildkite/docker-compose.yaml
- wait
99 changes: 0 additions & 99 deletions src/main/java/io/temporal/internal/metrics/ServiceMethod.java

This file was deleted.

6 changes: 3 additions & 3 deletions src/main/java/io/temporal/internal/replay/DeciderCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ public final class DeciderCache {
private Lock cacheLock = new ReentrantLock();
private Set<String> inProcessing = new HashSet<>();

public DeciderCache(int maxCacheSize, Scope scope) {
Preconditions.checkArgument(maxCacheSize > 0, "Max cache size must be greater than 0");
public DeciderCache(int workflowCacheSize, Scope scope) {
Preconditions.checkArgument(workflowCacheSize > 0, "Max cache size must be greater than 0");
this.metricsScope = Objects.requireNonNull(scope);
this.cache =
CacheBuilder.newBuilder()
.maximumSize(maxCacheSize)
.maximumSize(workflowCacheSize)
.removalListener(
e -> {
Decider entry = (Decider) e.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,14 @@ public SyncActivityWorker(
WorkflowServiceStubs service,
String namespace,
String taskList,
double taskListActivitiesPerSecond,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why pass this separately and not use SingleWorkerOptions value for taskListActivitiesPerSecond? https://github.com/temporalio/temporal-java-sdk/blob/master/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java#L45

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Removed taskListActivitiesPerSecond from SinglePollerOptions as they don't apply to the workflow worker.

SingleWorkerOptions options) {
taskHandler =
new POJOActivityTaskHandler(
service, namespace, options.getDataConverter(), heartbeatExecutor);
worker = new ActivityWorker(service, namespace, taskList, options, taskHandler);
worker =
new ActivityWorker(
service, namespace, taskList, taskListActivitiesPerSecond, options, taskHandler);
}

public void setActivitiesImplementation(Object... activitiesImplementation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1174,10 +1174,6 @@ private static void startDecisionTaskImpl(
.getEventsList();
long lastEventId = events.get(events.size() - 1).getEventId();
if (ctx.getWorkflowMutableState().getStickyExecutionAttributes() != null) {
if (data.lastSuccessfulStartedEventId <= 0) {
throw new IllegalStateException(
"Invalid previousStartedEventId: " + data.lastSuccessfulStartedEventId);
}
events = events.subList((int) data.lastSuccessfulStartedEventId, events.size());
}
if (queryOnly && !data.workflowCompleted) {
Expand Down
18 changes: 13 additions & 5 deletions src/main/java/io/temporal/internal/worker/ActivityPollTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,20 @@ final class ActivityPollTask implements Poller.PollTask<PollForActivityTaskRespo
private final String taskList;
private final SingleWorkerOptions options;
private static final Logger log = LoggerFactory.getLogger(ActivityPollTask.class);
private final double taskListActivitiesPerSecond;

public ActivityPollTask(
WorkflowServiceStubs service,
String namespace,
String taskList,
SingleWorkerOptions options) {
SingleWorkerOptions options,
double taskListActivitiesPerSecond) {

this.service = service;
this.namespace = namespace;
this.taskList = taskList;
this.options = options;
this.taskListActivitiesPerSecond = taskListActivitiesPerSecond;
}

@Override
Expand All @@ -63,14 +66,19 @@ public PollForActivityTaskResponse poll() {
.setNamespace(namespace)
.setIdentity(options.getIdentity())
.setTaskList(TaskList.newBuilder().setName(taskList));
if (taskListActivitiesPerSecond > 0) {
pollRequest.setTaskListMetadata(
TaskListMetadata.newBuilder()
.setMaxTasksPerSecond(
DoubleValue.newBuilder().setValue(taskListActivitiesPerSecond).build())
.build());
}

if (options.getTaskListActivitiesPerSecond() > 0) {
if (taskListActivitiesPerSecond > 0) {
pollRequest.setTaskListMetadata(
TaskListMetadata.newBuilder()
.setMaxTasksPerSecond(
DoubleValue.newBuilder()
.setValue(options.getTaskListActivitiesPerSecond())
.build())
DoubleValue.newBuilder().setValue(taskListActivitiesPerSecond).build())
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,19 @@ public final class ActivityWorker implements SuspendableWorker {
private final String namespace;
private final String taskList;
private final SingleWorkerOptions options;
private final double taskListActivitiesPerSecond;

public ActivityWorker(
WorkflowServiceStubs service,
String namespace,
String taskList,
double taskListActivitiesPerSecond,
SingleWorkerOptions options,
ActivityTaskHandler handler) {
this.service = Objects.requireNonNull(service);
this.namespace = Objects.requireNonNull(namespace);
this.taskList = Objects.requireNonNull(taskList);
this.taskListActivitiesPerSecond = taskListActivitiesPerSecond;
this.handler = handler;

PollerOptions pollerOptions = options.getPollerOptions();
Expand All @@ -85,7 +88,8 @@ public void start() {
poller =
new Poller<>(
options.getIdentity(),
new ActivityPollTask(service, namespace, taskList, options),
new ActivityPollTask(
service, namespace, taskList, options, taskListActivitiesPerSecond),
new PollTaskExecutor<>(namespace, taskList, options, new TaskHandlerImpl(handler)),
options.getPollerOptions(),
options.getMetricsScope());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public static final class Builder {
private String identity;
private DataConverter dataConverter;
private int taskExecutorThreadPoolSize = 100;
private double taskListActivitiesPerSecond;
private PollerOptions pollerOptions;
private Scope metricsScope;
private boolean enableLoggingInReplay;
Expand All @@ -57,7 +56,6 @@ private Builder(SingleWorkerOptions options) {
this.identity = options.getIdentity();
this.dataConverter = options.getDataConverter();
this.pollerOptions = options.getPollerOptions();
this.taskListActivitiesPerSecond = options.getTaskListActivitiesPerSecond();
this.taskExecutorThreadPoolSize = options.getTaskExecutorThreadPoolSize();
this.metricsScope = options.getMetricsScope();
this.enableLoggingInReplay = options.getEnableLoggingInReplay();
Expand Down Expand Up @@ -94,11 +92,6 @@ public Builder setEnableLoggingInReplay(boolean enableLoggingInReplay) {
return this;
}

public Builder setTaskListActivitiesPerSecond(double taskListActivitiesPerSecond) {
this.taskListActivitiesPerSecond = taskListActivitiesPerSecond;
return this;
}

/** Specifies the list of context propagators to use during this workflow. */
public Builder setContextPropagators(List<ContextPropagator> contextPropagators) {
this.contextPropagators = contextPropagators;
Expand Down Expand Up @@ -127,7 +120,6 @@ public SingleWorkerOptions build() {
identity,
dataConverter,
taskExecutorThreadPoolSize,
taskListActivitiesPerSecond,
pollerOptions,
metricsScope,
enableLoggingInReplay,
Expand All @@ -138,7 +130,6 @@ public SingleWorkerOptions build() {
private final String identity;
private final DataConverter dataConverter;
private final int taskExecutorThreadPoolSize;
private final double taskListActivitiesPerSecond;
private final PollerOptions pollerOptions;
private final Scope metricsScope;
private final boolean enableLoggingInReplay;
Expand All @@ -148,15 +139,13 @@ private SingleWorkerOptions(
String identity,
DataConverter dataConverter,
int taskExecutorThreadPoolSize,
double taskListActivitiesPerSecond,
PollerOptions pollerOptions,
Scope metricsScope,
boolean enableLoggingInReplay,
List<ContextPropagator> contextPropagators) {
this.identity = identity;
this.dataConverter = dataConverter;
this.taskExecutorThreadPoolSize = taskExecutorThreadPoolSize;
this.taskListActivitiesPerSecond = taskListActivitiesPerSecond;
this.pollerOptions = pollerOptions;
this.metricsScope = metricsScope;
this.enableLoggingInReplay = enableLoggingInReplay;
Expand All @@ -179,10 +168,6 @@ PollerOptions getPollerOptions() {
return pollerOptions;
}

double getTaskListActivitiesPerSecond() {
return taskListActivitiesPerSecond;
}

public Scope getMetricsScope() {
return metricsScope;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,8 @@ public TestEnvironmentOptions build() {

public TestEnvironmentOptions validateAndBuildWithDefaults() {
return new TestEnvironmentOptions(
workflowClientOptions == null
? WorkflowClientOptions.newBuilder().validateAndBuildWithDefaults()
: workflowClientOptions,
workerFactoryOptions == null
? WorkerFactoryOptions.newBuilder().validateAndBuildWithDefaults()
: workerFactoryOptions,
WorkflowClientOptions.newBuilder(workflowClientOptions).validateAndBuildWithDefaults(),
WorkerFactoryOptions.newBuilder(workerFactoryOptions).validateAndBuildWithDefaults(),
metricsScope == null ? new NoopScope() : metricsScope);
}
}
Expand Down
22 changes: 17 additions & 5 deletions src/main/java/io/temporal/worker/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ public final class Worker implements Suspendable {

this.taskList = taskList;
this.options = WorkerOptions.newBuilder(options).validateAndBuildWithDefaults();
this.factoryOptions = factoryOptions;
this.factoryOptions =
WorkerFactoryOptions.newBuilder(factoryOptions).validateAndBuildWithDefaults();
WorkflowServiceStubs service = client.getWorkflowServiceStubs();
WorkflowClientOptions clientOptions = client.getOptions();
String namespace = clientOptions.getNamespace();
Expand All @@ -105,7 +106,13 @@ public final class Worker implements Suspendable {
taskList,
contextPropagators,
metricsScope);
activityWorker = new SyncActivityWorker(service, namespace, taskList, activityOptions);
activityWorker =
new SyncActivityWorker(
service,
namespace,
taskList,
this.options.getTaskListActivitiesPerSecond(),
activityOptions);

SingleWorkerOptions workflowOptions =
toWorkflowOptions(
Expand All @@ -128,12 +135,13 @@ public final class Worker implements Suspendable {
service,
namespace,
taskList,
factoryOptions.getWorkflowInterceptor(),
this.factoryOptions.getWorkflowInterceptor(),
workflowOptions,
localActivityOptions,
this.cache,
this.stickyTaskListName,
Duration.ofSeconds(factoryOptions.getStickyDecisionScheduleToStartTimeoutInSeconds()),
Duration.ofSeconds(
this.factoryOptions.getWorkflowHostLocalTaskListScheduleToStartTimeoutSeconds()),
this.threadPoolExecutor);
}

Expand All @@ -155,6 +163,7 @@ private static SingleWorkerOptions toActivityOptions(
.setPollerOptions(
PollerOptions.newBuilder()
.setMaximumPollRatePerSecond(options.getMaxActivitiesPerSecond())
.setPollThreadCount(options.getActivityPollThreadCount())
.build())
.setTaskExecutorThreadPoolSize(options.getMaxConcurrentActivityExecutionSize())
.setMetricsScope(metricsScope.tagged(tags))
Expand All @@ -178,7 +187,10 @@ private static SingleWorkerOptions toWorkflowOptions(
return SingleWorkerOptions.newBuilder()
.setDataConverter(clientOptions.getDataConverter())
.setIdentity(clientOptions.getIdentity())
.setPollerOptions(PollerOptions.newBuilder().build())
.setPollerOptions(
PollerOptions.newBuilder()
.setPollThreadCount(options.getWorkflowPollThreadCount())
.build())
.setTaskExecutorThreadPoolSize(options.getMaxConcurrentWorkflowTaskExecutionSize())
.setMetricsScope(metricsScope.tagged(tags))
.setEnableLoggingInReplay(factoryOptions.isEnableLoggingInReplay())
Expand Down
Loading