/
WorkflowThreadImpl.java
470 lines (420 loc) · 14.5 KB
/
WorkflowThreadImpl.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.temporal.internal.sync;
import com.google.common.base.Preconditions;
import io.temporal.common.context.ContextPropagator;
import io.temporal.failure.CanceledFailure;
import io.temporal.internal.common.NonIdempotentHandle;
import io.temporal.internal.context.ContextThreadLocal;
import io.temporal.internal.logging.LoggerTag;
import io.temporal.internal.replay.ReplayWorkflowContext;
import io.temporal.internal.worker.WorkflowExecutorCache;
import io.temporal.workflow.Functions;
import io.temporal.workflow.Promise;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
class WorkflowThreadImpl implements WorkflowThread {
/**
 * Runnable passed to the thread that wraps a runnable passed to the WorkflowThreadImpl
 * constructor.
 *
 * <p>While running it renames the executing Java thread, registers that thread with the workflow
 * thread context and with {@link DeterministicRunnerImpl}, fills the logging MDC and the
 * propagated contexts, and then executes the user code inside its own cancellation scope. The
 * thread name, the runner's current-thread binding and the MDC are reset in a {@code finally}
 * block once the user code is finished.
 */
class RunnableWrapper implements Runnable {
  private final WorkflowThreadContext threadContext;
  // TODO: Move MDC injection logic into an interceptor as this context shouldn't be leaked
  // to the WorkflowThreadImpl
  private final ReplayWorkflowContext replayWorkflowContext;
  // Name the executing Java thread had before run() renamed it; restored when run() finishes.
  private String originalName;
  private String name;
  private final CancellationScopeImpl cancellationScope;
  private final List<ContextPropagator> contextPropagators;
  private final Map<String, Object> propagatedContexts;

  /**
   * @param threadContext coordination state of the workflow thread; must be in the CREATED state
   * @param replayWorkflowContext source of the workflow identifiers written to the MDC
   * @param name name given to the executing Java thread while the task runs
   * @param detached passed to the cancellation scope created for {@code runnable}
   * @param parent parent of the cancellation scope created for {@code runnable}
   * @param runnable user code to execute
   * @param contextPropagators propagators installed on the executing thread
   * @param propagatedContexts context values propagated to the executing thread
   * @throws IllegalStateException if {@code threadContext} is not in the CREATED state
   */
  RunnableWrapper(
      WorkflowThreadContext threadContext,
      ReplayWorkflowContext replayWorkflowContext,
      String name,
      boolean detached,
      CancellationScopeImpl parent,
      Runnable runnable,
      List<ContextPropagator> contextPropagators,
      Map<String, Object> propagatedContexts) {
    this.threadContext = threadContext;
    this.replayWorkflowContext = replayWorkflowContext;
    this.name = name;
    this.cancellationScope = new CancellationScopeImpl(detached, runnable, parent);
    // Validate the context this wrapper was actually given rather than reaching into the
    // enclosing instance, so the check does not depend on the outer field initialization order.
    Preconditions.checkState(
        threadContext.getStatus() == Status.CREATED, "threadContext not in CREATED state");
    this.contextPropagators = contextPropagators;
    this.propagatedContexts = propagatedContexts;
  }

