Skip to content

Commit

Permalink
Add build id to workflow info (#1964)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Jan 5, 2024
1 parent 9174397 commit 1f7a59c
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ boolean getVersion(
/**
* @return eventId of the last / currently active workflow task of this workflow
*/
long getCurrentWorkflowTaskStartedEventId();
long getLastWorkflowTaskStartedEventId();

/**
* @return size of Workflow history in bytes up until the current moment of execution. This value
Expand Down Expand Up @@ -405,4 +405,13 @@ boolean getVersion(
* @return true if this flag may currently be used.
*/
boolean tryUseSdkFlag(SdkFlag flag);

/**
* @return The Build ID of the worker which executed the current Workflow Task. May be empty the
* task was completed by a worker without a Build ID. If this worker is the one executing this
* task for the first time and has a Build ID set, then its ID will be used. This value may
* change over the lifetime of the workflow run, but is deterministic and safe to use for
* branching.
*/
Optional<String> getCurrentBuildId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,19 @@ public boolean tryUseSdkFlag(SdkFlag flag) {
return workflowStateMachines.tryUseSdkFlag(flag);
}

@Override
public Optional<String> getCurrentBuildId() {
String curTaskBID = workflowStateMachines.getCurrentTaskBuildId();
// The current task started id == 0 check is to avoid setting the build id to this worker's ID
// in the event we're
// servicing a query, in which case we do want to use the ID from history.
if (!workflowStateMachines.isReplaying()
&& workflowStateMachines.getCurrentWFTStartedEventId() != 0) {
curTaskBID = workerOptions.getBuildId();
}
return Optional.ofNullable(curTaskBID);
}

@Override
public Functions.Proc1<RuntimeException> newTimer(
Duration delay, Functions.Proc1<RuntimeException> callback) {
Expand Down Expand Up @@ -356,8 +369,8 @@ public Map<String, Payload> getHeader() {
}

@Override
public long getCurrentWorkflowTaskStartedEventId() {
return workflowStateMachines.getCurrentStartedEventId();
public long getLastWorkflowTaskStartedEventId() {
return workflowStateMachines.getLastWFTStartedEventId();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public WorkflowTaskResult handleWorkflowTask(
TimeUnit.NANOSECONDS);

if (workflowTask.getPreviousStartedEventId()
< workflowStateMachines.getCurrentStartedEventId()) {
< workflowStateMachines.getLastWFTStartedEventId()) {
// if previousStartedEventId < currentStartedEventId - the last workflow task handled by
// these state machines is ahead of the last handled workflow task known by the server.
// Something is off, the server lost progress.
Expand Down Expand Up @@ -219,7 +219,7 @@ public QueryResult handleDirectQueryWorkflowTask(

@Override
public void setCurrentStartedEvenId(Long eventId) {
workflowStateMachines.setCurrentStartedEventId(eventId);
workflowStateMachines.setLastWFTStartedEventId(eventId);
}

private void handleWorkflowTaskImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.temporal.workflow.Functions;
import java.nio.charset.StandardCharsets;
import java.util.*;
import javax.annotation.Nullable;

public final class WorkflowStateMachines {

Expand Down Expand Up @@ -79,7 +80,10 @@ enum HandleEventStatus {
private long workflowTaskStartedEventId;

/** EventId of the last WorkflowTaskStarted event handled by these state machines. */
private long currentStartedEventId;
private long lastWFTStartedEventId;

/** The Build ID used in the current WFT if already completed and set (may be null) */
private String currentTaskBuildId;

private long historySize;

Expand Down Expand Up @@ -201,27 +205,36 @@ public void setWorklfowStartedEventId(long workflowTaskStartedEventId) {
this.workflowTaskStartedEventId = workflowTaskStartedEventId;
}

public void setCurrentStartedEventId(long eventId) {
public void setLastWFTStartedEventId(long eventId) {
// We have to drop any state machines (which should only be one workflow task machine)
// created when handling the speculative workflow task
for (long i = this.lastHandledEventId; i > eventId; i--) {
stateMachines.remove(i);
}
this.currentStartedEventId = eventId;
this.lastWFTStartedEventId = eventId;
// When we reset the event ID on a speculative WFT we need to move this counter back
// to the last WFT completed to allow new tasks to be processed. Assume the WFT complete
// always follows the WFT started.
this.lastHandledEventId = eventId + 1;
}

public long getCurrentStartedEventId() {
return currentStartedEventId;
public long getLastWFTStartedEventId() {
return lastWFTStartedEventId;
}

public long getCurrentWFTStartedEventId() {
return workflowTaskStartedEventId;
}

public long getHistorySize() {
return historySize;
}

@Nullable
public String getCurrentTaskBuildId() {
return currentTaskBuildId;
}

public boolean isContinueAsNewSuggested() {
return isContinueAsNewSuggested;
}
Expand Down Expand Up @@ -329,6 +342,10 @@ private void handleSingleEventLookahead(HistoryEvent event) {
case EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
WorkflowTaskCompletedEventAttributes completedEvent =
event.getWorkflowTaskCompletedEventAttributes();
String maybeBuildId = completedEvent.getWorkerVersion().getBuildId();
if (!maybeBuildId.isEmpty()) {
currentTaskBuildId = maybeBuildId;
}
for (Integer flag : completedEvent.getSdkMetadata().getLangUsedFlagsList()) {
SdkFlag sdkFlag = SdkFlag.getValue(flag);
if (sdkFlag.equals(SdkFlag.UNKNOWN)) {
Expand Down Expand Up @@ -703,7 +720,7 @@ private long setCurrentTimeMillis(long currentTimeMillis) {
}

public long getLastStartedEventId() {
return currentStartedEventId;
return lastWFTStartedEventId;
}

/**
Expand Down Expand Up @@ -1164,7 +1181,7 @@ public void workflowTaskStarted(
value.nonReplayWorkflowTaskStarted();
}
}
WorkflowStateMachines.this.currentStartedEventId = startedEventId;
WorkflowStateMachines.this.lastWFTStartedEventId = startedEventId;
WorkflowStateMachines.this.historySize = historySize;
WorkflowStateMachines.this.isContinueAsNewSuggested = isContinueAsNewSuggested;

Expand Down Expand Up @@ -1282,6 +1299,6 @@ private String createEventHandlingMessage(HistoryEvent event) {
private String createShortCurrentStateMessagePostfix() {
return String.format(
"{WorkflowTaskStartedEventId=%s, CurrentStartedEventId=%s}",
this.workflowTaskStartedEventId, this.currentStartedEventId);
this.workflowTaskStartedEventId, this.lastWFTStartedEventId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public String getCronSchedule() {

@Override
public long getHistoryLength() {
return context.getCurrentWorkflowTaskStartedEventId();
return context.getLastWorkflowTaskStartedEventId();
}

@Override
Expand All @@ -150,6 +150,11 @@ public boolean isContinueAsNewSuggested() {
return context.isContinueAsNewSuggested();
}

@Override
public Optional<String> getCurrentBuildId() {
return context.getCurrentBuildId();
}

@Override
public String toString() {
return "WorkflowInfo{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,13 @@ public interface WorkflowInfo {
* value changes during the lifetime of a Workflow Execution.
*/
boolean isContinueAsNewSuggested();

/**
* @return The Build ID of the worker which executed the current Workflow Task. May be empty the
* task was completed by a worker without a Build ID. If this worker is the one executing this
* task for the first time and has a Build ID set, then its ID will be used. This value may
* change over the lifetime of the workflow run, but is deterministic and safe to use for
* branching.
*/
Optional<String> getCurrentBuildId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,19 @@

import io.temporal.activity.Activity;
import io.temporal.activity.ActivityOptions;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.client.BuildIdOperation;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.internal.Signal;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowMethod;
import io.temporal.workflow.WorkflowQueue;
import io.temporal.workflow.*;
import io.temporal.workflow.shared.TestActivities;
import io.temporal.workflow.shared.TestWorkflows;
import java.time.Duration;
import java.util.UUID;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

Expand All @@ -45,7 +46,6 @@ public class BuildIdVersioningTest {
SDKTestWorkflowRule.newBuilder()
.setWorkerOptions(
WorkerOptions.newBuilder().setBuildId("1.0").setUseBuildIdForVersioning(true).build())
.setWorkflowTypes(BuildIdVersioningTest.TestVersioningWorkflowImpl.class)
.setActivityImplementations(new BuildIdVersioningTest.ActivityImpl())
.setDoNotStart(true)
.build();
Expand All @@ -57,6 +57,10 @@ public void testBuildIdVersioningDataSetProperly() {

String taskQueue = testWorkflowRule.getTaskQueue();
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
testWorkflowRule
.getWorker()
.registerWorkflowImplementationTypes(
BuildIdVersioningTest.TestVersioningWorkflowImpl.class);

// Add 1.0 to the queue
workflowClient.updateWorkerBuildIdCompatability(
Expand Down Expand Up @@ -120,6 +124,75 @@ public void testBuildIdVersioningDataSetProperly() {
w2F.shutdown();
}

private static final Signal ACTIVITY_RAN = new Signal();

@Test
public void testCurrentBuildIDSetProperly() throws InterruptedException {
assumeTrue(
"Test Server doesn't support versioning yet", SDKTestWorkflowRule.useExternalService);

String taskQueue = testWorkflowRule.getTaskQueue();
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
testWorkflowRule
.getWorker()
.registerWorkflowImplementationTypes(
BuildIdVersioningTest.TestCurrentBuildIdWorkflow.class);

// Add 1.0 to the queue
workflowClient.updateWorkerBuildIdCompatability(
taskQueue, BuildIdOperation.newIdInNewDefaultSet("1.0"));

// Now start the worker (to avoid poll timeout while queue is unversioned)
testWorkflowRule.getTestEnvironment().start();

// Start a workflow
String workflowId = "build-id-versioning-1.0-" + UUID.randomUUID();
WorkflowOptions options =
SDKTestOptions.newWorkflowOptionsWithTimeouts(taskQueue).toBuilder()
.setWorkflowId(workflowId)
.build();
TestWorkflows.QueryableWorkflow wf1 =
workflowClient.newWorkflowStub(TestWorkflows.QueryableWorkflow.class, options);
WorkflowClient.start(wf1::execute);

Assert.assertEquals("1.0", wf1.getState());

// Wait for activity to run
ACTIVITY_RAN.waitForSignal();
Assert.assertEquals("1.0", wf1.getState());

testWorkflowRule.getTestEnvironment().shutdown();
workflowClient
.getWorkflowServiceStubs()
.blockingStub()
.resetStickyTaskQueue(
io.temporal.api.workflowservice.v1.ResetStickyTaskQueueRequest.newBuilder()
.setNamespace(testWorkflowRule.getTestEnvironment().getNamespace())
.setExecution(WorkflowExecution.newBuilder().setWorkflowId(workflowId).build())
.build());

// Add 1.1 to the queue
workflowClient.updateWorkerBuildIdCompatability(
taskQueue, BuildIdOperation.newCompatibleVersion("1.1", "1.0"));

WorkerFactory w11F =
WorkerFactory.newInstance(workflowClient, testWorkflowRule.getWorkerFactoryOptions());
Worker w11 =
w11F.newWorker(
taskQueue,
WorkerOptions.newBuilder().setBuildId("1.1").setUseBuildIdForVersioning(true).build());
w11.registerWorkflowImplementationTypes(BuildIdVersioningTest.TestCurrentBuildIdWorkflow.class);
w11.registerActivitiesImplementations(new BuildIdVersioningTest.ActivityImpl());
w11F.start();

Assert.assertEquals("1.0", wf1.getState());
wf1.mySignal("finish");

Assert.assertEquals("1.1", wf1.getState());

w11F.shutdown();
}

public static class TestVersioningWorkflowImpl implements TestWorkflows.QueryableWorkflow {
WorkflowQueue<String> sigQueue = Workflow.newWorkflowQueue(1);
private final TestActivities.TestActivity1 activity =
Expand Down Expand Up @@ -156,4 +229,43 @@ public String execute(String input) {
return Activity.getExecutionContext().getInfo().getActivityType() + "-" + input;
}
}

public static class TestCurrentBuildIdWorkflow implements TestWorkflows.QueryableWorkflow {
private final TestActivities.TestActivity1 activity =
Workflow.newActivityStub(
TestActivities.TestActivity1.class,
ActivityOptions.newBuilder().setScheduleToCloseTimeout(Duration.ofSeconds(10)).build());
private boolean doFinish = false;
private String lastBuildId;

@WorkflowMethod
public String execute() {
updateBuildId();
Workflow.sleep(1);
updateBuildId();
if (Workflow.getInfo().getCurrentBuildId().orElse("").equals("1.0")) {
activity.execute("foo");
updateBuildId();
ACTIVITY_RAN.signal();
}
Workflow.await(() -> doFinish);
updateBuildId();
return "Yay done";
}

private void updateBuildId() {
lastBuildId = Workflow.getInfo().getCurrentBuildId().orElse("");
}

@Override
public void mySignal(String arg) {
doFinish = true;
}

@Override
public String getState() {
// Workflow.getInfo isn't accessible in queries, so we do this
return lastBuildId;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,11 @@ public boolean tryUseSdkFlag(SdkFlag flag) {
return false;
}

@Override
public Optional<String> getCurrentBuildId() {
return Optional.empty();
}

@Override
public int getAttempt() {
return 1;
Expand Down Expand Up @@ -353,7 +358,7 @@ public Map<String, Payload> getHeader() {
}

@Override
public long getCurrentWorkflowTaskStartedEventId() {
public long getLastWorkflowTaskStartedEventId() {
return 0;
}

Expand Down

0 comments on commit 1f7a59c

Please sign in to comment.