From 319298064ddfd7efecf04b07331024f74b9db2a9 Mon Sep 17 00:00:00 2001 From: Dmitry Spikhalskiy Date: Sun, 27 Jun 2021 22:46:23 -0700 Subject: [PATCH] Refactor how DeterministicRunnerImpl and SyncWorkflowContext handle creation of workflow root and method threads --- .../WorkflowInboundCallsInterceptor.java | 22 + .../WorkflowInboundCallsInterceptorBase.java | 10 + .../WorkflowOutboundCallsInterceptor.java | 20 +- .../WorkflowOutboundCallsInterceptorBase.java | 4 +- .../temporal/internal/sync/AsyncInternal.java | 6 +- ...seRootWorkflowInboundCallsInterceptor.java | 67 +++ .../internal/sync/DeterministicRunner.java | 21 +- .../sync/DeterministicRunnerImpl.java | 442 ++++-------------- .../sync/DynamicSyncWorkflowDefinition.java | 22 +- .../POJOWorkflowImplementationFactory.java | 74 ++- .../temporal/internal/sync/SyncWorkflow.java | 7 +- .../internal/sync/SyncWorkflowContext.java | 73 ++- .../internal/sync/WorkflowInternal.java | 24 +- .../internal/sync/WorkflowThread.java | 6 +- .../internal/sync/WorkflowThreadContext.java | 2 +- .../internal/sync/WorkflowThreadImpl.java | 10 + .../sync/DeterministicRunnerTest.java | 48 +- .../temporal/internal/sync/PromiseTest.java | 137 +++--- .../WorkflowInternalDeprecatedQueueTest.java | 62 +-- .../sync/WorkflowInternalQueueTest.java | 61 +-- ...ignalWorkflowOutboundCallsInterceptor.java | 4 +- .../testing/TracingWorkerInterceptor.java | 12 +- .../sync/DeterministicRunnerWrapper.java | 1 + .../sync/DummySyncWorkflowContext.java | 276 +++++++++++ .../TestActivityEnvironmentInternal.java | 2 +- 25 files changed, 805 insertions(+), 608 deletions(-) create mode 100644 temporal-sdk/src/main/java/io/temporal/internal/sync/BaseRootWorkflowInboundCallsInterceptor.java create mode 100644 temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java index dbbd7de405..b2ab6989c1 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptor.java @@ -19,6 +19,8 @@ package io.temporal.common.interceptors; +import javax.annotation.Nullable; + /** * Intercepts calls to the workflow execution. Executes under workflow context. So all the * restrictions on the workflow code should be obeyed. @@ -128,4 +130,24 @@ public Object getResult() { /** Called when a workflow is queried. */ QueryOutput handleQuery(QueryInput input); + + /** + * Intercepts creation of the workflow main method thread + * + * @param runnable thread function to run + * @param name name of the thread, optional + * @return created workflow thread. Should be treated as a pass-through object that shouldn't be + * manipulated in any way by the interceptor code. + */ + Object newWorkflowMethodThread(Runnable runnable, @Nullable String name); + + /** + * Intercepts creation of a workflow callback thread + * + * @param runnable thread function to run + * @param name name of the thread, optional + * @return created workflow thread. Should be treated as a pass-through object that shouldn't be + * manipulated in any way by the interceptor code. + */ + Object newCallbackThread(Runnable runnable, @Nullable String name); } diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptorBase.java index 1169730747..40a3df85e5 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptorBase.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowInboundCallsInterceptorBase.java @@ -46,4 +46,14 @@ public void handleSignal(SignalInput input) { public QueryOutput handleQuery(QueryInput input) { return next.handleQuery(input); } + + @Override + public Object newWorkflowMethodThread(Runnable runnable, String name) { + return next.newWorkflowMethodThread(runnable, name); + } + + @Override + public Object newCallbackThread(Runnable runnable, String name) { + return next.newCallbackThread(runnable, name); + } } diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java index 3fd4a011ec..84a02c61da 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java @@ -22,13 +22,8 @@ import io.temporal.activity.ActivityOptions; import io.temporal.activity.LocalActivityOptions; import io.temporal.api.common.v1.WorkflowExecution; -import io.temporal.workflow.ChildWorkflowOptions; -import io.temporal.workflow.ContinueAsNewOptions; -import io.temporal.workflow.DynamicQueryHandler; -import io.temporal.workflow.DynamicSignalHandler; -import io.temporal.workflow.Functions; +import io.temporal.workflow.*; import io.temporal.workflow.Functions.Func; -import io.temporal.workflow.Promise; import java.lang.reflect.Type; import java.time.Duration; import java.util.List; @@ -483,7 +478,18 @@ R mutableSideEffect( void upsertSearchAttributes(Map searchAttributes); - Object newThread(Runnable runnable, boolean detached, String name); + /** + * Intercepts creation of the workflow child thread. + * + *

Please note, that "workflow child thread" and "child workflow" are different and independent + * concepts. + * + * @param runnable thread function to run + * @param detached if this thread is detached from the parent {@link CancellationScope} + * @param name name of the thread + * @return created WorkflowThread + */ + Object newChildThread(Runnable runnable, boolean detached, String name); long currentTimeMillis(); } diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java index 511c8644a5..14e42a29b1 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java @@ -140,8 +140,8 @@ public void upsertSearchAttributes(Map searchAttributes) { } @Override - public Object newThread(Runnable runnable, boolean detached, String name) { - return next.newThread(runnable, detached, name); + public Object newChildThread(Runnable runnable, boolean detached, String name) { + return next.newChildThread(runnable, detached, name); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/AsyncInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/AsyncInternal.java index 728aa1aaae..0f16da19b4 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/AsyncInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/AsyncInternal.java @@ -293,15 +293,15 @@ private static Promise execute(boolean async, Functions.Func func) { } } else { CompletablePromise result = Workflow.newPromise(); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { try { result.complete(func.apply()); } catch (Exception e) { result.completeExceptionally(Workflow.wrap(e)); } - }) + }, + false) .start(); return result; } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/BaseRootWorkflowInboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/BaseRootWorkflowInboundCallsInterceptor.java new file mode 100644 index 0000000000..13e7b23eb9 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/BaseRootWorkflowInboundCallsInterceptor.java @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.internal.sync; + +import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor; +import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; + +/** + * Provides core functionality for a root WorkflowInboundCallsInterceptor that is reused by specific + * root RootWorkflowInboundCallsInterceptor implementations inside {@link + * DynamicSyncWorkflowDefinition#} and {@link POJOWorkflowImplementationFactory} + * + *

Root {@code WorkflowInboundCallsInterceptor} is an interceptor that should be at the end of + * the {@code WorkflowInboundCallsInterceptor} interceptors chain and which encapsulates calls into + * Temporal internals while providing a WorkflowInboundCallsInterceptor interface for chaining on + * top of it. + */ +public abstract class BaseRootWorkflowInboundCallsInterceptor + implements WorkflowInboundCallsInterceptor { + protected final SyncWorkflowContext workflowContext; + + public BaseRootWorkflowInboundCallsInterceptor(SyncWorkflowContext workflowContext) { + this.workflowContext = workflowContext; + } + + @Override + public void init(WorkflowOutboundCallsInterceptor outboundCalls) { + workflowContext.initHeadOutboundCallsInterceptor(outboundCalls); + } + + @Override + public void handleSignal(SignalInput input) { + workflowContext.handleInterceptedSignal(input); + } + + @Override + public QueryOutput handleQuery(QueryInput input) { + return workflowContext.handleInterceptedQuery(input); + } + + @Override + public Object newWorkflowMethodThread(Runnable runnable, String name) { + return workflowContext.newWorkflowMethodThreadIntercepted(runnable, name); + } + + @Override + public Object newCallbackThread(Runnable runnable, String name) { + return workflowContext.newWorkflowCallbackThreadIntercepted(runnable, name); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunner.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunner.java index 6be9999200..9f34f1a1aa 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunner.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunner.java @@ -19,15 +19,14 @@ package io.temporal.internal.sync; -import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; import io.temporal.internal.replay.WorkflowExecutorCache; import io.temporal.workflow.CancellationScope; import java.util.concurrent.ExecutorService; -import java.util.function.Supplier; +import javax.annotation.Nullable; /** * Executes code passed to {@link #newRunner(Runnable)} as well as threads created from it using - * {@link WorkflowInternal#newThread(boolean, Runnable)} deterministically. Requires use of provided + * {@link WorkflowThread#newThread(Runnable, boolean)} deterministically. Requires use of provided * wrappers for synchronization and notification instead of native ones. */ interface DeterministicRunner { @@ -40,12 +39,8 @@ static long getDeadlockDetectionTimeout() { return debugMode ? Long.MAX_VALUE : DEFAULT_DEADLOCK_DETECTION_TIMEOUT; } - static DeterministicRunner newRunner(Runnable root) { - return new DeterministicRunnerImpl(root); - } - - static DeterministicRunner newRunner(Supplier clock, Runnable root) { - return new DeterministicRunnerImpl(clock, root); + static DeterministicRunner newRunner(SyncWorkflowContext workflowContext, Runnable root) { + return new DeterministicRunnerImpl(workflowContext, root); } /** @@ -118,9 +113,11 @@ static DeterministicRunner newRunner( void executeInWorkflowThread(String name, Runnable r); /** - * Creates a new instance of a workflow thread. To be called only from another workflow thread. + * Creates a new instance of a workflow child thread. To be called only from another workflow + * thread. */ - WorkflowThread newThread(Runnable runnable, boolean detached, String name); + WorkflowThread newWorkflowThread(Runnable runnable, boolean detached, @Nullable String name); - void setInterceptorHead(WorkflowOutboundCallsInterceptor interceptorHead); + /** Creates a new instance of a workflow callback thread. */ + WorkflowThread newCallbackThread(Runnable runnable, @Nullable String name); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java index e440f561ce..9d18a850ca 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java @@ -20,33 +20,12 @@ package io.temporal.internal.sync; import com.google.common.primitives.Ints; -import com.uber.m3.tally.NoopScope; -import com.uber.m3.tally.Scope; -import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes; -import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes; -import io.temporal.api.common.v1.Payloads; -import io.temporal.api.common.v1.SearchAttributes; -import io.temporal.api.common.v1.WorkflowExecution; -import io.temporal.api.common.v1.WorkflowType; -import io.temporal.api.failure.v1.Failure; import io.temporal.common.context.ContextPropagator; -import io.temporal.common.converter.DataConverter; -import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; -import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptorBase; -import io.temporal.failure.CanceledFailure; import io.temporal.internal.WorkflowThreadMarker; import io.temporal.internal.context.ContextThreadLocal; -import io.temporal.internal.replay.ExecuteActivityParameters; -import io.temporal.internal.replay.ExecuteLocalActivityParameters; -import io.temporal.internal.replay.ReplayWorkflowContext; -import io.temporal.internal.replay.StartChildWorkflowExecutionParameters; import io.temporal.internal.replay.WorkflowExecutorCache; import io.temporal.serviceclient.CheckedExceptionWrapper; -import io.temporal.workflow.Functions; -import io.temporal.workflow.Functions.Func; -import io.temporal.workflow.Functions.Func1; import io.temporal.workflow.Promise; -import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -54,14 +33,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; -import java.util.Random; import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; import java.util.TreeSet; -import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -70,7 +44,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Supplier; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,7 +57,9 @@ class DeterministicRunnerImpl implements DeterministicRunner { private static final int ROOT_THREAD_PRIORITY = 0; private static final int CALLBACK_THREAD_PRIORITY = 10; private static final int WORKFLOW_THREAD_PRIORITY = 20000000; - static final String WORKFLOW_ROOT_THREAD_NAME = "workflow-method"; + + static final String WORKFLOW_ROOT_THREAD_NAME = "workflow-root"; + static final String WORKFLOW_MAIN_THREAD_NAME = "workflow-method"; private static final Logger log = LoggerFactory.getLogger(DeterministicRunnerImpl.class); private static final ThreadLocal currentThreadThreadLocal = new ThreadLocal<>(); @@ -92,7 +69,10 @@ class DeterministicRunnerImpl implements DeterministicRunner { new TreeSet<>((t1, t2) -> Ints.compare(t1.getPriority(), t2.getPriority())); // Values from RunnerLocalInternal private final Map, Object> runnerLocalMap = new HashMap<>(); - private final List threadsToAdd = Collections.synchronizedList(new ArrayList<>()); + private final List workflowThreadsToAdd = + Collections.synchronizedList(new ArrayList<>()); + private final List callbackThreadsToAdd = + Collections.synchronizedList(new ArrayList<>()); private final List toExecuteInWorkflowThread = new ArrayList<>(); private final Lock lock = new ReentrantLock(); private final Runnable rootRunnable; @@ -114,18 +94,6 @@ private NamedRunnable(String name, Runnable runnable) { } } - /** - * Used to create a root workflow thread through the interceptor chain. The default value is used - * only in the unit tests. - */ - private WorkflowOutboundCallsInterceptor interceptorHead = - new WorkflowOutboundCallsInterceptorBase(null) { - @Override - public Object newThread(Runnable runnable, boolean detached, String name) { - return DeterministicRunnerImpl.this.newThread(runnable, detached, name); - } - }; - static WorkflowThread currentThreadInternal() { WorkflowThread result = currentThreadThreadLocal.get(); if (result == null) { @@ -163,60 +131,31 @@ static void setCurrentThreadInternal(WorkflowThread coroutine) { private WorkflowThread rootWorkflowThread; private final CancellationScopeImpl runnerCancellationScope; - DeterministicRunnerImpl(Runnable root) { - this(System::currentTimeMillis, root); - } - - DeterministicRunnerImpl(Supplier clock, Runnable root) { - this(getDefaultThreadPool(), newDummySyncWorkflowContext(), root, null); - } - - private static ThreadPoolExecutor getDefaultThreadPool() { - ThreadPoolExecutor result = - new ThreadPoolExecutor(0, 1000, 1, TimeUnit.SECONDS, new SynchronousQueue<>()); - result.setThreadFactory(r -> new Thread(r, "deterministic runner thread")); - return result; + DeterministicRunnerImpl(@Nonnull SyncWorkflowContext workflowContext, Runnable root) { + this(getDefaultThreadPool(), workflowContext, root, null); } DeterministicRunnerImpl( - ExecutorService threadPool, SyncWorkflowContext workflowContext, Runnable root) { + ExecutorService threadPool, @Nonnull SyncWorkflowContext workflowContext, Runnable root) { this(threadPool, workflowContext, root, null); } DeterministicRunnerImpl( ExecutorService threadPool, - SyncWorkflowContext workflowContext, + @Nonnull SyncWorkflowContext workflowContext, Runnable root, WorkflowExecutorCache cache) { this.threadPool = threadPool; - this.workflowContext = - workflowContext != null ? workflowContext : newDummySyncWorkflowContext(); + if (workflowContext == null) { + throw new NullPointerException("workflowContext can't be null"); + } + this.workflowContext = workflowContext; this.workflowContext.setRunner(this); this.cache = cache; - runnerCancellationScope = new CancellationScopeImpl(true, null, null); + this.runnerCancellationScope = new CancellationScopeImpl(true, null, null); this.rootRunnable = root; } - private WorkflowThreadImpl newRootWorkflowThread( - Runnable runnable, boolean detached, String name) { - return new WorkflowThreadImpl( - threadPool, - this, - name, - ROOT_THREAD_PRIORITY, - detached, - runnerCancellationScope, - runnable, - cache, - getContextPropagators(), - getPropagatedContexts()); - } - - private static SyncWorkflowContext newDummySyncWorkflowContext() { - return new SyncWorkflowContext( - new DummyReplayWorkflowContext(), DataConverter.getDefaultInstance(), null, null, null); - } - SyncWorkflowContext getWorkflowContext() { return workflowContext; } @@ -224,10 +163,7 @@ SyncWorkflowContext getWorkflowContext() { @Override public void runUntilAllBlocked(long deadlockDetectionTimeout) { if (rootWorkflowThread == null) { - // TODO: workflow instance specific thread name - rootWorkflowThread = - (WorkflowThread) - interceptorHead.newThread(rootRunnable, false, WORKFLOW_ROOT_THREAD_NAME); + rootWorkflowThread = newRootThread(rootRunnable); threads.add(rootWorkflowThread); rootWorkflowThread.start(); } @@ -244,24 +180,9 @@ public void runUntilAllBlocked(long deadlockDetectionTimeout) { boolean progress; outerLoop: do { - threadsToAdd.clear(); if (!toExecuteInWorkflowThread.isEmpty()) { - List callbackThreads = new ArrayList<>(toExecuteInWorkflowThread.size()); for (NamedRunnable nr : toExecuteInWorkflowThread) { - WorkflowThread thread = - new WorkflowThreadImpl( - threadPool, - this, - nr.name, - CALLBACK_THREAD_PRIORITY - + (addedThreads++), // maintain the order in toExecuteInWorkflowThread - false, - runnerCancellationScope, - nr.runnable, - cache, - getContextPropagators(), - getPropagatedContexts()); - callbackThreads.add(thread); + workflowContext.getWorkflowInboundInterceptor().newCallbackThread(nr.runnable, nr.name); } // It is important to prepend threads as there are callbacks @@ -269,10 +190,10 @@ public void runUntilAllBlocked(long deadlockDetectionTimeout) { // Otherwise signal might be never processed if it was received // after workflow decided to close. // Adding the callbacks in the same order as they appear in history. - - for (int i = callbackThreads.size() - 1; i >= 0; i--) { - threads.add(callbackThreads.get(i)); + for (int i = callbackThreadsToAdd.size() - 1; i >= 0; i--) { + threads.add(callbackThreadsToAdd.get(i)); } + callbackThreadsToAdd.clear(); } toExecuteInWorkflowThread.clear(); progress = false; @@ -296,9 +217,8 @@ public void runUntilAllBlocked(long deadlockDetectionTimeout) { close(); throw WorkflowInternal.wrap(unhandledException); } - for (WorkflowThread c : threadsToAdd) { - threads.add(c); - } + threads.addAll(workflowThreadsToAdd); + workflowThreadsToAdd.clear(); } while (progress && !threads.isEmpty()); } catch (PotentialDeadlockException e) { StringBuilder dump = new StringBuilder(); @@ -333,7 +253,6 @@ public boolean isDone() { } @Override - @SuppressWarnings("unchecked") public Object getExitValue() { lock.lock(); try { @@ -367,10 +286,10 @@ public void close() { return; } try { - for (WorkflowThread c : threadsToAdd) { + for (WorkflowThread c : workflowThreadsToAdd) { threads.add(c); } - threadsToAdd.clear(); + workflowThreadsToAdd.clear(); for (WorkflowThread c : threads) { threadFutures.add(c.stopNow()); @@ -440,38 +359,78 @@ private void checkClosed() { } } - /** To be called only from another workflow thread. */ - public WorkflowThread newThread(Runnable runnable, boolean detached, String name) { + /** Creates a new instance of a root workflow thread. */ + private WorkflowThread newRootThread(Runnable runnable) { + String name = WORKFLOW_ROOT_THREAD_NAME; + // TODO: workflow instance specific thread name + // String name = "workflow[" + workflowContext.getContext().getWorkflowId() + "]-root"; + if (rootWorkflowThread != null) { + throw new IllegalStateException( + "newRootThread can be called only if there is no existing root workflow thread"); + } + rootWorkflowThread = + new WorkflowThreadImpl( + threadPool, + this, + name, + ROOT_THREAD_PRIORITY, + false, + runnerCancellationScope, + runnable, + cache, + getContextPropagators(), + getPropagatedContexts()); + return rootWorkflowThread; + } + + @Override + public WorkflowThread newWorkflowThread( + Runnable runnable, boolean detached, @Nullable String name) { if (name == null) { name = "workflow[" + workflowContext.getContext().getWorkflowId() + "]-" + addedThreads; } - WorkflowThread result; if (rootWorkflowThread == null) { - rootWorkflowThread = newRootWorkflowThread(runnable, detached, name); - result = rootWorkflowThread; - } else { - checkWorkflowThreadOnly(); - checkClosed(); - result = - new WorkflowThreadImpl( - threadPool, - this, - name, - WORKFLOW_THREAD_PRIORITY + (addedThreads++), - detached, - CancellationScopeImpl.current(), - runnable, - cache, - getContextPropagators(), - getPropagatedContexts()); + throw new IllegalStateException( + "newChildThread can be called only with existing root workflow thread"); } - threadsToAdd.add(result); // This is synchronized collection. + checkWorkflowThreadOnly(); + checkClosed(); + WorkflowThread result = + new WorkflowThreadImpl( + threadPool, + this, + name, + WORKFLOW_THREAD_PRIORITY + (addedThreads++), + detached, + CancellationScopeImpl.current(), + runnable, + cache, + getContextPropagators(), + getPropagatedContexts()); + workflowThreadsToAdd.add(result); // This is synchronized collection. return result; } @Override - public void setInterceptorHead(WorkflowOutboundCallsInterceptor interceptorHead) { - this.interceptorHead = Objects.requireNonNull(interceptorHead); + public WorkflowThread newCallbackThread(Runnable runnable, @Nullable String name) { + if (name == null) { + name = "workflow[" + workflowContext.getContext().getWorkflowId() + "]-" + addedThreads; + } + WorkflowThread result = + new WorkflowThreadImpl( + threadPool, + this, + name, + CALLBACK_THREAD_PRIORITY + + (addedThreads++), // maintain the order in toExecuteInWorkflowThread + false, + runnerCancellationScope, + runnable, + cache, + getContextPropagators(), + getPropagatedContexts()); + callbackThreadsToAdd.add(result); + return result; } /** @@ -548,222 +507,11 @@ private List getContextPropagators() { } } - private static final class DummyReplayWorkflowContext implements ReplayWorkflowContext { - - private final Timer timer = new Timer(); - - @Override - public WorkflowExecution getWorkflowExecution() { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public WorkflowExecution getParentWorkflowExecution() { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public WorkflowType getWorkflowType() { - return WorkflowType.newBuilder().setName("dummy-workflow").build(); - } - - @Override - public boolean isCancelRequested() { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public ContinueAsNewWorkflowExecutionCommandAttributes getContinueAsNewOnCompletion() { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public void setContinueAsNewOnCompletion( - ContinueAsNewWorkflowExecutionCommandAttributes attributes) { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public Optional getContinuedExecutionRunId() { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public String getTaskQueue() { - return "dummy-task-queue"; - } - - @Override - public String getNamespace() { - return "dummy-namespace"; - } - - @Override - public String getWorkflowId() { - return "dummy-workflow-id"; - } - - @Override - public String getRunId() { - return "dummy-run-id"; - } - - @Override - public Duration getWorkflowRunTimeout() { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public Duration getWorkflowExecutionTimeout() { - return Duration.ZERO; - } - - @Override - public long getRunStartedTimestampMillis() { - return 0; - } - - @Override - public long getWorkflowExecutionExpirationTimestampMillis() { - return 0; - } - - @Override - public Duration getWorkflowTaskTimeout() { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public SearchAttributes getSearchAttributes() { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public Map getPropagatedContexts() { - return null; - } - - @Override - public List getContextPropagators() { - return null; - } - - @Override - public Functions.Proc1 scheduleActivityTask( - ExecuteActivityParameters parameters, - Functions.Proc2, Failure> callback) { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public Functions.Proc scheduleLocalActivityTask( - ExecuteLocalActivityParameters parameters, - Functions.Proc2, Failure> callback) { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public Functions.Proc1 startChildWorkflow( - StartChildWorkflowExecutionParameters parameters, - Functions.Proc1 executionCallback, - Functions.Proc2, Exception> callback) { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public Functions.Proc1 signalExternalWorkflowExecution( - SignalExternalWorkflowExecutionCommandAttributes.Builder attributes, - Functions.Proc2 callback) { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public void requestCancelExternalWorkflowExecution( - WorkflowExecution execution, Functions.Proc2 callback) { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public void continueAsNewOnCompletion( - ContinueAsNewWorkflowExecutionCommandAttributes attributes) { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public long currentTimeMillis() { - return System.currentTimeMillis(); - } - - @Override - public Functions.Proc1 newTimer( - Duration delay, Functions.Proc1 callback) { - timer.schedule( - new TimerTask() { - @Override - public void run() { - callback.apply(null); - } - }, - delay.toMillis()); - return (e) -> { - callback.apply(new CanceledFailure(null)); - }; - } - - @Override - public void sideEffect( - Func> func, Functions.Proc1> callback) { - callback.apply(func.apply()); - } - - @Override - public void mutableSideEffect( - String id, - Func1, Optional> func, - Functions.Proc1> callback) { - callback.apply(func.apply(Optional.empty())); - } - - @Override - public boolean isReplaying() { - return false; - } - - @Override - public void getVersion( - String changeId, int minSupported, int maxSupported, Functions.Proc1 callback) { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public Random newRandom() { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public Scope getMetricsScope() { - return new NoopScope(); - } - - @Override - public boolean getEnableLoggingInReplay() { - return false; - } - - @Override - public UUID randomUUID() { - return UUID.randomUUID(); - } - - @Override - public void upsertSearchAttributes(SearchAttributes searchAttributes) { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public int getAttempt() { - return 1; - } + private static ThreadPoolExecutor getDefaultThreadPool() { + ThreadPoolExecutor result = + new ThreadPoolExecutor(0, 1000, 1, TimeUnit.SECONDS, new SynchronousQueue<>()); + result.setThreadFactory(r -> new Thread(r, "deterministic runner thread")); + return result; } private static class WorkflowThreadMarkerAccessor extends WorkflowThreadMarker { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/DynamicSyncWorkflowDefinition.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/DynamicSyncWorkflowDefinition.java index 111a09b624..b5915de184 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/DynamicSyncWorkflowDefinition.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/DynamicSyncWorkflowDefinition.java @@ -37,7 +37,6 @@ final class DynamicSyncWorkflowDefinition implements SyncWorkflowDefinition { private final WorkerInterceptor[] workerInterceptors; private final DataConverter dataConverter; private WorkflowInboundCallsInterceptor workflowInvoker; - private DynamicWorkflow workflow; public DynamicSyncWorkflowDefinition( Functions.Func factory, @@ -55,7 +54,7 @@ public void initialize() { for (WorkerInterceptor workerInterceptor : workerInterceptors) { workflowInvoker = workerInterceptor.interceptWorkflow(workflowInvoker); } - workflowContext.setHeadInboundCallsInterceptor(workflowInvoker); + workflowContext.initHeadInboundCallsInterceptor(workflowInvoker); workflowInvoker.init(workflowContext); } @@ -68,16 +67,17 @@ public Optional execute(Header header, Optional input) { return dataConverter.toPayloads(result.getResult()); } - private class RootWorkflowInboundCallsInterceptor implements WorkflowInboundCallsInterceptor { - private final SyncWorkflowContext workflowContext; + private class RootWorkflowInboundCallsInterceptor + extends BaseRootWorkflowInboundCallsInterceptor { + private DynamicWorkflow workflow; public RootWorkflowInboundCallsInterceptor(SyncWorkflowContext workflowContext) { - this.workflowContext = workflowContext; + super(workflowContext); } @Override public void init(WorkflowOutboundCallsInterceptor outboundCalls) { - WorkflowInternal.getRootWorkflowContext().setHeadInterceptor(outboundCalls); + super.init(outboundCalls); newInstance(); WorkflowInternal.registerListener(workflow); } @@ -88,16 +88,6 @@ public WorkflowOutput execute(WorkflowInput input) { return new WorkflowOutput(result); } - @Override - public void handleSignal(SignalInput input) { - workflowContext.handleInterceptedSignal(input); - } - - @Override - public QueryOutput handleQuery(QueryInput input) { - return workflowContext.handleInterceptedQuery(input); - } - private void newInstance() { if (workflow != null) { throw new IllegalStateException("Already called"); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/POJOWorkflowImplementationFactory.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/POJOWorkflowImplementationFactory.java index 40ca483812..b5f3c12697 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/POJOWorkflowImplementationFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/POJOWorkflowImplementationFactory.java @@ -261,7 +261,6 @@ private class POJOWorkflowImplementation implements SyncWorkflowDefinition { private final String workflowName; private final Method workflowMethod; private final Class workflowImplementationClass; - private Object workflow; private WorkflowInboundCallsInterceptor workflowInvoker; public POJOWorkflowImplementation( @@ -278,7 +277,7 @@ public void initialize() { for (WorkerInterceptor workerInterceptor : workerInterceptors) { workflowInvoker = workerInterceptor.interceptWorkflow(workflowInvoker); } - workflowContext.setHeadInboundCallsInterceptor(workflowInvoker); + workflowContext.initHeadInboundCallsInterceptor(workflowInvoker); workflowInvoker.init(workflowContext); } @@ -300,35 +299,19 @@ public Optional execute(Header header, Optional input) return dataConverter.toPayloads(result.getResult()); } - private void newInstance() { - if (workflow != null) { - throw new IllegalStateException("Already called"); - } - Func factory = workflowImplementationFactories.get(workflowImplementationClass); - if (factory != null) { - workflow = factory.apply(); - } else { - try { - workflow = workflowImplementationClass.getDeclaredConstructor().newInstance(); - } catch (NoSuchMethodException - | InstantiationException - | IllegalAccessException - | InvocationTargetException e) { - // Error to fail workflow task as this can be fixed by a new deployment. - throw new Error( - "Failure instantiating workflow implementation class " - + workflowImplementationClass.getName(), - e); - } - } - } - - private class RootWorkflowInboundCallsInterceptor implements WorkflowInboundCallsInterceptor { - - private final SyncWorkflowContext workflowContext; + private class RootWorkflowInboundCallsInterceptor + extends BaseRootWorkflowInboundCallsInterceptor { + private Object workflow; public RootWorkflowInboundCallsInterceptor(SyncWorkflowContext workflowContext) { - this.workflowContext = workflowContext; + super(workflowContext); + } + + @Override + public void init(WorkflowOutboundCallsInterceptor outboundCalls) { + super.init(outboundCalls); + newInstance(); + WorkflowInternal.registerListener(workflow); } @Override @@ -381,21 +364,24 @@ private void logWorkflowExecutionException(WorkflowInfo info, Throwable exceptio exception); } - @Override - public void init(WorkflowOutboundCallsInterceptor outboundCalls) { - WorkflowInternal.getRootWorkflowContext().setHeadInterceptor(outboundCalls); - newInstance(); - WorkflowInternal.registerListener(workflow); - } - - @Override - public void handleSignal(SignalInput input) { - workflowContext.handleInterceptedSignal(input); - } - - @Override - public QueryOutput handleQuery(QueryInput input) { - return workflowContext.handleInterceptedQuery(input); + protected void newInstance() { + Func factory = workflowImplementationFactories.get(workflowImplementationClass); + if (factory != null) { + workflow = factory.apply(); + } else { + try { + workflow = workflowImplementationClass.getDeclaredConstructor().newInstance(); + } catch (NoSuchMethodException + | InstantiationException + | IllegalAccessException + | InvocationTargetException e) { + // Error to fail workflow task as this can be fixed by a new deployment. + throw new Error( + "Failure instantiating workflow implementation class " + + workflowImplementationClass.getName(), + e); + } + } } } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java index b90bc3da81..60538964ae 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java @@ -127,14 +127,11 @@ public void start(HistoryEvent event, ReplayWorkflowContext context) { syncContext, () -> { workflow.initialize(); - WorkflowInternal.newThread( - false, - DeterministicRunnerImpl.WORKFLOW_ROOT_THREAD_NAME, - () -> workflowProc.run()) + WorkflowInternal.newWorkflowMethodThread( + () -> workflowProc.run(), DeterministicRunnerImpl.WORKFLOW_MAIN_THREAD_NAME) .start(); }, cache); - runner.setInterceptorHead(syncContext.getWorkflowInterceptor()); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 6011297584..65a19f4582 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -39,6 +39,7 @@ import io.temporal.api.common.v1.WorkflowType; import io.temporal.api.enums.v1.ParentClosePolicy; import io.temporal.api.failure.v1.Failure; +import io.temporal.api.history.v1.HistoryEvent; import io.temporal.api.taskqueue.v1.TaskQueue; import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse; import io.temporal.client.WorkflowException; @@ -82,6 +83,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiPredicate; import java.util.function.Supplier; +import javax.annotation.Nullable; final class SyncWorkflowContext implements WorkflowOutboundCallsInterceptor { @@ -92,7 +94,8 @@ final class SyncWorkflowContext implements WorkflowOutboundCallsInterceptor { private final QueryDispatcher queryDispatcher; private final Optional lastCompletionResult; private final Optional lastFailure; - private WorkflowOutboundCallsInterceptor headInterceptor; + private WorkflowInboundCallsInterceptor headInboundInterceptor; + private WorkflowOutboundCallsInterceptor headOutboundInterceptor; private DeterministicRunner runner; private ActivityOptions defaultActivityOptions = null; @@ -125,6 +128,13 @@ public SyncWorkflowContext( this.defaultActivityOptions = workflowImplementationOptions.getDefaultActivityOptions(); this.activityOptionsMap = workflowImplementationOptions.getActivityOptions(); } + // initial values for headInboundInterceptor and headOutboundInterceptor until they initialized + // with actual interceptors through #initHeadInboundCallsInterceptor and + // #initHeadOutboundCallsInterceptor during initialization phase. + // See workflow.initialize() performed inside the workflow root thread inside + // SyncWorkflow#start(HistoryEvent, ReplayWorkflowContext) + this.headInboundInterceptor = new InitialWorkflowInboundCallsInterceptor(this); + this.headOutboundInterceptor = this; } /** @@ -139,16 +149,22 @@ public DeterministicRunner getRunner() { return runner; } - public WorkflowOutboundCallsInterceptor getWorkflowInterceptor() { - // This is needed for unit tests that create DeterministicRunner directly. - return headInterceptor == null ? this : headInterceptor; + public WorkflowOutboundCallsInterceptor getWorkflowOutboundInterceptor() { + return headOutboundInterceptor; } - public void setHeadInterceptor(WorkflowOutboundCallsInterceptor head) { - if (headInterceptor == null) { - runner.setInterceptorHead(head); - this.headInterceptor = head; - } + public WorkflowInboundCallsInterceptor getWorkflowInboundInterceptor() { + return headInboundInterceptor; + } + + public void initHeadOutboundCallsInterceptor(WorkflowOutboundCallsInterceptor head) { + headOutboundInterceptor = head; + } + + public void initHeadInboundCallsInterceptor(WorkflowInboundCallsInterceptor head) { + headInboundInterceptor = head; + signalDispatcher.setInboundCallsInterceptor(head); + queryDispatcher.setInboundCallsInterceptor(head); } public ActivityOptions getDefaultActivityOptions() { @@ -228,11 +244,6 @@ public Optional handleQuery(String queryName, Optional input return queryDispatcher.handleQuery(queryName, input); } - public void setHeadInboundCallsInterceptor(WorkflowInboundCallsInterceptor inbound) { - signalDispatcher.setInboundCallsInterceptor(inbound); - queryDispatcher.setInboundCallsInterceptor(inbound); - } - private class ActivityCallback { private final CompletablePromise> result = Workflow.newPromise(); @@ -824,13 +835,43 @@ public void upsertSearchAttributes(Map searchAttributes) { context.upsertSearchAttributes(attr); } + public Object newWorkflowMethodThreadIntercepted(Runnable runnable, @Nullable String name) { + return runner.newWorkflowThread(runnable, false, name); + } + + public Object newWorkflowCallbackThreadIntercepted(Runnable runnable, @Nullable String name) { + return runner.newCallbackThread(runnable, name); + } + @Override - public Object newThread(Runnable runnable, boolean detached, String name) { - return runner.newThread(runnable, detached, name); + public Object newChildThread(Runnable runnable, boolean detached, String name) { + return runner.newWorkflowThread(runnable, detached, name); } @Override public long currentTimeMillis() { return context.currentTimeMillis(); } + + /** + * This WorkflowInboundCallsInterceptor is used during creation of the initial root workflow + * thread and should be replaced with another specific implementation during initialization stage + * {@code workflow.initialize()} performed inside the workflow root thread. + * + * @see SyncWorkflow#start(HistoryEvent, ReplayWorkflowContext) + */ + private static final class InitialWorkflowInboundCallsInterceptor + extends BaseRootWorkflowInboundCallsInterceptor { + + public InitialWorkflowInboundCallsInterceptor(SyncWorkflowContext workflowContext) { + super(workflowContext); + } + + @Override + public WorkflowOutput execute(WorkflowInput input) { + throw new UnsupportedOperationException( + "SyncWorkflowContext should be initialized with a non-initial WorkflowInboundCallsInterceptor " + + "before #execute can be called"); + } + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index 8a5acd9289..6dd5bf87b5 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -20,6 +20,7 @@ package io.temporal.internal.sync; import static io.temporal.internal.sync.AsyncInternal.AsyncMarker; +import static io.temporal.internal.sync.DeterministicRunnerImpl.currentThreadInternal; import com.uber.m3.tally.Scope; import io.temporal.activity.ActivityOptions; @@ -75,16 +76,12 @@ public final class WorkflowInternal { public static final int DEFAULT_VERSION = -1; - public static WorkflowThread newThread(boolean ignoreParentCancellation, Runnable runnable) { - return WorkflowThread.newThread(runnable, ignoreParentCancellation); - } - - public static WorkflowThread newThread( - boolean ignoreParentCancellation, String name, Runnable runnable) { - if (name == null) { - throw new NullPointerException("name cannot be null"); - } - return WorkflowThread.newThread(runnable, ignoreParentCancellation, name); + public static WorkflowThread newWorkflowMethodThread(Runnable runnable, String name) { + return (WorkflowThread) + currentThreadInternal() + .getWorkflowContext() + .getWorkflowInboundInterceptor() + .newWorkflowMethodThread(runnable, name); } public static Promise newTimer(Duration duration) { @@ -231,7 +228,10 @@ public static T newActivityStub( } InvocationHandler invocationHandler = ActivityInvocationHandler.newInstance( - activityInterface, options, mergedActivityOptionsMap, context.getWorkflowInterceptor()); + activityInterface, + options, + mergedActivityOptionsMap, + context.getWorkflowOutboundInterceptor()); return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler); } @@ -347,7 +347,7 @@ public static R executeActivity( private static WorkflowOutboundCallsInterceptor getWorkflowInterceptor() { return DeterministicRunnerImpl.currentThreadInternal() .getWorkflowContext() - .getWorkflowInterceptor(); + .getWorkflowOutboundInterceptor(); } static SyncWorkflowContext getRootWorkflowContext() { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThread.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThread.java index 88d069187c..dc63fd9ed7 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThread.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThread.java @@ -28,7 +28,7 @@ import java.util.function.Supplier; /** Thread that is scheduled deterministically by {@link DeterministicRunner}. */ -interface WorkflowThread extends CancellationScope { +public interface WorkflowThread extends CancellationScope { /** * Block current thread until unblockCondition is evaluated to true. This method is intended for @@ -60,8 +60,8 @@ static WorkflowThread newThread(Runnable runnable, boolean detached, String name return (WorkflowThread) currentThreadInternal() .getWorkflowContext() - .getWorkflowInterceptor() - .newThread(runnable, detached, name); + .getWorkflowOutboundInterceptor() + .newChildThread(runnable, detached, name); } void start(); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadContext.java index 57c4192883..362107af7b 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadContext.java @@ -28,7 +28,7 @@ import java.util.concurrent.locks.Lock; import java.util.function.Supplier; -class WorkflowThreadContext { +public class WorkflowThreadContext { // Shared runner lock private final Lock lock; diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java index d31026b8fa..bec239403c 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java @@ -394,7 +394,17 @@ public void addStackTrace(StringBuilder result) { // These numbers might change if implementation changes. int omitTop = 5; int omitBottom = 7; + // TODO it's not a good idea to rely on the name to understand the thread type. Instead of that + // we would better + // assign an explicit thread type enum to the threads. This will be especially important when we + // refactor + // root and workflow-method + // thread names into names that will include workflowId if (DeterministicRunnerImpl.WORKFLOW_ROOT_THREAD_NAME.equals(getName())) { + // TODO revisit this number + omitBottom = 11; + } else if (DeterministicRunnerImpl.WORKFLOW_MAIN_THREAD_NAME.equals(getName())) { + // TODO revisit this number omitBottom = 11; } StackTraceElement[] stackTrace = thread.getStackTrace(); diff --git a/temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java index f9fecff5bc..166b25fd30 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java @@ -76,7 +76,6 @@ public class DeterministicRunnerTest { private boolean unblock1; private boolean unblock2; private Throwable failure; - private long currentTime; private ExecutorService threadPool; @Before @@ -85,7 +84,6 @@ public void setUp() { unblock2 = false; failure = null; status = "initial"; - currentTime = 0; threadPool = new ThreadPoolExecutor(1, 1000, 1, TimeUnit.SECONDS, new SynchronousQueue<>()); } @@ -96,9 +94,10 @@ public void tearDown() throws InterruptedException { } @Test - public void testYield() throws Throwable { + public void testYield() { DeterministicRunner d = new DeterministicRunnerImpl( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { status = "started"; WorkflowThread.await("reason1", () -> unblock1); @@ -141,7 +140,7 @@ public void testRetry() throws Throwable { DeterministicRunnerImpl d = new DeterministicRunnerImpl( threadPool, - null, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { trace.add("started"); Workflow.retry( @@ -179,9 +178,10 @@ public void testRetry() throws Throwable { } @Test - public void testRootFailure() throws Throwable { + public void testRootFailure() { DeterministicRunner d = new DeterministicRunnerImpl( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { status = "started"; WorkflowThread.await("reason1", () -> unblock1); @@ -201,9 +201,10 @@ public void testRootFailure() throws Throwable { } @Test - public void testDispatcherStop() throws Throwable { + public void testDispatcherStop() { DeterministicRunner d = new DeterministicRunnerImpl( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { status = "started"; WorkflowThread.await("reason1", () -> unblock1); @@ -229,9 +230,10 @@ public void testDispatcherStop() throws Throwable { } @Test - public void testDispatcherExit() throws Throwable { + public void testDispatcherExit() { DeterministicRunner d = new DeterministicRunnerImpl( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { trace.add("root started"); Promise thread1 = @@ -268,10 +270,11 @@ public void testDispatcherExit() throws Throwable { } @Test - public void testRootCancellation() throws Throwable { + public void testRootCancellation() { trace.add("init"); DeterministicRunner d = new DeterministicRunnerImpl( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { trace.add("root started"); WorkflowThread.await( @@ -294,10 +297,11 @@ public void testRootCancellation() throws Throwable { } @Test - public void testExplicitScopeCancellation() throws Throwable { + public void testExplicitScopeCancellation() { trace.add("init"); DeterministicRunner d = new DeterministicRunnerImpl( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { trace.add("root started"); CompletablePromise var = Workflow.newPromise(); @@ -340,6 +344,7 @@ public void testExplicitDetachedScopeCancellation() throws Throwable { trace.add("init"); DeterministicRunner d = new DeterministicRunnerImpl( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { trace.add("root started"); CompletablePromise var = Workflow.newPromise(); @@ -391,10 +396,11 @@ private Promise newTimer(int milliseconds) { } @Test - public void testExplicitThreadCancellation() throws Throwable { + public void testExplicitThreadCancellation() { trace.add("init"); DeterministicRunner d = new DeterministicRunnerImpl( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { trace.add("root started"); CompletablePromise threadDone = Workflow.newPromise(); @@ -434,10 +440,11 @@ public void testExplicitThreadCancellation() throws Throwable { } @Test - public void testExplicitCancellationOnFailure() throws Throwable { + public void testExplicitCancellationOnFailure() { trace.add("init"); DeterministicRunner d = new DeterministicRunnerImpl( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { trace.add("root started"); Workflow.newCancellationScope( @@ -492,10 +499,11 @@ public void testExplicitCancellationOnFailure() throws Throwable { } @Test - public void testDetachedCancellation() throws Throwable { + public void testDetachedCancellation() { trace.add("init"); DeterministicRunner d = new DeterministicRunnerImpl( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { trace.add("root started"); CompletablePromise done = Workflow.newPromise(); @@ -546,9 +554,10 @@ public void testDetachedCancellation() throws Throwable { } @Test - public void testChild() throws Throwable { + public void testChild() { DeterministicRunner d = new DeterministicRunnerImpl( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { Promise async = Async.procedure( @@ -602,8 +611,11 @@ public void apply() { } @Test - public void testChildTree() throws Throwable { - DeterministicRunner d = new DeterministicRunnerImpl(new TestChildTreeRunnable(0)::apply); + public void testChildTree() { + DeterministicRunner d = + new DeterministicRunnerImpl( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), + new TestChildTreeRunnable(0)::apply); d.runUntilAllBlocked(getDeadlockDetectionTimeout()); unblock1 = true; d.runUntilAllBlocked(getDeadlockDetectionTimeout()); @@ -711,7 +723,7 @@ public void workflowThreadsWillNotEvictCacheWhenMaxThreadCountIsHit() throws Thr DeterministicRunnerImpl d = new DeterministicRunnerImpl( threadPool, - null, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { Promise thread = Async.procedure( @@ -736,7 +748,7 @@ public void workflowThreadsWillNotEvictCacheWhenMaxThreadCountIsHit() throws Thr DeterministicRunnerImpl d2 = new DeterministicRunnerImpl( threadPool, - null, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { Promise thread = Async.procedure( @@ -791,7 +803,7 @@ public void testRejectedExecutionError() { DeterministicRunner d = new DeterministicRunnerImpl( threadPool, - null, + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { Promise async = Async.procedure(() -> status = "started"); async.get(); diff --git a/temporal-sdk/src/test/java/io/temporal/internal/sync/PromiseTest.java b/temporal-sdk/src/test/java/io/temporal/internal/sync/PromiseTest.java index 7053eab2f1..27c09b9820 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/sync/PromiseTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/sync/PromiseTest.java @@ -49,17 +49,17 @@ public class PromiseTest { @Rule public final Tracer trace = new Tracer(); @Test - public void testFailure() throws Throwable { + public void testFailure() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { CompletablePromise f = Workflow.newPromise(); trace.add("root begin"); - WorkflowInternal.newThread( - false, () -> f.completeExceptionally(new IllegalArgumentException("foo"))) + WorkflowThread.newThread( + () -> f.completeExceptionally(new IllegalArgumentException("foo")), false) .start(); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { try { f.get(); @@ -69,7 +69,8 @@ public void testFailure() throws Throwable { assertEquals(IllegalArgumentException.class, e.getClass()); trace.add("thread1 get failure"); } - }) + }, + false) .start(); trace.add("root done"); }); @@ -82,13 +83,14 @@ public void testFailure() throws Throwable { } @Test - public void testGet() throws Throwable { + public void testGet() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { CompletablePromise f = Workflow.newPromise(); trace.add("root begin"); - WorkflowInternal.newThread(false, () -> f.complete("thread1")).start(); + WorkflowThread.newThread(() -> f.complete("thread1"), false).start(); trace.add(f.get()); trace.add("root done"); }); @@ -101,13 +103,14 @@ public void testGet() throws Throwable { } @Test - public void testCancellableGet() throws Throwable { + public void testCancellableGet() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { CompletablePromise f = Workflow.newPromise(); trace.add("root begin"); - WorkflowInternal.newThread(false, () -> f.complete("thread1")).start(); + WorkflowThread.newThread(() -> f.complete("thread1"), false).start(); trace.add(f.cancellableGet()); trace.add("root done"); }); @@ -120,9 +123,10 @@ public void testCancellableGet() throws Throwable { } @Test - public void testCancellableGetCancellation() throws Throwable { + public void testCancellableGetCancellation() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { CompletablePromise f = Workflow.newPromise(); trace.add("root begin"); @@ -193,34 +197,35 @@ public void testGetTimeout() throws Throwable { } @Test - public void testMultiple() throws Throwable { + public void testMultiple() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { trace.add("root begin"); CompletablePromise f1 = Workflow.newPromise(); CompletablePromise f2 = Workflow.newPromise(); CompletablePromise f3 = Workflow.newPromise(); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread1 begin"); assertTrue(f1.get()); trace.add("thread1 f1"); f2.complete(true); trace.add("thread1 done"); - }) + }, + false) .start(); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread2 begin"); assertTrue(f2.get()); trace.add("thread2 f2"); f3.complete(true); trace.add("thread2 done"); - }) + }, + false) .start(); f1.complete(true); assertFalse(f1.complete(false)); @@ -246,9 +251,10 @@ public void testMultiple() throws Throwable { } @Test - public void tstAsync() throws Throwable { + public void tstAsync() { DeterministicRunner runner = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { trace.add("root begin"); CompletablePromise f1 = Workflow.newPromise(); @@ -279,9 +285,10 @@ public void tstAsync() throws Throwable { } @Test - public void testAsyncFailure() throws Throwable { + public void testAsyncFailure() { DeterministicRunner runner = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { trace.add("root begin"); CompletablePromise f1 = Workflow.newPromise(); @@ -321,38 +328,39 @@ public void testAsyncFailure() throws Throwable { } @Test - public void testAllOf() throws Throwable { + public void testAllOf() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { trace.add("root begin"); CompletablePromise f1 = Workflow.newPromise(); CompletablePromise f2 = Workflow.newPromise(); CompletablePromise f3 = Workflow.newPromise(); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread1 begin"); f1.complete("value1"); trace.add("thread1 done"); - }) + }, + false) .start(); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread3 begin"); f3.complete("value3"); trace.add("thread3 done"); - }) + }, + false) .start(); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread2 begin"); f2.complete("value2"); trace.add("thread2 done"); - }) + }, + false) .start(); List> promises = new ArrayList<>(); promises.add(f1); @@ -380,9 +388,10 @@ public void testAllOf() throws Throwable { } @Test - public void testAllOfImmediatelyReady() throws Throwable { + public void testAllOfImmediatelyReady() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { trace.add("root begin"); { @@ -421,38 +430,39 @@ public void testAllOfImmediatelyReady() throws Throwable { } @Test - public void testAnyOf() throws Throwable { + public void testAnyOf() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { trace.add("root begin"); CompletablePromise f1 = Workflow.newPromise(); CompletablePromise f2 = Workflow.newPromise(); CompletablePromise f3 = Workflow.newPromise(); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread1 begin"); f1.complete("value1"); trace.add("thread1 done"); - }) + }, + false) .start(); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread3 begin"); f3.complete("value3"); trace.add("thread3 done"); - }) + }, + false) .start(); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread2 begin"); f2.complete("value2"); trace.add("thread2 done"); - }) + }, + false) .start(); List> promises = new ArrayList<>(); promises.add(f1); @@ -481,38 +491,39 @@ public void testAnyOf() throws Throwable { } @Test - public void testAllOfArray() throws Throwable { + public void testAllOfArray() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { trace.add("root begin"); CompletablePromise f1 = Workflow.newPromise(); CompletablePromise f2 = Workflow.newPromise(); CompletablePromise f3 = Workflow.newPromise(); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread1 begin"); f1.complete("value1"); trace.add("thread1 done"); - }) + }, + false) .start(); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread3 begin"); f3.complete(true); trace.add("thread3 done"); - }) + }, + false) .start(); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread2 begin"); f2.complete(111); trace.add("thread2 done"); - }) + }, + false) .start(); trace.add("root before allOf"); assertFalse(f1.isCompleted()); @@ -541,38 +552,39 @@ public void testAllOfArray() throws Throwable { } @Test - public void testAnyOfArray() throws Throwable { + public void testAnyOfArray() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { trace.add("root begin"); CompletablePromise f1 = Workflow.newPromise(); CompletablePromise f2 = Workflow.newPromise(); CompletablePromise f3 = Workflow.newPromise(); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread1 begin"); f1.complete("value1"); trace.add("thread1 done"); - }) + }, + false) .start(); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread3 begin"); f3.complete(true); trace.add("thread3 done"); - }) + }, + false) .start(); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread2 begin"); f2.complete(111); trace.add("thread2 done"); - }) + }, + false) .start(); trace.add("root before allOf"); assertFalse(f1.isCompleted()); @@ -602,9 +614,10 @@ public void testAnyOfArray() throws Throwable { } @Test - public void testAnyOfImmediatelyReady() throws Throwable { + public void testAnyOfImmediatelyReady() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { trace.add("root begin"); { diff --git a/temporal-sdk/src/test/java/io/temporal/internal/sync/WorkflowInternalDeprecatedQueueTest.java b/temporal-sdk/src/test/java/io/temporal/internal/sync/WorkflowInternalDeprecatedQueueTest.java index 5a7706746f..d6e17182b8 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/sync/WorkflowInternalDeprecatedQueueTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/sync/WorkflowInternalDeprecatedQueueTest.java @@ -43,24 +43,25 @@ public class WorkflowInternalDeprecatedQueueTest { public void testTakeBlocking() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { WorkflowQueue f = WorkflowInternal.newQueue(1); trace.add("root begin"); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread1 begin"); assertTrue(f.take()); trace.add("thread1 take success"); - }) + }, + false) .start(); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread2 begin"); f.put(true); trace.add("thread2 put success"); - }) + }, + false) .start(); trace.add("root done"); }); @@ -82,11 +83,11 @@ public void testTakeBlocking() { public void testTakeCanceled() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { WorkflowQueue f = WorkflowInternal.newQueue(1); trace.add("root begin"); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread1 begin"); try { @@ -95,7 +96,8 @@ public void testTakeCanceled() { trace.add("thread1 CanceledException"); } trace.add("thread1 done"); - }) + }, + false) .start(); trace.add("root done"); }); @@ -115,11 +117,11 @@ public void testTakeCanceled() { public void testCancellableTakeCanceled() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { WorkflowQueue f = WorkflowInternal.newQueue(1); trace.add("root begin"); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread1 begin"); try { @@ -128,7 +130,8 @@ public void testCancellableTakeCanceled() { trace.add("thread1 CanceledFailure"); } trace.add("thread1 done"); - }) + }, + false) .start(); trace.add("root done"); }); @@ -158,8 +161,7 @@ public List test() { WorkflowQueue f = WorkflowInternal.newQueue(1); trace.add("root begin"); WorkflowThread thread1 = - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread1 begin"); Workflow.sleep(2000); @@ -167,19 +169,20 @@ public List test() { trace.add("thread1 take1 success"); assertFalse(f.take()); trace.add("thread1 take2 success"); - }); + }, + false); thread1.start(); WorkflowThread thread2 = - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread2 begin"); f.put(true); trace.add("thread2 put1 success"); f.put(false); trace.add("thread2 put2 success"); - }); + }, + false); thread2.start(); trace.add("root done"); Workflow.await(() -> thread1.isDone() && thread2.isDone()); @@ -188,7 +191,7 @@ public List test() { } @Test - public void testPutBlocking() throws Throwable { + public void testPutBlocking() { TestWorkflowEnvironment testEnv = TestWorkflowEnvironment.newInstance(); String testTaskQueue = "testTaskQueue"; Worker worker = testEnv.newWorker(testTaskQueue); @@ -270,11 +273,11 @@ public void testOfferPollPeek() { public void testPutCanceled() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { WorkflowQueue f = WorkflowInternal.newQueue(1); trace.add("root begin"); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread1 begin"); try { @@ -284,7 +287,8 @@ public void testPutCanceled() { trace.add("thread1 CanceledFailure"); } trace.add("thread1 done"); - }) + }, + false) .start(); trace.add("root done"); }); @@ -304,11 +308,11 @@ public void testPutCanceled() { public void testCancellablePutCanceled() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { WorkflowQueue f = WorkflowInternal.newQueue(1); trace.add("root begin"); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread1 begin"); try { @@ -318,7 +322,8 @@ public void testCancellablePutCanceled() { trace.add("thread1 CanceledFailure"); } trace.add("thread1 done"); - }) + }, + false) .start(); trace.add("root done"); }); @@ -338,11 +343,11 @@ public void testCancellablePutCanceled() { public void testMap() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { WorkflowQueue queue = WorkflowInternal.newQueue(1); trace.add("root begin"); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { QueueConsumer mapped = queue.map((s) -> s + "-mapped"); trace.add("thread1 begin"); @@ -350,7 +355,8 @@ public void testMap() { trace.add("thread1 " + mapped.take()); } trace.add("thread1 done"); - }) + }, + false) .start(); trace.add("root thread1 started"); for (int i = 0; i < 10; i++) { diff --git a/temporal-sdk/src/test/java/io/temporal/internal/sync/WorkflowInternalQueueTest.java b/temporal-sdk/src/test/java/io/temporal/internal/sync/WorkflowInternalQueueTest.java index 13063fc899..fab5ebe826 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/sync/WorkflowInternalQueueTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/sync/WorkflowInternalQueueTest.java @@ -46,24 +46,25 @@ public class WorkflowInternalQueueTest { public void testTakeBlocking() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { WorkflowQueue f = WorkflowInternal.newWorkflowQueue(1); trace.add("root begin"); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread1 begin"); assertTrue(f.take()); trace.add("thread1 take success"); - }) + }, + false) .start(); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread2 begin"); f.put(true); trace.add("thread2 put success"); - }) + }, + false) .start(); trace.add("root done"); }); @@ -85,11 +86,11 @@ public void testTakeBlocking() { public void testTakeCanceled() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { WorkflowQueue f = WorkflowInternal.newWorkflowQueue(1); trace.add("root begin"); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread1 begin"); try { @@ -98,7 +99,8 @@ public void testTakeCanceled() { trace.add("thread1 CanceledException"); } trace.add("thread1 done"); - }) + }, + false) .start(); trace.add("root done"); }); @@ -118,11 +120,11 @@ public void testTakeCanceled() { public void testCancellableTakeCanceled() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { WorkflowQueue f = WorkflowInternal.newWorkflowQueue(1); trace.add("root begin"); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread1 begin"); try { @@ -131,7 +133,8 @@ public void testCancellableTakeCanceled() { trace.add("thread1 CanceledFailure"); } trace.add("thread1 done"); - }) + }, + false) .start(); trace.add("root done"); }); @@ -161,8 +164,7 @@ public List test() { WorkflowQueue f = WorkflowInternal.newWorkflowQueue(1); trace.add("root begin"); WorkflowThread thread1 = - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread1 begin"); Workflow.sleep(2000); @@ -170,19 +172,20 @@ public List test() { trace.add("thread1 take1 success"); assertFalse(f.take()); trace.add("thread1 take2 success"); - }); + }, + false); thread1.start(); WorkflowThread thread2 = - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread2 begin"); f.put(true); trace.add("thread2 put1 success"); f.put(false); trace.add("thread2 put2 success"); - }); + }, + false); thread2.start(); trace.add("root done"); Workflow.await(() -> thread1.isDone() && thread2.isDone()); @@ -273,11 +276,11 @@ public void testOfferPollPeek() { public void testPutCanceled() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { WorkflowQueue f = WorkflowInternal.newWorkflowQueue(1); trace.add("root begin"); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread1 begin"); try { @@ -287,7 +290,8 @@ public void testPutCanceled() { trace.add("thread1 CanceledFailure"); } trace.add("thread1 done"); - }) + }, + false) .start(); trace.add("root done"); }); @@ -307,11 +311,11 @@ public void testPutCanceled() { public void testCancellablePutCanceled() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { WorkflowQueue f = WorkflowInternal.newWorkflowQueue(1); trace.add("root begin"); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { trace.add("thread1 begin"); try { @@ -321,7 +325,8 @@ public void testCancellablePutCanceled() { trace.add("thread1 CanceledFailure"); } trace.add("thread1 done"); - }) + }, + false) .start(); trace.add("root done"); }); @@ -341,11 +346,11 @@ public void testCancellablePutCanceled() { public void testMap() { DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { WorkflowQueue queue = WorkflowInternal.newWorkflowQueue(1); trace.add("root begin"); - WorkflowInternal.newThread( - false, + WorkflowThread.newThread( () -> { QueueConsumer mapped = queue.map((s) -> s + "-mapped"); trace.add("thread1 begin"); @@ -353,7 +358,8 @@ public void testMap() { trace.add("thread1 " + mapped.take()); } trace.add("thread1 done"); - }) + }, + false) .start(); trace.add("root thread1 started"); for (int i = 0; i < 10; i++) { @@ -393,6 +399,7 @@ public void testQueueOrder() { int[] result = new int[3]; DeterministicRunner r = DeterministicRunner.newRunner( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { queue.put(1); queue.put(2); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/interceptors/SignalWorkflowOutboundCallsInterceptor.java b/temporal-sdk/src/test/java/io/temporal/workflow/interceptors/SignalWorkflowOutboundCallsInterceptor.java index ef9f6f535a..a6992e3089 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/interceptors/SignalWorkflowOutboundCallsInterceptor.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/interceptors/SignalWorkflowOutboundCallsInterceptor.java @@ -161,8 +161,8 @@ public void upsertSearchAttributes(Map searchAttributes) { } @Override - public Object newThread(Runnable runnable, boolean detached, String name) { - return next.newThread(runnable, detached, name); + public Object newChildThread(Runnable runnable, boolean detached, String name) { + return next.newChildThread(runnable, detached, name); } @Override diff --git a/temporal-testing-junit4/src/main/java/io/temporal/testing/TracingWorkerInterceptor.java b/temporal-testing-junit4/src/main/java/io/temporal/testing/TracingWorkerInterceptor.java index f906e33ee5..4a9504f1e8 100644 --- a/temporal-testing-junit4/src/main/java/io/temporal/testing/TracingWorkerInterceptor.java +++ b/temporal-testing-junit4/src/main/java/io/temporal/testing/TracingWorkerInterceptor.java @@ -104,6 +104,14 @@ public QueryOutput handleQuery(QueryInput input) { trace.add("handleQuery " + input.getQueryName()); return super.handleQuery(input); } + + @Override + public Object newWorkflowMethodThread(Runnable runnable, String name) { + if (!Workflow.isReplaying()) { + trace.add("newThread " + name); + } + return next.newWorkflowMethodThread(runnable, name); + } }; } @@ -336,11 +344,11 @@ public void upsertSearchAttributes(Map searchAttributes) { } @Override - public Object newThread(Runnable runnable, boolean detached, String name) { + public Object newChildThread(Runnable runnable, boolean detached, String name) { if (!Workflow.isReplaying()) { trace.add("newThread " + name); } - return next.newThread(runnable, detached, name); + return next.newChildThread(runnable, detached, name); } @Override diff --git a/temporal-testing/src/main/java/io/temporal/internal/sync/DeterministicRunnerWrapper.java b/temporal-testing/src/main/java/io/temporal/internal/sync/DeterministicRunnerWrapper.java index 917e2107c0..00c61a8a80 100644 --- a/temporal-testing/src/main/java/io/temporal/internal/sync/DeterministicRunnerWrapper.java +++ b/temporal-testing/src/main/java/io/temporal/internal/sync/DeterministicRunnerWrapper.java @@ -38,6 +38,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl CompletableFuture result = new CompletableFuture<>(); DeterministicRunner runner = new DeterministicRunnerImpl( + DummySyncWorkflowContext.newDummySyncWorkflowContext(), () -> { try { result.complete(invocationHandler.invoke(proxy, method, args)); diff --git a/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java new file mode 100644 index 0000000000..5e0c7d1daa --- /dev/null +++ b/temporal-testing/src/main/java/io/temporal/internal/sync/DummySyncWorkflowContext.java @@ -0,0 +1,276 @@ +/* + * Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.internal.sync; + +import com.uber.m3.tally.NoopScope; +import com.uber.m3.tally.Scope; +import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes; +import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes; +import io.temporal.api.common.v1.Payloads; +import io.temporal.api.common.v1.SearchAttributes; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.common.v1.WorkflowType; +import io.temporal.api.failure.v1.Failure; +import io.temporal.common.context.ContextPropagator; +import io.temporal.common.converter.DataConverter; +import io.temporal.failure.CanceledFailure; +import io.temporal.internal.replay.ExecuteActivityParameters; +import io.temporal.internal.replay.ExecuteLocalActivityParameters; +import io.temporal.internal.replay.ReplayWorkflowContext; +import io.temporal.internal.replay.StartChildWorkflowExecutionParameters; +import io.temporal.workflow.Functions; +import java.time.Duration; +import java.util.*; + +public class DummySyncWorkflowContext { + public static SyncWorkflowContext newDummySyncWorkflowContext() { + SyncWorkflowContext context = + new SyncWorkflowContext( + new DummyReplayWorkflowContext(), DataConverter.getDefaultInstance(), null, null, null); + context.initHeadOutboundCallsInterceptor(context); + context.initHeadInboundCallsInterceptor( + new BaseRootWorkflowInboundCallsInterceptor(context) { + @Override + public WorkflowOutput execute(WorkflowInput input) { + throw new UnsupportedOperationException( + "#execute is not implemented or needed for low level DeterministicRunner tests"); + } + }); + return context; + } + + private static final class DummyReplayWorkflowContext implements ReplayWorkflowContext { + + private final Timer timer = new Timer(); + + @Override + public WorkflowExecution getWorkflowExecution() { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public WorkflowExecution getParentWorkflowExecution() { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public WorkflowType getWorkflowType() { + return WorkflowType.newBuilder().setName("dummy-workflow").build(); + } + + @Override + public boolean isCancelRequested() { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public ContinueAsNewWorkflowExecutionCommandAttributes getContinueAsNewOnCompletion() { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public void setContinueAsNewOnCompletion( + ContinueAsNewWorkflowExecutionCommandAttributes attributes) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Optional getContinuedExecutionRunId() { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public String getTaskQueue() { + return "dummy-task-queue"; + } + + @Override + public String getNamespace() { + return "dummy-namespace"; + } + + @Override + public String getWorkflowId() { + return "dummy-workflow-id"; + } + + @Override + public String getRunId() { + return "dummy-run-id"; + } + + @Override + public Duration getWorkflowRunTimeout() { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Duration getWorkflowExecutionTimeout() { + return Duration.ZERO; + } + + @Override + public long getRunStartedTimestampMillis() { + return 0; + } + + @Override + public long getWorkflowExecutionExpirationTimestampMillis() { + return 0; + } + + @Override + public Duration getWorkflowTaskTimeout() { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public SearchAttributes getSearchAttributes() { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Map getPropagatedContexts() { + return null; + } + + @Override + public List getContextPropagators() { + return null; + } + + @Override + public Functions.Proc1 scheduleActivityTask( + ExecuteActivityParameters parameters, + Functions.Proc2, Failure> callback) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Functions.Proc scheduleLocalActivityTask( + ExecuteLocalActivityParameters parameters, + Functions.Proc2, Failure> callback) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Functions.Proc1 startChildWorkflow( + StartChildWorkflowExecutionParameters parameters, + Functions.Proc1 executionCallback, + Functions.Proc2, Exception> callback) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Functions.Proc1 signalExternalWorkflowExecution( + SignalExternalWorkflowExecutionCommandAttributes.Builder attributes, + Functions.Proc2 callback) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public void requestCancelExternalWorkflowExecution( + WorkflowExecution execution, Functions.Proc2 callback) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public void continueAsNewOnCompletion( + ContinueAsNewWorkflowExecutionCommandAttributes attributes) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public long currentTimeMillis() { + return System.currentTimeMillis(); + } + + @Override + public Functions.Proc1 newTimer( + Duration delay, Functions.Proc1 callback) { + timer.schedule( + new TimerTask() { + @Override + public void run() { + callback.apply(null); + } + }, + delay.toMillis()); + return (e) -> { + callback.apply(new CanceledFailure(null)); + }; + } + + @Override + public void sideEffect( + Functions.Func> func, Functions.Proc1> callback) { + callback.apply(func.apply()); + } + + @Override + public void mutableSideEffect( + String id, + Functions.Func1, Optional> func, + Functions.Proc1> callback) { + callback.apply(func.apply(Optional.empty())); + } + + @Override + public boolean isReplaying() { + return false; + } + + @Override + public void getVersion( + String changeId, int minSupported, int maxSupported, Functions.Proc1 callback) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Random newRandom() { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Scope getMetricsScope() { + return new NoopScope(); + } + + @Override + public boolean getEnableLoggingInReplay() { + return false; + } + + @Override + public UUID randomUUID() { + return UUID.randomUUID(); + } + + @Override + public void upsertSearchAttributes(SearchAttributes searchAttributes) { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public int getAttempt() { + return 1; + } + } +} diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java index 9ea347b743..56aa395c3f 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java @@ -456,7 +456,7 @@ public void upsertSearchAttributes(Map searchAttributes) { } @Override - public Object newThread(Runnable runnable, boolean detached, String name) { + public Object newChildThread(Runnable runnable, boolean detached, String name) { throw new UnsupportedOperationException("not implemented"); }