Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ ext {
// Platforms
grpcVersion = '1.49.0' // [1.34.0,)
jacksonVersion = '2.13.4' // [2.9.0,)
micrometerVersion = '1.9.3' // [1.0.0,)
micrometerVersion = '1.9.4' // [1.0.0,)

slf4jVersion = '1.7.36' // [1.4.0,) // stay on 1.x for a while to don't use any APIs from 2.x which may break our users which decide on 1.x
protoVersion = '3.21.5' // [3.10.0,)
Expand All @@ -45,7 +45,7 @@ ext {

// test scoped
logbackVersion = '1.2.11'
mockitoVersion = '4.7.0'
mockitoVersion = '4.8.0'
junitVersion = '4.13.2'
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,19 @@

import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
import io.temporal.workflow.Functions;
import javax.annotation.Nonnull;

public final class ActivityTask {
private final PollActivityTaskQueueResponse response;
private final Functions.Proc completionCallback;
private final @Nonnull PollActivityTaskQueueResponse response;
private final @Nonnull Functions.Proc completionCallback;

public ActivityTask(PollActivityTaskQueueResponse response, Functions.Proc completionCallback) {
public ActivityTask(
@Nonnull PollActivityTaskQueueResponse response, @Nonnull Functions.Proc completionCallback) {
this.response = response;
this.completionCallback = completionCallback;
}

@Nonnull
public PollActivityTaskQueueResponse getResponse() {
return response;
}
Expand All @@ -40,6 +43,7 @@ public PollActivityTaskQueueResponse getResponse() {
* Completion handle function that must be called by the handler whenever activity processing is
* completed.
*/
@Nonnull
public Functions.Proc getCompletionCallback() {
return completionCallback;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,25 @@
import io.temporal.api.taskqueue.v1.TaskQueue;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueRequest;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.MetricsType;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class WorkflowPollTask implements Poller.PollTask<PollWorkflowTaskQueueResponse> {
public final class WorkflowPollTask implements Poller.PollTask<WorkflowTask> {
private static final Logger log = LoggerFactory.getLogger(WorkflowPollTask.class);

private final WorkflowServiceStubs service;
private final String namespace;
private final String taskQueue;
private final TaskQueueKind taskQueueKind;
private final String identity;
private final String binaryChecksum;
private final Semaphore workflowTaskExecutorSemaphore;
private final Scope metricsScope;
private final WorkflowServiceGrpc.WorkflowServiceBlockingStub serviceStub;
private final PollWorkflowTaskQueueRequest pollRequest;

public WorkflowPollTask(
@Nonnull WorkflowServiceStubs service,
Expand All @@ -54,65 +54,72 @@ public WorkflowPollTask(
@Nonnull TaskQueueKind taskQueueKind,
@Nonnull String identity,
@Nullable String binaryChecksum,
@Nonnull Semaphore workflowTaskExecutorSemaphore,
@Nonnull Scope metricsScope) {
this.service = Objects.requireNonNull(service);
this.namespace = Objects.requireNonNull(namespace);
this.taskQueue = Objects.requireNonNull(taskQueue);
this.taskQueueKind = Objects.requireNonNull(taskQueueKind);
this.identity = Objects.requireNonNull(identity);
this.binaryChecksum = binaryChecksum;
this.workflowTaskExecutorSemaphore = workflowTaskExecutorSemaphore;
this.metricsScope = Objects.requireNonNull(metricsScope);
}
this.serviceStub =
Objects.requireNonNull(service)
.blockingStub()
.withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope);

@Override
public PollWorkflowTaskQueueResponse poll() {
PollWorkflowTaskQueueRequest pollRequest =
this.pollRequest =
PollWorkflowTaskQueueRequest.newBuilder()
.setNamespace(namespace)
.setNamespace(Objects.requireNonNull(namespace))
.setIdentity(Objects.requireNonNull(identity))
.setBinaryChecksum(binaryChecksum)
.setIdentity(identity)
.setTaskQueue(
TaskQueue.newBuilder()
.setName(taskQueue)
// For matching performance optimizations of Temporal Server it's important to
// know if the poll comes for a sticky or a normal queue. Because sticky queues
// have only 1 partition, no forwarding is needed.
.setKind(taskQueueKind)
.setKind(Objects.requireNonNull(taskQueueKind))
.build())
.build();
}

if (log.isTraceEnabled()) {
log.trace("poll request begin: " + pollRequest);
}
PollWorkflowTaskQueueResponse result;
result =
service
.blockingStub()
.withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
.pollWorkflowTaskQueue(pollRequest);
if (log.isTraceEnabled()) {
log.trace(
"poll request returned workflow task: workflowType="
+ result.getWorkflowType()
+ ", workflowExecution="
+ result.getWorkflowExecution()
+ ", startedEventId="
+ result.getStartedEventId()
+ ", previousStartedEventId="
+ result.getPreviousStartedEventId()
+ (result.getQuery() != null
? ", queryType=" + result.getQuery().getQueryType()
: ""));
}
@Override
public WorkflowTask poll() {
log.trace("poll request begin: {}", pollRequest);
PollWorkflowTaskQueueResponse response;
boolean isSuccessful = false;

if (result == null || result.getTaskToken().isEmpty()) {
metricsScope.counter(MetricsType.WORKFLOW_TASK_QUEUE_POLL_EMPTY_COUNTER).inc(1);
try {
workflowTaskExecutorSemaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
metricsScope.counter(MetricsType.WORKFLOW_TASK_QUEUE_POLL_SUCCEED_COUNTER).inc(1);
metricsScope
.timer(MetricsType.WORKFLOW_TASK_SCHEDULE_TO_START_LATENCY)
.record(ProtobufTimeUtils.toM3Duration(result.getStartedTime(), result.getScheduledTime()));
return result;

try {
response = serviceStub.pollWorkflowTaskQueue(pollRequest);
if (log.isTraceEnabled()) {
log.trace(
"poll request returned workflow task: taskQueue={}, workflowType={}, workflowExecution={}, startedEventId={}, previousStartedEventId={}{}",
taskQueue,
response.getWorkflowType(),
response.getWorkflowExecution(),
response.getStartedEventId(),
response.getPreviousStartedEventId(),
response.hasQuery() ? ", queryType=" + response.getQuery().getQueryType() : "");
}

if (response == null || response.getTaskToken().isEmpty()) {
metricsScope.counter(MetricsType.WORKFLOW_TASK_QUEUE_POLL_EMPTY_COUNTER).inc(1);
return null;
}
metricsScope.counter(MetricsType.WORKFLOW_TASK_QUEUE_POLL_SUCCEED_COUNTER).inc(1);
metricsScope
.timer(MetricsType.WORKFLOW_TASK_SCHEDULE_TO_START_LATENCY)
.record(
ProtobufTimeUtils.toM3Duration(
response.getStartedTime(), response.getScheduledTime()));
isSuccessful = true;
return new WorkflowTask(response, workflowTaskExecutorSemaphore::release);
} finally {
if (!isSuccessful) workflowTaskExecutorSemaphore.release();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.worker;

import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
import io.temporal.workflow.Functions;
import javax.annotation.Nonnull;

public class WorkflowTask {
@Nonnull private final PollWorkflowTaskQueueResponse response;
@Nonnull private final Functions.Proc completionCallback;

public WorkflowTask(
@Nonnull PollWorkflowTaskQueueResponse response, @Nonnull Functions.Proc completionCallback) {
this.response = response;
this.completionCallback = completionCallback;
}

@Nonnull
public PollWorkflowTaskQueueResponse getResponse() {
return response;
}

/**
* Completion handle function that must be called by the handler whenever activity processing is
* completed.
*/
@Nonnull
public Functions.Proc getCompletionCallback() {
return completionCallback;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -72,7 +73,7 @@ final class WorkflowWorker implements SuspendableWorker {

@Nullable private SuspendableWorker stickyPoller = new NoopSuspendableWorker();

private PollTaskExecutor<PollWorkflowTaskQueueResponse> pollTaskExecutor;
private PollTaskExecutor<WorkflowTask> pollTaskExecutor;

public WorkflowWorker(
@Nonnull WorkflowServiceStubs service,
Expand Down Expand Up @@ -108,6 +109,8 @@ public void start() {
pollerOptions,
options.getTaskExecutorThreadPoolSize(),
workerMetricsScope);
Semaphore workflowTaskExecutorSemaphore =
new Semaphore(options.getTaskExecutorThreadPoolSize());
poller =
new Poller<>(
options.getIdentity(),
Expand All @@ -118,6 +121,7 @@ public void start() {
TaskQueueKind.TASK_QUEUE_KIND_NORMAL,
options.getIdentity(),
options.getBinaryChecksum(),
workflowTaskExecutorSemaphore,
workerMetricsScope),
pollTaskExecutor,
pollerOptions,
Expand All @@ -140,6 +144,7 @@ public void start() {
TaskQueueKind.TASK_QUEUE_KIND_STICKY,
options.getIdentity(),
options.getBinaryChecksum(),
workflowTaskExecutorSemaphore,
stickyScope),
pollTaskExecutor,
stickyPollerOptions,
Expand Down Expand Up @@ -232,8 +237,7 @@ private PollerOptions getStickyPollerOptions(SingleWorkerOptions options) {
return stickyPollerOptions;
}

private class TaskHandlerImpl
implements PollTaskExecutor.TaskHandler<PollWorkflowTaskQueueResponse> {
private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<WorkflowTask> {

final WorkflowTaskHandler handler;

Expand All @@ -242,10 +246,11 @@ private TaskHandlerImpl(WorkflowTaskHandler handler) {
}

@Override
public void handle(PollWorkflowTaskQueueResponse task) throws Exception {
WorkflowExecution workflowExecution = task.getWorkflowExecution();
public void handle(WorkflowTask task) throws Exception {
PollWorkflowTaskQueueResponse workflowTaskResponse = task.getResponse();
WorkflowExecution workflowExecution = workflowTaskResponse.getWorkflowExecution();
String runId = workflowExecution.getRunId();
String workflowType = task.getWorkflowType().getName();
String workflowType = workflowTaskResponse.getWorkflowType().getName();

Scope workflowTypeScope =
workerMetricsScope.tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, workflowType));
Expand Down Expand Up @@ -288,14 +293,15 @@ public void handle(PollWorkflowTaskQueueResponse task) throws Exception {
Stopwatch swTotal =
workflowTypeScope.timer(MetricsType.WORKFLOW_TASK_EXECUTION_TOTAL_LATENCY).start();
try {
Optional<PollWorkflowTaskQueueResponse> nextTask = Optional.of(task);
Optional<PollWorkflowTaskQueueResponse> nextWFTResponse = Optional.of(workflowTaskResponse);
do {
PollWorkflowTaskQueueResponse currentTask = nextTask.get();
WorkflowTaskHandler.Result response = handleTask(currentTask, workflowTypeScope);
PollWorkflowTaskQueueResponse currentTask = nextWFTResponse.get();
WorkflowTaskHandler.Result result = handleTask(currentTask, workflowTypeScope);
try {
nextTask = sendReply(currentTask.getTaskToken(), service, workflowTypeScope, response);
nextWFTResponse =
sendReply(currentTask.getTaskToken(), service, workflowTypeScope, result);
} catch (Exception e) {
logExceptionDuringResultReporting(e, currentTask, response);
logExceptionDuringResultReporting(e, currentTask, result);
workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER).inc(1);
// if we failed to report the workflow task completion back to the server,
// our cached version of the workflow may be more advanced than the server is aware of.
Expand All @@ -308,37 +314,39 @@ public void handle(PollWorkflowTaskQueueResponse task) throws Exception {

// this should be after sendReply, otherwise we may log
// WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER twice if sendReply throws
if (response.getTaskFailed() != null) {
if (result.getTaskFailed() != null) {
// we don't trigger the counter in case of the legacy query
// (which never has taskFailed set)
workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER).inc(1);
}
if (nextTask.isPresent()) {
if (nextWFTResponse.isPresent()) {
workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_HEARTBEAT_COUNTER).inc(1);
}
} while (nextTask.isPresent());
} while (nextWFTResponse.isPresent());
} finally {
swTotal.stop();
MDC.remove(LoggerTag.WORKFLOW_ID);
MDC.remove(LoggerTag.WORKFLOW_TYPE);
MDC.remove(LoggerTag.RUN_ID);

task.getCompletionCallback().apply();

if (locked) {
runLocks.unlock(runId);
}
}
}

@Override
public Throwable wrapFailure(PollWorkflowTaskQueueResponse task, Throwable failure) {
WorkflowExecution execution = task.getWorkflowExecution();
public Throwable wrapFailure(WorkflowTask task, Throwable failure) {
WorkflowExecution execution = task.getResponse().getWorkflowExecution();
return new RuntimeException(
"Failure processing workflow task. WorkflowId="
+ execution.getWorkflowId()
+ ", RunId="
+ execution.getRunId()
+ ", Attempt="
+ task.getAttempt(),
+ task.getResponse().getAttempt(),
failure);
}

Expand Down
4 changes: 2 additions & 2 deletions temporal-spring-boot-autoconfigure-alpha/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ description = '''Spring Boot AutoConfigure for Temporal Java SDK'''
ext {
springBootVersion = '2.7.3'// [2.4.0,)

otelVersion = '1.17.0'
otShimVersion = '1.17.0-alpha'
otelVersion = '1.18.0'
otShimVersion = '1.18.0-alpha'
}

dependencies {
Expand Down