Skip to content

Commit

Permalink
Added workflow retry policy propagation during continue-as-new and to…
Browse files Browse the repository at this point in the history
… ContinueAsNewOptions. (#1961)

* Added WorkflowInfo.retryOptions and ContinueAsNewOptions.retryOptions

* moved toRetryPolicy to RetryOptionsUtils

* Copy RetryOptions on empty ContinueAsNewOptions
  • Loading branch information
mfateev committed Jan 3, 2024
1 parent 0bb0782 commit 9174397
Show file tree
Hide file tree
Showing 17 changed files with 185 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,11 @@ private <R> Tracer.SpanBuilder createChildWorkflowStartSpanBuilder(ChildWorkflow
}

private Tracer.SpanBuilder createContinueAsNewWorkflowStartSpanBuilder(ContinueAsNewInput input) {
WorkflowInfo parentWorkflowInfo = Workflow.getInfo();
WorkflowInfo continuedWorkflowInfo = Workflow.getInfo();
return spanFactory.createContinueAsNewWorkflowStartSpan(
tracer,
MoreObjects.firstNonNull(input.getWorkflowType(), parentWorkflowInfo.getWorkflowType()),
parentWorkflowInfo.getWorkflowId(),
parentWorkflowInfo.getRunId());
MoreObjects.firstNonNull(input.getWorkflowType(), continuedWorkflowInfo.getWorkflowType()),
continuedWorkflowInfo.getWorkflowId(),
continuedWorkflowInfo.getRunId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
package io.temporal.internal.client;

import static io.temporal.internal.common.HeaderUtils.toHeaderGrpc;
import static io.temporal.internal.common.SerializerUtils.toRetryPolicy;
import static io.temporal.internal.common.RetryOptionsUtils.toRetryPolicy;

import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.temporal.failure.ApplicationFailure;
import io.temporal.failure.ChildWorkflowFailure;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -95,4 +96,21 @@ public static RetryOptions toRetryOptions(RetryPolicy retryPolicy) {

return roBuilder.validateBuildWithDefaults();
}

public static RetryPolicy.Builder toRetryPolicy(RetryOptions retryOptions) {
RetryPolicy.Builder builder =
RetryPolicy.newBuilder()
.setInitialInterval(
ProtobufTimeUtils.toProtoDuration(retryOptions.getInitialInterval()))
.setMaximumInterval(
ProtobufTimeUtils.toProtoDuration(retryOptions.getMaximumInterval()))
.setBackoffCoefficient(retryOptions.getBackoffCoefficient())
.setMaximumAttempts(retryOptions.getMaximumAttempts());

if (retryOptions.getDoNotRetry() != null) {
builder = builder.addAllNonRetryableErrorTypes(Arrays.asList(retryOptions.getDoNotRetry()));
}

return builder;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@

package io.temporal.internal.replay;

import static io.temporal.internal.common.RetryOptionsUtils.toRetryOptions;

import com.google.common.base.Preconditions;
import com.google.protobuf.util.Timestamps;
import io.temporal.api.common.v1.*;
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
import io.temporal.common.RetryOptions;
import io.temporal.internal.common.ProtobufTimeUtils;
import java.time.Duration;
import java.util.Map;
Expand Down Expand Up @@ -146,4 +149,12 @@ public Payloads getLastCompletionResult() {
public Failure getPreviousRunFailure() {
return previousRunFailure;
}

@Nullable
public RetryOptions getRetryOptions() {
if (!startedAttributes.hasRetryPolicy()) {
return null;
}
return toRetryOptions(startedAttributes.getRetryPolicy());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.temporal.api.common.v1.*;
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
import io.temporal.common.RetryOptions;
import io.temporal.internal.common.SdkFlag;
import io.temporal.internal.statemachines.ExecuteActivityParameters;
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
Expand Down Expand Up @@ -117,6 +118,9 @@ public Functions.Proc1<Exception> getCancellationHandle() {
/** Workflow task queue name. */
String getTaskQueue();

@Nullable
RetryOptions getRetryOptions();

/** Workflow namespace. */
String getNamespace();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
import io.temporal.common.RetryOptions;
import io.temporal.failure.CanceledFailure;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.common.SdkFlag;
Expand Down Expand Up @@ -141,6 +142,12 @@ public String getTaskQueue() {
return basicWorkflowContext.getTaskQueue();
}

@Nullable
@Override
public RetryOptions getRetryOptions() {
return basicWorkflowContext.getRetryOptions();
}

@Override
public String getNamespace() {
return basicWorkflowContext.getNamespace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import static io.temporal.internal.common.HeaderUtils.intoPayloadMap;
import static io.temporal.internal.common.HeaderUtils.toHeaderGrpc;
import static io.temporal.internal.common.SerializerUtils.toRetryPolicy;
import static io.temporal.internal.common.RetryOptionsUtils.toRetryPolicy;
import static io.temporal.internal.sync.WorkflowInternal.DEFAULT_VERSION;

import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -1108,6 +1108,11 @@ public void continueAsNew(ContinueAsNewInput input) {
if (options.getTaskQueue() != null && !options.getTaskQueue().isEmpty()) {
attributes.setTaskQueue(TaskQueue.newBuilder().setName(options.getTaskQueue()));
}
if (options.getRetryOptions() != null) {
attributes.setRetryPolicy(toRetryPolicy(options.getRetryOptions()));
} else if (replayContext.getRetryOptions() != null) {
attributes.setRetryPolicy(toRetryPolicy(replayContext.getRetryOptions()));
}
Map<String, Object> searchAttributes = options.getSearchAttributes();
if (searchAttributes != null && !searchAttributes.isEmpty()) {
if (options.getTypedSearchAttributes() != null) {
Expand All @@ -1132,6 +1137,9 @@ public void continueAsNew(ContinueAsNewInput input) {
.determineUseCompatibleFlag(
replayContext.getTaskQueue().equals(options.getTaskQueue())));
}
} else if (replayContext.getRetryOptions() != null) {
// Have to copy retry options as server doesn't copy them.
attributes.setRetryPolicy(toRetryPolicy(replayContext.getRetryOptions()));
}

List<ContextPropagator> propagators =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import io.temporal.api.common.v1.SearchAttributes;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.common.RetryOptions;
import io.temporal.internal.replay.ReplayWorkflowContext;
import io.temporal.workflow.WorkflowInfo;
import java.time.Duration;
Expand Down Expand Up @@ -80,6 +81,12 @@ public String getTaskQueue() {
return context.getTaskQueue();
}

@Nullable
@Override
public RetryOptions getRetryOptions() {
return context.getRetryOptions();
}

@Override
public Duration getWorkflowRunTimeout() {
return context.getWorkflowRunTimeout();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package io.temporal.workflow;

import io.temporal.common.RetryOptions;
import io.temporal.common.SearchAttributes;
import io.temporal.common.VersioningIntent;
import io.temporal.common.context.ContextPropagator;
Expand Down Expand Up @@ -56,6 +57,7 @@ public static final class Builder {

private Duration workflowRunTimeout;
private String taskQueue;
private RetryOptions retryOptions;
private Duration workflowTaskTimeout;
private Map<String, Object> memo;
private Map<String, Object> searchAttributes;
Expand All @@ -71,6 +73,7 @@ private Builder(ContinueAsNewOptions options) {
}
this.workflowRunTimeout = options.workflowRunTimeout;
this.taskQueue = options.taskQueue;
this.retryOptions = options.retryOptions;
this.workflowTaskTimeout = options.workflowTaskTimeout;
this.memo = options.getMemo();
this.searchAttributes = options.getSearchAttributes();
Expand All @@ -89,6 +92,11 @@ public Builder setTaskQueue(String taskQueue) {
return this;
}

public Builder setRetryOptions(RetryOptions retryOptions) {
this.retryOptions = retryOptions;
return this;
}

public Builder setWorkflowTaskTimeout(Duration workflowTaskTimeout) {
this.workflowTaskTimeout = workflowTaskTimeout;
return this;
Expand Down Expand Up @@ -143,6 +151,7 @@ public ContinueAsNewOptions build() {
return new ContinueAsNewOptions(
workflowRunTimeout,
taskQueue,
retryOptions,
workflowTaskTimeout,
memo,
searchAttributes,
Expand All @@ -154,6 +163,7 @@ public ContinueAsNewOptions build() {

private final @Nullable Duration workflowRunTimeout;
private final @Nullable String taskQueue;
private final @Nullable RetryOptions retryOptions;
private final @Nullable Duration workflowTaskTimeout;
private final @Nullable Map<String, Object> memo;
private final @Nullable Map<String, Object> searchAttributes;
Expand All @@ -164,6 +174,7 @@ public ContinueAsNewOptions build() {
public ContinueAsNewOptions(
@Nullable Duration workflowRunTimeout,
@Nullable String taskQueue,
@Nullable RetryOptions retryOptions,
@Nullable Duration workflowTaskTimeout,
@Nullable Map<String, Object> memo,
@Nullable Map<String, Object> searchAttributes,
Expand All @@ -172,6 +183,7 @@ public ContinueAsNewOptions(
@Nullable VersioningIntent versioningIntent) {
this.workflowRunTimeout = workflowRunTimeout;
this.taskQueue = taskQueue;
this.retryOptions = retryOptions;
this.workflowTaskTimeout = workflowTaskTimeout;
this.memo = memo;
this.searchAttributes = searchAttributes;
Expand All @@ -188,6 +200,11 @@ public ContinueAsNewOptions(
return taskQueue;
}

@Nullable
public RetryOptions getRetryOptions() {
return retryOptions;
}

public @Nullable Duration getWorkflowTaskTimeout() {
return workflowTaskTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package io.temporal.workflow;

import io.temporal.api.common.v1.SearchAttributes;
import io.temporal.common.RetryOptions;
import java.time.Duration;
import java.util.Optional;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -89,6 +90,9 @@ public interface WorkflowInfo {
*/
String getTaskQueue();

@Nullable
RetryOptions getRetryOptions();

/**
* @return Timeout for a Workflow Run specified during Workflow start in {@link
* io.temporal.client.WorkflowOptions.Builder#setWorkflowRunTimeout(Duration)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package io.temporal.internal.common;

import static io.temporal.internal.common.RetryOptionsUtils.toRetryPolicy;
import static org.junit.Assert.assertEquals;

import io.temporal.api.common.v1.RetryPolicy;
Expand All @@ -32,7 +33,7 @@ public class RetryOptionsUtilsTest {
public void buildRetryOptions() {
Duration initialInterval = Duration.ofSeconds(2);
Duration maxInterval = Duration.ofSeconds(5);
RetryPolicy retryPolicy =
RetryPolicy retryPolicy1 =
RetryPolicy.newBuilder()
.setInitialInterval(ProtobufTimeUtils.toProtoDuration(initialInterval))
.setMaximumInterval(ProtobufTimeUtils.toProtoDuration(maxInterval))
Expand All @@ -41,11 +42,33 @@ public void buildRetryOptions() {
.addNonRetryableErrorTypes(IllegalStateException.class.getName())
.build();

RetryOptions retryOptions = RetryOptionsUtils.toRetryOptions(retryPolicy);
RetryOptions retryOptions = RetryOptionsUtils.toRetryOptions(retryPolicy1);
assertEquals(initialInterval, retryOptions.getInitialInterval());
assertEquals(maxInterval, retryOptions.getMaximumInterval());
assertEquals(5, retryOptions.getMaximumAttempts());
assertEquals(2, retryOptions.getBackoffCoefficient(), 0.001);
assertEquals(IllegalStateException.class.getName(), retryOptions.getDoNotRetry()[0]);

assertEquals(
retryPolicy1.getInitialInterval().getSeconds(),
retryOptions.getInitialInterval().getSeconds());
assertEquals(
retryPolicy1.getMaximumInterval().getSeconds(),
retryOptions.getMaximumInterval().getSeconds());
assertEquals(retryPolicy1.getMaximumAttempts(), retryOptions.getMaximumAttempts());
assertEquals(retryPolicy1.getBackoffCoefficient(), retryOptions.getBackoffCoefficient(), 0.001);
assertEquals(retryPolicy1.getNonRetryableErrorTypes(0), retryOptions.getDoNotRetry()[0]);

RetryPolicy retryPolicy2 = toRetryPolicy(retryOptions).build();

assertEquals(
retryPolicy2.getInitialInterval().getSeconds(),
retryOptions.getInitialInterval().getSeconds());
assertEquals(
retryPolicy2.getMaximumInterval().getSeconds(),
retryOptions.getMaximumInterval().getSeconds());
assertEquals(retryPolicy2.getMaximumAttempts(), retryOptions.getMaximumAttempts());
assertEquals(retryPolicy2.getBackoffCoefficient(), retryOptions.getBackoffCoefficient(), 0.001);
assertEquals(retryPolicy2.getNonRetryableErrorTypes(0), retryOptions.getDoNotRetry()[0]);
}
}

0 comments on commit 9174397

Please sign in to comment.