From 0a59e9335cf75f7b64eda42d916bee0b565d0bcc Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Fri, 26 Jun 2020 11:59:42 -0700 Subject: [PATCH 1/5] remove excessive options check --- src/main/java/io/temporal/activity/ActivityOptions.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/main/java/io/temporal/activity/ActivityOptions.java b/src/main/java/io/temporal/activity/ActivityOptions.java index 90c40acdd3..b7c7f5c844 100644 --- a/src/main/java/io/temporal/activity/ActivityOptions.java +++ b/src/main/java/io/temporal/activity/ActivityOptions.java @@ -176,12 +176,6 @@ public ActivityOptions build() { } public ActivityOptions validateAndBuildWithDefaults() { - if (scheduleToCloseTimeout == null - && (scheduleToStartTimeout == null || startToCloseTimeout == null)) { - throw new IllegalStateException( - "Either ScheduleToClose or both ScheduleToStart and StartToClose " - + "timeouts are required: "); - } return new ActivityOptions( heartbeatTimeout, scheduleToCloseTimeout, From e56b87e29c7a9ba47ac8e51129f636c51095b10b Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Fri, 26 Jun 2020 12:07:55 -0700 Subject: [PATCH 2/5] Removed excessive check --- src/test/java/io/temporal/worker/StickyWorkerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/io/temporal/worker/StickyWorkerTest.java b/src/test/java/io/temporal/worker/StickyWorkerTest.java index bc1979fdf9..8cb97b8940 100644 --- a/src/test/java/io/temporal/worker/StickyWorkerTest.java +++ b/src/test/java/io/temporal/worker/StickyWorkerTest.java @@ -430,7 +430,7 @@ public void whenCacheIsEvictedTheWorkerCanRecover() throws Exception { // Act WorkflowClient.start(workflow::getGreeting); - Thread.sleep(200); // Wait for workflow to start + Thread.sleep(1000); // Wait for workflow to start DeciderCache cache = factory.getCache(); assertNotNull(cache); From 8670195a8f5938aadd38d6f349cad3df7ae6d8cd Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Thu, 25 Jun 2020 19:15:42 -0700 Subject: [PATCH 3/5] fixed gradle syntax --- build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.gradle b/build.gradle index 25c5d2a56c..f36783a458 100644 --- a/build.gradle +++ b/build.gradle @@ -300,8 +300,8 @@ publishing { nexusPublishing { repositories { sonatype { - username = hasProperty('ossrhUsername') ? ossrhUsername : '' - password = hasProperty('ossrhPassword') ? ossrhPassword : '' + username = project.hasProperty('ossrhUsername') ? project.property('ossrhUsername') : '' + password = project.hasProperty('ossrhPassword') ? project.property('ossrhPassword') : '' nexusUrl = uri("https://oss.sonatype.org/service/local/staging/deploy/maven2/") snapshotRepositoryUrl = uri("https://oss.sonatype.org/content/repositories/snapshots/") } From d2a5b72663c4395e5592ccc954ab61b91e075b2b Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Fri, 26 Jun 2020 15:06:19 -0700 Subject: [PATCH 4/5] Fixed root thread name and removed duplicated error loggin --- .../internal/sync/DeterministicRunner.java | 8 +++- .../sync/DeterministicRunnerImpl.java | 16 ++++--- .../temporal/internal/sync/SyncWorkflow.java | 7 +++- .../internal/sync/WorkflowThreadImpl.java | 20 +-------- .../sync/DeterministicRunnerTest.java | 4 ++ .../temporal/internal/sync/PromiseTest.java | 1 + .../io/temporal/workflow/WorkflowTest.java | 42 +++++++++---------- 7 files changed, 50 insertions(+), 48 deletions(-) diff --git a/src/main/java/io/temporal/internal/sync/DeterministicRunner.java b/src/main/java/io/temporal/internal/sync/DeterministicRunner.java index afaa6aade6..41c0e81c68 100644 --- a/src/main/java/io/temporal/internal/sync/DeterministicRunner.java +++ b/src/main/java/io/temporal/internal/sync/DeterministicRunner.java @@ -55,9 +55,11 @@ static DeterministicRunner newRunner( ExecutorService threadPool, SyncDecisionContext decisionContext, Supplier clock, + String rootThreadName, Runnable root, DeciderCache cache) { - return new DeterministicRunnerImpl(threadPool, decisionContext, clock, root, cache); + return new DeterministicRunnerImpl( + threadPool, decisionContext, clock, rootThreadName, root, cache); } /** @@ -72,8 +74,10 @@ static DeterministicRunner newRunner( ExecutorService threadPool, SyncDecisionContext decisionContext, Supplier clock, + String rootThreadName, Runnable root) { - return new DeterministicRunnerImpl(threadPool, decisionContext, clock, root, null); + return new DeterministicRunnerImpl( + threadPool, decisionContext, clock, rootThreadName, root, null); } /** diff --git a/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java b/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java index 3e7cd74b14..e33f39885f 100644 --- a/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java +++ b/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java @@ -89,7 +89,7 @@ private NamedRunnable(String name, Runnable runnable) { } private static final Logger log = LoggerFactory.getLogger(DeterministicRunnerImpl.class); - static final String WORKFLOW_ROOT_THREAD_NAME = "workflow-root"; + static final String WORKFLOW_ROOT_THREAD_NAME = "workflow-method"; private static final ThreadLocal currentThreadThreadLocal = new ThreadLocal<>(); private final Lock lock = new ReentrantLock(); @@ -158,7 +158,13 @@ static void setCurrentThreadInternal(WorkflowThread coroutine) { } DeterministicRunnerImpl(Supplier clock, Runnable root) { - this(getDefaultThreadPool(), newDummySyncDecisionContext(), clock, root, null); + this( + getDefaultThreadPool(), + newDummySyncDecisionContext(), + clock, + WORKFLOW_ROOT_THREAD_NAME, + root, + null); } private static ThreadPoolExecutor getDefaultThreadPool() { @@ -173,13 +179,14 @@ private static ThreadPoolExecutor getDefaultThreadPool() { SyncDecisionContext decisionContext, Supplier clock, Runnable root) { - this(threadPool, decisionContext, clock, root, null); + this(threadPool, decisionContext, clock, WORKFLOW_ROOT_THREAD_NAME, root, null); } DeterministicRunnerImpl( ExecutorService threadPool, SyncDecisionContext decisionContext, Supplier clock, + String rootName, Runnable root, DeciderCache cache) { this.threadPool = threadPool; @@ -386,8 +393,7 @@ public void close() { throw new Error("unreachable"); } catch (RuntimeException e) { log.warn( - "Promise that was completedExceptionally was never accessed. " - + "The ignored exception:", + "Promise completed with exception and was never accessed. The ignored exception:", CheckedExceptionWrapper.unwrap(e)); } } diff --git a/src/main/java/io/temporal/internal/sync/SyncWorkflow.java b/src/main/java/io/temporal/internal/sync/SyncWorkflow.java index d7cf7d67ec..785e8d5238 100644 --- a/src/main/java/io/temporal/internal/sync/SyncWorkflow.java +++ b/src/main/java/io/temporal/internal/sync/SyncWorkflow.java @@ -112,9 +112,14 @@ public void start(HistoryEvent event, DecisionContext context) { threadPool, syncContext, context::currentTimeMillis, + "interceptor-init", () -> { workflow.initialize(); - WorkflowInternal.newThread(false, "root", () -> workflowProc.run()).start(); + WorkflowInternal.newThread( + false, + DeterministicRunnerImpl.WORKFLOW_ROOT_THREAD_NAME, + () -> workflowProc.run()) + .start(); }, cache); runner.setInterceptorHead(syncContext.getWorkflowInterceptor()); diff --git a/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java b/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java index 159f48881c..61ecba77cb 100644 --- a/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java +++ b/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java @@ -108,15 +108,6 @@ public void run() { threadContext.setUnhandledException(e); } } catch (Error e) { - // Error aborts decision, not fails the workflow. - if (log.isErrorEnabled() && !root) { - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw, true); - e.printStackTrace(pw); - String stackTrace = sw.getBuffer().toString(); - log.error( - String.format("Workflow thread \"%s\" run failed with Error:\n%s", name, stackTrace)); - } threadContext.setUnhandledException(e); } catch (CanceledFailure e) { if (!isCancelRequested()) { @@ -126,16 +117,6 @@ public void run() { log.debug(String.format("Workflow thread \"%s\" run cancelled", name)); } } catch (Throwable e) { - if (log.isWarnEnabled() && !root) { - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw, true); - e.printStackTrace(pw); - String stackTrace = sw.getBuffer().toString(); - log.warn( - String.format( - "Workflow thread \"%s\" run failed with unhandled exception:\n%s", - name, stackTrace)); - } threadContext.setUnhandledException(e); } finally { DeterministicRunnerImpl.setCurrentThreadInternal(null); @@ -198,6 +179,7 @@ public void setName(String name) { DeciderCache cache, List contextPropagators, Map propagatedContexts) { + log.error("new Thread name=" + name + ", isRoot=" + root); this.root = root; this.threadPool = threadPool; this.runner = runner; diff --git a/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java b/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java index d7314446bf..da6de50dfd 100644 --- a/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java +++ b/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java @@ -733,6 +733,7 @@ public void workflowThreadsWillEvictCacheWhenMaxThreadCountIsHit() throws Throwa new SyncDecisionContext( decisionContext, DataConverter.getDefaultInstance(), null, null), () -> 0L, // clock override + "test-thread", () -> { Promise thread = Async.procedure( @@ -759,6 +760,7 @@ public void workflowThreadsWillEvictCacheWhenMaxThreadCountIsHit() throws Throwa new SyncDecisionContext( decisionContext, DataConverter.getDefaultInstance(), null, null), () -> 0L, // clock override + "test-thread", () -> { Promise thread = Async.procedure( @@ -796,6 +798,7 @@ public void workflowThreadsWillNotEvictCacheWhenMaxThreadCountIsHit() throws Thr threadPool, null, () -> 0L, // clock override + "test-thread", () -> { Promise thread = Async.procedure( @@ -821,6 +824,7 @@ public void workflowThreadsWillNotEvictCacheWhenMaxThreadCountIsHit() throws Thr threadPool, null, () -> 0L, // clock override + "test-thread", () -> { Promise thread = Async.procedure( diff --git a/src/test/java/io/temporal/internal/sync/PromiseTest.java b/src/test/java/io/temporal/internal/sync/PromiseTest.java index e075054ae6..dc01303815 100644 --- a/src/test/java/io/temporal/internal/sync/PromiseTest.java +++ b/src/test/java/io/temporal/internal/sync/PromiseTest.java @@ -154,6 +154,7 @@ public void testGetTimeout() throws Throwable { threadPool, null, () -> currentTime, + "test-thread", () -> { CompletablePromise f = Workflow.newPromise(); trace.add("root begin"); diff --git a/src/test/java/io/temporal/workflow/WorkflowTest.java b/src/test/java/io/temporal/workflow/WorkflowTest.java index 342132a2d0..7c0ec7ce33 100644 --- a/src/test/java/io/temporal/workflow/WorkflowTest.java +++ b/src/test/java/io/temporal/workflow/WorkflowTest.java @@ -459,7 +459,7 @@ public void testSync() { assertEquals("activity10", result); tracer.setExpected( "interceptExecuteWorkflow " + UUID_REGEXP, - "newThread root", + "newThread workflow-method", "newThread null", "sleep PT2S", "executeActivity ActivityWithDelay", @@ -1499,19 +1499,19 @@ public void testContinueAsNew() { assertEquals(111, result); tracer.setExpected( "interceptExecuteWorkflow " + UUID_REGEXP, - "newThread root", + "newThread workflow-method", "continueAsNew", "interceptExecuteWorkflow " + UUID_REGEXP, - "newThread root", + "newThread workflow-method", "continueAsNew", "interceptExecuteWorkflow " + UUID_REGEXP, - "newThread root", + "newThread workflow-method", "continueAsNew", "interceptExecuteWorkflow " + UUID_REGEXP, - "newThread root", + "newThread workflow-method", "continueAsNew", "interceptExecuteWorkflow " + UUID_REGEXP, - "newThread root"); + "newThread workflow-method"); } @WorkflowInterface @@ -1546,10 +1546,10 @@ public void testContinueAsNewNoArgs() { assertEquals("done", result); tracer.setExpected( "interceptExecuteWorkflow " + UUID_REGEXP, - "newThread root", + "newThread workflow-method", "continueAsNew", "interceptExecuteWorkflow " + UUID_REGEXP, - "newThread root"); + "newThread workflow-method"); } public static class TestAsyncActivityWorkflowImpl implements TestWorkflow1 { @@ -2369,7 +2369,7 @@ public void testTimer() { tracer.setExpected( "interceptExecuteWorkflow " + UUID_REGEXP, "registerQuery getTrace", - "newThread root", + "newThread workflow-method", "newTimer PT0.7S", "newTimer PT1.3S", "newTimer PT10S"); @@ -2377,7 +2377,7 @@ public void testTimer() { tracer.setExpected( "interceptExecuteWorkflow " + UUID_REGEXP, "registerQuery getTrace", - "newThread root", + "newThread workflow-method", "newTimer PT11M40S", "newTimer PT21M40S", "newTimer PT10H"); @@ -3482,10 +3482,10 @@ public void testSignalExternalWorkflow() { tracer.setExpected( "interceptExecuteWorkflow " + stub.getExecution().getWorkflowId(), "registerSignal testSignal", - "newThread root", + "newThread workflow-method", "executeChildWorkflow SignalingChild", "interceptExecuteWorkflow " + UUID_REGEXP, // child - "newThread root", + "newThread workflow-method", "signalExternalWorkflow " + UUID_REGEXP + " testSignal"); } @@ -4573,7 +4573,7 @@ public void testSideEffect() { assertEquals("activity1", result); tracer.setExpected( "interceptExecuteWorkflow " + UUID_REGEXP, - "newThread root", + "newThread workflow-method", "sideEffect", "sleep PT1S", "executeActivity customActivity1"); @@ -4671,7 +4671,7 @@ public void testGetVersion() { assertEquals("activity22activity1activity1activity1", result); tracer.setExpected( "interceptExecuteWorkflow " + UUID_REGEXP, - "newThread root", + "newThread workflow-method", "getVersion", "executeActivity Activity2", "getVersion", @@ -4984,7 +4984,7 @@ public void testGetVersionRemovedInReplay() { assertEquals("activity22activity", result); tracer.setExpected( "interceptExecuteWorkflow " + UUID_REGEXP, - "newThread root", + "newThread workflow-method", "getVersion", "executeActivity Activity2", "executeActivity Activity"); @@ -5022,7 +5022,7 @@ public void testGetVersionRemovedBefore() { assertEquals("activity", result); tracer.setExpected( "interceptExecuteWorkflow " + UUID_REGEXP, - "newThread root", + "newThread workflow-method", "getVersion", "getVersion", "getVersion", @@ -5225,7 +5225,7 @@ public void testUUIDAndRandom() { assertEquals("foo10", result); tracer.setExpected( "interceptExecuteWorkflow " + UUID_REGEXP, - "newThread root", + "newThread workflow-method", "sideEffect", "sideEffect", "executeActivity Activity2"); @@ -5934,15 +5934,15 @@ public void testSaga() { sagaWorkflow.execute(taskList, false); tracer.setExpected( "interceptExecuteWorkflow " + UUID_REGEXP, - "newThread root", + "newThread workflow-method", "executeActivity customActivity1", "executeChildWorkflow TestMultiargsWorkflowsFunc", "interceptExecuteWorkflow " + UUID_REGEXP, - "newThread root", + "newThread workflow-method", "executeActivity ThrowIO", "executeChildWorkflow TestCompensationWorkflow", "interceptExecuteWorkflow " + UUID_REGEXP, - "newThread root", + "newThread workflow-method", "executeActivity Activity2"); } @@ -6063,7 +6063,7 @@ public void testUpsertSearchAttributes() { assertEquals("done", result); tracer.setExpected( "interceptExecuteWorkflow " + UUID_REGEXP, - "newThread root", + "newThread workflow-method", "upsertSearchAttributes", "executeActivity Activity"); } From 69775ece70b1801897062daf9cc10513150ccf86 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Fri, 26 Jun 2020 15:30:01 -0700 Subject: [PATCH 5/5] forgotten log statement removed --- src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java b/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java index 61ecba77cb..b41c018ee8 100644 --- a/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java +++ b/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java @@ -179,7 +179,6 @@ public void setName(String name) { DeciderCache cache, List contextPropagators, Map propagatedContexts) { - log.error("new Thread name=" + name + ", isRoot=" + root); this.root = root; this.threadPool = threadPool; this.runner = runner;