Skip to content

Commit

Permalink
Support terminate-if-running reuse policy in test server (#1683)
Browse files Browse the repository at this point in the history
Issue #1682
  • Loading branch information
jeffschoner-stripe committed Mar 1, 2023
1 parent 5f5ecab commit db81b4d
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import io.grpc.*;
import io.grpc.stub.StreamObserver;
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
import io.temporal.api.common.v1.Payload;
import io.temporal.api.common.v1.Payloads;
import io.temporal.api.common.v1.RetryPolicy;
import io.temporal.api.common.v1.WorkflowExecution;
Expand Down Expand Up @@ -218,15 +220,35 @@ StartWorkflowExecutionResponse startWorkflowExecutionImpl(
TestWorkflowMutableState existing;
lock.lock();
try {
String newRunId = UUID.randomUUID().toString();
existing = executionsByWorkflowId.get(workflowId);
if (existing != null) {
WorkflowExecutionStatus status = existing.getWorkflowExecutionStatus();
WorkflowIdReusePolicy policy = startRequest.getWorkflowIdReusePolicy();

if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
&& policy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING) {
existing.terminateWorkflowExecution(
TerminateWorkflowExecutionRequest.newBuilder()
.setNamespace(startRequest.getNamespace())
.setWorkflowExecution(existing.getExecutionId().getExecution())
.setReason("TerminateIfRunning WorkflowIdReusePolicy Policy")
.setIdentity("history-service")
.setDetails(
Payloads.newBuilder()
.addPayloads(
Payload.newBuilder()
.setData(
ByteString.copyFromUtf8(
String.format("terminated by new runID: %s", newRunId)))
.build())
.build())
.build());
} else if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING
|| policy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE) {
return throwDuplicatedWorkflow(startRequest, existing);
}
if (policy == WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
} else if (policy
== WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
&& (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED
|| status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW)) {
return throwDuplicatedWorkflow(startRequest, existing);
Expand All @@ -244,12 +266,12 @@ StartWorkflowExecutionResponse startWorkflowExecutionImpl(
} else {
retryState = Optional.empty();
}
String runId = UUID.randomUUID().toString();
return startWorkflowExecutionNoRunningCheckLocked(
startRequest,
runId,
// it's the first execution in the continue-as-new chain, so firstExecutionRunId = runId
runId,
newRunId,
// it's the first execution in the continue-as-new chain, so firstExecutionRunId =
// newRunId
newRunId,
Optional.empty(),
retryState,
backoffStartInterval,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.testserver.functional;

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.enums.v1.WorkflowIdReusePolicy;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest;
import io.temporal.client.*;
import io.temporal.failure.ApplicationFailure;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.testserver.functional.common.TestWorkflows;
import io.temporal.workflow.Workflow;
import java.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class WorkflowIdReusePolicyTest {

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(ForeverWorkflowImpl.class, FailingWorkflowImpl.class)
.build();

@Test
public void rejectDuplicateStopsAnotherAfterFailed() {
String workflowId = "reject-duplicate-1";
WorkflowOptions options =
WorkflowOptions.newBuilder()
.setWorkflowId(workflowId)
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setWorkflowIdReusePolicy(
WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE)
.build();

WorkflowExecution execution1 = runFailingWorkflow(options);
describe(execution1).assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED);

Assert.assertThrows(WorkflowExecutionAlreadyStarted.class, () -> startForeverWorkflow(options));
}

@Test
public void allowDuplicateAfterFailed() {
String workflowId = "allow-duplicate-1";
WorkflowOptions options =
WorkflowOptions.newBuilder()
.setWorkflowId(workflowId)
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setWorkflowIdReusePolicy(
WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY)
.build();

WorkflowExecution execution1 = runFailingWorkflow(options);
describe(execution1).assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED);

WorkflowExecution execution2 = startForeverWorkflow(options);
describe(execution2).assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING);
}

@Test
public void alreadyRunningWorkflowBlocksSecondEvenWithAllowDuplicate() {
String workflowId = "allow-duplicate-2";
WorkflowOptions options =
WorkflowOptions.newBuilder()
.setWorkflowId(workflowId)
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setWorkflowIdReusePolicy(
WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE)
.build();

WorkflowExecution execution1 = startForeverWorkflow(options);
describe(execution1).assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING);

Assert.assertThrows(WorkflowExecutionAlreadyStarted.class, () -> startForeverWorkflow(options));
describe(execution1).assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING);
}

@Test
public void secondWorkflowTerminatesFirst() {
String workflowId = "terminate-if-running-1";
WorkflowOptions options =
WorkflowOptions.newBuilder()
.setWorkflowId(workflowId)
.setWorkflowTaskTimeout(Duration.ofSeconds(1))
.setTaskQueue(testWorkflowRule.getTaskQueue())
.setWorkflowIdReusePolicy(
WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING)
.build();

WorkflowExecution execution1 = startForeverWorkflow(options);
describe(execution1).assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING);

WorkflowExecution execution2 = startForeverWorkflow(options);
describe(execution1).assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED);
describe(execution2).assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING);
}

private WorkflowExecution startForeverWorkflow(WorkflowOptions options) {
TestWorkflows.PrimitiveWorkflow workflowStub =
testWorkflowRule
.getWorkflowClient()
.newWorkflowStub(TestWorkflows.PrimitiveWorkflow.class, options);
WorkflowClient.start(workflowStub::execute);
return WorkflowStub.fromTyped(workflowStub).getExecution();
}

private WorkflowExecution runFailingWorkflow(WorkflowOptions options) {
TestWorkflows.WorkflowReturnsString workflowStub =
testWorkflowRule
.getWorkflowClient()
.newWorkflowStub(TestWorkflows.WorkflowReturnsString.class, options);
Assert.assertThrows(WorkflowFailedException.class, workflowStub::execute);
return WorkflowStub.fromTyped(workflowStub).getExecution();
}

private DescribeWorkflowAsserter describe(WorkflowExecution execution) {
DescribeWorkflowAsserter result =
new DescribeWorkflowAsserter(
testWorkflowRule
.getWorkflowClient()
.getWorkflowServiceStubs()
.blockingStub()
.describeWorkflowExecution(
DescribeWorkflowExecutionRequest.newBuilder()
.setNamespace(
testWorkflowRule.getWorkflowClient().getOptions().getNamespace())
.setExecution(execution)
.build()));

// There are some assertions that we can always make...
return result
.assertExecutionId(execution)
.assertSaneTimestamps()
.assertTaskQueue(testWorkflowRule.getTaskQueue());
}

public static class ForeverWorkflowImpl implements TestWorkflows.PrimitiveWorkflow {
@Override
public void execute() {
// wait forever to keep it in running state
Workflow.await(() -> false);
}
}

public static class FailingWorkflowImpl implements TestWorkflows.WorkflowReturnsString {
@Override
public String execute() {
throw ApplicationFailure.newNonRetryableFailure("It's done", "someFailure");
}
}
}

0 comments on commit db81b4d

Please sign in to comment.