Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -187,7 +186,7 @@ private void timerCancelled(long startEventId, Exception reason) {
}

byte[] sideEffect(Func<byte[]> func) {
decisions.addAllMissingVersionMarker(false, Optional.empty());
decisions.addAllMissingVersionMarker();
long sideEffectEventId = decisions.getNextDecisionEventId();
byte[] result;
if (replaying) {
Expand Down Expand Up @@ -216,7 +215,7 @@ byte[] sideEffect(Func<byte[]> func) {
*/
Optional<byte[]> mutableSideEffect(
String id, DataConverter converter, Func1<Optional<byte[]>, Optional<byte[]>> func) {
decisions.addAllMissingVersionMarker(false, Optional.empty());
decisions.addAllMissingVersionMarker();
return mutableSideEffectHandler.handle(id, converter, func);
}

Expand Down Expand Up @@ -281,14 +280,24 @@ private void handleLocalActivityMarker(MarkerRecordedEventAttributes attributes)
}
}

/**
* During replay getVersion should account for the following situations at the current eventId.
*
* <ul>
* <li>There is correspondent Marker with the same changeId: return version from the marker.
* <li>There is no Marker with the same changeId: return DEFAULT_VERSION,
* <li>There is marker with a different changeId (possibly more than one) and the marker with
* matching changeId follows them: add fake decisions for all the version markers that
* precede the matching one as the correspondent getVersion calls were removed
* <li>There is marker with a different changeId (possibly more than one) and no marker with
* matching changeId follows them: return DEFAULT_VERSION as it looks like the getVersion
* was added after that part of code has executed
* <li>Another case is when there is no call to getVersion and there is a version marker: insert
* fake decisions for all version markers up to the event that caused the lookup.
* </ul>
*/
int getVersion(String changeId, DataConverter converter, int minSupported, int maxSupported) {
Predicate<MarkerRecordedEventAttributes> changeIdEquals =
(attributes) -> {
MarkerHandler.MarkerInterface markerData =
MarkerHandler.MarkerInterface.fromEventAttributes(attributes, converter);
return markerData.getId().equals(changeId);
};
decisions.addAllMissingVersionMarker(true, Optional.of(changeIdEquals));
decisions.addAllMissingVersionMarker(Optional.of(changeId), Optional.of(converter));

Optional<byte[]> result =
versionHandler.handle(
Expand Down
125 changes: 77 additions & 48 deletions src/main/java/io/temporal/internal/replay/DecisionsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package io.temporal.internal.replay;

import io.temporal.common.converter.DataConverter;
import io.temporal.internal.common.OptionsUtils;
import io.temporal.internal.common.WorkflowExecutionUtils;
import io.temporal.internal.replay.HistoryHelper.DecisionEvents;
Expand Down Expand Up @@ -52,7 +53,6 @@
import io.temporal.proto.event.EventType;
import io.temporal.proto.event.ExternalWorkflowExecutionCancelRequestedEventAttributes;
import io.temporal.proto.event.HistoryEvent;
import io.temporal.proto.event.MarkerRecordedEventAttributes;
import io.temporal.proto.event.RequestCancelActivityTaskFailedEventAttributes;
import io.temporal.proto.event.RequestCancelExternalWorkflowExecutionFailedEventAttributes;
import io.temporal.proto.event.StartChildWorkflowExecutionFailedEventAttributes;
Expand All @@ -70,7 +70,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;

class DecisionsHelper {

Expand Down Expand Up @@ -117,7 +116,7 @@ long getNextDecisionEventId() {
}

long scheduleActivityTask(ScheduleActivityTaskDecisionAttributes schedule) {
addAllMissingVersionMarker(false, Optional.empty());
addAllMissingVersionMarker();

long nextDecisionEventId = getNextDecisionEventId();
DecisionId decisionId = new DecisionId(DecisionTarget.ACTIVITY, nextDecisionEventId);
Expand Down Expand Up @@ -198,7 +197,7 @@ boolean handleRequestCancelActivityTaskFailed(HistoryEvent event) {
}

long startChildWorkflowExecution(StartChildWorkflowExecutionDecisionAttributes childWorkflow) {
addAllMissingVersionMarker(false, Optional.empty());
addAllMissingVersionMarker();

long nextDecisionEventId = getNextDecisionEventId();
DecisionId decisionId = new DecisionId(DecisionTarget.CHILD_WORKFLOW, nextDecisionEventId);
Expand Down Expand Up @@ -269,7 +268,7 @@ boolean handleStartChildWorkflowExecutionFailed(HistoryEvent event) {
*/
long requestCancelExternalWorkflowExecution(
RequestCancelExternalWorkflowExecutionDecisionAttributes schedule) {
addAllMissingVersionMarker(false, Optional.empty());
addAllMissingVersionMarker();

long nextDecisionEventId = getNextDecisionEventId();
DecisionId decisionId =
Expand Down Expand Up @@ -306,7 +305,7 @@ void handleRequestCancelExternalWorkflowExecutionFailed(HistoryEvent event) {
}

long signalExternalWorkflowExecution(SignalExternalWorkflowExecutionDecisionAttributes signal) {
addAllMissingVersionMarker(false, Optional.empty());
addAllMissingVersionMarker();

long nextDecisionEventId = getNextDecisionEventId();
DecisionId decisionId =
Expand Down Expand Up @@ -339,7 +338,7 @@ boolean handleExternalWorkflowExecutionSignaled(long initiatedEventId) {
}

long startTimer(StartTimerDecisionAttributes request) {
addAllMissingVersionMarker(false, Optional.empty());
addAllMissingVersionMarker();

long startEventId = getNextDecisionEventId();
DecisionId decisionId = new DecisionId(DecisionTarget.TIMER, startEventId);
Expand Down Expand Up @@ -461,7 +460,7 @@ public void handleWorkflowExecutionCompleted(HistoryEvent event) {
}

void completeWorkflowExecution(byte[] output) {
addAllMissingVersionMarker(false, Optional.empty());
addAllMissingVersionMarker();

Decision decision =
Decision.newBuilder()
Expand All @@ -475,7 +474,7 @@ void completeWorkflowExecution(byte[] output) {
}

void continueAsNewWorkflowExecution(ContinueAsNewWorkflowExecutionParameters continueParameters) {
addAllMissingVersionMarker(false, Optional.empty());
addAllMissingVersionMarker();

HistoryEvent firstEvent = task.getHistory().getEvents(0);
if (!firstEvent.hasWorkflowExecutionStartedEventAttributes()) {
Expand Down Expand Up @@ -522,7 +521,7 @@ void continueAsNewWorkflowExecution(ContinueAsNewWorkflowExecutionParameters con
}

void failWorkflowExecution(WorkflowExecutionException failure) {
addAllMissingVersionMarker(false, Optional.empty());
addAllMissingVersionMarker();

Decision decision =
Decision.newBuilder()
Expand All @@ -541,7 +540,7 @@ void failWorkflowExecution(WorkflowExecutionException failure) {
* CancelWorkflowExecution was created.
*/
void cancelWorkflowExecution() {
addAllMissingVersionMarker(false, Optional.empty());
addAllMissingVersionMarker();

Decision decision =
Decision.newBuilder()
Expand Down Expand Up @@ -685,60 +684,90 @@ private void addDecision(DecisionId decisionId, DecisionStateMachine decision) {
nextDecisionEventId++;
}

// This is to support the case where a getVersion call presents during workflow execution but
// is removed in replay.
void addAllMissingVersionMarker(
boolean isNextDecisionVersionMarker,
Optional<Predicate<MarkerRecordedEventAttributes>> isDifferentChange) {
boolean added;
do {
added = addMissingVersionMarker(isNextDecisionVersionMarker, isDifferentChange);
} while (added);
void addAllMissingVersionMarker() {
addAllMissingVersionMarker(Optional.empty(), Optional.empty());
}

private boolean addMissingVersionMarker(
boolean isNextDecisionVersionMarker,
Optional<Predicate<MarkerRecordedEventAttributes>> changeIdEquals) {
Optional<HistoryEvent> optionalEvent = getOptionalDecisionEvent(nextDecisionEventId);
Optional<HistoryEvent> getVersionMakerEvent(long eventId) {
Optional<HistoryEvent> optionalEvent = getOptionalDecisionEvent(eventId);
if (!optionalEvent.isPresent()) {
return false;
return Optional.empty();
}

HistoryEvent event = optionalEvent.get();
if (event.getEventType() != EventType.MarkerRecorded) {
return false;
return Optional.empty();
}

if (!event
.getMarkerRecordedEventAttributes()
.getMarkerName()
.equals(ClockDecisionContext.VERSION_MARKER_NAME)) {
return false;
return Optional.empty();
}
return Optional.of(event);
}

// Next decision is for version marker and the event is for the same.
if (isNextDecisionVersionMarker
&& (!changeIdEquals.isPresent()
|| changeIdEquals.get().test(event.getMarkerRecordedEventAttributes()))) {
return false;
/**
* As getVersion calls can be added and removed any time this method inserts missing decision
* events that correspond to removed getVersion calls.
*
* @param changeId optional getVersion change id to compare
* @param converter must be present if changeId is present
*/
void addAllMissingVersionMarker(Optional<String> changeId, Optional<DataConverter> converter) {
Optional<HistoryEvent> markerEvent = getVersionMakerEvent(nextDecisionEventId);

if (!markerEvent.isPresent()) {
return;
}

// If we have a version marker in history event but not in decisions, let's add one.
RecordMarkerDecisionAttributes.Builder marker =
RecordMarkerDecisionAttributes.newBuilder()
.setMarkerName(ClockDecisionContext.VERSION_MARKER_NAME)
.setHeader(event.getMarkerRecordedEventAttributes().getHeader())
.setDetails(event.getMarkerRecordedEventAttributes().getDetails());
Decision markerDecision =
Decision.newBuilder()
.setDecisionType(DecisionType.RecordMarker)
.setRecordMarkerDecisionAttributes(marker)
.build();
DecisionId markerDecisionId = new DecisionId(DecisionTarget.MARKER, nextDecisionEventId);
decisions.put(
markerDecisionId, new MarkerDecisionStateMachine(markerDecisionId, markerDecision));
nextDecisionEventId++;
return true;
// Look ahead to see if there is a marker with changeId following current version marker
// If it is the case then all the markers that precede it should be added as decisions
// as their correspondent getVersion calls were removed.
long changeIdMarkerEventId = -1;
if (changeId.isPresent()) {
String id = changeId.get();
long eventId = nextDecisionEventId;
while (true) {
MarkerHandler.MarkerInterface markerData =
MarkerHandler.MarkerInterface.fromEventAttributes(
markerEvent.get().getMarkerRecordedEventAttributes(), converter.get());

if (id.equals(markerData.getId())) {
changeIdMarkerEventId = eventId;
break;
}
eventId++;
markerEvent = getVersionMakerEvent(eventId);
if (!markerEvent.isPresent()) {
break;
}
}
// There are no version markers preceding a marker with the changeId
if (changeIdMarkerEventId < 0 || changeIdMarkerEventId == nextDecisionEventId) {
return;
}
}
do {
// If we have a version marker in history event but not in decisions, let's add one.
RecordMarkerDecisionAttributes.Builder attributes =
RecordMarkerDecisionAttributes.newBuilder()
.setMarkerName(ClockDecisionContext.VERSION_MARKER_NAME)
.setHeader(markerEvent.get().getMarkerRecordedEventAttributes().getHeader())
.setDetails(markerEvent.get().getMarkerRecordedEventAttributes().getDetails());
Decision markerDecision =
Decision.newBuilder()
.setDecisionType(DecisionType.RecordMarker)
.setRecordMarkerDecisionAttributes(attributes)
.build();
DecisionId markerDecisionId = new DecisionId(DecisionTarget.MARKER, nextDecisionEventId);
decisions.put(
markerDecisionId, new MarkerDecisionStateMachine(markerDecisionId, markerDecision));
nextDecisionEventId++;
markerEvent = getVersionMakerEvent(nextDecisionEventId);
} while (markerEvent.isPresent()
&& (changeIdMarkerEventId < 0 || nextDecisionEventId < changeIdMarkerEventId));
}

private DecisionStateMachine getDecision(DecisionId decisionId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ private Result createCompletedRequest(
.putAllQueryResults(result.getQueryResults())
.setForceCreateNewDecisionTask(result.getForceCreateNewDecisionTask());

if (stickyTaskListName != null) {
if (stickyTaskListName != null && !stickyTaskListScheduleToStartTimeout.isZero()) {
StickyExecutionAttributes.Builder attributes =
StickyExecutionAttributes.newBuilder()
.setWorkerTaskList(createStickyTaskList(stickyTaskListName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,13 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class POJOActivityTaskHandler implements ActivityTaskHandler {

private static final Logger log = LoggerFactory.getLogger(POJOActivityTaskHandler.class);

private final DataConverter dataConverter;
private final ScheduledExecutorService heartbeatExecutor;
private final Map<String, ActivityTaskExecutor> activities =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static WorkerFactoryOptions getDefaultInstance() {
}

public static class Builder {
private int stickyDecisionScheduleToStartTimeoutInSeconds;
private int stickyDecisionScheduleToStartTimeoutInSeconds = 10;
private int cacheMaximumSize;
private int maxWorkflowThreadCount;
private WorkflowInterceptor workflowInterceptor;
Expand Down Expand Up @@ -144,9 +144,6 @@ private WorkerFactoryOptions(
if (maxWorkflowThreadCount <= 0) {
maxWorkflowThreadCount = 600;
}
if (stickyDecisionScheduleToStartTimeoutInSeconds <= 0) {
stickyDecisionScheduleToStartTimeoutInSeconds = 5;
}
if (workflowInterceptor == null) {
workflowInterceptor = new NoopWorkflowInterceptor();
}
Expand Down
Loading