Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public static final class Builder {
private Duration localRetryThreshold;
private Duration startToCloseTimeout;
private RetryOptions retryOptions;
private boolean doNotIncludeArgumentsIntoMarker;

/** Copy Builder fields from the options. */
private Builder(LocalActivityOptions options) {
Expand All @@ -60,7 +61,8 @@ private Builder(LocalActivityOptions options) {
this.scheduleToCloseTimeout = options.getScheduleToCloseTimeout();
this.localRetryThreshold = options.getLocalRetryThreshold();
this.startToCloseTimeout = options.getStartToCloseTimeout();
this.retryOptions = options.retryOptions;
this.retryOptions = options.getRetryOptions();
this.doNotIncludeArgumentsIntoMarker = options.isDoNotIncludeArgumentsIntoMarker();
}

/** Overall timeout workflow is willing to wait for activity to complete. */
Expand Down Expand Up @@ -112,9 +114,28 @@ public Builder setMethodRetry(MethodRetry r) {
return this;
}

/**
* When set to true the serialized arguments of the local activity are not included into the
* Marker Event that stores local activity invocation result.
*
* <p>The serialized arguments are included only for human troubleshooting as they are never
* read by the SDK code. So in some cases it is worth not including them to reduce the history
* size.
*
* <p>Default is false.
*/
public Builder setDoNotIncludeArgumentsIntoMarker(boolean doNotIncludeArgumentsIntoMarker) {
this.doNotIncludeArgumentsIntoMarker = doNotIncludeArgumentsIntoMarker;
return this;
}

public LocalActivityOptions build() {
return new LocalActivityOptions(
startToCloseTimeout, localRetryThreshold, scheduleToCloseTimeout, retryOptions);
startToCloseTimeout,
localRetryThreshold,
scheduleToCloseTimeout,
retryOptions,
doNotIncludeArgumentsIntoMarker);
}

public LocalActivityOptions validateAndBuildWithDefaults() {
Expand All @@ -126,24 +147,28 @@ public LocalActivityOptions validateAndBuildWithDefaults() {
startToCloseTimeout,
localRetryThreshold,
scheduleToCloseTimeout,
RetryOptions.newBuilder(retryOptions).validateBuildWithDefaults());
RetryOptions.newBuilder(retryOptions).validateBuildWithDefaults(),
doNotIncludeArgumentsIntoMarker);
}
}

private final Duration scheduleToCloseTimeout;
private final Duration localRetryThreshold;
private final Duration startToCloseTimeout;
private final RetryOptions retryOptions;
private boolean doNotIncludeArgumentsIntoMarker;
Copy link
Contributor

@vitarb vitarb Apr 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit. generally positive naming is preferable when possible, can we call it includeArgumentsIntoMarker instead and flip the logic, this way we can avoid code like !localActivityParameters.isDoNotIncludeArgumentsIntoMarker() which is not very readable cause it's a "not do not".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I always follow the pattern that all properties defaults should match the type default. For a boolean the default value is false. That's why I named the property this way to make the false default value correct.


private LocalActivityOptions(
Duration startToCloseTimeout,
Duration localRetryThreshold,
Duration scheduleToCloseTimeout,
RetryOptions retryOptions) {
RetryOptions retryOptions,
boolean doNotIncludeArgumentsIntoMarker) {
this.localRetryThreshold = localRetryThreshold;
this.scheduleToCloseTimeout = scheduleToCloseTimeout;
this.startToCloseTimeout = startToCloseTimeout;
this.retryOptions = retryOptions;
this.doNotIncludeArgumentsIntoMarker = doNotIncludeArgumentsIntoMarker;
}

public Duration getScheduleToCloseTimeout() {
Expand All @@ -162,34 +187,49 @@ public RetryOptions getRetryOptions() {
return retryOptions;
}

public boolean isDoNotIncludeArgumentsIntoMarker() {
return doNotIncludeArgumentsIntoMarker;
}

public Builder toBuilder() {
return new Builder(this);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!(o instanceof LocalActivityOptions)) return false;
LocalActivityOptions that = (LocalActivityOptions) o;
return Objects.equal(scheduleToCloseTimeout, that.scheduleToCloseTimeout)
return doNotIncludeArgumentsIntoMarker == that.doNotIncludeArgumentsIntoMarker
&& Objects.equal(scheduleToCloseTimeout, that.scheduleToCloseTimeout)
&& Objects.equal(localRetryThreshold, that.localRetryThreshold)
&& Objects.equal(startToCloseTimeout, that.startToCloseTimeout)
&& Objects.equal(retryOptions, that.retryOptions);
}

@Override
public int hashCode() {
return Objects.hashCode(scheduleToCloseTimeout, startToCloseTimeout, retryOptions);
return Objects.hashCode(
scheduleToCloseTimeout,
localRetryThreshold,
startToCloseTimeout,
retryOptions,
doNotIncludeArgumentsIntoMarker);
}

