Skip to content

Commit

Permalink
Added supports contextPropagators for localActivity. (#515)
Browse files Browse the repository at this point in the history
  • Loading branch information
KovaliovNA authored and vancexu committed Aug 20, 2020
1 parent 2141e1d commit b457d0c
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 5 deletions.
21 changes: 18 additions & 3 deletions src/main/java/com/uber/cadence/activity/LocalActivityOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import com.uber.cadence.common.MethodRetry;
import com.uber.cadence.common.RetryOptions;
import com.uber.cadence.context.ContextPropagator;
import java.time.Duration;
import java.util.List;
import java.util.Objects;

/** Options used to configure how an local activity is invoked. */
Expand Down Expand Up @@ -50,12 +52,14 @@ public static LocalActivityOptions merge(
ActivityOptions.mergeDuration(
a.scheduleToCloseTimeoutSeconds(), o.getScheduleToCloseTimeout()))
.setRetryOptions(RetryOptions.merge(r, o.getRetryOptions()))
.setContextPropagators(o.getContextPropagators())
.validateAndBuildWithDefaults();
}

public static final class Builder {
private Duration scheduleToCloseTimeout;
private RetryOptions retryOptions;
private List<ContextPropagator> contextPropagators;

public Builder() {}

Expand Down Expand Up @@ -83,25 +87,32 @@ public Builder setRetryOptions(RetryOptions retryOptions) {
return this;
}

public Builder setContextPropagators(List<ContextPropagator> contextPropagators) {
this.contextPropagators = contextPropagators;
return this;
}

public LocalActivityOptions build() {
return new LocalActivityOptions(scheduleToCloseTimeout, retryOptions);
return new LocalActivityOptions(scheduleToCloseTimeout, retryOptions, contextPropagators);
}

public LocalActivityOptions validateAndBuildWithDefaults() {
RetryOptions ro = null;
if (retryOptions != null) {
ro = new RetryOptions.Builder(retryOptions).validateBuildWithDefaults();
}
return new LocalActivityOptions(roundUpToSeconds(scheduleToCloseTimeout), ro);
return new LocalActivityOptions(roundUpToSeconds(scheduleToCloseTimeout), ro, contextPropagators);
}
}

private final Duration scheduleToCloseTimeout;
private final RetryOptions retryOptions;
private final List<ContextPropagator> contextPropagators;

private LocalActivityOptions(Duration scheduleToCloseTimeout, RetryOptions retryOptions) {
private LocalActivityOptions(Duration scheduleToCloseTimeout, RetryOptions retryOptions, List<ContextPropagator> contextPropagators) {
this.scheduleToCloseTimeout = scheduleToCloseTimeout;
this.retryOptions = retryOptions;
this.contextPropagators = contextPropagators;
}

public Duration getScheduleToCloseTimeout() {
Expand All @@ -112,6 +123,10 @@ public RetryOptions getRetryOptions() {
return retryOptions;
}

public List<ContextPropagator> getContextPropagators() {
return contextPropagators;
}

@Override
public String toString() {
return "LocalActivityOptions{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.uber.cadence.WorkflowExecution;
import com.uber.cadence.common.RetryOptions;
import java.util.Arrays;
import java.util.Map;

public class ExecuteLocalActivityParameters {

Expand All @@ -33,6 +34,7 @@ public class ExecuteLocalActivityParameters {
private RetryOptions retryOptions;
private long elapsedTime;
private int attempt;
private Map<String, byte[]> context;

public ExecuteLocalActivityParameters() {}

Expand Down Expand Up @@ -230,6 +232,14 @@ public void setWorkflowExecution(WorkflowExecution workflowExecution) {
this.workflowExecution = workflowExecution;
}

public Map<String, byte[]> getContext() {
return context;
}

public void setContext(Map<String, byte[]> context) {
this.context = context;
}

@Override
public String toString() {
return "ExecuteLocalActivityParameters{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,11 @@ private ExecuteActivityParameters constructExecuteActivityParameters(

private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
String name, LocalActivityOptions options, byte[] input, long elapsed, int attempt) {
ExecuteLocalActivityParameters parameters = new ExecuteLocalActivityParameters();
parameters
ExecuteLocalActivityParameters parameters = new ExecuteLocalActivityParameters()
.withActivityType(new ActivityType().setName(name))
.withInput(input)
.withScheduleToCloseTimeoutSeconds(options.getScheduleToCloseTimeout().getSeconds());

RetryOptions retryOptions = options.getRetryOptions();
if (retryOptions != null) {
parameters.setRetryOptions(retryOptions);
Expand All @@ -336,6 +336,11 @@ private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
parameters.setElapsedTime(elapsed);
parameters.setWorkflowDomain(this.context.getDomain());
parameters.setWorkflowExecution(this.context.getWorkflowExecution());

List<ContextPropagator> propagators = Optional.ofNullable(options.getContextPropagators())
.orElse(contextPropagators);
parameters.setContext(extractContextsAndConvertToBytes(propagators));

return parameters;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand Down Expand Up @@ -163,6 +164,8 @@ private TaskHandlerImpl(ActivityTaskHandler handler) {

@Override
public void handle(Task task) throws Exception {
propagateContext(task.params);

task.taskStartTime = System.currentTimeMillis();
ActivityTaskHandler.Result result = handleLocalActivity(task);

Expand Down Expand Up @@ -256,4 +259,19 @@ private ActivityTaskHandler.Result handleLocalActivity(Task task) throws Interru
}
}
}

private void propagateContext(ExecuteLocalActivityParameters params) {
if (options.getContextPropagators() == null || options.getContextPropagators().isEmpty()) {
return;
}

Optional.ofNullable(params.getContext())
.filter(context -> !context.isEmpty())
.ifPresent(this::restoreContext);
}

private void restoreContext(Map<String, byte[]> context) {
options.getContextPropagators()
.forEach(propagator -> propagator.setCurrentContext(propagator.deserializeContext(context)));
}
}
Loading

0 comments on commit b457d0c

Please sign in to comment.