Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,21 @@ void mutableSideEffect(
Func1<Optional<Payloads>, Optional<Payloads>> func,
Functions.Proc1<Optional<Payloads>> callback);

default Integer getVersion(
String changeId,
int minSupported,
int maxSupported,
Functions.Proc2<Integer, RuntimeException> callback) {
return getVersion(
changeId,
minSupported,
maxSupported,
(version, exception) -> {
callback.apply(version, exception);
return true;
});
}

/**
* GetVersion is used to safely perform backwards incompatible changes to workflow definitions. It
* is not allowed to update workflow code while there are workflows running as it is going to
Expand All @@ -278,14 +293,14 @@ void mutableSideEffect(
* @param changeId identifier of a particular change
* @param minSupported min version supported for the change
* @param maxSupported max version supported for the change
* @param callback used to return version
* @param callback used to return version. Returning true requests an additional event loop turn.
* @return True if the identifier is not present in history
*/
Integer getVersion(
String changeId,
int minSupported,
int maxSupported,
Functions.Proc2<Integer, RuntimeException> callback);
Functions.Func2<Integer, RuntimeException, Boolean> callback);

/** Replay safe random. */
Random newRandom();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ public Integer getVersion(
String changeId,
int minSupported,
int maxSupported,
Functions.Proc2<Integer, RuntimeException> callback) {
Functions.Func2<Integer, RuntimeException, Boolean> callback) {
return workflowStateMachines.getVersion(changeId, minSupported, maxSupported, callback);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1229,7 +1229,7 @@ public Integer getVersion(
String changeId,
int minSupported,
int maxSupported,
Functions.Proc2<Integer, RuntimeException> callback) {
Functions.Func2<Integer, RuntimeException, Boolean> callback) {
VersionStateMachine stateMachine =
versions.computeIfAbsent(
changeId,
Expand Down Expand Up @@ -1261,11 +1261,14 @@ public Integer getVersion(
return sa;
},
(v, e) -> {
callback.apply(v, e);
// without this getVersion call will trigger the end of WFT,
// instead we want to prepare subsequent commands and unblock the execution one more
// time.
eventLoop();
if (Boolean.TRUE.equals(callback.apply(v, e))) {
// without this getVersion call will trigger the end of WFT,
// instead we want to prepare subsequent commands and unblock the execution one more
// time.
eventLoop();
} else if (e != null) {
throw e;
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1143,21 +1143,27 @@ private <R> R mutableSideEffectImpl(
@Override
public int getVersion(String changeId, int minSupported, int maxSupported) {
CompletablePromise<Integer> result = Workflow.newPromise();
AtomicBoolean callbackScheduled = new AtomicBoolean();
Integer versionToUse =
replayContext.getVersion(
changeId,
minSupported,
maxSupported,
(v, e) ->
runner.executeInWorkflowThread(
"version-callback",
() -> {
if (v != null) {
result.complete(v);
} else {
result.completeExceptionally(e);
}
}));
(v, e) -> {
if (!callbackScheduled.compareAndSet(false, true)) {
return false;
}
runner.executeInWorkflowThread(
"version-callback",
() -> {
if (v != null) {
result.complete(v);
} else {
result.completeExceptionally(e);
}
});
return true;
});
/*
* If we are replaying a workflow and encounter a getVersion call it is possible that this call did not exist
* on the original execution. If the call did not exist on the original execution then we cannot block on results
Expand Down
Loading
Loading