From de2f0764eb2e03abf8f6862a6480f697c0232444 Mon Sep 17 00:00:00 2001 From: Nathan Glass Date: Tue, 24 Aug 2021 21:57:31 -0700 Subject: [PATCH 1/4] Add DescribeWorkflowExecution to TestWorkflowService This change adds support for DescribeWorkflowExecution to the test service. It should support everything the real server supports except for AutoResetPoints, which the test service doesn't otherwise support (for good reason). Implementatoin-wise, this is just a port of HistoryEngine.DescribeWorkflowExecution from golang, except that we infer some things from history that the real service just stores. --- .../io/temporal/workflow/DescribeTest.java | 565 ++++++++++++++++++ .../shared/DescribeWorkflowAsserter.java | 188 ++++++ .../internal/testservice/StateMachines.java | 45 +- .../testservice/TestWorkflowMutableState.java | 3 + .../TestWorkflowMutableStateImpl.java | 215 +++++++ .../testservice/TestWorkflowService.java | 24 + .../testservice/TestWorkflowStoreImpl.java | 20 +- 7 files changed, 1044 insertions(+), 16 deletions(-) create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/DescribeTest.java create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/shared/DescribeWorkflowAsserter.java diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/DescribeTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/DescribeTest.java new file mode 100644 index 0000000000..05976eb224 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/DescribeTest.java @@ -0,0 +1,565 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.workflow; + +import com.google.common.collect.ImmutableMap; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.temporal.activity.Activity; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.activity.ActivityOptions; +import io.temporal.api.common.v1.ActivityType; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.PendingActivityState; +import io.temporal.api.enums.v1.WorkflowExecutionStatus; +import io.temporal.api.workflow.v1.PendingActivityInfo; +import io.temporal.api.workflow.v1.PendingChildExecutionInfo; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest; +import io.temporal.client.WorkflowFailedException; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.common.RetryOptions; +import io.temporal.serviceclient.CheckedExceptionWrapper; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.shared.DescribeWorkflowAsserter; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DescribeTest { + + private static final Logger log = LoggerFactory.getLogger(DescribeTest.class); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestDescribeWorkflowImpl.class) + .setActivityImplementations(new TestDescribeActivityImpl()) + .setTestTimeoutSeconds(30) + .build(); + + public DescribeWorkflowAsserter describe(WorkflowExecution execution) { + DescribeWorkflowAsserter result = + new DescribeWorkflowAsserter( + testWorkflowRule + .getWorkflowClient() + .getWorkflowServiceStubs() + .blockingStub() + .describeWorkflowExecution( + DescribeWorkflowExecutionRequest.newBuilder() + .setNamespace( + testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .setExecution(execution) + .build())); + + // There are some assertions that we can always make... + return result + .assertType("TestDescribeWorkflow") + .assertExecutionId(execution) + .assertSaneTimestamps() + .assertTaskQueue(testWorkflowRule.getTaskQueue()); + } + + @Test + public void testWorkflowDoesNotExist() { + StatusRuntimeException e = + Assert.assertThrows( + io.grpc.StatusRuntimeException.class, + () -> { + describe( + WorkflowExecution.newBuilder() + .setWorkflowId("627ecd0c-688b-3fc5-927e-0f7ab7eec09b") + .setRunId("8f493dbf-205d-4142-8e0b-dc5cdff404b8") + .build()); + }); + + Assert.assertEquals(Status.NOT_FOUND.getCode(), e.getStatus().getCode()); + } + + private WorkflowOptions options() { + // The task queue isn't known until the test is running, so we can't just declare a constant + // WorkflowOptions + return WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setWorkflowExecutionTimeout(Duration.ofMinutes(3)) + .setWorkflowRunTimeout(Duration.ofMinutes(2)) + .setWorkflowTaskTimeout(Duration.ofMinutes(1)) + .setMemo(ImmutableMap.of("memo", "randum")) + .build(); + } + + @Test + public void testSuccessfulActivity() throws InterruptedException { + String token = "testSuccessfulActivity"; + WorkflowOptions options = options(); + WorkflowStub stub = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub("TestDescribeWorkflow", options); + WorkflowExecution execution = stub.start(token, null, Boolean.TRUE, Integer.valueOf(0)); + + // Wait for the activity so we know what status to expect + ThreadUtils.waitForWorkflow(token + "-start"); + + DescribeWorkflowAsserter asserter = + describe(execution) + .assertMatchesOptions(options) + .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING) + .assertHistoryLength(5) + .assertNoParent() + .assertPendingActivityCount(1) + .assertPendingChildrenCount(0); + + PendingActivityInfo actual = asserter.getActual().getPendingActivities(0); + + // No fancy asserter type for PendingActivityInfo... we just build the expected proto + PendingActivityInfo expected = + PendingActivityInfo.newBuilder() + .setActivityId(actual.getActivityId()) + .setActivityType(ActivityType.newBuilder().setName("TestDescribeActivity").build()) + .setState(PendingActivityState.PENDING_ACTIVITY_STATE_STARTED) + .setAttempt(1) + .setMaximumAttempts(2) + // times should be present, but we can't know what the expected value is if this test is + // going to run against the real server. + .setLastStartedTime(actual.getLastStartedTime()) + .setExpirationTime(actual.getExpirationTime()) + // Heads up! We're asserting that heartbeat time == started time, which should be true + // before the heartbeat + .setLastHeartbeatTime(actual.getLastStartedTime()) + // this ends up being a dummy value, but if it weren't, we still wouldn't expect to know + // it. + .setLastWorkerIdentity(actual.getLastWorkerIdentity()) + .build(); + + Assert.assertEquals("PendingActivityInfo should match before", expected, actual); + + // Make the activity heartbeat - this should show in the next describe call + ThreadUtils.waitForWorkflow(token + "-heartbeat"); + ThreadUtils.waitForWorkflow(token + "-after-heartbeat"); + + asserter = + describe(execution) + .assertMatchesOptions(options) + .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING) + .assertHistoryLength(5) + .assertNoParent() + .assertPendingActivityCount(1) + .assertPendingChildrenCount(0); + + actual = asserter.getActual().getPendingActivities(0); + + // Now, our PendingActivityInfo has heartbeat data, but is otherwise unchanged + expected = + expected + .toBuilder() + .setHeartbeatDetails(DescribeWorkflowAsserter.stringsToPayloads("heartbeatDetails")) + .setLastHeartbeatTime(actual.getLastHeartbeatTime()) + .build(); + Assert.assertEquals("PendingActivityInfo should match after heartbeat", expected, actual); + + // Let the activity finish, which will let the workflow finish. + ThreadUtils.waitForWorkflow(token + "-finish"); + + // Wait for the workflow to finish so we know what state to expect + stub.getResult(Void.class); + describe(execution) + .assertMatchesOptions(options) + .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED) + .assertHistoryLength(11) + .assertNoParent() + .assertPendingActivityCount(0) + .assertPendingChildrenCount(0); + } + + @Test + public void testFailedActivity() throws InterruptedException { + String token = "testFailedActivity"; + WorkflowOptions options = options(); + WorkflowStub stub = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub("TestDescribeWorkflow", options); + WorkflowExecution execution = stub.start(token, null, Boolean.FALSE, Integer.valueOf(2)); + + // Fast forward until the retry after the failure + ThreadUtils.waitForWorkflow(token + "-start"); + ThreadUtils.waitForWorkflow(token + "-fail"); + ThreadUtils.waitForWorkflow(token + "-start"); + + // Previous test cases have made boilerplate assertions - let's only focus on what's novel + DescribeWorkflowAsserter asserter = + describe(execution) + .assertMatchesOptions(options) + .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING) + .assertHistoryLength(5) + .assertPendingActivityCount(1); + + PendingActivityInfo actual = asserter.getActual().getPendingActivities(0); + + Assert.assertEquals( + "failure message should match", + "Activity was asked to fail on attempt 1", + actual.getLastFailure().getMessage()); + + PendingActivityInfo expected = + PendingActivityInfo.newBuilder() + .setActivityId(actual.getActivityId()) + .setActivityType(ActivityType.newBuilder().setName("TestDescribeActivity").build()) + .setState(PendingActivityState.PENDING_ACTIVITY_STATE_STARTED) + .setAttempt(2) + .setMaximumAttempts(2) + // times should be present, but we can't know what the expected value is if this test is + // going to run against the real server. + .setLastStartedTime(actual.getLastStartedTime()) + .setLastHeartbeatTime(actual.getLastHeartbeatTime()) + .setExpirationTime(actual.getExpirationTime()) + // this ends up being a dummy value, but if it weren't, we still wouldn't expect to know + // it. + .setLastWorkerIdentity(actual.getLastWorkerIdentity()) + // We don't deeply assert the failure structure since we asserted the message above + .setLastFailure(actual.getLastFailure()) + .build(); + + Assert.assertEquals("PendingActivityInfo should match", expected, actual); + + // Now let the workflow succeed + ThreadUtils.waitForWorkflow(token + "-finish"); + stub.getResult(Void.class); + describe(execution) + .assertMatchesOptions(options) + .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED) + .assertHistoryLength(11) + .assertNoParent() + .assertPendingActivityCount(0) + .assertPendingChildrenCount(0); + } + + private void testKilledWorkflow( + String token, + Consumer killer, + WorkflowExecutionStatus expectedWorkflowStatus, + PendingActivityState expectedActivityStatus, + int expectedHistoryLength) + throws InterruptedException { + WorkflowOptions options = options(); + WorkflowStub stub = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub("TestDescribeWorkflow", options); + // Set the execution up so that it will fail due to activity failures if it isn't otherwise + // killed + WorkflowExecution execution = stub.start(token, null, Boolean.FALSE, Integer.valueOf(3)); + + // Let the activity start so we can cancel it + ThreadUtils.waitForWorkflow(token + "-start"); + killer.accept(stub); + + // Wait for the kill (whatever it was) to get noticed (we don't try to assert intermediate + // states - those are hard to catch). + Assert.assertThrows( + WorkflowFailedException.class, + () -> { + stub.getResult(Void.class); + }); + + // Previous test cases have made boilerplate assertions - let's only focus on what's novel + DescribeWorkflowAsserter asserter = + describe(execution) + .assertMatchesOptions(options) + .assertStatus(expectedWorkflowStatus) + .assertHistoryLength(expectedHistoryLength) + .assertPendingActivityCount(expectedActivityStatus == null ? 0 : 1); + + if (expectedActivityStatus == null) { + return; + } + + PendingActivityInfo actual = asserter.getActual().getPendingActivities(0); + + PendingActivityInfo expected = + PendingActivityInfo.newBuilder() + .setActivityId(actual.getActivityId()) + .setActivityType(ActivityType.newBuilder().setName("TestDescribeActivity").build()) + .setState(expectedActivityStatus) + .setAttempt(1) + .setMaximumAttempts(2) + // times should be present, but we can't know what the expected value is if this test is + // going to run against the real server. + .setLastStartedTime(actual.getLastStartedTime()) + .setLastHeartbeatTime(actual.getLastStartedTime()) + .setExpirationTime(actual.getExpirationTime()) + // this ends up being a dummy value, but if it weren't, we still wouldn't expect to know + // it. + .setLastWorkerIdentity(actual.getLastWorkerIdentity()) + .build(); + + Assert.assertEquals("PendingActivityInfo should match", expected, actual); + } + + @Test + public void testCanceledWorkflow() throws InterruptedException { + testKilledWorkflow( + "testCanceledWorkflow", + WorkflowStub::cancel, + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED, + PendingActivityState.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED, + 11); + } + + @Test + public void testTerminatedWorkflow() throws InterruptedException { + testKilledWorkflow( + "testTerminatedWorkflow", + stub -> stub.terminate("testing"), + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED, + PendingActivityState.PENDING_ACTIVITY_STATE_STARTED, + 6); + } + + @Test + public void testFailedWorkflow() throws InterruptedException { + String token = "testFailedWorkflow"; + testKilledWorkflow( + token, + stub -> { + try { + // Don't kill the workflow externally here - instead, unblock the activity twice, which + // will fail the workflow + ThreadUtils.waitForWorkflow(token + "-fail"); + ThreadUtils.waitForWorkflow(token + "-start"); + ThreadUtils.waitForWorkflow(token + "-fail"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }, + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED, + null, + 11); + } + + @Test + public void testChildWorkflow() throws InterruptedException { + String token = "testChildWorkflow"; + WorkflowOptions options = options(); + WorkflowStub stub = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub("TestDescribeWorkflow", options); + WorkflowExecution parentExecution = stub.start(null, token, Boolean.FALSE, Integer.valueOf(0)); + + // This unblocks the child workflow's activity + ThreadUtils.waitForWorkflow(token + "-start"); + + DescribeWorkflowAsserter parent = + describe(parentExecution) + .assertMatchesOptions(options) + .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING) + .assertHistoryLength(9) + .assertNoParent() + .assertPendingActivityCount(0) + .assertPendingChildrenCount(1); + + // There's very little of interest to assert on here, but we need the id so we can describe the + // child + PendingChildExecutionInfo childInfo = parent.getActual().getPendingChildren(0); + Assert.assertEquals( + "child workflow type name should match", + "TestDescribeWorkflow", + childInfo.getWorkflowTypeName()); + + WorkflowExecution childExecution = + WorkflowExecution.newBuilder() + .setWorkflowId(childInfo.getWorkflowId()) + .setRunId(childInfo.getRunId()) + .build(); + + // marshal ChildWorkflowOptions to WorkflowOptions because that's what the asserter expects + WorkflowOptions expectedChildOptions = + WorkflowOptions.newBuilder() + .setWorkflowExecutionTimeout( + TestDescribeWorkflowImpl.CHILD_OPTIONS.getWorkflowExecutionTimeout()) + .setWorkflowRunTimeout(TestDescribeWorkflowImpl.CHILD_OPTIONS.getWorkflowRunTimeout()) + .setWorkflowTaskTimeout(TestDescribeWorkflowImpl.CHILD_OPTIONS.getWorkflowTaskTimeout()) + .setMemo(TestDescribeWorkflowImpl.CHILD_OPTIONS.getMemo()) + .build(); + + describe(childExecution) + .assertMatchesOptions(expectedChildOptions) + .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING) + .assertHistoryLength(5) + .assertParent(parentExecution) + .assertPendingActivityCount(1) + .assertPendingChildrenCount(0); + + // Unblock the child and wait for the parent to finish so we know what expected states are + ThreadUtils.waitForWorkflow(token + "-finish"); + stub.getResult(Void.class); + + describe(parentExecution) + .assertMatchesOptions(options) + .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED) + .assertHistoryLength(14) + .assertNoParent() + .assertPendingActivityCount(0) + .assertPendingChildrenCount(0); + + describe(childExecution) + .assertMatchesOptions(expectedChildOptions) + .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED) + .assertHistoryLength(11) + .assertParent(parentExecution) + .assertPendingActivityCount(0) + .assertPendingChildrenCount(0); + } + + /* + * We don't test things that require the passage of time here to avoid sleepy tests + * against the real temporal service. A future commit could introduce a test- + * environment-only suite that locks timeskipping and exercises time-based scenarios. + */ + + @WorkflowInterface + public interface TestDescribeWorkflow { + @WorkflowMethod(name = "TestDescribeWorkflow") + void run(String myToken, String childToken, boolean heartbeat, int failAttemptsEarlierThan); + } + + public static class TestDescribeWorkflowImpl implements TestDescribeWorkflow { + + private static final ChildWorkflowOptions CHILD_OPTIONS = + ChildWorkflowOptions.newBuilder() + .setWorkflowExecutionTimeout(Duration.ofMinutes(6)) + .setWorkflowRunTimeout(Duration.ofMinutes(5)) + .setWorkflowTaskTimeout(Duration.ofMinutes(1)) + .setMemo(ImmutableMap.of("other", "memo")) + .build(); + + private final TestDescribeWorkflow childStub = + Workflow.newChildWorkflowStub(TestDescribeWorkflow.class, CHILD_OPTIONS); + + private final TestDescribeActivity activityStub = + Workflow.newActivityStub( + TestDescribeActivity.class, + ActivityOptions.newBuilder() + .setRetryOptions( + RetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(1)) + .setMaximumAttempts(2) + .build()) + .setStartToCloseTimeout(Duration.ofMinutes(1)) + .build()); + + @Override + public void run( + String myToken, String childToken, boolean heartbeat, int failAttemptsEarlierThan) { + if (childToken != null) { + childStub.run(childToken, null, heartbeat, failAttemptsEarlierThan); + } else { + activityStub.run(myToken, heartbeat, failAttemptsEarlierThan); + } + } + } + + @ActivityInterface + public interface TestDescribeActivity { + @ActivityMethod(name = "TestDescribeActivity") + void run(String token, boolean heartbeat, int failAttemptsEarlierThan); + } + + public static class TestDescribeActivityImpl implements TestDescribeActivity { + + public void run(String token, boolean heartbeat, int failAttemptsEarlierThan) { + try { + // Wait twice - once to let the test case wait for activity start, and once + // to let the test case hold activities open until it wants them to finish. + ThreadUtils.waitForTestCase(token + "-start"); + + int attempt = Activity.getExecutionContext().getInfo().getAttempt(); + if (heartbeat) { + ThreadUtils.waitForTestCase(token + "-heartbeat"); + Activity.getExecutionContext().heartbeat("heartbeatDetails"); + ThreadUtils.waitForTestCase(token + "-after-heartbeat"); + } else if (attempt < failAttemptsEarlierThan) { + ThreadUtils.waitForTestCase(token + "-fail"); + throw new RuntimeException("Activity was asked to fail on attempt " + attempt); + } + + ThreadUtils.waitForTestCase(token + "-finish"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw CheckedExceptionWrapper.wrap(e); + } + } + } + + /* + * This class lets test code precisely control the execution of activity code via + * barriers. We can't just do this with signals because exercise describe needs + * control over _activity_ code, not just workflow code. + */ + public static class ThreadUtils { + private static final Logger log = LoggerFactory.getLogger(ThreadUtils.class); + + private static Map queues = new ConcurrentHashMap<>(); + + public static void waitForTestCase(String token) throws InterruptedException { + log.info("Workflow is waiting to meet test case: {}", token); + waitFor("test case", token); + log.info("Workflow finished meeting test case: {}", token); + } + + public static void waitForWorkflow(String token) throws InterruptedException { + log.info("Test case is waiting to meet workflow: {}", token); + waitFor("workflow", token); + log.info("Test case finished meeting workflow: {}", token); + } + + private static void waitFor(String otherParty, String token) throws InterruptedException { + CyclicBarrier barrier = queues.computeIfAbsent(token, unused -> new CyclicBarrier(2)); + + try { + barrier.await(30, TimeUnit.SECONDS); + } catch (BrokenBarrierException e) { + // This happens to waiting threads if a peer gets interrupted. + log.warn( + "Barrier broken when waiting for the {} on {}. This is a side-effect of a test thread being interrupted, and is not the cause of a test failure.", + token, + otherParty); + } catch (TimeoutException e) { + // This is a test failure - the other party didn't show up in time. + Assert.fail( + String.format("When waiting on %s, the %s did not arrive in time", token, otherParty)); + } + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/shared/DescribeWorkflowAsserter.java b/temporal-sdk/src/test/java/io/temporal/workflow/shared/DescribeWorkflowAsserter.java new file mode 100644 index 0000000000..78c7ed0973 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/shared/DescribeWorkflowAsserter.java @@ -0,0 +1,188 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.workflow.shared; + +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import com.google.protobuf.util.Timestamps; +import io.temporal.api.common.v1.Payload; +import io.temporal.api.common.v1.Payloads; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.WorkflowExecutionStatus; +import io.temporal.api.workflow.v1.WorkflowExecutionConfig; +import io.temporal.api.workflow.v1.WorkflowExecutionInfo; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; +import io.temporal.client.WorkflowOptions; +import io.temporal.internal.common.ProtobufTimeUtils; +import java.util.Map; +import java.util.stream.Collectors; +import org.junit.Assert; + +/* + * Fluent assertions (ala truth or assert-j) for DescribeWorkflowResults. + */ +public class DescribeWorkflowAsserter { + + private final DescribeWorkflowExecutionResponse actual; + + public DescribeWorkflowAsserter(DescribeWorkflowExecutionResponse actual) { + this.actual = actual; + } + + public DescribeWorkflowExecutionResponse getActual() { + return actual; + } + + public DescribeWorkflowAsserter assertMatchesOptions(WorkflowOptions options) { + WorkflowExecutionConfig ec = actual.getExecutionConfig(); + Assert.assertEquals( + "workflow execution timeout should match", + options.getWorkflowExecutionTimeout(), + ProtobufTimeUtils.toJavaDuration(ec.getWorkflowExecutionTimeout())); + Assert.assertEquals( + "workflow run timeout should match", + options.getWorkflowRunTimeout(), + ProtobufTimeUtils.toJavaDuration(ec.getWorkflowRunTimeout())); + Assert.assertEquals( + "workflow task timeout should match", + options.getWorkflowTaskTimeout(), + ProtobufTimeUtils.toJavaDuration(ec.getDefaultWorkflowTaskTimeout())); + + WorkflowExecutionInfo ei = actual.getWorkflowExecutionInfo(); + Assert.assertEquals( + "memo should match", options.getMemo(), toSimpleMap(ei.getMemo().getFieldsMap())); + return this; + } + + private Map toSimpleMap(Map payloadMap) { + return payloadMap.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> payloadToString(e.getValue()))); + } + + private String payloadToString(Payload payload) { + // For simplicity, we only test with strings in the payload, which get serialized as json + String jsonPayload = payload.getData().toStringUtf8(); + Preconditions.checkState( + jsonPayload.startsWith("\"") && jsonPayload.endsWith("\""), + "Payload not a json string: %s", + jsonPayload); + // Strip off the quotes to get the original string + return jsonPayload.substring(1, jsonPayload.length() - 1); + } + + public static Payloads stringsToPayloads(String... strings) { + Payloads.Builder payloadsBuilder = Payloads.newBuilder(); + + for (String s : strings) { + payloadsBuilder.addPayloads( + Payload.newBuilder() + .putMetadata("encoding", ByteString.copyFromUtf8("json/plain")) + .setData(ByteString.copyFromUtf8("\"" + s + "\"")) + .build()); + } + + return payloadsBuilder.build(); + } + + public DescribeWorkflowAsserter assertTaskQueue(String expected) { + Assert.assertEquals( + "task queue should match", expected, actual.getExecutionConfig().getTaskQueue().getName()); + + // There's a task queue in WorkflowExecutionInfo too, but the golang doesn't set it, so we don't + // either. + + return this; + } + + public DescribeWorkflowAsserter assertSaneTimestamps() { + WorkflowExecutionInfo ei = actual.getWorkflowExecutionInfo(); + + if (ei.hasStartTime()) { + Assert.assertTrue("start time should be positive", ei.getStartTime().getSeconds() > 0); + + Assert.assertTrue( + "start time should be <= execution time", + Timestamps.compare(ei.getStartTime(), ei.getExecutionTime()) <= 0); + + if (ei.hasCloseTime()) { + Assert.assertTrue( + "start time should be <= close time", + Timestamps.compare(ei.getStartTime(), ei.getCloseTime()) <= 0); + } + } + + return this; + } + + public DescribeWorkflowAsserter assertExecutionId(WorkflowExecution expected) { + Assert.assertEquals( + "execution should match", expected, actual.getWorkflowExecutionInfo().getExecution()); + return this; + } + + public DescribeWorkflowAsserter assertType(String expected) { + Assert.assertEquals( + "workflow type should match", + expected, + actual.getWorkflowExecutionInfo().getType().getName()); + return this; + } + + public DescribeWorkflowAsserter assertStatus(WorkflowExecutionStatus expected) { + Assert.assertEquals( + "status should match", expected, actual.getWorkflowExecutionInfo().getStatus()); + return this; + } + + public DescribeWorkflowAsserter assertHistoryLength(int expected) { + Assert.assertEquals( + "history length should match", + expected, + actual.getWorkflowExecutionInfo().getHistoryLength()); + return this; + } + + public DescribeWorkflowAsserter assertNoParent() { + WorkflowExecutionInfo ei = actual.getWorkflowExecutionInfo(); + Assert.assertEquals("parent namespace should be absent", "", ei.getParentNamespaceId()); + Assert.assertFalse("parent execution should be absent", ei.hasParentExecution()); + return this; + } + + public DescribeWorkflowAsserter assertParent(WorkflowExecution parentExecution) { + WorkflowExecutionInfo ei = actual.getWorkflowExecutionInfo(); + // We don't assert parent namespace because we need the _id_, not the name, + // and DescribeNamespace isn't implemented in the test service. + Assert.assertEquals("parent execution should match", parentExecution, ei.getParentExecution()); + return this; + } + + public DescribeWorkflowAsserter assertPendingActivityCount(int expected) { + Assert.assertEquals( + "pending activity count should match", expected, actual.getPendingActivitiesCount()); + return this; + } + + public DescribeWorkflowAsserter assertPendingChildrenCount(int expected) { + Assert.assertEquals( + "child workflow count should match", expected, actual.getPendingChildrenCount()); + return this; + } +} diff --git a/temporal-testing/src/main/java/io/temporal/internal/testservice/StateMachines.java b/temporal-testing/src/main/java/io/temporal/internal/testservice/StateMachines.java index 6dbb37baa6..570e81e473 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/testservice/StateMachines.java +++ b/temporal-testing/src/main/java/io/temporal/internal/testservice/StateMachines.java @@ -164,18 +164,39 @@ class StateMachines { public static final long MAX_WORKFLOW_TASK_TIMEOUT_MILLISECONDS = 60L * 1000; enum State { - NONE, - INITIATED, - INITIATED_QUERY_ONLY, - STARTED, - STARTED_QUERY_ONLY, - FAILED, - TIMED_OUT, - CANCELLATION_REQUESTED, - CANCELED, - COMPLETED, - CONTINUED_AS_NEW, - TERMINATED, + NONE(false), + INITIATED(false), + INITIATED_QUERY_ONLY(false), + STARTED(false), + STARTED_QUERY_ONLY(false), + FAILED(true), + TIMED_OUT(true), + CANCELLATION_REQUESTED(false), + CANCELED(true), + COMPLETED(true), + // Tricky! _this_ run is terminal if it continued-as-new, even though + // there's probably another, very similar looking, non-terminal run. + CONTINUED_AS_NEW(true), + TERMINATED(true); + + private final boolean terminal; + + State(boolean terminal) { + this.terminal = terminal; + } + + public boolean isTerminal() { + return terminal; + } + } + + public static boolean historyEventIsTerminal(HistoryEvent e) { + return e.hasWorkflowExecutionContinuedAsNewEventAttributes() + || e.hasWorkflowExecutionCompletedEventAttributes() + || e.hasWorkflowExecutionCanceledEventAttributes() + || e.hasWorkflowExecutionFailedEventAttributes() + || e.hasWorkflowExecutionTerminatedEventAttributes() + || e.hasWorkflowExecutionTimedOutEventAttributes(); } enum Action { diff --git a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java index bab0960d22..3c0a451c79 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java +++ b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java @@ -31,6 +31,7 @@ import io.temporal.api.history.v1.ExternalWorkflowExecutionCancelRequestedEventAttributes; import io.temporal.api.history.v1.StartChildWorkflowExecutionFailedEventAttributes; import io.temporal.api.taskqueue.v1.StickyExecutionAttributes; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; import io.temporal.api.workflowservice.v1.PollActivityTaskQueueRequest; import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponseOrBuilder; import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueRequest; @@ -123,6 +124,8 @@ void cancelActivityTask( QueryWorkflowResponse query(QueryWorkflowRequest queryRequest, long deadline); + DescribeWorkflowExecutionResponse describeWorkflowExecution(); + void completeQuery(QueryId queryId, RespondQueryTaskCompletedRequest completeRequest); StickyExecutionAttributes getStickyExecutionAttributes(); diff --git a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index a9c5a0e9c4..12167be245 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -32,7 +32,9 @@ import com.cronutils.model.definition.CronDefinitionBuilder; import com.cronutils.model.time.ExecutionTime; import com.cronutils.parser.CronParser; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.protobuf.Timestamp; import com.google.protobuf.util.Durations; import com.google.protobuf.util.Timestamps; import io.grpc.Status; @@ -54,6 +56,7 @@ import io.temporal.api.common.v1.Payloads; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.EventType; +import io.temporal.api.enums.v1.PendingActivityState; import io.temporal.api.enums.v1.QueryRejectCondition; import io.temporal.api.enums.v1.RetryState; import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause; @@ -79,7 +82,14 @@ import io.temporal.api.query.v1.QueryRejected; import io.temporal.api.query.v1.WorkflowQueryResult; import io.temporal.api.taskqueue.v1.StickyExecutionAttributes; +import io.temporal.api.workflow.v1.PendingActivityInfo; +import io.temporal.api.workflow.v1.PendingChildExecutionInfo; +import io.temporal.api.workflow.v1.WorkflowExecutionConfig; +import io.temporal.api.workflow.v1.WorkflowExecutionInfo; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; +import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest; import io.temporal.api.workflowservice.v1.PollActivityTaskQueueRequest; +import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse; import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponseOrBuilder; import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueRequest; import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse; @@ -132,6 +142,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.LongSupplier; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -2115,6 +2126,210 @@ public void completeQuery(QueryId queryId, RespondQueryTaskCompletedRequest comp } } + @Override + public DescribeWorkflowExecutionResponse describeWorkflowExecution() { + WorkflowExecutionConfig.Builder executionConfig = + WorkflowExecutionConfig.newBuilder() + .setTaskQueue(startRequest.getTaskQueue()) + .setWorkflowExecutionTimeout(startRequest.getWorkflowExecutionTimeout()) + .setWorkflowRunTimeout(startRequest.getWorkflowRunTimeout()) + .setDefaultWorkflowTaskTimeout(startRequest.getWorkflowTaskTimeout()); + + GetWorkflowExecutionHistoryRequest getRequest = + GetWorkflowExecutionHistoryRequest.newBuilder() + .setNamespace(startRequest.getNamespace()) + .setExecution(executionId.getExecution()) + .build(); + List fullHistory = + store + .getWorkflowExecutionHistory(executionId, getRequest, null) + .getHistory() + .getEventsList(); + + WorkflowExecutionInfo.Builder executionInfo = WorkflowExecutionInfo.newBuilder(); + executionInfo + .setExecution(this.executionId.getExecution()) + .setType(this.getStartRequest().getWorkflowType()) + .setMemo(startRequest.getMemo()) + // No setAutoResetPoints - the test environment doesn't support that feature + .setSearchAttributes(startRequest.getSearchAttributes()) + .setStatus(this.getWorkflowExecutionStatus()); + + // For everything else, we need the history. + executionInfo.setHistoryLength(fullHistory.size()); + setTimestampsBasedOnHistory(executionInfo, fullHistory); + + parent.ifPresent( + p -> { + executionInfo + .setParentNamespaceId(p.getExecutionId().getNamespace()) + .setParentExecution(p.getExecutionId().getExecution()); + }); + + List pendingActivities = + this.activities.values().stream() + .filter(sm -> !sm.getState().isTerminal()) + .map(this::constructPendingActivityInfo) + .collect(Collectors.toList()); + + List pendingChildren = + this.childWorkflows.values().stream() + .filter(sm -> !sm.getState().isTerminal()) + .map(this::constructPendingChildExecutionInfo) + .collect(Collectors.toList()); + + return DescribeWorkflowExecutionResponse.newBuilder() + .setExecutionConfig(executionConfig) + .setWorkflowExecutionInfo(executionInfo) + .addAllPendingActivities(pendingActivities) + .addAllPendingChildren(pendingChildren) + .build(); + } + + private PendingChildExecutionInfo constructPendingChildExecutionInfo( + StateMachine sm) { + ChildWorkflowData data = sm.getData(); + return PendingChildExecutionInfo.newBuilder() + .setWorkflowId(data.execution.getWorkflowId()) + .setRunId(data.execution.getRunId()) + .setWorkflowTypeName(data.initiatedEvent.getWorkflowType().getName()) + .setInitiatedId(data.initiatedEventId) + .setParentClosePolicy(data.initiatedEvent.getParentClosePolicy()) + .build(); + } + + private PendingActivityInfo constructPendingActivityInfo(StateMachine sm) { + /* + * Working on this code? First, go read StateMachines.scheduleActivityTask to learn which + * information it puts in pendingActivity.scheduledEvent and which information it puts + * in pendingActivity.activityTask. The ActivityTaskData itself also has some of what we need. + */ + ActivityTaskData pendingActivity = sm.getData(); + PollActivityTaskQueueResponse.Builder task = pendingActivity.activityTask.getTask(); + ActivityTaskScheduledEventAttributes scheduledEvent = pendingActivity.scheduledEvent; + + State state = sm.getState(); + PendingActivityInfo.Builder builder = + PendingActivityInfo.newBuilder() + .setActivityId(scheduledEvent.getActivityId()) + .setActivityType(scheduledEvent.getActivityType()) + .setState(computeActivityState(state, pendingActivity)); + + // In golang, we set one but never both of these fields, depending on the activity state + if (builder.getState() == PendingActivityState.PENDING_ACTIVITY_STATE_SCHEDULED) { + builder.setScheduledTime(task.getScheduledTime()); + } else { + builder.setLastStartedTime(task.getStartedTime()); + builder.setLastHeartbeatTime(task.getStartedTime()); + } + + if (pendingActivity.lastHeartbeatTime > 0) { + // This may overwrite the heartbeat time we just set - that's fine + builder.setLastHeartbeatTime(timestampFromMillis(pendingActivity.lastHeartbeatTime)); + + if (pendingActivity.heartbeatDetails != null) { + builder.setHeartbeatDetails(pendingActivity.heartbeatDetails); + } + } + + if (scheduledEvent.hasRetryPolicy()) { + builder.setAttempt(pendingActivity.retryState.getAttempt()); + builder.setExpirationTime(pendingActivity.retryState.getExpirationTime()); + builder.setMaximumAttempts(scheduledEvent.getRetryPolicy().getMaximumAttempts()); + pendingActivity.retryState.getPreviousRunFailure().ifPresent(builder::setLastFailure); + // We don't track this in the test environment right now, but we could. + builder.setLastWorkerIdentity("test-environment-worker-identity"); + } else { + builder.setAttempt(1); + } + + return builder.build(); + } + + private Timestamp timestampFromMillis(long millis) { + return Timestamp.newBuilder() + .setSeconds(millis / 1000) + .setNanos(Math.toIntExact(Duration.ofMillis(millis % 1000).toNanos())) + .build(); + } + + // Mimics golang in HistoryEngine.DescribeWorkflowExecution. Note that this only covers pending + // states, so there's quite a bit of state-space that doesn't need to be mapped. + private PendingActivityState computeActivityState(State state, ActivityTaskData pendingActivity) { + if (state == State.CANCELLATION_REQUESTED) { + return PendingActivityState.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED; + } else if (pendingActivity.startedEvent != null) { + return PendingActivityState.PENDING_ACTIVITY_STATE_STARTED; + } else { + return PendingActivityState.PENDING_ACTIVITY_STATE_SCHEDULED; + } + } + + private void setTimestampsBasedOnHistory( + WorkflowExecutionInfo.Builder executionInfo, List fullHistory) { + getStartEvent(fullHistory) + .ifPresent( + startEvent -> { + Timestamp startTime = startEvent.getEventTime(); + executionInfo.setStartTime(startEvent.getEventTime()); + + if (startEvent + .getWorkflowExecutionStartedEventAttributes() + .hasFirstWorkflowTaskBackoff()) { + executionInfo.setExecutionTime( + Timestamps.add( + startTime, + startEvent + .getWorkflowExecutionStartedEventAttributes() + .getFirstWorkflowTaskBackoff())); + } else { + // Some (most) workflows don't have firstWorkflowTaskBackoff. + executionInfo.setExecutionTime(startTime); + } + }); + + getCompletionEvent(fullHistory) + .ifPresent( + completionEvent -> { + executionInfo.setCloseTime(completionEvent.getEventTime()); + }); + } + + // Has an analog in the golang codebase: MutableState.GetStartEvent(). This could become public + // if needed. + private Optional getStartEvent(List history) { + if (history.size() == 0) { + // It's theoretically possible for the TestWorkflowMutableState to exist, but + // for the history to still be empty. This is the case between construction and + // the ctx.commitChanges at the end of startWorkflow. + return Optional.empty(); + } + + HistoryEvent firstEvent = history.get(0); + + // This is true today (see StateMachines.startWorkflow), even in the signalWithStartCase (signal + // is the _second_ event). But if it becomes untrue in the future, we'd rather fail than lie. + Preconditions.checkState( + firstEvent.getEventType() == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + "The first event in a workflow's history should be %s, but was %s", + EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED.name(), + firstEvent.getEventType().name()); + + return Optional.of(firstEvent); + } + + // Has an analog in the golang codebase: MutableState.GetCompletionEvent(). This could become + // public if needed. + private Optional getCompletionEvent(List history) { + HistoryEvent lastEvent = history.get(history.size() - 1); + + if (StateMachines.historyEventIsTerminal(lastEvent)) { + return Optional.of(lastEvent); + } else { + return Optional.empty(); + } + } + private void addExecutionSignaledEvent( RequestContext ctx, SignalWorkflowExecutionRequest signalRequest) { WorkflowExecutionSignaledEventAttributes.Builder a = diff --git a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java index 76c9f775f2..04e2f24d33 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java +++ b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java @@ -44,6 +44,8 @@ import io.temporal.api.failure.v1.Failure; import io.temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes; import io.temporal.api.workflow.v1.WorkflowExecutionInfo; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest; import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse; import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsRequest; @@ -1060,6 +1062,28 @@ public void queryWorkflow( } } + @Override + public void describeWorkflowExecution( + DescribeWorkflowExecutionRequest request, + StreamObserver + responseObserver) { + String namespace = requireNotNull("Namespace", request.getNamespace()); + WorkflowExecution execution = requireNotNull("Execution", request.getExecution()); + ExecutionId executionId = new ExecutionId(namespace, execution); + + try { + TestWorkflowMutableState mutableState = getMutableState(executionId); + DescribeWorkflowExecutionResponse result = mutableState.describeWorkflowExecution(); + responseObserver.onNext(result); + responseObserver.onCompleted(); + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() == Status.Code.INTERNAL) { + log.error("unexpected", e); + } + responseObserver.onError(e); + } + } + private R requireNotNull(String fieldName, R value) { if (value == null) { throw Status.INVALID_ARGUMENT diff --git a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowStoreImpl.java b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowStoreImpl.java index 3adaaf1b09..eaf3788b79 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowStoreImpl.java +++ b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowStoreImpl.java @@ -55,6 +55,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -431,12 +432,23 @@ public GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory( lock.lock(); try { history = getHistoryStore(executionId); - if (!getRequest.getWaitNewEvent() - && getRequest.getHistoryEventFilterType() - != HistoryEventFilterType.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT) { + if (!getRequest.getWaitNewEvent()) { List events = history.getEventsLocked(); // Copy the list as it is mutable. Individual events assumed immutable. - ArrayList eventsCopy = new ArrayList<>(events); + List eventsCopy = + events.stream() + .filter( + e -> { + if (getRequest.getHistoryEventFilterType() + != HistoryEventFilterType.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT) { + return true; + } + + // They asked for only the close event. There are a variety of ways a workflow + // can close. + return StateMachines.historyEventIsTerminal(e); + }) + .collect(Collectors.toList()); return GetWorkflowExecutionHistoryResponse.newBuilder() .setHistory(History.newBuilder().addAllEvents(eventsCopy)) .build(); From ed6646cab2d79f78cdcb26737c511a41811f393d Mon Sep 17 00:00:00 2001 From: Nathan Glass Date: Wed, 25 Aug 2021 14:22:43 -0700 Subject: [PATCH 2/4] Fix some lint --- .../internal/testservice/TestWorkflowMutableStateImpl.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index 12167be245..630925976b 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -2153,10 +2153,9 @@ public DescribeWorkflowExecutionResponse describeWorkflowExecution() { .setMemo(startRequest.getMemo()) // No setAutoResetPoints - the test environment doesn't support that feature .setSearchAttributes(startRequest.getSearchAttributes()) - .setStatus(this.getWorkflowExecutionStatus()); + .setStatus(this.getWorkflowExecutionStatus()) + .setHistoryLength(fullHistory.size()); - // For everything else, we need the history. - executionInfo.setHistoryLength(fullHistory.size()); setTimestampsBasedOnHistory(executionInfo, fullHistory); parent.ifPresent( From 75ffade91635000cb7529909974a44990ece390b Mon Sep 17 00:00:00 2001 From: Nathan Glass Date: Wed, 25 Aug 2021 15:12:29 -0700 Subject: [PATCH 3/4] Remove assertions on history length - in a constrained execution environment (like at build-time), we can't always know how many history entries to expect. --- .../test/java/io/temporal/workflow/DescribeTest.java | 10 ---------- .../workflow/shared/DescribeWorkflowAsserter.java | 8 -------- 2 files changed, 18 deletions(-) diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/DescribeTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/DescribeTest.java index 05976eb224..63e5631eaa 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/DescribeTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/DescribeTest.java @@ -133,7 +133,6 @@ public void testSuccessfulActivity() throws InterruptedException { describe(execution) .assertMatchesOptions(options) .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING) - .assertHistoryLength(5) .assertNoParent() .assertPendingActivityCount(1) .assertPendingChildrenCount(0); @@ -170,7 +169,6 @@ public void testSuccessfulActivity() throws InterruptedException { describe(execution) .assertMatchesOptions(options) .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING) - .assertHistoryLength(5) .assertNoParent() .assertPendingActivityCount(1) .assertPendingChildrenCount(0); @@ -194,7 +192,6 @@ public void testSuccessfulActivity() throws InterruptedException { describe(execution) .assertMatchesOptions(options) .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED) - .assertHistoryLength(11) .assertNoParent() .assertPendingActivityCount(0) .assertPendingChildrenCount(0); @@ -220,7 +217,6 @@ public void testFailedActivity() throws InterruptedException { describe(execution) .assertMatchesOptions(options) .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING) - .assertHistoryLength(5) .assertPendingActivityCount(1); PendingActivityInfo actual = asserter.getActual().getPendingActivities(0); @@ -257,7 +253,6 @@ public void testFailedActivity() throws InterruptedException { describe(execution) .assertMatchesOptions(options) .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED) - .assertHistoryLength(11) .assertNoParent() .assertPendingActivityCount(0) .assertPendingChildrenCount(0); @@ -296,7 +291,6 @@ private void testKilledWorkflow( describe(execution) .assertMatchesOptions(options) .assertStatus(expectedWorkflowStatus) - .assertHistoryLength(expectedHistoryLength) .assertPendingActivityCount(expectedActivityStatus == null ? 0 : 1); if (expectedActivityStatus == null) { @@ -384,7 +378,6 @@ public void testChildWorkflow() throws InterruptedException { describe(parentExecution) .assertMatchesOptions(options) .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING) - .assertHistoryLength(9) .assertNoParent() .assertPendingActivityCount(0) .assertPendingChildrenCount(1); @@ -416,7 +409,6 @@ public void testChildWorkflow() throws InterruptedException { describe(childExecution) .assertMatchesOptions(expectedChildOptions) .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING) - .assertHistoryLength(5) .assertParent(parentExecution) .assertPendingActivityCount(1) .assertPendingChildrenCount(0); @@ -428,7 +420,6 @@ public void testChildWorkflow() throws InterruptedException { describe(parentExecution) .assertMatchesOptions(options) .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED) - .assertHistoryLength(14) .assertNoParent() .assertPendingActivityCount(0) .assertPendingChildrenCount(0); @@ -436,7 +427,6 @@ public void testChildWorkflow() throws InterruptedException { describe(childExecution) .assertMatchesOptions(expectedChildOptions) .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED) - .assertHistoryLength(11) .assertParent(parentExecution) .assertPendingActivityCount(0) .assertPendingChildrenCount(0); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/shared/DescribeWorkflowAsserter.java b/temporal-sdk/src/test/java/io/temporal/workflow/shared/DescribeWorkflowAsserter.java index 78c7ed0973..3adf35c3bd 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/shared/DescribeWorkflowAsserter.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/shared/DescribeWorkflowAsserter.java @@ -151,14 +151,6 @@ public DescribeWorkflowAsserter assertStatus(WorkflowExecutionStatus expected) { return this; } - public DescribeWorkflowAsserter assertHistoryLength(int expected) { - Assert.assertEquals( - "history length should match", - expected, - actual.getWorkflowExecutionInfo().getHistoryLength()); - return this; - } - public DescribeWorkflowAsserter assertNoParent() { WorkflowExecutionInfo ei = actual.getWorkflowExecutionInfo(); Assert.assertEquals("parent namespace should be absent", "", ei.getParentNamespaceId()); From 1296dcc2b85dedac29b480aae1905217fc5344c8 Mon Sep 17 00:00:00 2001 From: Nathan Glass Date: Mon, 30 Aug 2021 10:55:49 -0700 Subject: [PATCH 4/4] Address PR feedback --- .../internal/testservice/StateMachines.java | 45 ++---- .../TestWorkflowMutableStateImpl.java | 151 ++++++++++-------- .../testservice/TestWorkflowStoreImpl.java | 2 +- 3 files changed, 99 insertions(+), 99 deletions(-) diff --git a/temporal-testing/src/main/java/io/temporal/internal/testservice/StateMachines.java b/temporal-testing/src/main/java/io/temporal/internal/testservice/StateMachines.java index 570e81e473..6dbb37baa6 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/testservice/StateMachines.java +++ b/temporal-testing/src/main/java/io/temporal/internal/testservice/StateMachines.java @@ -164,39 +164,18 @@ class StateMachines { public static final long MAX_WORKFLOW_TASK_TIMEOUT_MILLISECONDS = 60L * 1000; enum State { - NONE(false), - INITIATED(false), - INITIATED_QUERY_ONLY(false), - STARTED(false), - STARTED_QUERY_ONLY(false), - FAILED(true), - TIMED_OUT(true), - CANCELLATION_REQUESTED(false), - CANCELED(true), - COMPLETED(true), - // Tricky! _this_ run is terminal if it continued-as-new, even though - // there's probably another, very similar looking, non-terminal run. - CONTINUED_AS_NEW(true), - TERMINATED(true); - - private final boolean terminal; - - State(boolean terminal) { - this.terminal = terminal; - } - - public boolean isTerminal() { - return terminal; - } - } - - public static boolean historyEventIsTerminal(HistoryEvent e) { - return e.hasWorkflowExecutionContinuedAsNewEventAttributes() - || e.hasWorkflowExecutionCompletedEventAttributes() - || e.hasWorkflowExecutionCanceledEventAttributes() - || e.hasWorkflowExecutionFailedEventAttributes() - || e.hasWorkflowExecutionTerminatedEventAttributes() - || e.hasWorkflowExecutionTimedOutEventAttributes(); + NONE, + INITIATED, + INITIATED_QUERY_ONLY, + STARTED, + STARTED_QUERY_ONLY, + FAILED, + TIMED_OUT, + CANCELLATION_REQUESTED, + CANCELED, + COMPLETED, + CONTINUED_AS_NEW, + TERMINATED, } enum Action { diff --git a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index 630925976b..81062378f5 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -54,6 +54,7 @@ import io.temporal.api.command.v1.StartTimerCommandAttributes; import io.temporal.api.command.v1.UpsertWorkflowSearchAttributesCommandAttributes; import io.temporal.api.common.v1.Payloads; +import io.temporal.api.common.v1.RetryPolicy; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.EventType; import io.temporal.api.enums.v1.PendingActivityState; @@ -89,7 +90,6 @@ import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest; import io.temporal.api.workflowservice.v1.PollActivityTaskQueueRequest; -import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse; import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponseOrBuilder; import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueRequest; import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse; @@ -2130,19 +2130,19 @@ public void completeQuery(QueryId queryId, RespondQueryTaskCompletedRequest comp public DescribeWorkflowExecutionResponse describeWorkflowExecution() { WorkflowExecutionConfig.Builder executionConfig = WorkflowExecutionConfig.newBuilder() - .setTaskQueue(startRequest.getTaskQueue()) - .setWorkflowExecutionTimeout(startRequest.getWorkflowExecutionTimeout()) - .setWorkflowRunTimeout(startRequest.getWorkflowRunTimeout()) - .setDefaultWorkflowTaskTimeout(startRequest.getWorkflowTaskTimeout()); + .setTaskQueue(this.startRequest.getTaskQueue()) + .setWorkflowExecutionTimeout(this.startRequest.getWorkflowExecutionTimeout()) + .setWorkflowRunTimeout(this.startRequest.getWorkflowRunTimeout()) + .setDefaultWorkflowTaskTimeout(this.startRequest.getWorkflowTaskTimeout()); GetWorkflowExecutionHistoryRequest getRequest = GetWorkflowExecutionHistoryRequest.newBuilder() - .setNamespace(startRequest.getNamespace()) - .setExecution(executionId.getExecution()) + .setNamespace(this.startRequest.getNamespace()) + .setExecution(this.executionId.getExecution()) .build(); List fullHistory = store - .getWorkflowExecutionHistory(executionId, getRequest, null) + .getWorkflowExecutionHistory(this.executionId, getRequest, null) .getHistory() .getEventsList(); @@ -2150,15 +2150,15 @@ public DescribeWorkflowExecutionResponse describeWorkflowExecution() { executionInfo .setExecution(this.executionId.getExecution()) .setType(this.getStartRequest().getWorkflowType()) - .setMemo(startRequest.getMemo()) + .setMemo(this.startRequest.getMemo()) // No setAutoResetPoints - the test environment doesn't support that feature - .setSearchAttributes(startRequest.getSearchAttributes()) + .setSearchAttributes(this.startRequest.getSearchAttributes()) .setStatus(this.getWorkflowExecutionStatus()) .setHistoryLength(fullHistory.size()); - setTimestampsBasedOnHistory(executionInfo, fullHistory); + populateWorkflowExecutionInfoFromHistory(executionInfo, fullHistory); - parent.ifPresent( + this.parent.ifPresent( p -> { executionInfo .setParentNamespaceId(p.getExecutionId().getNamespace()) @@ -2167,14 +2167,14 @@ public DescribeWorkflowExecutionResponse describeWorkflowExecution() { List pendingActivities = this.activities.values().stream() - .filter(sm -> !sm.getState().isTerminal()) - .map(this::constructPendingActivityInfo) + .filter(sm -> !isTerminalState(sm.getState())) + .map(TestWorkflowMutableStateImpl::constructPendingActivityInfo) .collect(Collectors.toList()); List pendingChildren = this.childWorkflows.values().stream() - .filter(sm -> !sm.getState().isTerminal()) - .map(this::constructPendingChildExecutionInfo) + .filter(sm -> !isTerminalState(sm.getState())) + .map(TestWorkflowMutableStateImpl::constructPendingChildExecutionInfo) .collect(Collectors.toList()); return DescribeWorkflowExecutionResponse.newBuilder() @@ -2185,7 +2185,7 @@ public DescribeWorkflowExecutionResponse describeWorkflowExecution() { .build(); } - private PendingChildExecutionInfo constructPendingChildExecutionInfo( + private static PendingChildExecutionInfo constructPendingChildExecutionInfo( StateMachine sm) { ChildWorkflowData data = sm.getData(); return PendingChildExecutionInfo.newBuilder() @@ -2197,64 +2197,43 @@ private PendingChildExecutionInfo constructPendingChildExecutionInfo( .build(); } - private PendingActivityInfo constructPendingActivityInfo(StateMachine sm) { + private static PendingActivityInfo constructPendingActivityInfo( + StateMachine sm) { /* - * Working on this code? First, go read StateMachines.scheduleActivityTask to learn which - * information it puts in pendingActivity.scheduledEvent and which information it puts - * in pendingActivity.activityTask. The ActivityTaskData itself also has some of what we need. + * Working on this code? Read StateMachines.scheduleActivityTask to get answers to questions + * like 'why does some of the information come from the scheduledEvent?' */ - ActivityTaskData pendingActivity = sm.getData(); - PollActivityTaskQueueResponse.Builder task = pendingActivity.activityTask.getTask(); - ActivityTaskScheduledEventAttributes scheduledEvent = pendingActivity.scheduledEvent; + ActivityTaskData activityTaskData = sm.getData(); State state = sm.getState(); - PendingActivityInfo.Builder builder = - PendingActivityInfo.newBuilder() - .setActivityId(scheduledEvent.getActivityId()) - .setActivityType(scheduledEvent.getActivityType()) - .setState(computeActivityState(state, pendingActivity)); + PendingActivityInfo.Builder builder = PendingActivityInfo.newBuilder(); - // In golang, we set one but never both of these fields, depending on the activity state - if (builder.getState() == PendingActivityState.PENDING_ACTIVITY_STATE_SCHEDULED) { - builder.setScheduledTime(task.getScheduledTime()); - } else { - builder.setLastStartedTime(task.getStartedTime()); - builder.setLastHeartbeatTime(task.getStartedTime()); - } + // The oddballs - these don't obviously come from any one part of the structure + builder + .setState(computeActivityState(state, activityTaskData)) + // We don't track this in the test environment right now, but we could. + .setLastWorkerIdentity("test-environment-worker-identity"); - if (pendingActivity.lastHeartbeatTime > 0) { - // This may overwrite the heartbeat time we just set - that's fine - builder.setLastHeartbeatTime(timestampFromMillis(pendingActivity.lastHeartbeatTime)); + // Some ids are only present in the schedule event... + populatePendingActivityInfoFromScheduledEvent(builder, activityTaskData.scheduledEvent); - if (pendingActivity.heartbeatDetails != null) { - builder.setHeartbeatDetails(pendingActivity.heartbeatDetails); - } - } + // A few bits of timing are only present on the poll response... + PollActivityTaskQueueResponseOrBuilder pollResponse = activityTaskData.activityTask.getTask(); + populatePendingActivityInfoFromPollResponse(builder, pollResponse); - if (scheduledEvent.hasRetryPolicy()) { - builder.setAttempt(pendingActivity.retryState.getAttempt()); - builder.setExpirationTime(pendingActivity.retryState.getExpirationTime()); - builder.setMaximumAttempts(scheduledEvent.getRetryPolicy().getMaximumAttempts()); - pendingActivity.retryState.getPreviousRunFailure().ifPresent(builder::setLastFailure); - // We don't track this in the test environment right now, but we could. - builder.setLastWorkerIdentity("test-environment-worker-identity"); - } else { - builder.setAttempt(1); - } + // Heartbeat details are housed directly in the activityTaskData + populatePendingActivityInfoFromHeartbeatDetails(builder, activityTaskData); - return builder.build(); - } + // Retry data is housed under .retryState + populatePendingActivityInfoFromRetryData(builder, activityTaskData.retryState); - private Timestamp timestampFromMillis(long millis) { - return Timestamp.newBuilder() - .setSeconds(millis / 1000) - .setNanos(Math.toIntExact(Duration.ofMillis(millis % 1000).toNanos())) - .build(); + return builder.build(); } // Mimics golang in HistoryEngine.DescribeWorkflowExecution. Note that this only covers pending // states, so there's quite a bit of state-space that doesn't need to be mapped. - private PendingActivityState computeActivityState(State state, ActivityTaskData pendingActivity) { + private static PendingActivityState computeActivityState( + State state, ActivityTaskData pendingActivity) { if (state == State.CANCELLATION_REQUESTED) { return PendingActivityState.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED; } else if (pendingActivity.startedEvent != null) { @@ -2264,7 +2243,49 @@ private PendingActivityState computeActivityState(State state, ActivityTaskData } } - private void setTimestampsBasedOnHistory( + private static void populatePendingActivityInfoFromScheduledEvent( + PendingActivityInfo.Builder builder, ActivityTaskScheduledEventAttributes scheduledEvent) { + builder + .setActivityId(scheduledEvent.getActivityId()) + .setActivityType(scheduledEvent.getActivityType()); + } + + private static void populatePendingActivityInfoFromPollResponse( + PendingActivityInfo.Builder builder, PollActivityTaskQueueResponseOrBuilder task) { + // In golang, we set one but never both of these fields, depending on the activity state + if (builder.getState() == PendingActivityState.PENDING_ACTIVITY_STATE_SCHEDULED) { + builder.setScheduledTime(task.getScheduledTime()); + } else { + builder.setLastStartedTime(task.getStartedTime()); + builder.setLastHeartbeatTime(task.getStartedTime()); + } + } + + private static void populatePendingActivityInfoFromHeartbeatDetails( + PendingActivityInfo.Builder builder, ActivityTaskData activityTaskData) { + if (activityTaskData.lastHeartbeatTime > 0) { + // This may overwrite the heartbeat time we just set - that's fine + builder.setLastHeartbeatTime(Timestamps.fromMillis(activityTaskData.lastHeartbeatTime)); + + if (activityTaskData.heartbeatDetails != null) { + builder.setHeartbeatDetails(activityTaskData.heartbeatDetails); + } + } + } + + private static void populatePendingActivityInfoFromRetryData( + PendingActivityInfo.Builder builder, TestServiceRetryState retryState) { + builder.setAttempt(retryState.getAttempt()); + builder.setExpirationTime(retryState.getExpirationTime()); + retryState.getPreviousRunFailure().ifPresent(builder::setLastFailure); + + RetryPolicy retryPolicy = + Preconditions.checkNotNull( + retryState.getRetryPolicy(), "retryPolicy should always be present"); + builder.setMaximumAttempts(retryPolicy.getMaximumAttempts()); + } + + private static void populateWorkflowExecutionInfoFromHistory( WorkflowExecutionInfo.Builder executionInfo, List fullHistory) { getStartEvent(fullHistory) .ifPresent( @@ -2296,7 +2317,7 @@ private void setTimestampsBasedOnHistory( // Has an analog in the golang codebase: MutableState.GetStartEvent(). This could become public // if needed. - private Optional getStartEvent(List history) { + private static Optional getStartEvent(List history) { if (history.size() == 0) { // It's theoretically possible for the TestWorkflowMutableState to exist, but // for the history to still be empty. This is the case between construction and @@ -2319,10 +2340,10 @@ private Optional getStartEvent(List history) { // Has an analog in the golang codebase: MutableState.GetCompletionEvent(). This could become // public if needed. - private Optional getCompletionEvent(List history) { + private static Optional getCompletionEvent(List history) { HistoryEvent lastEvent = history.get(history.size() - 1); - if (StateMachines.historyEventIsTerminal(lastEvent)) { + if (WorkflowExecutionUtils.isWorkflowExecutionCompletedEvent(lastEvent)) { return Optional.of(lastEvent); } else { return Optional.empty(); diff --git a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowStoreImpl.java b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowStoreImpl.java index eaf3788b79..1ee50065ec 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowStoreImpl.java +++ b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowStoreImpl.java @@ -446,7 +446,7 @@ public GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory( // They asked for only the close event. There are a variety of ways a workflow // can close. - return StateMachines.historyEventIsTerminal(e); + return WorkflowExecutionUtils.isWorkflowExecutionCompletedEvent(e); }) .collect(Collectors.toList()); return GetWorkflowExecutionHistoryResponse.newBuilder()