Skip to content

Commit

Permalink
Fix rare deadlock when event loop is triggered after a main thread th…
Browse files Browse the repository at this point in the history
…rew an exception (#1690)

Issue #871
  • Loading branch information
Spikhalskiy committed Mar 7, 2023
1 parent db81b4d commit 7521f85
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public void runUntilAllBlocked(long deadlockDetectionTimeout) {
lock.unlock();
// Close was requested while running
if (closeRequested) {
close();
close(true);
}
}
}
Expand All @@ -285,13 +285,17 @@ public void cancel(String reason) {
executeInWorkflowThread("cancel workflow callback", () -> rootWorkflowThread.cancel(reason));
}

/**
 * Public entry point for closing: delegates to {@code close(boolean)} with {@code
 * fromWorkflowThread} set to {@code false}, so when the closure is not performed by this call it
 * waits on {@code closeFuture} until the thread responsible for the closure completes it.
 */
@Override
public void close() {
close(false);
}

/**
* Destroys all controlled workflow threads by throwing {@link DestroyWorkflowThreadError} from
* {@link WorkflowThreadContext#yield(String, Supplier)} when the threads are blocking on the
* temporal-sdk code.
*/
@Override
public void close() {
private void close(boolean fromWorkflowThread) {
lock.lock();
if (closeFuture.isDone()) {
lock.unlock();
Expand All @@ -308,9 +312,14 @@ public void close() {
|| closeStarted) {

lock.unlock();
// We will not perform the closure in this call and should just wait on the future when
// another thread responsible for it will close.
closeFuture.join();
// If called from the workflow thread, we don't want to block this call, otherwise we
// will cause a deadlock. The thread performing the closure waits for all workflow threads
// to complete first.
if (!fromWorkflowThread) {
// We will not perform the closure in this call and should just wait on the future when
// another thread responsible for it will close.
closeFuture.join();
}
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow.activityTests;

import static org.junit.Assert.*;

import io.temporal.client.WorkflowFailedException;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.failure.ApplicationFailure;
import io.temporal.internal.Issue;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.worker.WorkerOptions;
import io.temporal.workflow.*;
import io.temporal.workflow.shared.TestActivities;
import io.temporal.workflow.shared.TestWorkflows;
import java.time.Duration;
import org.junit.Rule;
import org.junit.Test;

/**
 * Regression test for a deadlock on workflow closure: before the fix was implemented this test
 * was hanging instead of completing.
 */
@Issue("https://github.com/temporalio/sdk-java/issues/871")
public class CancelActivityDeadlockTest {
  @Rule
  public SDKTestWorkflowRule testWorkflowRule =
      SDKTestWorkflowRule.newBuilder()
          .setWorkflowTypes(ScheduleCancelActivityWorkflow.class)
          .setActivityImplementations(new TestActivities.TestActivitiesImpl())
          .setWorkerOptions(zeroStickyTimeoutWorkerOptions())
          .build();

  /** Builds worker options whose sticky queue schedule-to-start timeout is zero. */
  private static WorkerOptions zeroStickyTimeoutWorkerOptions() {
    return WorkerOptions.newBuilder()
        .setStickyQueueScheduleToStartTimeout(Duration.ZERO)
        .build();
  }

  /**
   * Starts the workflow and expects it to fail with {@link WorkflowFailedException} rather than
   * hang.
   */
  @Test
  public void shouldNotCauseDeadlock() {
    WorkflowOptions workflowOptions =
        WorkflowOptions.newBuilder()
            .setWorkflowRunTimeout(Duration.ofSeconds(8))
            .setWorkflowTaskTimeout(Duration.ofSeconds(5))
            .setTaskQueue(testWorkflowRule.getTaskQueue())
            .build();
    TestWorkflows.TestWorkflow1 typedStub =
        testWorkflowRule
            .getWorkflowClient()
            .newWorkflowStub(TestWorkflows.TestWorkflow1.class, workflowOptions);

    // The same untyped view of the stub is used both to start the run and to await its result.
    WorkflowStub untypedStub = WorkflowStub.fromTyped(typedStub);
    untypedStub.start("input");
    assertThrows(WorkflowFailedException.class, () -> untypedStub.getResult(String.class));
  }

  /**
   * Workflow that schedules an activity inside a cancellation scope, fails with a non-retryable
   * application failure, and cancels the scope from a {@code finally} block.
   */
  public static class ScheduleCancelActivityWorkflow implements TestWorkflows.TestWorkflow1 {

    @Override
    public String execute(String taskQueue) {
      TestActivities.VariousTestActivities activityStub =
          Workflow.newActivityStub(
              TestActivities.VariousTestActivities.class,
              SDKTestOptions.newActivityOptionsForTaskQueue(taskQueue));
      CancellationScope activityScope =
          Workflow.newCancellationScope(() -> Async.procedure(activityStub::activity1, 1));
      activityScope.run();
      try {
        // The sleep forces the current workflow task to end before the failure is thrown.
        Workflow.sleep(1000);
        throw ApplicationFailure.newNonRetryableFailure("messsage", "type");
      } finally {
        // Cancelling here triggers the event loop after the main workflow method has already
        // thrown and while the executor is shutting down the runner.
        activityScope.cancel();
      }
    }
  }
}

0 comments on commit 7521f85

Please sign in to comment.