diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/DescribeTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/DescribeTest.java new file mode 100644 index 0000000000..63e5631eaa --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/DescribeTest.java @@ -0,0 +1,555 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.workflow; + +import com.google.common.collect.ImmutableMap; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.temporal.activity.Activity; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.activity.ActivityOptions; +import io.temporal.api.common.v1.ActivityType; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.PendingActivityState; +import io.temporal.api.enums.v1.WorkflowExecutionStatus; +import io.temporal.api.workflow.v1.PendingActivityInfo; +import io.temporal.api.workflow.v1.PendingChildExecutionInfo; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest; +import io.temporal.client.WorkflowFailedException; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.common.RetryOptions; +import io.temporal.serviceclient.CheckedExceptionWrapper; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.shared.DescribeWorkflowAsserter; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DescribeTest { + + private static final Logger log = LoggerFactory.getLogger(DescribeTest.class); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestDescribeWorkflowImpl.class) + .setActivityImplementations(new TestDescribeActivityImpl()) + .setTestTimeoutSeconds(30) + .build(); + + public DescribeWorkflowAsserter describe(WorkflowExecution execution) { + DescribeWorkflowAsserter result = + new DescribeWorkflowAsserter( + testWorkflowRule + .getWorkflowClient() + .getWorkflowServiceStubs() + .blockingStub() + .describeWorkflowExecution( + DescribeWorkflowExecutionRequest.newBuilder() + .setNamespace( + testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .setExecution(execution) + .build())); + + // There are some assertions that we can always make... + return result + .assertType("TestDescribeWorkflow") + .assertExecutionId(execution) + .assertSaneTimestamps() + .assertTaskQueue(testWorkflowRule.getTaskQueue()); + } + + @Test + public void testWorkflowDoesNotExist() { + StatusRuntimeException e = + Assert.assertThrows( + io.grpc.StatusRuntimeException.class, + () -> { + describe( + WorkflowExecution.newBuilder() + .setWorkflowId("627ecd0c-688b-3fc5-927e-0f7ab7eec09b") + .setRunId("8f493dbf-205d-4142-8e0b-dc5cdff404b8") + .build()); + }); + + Assert.assertEquals(Status.NOT_FOUND.getCode(), e.getStatus().getCode()); + } + + private WorkflowOptions options() { + // The task queue isn't known until the test is running, so we can't just declare a constant + // WorkflowOptions + return WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setWorkflowExecutionTimeout(Duration.ofMinutes(3)) + .setWorkflowRunTimeout(Duration.ofMinutes(2)) + .setWorkflowTaskTimeout(Duration.ofMinutes(1)) + .setMemo(ImmutableMap.of("memo", "randum")) + .build(); + } + + @Test + public void testSuccessfulActivity() throws InterruptedException { + String token = "testSuccessfulActivity"; + WorkflowOptions options = options(); + WorkflowStub stub = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub("TestDescribeWorkflow", options); + WorkflowExecution execution = stub.start(token, null, Boolean.TRUE, Integer.valueOf(0)); + + // Wait for the activity so we know what status to expect + ThreadUtils.waitForWorkflow(token + "-start"); + + DescribeWorkflowAsserter asserter = + describe(execution) + .assertMatchesOptions(options) + .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING) + .assertNoParent() + .assertPendingActivityCount(1) + .assertPendingChildrenCount(0); + + PendingActivityInfo actual = asserter.getActual().getPendingActivities(0); + + // No fancy asserter type for PendingActivityInfo... we just build the expected proto + PendingActivityInfo expected = + PendingActivityInfo.newBuilder() + .setActivityId(actual.getActivityId()) + .setActivityType(ActivityType.newBuilder().setName("TestDescribeActivity").build()) + .setState(PendingActivityState.PENDING_ACTIVITY_STATE_STARTED) + .setAttempt(1) + .setMaximumAttempts(2) + // times should be present, but we can't know what the expected value is if this test is + // going to run against the real server. + .setLastStartedTime(actual.getLastStartedTime()) + .setExpirationTime(actual.getExpirationTime()) + // Heads up! We're asserting that heartbeat time == started time, which should be true + // before the heartbeat + .setLastHeartbeatTime(actual.getLastStartedTime()) + // this ends up being a dummy value, but if it weren't, we still wouldn't expect to know + // it. + .setLastWorkerIdentity(actual.getLastWorkerIdentity()) + .build(); + + Assert.assertEquals("PendingActivityInfo should match before", expected, actual); + + // Make the activity heartbeat - this should show in the next describe call + ThreadUtils.waitForWorkflow(token + "-heartbeat"); + ThreadUtils.waitForWorkflow(token + "-after-heartbeat"); + + asserter = + describe(execution) + .assertMatchesOptions(options) + .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING) + .assertNoParent() + .assertPendingActivityCount(1) + .assertPendingChildrenCount(0); + + actual = asserter.getActual().getPendingActivities(0); + + // Now, our PendingActivityInfo has heartbeat data, but is otherwise unchanged + expected = + expected + .toBuilder() + .setHeartbeatDetails(DescribeWorkflowAsserter.stringsToPayloads("heartbeatDetails")) + .setLastHeartbeatTime(actual.getLastHeartbeatTime()) + .build(); + Assert.assertEquals("PendingActivityInfo should match after heartbeat", expected, actual); + + // Let the activity finish, which will let the workflow finish. + ThreadUtils.waitForWorkflow(token + "-finish"); + + // Wait for the workflow to finish so we know what state to expect + stub.getResult(Void.class); + describe(execution) + .assertMatchesOptions(options) + .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED) + .assertNoParent() + .assertPendingActivityCount(0) + .assertPendingChildrenCount(0); + } + + @Test + public void testFailedActivity() throws InterruptedException { + String token = "testFailedActivity"; + WorkflowOptions options = options(); + WorkflowStub stub = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub("TestDescribeWorkflow", options); + WorkflowExecution execution = stub.start(token, null, Boolean.FALSE, Integer.valueOf(2)); + + // Fast forward until the retry after the failure + ThreadUtils.waitForWorkflow(token + "-start"); + ThreadUtils.waitForWorkflow(token + "-fail"); + ThreadUtils.waitForWorkflow(token + "-start"); + + // Previous test cases have made boilerplate assertions - let's only focus on what's novel + DescribeWorkflowAsserter asserter = + describe(execution) + .assertMatchesOptions(options) + .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING) + .assertPendingActivityCount(1); + + PendingActivityInfo actual = asserter.getActual().getPendingActivities(0); + + Assert.assertEquals( + "failure message should match", + "Activity was asked to fail on attempt 1", + actual.getLastFailure().getMessage()); + + PendingActivityInfo expected = + PendingActivityInfo.newBuilder() + .setActivityId(actual.getActivityId()) + .setActivityType(ActivityType.newBuilder().setName("TestDescribeActivity").build()) + .setState(PendingActivityState.PENDING_ACTIVITY_STATE_STARTED) + .setAttempt(2) + .setMaximumAttempts(2) + // times should be present, but we can't know what the expected value is if this test is + // going to run against the real server. + .setLastStartedTime(actual.getLastStartedTime()) + .setLastHeartbeatTime(actual.getLastHeartbeatTime()) + .setExpirationTime(actual.getExpirationTime()) + // this ends up being a dummy value, but if it weren't, we still wouldn't expect to know + // it. + .setLastWorkerIdentity(actual.getLastWorkerIdentity()) + // We don't deeply assert the failure structure since we asserted the message above + .setLastFailure(actual.getLastFailure()) + .build(); + + Assert.assertEquals("PendingActivityInfo should match", expected, actual); + + // Now let the workflow succeed + ThreadUtils.waitForWorkflow(token + "-finish"); + stub.getResult(Void.class); + describe(execution) + .assertMatchesOptions(options) + .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED) + .assertNoParent() + .assertPendingActivityCount(0) + .assertPendingChildrenCount(0); + } + + private void testKilledWorkflow( + String token, + Consumer killer, + WorkflowExecutionStatus expectedWorkflowStatus, + PendingActivityState expectedActivityStatus, + int expectedHistoryLength) + throws InterruptedException { + WorkflowOptions options = options(); + WorkflowStub stub = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub("TestDescribeWorkflow", options); + // Set the execution up so that it will fail due to activity failures if it isn't otherwise + // killed + WorkflowExecution execution = stub.start(token, null, Boolean.FALSE, Integer.valueOf(3)); + + // Let the activity start so we can cancel it + ThreadUtils.waitForWorkflow(token + "-start"); + killer.accept(stub); + + // Wait for the kill (whatever it was) to get noticed (we don't try to assert intermediate + // states - those are hard to catch). + Assert.assertThrows( + WorkflowFailedException.class, + () -> { + stub.getResult(Void.class); + }); + + // Previous test cases have made boilerplate assertions - let's only focus on what's novel + DescribeWorkflowAsserter asserter = + describe(execution) + .assertMatchesOptions(options) + .assertStatus(expectedWorkflowStatus) + .assertPendingActivityCount(expectedActivityStatus == null ? 0 : 1); + + if (expectedActivityStatus == null) { + return; + } + + PendingActivityInfo actual = asserter.getActual().getPendingActivities(0); + + PendingActivityInfo expected = + PendingActivityInfo.newBuilder() + .setActivityId(actual.getActivityId()) + .setActivityType(ActivityType.newBuilder().setName("TestDescribeActivity").build()) + .setState(expectedActivityStatus) + .setAttempt(1) + .setMaximumAttempts(2) + // times should be present, but we can't know what the expected value is if this test is + // going to run against the real server. + .setLastStartedTime(actual.getLastStartedTime()) + .setLastHeartbeatTime(actual.getLastStartedTime()) + .setExpirationTime(actual.getExpirationTime()) + // this ends up being a dummy value, but if it weren't, we still wouldn't expect to know + // it. + .setLastWorkerIdentity(actual.getLastWorkerIdentity()) + .build(); + + Assert.assertEquals("PendingActivityInfo should match", expected, actual); + } + + @Test + public void testCanceledWorkflow() throws InterruptedException { + testKilledWorkflow( + "testCanceledWorkflow", + WorkflowStub::cancel, + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CANCELED, + PendingActivityState.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED, + 11); + } + + @Test + public void testTerminatedWorkflow() throws InterruptedException { + testKilledWorkflow( + "testTerminatedWorkflow", + stub -> stub.terminate("testing"), + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_TERMINATED, + PendingActivityState.PENDING_ACTIVITY_STATE_STARTED, + 6); + } + + @Test + public void testFailedWorkflow() throws InterruptedException { + String token = "testFailedWorkflow"; + testKilledWorkflow( + token, + stub -> { + try { + // Don't kill the workflow externally here - instead, unblock the activity twice, which + // will fail the workflow + ThreadUtils.waitForWorkflow(token + "-fail"); + ThreadUtils.waitForWorkflow(token + "-start"); + ThreadUtils.waitForWorkflow(token + "-fail"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }, + WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED, + null, + 11); + } + + @Test + public void testChildWorkflow() throws InterruptedException { + String token = "testChildWorkflow"; + WorkflowOptions options = options(); + WorkflowStub stub = + testWorkflowRule + .getWorkflowClient() + .newUntypedWorkflowStub("TestDescribeWorkflow", options); + WorkflowExecution parentExecution = stub.start(null, token, Boolean.FALSE, Integer.valueOf(0)); + + // This unblocks the child workflow's activity + ThreadUtils.waitForWorkflow(token + "-start"); + + DescribeWorkflowAsserter parent = + describe(parentExecution) + .assertMatchesOptions(options) + .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING) + .assertNoParent() + .assertPendingActivityCount(0) + .assertPendingChildrenCount(1); + + // There's very little of interest to assert on here, but we need the id so we can describe the + // child + PendingChildExecutionInfo childInfo = parent.getActual().getPendingChildren(0); + Assert.assertEquals( + "child workflow type name should match", + "TestDescribeWorkflow", + childInfo.getWorkflowTypeName()); + + WorkflowExecution childExecution = + WorkflowExecution.newBuilder() + .setWorkflowId(childInfo.getWorkflowId()) + .setRunId(childInfo.getRunId()) + .build(); + + // marshal ChildWorkflowOptions to WorkflowOptions because that's what the asserter expects + WorkflowOptions expectedChildOptions = + WorkflowOptions.newBuilder() + .setWorkflowExecutionTimeout( + TestDescribeWorkflowImpl.CHILD_OPTIONS.getWorkflowExecutionTimeout()) + .setWorkflowRunTimeout(TestDescribeWorkflowImpl.CHILD_OPTIONS.getWorkflowRunTimeout()) + .setWorkflowTaskTimeout(TestDescribeWorkflowImpl.CHILD_OPTIONS.getWorkflowTaskTimeout()) + .setMemo(TestDescribeWorkflowImpl.CHILD_OPTIONS.getMemo()) + .build(); + + describe(childExecution) + .assertMatchesOptions(expectedChildOptions) + .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING) + .assertParent(parentExecution) + .assertPendingActivityCount(1) + .assertPendingChildrenCount(0); + + // Unblock the child and wait for the parent to finish so we know what expected states are + ThreadUtils.waitForWorkflow(token + "-finish"); + stub.getResult(Void.class); + + describe(parentExecution) + .assertMatchesOptions(options) + .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED) + .assertNoParent() + .assertPendingActivityCount(0) + .assertPendingChildrenCount(0); + + describe(childExecution) + .assertMatchesOptions(expectedChildOptions) + .assertStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED) + .assertParent(parentExecution) + .assertPendingActivityCount(0) + .assertPendingChildrenCount(0); + } + + /* + * We don't test things that require the passage of time here to avoid sleepy tests + * against the real temporal service. A future commit could introduce a test- + * environment-only suite that locks timeskipping and exercises time-based scenarios. + */ + + @WorkflowInterface + public interface TestDescribeWorkflow { + @WorkflowMethod(name = "TestDescribeWorkflow") + void run(String myToken, String childToken, boolean heartbeat, int failAttemptsEarlierThan); + } + + public static class TestDescribeWorkflowImpl implements TestDescribeWorkflow { + + private static final ChildWorkflowOptions CHILD_OPTIONS = + ChildWorkflowOptions.newBuilder() + .setWorkflowExecutionTimeout(Duration.ofMinutes(6)) + .setWorkflowRunTimeout(Duration.ofMinutes(5)) + .setWorkflowTaskTimeout(Duration.ofMinutes(1)) + .setMemo(ImmutableMap.of("other", "memo")) + .build(); + + private final TestDescribeWorkflow childStub = + Workflow.newChildWorkflowStub(TestDescribeWorkflow.class, CHILD_OPTIONS); + + private final TestDescribeActivity activityStub = + Workflow.newActivityStub( + TestDescribeActivity.class, + ActivityOptions.newBuilder() + .setRetryOptions( + RetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(1)) + .setMaximumAttempts(2) + .build()) + .setStartToCloseTimeout(Duration.ofMinutes(1)) + .build()); + + @Override + public void run( + String myToken, String childToken, boolean heartbeat, int failAttemptsEarlierThan) { + if (childToken != null) { + childStub.run(childToken, null, heartbeat, failAttemptsEarlierThan); + } else { + activityStub.run(myToken, heartbeat, failAttemptsEarlierThan); + } + } + } + + @ActivityInterface + public interface TestDescribeActivity { + @ActivityMethod(name = "TestDescribeActivity") + void run(String token, boolean heartbeat, int failAttemptsEarlierThan); + } + + public static class TestDescribeActivityImpl implements TestDescribeActivity { + + public void run(String token, boolean heartbeat, int failAttemptsEarlierThan) { + try { + // Wait twice - once to let the test case wait for activity start, and once + // to let the test case hold activities open until it wants them to finish. + ThreadUtils.waitForTestCase(token + "-start"); + + int attempt = Activity.getExecutionContext().getInfo().getAttempt(); + if (heartbeat) { + ThreadUtils.waitForTestCase(token + "-heartbeat"); + Activity.getExecutionContext().heartbeat("heartbeatDetails"); + ThreadUtils.waitForTestCase(token + "-after-heartbeat"); + } else if (attempt < failAttemptsEarlierThan) { + ThreadUtils.waitForTestCase(token + "-fail"); + throw new RuntimeException("Activity was asked to fail on attempt " + attempt); + } + + ThreadUtils.waitForTestCase(token + "-finish"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw CheckedExceptionWrapper.wrap(e); + } + } + } + + /* + * This class lets test code precisely control the execution of activity code via + * barriers. We can't just do this with signals because exercise describe needs + * control over _activity_ code, not just workflow code. + */ + public static class ThreadUtils { + private static final Logger log = LoggerFactory.getLogger(ThreadUtils.class); + + private static Map queues = new ConcurrentHashMap<>(); + + public static void waitForTestCase(String token) throws InterruptedException { + log.info("Workflow is waiting to meet test case: {}", token); + waitFor("test case", token); + log.info("Workflow finished meeting test case: {}", token); + } + + public static void waitForWorkflow(String token) throws InterruptedException { + log.info("Test case is waiting to meet workflow: {}", token); + waitFor("workflow", token); + log.info("Test case finished meeting workflow: {}", token); + } + + private static void waitFor(String otherParty, String token) throws InterruptedException { + CyclicBarrier barrier = queues.computeIfAbsent(token, unused -> new CyclicBarrier(2)); + + try { + barrier.await(30, TimeUnit.SECONDS); + } catch (BrokenBarrierException e) { + // This happens to waiting threads if a peer gets interrupted. + log.warn( + "Barrier broken when waiting for the {} on {}. This is a side-effect of a test thread being interrupted, and is not the cause of a test failure.", + token, + otherParty); + } catch (TimeoutException e) { + // This is a test failure - the other party didn't show up in time. + Assert.fail( + String.format("When waiting on %s, the %s did not arrive in time", token, otherParty)); + } + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/shared/DescribeWorkflowAsserter.java b/temporal-sdk/src/test/java/io/temporal/workflow/shared/DescribeWorkflowAsserter.java new file mode 100644 index 0000000000..3adf35c3bd --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/shared/DescribeWorkflowAsserter.java @@ -0,0 +1,180 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.workflow.shared; + +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import com.google.protobuf.util.Timestamps; +import io.temporal.api.common.v1.Payload; +import io.temporal.api.common.v1.Payloads; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.WorkflowExecutionStatus; +import io.temporal.api.workflow.v1.WorkflowExecutionConfig; +import io.temporal.api.workflow.v1.WorkflowExecutionInfo; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; +import io.temporal.client.WorkflowOptions; +import io.temporal.internal.common.ProtobufTimeUtils; +import java.util.Map; +import java.util.stream.Collectors; +import org.junit.Assert; + +/* + * Fluent assertions (ala truth or assert-j) for DescribeWorkflowResults. + */ +public class DescribeWorkflowAsserter { + + private final DescribeWorkflowExecutionResponse actual; + + public DescribeWorkflowAsserter(DescribeWorkflowExecutionResponse actual) { + this.actual = actual; + } + + public DescribeWorkflowExecutionResponse getActual() { + return actual; + } + + public DescribeWorkflowAsserter assertMatchesOptions(WorkflowOptions options) { + WorkflowExecutionConfig ec = actual.getExecutionConfig(); + Assert.assertEquals( + "workflow execution timeout should match", + options.getWorkflowExecutionTimeout(), + ProtobufTimeUtils.toJavaDuration(ec.getWorkflowExecutionTimeout())); + Assert.assertEquals( + "workflow run timeout should match", + options.getWorkflowRunTimeout(), + ProtobufTimeUtils.toJavaDuration(ec.getWorkflowRunTimeout())); + Assert.assertEquals( + "workflow task timeout should match", + options.getWorkflowTaskTimeout(), + ProtobufTimeUtils.toJavaDuration(ec.getDefaultWorkflowTaskTimeout())); + + WorkflowExecutionInfo ei = actual.getWorkflowExecutionInfo(); + Assert.assertEquals( + "memo should match", options.getMemo(), toSimpleMap(ei.getMemo().getFieldsMap())); + return this; + } + + private Map toSimpleMap(Map payloadMap) { + return payloadMap.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> payloadToString(e.getValue()))); + } + + private String payloadToString(Payload payload) { + // For simplicity, we only test with strings in the payload, which get serialized as json + String jsonPayload = payload.getData().toStringUtf8(); + Preconditions.checkState( + jsonPayload.startsWith("\"") && jsonPayload.endsWith("\""), + "Payload not a json string: %s", + jsonPayload); + // Strip off the quotes to get the original string + return jsonPayload.substring(1, jsonPayload.length() - 1); + } + + public static Payloads stringsToPayloads(String... strings) { + Payloads.Builder payloadsBuilder = Payloads.newBuilder(); + + for (String s : strings) { + payloadsBuilder.addPayloads( + Payload.newBuilder() + .putMetadata("encoding", ByteString.copyFromUtf8("json/plain")) + .setData(ByteString.copyFromUtf8("\"" + s + "\"")) + .build()); + } + + return payloadsBuilder.build(); + } + + public DescribeWorkflowAsserter assertTaskQueue(String expected) { + Assert.assertEquals( + "task queue should match", expected, actual.getExecutionConfig().getTaskQueue().getName()); + + // There's a task queue in WorkflowExecutionInfo too, but the golang doesn't set it, so we don't + // either. + + return this; + } + + public DescribeWorkflowAsserter assertSaneTimestamps() { + WorkflowExecutionInfo ei = actual.getWorkflowExecutionInfo(); + + if (ei.hasStartTime()) { + Assert.assertTrue("start time should be positive", ei.getStartTime().getSeconds() > 0); + + Assert.assertTrue( + "start time should be <= execution time", + Timestamps.compare(ei.getStartTime(), ei.getExecutionTime()) <= 0); + + if (ei.hasCloseTime()) { + Assert.assertTrue( + "start time should be <= close time", + Timestamps.compare(ei.getStartTime(), ei.getCloseTime()) <= 0); + } + } + + return this; + } + + public DescribeWorkflowAsserter assertExecutionId(WorkflowExecution expected) { + Assert.assertEquals( + "execution should match", expected, actual.getWorkflowExecutionInfo().getExecution()); + return this; + } + + public DescribeWorkflowAsserter assertType(String expected) { + Assert.assertEquals( + "workflow type should match", + expected, + actual.getWorkflowExecutionInfo().getType().getName()); + return this; + } + + public DescribeWorkflowAsserter assertStatus(WorkflowExecutionStatus expected) { + Assert.assertEquals( + "status should match", expected, actual.getWorkflowExecutionInfo().getStatus()); + return this; + } + + public DescribeWorkflowAsserter assertNoParent() { + WorkflowExecutionInfo ei = actual.getWorkflowExecutionInfo(); + Assert.assertEquals("parent namespace should be absent", "", ei.getParentNamespaceId()); + Assert.assertFalse("parent execution should be absent", ei.hasParentExecution()); + return this; + } + + public DescribeWorkflowAsserter assertParent(WorkflowExecution parentExecution) { + WorkflowExecutionInfo ei = actual.getWorkflowExecutionInfo(); + // We don't assert parent namespace because we need the _id_, not the name, + // and DescribeNamespace isn't implemented in the test service. + Assert.assertEquals("parent execution should match", parentExecution, ei.getParentExecution()); + return this; + } + + public DescribeWorkflowAsserter assertPendingActivityCount(int expected) { + Assert.assertEquals( + "pending activity count should match", expected, actual.getPendingActivitiesCount()); + return this; + } + + public DescribeWorkflowAsserter assertPendingChildrenCount(int expected) { + Assert.assertEquals( + "child workflow count should match", expected, actual.getPendingChildrenCount()); + return this; + } +} diff --git a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java index bab0960d22..3c0a451c79 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java +++ b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableState.java @@ -31,6 +31,7 @@ import io.temporal.api.history.v1.ExternalWorkflowExecutionCancelRequestedEventAttributes; import io.temporal.api.history.v1.StartChildWorkflowExecutionFailedEventAttributes; import io.temporal.api.taskqueue.v1.StickyExecutionAttributes; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; import io.temporal.api.workflowservice.v1.PollActivityTaskQueueRequest; import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponseOrBuilder; import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueRequest; @@ -123,6 +124,8 @@ void cancelActivityTask( QueryWorkflowResponse query(QueryWorkflowRequest queryRequest, long deadline); + DescribeWorkflowExecutionResponse describeWorkflowExecution(); + void completeQuery(QueryId queryId, RespondQueryTaskCompletedRequest completeRequest); StickyExecutionAttributes getStickyExecutionAttributes(); diff --git a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index 9d323c181a..8c66f8f28c 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -32,7 +32,9 @@ import com.cronutils.model.definition.CronDefinitionBuilder; import com.cronutils.model.time.ExecutionTime; import com.cronutils.parser.CronParser; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.protobuf.Timestamp; import com.google.protobuf.util.Durations; import com.google.protobuf.util.Timestamps; import io.grpc.Status; @@ -52,8 +54,10 @@ import io.temporal.api.command.v1.StartTimerCommandAttributes; import io.temporal.api.command.v1.UpsertWorkflowSearchAttributesCommandAttributes; import io.temporal.api.common.v1.Payloads; +import io.temporal.api.common.v1.RetryPolicy; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.EventType; +import io.temporal.api.enums.v1.PendingActivityState; import io.temporal.api.enums.v1.QueryRejectCondition; import io.temporal.api.enums.v1.RetryState; import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause; @@ -79,6 +83,12 @@ import io.temporal.api.query.v1.QueryRejected; import io.temporal.api.query.v1.WorkflowQueryResult; import io.temporal.api.taskqueue.v1.StickyExecutionAttributes; +import io.temporal.api.workflow.v1.PendingActivityInfo; +import io.temporal.api.workflow.v1.PendingChildExecutionInfo; +import io.temporal.api.workflow.v1.WorkflowExecutionConfig; +import io.temporal.api.workflow.v1.WorkflowExecutionInfo; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; +import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest; import io.temporal.api.workflowservice.v1.PollActivityTaskQueueRequest; import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponseOrBuilder; import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueRequest; @@ -131,6 +141,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.LongSupplier; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -2105,6 +2116,230 @@ public void completeQuery(QueryId queryId, RespondQueryTaskCompletedRequest comp } } + @Override + public DescribeWorkflowExecutionResponse describeWorkflowExecution() { + WorkflowExecutionConfig.Builder executionConfig = + WorkflowExecutionConfig.newBuilder() + .setTaskQueue(this.startRequest.getTaskQueue()) + .setWorkflowExecutionTimeout(this.startRequest.getWorkflowExecutionTimeout()) + .setWorkflowRunTimeout(this.startRequest.getWorkflowRunTimeout()) + .setDefaultWorkflowTaskTimeout(this.startRequest.getWorkflowTaskTimeout()); + + GetWorkflowExecutionHistoryRequest getRequest = + GetWorkflowExecutionHistoryRequest.newBuilder() + .setNamespace(this.startRequest.getNamespace()) + .setExecution(this.executionId.getExecution()) + .build(); + List fullHistory = + store + .getWorkflowExecutionHistory(this.executionId, getRequest, null) + .getHistory() + .getEventsList(); + + WorkflowExecutionInfo.Builder executionInfo = WorkflowExecutionInfo.newBuilder(); + executionInfo + .setExecution(this.executionId.getExecution()) + .setType(this.getStartRequest().getWorkflowType()) + .setMemo(this.startRequest.getMemo()) + // No setAutoResetPoints - the test environment doesn't support that feature + .setSearchAttributes(this.startRequest.getSearchAttributes()) + .setStatus(this.getWorkflowExecutionStatus()) + .setHistoryLength(fullHistory.size()); + + populateWorkflowExecutionInfoFromHistory(executionInfo, fullHistory); + + this.parent.ifPresent( + p -> { + executionInfo + .setParentNamespaceId(p.getExecutionId().getNamespace()) + .setParentExecution(p.getExecutionId().getExecution()); + }); + + List pendingActivities = + this.activities.values().stream() + .filter(sm -> !isTerminalState(sm.getState())) + .map(TestWorkflowMutableStateImpl::constructPendingActivityInfo) + .collect(Collectors.toList()); + + List pendingChildren = + this.childWorkflows.values().stream() + .filter(sm -> !isTerminalState(sm.getState())) + .map(TestWorkflowMutableStateImpl::constructPendingChildExecutionInfo) + .collect(Collectors.toList()); + + return DescribeWorkflowExecutionResponse.newBuilder() + .setExecutionConfig(executionConfig) + .setWorkflowExecutionInfo(executionInfo) + .addAllPendingActivities(pendingActivities) + .addAllPendingChildren(pendingChildren) + .build(); + } + + private static PendingChildExecutionInfo constructPendingChildExecutionInfo( + StateMachine sm) { + ChildWorkflowData data = sm.getData(); + return PendingChildExecutionInfo.newBuilder() + .setWorkflowId(data.execution.getWorkflowId()) + .setRunId(data.execution.getRunId()) + .setWorkflowTypeName(data.initiatedEvent.getWorkflowType().getName()) + .setInitiatedId(data.initiatedEventId) + .setParentClosePolicy(data.initiatedEvent.getParentClosePolicy()) + .build(); + } + + private static PendingActivityInfo constructPendingActivityInfo( + StateMachine sm) { + /* + * Working on this code? Read StateMachines.scheduleActivityTask to get answers to questions + * like 'why does some of the information come from the scheduledEvent?' + */ + ActivityTaskData activityTaskData = sm.getData(); + + State state = sm.getState(); + PendingActivityInfo.Builder builder = PendingActivityInfo.newBuilder(); + + // The oddballs - these don't obviously come from any one part of the structure + builder + .setState(computeActivityState(state, activityTaskData)) + // We don't track this in the test environment right now, but we could. + .setLastWorkerIdentity("test-environment-worker-identity"); + + // Some ids are only present in the schedule event... + populatePendingActivityInfoFromScheduledEvent(builder, activityTaskData.scheduledEvent); + + // A few bits of timing are only present on the poll response... + PollActivityTaskQueueResponseOrBuilder pollResponse = activityTaskData.activityTask.getTask(); + populatePendingActivityInfoFromPollResponse(builder, pollResponse); + + // Heartbeat details are housed directly in the activityTaskData + populatePendingActivityInfoFromHeartbeatDetails(builder, activityTaskData); + + // Retry data is housed under .retryState + populatePendingActivityInfoFromRetryData(builder, activityTaskData.retryState); + + return builder.build(); + } + + // Mimics golang in HistoryEngine.DescribeWorkflowExecution. Note that this only covers pending + // states, so there's quite a bit of state-space that doesn't need to be mapped. + private static PendingActivityState computeActivityState( + State state, ActivityTaskData pendingActivity) { + if (state == State.CANCELLATION_REQUESTED) { + return PendingActivityState.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED; + } else if (pendingActivity.startedEvent != null) { + return PendingActivityState.PENDING_ACTIVITY_STATE_STARTED; + } else { + return PendingActivityState.PENDING_ACTIVITY_STATE_SCHEDULED; + } + } + + private static void populatePendingActivityInfoFromScheduledEvent( + PendingActivityInfo.Builder builder, ActivityTaskScheduledEventAttributes scheduledEvent) { + builder + .setActivityId(scheduledEvent.getActivityId()) + .setActivityType(scheduledEvent.getActivityType()); + } + + private static void populatePendingActivityInfoFromPollResponse( + PendingActivityInfo.Builder builder, PollActivityTaskQueueResponseOrBuilder task) { + // In golang, we set one but never both of these fields, depending on the activity state + if (builder.getState() == PendingActivityState.PENDING_ACTIVITY_STATE_SCHEDULED) { + builder.setScheduledTime(task.getScheduledTime()); + } else { + builder.setLastStartedTime(task.getStartedTime()); + builder.setLastHeartbeatTime(task.getStartedTime()); + } + } + + private static void populatePendingActivityInfoFromHeartbeatDetails( + PendingActivityInfo.Builder builder, ActivityTaskData activityTaskData) { + if (activityTaskData.lastHeartbeatTime > 0) { + // This may overwrite the heartbeat time we just set - that's fine + builder.setLastHeartbeatTime(Timestamps.fromMillis(activityTaskData.lastHeartbeatTime)); + + if (activityTaskData.heartbeatDetails != null) { + builder.setHeartbeatDetails(activityTaskData.heartbeatDetails); + } + } + } + + private static void populatePendingActivityInfoFromRetryData( + PendingActivityInfo.Builder builder, TestServiceRetryState retryState) { + builder.setAttempt(retryState.getAttempt()); + builder.setExpirationTime(retryState.getExpirationTime()); + retryState.getPreviousRunFailure().ifPresent(builder::setLastFailure); + + RetryPolicy retryPolicy = + Preconditions.checkNotNull( + retryState.getRetryPolicy(), "retryPolicy should always be present"); + builder.setMaximumAttempts(retryPolicy.getMaximumAttempts()); + } + + private static void populateWorkflowExecutionInfoFromHistory( + WorkflowExecutionInfo.Builder executionInfo, List fullHistory) { + getStartEvent(fullHistory) + .ifPresent( + startEvent -> { + Timestamp startTime = startEvent.getEventTime(); + executionInfo.setStartTime(startEvent.getEventTime()); + + if (startEvent + .getWorkflowExecutionStartedEventAttributes() + .hasFirstWorkflowTaskBackoff()) { + executionInfo.setExecutionTime( + Timestamps.add( + startTime, + startEvent + .getWorkflowExecutionStartedEventAttributes() + .getFirstWorkflowTaskBackoff())); + } else { + // Some (most) workflows don't have firstWorkflowTaskBackoff. + executionInfo.setExecutionTime(startTime); + } + }); + + getCompletionEvent(fullHistory) + .ifPresent( + completionEvent -> { + executionInfo.setCloseTime(completionEvent.getEventTime()); + }); + } + + // Has an analog in the golang codebase: MutableState.GetStartEvent(). This could become public + // if needed. + private static Optional getStartEvent(List history) { + if (history.size() == 0) { + // It's theoretically possible for the TestWorkflowMutableState to exist, but + // for the history to still be empty. This is the case between construction and + // the ctx.commitChanges at the end of startWorkflow. + return Optional.empty(); + } + + HistoryEvent firstEvent = history.get(0); + + // This is true today (see StateMachines.startWorkflow), even in the signalWithStartCase (signal + // is the _second_ event). But if it becomes untrue in the future, we'd rather fail than lie. + Preconditions.checkState( + firstEvent.getEventType() == EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + "The first event in a workflow's history should be %s, but was %s", + EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED.name(), + firstEvent.getEventType().name()); + + return Optional.of(firstEvent); + } + + // Has an analog in the golang codebase: MutableState.GetCompletionEvent(). This could become + // public if needed. + private static Optional getCompletionEvent(List history) { + HistoryEvent lastEvent = history.get(history.size() - 1); + + if (WorkflowExecutionUtils.isWorkflowExecutionCompletedEvent(lastEvent)) { + return Optional.of(lastEvent); + } else { + return Optional.empty(); + } + } + private void addExecutionSignaledEvent( RequestContext ctx, SignalWorkflowExecutionRequest signalRequest) { WorkflowExecutionSignaledEventAttributes.Builder a = diff --git a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java index 42abeb4507..1e9138e22d 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java +++ b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java @@ -44,6 +44,8 @@ import io.temporal.api.failure.v1.Failure; import io.temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes; import io.temporal.api.workflow.v1.WorkflowExecutionInfo; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest; import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse; import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsRequest; @@ -1049,6 +1051,28 @@ public void queryWorkflow( } } + @Override + public void describeWorkflowExecution( + DescribeWorkflowExecutionRequest request, + StreamObserver + responseObserver) { + String namespace = requireNotNull("Namespace", request.getNamespace()); + WorkflowExecution execution = requireNotNull("Execution", request.getExecution()); + ExecutionId executionId = new ExecutionId(namespace, execution); + + try { + TestWorkflowMutableState mutableState = getMutableState(executionId); + DescribeWorkflowExecutionResponse result = mutableState.describeWorkflowExecution(); + responseObserver.onNext(result); + responseObserver.onCompleted(); + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() == Status.Code.INTERNAL) { + log.error("unexpected", e); + } + responseObserver.onError(e); + } + } + private R requireNotNull(String fieldName, R value) { if (value == null) { throw Status.INVALID_ARGUMENT diff --git a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowStoreImpl.java b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowStoreImpl.java index 3adaaf1b09..1ee50065ec 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowStoreImpl.java +++ b/temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowStoreImpl.java @@ -55,6 +55,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -431,12 +432,23 @@ public GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory( lock.lock(); try { history = getHistoryStore(executionId); - if (!getRequest.getWaitNewEvent() - && getRequest.getHistoryEventFilterType() - != HistoryEventFilterType.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT) { + if (!getRequest.getWaitNewEvent()) { List events = history.getEventsLocked(); // Copy the list as it is mutable. Individual events assumed immutable. - ArrayList eventsCopy = new ArrayList<>(events); + List eventsCopy = + events.stream() + .filter( + e -> { + if (getRequest.getHistoryEventFilterType() + != HistoryEventFilterType.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT) { + return true; + } + + // They asked for only the close event. There are a variety of ways a workflow + // can close. + return WorkflowExecutionUtils.isWorkflowExecutionCompletedEvent(e); + }) + .collect(Collectors.toList()); return GetWorkflowExecutionHistoryResponse.newBuilder() .setHistory(History.newBuilder().addAllEvents(eventsCopy)) .build();