  /**
   * Binds the current Java thread to this workflow thread, waits for the initial yield and then
   * runs the cancellation scope. Any throwable escaping the user code is recorded on the thread
   * context as the unhandled exception, except for a destroy error after a requested destroy and
   * a {@link CanceledFailure} after a requested cancellation.
   */
  @Override
  public void run() {
    Thread thread = Thread.currentThread();
    originalName = thread.getName();
    thread.setName(name);
    threadContext.initializeCurrentThread(thread);
    DeterministicRunnerImpl.setCurrentThreadInternal(WorkflowThreadImpl.this);
    MDC.put(LoggerTag.WORKFLOW_ID, replayWorkflowContext.getWorkflowId());
    MDC.put(LoggerTag.WORKFLOW_TYPE, replayWorkflowContext.getWorkflowType().getName());
    MDC.put(LoggerTag.RUN_ID, replayWorkflowContext.getRunId());
    MDC.put(LoggerTag.TASK_QUEUE, replayWorkflowContext.getTaskQueue());
    MDC.put(LoggerTag.NAMESPACE, replayWorkflowContext.getNamespace());
    // Repopulate the context(s)
    ContextThreadLocal.setContextPropagators(this.contextPropagators);
    ContextThreadLocal.propagateContextToCurrentThread(this.propagatedContexts);
    try {
      // initialYield blocks thread until the first runUntilBlocked is called.
      // Otherwise, the runnable starts executing without control of the sync.
      threadContext.initialYield();
      cancellationScope.run();
    } catch (DestroyWorkflowThreadError e) {
      // A destroy error is expected when destruction was requested; report it only otherwise.
      if (!threadContext.isDestroyRequested()) {
        threadContext.setUnhandledException(e);
      }
    } catch (Error e) {
      threadContext.setUnhandledException(e);
    } catch (CanceledFailure e) {
      // A cancellation failure is expected when cancellation was requested; report it only
      // otherwise.
      if (!isCancelRequested()) {
        threadContext.setUnhandledException(e);
      }
      log.debug("Workflow thread \"{}\" run canceled", name);
    } catch (Throwable e) {
      threadContext.setUnhandledException(e);
    } finally {
      DeterministicRunnerImpl.setCurrentThreadInternal(null);
      threadContext.makeDone();
      thread.setName(originalName);
      MDC.clear();
    }
  }

  /** @return the name currently assigned to this task */
  public String getName() {
    return name;
  }

  /**
   * @return the stack trace of the Java thread registered with the thread context, or an empty
   *     array when no thread is registered
   */
  StackTraceElement[] getStackTrace() {
    @Nullable Thread thread = threadContext.getCurrentThread();
    if (thread != null) {
      return thread.getStackTrace();
    }
    return new StackTraceElement[0];
  }

