diff --git a/build.gradle b/build.gradle index 8a15ecd1c3..09bccc5be9 100644 --- a/build.gradle +++ b/build.gradle @@ -31,7 +31,7 @@ ext { // Platforms grpcVersion = '1.49.0' // [1.34.0,) jacksonVersion = '2.13.4' // [2.9.0,) - micrometerVersion = '1.9.3' // [1.0.0,) + micrometerVersion = '1.9.4' // [1.0.0,) slf4jVersion = '1.7.36' // [1.4.0,) // stay on 1.x for a while to don't use any APIs from 2.x which may break our users which decide on 1.x protoVersion = '3.21.5' // [3.10.0,) @@ -45,7 +45,7 @@ ext { // test scoped logbackVersion = '1.2.11' - mockitoVersion = '4.7.0' + mockitoVersion = '4.8.0' junitVersion = '4.13.2' } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityTask.java index 994dd25c5e..cac280508a 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityTask.java @@ -22,16 +22,19 @@ import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse; import io.temporal.workflow.Functions; +import javax.annotation.Nonnull; public final class ActivityTask { - private final PollActivityTaskQueueResponse response; - private final Functions.Proc completionCallback; + private final @Nonnull PollActivityTaskQueueResponse response; + private final @Nonnull Functions.Proc completionCallback; - public ActivityTask(PollActivityTaskQueueResponse response, Functions.Proc completionCallback) { + public ActivityTask( + @Nonnull PollActivityTaskQueueResponse response, @Nonnull Functions.Proc completionCallback) { this.response = response; this.completionCallback = completionCallback; } + @Nonnull public PollActivityTaskQueueResponse getResponse() { return response; } @@ -40,6 +43,7 @@ public PollActivityTaskQueueResponse getResponse() { * Completion handle function that must be called by the handler whenever activity processing is * completed. */ + @Nonnull public Functions.Proc getCompletionCallback() { return completionCallback; } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java index 7933ef706a..79327f5c17 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java @@ -27,25 +27,25 @@ import io.temporal.api.taskqueue.v1.TaskQueue; import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueRequest; import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse; +import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.worker.MetricsType; import java.util.Objects; +import java.util.concurrent.Semaphore; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public final class WorkflowPollTask implements Poller.PollTask { +public final class WorkflowPollTask implements Poller.PollTask { private static final Logger log = LoggerFactory.getLogger(WorkflowPollTask.class); - private final WorkflowServiceStubs service; - private final String namespace; private final String taskQueue; - private final TaskQueueKind taskQueueKind; - private final String identity; - private final String binaryChecksum; + private final Semaphore workflowTaskExecutorSemaphore; private final Scope metricsScope; + private final WorkflowServiceGrpc.WorkflowServiceBlockingStub serviceStub; + private final PollWorkflowTaskQueueRequest pollRequest; public WorkflowPollTask( @Nonnull WorkflowServiceStubs service, @@ -54,65 +54,72 @@ public WorkflowPollTask( @Nonnull TaskQueueKind taskQueueKind, @Nonnull String identity, @Nullable String binaryChecksum, + @Nonnull Semaphore workflowTaskExecutorSemaphore, @Nonnull Scope metricsScope) { - this.service = Objects.requireNonNull(service); - this.namespace = Objects.requireNonNull(namespace); this.taskQueue = Objects.requireNonNull(taskQueue); - this.taskQueueKind = Objects.requireNonNull(taskQueueKind); - this.identity = Objects.requireNonNull(identity); - this.binaryChecksum = binaryChecksum; + this.workflowTaskExecutorSemaphore = workflowTaskExecutorSemaphore; this.metricsScope = Objects.requireNonNull(metricsScope); - } + this.serviceStub = + Objects.requireNonNull(service) + .blockingStub() + .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope); - @Override - public PollWorkflowTaskQueueResponse poll() { - PollWorkflowTaskQueueRequest pollRequest = + this.pollRequest = PollWorkflowTaskQueueRequest.newBuilder() - .setNamespace(namespace) + .setNamespace(Objects.requireNonNull(namespace)) + .setIdentity(Objects.requireNonNull(identity)) .setBinaryChecksum(binaryChecksum) - .setIdentity(identity) .setTaskQueue( TaskQueue.newBuilder() .setName(taskQueue) // For matching performance optimizations of Temporal Server it's important to // know if the poll comes for a sticky or a normal queue. Because sticky queues // have only 1 partition, no forwarding is needed. - .setKind(taskQueueKind) + .setKind(Objects.requireNonNull(taskQueueKind)) .build()) .build(); + } - if (log.isTraceEnabled()) { - log.trace("poll request begin: " + pollRequest); - } - PollWorkflowTaskQueueResponse result; - result = - service - .blockingStub() - .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) - .pollWorkflowTaskQueue(pollRequest); - if (log.isTraceEnabled()) { - log.trace( - "poll request returned workflow task: workflowType=" - + result.getWorkflowType() - + ", workflowExecution=" - + result.getWorkflowExecution() - + ", startedEventId=" - + result.getStartedEventId() - + ", previousStartedEventId=" - + result.getPreviousStartedEventId() - + (result.getQuery() != null - ? ", queryType=" + result.getQuery().getQueryType() - : "")); - } + @Override + public WorkflowTask poll() { + log.trace("poll request begin: {}", pollRequest); + PollWorkflowTaskQueueResponse response; + boolean isSuccessful = false; - if (result == null || result.getTaskToken().isEmpty()) { - metricsScope.counter(MetricsType.WORKFLOW_TASK_QUEUE_POLL_EMPTY_COUNTER).inc(1); + try { + workflowTaskExecutorSemaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); return null; } - metricsScope.counter(MetricsType.WORKFLOW_TASK_QUEUE_POLL_SUCCEED_COUNTER).inc(1); - metricsScope - .timer(MetricsType.WORKFLOW_TASK_SCHEDULE_TO_START_LATENCY) - .record(ProtobufTimeUtils.toM3Duration(result.getStartedTime(), result.getScheduledTime())); - return result; + + try { + response = serviceStub.pollWorkflowTaskQueue(pollRequest); + if (log.isTraceEnabled()) { + log.trace( + "poll request returned workflow task: taskQueue={}, workflowType={}, workflowExecution={}, startedEventId={}, previousStartedEventId={}{}", + taskQueue, + response.getWorkflowType(), + response.getWorkflowExecution(), + response.getStartedEventId(), + response.getPreviousStartedEventId(), + response.hasQuery() ? ", queryType=" + response.getQuery().getQueryType() : ""); + } + + if (response == null || response.getTaskToken().isEmpty()) { + metricsScope.counter(MetricsType.WORKFLOW_TASK_QUEUE_POLL_EMPTY_COUNTER).inc(1); + return null; + } + metricsScope.counter(MetricsType.WORKFLOW_TASK_QUEUE_POLL_SUCCEED_COUNTER).inc(1); + metricsScope + .timer(MetricsType.WORKFLOW_TASK_SCHEDULE_TO_START_LATENCY) + .record( + ProtobufTimeUtils.toM3Duration( + response.getStartedTime(), response.getScheduledTime())); + isSuccessful = true; + return new WorkflowTask(response, workflowTaskExecutorSemaphore::release); + } finally { + if (!isSuccessful) workflowTaskExecutorSemaphore.release(); + } } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowTask.java new file mode 100644 index 0000000000..7e4462f80f --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowTask.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.internal.worker; + +import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse; +import io.temporal.workflow.Functions; +import javax.annotation.Nonnull; + +public class WorkflowTask { + @Nonnull private final PollWorkflowTaskQueueResponse response; + @Nonnull private final Functions.Proc completionCallback; + + public WorkflowTask( + @Nonnull PollWorkflowTaskQueueResponse response, @Nonnull Functions.Proc completionCallback) { + this.response = response; + this.completionCallback = completionCallback; + } + + @Nonnull + public PollWorkflowTaskQueueResponse getResponse() { + return response; + } + + /** + * Completion handle function that must be called by the handler whenever activity processing is + * completed. + */ + @Nonnull + public Functions.Proc getCompletionCallback() { + return completionCallback; + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java index 8da7ae09f4..761eb48dda 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java @@ -41,6 +41,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nonnull; @@ -72,7 +73,7 @@ final class WorkflowWorker implements SuspendableWorker { @Nullable private SuspendableWorker stickyPoller = new NoopSuspendableWorker(); - private PollTaskExecutor pollTaskExecutor; + private PollTaskExecutor pollTaskExecutor; public WorkflowWorker( @Nonnull WorkflowServiceStubs service, @@ -108,6 +109,8 @@ public void start() { pollerOptions, options.getTaskExecutorThreadPoolSize(), workerMetricsScope); + Semaphore workflowTaskExecutorSemaphore = + new Semaphore(options.getTaskExecutorThreadPoolSize()); poller = new Poller<>( options.getIdentity(), @@ -118,6 +121,7 @@ public void start() { TaskQueueKind.TASK_QUEUE_KIND_NORMAL, options.getIdentity(), options.getBinaryChecksum(), + workflowTaskExecutorSemaphore, workerMetricsScope), pollTaskExecutor, pollerOptions, @@ -140,6 +144,7 @@ public void start() { TaskQueueKind.TASK_QUEUE_KIND_STICKY, options.getIdentity(), options.getBinaryChecksum(), + workflowTaskExecutorSemaphore, stickyScope), pollTaskExecutor, stickyPollerOptions, @@ -232,8 +237,7 @@ private PollerOptions getStickyPollerOptions(SingleWorkerOptions options) { return stickyPollerOptions; } - private class TaskHandlerImpl - implements PollTaskExecutor.TaskHandler { + private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler { final WorkflowTaskHandler handler; @@ -242,10 +246,11 @@ private TaskHandlerImpl(WorkflowTaskHandler handler) { } @Override - public void handle(PollWorkflowTaskQueueResponse task) throws Exception { - WorkflowExecution workflowExecution = task.getWorkflowExecution(); + public void handle(WorkflowTask task) throws Exception { + PollWorkflowTaskQueueResponse workflowTaskResponse = task.getResponse(); + WorkflowExecution workflowExecution = workflowTaskResponse.getWorkflowExecution(); String runId = workflowExecution.getRunId(); - String workflowType = task.getWorkflowType().getName(); + String workflowType = workflowTaskResponse.getWorkflowType().getName(); Scope workflowTypeScope = workerMetricsScope.tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, workflowType)); @@ -288,14 +293,15 @@ public void handle(PollWorkflowTaskQueueResponse task) throws Exception { Stopwatch swTotal = workflowTypeScope.timer(MetricsType.WORKFLOW_TASK_EXECUTION_TOTAL_LATENCY).start(); try { - Optional nextTask = Optional.of(task); + Optional nextWFTResponse = Optional.of(workflowTaskResponse); do { - PollWorkflowTaskQueueResponse currentTask = nextTask.get(); - WorkflowTaskHandler.Result response = handleTask(currentTask, workflowTypeScope); + PollWorkflowTaskQueueResponse currentTask = nextWFTResponse.get(); + WorkflowTaskHandler.Result result = handleTask(currentTask, workflowTypeScope); try { - nextTask = sendReply(currentTask.getTaskToken(), service, workflowTypeScope, response); + nextWFTResponse = + sendReply(currentTask.getTaskToken(), service, workflowTypeScope, result); } catch (Exception e) { - logExceptionDuringResultReporting(e, currentTask, response); + logExceptionDuringResultReporting(e, currentTask, result); workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER).inc(1); // if we failed to report the workflow task completion back to the server, // our cached version of the workflow may be more advanced than the server is aware of. @@ -308,21 +314,23 @@ public void handle(PollWorkflowTaskQueueResponse task) throws Exception { // this should be after sendReply, otherwise we may log // WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER twice if sendReply throws - if (response.getTaskFailed() != null) { + if (result.getTaskFailed() != null) { // we don't trigger the counter in case of the legacy query // (which never has taskFailed set) workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER).inc(1); } - if (nextTask.isPresent()) { + if (nextWFTResponse.isPresent()) { workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_HEARTBEAT_COUNTER).inc(1); } - } while (nextTask.isPresent()); + } while (nextWFTResponse.isPresent()); } finally { swTotal.stop(); MDC.remove(LoggerTag.WORKFLOW_ID); MDC.remove(LoggerTag.WORKFLOW_TYPE); MDC.remove(LoggerTag.RUN_ID); + task.getCompletionCallback().apply(); + if (locked) { runLocks.unlock(runId); } @@ -330,15 +338,15 @@ public void handle(PollWorkflowTaskQueueResponse task) throws Exception { } @Override - public Throwable wrapFailure(PollWorkflowTaskQueueResponse task, Throwable failure) { - WorkflowExecution execution = task.getWorkflowExecution(); + public Throwable wrapFailure(WorkflowTask task, Throwable failure) { + WorkflowExecution execution = task.getResponse().getWorkflowExecution(); return new RuntimeException( "Failure processing workflow task. WorkflowId=" + execution.getWorkflowId() + ", RunId=" + execution.getRunId() + ", Attempt=" - + task.getAttempt(), + + task.getResponse().getAttempt(), failure); } diff --git a/temporal-spring-boot-autoconfigure-alpha/build.gradle b/temporal-spring-boot-autoconfigure-alpha/build.gradle index 1a6bda2cbf..8a279d9cd7 100644 --- a/temporal-spring-boot-autoconfigure-alpha/build.gradle +++ b/temporal-spring-boot-autoconfigure-alpha/build.gradle @@ -3,8 +3,8 @@ description = '''Spring Boot AutoConfigure for Temporal Java SDK''' ext { springBootVersion = '2.7.3'// [2.4.0,) - otelVersion = '1.17.0' - otShimVersion = '1.17.0-alpha' + otelVersion = '1.18.0' + otShimVersion = '1.18.0-alpha' } dependencies {