Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.temporal.client.WorkflowClientOptions;
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.internal.common.SignalWithStartWorkflowExecutionParameters;
import io.temporal.internal.common.WorkflowExecutionUtils;
import io.temporal.internal.external.GenericWorkflowClientExternal;
import java.lang.reflect.Type;
import java.util.*;
Expand Down Expand Up @@ -91,7 +90,7 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu
@Override
public <R> GetResultOutput<R> getResult(GetResultInput<R> input) throws TimeoutException {
Optional<Payloads> resultValue =
WorkflowExecutionUtils.getWorkflowExecutionResult(
WorkflowClientLongPollHelper.getWorkflowExecutionResult(
genericClient.getService(),
genericClient.getNamespace(),
input.getWorkflowExecution(),
Expand All @@ -107,7 +106,7 @@ public <R> GetResultOutput<R> getResult(GetResultInput<R> input) throws TimeoutE
@Override
public <R> GetResultAsyncOutput<R> getResultAsync(GetResultInput<R> input) {
CompletableFuture<Optional<Payloads>> resultValue =
WorkflowExecutionUtils.getWorkflowExecutionResultAsync(
WorkflowClientLongPollAsyncHelper.getWorkflowExecutionResultAsync(
genericClient.getService(),
genericClient.getNamespace(),
input.getWorkflowExecution(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.internal.client;

import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;

import com.google.protobuf.ByteString;
import com.uber.m3.tally.Scope;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest;
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse;
import io.temporal.client.WorkflowClient;
import io.temporal.internal.common.WorkflowExecutionUtils;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.util.Iterator;

/**
* Contains different methods that could but didn't become a part of the main {@link
* WorkflowClient}, mostly because they shouldn't be a part of normal usage and exist for tests /
* debug only.
*/
public class WorkflowClientHelper {

/** Returns workflow instance history in a human readable format. */
public static String prettyPrintHistory(
WorkflowServiceStubs service,
String namespace,
WorkflowExecution workflowExecution,
Scope metricsScope) {
return prettyPrintHistory(service, namespace, workflowExecution, true, metricsScope);
}

/**
* Returns workflow instance history in a human readable format.
*
* @param showWorkflowTasks when set to false workflow task events (command events) are not
* included
* @param metricsScope
*/
public static String prettyPrintHistory(
WorkflowServiceStubs service,
String namespace,
WorkflowExecution workflowExecution,
boolean showWorkflowTasks,
Scope metricsScope) {
Iterator<HistoryEvent> events = getHistory(service, namespace, workflowExecution, metricsScope);
return WorkflowExecutionUtils.prettyPrintHistory(events, showWorkflowTasks);
}

public static Iterator<HistoryEvent> getHistory(
WorkflowServiceStubs service,
String namespace,
WorkflowExecution workflowExecution,
Scope metricsScope) {
return new Iterator<HistoryEvent>() {
ByteString nextPageToken = ByteString.EMPTY;
Iterator<HistoryEvent> current;

{
getNextPage();
}

@Override
public boolean hasNext() {
return current.hasNext() || !nextPageToken.isEmpty();
}

@Override
public HistoryEvent next() {
if (current.hasNext()) {
return current.next();
}
getNextPage();
return current.next();
}

private void getNextPage() {
GetWorkflowExecutionHistoryResponse history =
getHistoryPage(service, namespace, workflowExecution, nextPageToken, metricsScope);
current = history.getHistory().getEventsList().iterator();
nextPageToken = history.getNextPageToken();
}
};
}

public static GetWorkflowExecutionHistoryResponse getHistoryPage(
WorkflowServiceStubs service,
String namespace,
WorkflowExecution workflowExecution,
ByteString nextPageToken,
Scope metricsScope) {
GetWorkflowExecutionHistoryRequest getHistoryRequest =
GetWorkflowExecutionHistoryRequest.newBuilder()
.setNamespace(namespace)
.setExecution(workflowExecution)
.setNextPageToken(nextPageToken)
.build();
return service
.blockingStub()
.withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
.getWorkflowExecutionHistory(getHistoryRequest);
}

public static WorkflowExecutionInfo describeWorkflowInstance(
WorkflowServiceStubs service,
String namespace,
WorkflowExecution workflowExecution,
Scope metricsScope) {
DescribeWorkflowExecutionRequest describeRequest =
DescribeWorkflowExecutionRequest.newBuilder()
.setNamespace(namespace)
.setExecution(workflowExecution)
.build();
DescribeWorkflowExecutionResponse executionDetail =
service
.blockingStub()
.withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
.describeWorkflowExecution(describeRequest);
WorkflowExecutionInfo instanceMetadata = executionDetail.getWorkflowExecutionInfo();
return instanceMetadata;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.internal.client;

import static io.temporal.internal.common.WorkflowExecutionUtils.getResultFromCloseEvent;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import io.grpc.Deadline;
import io.grpc.Status;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.enums.v1.HistoryEventFilterType;
import io.temporal.api.history.v1.History;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest;
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse;
import io.temporal.common.converter.DataConverter;
import io.temporal.internal.common.WorkflowExecutionUtils;
import io.temporal.internal.retryer.GrpcRetryer;
import io.temporal.serviceclient.CheckedExceptionWrapper;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.*;

/** This class encapsulates async long poll logic of {@link RootWorkflowClientInvoker} */
class WorkflowClientLongPollAsyncHelper {

static CompletableFuture<Optional<Payloads>> getWorkflowExecutionResultAsync(
WorkflowServiceStubs service,
String namespace,
WorkflowExecution workflowExecution,
Optional<String> workflowType,
long timeout,
TimeUnit unit,
DataConverter converter) {
return getInstanceCloseEventAsync(service, namespace, workflowExecution, timeout, unit)
.thenApply(
(closeEvent) ->
getResultFromCloseEvent(workflowExecution, workflowType, closeEvent, converter));
}

/** Returns an instance closing event, potentially waiting for workflow to complete. */
private static CompletableFuture<HistoryEvent> getInstanceCloseEventAsync(
WorkflowServiceStubs service,
String namespace,
final WorkflowExecution workflowExecution,
long timeout,
TimeUnit unit) {
return getInstanceCloseEventAsync(
service, namespace, workflowExecution, ByteString.EMPTY, timeout, unit);
}

private static CompletableFuture<HistoryEvent> getInstanceCloseEventAsync(
WorkflowServiceStubs service,
String namespace,
final WorkflowExecution workflowExecution,
ByteString pageToken,
long timeout,
TimeUnit unit) {
// TODO: Interrupt service long poll call on timeout and on interrupt
long start = System.currentTimeMillis();
GetWorkflowExecutionHistoryRequest request =
GetWorkflowExecutionHistoryRequest.newBuilder()
.setNamespace(namespace)
.setExecution(workflowExecution)
.setHistoryEventFilterType(HistoryEventFilterType.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT)
.setNextPageToken(pageToken)
.build();
CompletableFuture<GetWorkflowExecutionHistoryResponse> response =
getWorkflowExecutionHistoryAsync(service, request, timeout, unit);
return response.thenComposeAsync(
(r) -> {
if (timeout != 0 && System.currentTimeMillis() - start > unit.toMillis(timeout)) {
throw CheckedExceptionWrapper.wrap(
new TimeoutException(
"WorkflowId="
+ workflowExecution.getWorkflowId()
+ ", runId="
+ workflowExecution.getRunId()
+ ", timeout="
+ timeout
+ ", unit="
+ unit));
}
History history = r.getHistory();
if (history.getEventsCount() == 0) {
// Empty poll returned
return getInstanceCloseEventAsync(
service, namespace, workflowExecution, pageToken, timeout, unit);
}
HistoryEvent event = history.getEvents(0);
if (!WorkflowExecutionUtils.isWorkflowExecutionCompletedEvent(event)) {
throw new RuntimeException("Last history event is not completion event: " + event);
}
// Workflow called continueAsNew. Start polling the new generation with new runId.
if (event.getEventType() == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW) {
WorkflowExecution nextWorkflowExecution =
WorkflowExecution.newBuilder()
.setWorkflowId(workflowExecution.getWorkflowId())
.setRunId(
event
.getWorkflowExecutionContinuedAsNewEventAttributes()
.getNewExecutionRunId())
.build();
return getInstanceCloseEventAsync(
service, namespace, nextWorkflowExecution, r.getNextPageToken(), timeout, unit);
}
return CompletableFuture.completedFuture(event);
});
}

private static CompletableFuture<GetWorkflowExecutionHistoryResponse>
getWorkflowExecutionHistoryAsync(
WorkflowServiceStubs service,
GetWorkflowExecutionHistoryRequest r,
long timeout,
TimeUnit unit) {
long start = System.currentTimeMillis();
Deadline expiration = Deadline.after(timeout, TimeUnit.MILLISECONDS);
RpcRetryOptions retryOptions =
RpcRetryOptions.newBuilder()
.setBackoffCoefficient(1.5)
.setInitialInterval(Duration.ofMillis(1))
.setMaximumInterval(Duration.ofSeconds(1))
.setMaximumAttempts(Integer.MAX_VALUE)
.setExpiration(Duration.ofMillis(expiration.timeRemaining(TimeUnit.MILLISECONDS)))
.addDoNotRetry(Status.Code.INVALID_ARGUMENT, null)
.addDoNotRetry(Status.Code.NOT_FOUND, null)
.build();

return GrpcRetryer.retryWithResultAsync(
retryOptions,
() -> {
CompletableFuture<GetWorkflowExecutionHistoryResponse> result = new CompletableFuture<>();
long elapsedInRetry = System.currentTimeMillis() - start;
Deadline expirationInRetry =
Deadline.after(unit.toMillis(timeout) - elapsedInRetry, TimeUnit.MILLISECONDS);
ListenableFuture<GetWorkflowExecutionHistoryResponse> resultFuture =
service.futureStub().withDeadline(expirationInRetry).getWorkflowExecutionHistory(r);
resultFuture.addListener(
() -> {
try {
result.complete(resultFuture.get());
} catch (ExecutionException e) {
result.completeExceptionally(e.getCause());
} catch (Exception e) {
result.completeExceptionally(e);
}
},
ForkJoinPool.commonPool());
return result;
});
}
}
Loading