  /**
   * Stores the new name and, when a Java thread is registered with the thread context, renames
   * that thread immediately as well.
   *
   * @param name new name of the task
   */
  public void setName(String name) {
    this.name = name;
    @Nullable Thread thread = threadContext.getCurrentThread();
    if (thread != null) {
      thread.setName(name);
    }
  }
}
// Class-wide logger; also used by the inner RunnableWrapper.
private static final Logger log = LoggerFactory.getLogger(WorkflowThreadImpl.class);
// Executor the wrapped task is submitted to in start().
private final WorkflowThreadExecutor workflowThreadExecutor;
// Coordination state of this workflow thread; created in the constructor from the runner's lock.
private final WorkflowThreadContext context;
// May be null (start() checks for it). When the executor rejects the task, start() asks this
// cache to evict an entry before retrying the submission.
private final WorkflowExecutorCache cache;
// Never null: the constructor rejects a null value.
private final SyncWorkflowContext syncWorkflowContext;
// Runner this thread belongs to; exposed through getRunner() and used by exitThread().
private final DeterministicRunnerImpl runner;
// Wrapper around the user runnable; this is what gets submitted to the executor.
private final RunnableWrapper task;
// Value reported by getPriority().
private final int priority;
// Future returned by the executor for the submitted task; null until start() has submitted it.
private Future<?> taskFuture;
// Backing storage for setThreadLocal()/getThreadLocal(). A key that is present with a null value
// is distinguished from an absent key by getThreadLocal().
private final Map<WorkflowThreadLocalInternal<?>, Object> threadLocalMap = new HashMap<>();
WorkflowThreadImpl(
WorkflowThreadExecutor workflowThreadExecutor,
SyncWorkflowContext syncWorkflowContext,
DeterministicRunnerImpl runner,
@Nonnull String name,
int priority,
boolean detached,
CancellationScopeImpl parentCancellationScope,
Runnable runnable,
WorkflowExecutorCache cache,
List<ContextPropagator> contextPropagators,
Map<String, Object> propagatedContexts) {
this.workflowThreadExecutor = workflowThreadExecutor;
this.syncWorkflowContext = Preconditions.checkNotNull(syncWorkflowContext);
this.runner = runner;
this.context = new WorkflowThreadContext(runner.getLock());
this.cache = cache;
this.priority = priority;
this.task =
new RunnableWrapper(
context,
syncWorkflowContext.getReplayContext(),
Preconditions.checkNotNull(name, "Thread name shouldn't be null"),
detached,
parentCancellationScope,
runnable,
contextPropagators,
propagatedContexts);
}
/**
 * Not supported on this implementation: the user code is executed by the inner RunnableWrapper
 * that is submitted to the executor, not through this method.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void run() {
throw new UnsupportedOperationException("not used");
}
/** @return whether the cancellation scope of the wrapped task reports itself as detached */
@Override
public boolean isDetached() {
  final CancellationScopeImpl scope = task.cancellationScope;
  return scope.isDetached();
}
/** Cancels the cancellation scope of the wrapped task without giving a reason. */
@Override
public void cancel() {
  final CancellationScopeImpl scope = task.cancellationScope;
  scope.cancel();
}
/**
 * Cancels the cancellation scope of the wrapped task.
 *
 * @param reason reason handed to the cancellation scope
 */
@Override
public void cancel(String reason) {
  final CancellationScopeImpl scope = task.cancellationScope;
  scope.cancel(reason);
}
/** @return the cancellation reason reported by the cancellation scope of the wrapped task */
@Override
public String getCancellationReason() {
  final CancellationScopeImpl scope = task.cancellationScope;
  return scope.getCancellationReason();
}
/** @return whether the cancellation scope of the wrapped task reports a requested cancel */
@Override
public boolean isCancelRequested() {
  final CancellationScopeImpl scope = task.cancellationScope;
  return scope.isCancelRequested();
}
/** @return the cancellation request promise of the wrapped task's cancellation scope */
@Override
public Promise<String> getCancellationRequest() {
  final CancellationScopeImpl scope = task.cancellationScope;
  return scope.getCancellationRequest();
}
/**
 * Marks the thread context as started and submits the wrapped task to the executor.
 *
 * <p>When the executor rejects the submission and a cache is configured, the cache is asked to
 * evict an entry (see {@code evictAnyNotInProcessing}) and the submission is retried. The loop
 * ends when a submission succeeds or when nothing could be evicted.
 *
 * @throws WorkflowRejectedExecutionError if the executor rejected the task and either no cache
 *     is configured or the cache could not evict anything
 */
@Override
public void start() {
  context.verifyAndStart();
  for (; ; ) {
    try {
      taskFuture = workflowThreadExecutor.submit(task);
      return;
    } catch (RejectedExecutionException rejection) {
      // Note here we need to throw error, not exception. Otherwise it will be
      // translated to workflow execution exception and instead of failing the
      // workflow task we will be failing the workflow.
      if (cache == null) {
        throw new WorkflowRejectedExecutionError(rejection);
      }
      SyncWorkflowContext workflowContext = getWorkflowContext();
      ReplayWorkflowContext replayContext = workflowContext.getReplayContext();
      boolean evicted =
          cache.evictAnyNotInProcessing(
              replayContext.getWorkflowExecution(), workflowContext.getMetricsScope());
      if (!evicted) {
        throw new WorkflowRejectedExecutionError(rejection);
      }
      // Something was evicted: fall through and retry the submission.
    }
  }
}
/** @return {@code true} once the thread context has left the CREATED status */
@Override
public boolean isStarted() {
  final boolean stillCreated = context.getStatus() == Status.CREATED;
  return !stillCreated;
}
/** @return the thread context created for this workflow thread in the constructor */
@Override
public WorkflowThreadContext getWorkflowThreadContext() {
  return this.context;
}
/** @return the runner this thread was constructed with */
@Override
public DeterministicRunnerImpl getRunner() {
  return this.runner;
}
/** @return the workflow context this thread was constructed with; never null */
@Override
public SyncWorkflowContext getWorkflowContext() {
  return this.syncWorkflowContext;
}
/**
 * Renames this workflow thread by delegating to the wrapped task.
 *
 * @param name new thread name
 */
@Override
public void setName(String name) {
  this.task.setName(name);
}
/** @return the name currently held by the wrapped task */
@Override
public String getName() {
  return this.task.getName();
}
/**
 * @return this object's {@code hashCode()} widened to {@code long}; hash codes are not
 *     guaranteed to be unique across instances
 */
@Override
public long getId() {
  final int identity = hashCode();
  return identity;
}
/** @return the priority this thread was constructed with */
@Override
public int getPriority() {
  return this.priority;
}
/**
 * Submits the task first if that has not happened yet, then delegates to the thread context.
 *
 * @param deadlockDetectionTimeoutMs timeout handed to the thread context
 * @return the value returned by the thread context's {@code runUntilBlocked}
 */
@Override
public boolean runUntilBlocked(long deadlockDetectionTimeoutMs) {
  final boolean notSubmittedYet = taskFuture == null;
  if (notSubmittedYet) {
    start();
  }
  return context.runUntilBlocked(deadlockDetectionTimeoutMs);
}
/** @return the handle produced by the thread context's {@code lockDeadlockDetector()} */
@Override
public NonIdempotentHandle lockDeadlockDetector() {
  return this.context.lockDeadlockDetector();
}
/** @return whether the thread context reports this thread as done */
@Override
public boolean isDone() {
  return this.context.isDone();
}
/** @return the unhandled exception recorded on the thread context, as reported by it */
@Override
public Throwable getUnhandledException() {
  return this.context.getUnhandledException();
}
/**
 * Evaluates the function in the thread context of the coroutine without unblocking it. Used to
 * get the current coroutine status, like its stack trace.
 *
 * @param function callback whose parameter is the reason for the current coroutine blockage
 */
public void evaluateInCoroutineContext(Functions.Proc1<String> function) {
  this.context.evaluateInCoroutineContext(function);
}
/**
 * Interrupts the coroutine by throwing DestroyWorkflowThreadError from the await method it is
 * blocked on and returns the underlying Future to be waited on.
 *
 * @return the future of the submitted task, or an already completed future when the task was
 *     never submitted
 * @throws Error if invoked from the Java thread that is executing this workflow thread
 */
@Override
public Future<?> stopNow() {
  @Nullable Thread coroutineThread = context.getCurrentThread();
  // Cannot call destroy() on itself
  if (Thread.currentThread().equals(coroutineThread)) {
    throw new Error("Cannot call destroy on itself: " + coroutineThread.getName());
  }
  context.initiateDestroy();
  if (taskFuture != null) {
    return taskFuture;
  }
  return getCompletedFuture();
}
/**
 * Builds the future returned by {@link #stopNow()} when the task was never submitted.
 *
 * @return a future that is already completed with the value {@code "done"}
 */
private Future<?> getCompletedFuture() {
  return CompletableFuture.completedFuture("done");
}
/**
 * Appends a description of this workflow thread to {@code result}: the thread name followed by
 * either {@code (NEW)} when no Java thread is registered yet, or the yield reason and a trimmed
 * stack trace of the registered Java thread.
 *
 * @param result buffer the description is appended to
 */
@Override
public void addStackTrace(StringBuilder result) {
  result.append(getName());
  @Nullable Thread coroutineThread = context.getCurrentThread();
  if (coroutineThread == null) {
    result.append("(NEW)");
    return;
  }
  result
      .append(": (BLOCKED on ")
      .append(getWorkflowThreadContext().getYieldReason())
      .append(")\n");

  // TODO it's not a good idea to rely on the name to understand the thread type. Instead of that
  // we would better assign an explicit thread type enum to the threads. This will be especially
  // important when we refactor root and workflow-method thread names into names that will
  // include workflowId
  final String currentName = getName();
  final boolean rootOrMainThread =
      DeterministicRunnerImpl.WORKFLOW_ROOT_THREAD_NAME.equals(currentName)
          || currentName.startsWith(WorkflowMethodThreadNameStrategy.WORKFLOW_MAIN_THREAD_PREFIX);

  // These numbers might change if implementation changes.
  final int omitTop = 5;
  // TODO revisit the number used for root and workflow-method threads
  final int omitBottom = rootOrMainThread ? 11 : 7;

  StackTraceElement[] frames = coroutineThread.getStackTrace();
  final int end = frames.length - omitBottom;
  for (int idx = omitTop; idx < end; idx++) {
    StackTraceElement frame = frames[idx];
    // The first reported frame is dropped when it is the "await" method itself.
    boolean leadingAwait = idx == omitTop && "await".equals(frame.getMethodName());
    if (!leadingAwait) {
      result.append(frame).append("\n");
    }
  }
}
/**
 * Delegates to the thread context's {@code yield}.
 *
 * @param reason reason handed to the thread context
 * @param unblockCondition condition handed to the thread context
 */
@Override
public void yield(String reason, Supplier<Boolean> unblockCondition) {
  this.context.yield(reason, unblockCondition);
}
/**
 * Calls {@code exit()} on the runner and then unwinds the calling thread by throwing; this
 * method never returns normally.
 *
 * @throws DestroyWorkflowThreadError always, with the message {@code "exit"}, once the runner's
 *     {@code exit()} has returned
 */
@Override
public void exitThread() {
  this.runner.exit();
  final DestroyWorkflowThreadError exitSignal = new DestroyWorkflowThreadError("exit");
  throw exitSignal;
}
/**
 * Stores a value for the given thread-local key on this workflow thread, replacing any previous
 * value. A {@code null} value is stored as such and is later reported as "found but null".
 *
 * @param key thread-local key
 * @param value value to associate with the key; may be null
 * @param <T> type of the value
 */
@Override
public <T> void setThreadLocal(WorkflowThreadLocalInternal<T> key, T value) {
  this.threadLocalMap.put(key, value);
}
/**
 * Retrieves data from the thread locals of this workflow thread. The result is one of:
 *
 * <ol>
 *   <li>not found: an empty Optional
 *   <li>found but null: an Optional of an empty Optional
 *   <li>found and non-null: an Optional of an Optional of the value
 * </ol>
 *
 * The type nesting is because Java Optionals cannot tell "Some null" from "None", which is
 * exactly what we need here.
 *
 * @param key thread-local key to look up
 * @param <T> type of the stored value
 * @return one of the three cases above
 */
@SuppressWarnings("unchecked")
public <T> Optional<Optional<T>> getThreadLocal(WorkflowThreadLocalInternal<T> key) {
  if (threadLocalMap.containsKey(key)) {
    T stored = (T) threadLocalMap.get(key);
    return Optional.of(Optional.ofNullable(stored));
  }
  return Optional.empty();
}
/**
 * Renders the task name on the first line followed by one {@code "\tat <frame>"} line per stack
 * frame of the coroutine thread.
 *
 * @return stack trace of the coroutine thread
 */
@Override
public String getStackTrace() {
  StackTraceElement[] frames = task.getStackTrace();
  // Frame lines end with the platform line separator; the name line always ends with "\n".
  final String lineSeparator = System.lineSeparator();
  StringBuilder text = new StringBuilder();
  text.append(task.getName()).append("\n");
  for (StackTraceElement frame : frames) {
    text.append("\tat ").append(frame).append(lineSeparator);
  }
  return text.toString();
}
/**
 * Unblock condition that is satisfied either when the wrapped condition holds or when the
 * workflow time reported by {@code WorkflowInternal.currentTimeMillis()} has reached the given
 * deadline.
 */
static class YieldWithTimeoutCondition implements Supplier<Boolean> {
  private final Supplier<Boolean> unblockCondition;
  private final long blockedUntil;
  // Result of the most recent deadline check; stays false until a check has found the deadline
  // reached.
  private boolean timedOut;

  /**
   * @param unblockCondition condition that is evaluated first on every {@link #get()}
   * @param blockedUntil deadline in milliseconds, compared against
   *     {@code WorkflowInternal.currentTimeMillis()}
   */
  YieldWithTimeoutCondition(Supplier<Boolean> unblockCondition, long blockedUntil) {
    this.unblockCondition = unblockCondition;
    this.blockedUntil = blockedUntil;
  }

  /** @return whether the most recent deadline check performed by {@link #get()} found a timeout */
  boolean isTimedOut() {
    return timedOut;
  }

  /**
   * @return true if condition matched or timed out
   */
  @Override
  public Boolean get() {
    if (unblockCondition.get()) {
      // Condition matched: the deadline is not consulted and the timeout flag is left as is.
      return true;
    }
    timedOut = WorkflowInternal.currentTimeMillis() >= blockedUntil;
    return timedOut;
  }
}
}