From 7521f859398623e453506b365b8542f3743ec74a Mon Sep 17 00:00:00 2001 From: Dmitry Spikhalskiy Date: Tue, 7 Mar 2023 09:27:58 -0500 Subject: [PATCH] Fix rare deadlock when event loop is triggered after a main thread threw an exception (#1690) Issue #871 --- .../sync/DeterministicRunnerImpl.java | 21 ++-- .../CancelActivityDeadlockTest.java | 95 +++++++++++++++++++ 2 files changed, 110 insertions(+), 6 deletions(-) create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/activityTests/CancelActivityDeadlockTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java index 38881e23e..6a912ed38 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java @@ -263,7 +263,7 @@ public void runUntilAllBlocked(long deadlockDetectionTimeout) { lock.unlock(); // Close was requested while running if (closeRequested) { - close(); + close(true); } } } @@ -285,13 +285,17 @@ public void cancel(String reason) { executeInWorkflowThread("cancel workflow callback", () -> rootWorkflowThread.cancel(reason)); } + @Override + public void close() { + close(false); + } + /** * Destroys all controlled workflow threads by throwing {@link DestroyWorkflowThreadError} from * {@link WorkflowThreadContext#yield(String, Supplier)} when the threads are blocking on the * temporal-sdk code. */ - @Override - public void close() { + private void close(boolean fromWorkflowThread) { lock.lock(); if (closeFuture.isDone()) { lock.unlock(); @@ -308,9 +312,14 @@ public void close() { || closeStarted) { lock.unlock(); - // We will not perform the closure in this call and should just wait on the future when - // another thread responsible for it will close. - closeFuture.join(); + // If called from the workflow thread, we don't want to block this call, otherwise we + // will cause a deadlock. The thread performing the closure waits for all workflow threads + // to complete first. + if (!fromWorkflowThread) { + // We will not perform the closure in this call and should just wait on the future when + // another thread responsible for it will close. + closeFuture.join(); + } return; } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/CancelActivityDeadlockTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/CancelActivityDeadlockTest.java new file mode 100644 index 000000000..2468d18f4 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/CancelActivityDeadlockTest.java @@ -0,0 +1,95 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.activityTests; + +import static org.junit.Assert.*; + +import io.temporal.client.WorkflowFailedException; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.failure.ApplicationFailure; +import io.temporal.internal.Issue; +import io.temporal.testing.internal.SDKTestOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.WorkerOptions; +import io.temporal.workflow.*; +import io.temporal.workflow.shared.TestActivities; +import io.temporal.workflow.shared.TestWorkflows; +import java.time.Duration; +import org.junit.Rule; +import org.junit.Test; + +/** This test was hanging before the fix for deadlock was implemented */ +@Issue("https://github.com/temporalio/sdk-java/issues/871") +public class CancelActivityDeadlockTest { + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(ScheduleCancelActivityWorkflow.class) + .setActivityImplementations(new TestActivities.TestActivitiesImpl()) + .setWorkerOptions( + WorkerOptions.newBuilder() + .setStickyQueueScheduleToStartTimeout(Duration.ZERO) + .build()) + .build(); + + @Test + public void shouldNotCauseDeadlock() { + WorkflowOptions options = + WorkflowOptions.newBuilder() + .setWorkflowRunTimeout(Duration.ofSeconds(8)) + .setWorkflowTaskTimeout(Duration.ofSeconds(5)) + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .build(); + TestWorkflows.TestWorkflow1 workflowStub = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub(TestWorkflows.TestWorkflow1.class, options); + + WorkflowStub.fromTyped(workflowStub).start("input"); + assertThrows( + WorkflowFailedException.class, + () -> WorkflowStub.fromTyped(workflowStub).getResult(String.class)); + } + + public static class ScheduleCancelActivityWorkflow implements TestWorkflows.TestWorkflow1 { + + @Override + public String execute(String taskQueue) { + TestActivities.VariousTestActivities activities = + Workflow.newActivityStub( + TestActivities.VariousTestActivities.class, + SDKTestOptions.newActivityOptionsForTaskQueue(taskQueue)); + CancellationScope cancellationScope = + Workflow.newCancellationScope(() -> Async.procedure(activities::activity1, 1)); + cancellationScope.run(); + try { + // Forcing an end of WFT + Workflow.sleep(1000); + throw ApplicationFailure.newNonRetryableFailure("messsage", "type"); + } finally { + // this code causes a triggering of event loop when the main workflow method already thew + // and the executor is shutting down the runner + cancellationScope.cancel(); + } + } + } +}