Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package io.temporal.worker

import io.temporal.activity.ActivityOptions
import io.temporal.activity.LocalActivityOptions
import io.temporal.common.metadata.activityName
import io.temporal.kotlin.TemporalDsl

Expand Down Expand Up @@ -70,3 +71,42 @@ inline fun @TemporalDsl WorkflowImplementationOptions.Builder.setDefaultActivity
) {
setDefaultActivityOptions(ActivityOptions(defaultActivityOptions))
}

/**
* Set individual Local Activity options per `activityType`.
*
* The [activityName] method could be used resolve activity method references to activity names:
*
* ```kotlin
* val options = WorkflowImplementationOptions {
* // ...
* setLocalActivityOptions(
* localActivityName(Activity1::method1) to LocalActivityOptions {
* // options for local activity method1
* },
* localActivityName(Activity2::method2) to LocalActivityOptions {
* // options for local activity method2
* },
* )
* }
* ```
*
* @param localActivityOptions map from activityType to [LocalActivityOptions]
* @see WorkflowImplementationOptions.Builder.setLocalActivityOptions
* @see WorkflowImplementationOptions.getLocalActivityOptions
*/
fun WorkflowImplementationOptions.Builder.setLocalActivityOptions(
vararg localActivityOptions: Pair<String, LocalActivityOptions>
) {
setLocalActivityOptions(localActivityOptions.toMap())
}

