Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
*/
interface DeterministicRunner {

long DEFAULT_DEADLOCK_DETECTION_TIMEOUT = 1000;

static DeterministicRunner newRunner(Runnable root) {
return new DeterministicRunnerImpl(root);
}
Expand Down Expand Up @@ -74,8 +76,9 @@ static DeterministicRunner newRunner(
* completed or blocked.
*
* @throws Throwable if one of the threads didn't handle an exception.
* @param deadlockDetectionTimeout the maximum time a thread can run without calling yield.
*/
void runUntilAllBlocked();
void runUntilAllBlocked(long deadlockDetectionTimeout);

/** IsDone returns true when all of threads are completed */
boolean isDone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ SyncWorkflowContext getWorkflowContext() {
}

@Override
public void runUntilAllBlocked() {
public void runUntilAllBlocked(long deadlockDetectionTimeout) {
if (rootWorkflowThread == null) {
// TODO: workflow instance specific thread name
rootWorkflowThread =
Expand Down Expand Up @@ -276,7 +276,7 @@ public void runUntilAllBlocked() {
Iterator<WorkflowThread> ci = threads.iterator();
while (ci.hasNext()) {
WorkflowThread c = ci.next();
progress = c.runUntilBlocked() || progress;
progress = c.runUntilBlocked(deadlockDetectionTimeout) || progress;
if (exitRequested) {
close();
break outerLoop;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
result.completeExceptionally(throwable);
}
});
runner.runUntilAllBlocked();
// Used to execute activities under TestActivityEnvironment
// So it is expected that a workflow thread is blocked for a long time.
runner.runUntilAllBlocked(Long.MAX_VALUE);
try {
return result.get();
} catch (ExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.internal.sync;

class PotentialDeadlockException extends RuntimeException {

PotentialDeadlockException(StackTraceElement[] stackTrace) {
super(
"Potential deadlock detected: workflow thread blocked for over a second", null, true, true);
setStackTrace(stackTrace);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package io.temporal.internal.sync;

import static io.temporal.internal.sync.DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT;

import io.temporal.api.common.v1.Payloads;
import io.temporal.api.common.v1.WorkflowType;
import io.temporal.api.enums.v1.EventType;
Expand Down Expand Up @@ -134,7 +136,7 @@ public boolean eventLoop() {
if (runner == null) {
return false;
}
runner.runUntilAllBlocked();
runner.runUntilAllBlocked(DEFAULT_DEADLOCK_DETECTION_TIMEOUT);
return runner.isDone() || workflowProc.isDone(); // Do not wait for all other threads.
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ static WorkflowThread newThread(Runnable runnable, boolean detached, String name

SyncWorkflowContext getWorkflowContext();

boolean runUntilBlocked();
boolean runUntilBlocked(long deadlockDetectionTimeout);

Throwable getUnhandledException();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

package io.temporal.internal.sync;

import static io.temporal.internal.sync.DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT;

import com.google.common.base.Throwables;
import io.temporal.workflow.Functions;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;
Expand All @@ -43,6 +46,7 @@ class WorkflowThreadContext {
private boolean remainedBlocked;
private String yieldReason;
private boolean destroyRequested;
private Thread currentThread;

WorkflowThreadContext(Lock lock) {
this.lock = lock;
Expand Down Expand Up @@ -194,15 +198,20 @@ public void setUnhandledException(Throwable unhandledException) {
}
}

public void setCurrentThread(Thread currentThread) {
this.currentThread = currentThread;
}

public String getYieldReason() {
return yieldReason;
}

/**
* @return true if thread made some progress. Which is await was unblocked and some code after it
* was executed.
* @param deadlockDetectionTimeout
*/
public boolean runUntilBlocked() {
public boolean runUntilBlocked(long deadlockDetectionTimeout) {
lock.lock();
try {
if (status == Status.DONE) {
Expand All @@ -218,7 +227,10 @@ public boolean runUntilBlocked() {
remainedBlocked = true;
yieldCondition.signal();
while (status == Status.RUNNING || status == Status.CREATED) {
runCondition.await();
if (!runCondition.await(deadlockDetectionTimeout, TimeUnit.MILLISECONDS)) {
throw new PotentialDeadlockException(currentThread.getStackTrace());
}
;
if (evaluationFunction != null) {
throw new IllegalStateException("Cannot runUntilBlocked while evaluating");
}
Expand Down Expand Up @@ -259,7 +271,7 @@ public void destroy() {
(r) -> {
throw new DestroyWorkflowThreadError();
});
runUntilBlocked();
runUntilBlocked(DEFAULT_DEADLOCK_DETECTION_TIMEOUT);
}

/** To be called only from a workflow thread. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class RunnableWrapper implements Runnable {
@Override
public void run() {
thread = Thread.currentThread();
threadContext.setCurrentThread(thread);
originalName = thread.getName();
thread.setName(name);
DeterministicRunnerImpl.setCurrentThreadInternal(WorkflowThreadImpl.this);
Expand All @@ -99,7 +100,6 @@ public void run() {
// Repopulate the context(s)
ContextThreadLocal.setContextPropagators(this.contextPropagators);
ContextThreadLocal.propagateContextToCurrentThread(this.propagatedContexts);

try {
// initialYield blocks thread until the first runUntilBlocked is called.
// Otherwise r starts executing without control of the sync.
Expand All @@ -125,6 +125,7 @@ public void run() {
threadContext.setStatus(Status.DONE);
thread.setName(originalName);
thread = null;
threadContext.setCurrentThread(null);
MDC.clear();
}
}
Expand Down Expand Up @@ -308,13 +309,16 @@ public int getPriority() {
return priority;
}

/** @return true if coroutine made some progress. */
/**
* @return true if coroutine made some progress.
* @param deadlockDetectionTimeout maximum time the thread can run before calling yield.
*/
@Override
public boolean runUntilBlocked() {
public boolean runUntilBlocked(long deadlockDetectionTimeout) {
if (taskFuture == null) {
start();
}
return context.runUntilBlocked();
return context.runUntilBlocked(deadlockDetectionTimeout);
}

@Override
Expand Down
Loading