@Override
public String toString() {
return "LocalActivityOptions{"
+ "scheduleToCloseTimeout="
+ scheduleToCloseTimeout
+ ", localRetryThreshold="
+ localRetryThreshold
+ ", startToCloseTimeout="
+ startToCloseTimeout
+ ", retryOptions="
+ retryOptions
+ ", doNotIncludeArgumentsIntoMarker="
+ doNotIncludeArgumentsIntoMarker
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ public class ExecuteLocalActivityParameters {

private final PollActivityTaskQueueResponse.Builder activityTask;
private final Duration localRetryThreshold;
private boolean doNotIncludeArgumentsIntoMarker;

public ExecuteLocalActivityParameters(
PollActivityTaskQueueResponse.Builder activityTask, Duration localRetryThreshold) {
PollActivityTaskQueueResponse.Builder activityTask,
Duration localRetryThreshold,
boolean doNotIncludeArgumentsIntoMarker) {
this.activityTask = activityTask;
this.localRetryThreshold = localRetryThreshold;
this.doNotIncludeArgumentsIntoMarker = doNotIncludeArgumentsIntoMarker;
}

public PollActivityTaskQueueResponse.Builder getActivityTask() {
Expand All @@ -41,6 +45,10 @@ public Duration getLocalRetryThreshold() {
return localRetryThreshold;
}

public boolean isDoNotIncludeArgumentsIntoMarker() {
return doNotIncludeArgumentsIntoMarker;
}

@Override
public String toString() {
return "ExecuteLocalActivityParameters{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ final class LocalActivityStateMachine

static final String LOCAL_ACTIVITY_MARKER_NAME = "LocalActivity";
static final String MARKER_ACTIVITY_ID_KEY = "activityId";
static final String MARKER_ACTIVITY_TYPE_KEY = "type";
static final String MARKER_ACTIVITY_INPUT_KEY = "input";
static final String MARKER_ACTIVITY_RESULT_KEY = "result";
static final String MARKER_TIME_KEY = "time";
// Deprecated in favor of result. Still present for backwards compatibility.
static final String MARKER_DATA_KEY = "data";

private final DataConverter dataConverter = DataConverter.getDefaultInstance();
Expand Down Expand Up @@ -205,8 +209,10 @@ public void cancel() {

public void sendRequest() {
localActivityRequestSink.apply(localActivityParameters);
localActivityParameters =
null; // avoid retaining parameters for the duration of activity execution
if (localActivityParameters.isDoNotIncludeArgumentsIntoMarker()) {
// avoid retaining parameters for the duration of activity execution
localActivityParameters = null;
}
}

public void markAsSent() {
Expand All @@ -231,16 +237,23 @@ private void createMarker() {
markerAttributes.setMarkerName(LOCAL_ACTIVITY_MARKER_NAME);
Payloads id = dataConverter.toPayloads(activityId).get();
details.put(MARKER_ACTIVITY_ID_KEY, id);
Payloads type = dataConverter.toPayloads(activityType.getName()).get();
details.put(MARKER_ACTIVITY_TYPE_KEY, type);
// TODO(maxim): Consider using elapsed since start instead of Sytem.currentTimeMillis
long currentTime = setCurrentTimeCallback.apply(System.currentTimeMillis());
Payloads t = dataConverter.toPayloads(currentTime).get();
details.put(MARKER_TIME_KEY, t);
if (localActivityParameters != null
&& !localActivityParameters.isDoNotIncludeArgumentsIntoMarker()) {
details.put(
MARKER_ACTIVITY_INPUT_KEY, localActivityParameters.getActivityTask().getInput());
}
if (result.getTaskCompleted() != null) {
RespondActivityTaskCompletedRequest completed = result.getTaskCompleted();
if (completed.hasResult()) {
Payloads p = completed.getResult();
laResult = Optional.of(p);
details.put(MARKER_DATA_KEY, p);
details.put(MARKER_ACTIVITY_RESULT_KEY, p);
} else {
laResult = Optional.empty();
}
Expand Down Expand Up @@ -300,7 +313,12 @@ private void notifyResultFromEvent() {
callback.apply(null, attributes.getFailure());
return;
}
Optional<Payloads> fromMaker = Optional.ofNullable(map.get(MARKER_DATA_KEY));
Payloads result = map.get(MARKER_ACTIVITY_RESULT_KEY);
if (result == null) {
// Support old histories that used "data" as a key for "result".
result = map.get(MARKER_DATA_KEY);
}
Optional<Payloads> fromMaker = Optional.ofNullable(result);
callback.apply(fromMaker, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
if (localRetryThreshold == null) {
localRetryThreshold = context.getWorkflowTaskTimeout().multipliedBy(6);
}
return new ExecuteLocalActivityParameters(activityTask, localRetryThreshold);
return new ExecuteLocalActivityParameters(
activityTask, localRetryThreshold, options.isDoNotIncludeArgumentsIntoMarker());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@

package io.temporal.internal.statemachines;

import static io.temporal.internal.statemachines.LocalActivityStateMachine.*;
import static org.junit.Assert.*;
import static io.temporal.internal.statemachines.LocalActivityStateMachine.LOCAL_ACTIVITY_MARKER_NAME;
import static io.temporal.internal.statemachines.LocalActivityStateMachine.MARKER_ACTIVITY_ID_KEY;
import static io.temporal.internal.statemachines.LocalActivityStateMachine.MARKER_ACTIVITY_RESULT_KEY;
import static io.temporal.internal.statemachines.LocalActivityStateMachine.MARKER_TIME_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import io.temporal.api.command.v1.Command;
import io.temporal.api.common.v1.ActivityType;
Expand Down Expand Up @@ -86,19 +91,22 @@ protected void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
PollActivityTaskQueueResponse.newBuilder()
.setActivityId("id1")
.setActivityType(ActivityType.newBuilder().setName("activity1")),
null);
null,
true);
ExecuteLocalActivityParameters parameters2 =
new ExecuteLocalActivityParameters(
PollActivityTaskQueueResponse.newBuilder()
.setActivityId("id2")
.setActivityType(ActivityType.newBuilder().setName("activity2")),
null);
null,
false);
ExecuteLocalActivityParameters parameters3 =
new ExecuteLocalActivityParameters(
PollActivityTaskQueueResponse.newBuilder()
.setActivityId("id3")
.setActivityType(ActivityType.newBuilder().setName("activity3")),
null);
null,
true);

builder
.<Optional<Payloads>, Failure>add2(
Expand Down Expand Up @@ -202,15 +210,15 @@ protected void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
.get(0)
.getRecordMarkerCommandAttributes()
.getDetailsMap()
.get(MARKER_DATA_KEY));
.get(MARKER_ACTIVITY_RESULT_KEY));
assertEquals("result2", converter.fromPayloads(0, dataActivity2, String.class, String.class));
Optional<Payloads> dataActivity3 =
Optional.of(
commands
.get(1)
.getRecordMarkerCommandAttributes()
.getDetailsMap()
.get(MARKER_DATA_KEY));
.get(MARKER_ACTIVITY_RESULT_KEY));
assertEquals("result3", converter.fromPayloads(0, dataActivity3, String.class, String.class));
}
{
Expand Down Expand Up @@ -239,7 +247,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
.get(0)
.getRecordMarkerCommandAttributes()
.getDetailsMap()
.get(MARKER_DATA_KEY));
.get(MARKER_ACTIVITY_RESULT_KEY));
assertEquals("result1", converter.fromPayloads(0, data, String.class, String.class));
assertEquals(
CommandType.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, commands.get(1).getCommandType());
Expand Down Expand Up @@ -271,7 +279,8 @@ protected void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
PollActivityTaskQueueResponse.newBuilder()
.setActivityId("id1")
.setActivityType(ActivityType.newBuilder().setName("activity1")),
null);
null,
false);
builder
.<Optional<Payloads>, Failure>add2(
(r, c) -> stateMachines.scheduleLocalActivityTask(parameters1, c))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,9 @@

