Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,21 @@ public TestWorkflowEnvironmentInternal(TestEnvironmentOptions options) {
service = new TestWorkflowService();
timeLockingInterceptor = new TimeLockingInterceptor(service);
service.lockTimeSkipping("TestWorkflowEnvironmentInternal constructor");
workflowServiceStubs =
WorkflowServiceStubs.newInstance(
service,
WorkflowServiceStubsOptions.newBuilder()
.setMetricsScope(options.getMetricsScope())
.build());

if (this.testEnvironmentOptions.isUseExternalService()) {
workflowServiceStubs =
WorkflowServiceStubs.newInstance(
WorkflowServiceStubsOptions.newBuilder()
.setTarget(this.testEnvironmentOptions.getTarget())
.build());
} else {
workflowServiceStubs =
WorkflowServiceStubs.newInstance(
service,
WorkflowServiceStubsOptions.newBuilder()
.setMetricsScope(options.getMetricsScope())
.build());
}
WorkflowClient client = WorkflowClient.newInstance(workflowServiceStubs, workflowClientOptions);
workerFactory = WorkerFactory.newInstance(client, options.getWorkerFactoryOptions());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.metrics.MetricsType;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.util.concurrent.Semaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ActivityPollTask implements Poller.PollTask<PollActivityTaskQueueResponse> {
final class ActivityPollTask implements Poller.PollTask<ActivityTask> {

private final WorkflowServiceStubs service;
private final String namespace;
Expand All @@ -44,6 +45,7 @@ final class ActivityPollTask implements Poller.PollTask<PollActivityTaskQueueRes
private static final Logger log = LoggerFactory.getLogger(ActivityPollTask.class);
private final double taskQueueActivitiesPerSecond;
private final Scope metricsScope;
private final Semaphore pollSemaphore;

public ActivityPollTask(
WorkflowServiceStubs service,
Expand All @@ -58,10 +60,11 @@ public ActivityPollTask(
this.options = options;
this.metricsScope = options.getMetricsScope();
this.taskQueueActivitiesPerSecond = taskQueueActivitiesPerSecond;
this.pollSemaphore = new Semaphore(options.getTaskExecutorThreadPoolSize());
}

@Override
public PollActivityTaskQueueResponse poll() {
public ActivityTask poll() {
PollActivityTaskQueueRequest.Builder pollRequest =
PollActivityTaskQueueRequest.newBuilder()
.setNamespace(namespace)
Expand All @@ -86,31 +89,40 @@ public PollActivityTaskQueueResponse poll() {
if (log.isTraceEnabled()) {
log.trace("poll request begin: " + pollRequest);
}
PollActivityTaskQueueResponse result;
PollActivityTaskQueueResponse response;
boolean isSuccessful = false;

try {
pollSemaphore.acquire();
} catch (InterruptedException e) {
return null;
}
try {
result =
response =
service
.blockingStub()
.withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
.pollActivityTaskQueue(pollRequest.build());

if (response == null || response.getTaskToken().isEmpty()) {
metricsScope.counter(MetricsType.ACTIVITY_POLL_NO_TASK_COUNTER).inc(1);
return null;
}
metricsScope
.timer(MetricsType.ACTIVITY_SCHEDULE_TO_START_LATENCY)
.record(
ProtobufTimeUtils.toM3Duration(
response.getStartedTime(), response.getCurrentAttemptScheduledTime()));
isSuccessful = true;
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE
&& e.getMessage().startsWith("UNAVAILABLE: Channel shutdown")) {
return null;
}
throw e;
} finally {
if (!isSuccessful) pollSemaphore.release();
}

if (result == null || result.getTaskToken().isEmpty()) {
metricsScope.counter(MetricsType.ACTIVITY_POLL_NO_TASK_COUNTER).inc(1);
return null;
}
metricsScope
.timer(MetricsType.ACTIVITY_SCHEDULE_TO_START_LATENCY)
.record(
ProtobufTimeUtils.toM3Duration(
result.getStartedTime(), result.getCurrentAttemptScheduledTime()));

return result;
return new ActivityTask(response, pollSemaphore::release);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.internal.worker;

import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
import io.temporal.workflow.Functions;

final class ActivityTask {
private final PollActivityTaskQueueResponse response;
private final Functions.Proc completionHandle;

public ActivityTask(PollActivityTaskQueueResponse response, Functions.Proc completionHandle) {
this.response = response;
this.completionHandle = completionHandle;
}

public PollActivityTaskQueueResponse getResponse() {
return response;
}

/**
* Completion handle function that must be called by the handler whenever activity processing is
* completed.
*/
public Functions.Proc getCompletionHandle() {
return completionHandle;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public ActivityWorker(
public void start() {
if (handler.isAnyTypeSupported()) {
poller =
new Poller<>(
new Poller<ActivityTask>(
options.getIdentity(),
new ActivityPollTask(
service, namespace, taskQueue, options, taskQueueActivitiesPerSecond),
Expand Down Expand Up @@ -151,8 +151,7 @@ public boolean isSuspended() {
return poller.isSuspended();
}

private class TaskHandlerImpl
implements PollTaskExecutor.TaskHandler<PollActivityTaskQueueResponse> {
private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<ActivityTask> {

final ActivityTaskHandler handler;

Expand All @@ -161,44 +160,43 @@ private TaskHandlerImpl(ActivityTaskHandler handler) {
}

@Override
public void handle(PollActivityTaskQueueResponse task) throws Exception {

public void handle(ActivityTask task) throws Exception {
PollActivityTaskQueueResponse r = task.getResponse();
Scope metricsScope =
options
.getMetricsScope()
.tagged(
ImmutableMap.of(
MetricsTag.ACTIVITY_TYPE,
task.getActivityType().getName(),
r.getActivityType().getName(),
MetricsTag.WORKFLOW_TYPE,
task.getWorkflowType().getName()));

metricsScope
.timer(MetricsType.ACTIVITY_SCHEDULE_TO_START_LATENCY)
.record(
ProtobufTimeUtils.toM3Duration(
task.getStartedTime(), task.getCurrentAttemptScheduledTime()));
r.getWorkflowType().getName()));
try {
metricsScope
.timer(MetricsType.ACTIVITY_SCHEDULE_TO_START_LATENCY)
.record(
ProtobufTimeUtils.toM3Duration(
r.getStartedTime(), r.getCurrentAttemptScheduledTime()));

// The following tags are for logging.
MDC.put(LoggerTag.ACTIVITY_ID, task.getActivityId());
MDC.put(LoggerTag.ACTIVITY_TYPE, task.getActivityType().getName());
MDC.put(LoggerTag.WORKFLOW_ID, task.getWorkflowExecution().getWorkflowId());
MDC.put(LoggerTag.RUN_ID, task.getWorkflowExecution().getRunId());
// The following tags are for logging.
MDC.put(LoggerTag.ACTIVITY_ID, r.getActivityId());
MDC.put(LoggerTag.ACTIVITY_TYPE, r.getActivityType().getName());
MDC.put(LoggerTag.WORKFLOW_ID, r.getWorkflowExecution().getWorkflowId());
MDC.put(LoggerTag.RUN_ID, r.getWorkflowExecution().getRunId());

propagateContext(task);
propagateContext(r);

try {
Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_EXEC_LATENCY).start();
ActivityTaskHandler.Result response;
try {
response = handler.handle(task, metricsScope, false);
response = handler.handle(r, metricsScope, false);
} finally {
sw.stop();
}
sendReply(task, response, metricsScope);
sendReply(r, response, metricsScope);

Duration duration =
ProtobufTimeUtils.toM3DurationSinceNow(task.getCurrentAttemptScheduledTime());
ProtobufTimeUtils.toM3DurationSinceNow(r.getCurrentAttemptScheduledTime());
metricsScope.timer(MetricsType.ACTIVITY_E2E_LATENCY).record(duration);

} catch (FailureWrapperException e) {
Expand All @@ -213,15 +211,16 @@ public void handle(PollActivityTaskQueueResponse task) throws Exception {
canceledRequest.setDetails(info.getDetails());
}
sendReply(
task,
new Result(task.getActivityId(), null, null, canceledRequest.build(), null),
r,
new Result(r.getActivityId(), null, null, canceledRequest.build(), null),
metricsScope);
}
} finally {
MDC.remove(LoggerTag.ACTIVITY_ID);
MDC.remove(LoggerTag.ACTIVITY_TYPE);
MDC.remove(LoggerTag.WORKFLOW_ID);
MDC.remove(LoggerTag.RUN_ID);
task.getCompletionHandle().apply();
}
}

Expand All @@ -243,17 +242,18 @@ void propagateContext(PollActivityTaskQueueResponse response) {
}

@Override
public Throwable wrapFailure(PollActivityTaskQueueResponse task, Throwable failure) {
WorkflowExecution execution = task.getWorkflowExecution();
public Throwable wrapFailure(ActivityTask t, Throwable failure) {
PollActivityTaskQueueResponse response = t.getResponse();
WorkflowExecution execution = response.getWorkflowExecution();
return new RuntimeException(
"Failure processing activity task. WorkflowId="
"Failure processing activity response. WorkflowId="
+ execution.getWorkflowId()
+ ", RunId="
+ execution.getRunId()
+ ", ActivityType="
+ task.getActivityType().getName()
+ response.getActivityType().getName()
+ ", ActivityId="
+ task.getActivityId(),
+ response.getActivityId(),
failure);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -260,24 +259,14 @@ public void run() {
}

private class PollExecutionTask implements Poller.ThrowingRunnable {
private final Semaphore pollSemaphore;

PollExecutionTask() {
this.pollSemaphore = new Semaphore(pollerOptions.getPollThreadCount());
}

@Override
public void run() throws Exception {
try {
pollSemaphore.acquire();
T task = pollTask.poll();
if (task == null) {
return;
}
taskExecutor.process(task);
} finally {
pollSemaphore.release();
T task = pollTask.poll();
if (task == null) {
return;
}
taskExecutor.process(task);
}
}
}
Loading