Skip to content

Commit

Permalink
Make GetVersion more deterministic (#1807)
Browse files Browse the repository at this point in the history
Make GetVersion more deterministic
  • Loading branch information
Quinn-With-Two-Ns committed Jul 17, 2023
1 parent 3be060b commit 6054c31
Show file tree
Hide file tree
Showing 22 changed files with 1,609 additions and 71 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.common;

/**
* SdkFlag represents a flag used to help version the sdk internally to make breaking changes in
* workflow logic.
*/
public enum SdkFlag {
UNSET(0),
/*
* Changes behavior of GetVersion to not yield if no previous call existed in history.
*/
SKIP_YIELD_ON_DEFAULT_VERSION(1),
UNKNOWN(Integer.MAX_VALUE);

private final int value;

SdkFlag(int value) {
this.value = value;
}

public boolean compare(int i) {
return value == i;
}

public static SdkFlag getValue(int id) {
SdkFlag[] as = SdkFlag.values();
for (int i = 0; i < as.length; i++) {
if (as[i].compare(id)) return as[i];
}
return SdkFlag.UNKNOWN;
}

public int getValue() {
return value;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.common;

import io.temporal.workflow.Functions;
import java.util.EnumSet;

/** Represents all the flags that are currently set in a workflow execution. */
public final class SdkFlags {
private final boolean supportSdkMetadata;
private final Functions.Func<Boolean> replaying;
// Flags that have been received from the server or have not been sent yet.
private final EnumSet<SdkFlag> sdkFlags = EnumSet.noneOf(SdkFlag.class);
// Flags that have been set this WFT that have not been sent to the server.
// Keep track of them separately, so we know what to send to the server.
private final EnumSet<SdkFlag> unsentSdkFlags = EnumSet.noneOf(SdkFlag.class);

public SdkFlags(boolean supportSdkMetadata, Functions.Func<Boolean> replaying) {
this.supportSdkMetadata = supportSdkMetadata;
this.replaying = replaying;
}

/**
* Marks a flag as usable regardless of replay status.
*
* @return True, as long as the server supports SDK flags
*/
public boolean setSdkFlag(SdkFlag flag) {
if (!supportSdkMetadata) {
return false;
}
sdkFlags.add(flag);
return true;
}

/**
* @return True if this flag may currently be used.
*/
public boolean tryUseSdkFlag(SdkFlag flag) {
if (!supportSdkMetadata) {
return false;
}

if (!replaying.apply()) {
sdkFlags.add(flag);
unsentSdkFlags.add(flag);
return true;
} else {
return sdkFlags.contains(flag);
}
}

/**
* @return All flags set since the last call to takeNewSdkFlags.
*/
public EnumSet<SdkFlag> takeNewSdkFlags() {
EnumSet<SdkFlag> result = unsentSdkFlags.clone();
unsentSdkFlags.clear();
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.temporal.api.common.v1.*;
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
import io.temporal.internal.common.SdkFlag;
import io.temporal.internal.statemachines.ExecuteActivityParameters;
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
import io.temporal.internal.statemachines.LocalActivityCallback;
Expand Down Expand Up @@ -269,8 +270,9 @@ void mutableSideEffect(
* @param minSupported min version supported for the change
* @param maxSupported max version supported for the change
* @param callback used to return version
* @return True if the identifier is not present in history
*/
void getVersion(
boolean getVersion(
String changeId,
int minSupported,
int maxSupported,
Expand Down Expand Up @@ -382,4 +384,9 @@ void getVersion(

/** Updates or inserts search attributes used to index workflows. */
void upsertSearchAttributes(@Nonnull SearchAttributes searchAttributes);

/**
* @return true if this flag may currently be used.
*/
boolean tryUseSdkFlag(SdkFlag flag);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
import io.temporal.failure.CanceledFailure;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.common.SdkFlag;
import io.temporal.internal.statemachines.*;
import io.temporal.internal.worker.SingleWorkerOptions;
import io.temporal.workflow.Functions;
Expand Down Expand Up @@ -240,6 +241,11 @@ public boolean isReplaying() {
return workflowStateMachines.isReplaying();
}

@Override
public boolean tryUseSdkFlag(SdkFlag flag) {
return workflowStateMachines.tryUseSdkFlag(flag);
}

@Override
public Functions.Proc1<RuntimeException> newTimer(
Duration delay, Functions.Proc1<RuntimeException> callback) {
Expand Down Expand Up @@ -290,12 +296,12 @@ public void mutableSideEffect(
}

@Override
public void getVersion(
public boolean getVersion(
String changeId,
int minSupported,
int maxSupported,
Functions.Proc2<Integer, RuntimeException> callback) {
workflowStateMachines.getVersion(changeId, minSupported, maxSupported, callback);
return workflowStateMachines.getVersion(changeId, minSupported, maxSupported, callback);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
import io.temporal.api.protocol.v1.Message;
import io.temporal.api.query.v1.WorkflowQuery;
import io.temporal.api.query.v1.WorkflowQueryResult;
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
import io.temporal.internal.Config;
import io.temporal.internal.common.SdkFlag;
import io.temporal.internal.common.UpdateMessage;
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
import io.temporal.internal.statemachines.StatesMachinesCallback;
Expand Down Expand Up @@ -90,13 +92,16 @@ class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler {

private final ReplayWorkflowExecutor replayWorkflowExecutor;

private final GetSystemInfoResponse.Capabilities capabilities;

ReplayWorkflowRunTaskHandler(
String namespace,
ReplayWorkflow workflow,
PollWorkflowTaskQueueResponseOrBuilder workflowTask,
SingleWorkerOptions workerOptions,
Scope metricsScope,
LocalActivityDispatcher localActivityDispatcher) {
LocalActivityDispatcher localActivityDispatcher,
GetSystemInfoResponse.Capabilities capabilities) {
HistoryEvent startedEvent = workflowTask.getHistory().getEvents(0);
if (!startedEvent.hasWorkflowExecutionStartedEventAttributes()) {
throw new IllegalArgumentException(
Expand All @@ -107,7 +112,8 @@ class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler {
this.localActivityDispatcher = localActivityDispatcher;
this.workflow = workflow;

this.workflowStateMachines = new WorkflowStateMachines(new StatesMachinesCallbackImpl());
this.workflowStateMachines =
new WorkflowStateMachines(new StatesMachinesCallbackImpl(), capabilities);
String fullReplayDirectQueryType =
workflowTask.hasQuery() ? workflowTask.getQuery().getQueryType() : null;
this.context =
Expand All @@ -125,6 +131,7 @@ class ReplayWorkflowRunTaskHandler implements WorkflowRunTaskHandler {
new ReplayWorkflowExecutor(workflow, workflowStateMachines, context);
this.localActivityCompletionSink = localActivityCompletionQueue::add;
this.localActivityMeteringHelper = new LocalActivityMeteringHelper();
this.capabilities = capabilities;
}

@Override
Expand Down Expand Up @@ -159,6 +166,11 @@ public WorkflowTaskResult handleWorkflowTask(
processLocalActivityRequests(wftHearbeatDeadline);
List<Command> commands = workflowStateMachines.takeCommands();
List<Message> messages = workflowStateMachines.takeMessages();
EnumSet<SdkFlag> newFlags = workflowStateMachines.takeNewSdkFlags();
List<Integer> newSdkFlags = new ArrayList<>(newFlags.size());
for (SdkFlag flag : newFlags) {
newSdkFlags.add(flag.getValue());
}
if (context.isWorkflowMethodCompleted()) {
// it's important for query, otherwise the WorkflowTaskHandler is responsible for closing
// and invalidation
Expand All @@ -175,6 +187,7 @@ public WorkflowTaskResult handleWorkflowTask(
.setFinalCommand(context.isWorkflowMethodCompleted())
.setForceWorkflowTask(localActivityTaskCount > 0 && !context.isWorkflowMethodCompleted())
.setNonfirstLocalActivityAttempts(localActivityMeteringHelper.getNonfirstAttempts())
.setSdkFlags(newSdkFlags)
.build();
} finally {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.temporal.api.failure.v1.Failure;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.api.query.v1.WorkflowQuery;
import io.temporal.api.sdk.v1.WorkflowTaskCompletedMetadata;
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
import io.temporal.api.taskqueue.v1.TaskQueue;
import io.temporal.api.workflowservice.v1.*;
Expand Down Expand Up @@ -235,6 +236,13 @@ private Result createCompletedWFTRequest(
}
completedRequest.setStickyAttributes(attributes);
}
if (!result.getSdkFlags().isEmpty()) {
completedRequest =
completedRequest.setSdkMetadata(
WorkflowTaskCompletedMetadata.newBuilder()
.addAllLangUsedFlags(result.getSdkFlags())
.build());
}
return new Result(
workflowType,
completedRequest.build(),
Expand Down Expand Up @@ -383,7 +391,13 @@ private WorkflowRunTaskHandler createStatefulHandler(
}
ReplayWorkflow workflow = workflowFactory.getWorkflow(workflowType, workflowExecution);
return new ReplayWorkflowRunTaskHandler(
namespace, workflow, workflowTask, options, metricsScope, localActivityDispatcher);
namespace,
workflow,
workflowTask,
options,
metricsScope,
localActivityDispatcher,
service.getServerCapabilities().get());
}

private void resetStickyTaskQueue(WorkflowExecution execution) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public static final class Builder {
private Map<String, WorkflowQueryResult> queryResults;
private boolean forceWorkflowTask;
private int nonfirstLocalActivityAttempts;
private List<Integer> sdkFlags;

public Builder setCommands(List<Command> commands) {
this.commands = commands;
Expand Down Expand Up @@ -71,14 +72,20 @@ public Builder setNonfirstLocalActivityAttempts(int nonfirstLocalActivityAttempt
return this;
}

public Builder setSdkFlags(List<Integer> sdkFlags) {
this.sdkFlags = sdkFlags;
return this;
}

public WorkflowTaskResult build() {
return new WorkflowTaskResult(
commands == null ? Collections.emptyList() : commands,
messages == null ? Collections.emptyList() : messages,
queryResults == null ? Collections.emptyMap() : queryResults,
finalCommand,
forceWorkflowTask,
nonfirstLocalActivityAttempts);
nonfirstLocalActivityAttempts,
sdkFlags == null ? Collections.emptyList() : sdkFlags);
}
}

Expand All @@ -88,14 +95,16 @@ public WorkflowTaskResult build() {
private final Map<String, WorkflowQueryResult> queryResults;
private final boolean forceWorkflowTask;
private final int nonfirstLocalActivityAttempts;
private final List<Integer> sdkFlags;

private WorkflowTaskResult(
List<Command> commands,
List<Message> messages,
Map<String, WorkflowQueryResult> queryResults,
boolean finalCommand,
boolean forceWorkflowTask,
int nonfirstLocalActivityAttempts) {
int nonfirstLocalActivityAttempts,
List<Integer> sdkFlags) {
this.commands = commands;
this.messages = messages;
this.nonfirstLocalActivityAttempts = nonfirstLocalActivityAttempts;
Expand All @@ -105,6 +114,7 @@ private WorkflowTaskResult(
this.queryResults = queryResults;
this.finalCommand = finalCommand;
this.forceWorkflowTask = forceWorkflowTask;
this.sdkFlags = sdkFlags;
}

public List<Command> getCommands() {
Expand All @@ -131,4 +141,8 @@ public boolean isForceWorkflowTask() {
public int getNonfirstLocalActivityAttempts() {
return nonfirstLocalActivityAttempts;
}

public List<Integer> getSdkFlags() {
return sdkFlags;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,12 @@ private VersionStateMachine(
this.stateMachineSink = stateMachineSink;
}

public void getVersion(
public State getVersion(
int minSupported, int maxSupported, Functions.Proc2<Integer, RuntimeException> callback) {
InvocationStateMachine ism = new InvocationStateMachine(minSupported, maxSupported, callback);
ism.explicitEvent(ExplicitEvent.CHECK_EXECUTION_STATE);
ism.explicitEvent(ExplicitEvent.SCHEDULE);
return ism.getState();
}

public void handleNonMatchingEvent(HistoryEvent event) {
Expand Down

0 comments on commit 6054c31

Please sign in to comment.