package io.temporal.workflow;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import io.temporal.activity.ActivityOptions;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.history.v1.History;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.client.WorkflowStub;
Expand Down Expand Up @@ -57,18 +52,8 @@ public void testBinaryChecksumSetWhenTaskCompleted() {
WorkflowClient.start(client::execute, testWorkflowRule.getTaskQueue());
WorkflowStub stub = WorkflowStub.fromTyped(client);
SDKTestWorkflowRule.waitForOKQuery(stub);
History history = testWorkflowRule.getWorkflowExecutionHistory(execution);

boolean foundCompletedTask = false;
for (HistoryEvent event : history.getEventsList()) {
if (event.getEventType() == EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED) {
assertEquals(
SDKTestWorkflowRule.BINARY_CHECKSUM,
event.getWorkflowTaskCompletedEventAttributes().getBinaryChecksum());
foundCompletedTask = true;
}
}
assertTrue(foundCompletedTask);
testWorkflowRule.assertHistoryEvent(execution, EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED);
}

public static class SimpleTestWorkflow implements TestWorkflows.TestWorkflow1 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import io.temporal.api.common.v1.SearchAttributes;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.EventType;
import io.temporal.api.history.v1.History;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.client.WorkflowClient;
import io.temporal.internal.common.SearchAttributesUtil;
import io.temporal.testing.TracingWorkerInterceptor;
Expand Down Expand Up @@ -64,16 +62,8 @@ public void testUpsertSearchAttributes() {
"upsertSearchAttributes",
"executeActivity Activity",
"activity Activity");
History history = testWorkflowRule.getWorkflowExecutionHistory(execution);

boolean found = false;
for (HistoryEvent event : history.getEventsList()) {
if (EventType.EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES == event.getEventType()) {
found = true;
break;
}
}
Assert.assertTrue("EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES found in the history", found);
testWorkflowRule.assertHistoryEvent(
execution, EventType.EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES);
}

@WorkflowInterface
Expand Down
Loading