Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ publishing {
nexusPublishing {
repositories {
sonatype {
username = hasProperty('ossrhUsername') ? ossrhUsername : ''
password = hasProperty('ossrhPassword') ? ossrhPassword : ''
username = project.hasProperty('ossrhUsername') ? project.property('ossrhUsername') : ''
password = project.hasProperty('ossrhPassword') ? project.property('ossrhPassword') : ''
nexusUrl = uri("https://oss.sonatype.org/service/local/staging/deploy/maven2/")
snapshotRepositoryUrl = uri("https://oss.sonatype.org/content/repositories/snapshots/")
}
Expand Down
6 changes: 0 additions & 6 deletions src/main/java/io/temporal/activity/ActivityOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,6 @@ public ActivityOptions build() {
}

public ActivityOptions validateAndBuildWithDefaults() {
if (scheduleToCloseTimeout == null
&& (scheduleToStartTimeout == null || startToCloseTimeout == null)) {
throw new IllegalStateException(
"Either ScheduleToClose or both ScheduleToStart and StartToClose "
+ "timeouts are required: ");
}
return new ActivityOptions(
heartbeatTimeout,
scheduleToCloseTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ static DeterministicRunner newRunner(
ExecutorService threadPool,
SyncDecisionContext decisionContext,
Supplier<Long> clock,
String rootThreadName,
Runnable root,
DeciderCache cache) {
return new DeterministicRunnerImpl(threadPool, decisionContext, clock, root, cache);
return new DeterministicRunnerImpl(
threadPool, decisionContext, clock, rootThreadName, root, cache);
}

/**
Expand All @@ -72,8 +74,10 @@ static DeterministicRunner newRunner(
ExecutorService threadPool,
SyncDecisionContext decisionContext,
Supplier<Long> clock,
String rootThreadName,
Runnable root) {
return new DeterministicRunnerImpl(threadPool, decisionContext, clock, root, null);
return new DeterministicRunnerImpl(
threadPool, decisionContext, clock, rootThreadName, root, null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private NamedRunnable(String name, Runnable runnable) {
}

private static final Logger log = LoggerFactory.getLogger(DeterministicRunnerImpl.class);
static final String WORKFLOW_ROOT_THREAD_NAME = "workflow-root";
static final String WORKFLOW_ROOT_THREAD_NAME = "workflow-method";
private static final ThreadLocal<WorkflowThread> currentThreadThreadLocal = new ThreadLocal<>();

private final Lock lock = new ReentrantLock();
Expand Down Expand Up @@ -158,7 +158,13 @@ static void setCurrentThreadInternal(WorkflowThread coroutine) {
}

DeterministicRunnerImpl(Supplier<Long> clock, Runnable root) {
this(getDefaultThreadPool(), newDummySyncDecisionContext(), clock, root, null);
this(
getDefaultThreadPool(),
newDummySyncDecisionContext(),
clock,
WORKFLOW_ROOT_THREAD_NAME,
root,
null);
}

private static ThreadPoolExecutor getDefaultThreadPool() {
Expand All @@ -173,13 +179,14 @@ private static ThreadPoolExecutor getDefaultThreadPool() {
SyncDecisionContext decisionContext,
Supplier<Long> clock,
Runnable root) {
this(threadPool, decisionContext, clock, root, null);
this(threadPool, decisionContext, clock, WORKFLOW_ROOT_THREAD_NAME, root, null);
}

DeterministicRunnerImpl(
ExecutorService threadPool,
SyncDecisionContext decisionContext,
Supplier<Long> clock,
String rootName,
Runnable root,
DeciderCache cache) {
this.threadPool = threadPool;
Expand Down Expand Up @@ -386,8 +393,7 @@ public void close() {
throw new Error("unreachable");
} catch (RuntimeException e) {
log.warn(
"Promise that was completedExceptionally was never accessed. "
+ "The ignored exception:",
"Promise completed with exception and was never accessed. The ignored exception:",
CheckedExceptionWrapper.unwrap(e));
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/io/temporal/internal/sync/SyncWorkflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,14 @@ public void start(HistoryEvent event, DecisionContext context) {
threadPool,
syncContext,
context::currentTimeMillis,
"interceptor-init",
() -> {
workflow.initialize();
WorkflowInternal.newThread(false, "root", () -> workflowProc.run()).start();
WorkflowInternal.newThread(
false,
DeterministicRunnerImpl.WORKFLOW_ROOT_THREAD_NAME,
() -> workflowProc.run())
.start();
},
cache);
runner.setInterceptorHead(syncContext.getWorkflowInterceptor());
Expand Down
19 changes: 0 additions & 19 deletions src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,6 @@ public void run() {
threadContext.setUnhandledException(e);
}
} catch (Error e) {
// Error aborts decision, not fails the workflow.
if (log.isErrorEnabled() && !root) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw, true);
e.printStackTrace(pw);
String stackTrace = sw.getBuffer().toString();
log.error(
String.format("Workflow thread \"%s\" run failed with Error:\n%s", name, stackTrace));
}
threadContext.setUnhandledException(e);
} catch (CanceledFailure e) {
if (!isCancelRequested()) {
Expand All @@ -126,16 +117,6 @@ public void run() {
log.debug(String.format("Workflow thread \"%s\" run cancelled", name));
}
} catch (Throwable e) {
if (log.isWarnEnabled() && !root) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw, true);
e.printStackTrace(pw);
String stackTrace = sw.getBuffer().toString();
log.warn(
String.format(
"Workflow thread \"%s\" run failed with unhandled exception:\n%s",
name, stackTrace));
}
threadContext.setUnhandledException(e);
} finally {
DeterministicRunnerImpl.setCurrentThreadInternal(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ public void workflowThreadsWillEvictCacheWhenMaxThreadCountIsHit() throws Throwa
new SyncDecisionContext(
decisionContext, DataConverter.getDefaultInstance(), null, null),
() -> 0L, // clock override
"test-thread",
() -> {
Promise<Void> thread =
Async.procedure(
Expand All @@ -759,6 +760,7 @@ public void workflowThreadsWillEvictCacheWhenMaxThreadCountIsHit() throws Throwa
new SyncDecisionContext(
decisionContext, DataConverter.getDefaultInstance(), null, null),
() -> 0L, // clock override
"test-thread",
() -> {
Promise<Void> thread =
Async.procedure(
Expand Down Expand Up @@ -796,6 +798,7 @@ public void workflowThreadsWillNotEvictCacheWhenMaxThreadCountIsHit() throws Thr
threadPool,
null,
() -> 0L, // clock override
"test-thread",
() -> {
Promise<Void> thread =
Async.procedure(
Expand All @@ -821,6 +824,7 @@ public void workflowThreadsWillNotEvictCacheWhenMaxThreadCountIsHit() throws Thr
threadPool,
null,
() -> 0L, // clock override
"test-thread",
() -> {
Promise<Void> thread =
Async.procedure(
Expand Down
1 change: 1 addition & 0 deletions src/test/java/io/temporal/internal/sync/PromiseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public void testGetTimeout() throws Throwable {
threadPool,
null,
() -> currentTime,
"test-thread",
() -> {
CompletablePromise<String> f = Workflow.newPromise();
trace.add("root begin");
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/io/temporal/worker/StickyWorkerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ public void whenCacheIsEvictedTheWorkerCanRecover() throws Exception {
// Act
WorkflowClient.start(workflow::getGreeting);

Thread.sleep(200); // Wait for workflow to start
Thread.sleep(1000); // Wait for workflow to start

DeciderCache cache = factory.getCache();
assertNotNull(cache);
Expand Down
42 changes: 21 additions & 21 deletions src/test/java/io/temporal/workflow/WorkflowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ public void testSync() {
assertEquals("activity10", result);
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"newThread null",
"sleep PT2S",
"executeActivity ActivityWithDelay",
Expand Down Expand Up @@ -1499,19 +1499,19 @@ public void testContinueAsNew() {
assertEquals(111, result);
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"continueAsNew",
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"continueAsNew",
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"continueAsNew",
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"continueAsNew",
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root");
"newThread workflow-method");
}

@WorkflowInterface
Expand Down Expand Up @@ -1546,10 +1546,10 @@ public void testContinueAsNewNoArgs() {
assertEquals("done", result);
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"continueAsNew",
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root");
"newThread workflow-method");
}

public static class TestAsyncActivityWorkflowImpl implements TestWorkflow1 {
Expand Down Expand Up @@ -2369,15 +2369,15 @@ public void testTimer() {
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"registerQuery getTrace",
"newThread root",
"newThread workflow-method",
"newTimer PT0.7S",
"newTimer PT1.3S",
"newTimer PT10S");
} else {
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"registerQuery getTrace",
"newThread root",
"newThread workflow-method",
"newTimer PT11M40S",
"newTimer PT21M40S",
"newTimer PT10H");
Expand Down Expand Up @@ -3482,10 +3482,10 @@ public void testSignalExternalWorkflow() {
tracer.setExpected(
"interceptExecuteWorkflow " + stub.getExecution().getWorkflowId(),
"registerSignal testSignal",
"newThread root",
"newThread workflow-method",
"executeChildWorkflow SignalingChild",
"interceptExecuteWorkflow " + UUID_REGEXP, // child
"newThread root",
"newThread workflow-method",
"signalExternalWorkflow " + UUID_REGEXP + " testSignal");
}

Expand Down Expand Up @@ -4573,7 +4573,7 @@ public void testSideEffect() {
assertEquals("activity1", result);
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"sideEffect",
"sleep PT1S",
"executeActivity customActivity1");
Expand Down Expand Up @@ -4671,7 +4671,7 @@ public void testGetVersion() {
assertEquals("activity22activity1activity1activity1", result);
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"getVersion",
"executeActivity Activity2",
"getVersion",
Expand Down Expand Up @@ -4984,7 +4984,7 @@ public void testGetVersionRemovedInReplay() {
assertEquals("activity22activity", result);
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"getVersion",
"executeActivity Activity2",
"executeActivity Activity");
Expand Down Expand Up @@ -5022,7 +5022,7 @@ public void testGetVersionRemovedBefore() {
assertEquals("activity", result);
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"getVersion",
"getVersion",
"getVersion",
Expand Down Expand Up @@ -5225,7 +5225,7 @@ public void testUUIDAndRandom() {
assertEquals("foo10", result);
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"sideEffect",
"sideEffect",
"executeActivity Activity2");
Expand Down Expand Up @@ -5934,15 +5934,15 @@ public void testSaga() {
sagaWorkflow.execute(taskList, false);
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"executeActivity customActivity1",
"executeChildWorkflow TestMultiargsWorkflowsFunc",
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"executeActivity ThrowIO",
"executeChildWorkflow TestCompensationWorkflow",
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"executeActivity Activity2");
}

Expand Down Expand Up @@ -6063,7 +6063,7 @@ public void testUpsertSearchAttributes() {
assertEquals("done", result);
tracer.setExpected(
"interceptExecuteWorkflow " + UUID_REGEXP,
"newThread root",
"newThread workflow-method",
"upsertSearchAttributes",
"executeActivity Activity");
}
Expand Down