/**
* @see WorkflowImplementationOptions.Builder.setDefaultLocalActivityOptions
* @see WorkflowImplementationOptions.getDefaultLocalActivityOptions
*/
inline fun @TemporalDsl WorkflowImplementationOptions.Builder.setDefaultLocalActivityOptions(
defaultLocalActivityOptions: @TemporalDsl LocalActivityOptions.Builder.() -> Unit
) {
setDefaultLocalActivityOptions(LocalActivityOptions(defaultLocalActivityOptions))
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ final class SyncWorkflowContext implements WorkflowOutboundCallsInterceptor {

private ActivityOptions defaultActivityOptions = null;
private Map<String, ActivityOptions> activityOptionsMap = new HashMap<>();
private LocalActivityOptions defaultLocalActivityOptions = null;
private Map<String, LocalActivityOptions> localActivityOptionsMap = new HashMap<>();

public SyncWorkflowContext(
ReplayWorkflowContext context,
Expand Down Expand Up @@ -128,6 +130,9 @@ public SyncWorkflowContext(
if (workflowImplementationOptions != null) {
this.defaultActivityOptions = workflowImplementationOptions.getDefaultActivityOptions();
this.activityOptionsMap = workflowImplementationOptions.getActivityOptions();
this.defaultLocalActivityOptions =
workflowImplementationOptions.getDefaultLocalActivityOptions();
this.localActivityOptionsMap = workflowImplementationOptions.getLocalActivityOptions();
}
// initial values for headInboundInterceptor and headOutboundInterceptor until they initialized
// with actual interceptors through #initHeadInboundCallsInterceptor and
Expand Down Expand Up @@ -176,6 +181,14 @@ public Map<String, ActivityOptions> getActivityOptions() {
return activityOptionsMap;
}

public LocalActivityOptions getDefaultLocalActivityOptions() {
return defaultLocalActivityOptions;
}

public Map<String, LocalActivityOptions> getLocalActivityOptions() {
return localActivityOptionsMap;
}

public void setDefaultActivityOptions(ActivityOptions defaultActivityOptions) {
this.defaultActivityOptions =
(this.defaultActivityOptions == null)
Expand All @@ -198,6 +211,29 @@ public void setActivityOptions(Map<String, ActivityOptions> activityMethodOption
key, value, (o1, o2) -> o1.toBuilder().mergeActivityOptions(o2).build()));
}

public void setDefaultLocalActivityOptions(LocalActivityOptions defaultLocalActivityOptions) {
this.defaultLocalActivityOptions =
(this.defaultLocalActivityOptions == null)
? defaultLocalActivityOptions
: this.defaultLocalActivityOptions
.toBuilder()
.mergeActivityOptions(defaultLocalActivityOptions)
.build();
}

public void setLocalActivityOptions(
Map<String, LocalActivityOptions> localActivityMethodOptions) {
Objects.requireNonNull(localActivityMethodOptions);
if (this.localActivityOptionsMap == null) {
this.localActivityOptionsMap = new HashMap<>(localActivityMethodOptions);
return;
}
localActivityMethodOptions.forEach(
(key, value) ->
this.localActivityOptionsMap.merge(
key, value, (o1, o2) -> o1.toBuilder().mergeActivityOptions(o2).build()));
}

@Override
public <T> ActivityOutput<T> executeActivity(ActivityInput<T> input) {
Optional<Payloads> args = converter.toPayloads(input.getArgs());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,15 @@ public static void setActivityOptions(Map<String, ActivityOptions> activityMetho
getRootWorkflowContext().setActivityOptions(activityMethodOptions);
}

public static void setDefaultLocalActivityOptions(LocalActivityOptions localActivityOptions) {
getRootWorkflowContext().setDefaultLocalActivityOptions(localActivityOptions);
}

public static void setLocalActivityOptions(
Map<String, LocalActivityOptions> localActivityMethodOptions) {
getRootWorkflowContext().setLocalActivityOptions(localActivityMethodOptions);
}

/**
* Creates client stub to activities that implement given interface.
*
Expand Down Expand Up @@ -249,11 +258,26 @@ public static <T> T newLocalActivityStub(
Class<T> activityInterface,
LocalActivityOptions options,
Map<String, LocalActivityOptions> activityMethodOptions) {
// Merge the activity options we may have received from the workflow with the options we may
// have received in WorkflowImplementationOptions.
SyncWorkflowContext context = getRootWorkflowContext();
options = (options == null) ? context.getDefaultLocalActivityOptions() : options;
Map<String, LocalActivityOptions> mergedLocalActivityOptionsMap = new HashMap<>();
Map<String, LocalActivityOptions> localActivityOptions = context.getLocalActivityOptions();
if (localActivityOptions != null) {
mergedLocalActivityOptionsMap.putAll(localActivityOptions);
}
if (activityMethodOptions != null) {
activityMethodOptions.forEach(
(key, value) ->
mergedLocalActivityOptionsMap.merge(
key, value, (o1, o2) -> o1.toBuilder().mergeActivityOptions(o2).build()));
}
InvocationHandler invocationHandler =
LocalActivityInvocationHandler.newInstance(
activityInterface,
options,
activityMethodOptions,
mergedLocalActivityOptionsMap,
WorkflowInternal.getWorkflowInterceptor());
return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package io.temporal.worker;

import io.temporal.activity.ActivityOptions;
import io.temporal.activity.LocalActivityOptions;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -46,6 +47,8 @@ public static final class Builder {
private Class<? extends Throwable>[] failWorkflowExceptionTypes;
private Map<String, ActivityOptions> activityOptions;
private ActivityOptions defaultActivityOptions;
private Map<String, LocalActivityOptions> localActivityOptions;
private LocalActivityOptions defaultLocalActivityOptions;

private Builder() {}

Expand Down Expand Up @@ -96,25 +99,60 @@ public Builder setDefaultActivityOptions(ActivityOptions defaultActivityOptions)
return this;
}

/**
* Set individual local activity options per activityType. Will be merged with the map from
* {@link io.temporal.workflow.Workflow#newLocalActivityStub(Class, LocalActivityOptions, Map)}
* which has highest precedence.
*
* @param localActivityOptions map from activityType to ActivityOptions
*/
public Builder setLocalActivityOptions(Map<String, LocalActivityOptions> localActivityOptions) {
this.localActivityOptions = new HashMap<>(Objects.requireNonNull(localActivityOptions));
return this;
}

/**
* These local activity options have the lowest precedence across all local activity options.
* Will be overwritten entirely by {@link
* io.temporal.workflow.Workflow#newLocalActivityStub(Class, LocalActivityOptions)} and then by
* the individual local activity options if any are set through {@link
* #setLocalActivityOptions(Map)}
*
* @param defaultLocalActivityOptions ActivityOptions for all activities in the workflow.
*/
public Builder setDefaultLocalActivityOptions(
LocalActivityOptions defaultLocalActivityOptions) {
this.defaultLocalActivityOptions = Objects.requireNonNull(defaultLocalActivityOptions);
return this;
}

public WorkflowImplementationOptions build() {
return new WorkflowImplementationOptions(
failWorkflowExceptionTypes == null ? new Class[0] : failWorkflowExceptionTypes,
activityOptions == null ? new HashMap<>() : activityOptions,
defaultActivityOptions);
defaultActivityOptions,
localActivityOptions == null ? new HashMap<>() : localActivityOptions,
defaultLocalActivityOptions);
}
}

private final Class<? extends Throwable>[] failWorkflowExceptionTypes;
private final Map<String, ActivityOptions> activityOptions;
private final ActivityOptions defaultActivityOptions;
private final Map<String, LocalActivityOptions> localActivityOptions;
private final LocalActivityOptions defaultLocalActivityOptions;

public WorkflowImplementationOptions(
Class<? extends Throwable>[] failWorkflowExceptionTypes,
Map<String, ActivityOptions> activityOptions,
ActivityOptions defaultActivityOptions) {
ActivityOptions defaultActivityOptions,
Map<String, LocalActivityOptions> localActivityOptions,
LocalActivityOptions defaultLocalActivityOptions) {
this.failWorkflowExceptionTypes = failWorkflowExceptionTypes;
this.activityOptions = activityOptions;
this.defaultActivityOptions = defaultActivityOptions;
this.localActivityOptions = localActivityOptions;
this.defaultLocalActivityOptions = defaultLocalActivityOptions;
}

public Class<? extends Throwable>[] getFailWorkflowExceptionTypes() {
Expand All @@ -129,6 +167,14 @@ public ActivityOptions getDefaultActivityOptions() {
return defaultActivityOptions;
}

public Map<String, LocalActivityOptions> getLocalActivityOptions() {
return localActivityOptions;
}

public LocalActivityOptions getDefaultLocalActivityOptions() {
return defaultLocalActivityOptions;
}

@Override
public String toString() {
return "WorkflowImplementationOptions{"
Expand All @@ -138,6 +184,10 @@ public String toString() {
+ activityOptions
+ ", defaultActivityOptions="
+ defaultActivityOptions
+ ", localActivityOptions="
+ localActivityOptions
+ ", defaultLocalActivityOptions="
+ defaultLocalActivityOptions
+ '}';
}

Expand All @@ -148,12 +198,19 @@ public boolean equals(Object o) {
WorkflowImplementationOptions that = (WorkflowImplementationOptions) o;
return Arrays.equals(failWorkflowExceptionTypes, that.failWorkflowExceptionTypes)
&& Objects.equals(activityOptions, that.activityOptions)
&& Objects.equals(defaultActivityOptions, that.defaultActivityOptions);
&& Objects.equals(defaultActivityOptions, that.defaultActivityOptions)
&& Objects.equals(localActivityOptions, that.localActivityOptions)
&& Objects.equals(defaultLocalActivityOptions, that.defaultLocalActivityOptions);
}

@Override
public int hashCode() {
int result = Objects.hash(activityOptions, defaultActivityOptions);
int result =
Objects.hash(
activityOptions,
defaultActivityOptions,
localActivityOptions,
defaultLocalActivityOptions);
result = 31 * result + Arrays.hashCode(failWorkflowExceptionTypes);
return result;
}
Expand Down
10 changes: 10 additions & 0 deletions temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ public static void setActivityOptions(Map<String, ActivityOptions> activityMetho
WorkflowInternal.setActivityOptions(activityMethodOptions);
}

public static void setDefaultLocalActivityOptions(
LocalActivityOptions defaultLocalActivityOptions) {
WorkflowInternal.setDefaultLocalActivityOptions(defaultLocalActivityOptions);
}

public static void setLocalActivityOptions(
Map<String, LocalActivityOptions> localActivityMethodOptions) {
WorkflowInternal.setLocalActivityOptions(localActivityMethodOptions);
}

/**
* Creates client stub to activities that implement given interface. `
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,23 @@ public static ActivityOptions newActivityOptions2() {
.setContextPropagators(null)
.build();
}

public static LocalActivityOptions newLocalActivityOptions1() {
return LocalActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofDays(1))
.setStartToCloseTimeout(Duration.ofSeconds(2))
.setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())
.setLocalRetryThreshold(Duration.ofSeconds(1))
.setDoNotIncludeArgumentsIntoMarker(true)
.build();
}

public static LocalActivityOptions newLocalActivityOptions2() {
return LocalActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofDays(3))
.setStartToCloseTimeout(Duration.ofSeconds(3))
.setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(2).build())
.setLocalRetryThreshold(Duration.ofSeconds(3))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,22 @@
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;

public class DefaultActivityOptionsOnWorkflowNotSetTest {
private final ActivityOptions defaultOps = SDKTestOptions.newActivityOptions20sScheduleToClose();
private final LocalActivityOptions defaultLocalOps =
SDKTestOptions.newLocalActivityOptions20sScheduleToClose();

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(
WorkflowImplementationOptions.getDefaultInstance(),
TestSetNullActivityOptionsWorkflowImpl.class)
.setActivityImplementations(new TestActivityImpl())
.setActivityImplementations(new TestActivityImpl(), new LocalActivityTestImpl())
.build();

@Test
Expand All @@ -67,15 +70,47 @@ public void testDefaultActivityOptionsNotSetTest() {
defaultOps.getScheduleToCloseTimeout(), activity2Values.get("StartToCloseTimeout"));
}

@Ignore("Pending fix to Local Activity cancellations to use startToCloseTimeout") // TODO
@Test
public void testDefaultLocalActivityOptionsNotSetTest() {
TestWorkflowReturnMap workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflowReturnMap.class);
Map<String, Map<String, Duration>> result = workflowStub.execute();

// Check that both activities have options passed in the stub.
Map<String, Duration> localActivity1Values = result.get("LocalActivity1");
Assert.assertEquals(
defaultLocalOps.getScheduleToCloseTimeout(),
localActivity1Values.get("ScheduleToCloseTimeout"));
// If not set, Temporal service sets to ScheduleToCloseTimeout value
Assert.assertEquals(
defaultLocalOps.getScheduleToCloseTimeout(),
localActivity1Values.get("StartToCloseTimeout"));

Map<String, Duration> localActivity2Values = result.get("LocalActivity2");
Assert.assertEquals(
defaultLocalOps.getScheduleToCloseTimeout(),
localActivity2Values.get("ScheduleToCloseTimeout"));
// If not set, Temporal service sets to ScheduleToCloseTimeout value
Assert.assertEquals(
defaultLocalOps.getScheduleToCloseTimeout(),
localActivity2Values.get("StartToCloseTimeout"));
}

public static class TestSetNullActivityOptionsWorkflowImpl implements TestWorkflowReturnMap {
@Override
public Map<String, Map<String, Duration>> execute() {
Map<String, Map<String, Duration>> result = new HashMap<>();
TestActivity activities =
Workflow.newActivityStub(
TestActivity.class, SDKTestOptions.newActivityOptions20sScheduleToClose());
LocalActivityTest localActivities =
Workflow.newLocalActivityStub(
LocalActivityTest.class, SDKTestOptions.newLocalActivityOptions20sScheduleToClose());
result.put("Activity1", activities.activity1());
result.put("Activity2", activities.activity2());
result.put("LocalActivity1", localActivities.localActivity1());
result.put("LocalActivity2", localActivities.localActivity2());
return result;
}
}
Expand Down
Loading