Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package io.temporal.common.interceptors;

import javax.annotation.Nullable;

/**
* Intercepts calls to the workflow execution. Executes under workflow context. So all the
* restrictions on the workflow code should be obeyed.
Expand Down Expand Up @@ -128,4 +130,24 @@ public Object getResult() {

/** Called when a workflow is queried. */
QueryOutput handleQuery(QueryInput input);

/**
* Intercepts creation of the workflow main method thread
*
* @param runnable thread function to run
* @param name name of the thread, optional
* @return created workflow thread. Should be treated as a pass-through object that shouldn't be
* manipulated in any way by the interceptor code.
*/
Object newWorkflowMethodThread(Runnable runnable, @Nullable String name);

/**
* Intercepts creation of a workflow callback thread
*
* @param runnable thread function to run
* @param name name of the thread, optional
* @return created workflow thread. Should be treated as a pass-through object that shouldn't be
* manipulated in any way by the interceptor code.
*/
Object newCallbackThread(Runnable runnable, @Nullable String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,14 @@ public void handleSignal(SignalInput input) {
public QueryOutput handleQuery(QueryInput input) {
return next.handleQuery(input);
}

@Override
public Object newWorkflowMethodThread(Runnable runnable, String name) {
return next.newWorkflowMethodThread(runnable, name);
}

@Override
public Object newCallbackThread(Runnable runnable, String name) {
return next.newCallbackThread(runnable, name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,8 @@
import io.temporal.activity.ActivityOptions;
import io.temporal.activity.LocalActivityOptions;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.ContinueAsNewOptions;
import io.temporal.workflow.DynamicQueryHandler;
import io.temporal.workflow.DynamicSignalHandler;
import io.temporal.workflow.Functions;
import io.temporal.workflow.*;
import io.temporal.workflow.Functions.Func;
import io.temporal.workflow.Promise;
import java.lang.reflect.Type;
import java.time.Duration;
import java.util.List;
Expand Down Expand Up @@ -483,7 +478,18 @@ <R> R mutableSideEffect(

void upsertSearchAttributes(Map<String, Object> searchAttributes);

Object newThread(Runnable runnable, boolean detached, String name);
/**
* Intercepts creation of the workflow child thread.
*
* <p>Please note, that "workflow child thread" and "child workflow" are different and independent
* concepts.
*
* @param runnable thread function to run
* @param detached if this thread is detached from the parent {@link CancellationScope}
* @param name name of the thread
* @return created WorkflowThread
*/
Object newChildThread(Runnable runnable, boolean detached, String name);

long currentTimeMillis();
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ public void upsertSearchAttributes(Map<String, Object> searchAttributes) {
}

@Override
public Object newThread(Runnable runnable, boolean detached, String name) {
return next.newThread(runnable, detached, name);
public Object newChildThread(Runnable runnable, boolean detached, String name) {
return next.newChildThread(runnable, detached, name);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,15 @@ private static <R> Promise<R> execute(boolean async, Functions.Func<R> func) {
}
} else {
CompletablePromise<R> result = Workflow.newPromise();
WorkflowInternal.newThread(
false,
WorkflowThread.newThread(
() -> {
try {
result.complete(func.apply());
} catch (Exception e) {
result.completeExceptionally(Workflow.wrap(e));
}
})
},
false)
.start();
return result;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.internal.sync;

import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;

/**
* Provides core functionality for a root WorkflowInboundCallsInterceptor that is reused by specific
* root RootWorkflowInboundCallsInterceptor implementations inside {@link
* DynamicSyncWorkflowDefinition#} and {@link POJOWorkflowImplementationFactory}
*
* <p>Root {@code WorkflowInboundCallsInterceptor} is an interceptor that should be at the end of
* the {@code WorkflowInboundCallsInterceptor} interceptors chain and which encapsulates calls into
* Temporal internals while providing a WorkflowInboundCallsInterceptor interface for chaining on
* top of it.
*/
public abstract class BaseRootWorkflowInboundCallsInterceptor
implements WorkflowInboundCallsInterceptor {
protected final SyncWorkflowContext workflowContext;

public BaseRootWorkflowInboundCallsInterceptor(SyncWorkflowContext workflowContext) {
this.workflowContext = workflowContext;
}

@Override
public void init(WorkflowOutboundCallsInterceptor outboundCalls) {
workflowContext.initHeadOutboundCallsInterceptor(outboundCalls);
}

@Override
public void handleSignal(SignalInput input) {
workflowContext.handleInterceptedSignal(input);
}

@Override
public QueryOutput handleQuery(QueryInput input) {
return workflowContext.handleInterceptedQuery(input);
}

@Override
public Object newWorkflowMethodThread(Runnable runnable, String name) {
return workflowContext.newWorkflowMethodThreadIntercepted(runnable, name);
}

@Override
public Object newCallbackThread(Runnable runnable, String name) {
return workflowContext.newWorkflowCallbackThreadIntercepted(runnable, name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@

package io.temporal.internal.sync;

import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
import io.temporal.internal.replay.WorkflowExecutorCache;
import io.temporal.workflow.CancellationScope;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
* Executes code passed to {@link #newRunner(Runnable)} as well as threads created from it using
* {@link WorkflowInternal#newThread(boolean, Runnable)} deterministically. Requires use of provided
* {@link WorkflowThread#newThread(Runnable, boolean)} deterministically. Requires use of provided
* wrappers for synchronization and notification instead of native ones.
*/
interface DeterministicRunner {
Expand All @@ -40,12 +39,8 @@ static long getDeadlockDetectionTimeout() {
return debugMode ? Long.MAX_VALUE : DEFAULT_DEADLOCK_DETECTION_TIMEOUT;
}

static DeterministicRunner newRunner(Runnable root) {
return new DeterministicRunnerImpl(root);
}

static DeterministicRunner newRunner(Supplier<Long> clock, Runnable root) {
return new DeterministicRunnerImpl(clock, root);
static DeterministicRunner newRunner(SyncWorkflowContext workflowContext, Runnable root) {
return new DeterministicRunnerImpl(workflowContext, root);
}

/**
Expand Down Expand Up @@ -118,9 +113,11 @@ static DeterministicRunner newRunner(
void executeInWorkflowThread(String name, Runnable r);

/**
* Creates a new instance of a workflow thread. To be called only from another workflow thread.
* Creates a new instance of a workflow child thread. To be called only from another workflow
* thread.
*/
WorkflowThread newThread(Runnable runnable, boolean detached, String name);
WorkflowThread newWorkflowThread(Runnable runnable, boolean detached, @Nullable String name);

void setInterceptorHead(WorkflowOutboundCallsInterceptor interceptorHead);
/** Creates a new instance of a workflow callback thread. */
WorkflowThread newCallbackThread(Runnable runnable, @Nullable String name);
}
Loading