@@ -41,10 +41,7 @@
import io.temporal.failure.FailureConverter;
import io.temporal.internal.common.ProtobufTimeUtils;
import io.temporal.internal.common.WorkflowExecutionUtils;
import io.temporal.internal.worker.LocalActivityTask;
import io.temporal.internal.worker.SingleWorkerOptions;
import io.temporal.internal.worker.WorkflowExecutionException;
import io.temporal.internal.worker.WorkflowTaskHandler;
import io.temporal.internal.worker.*;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.NonDeterministicException;
@@ -175,9 +172,7 @@ private Result handleWorkflowTaskWithQuery(
return failureToWFTResult(workflowTask, e);
}
} finally {
if (useCache) {
cache.markProcessingDone(execution);
} else if (workflowRunTaskHandler != null) {
if (!useCache && workflowRunTaskHandler != null) {
// we close the execution in finally only if we don't use cache, otherwise it stays open
workflowRunTaskHandler.close();
}
@@ -20,7 +20,7 @@

package io.temporal.internal.sync;

import io.temporal.internal.replay.WorkflowExecutorCache;
import io.temporal.internal.worker.WorkflowExecutorCache;
import io.temporal.workflow.CancellationScope;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -25,7 +25,7 @@
import io.temporal.common.context.ContextPropagator;
import io.temporal.internal.WorkflowThreadMarker;
import io.temporal.internal.context.ContextThreadLocal;
import io.temporal.internal.replay.WorkflowExecutorCache;
import io.temporal.internal.worker.WorkflowExecutorCache;
import io.temporal.serviceclient.CheckedExceptionWrapper;
import io.temporal.workflow.Promise;
import java.util.ArrayList;
@@ -37,9 +37,9 @@
import io.temporal.failure.CanceledFailure;
import io.temporal.internal.replay.ReplayWorkflow;
import io.temporal.internal.replay.ReplayWorkflowFactory;
import io.temporal.internal.replay.WorkflowExecutorCache;
import io.temporal.internal.worker.SingleWorkerOptions;
import io.temporal.internal.worker.WorkflowExecutionException;
import io.temporal.internal.worker.WorkflowExecutorCache;
import io.temporal.worker.TypeAlreadyRegisteredException;
import io.temporal.worker.WorkflowImplementationOptions;
import io.temporal.workflow.DynamicWorkflow;
@@ -33,7 +33,7 @@
import io.temporal.internal.replay.ReplayWorkflow;
import io.temporal.internal.replay.ReplayWorkflowContext;
import io.temporal.internal.replay.WorkflowContext;
import io.temporal.internal.replay.WorkflowExecutorCache;
import io.temporal.internal.worker.WorkflowExecutorCache;
import io.temporal.worker.WorkflowImplementationOptions;
import java.util.List;
import java.util.Objects;
@@ -26,7 +26,7 @@
import io.temporal.internal.context.ContextThreadLocal;
import io.temporal.internal.logging.LoggerTag;
import io.temporal.internal.replay.ReplayWorkflowContext;
import io.temporal.internal.replay.WorkflowExecutorCache;
import io.temporal.internal.worker.WorkflowExecutorCache;
import io.temporal.workflow.Functions;
import io.temporal.workflow.Promise;
import java.io.PrintWriter;
@@ -27,7 +27,6 @@
import io.temporal.internal.activity.LocalActivityExecutionContextFactoryImpl;
import io.temporal.internal.common.WorkflowExecutionHistory;
import io.temporal.internal.replay.ReplayWorkflowTaskHandler;
import io.temporal.internal.replay.WorkflowExecutorCache;
import io.temporal.internal.sync.POJOWorkflowImplementationFactory;
import io.temporal.internal.sync.WorkflowThreadExecutor;
import io.temporal.serviceclient.WorkflowServiceStubs;
@@ -73,6 +72,7 @@ public SyncWorkflowWorker(
String taskQueue,
SingleWorkerOptions singleWorkerOptions,
SingleWorkerOptions localActivityOptions,
@Nonnull WorkflowRunLockManager runLocks,
@Nonnull WorkflowExecutorCache cache,
String stickyTaskQueueName,
WorkflowThreadExecutor workflowThreadExecutor,
@@ -119,6 +119,7 @@ public SyncWorkflowWorker(
taskQueue,
stickyTaskQueueName,
singleWorkerOptions,
runLocks,
cache,
taskHandler,
eagerActivityDispatcher);
@@ -18,7 +18,7 @@
* limitations under the License.
*/

package io.temporal.internal.replay;
package io.temporal.internal.worker;

import static io.temporal.internal.common.WorkflowExecutionUtils.isFullHistory;

@@ -29,33 +29,33 @@
import com.uber.m3.tally.Scope;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
import io.temporal.internal.replay.WorkflowRunTaskHandler;
import io.temporal.worker.MetricsType;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public final class WorkflowExecutorCache {
private final Logger log = LoggerFactory.getLogger(WorkflowExecutorCache.class);

private final Scope metricsScope;
private final WorkflowRunLockManager runLockManager;
private final LoadingCache<String, WorkflowRunTaskHandler> cache;
private final Lock cacheLock = new ReentrantLock();
private final Set<String> inProcessing = new HashSet<>();
private final Scope metricsScope;

public WorkflowExecutorCache(int workflowCacheSize, Scope scope) {
public WorkflowExecutorCache(
int workflowCacheSize, WorkflowRunLockManager runLockManager, Scope scope) {
Preconditions.checkArgument(workflowCacheSize > 0, "Max cache size must be greater than 0");
this.metricsScope = Objects.requireNonNull(scope);
this.runLockManager = runLockManager;
this.cache =
CacheBuilder.newBuilder()
.maximumSize(workflowCacheSize)
// TODO this number is taken out of the blue.
// This number should be calculated based on the number of all workers workflow task
// processors.
.concurrencyLevel(128)
.removalListener(
e -> {
WorkflowRunTaskHandler entry = (WorkflowRunTaskHandler) e.getValue();
@@ -80,6 +80,7 @@ public WorkflowRunTaskHandler load(String key) {
return null;
}
});
this.metricsScope = Objects.requireNonNull(scope);
this.metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
}

@@ -114,13 +115,11 @@ public WorkflowRunTaskHandler getOrCreate(
return workflowExecutorFn.call();
}

private WorkflowRunTaskHandler getForProcessing(
public WorkflowRunTaskHandler getForProcessing(
WorkflowExecution workflowExecution, Scope metricsScope) throws ExecutionException {
String runId = workflowExecution.getRunId();
cacheLock.lock();
try {
WorkflowRunTaskHandler workflowRunTaskHandler = cache.get(runId);
inProcessing.add(runId);
log.trace(
"Workflow Execution {}-{} has been marked as in-progress",
workflowExecution.getWorkflowId(),
@@ -131,21 +130,6 @@ private WorkflowRunTaskHandler getForProcessing(
// We don't have a default loader and don't want to have one. So it's ok to get null value.
metricsScope.counter(MetricsType.STICKY_CACHE_MISS).inc(1);
return null;
} finally {
cacheLock.unlock();
}
}

void markProcessingDone(WorkflowExecution workflowExecution) {
cacheLock.lock();
try {
inProcessing.remove(workflowExecution.getRunId());
log.trace(
"Workflow Execution {}-{} has been marked as not in-progress",
workflowExecution.getWorkflowId(),
workflowExecution.getRunId());
} finally {
cacheLock.unlock();
}
}

@@ -160,37 +144,39 @@ public void addToCache(
}

public boolean evictAnyNotInProcessing(WorkflowExecution inFavorOfExecution, Scope metricsScope) {
cacheLock.lock();
try {
String inFavorOfRunId = inFavorOfExecution.getRunId();
for (String key : cache.asMap().keySet()) {
if (!key.equals(inFavorOfRunId) && !inProcessing.contains(key)) {
log.trace(
"Workflow Execution {}-{} caused eviction of Workflow Execution with runId {}",
inFavorOfExecution.getWorkflowId(),
inFavorOfRunId,
key);
cache.invalidate(key);
metricsScope.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION).inc(1);
metricsScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
return true;
if (key.equals(inFavorOfRunId)) continue;
boolean locked = runLockManager.tryLock(key);
// if we were able to take a lock here, it means that the workflow is not in processing
// currently on workers of this WorkerFactory and can be evicted
if (locked) {
try {
log.trace(
"Workflow Execution {}-{} caused eviction of Workflow Execution with runId {}",
inFavorOfExecution.getWorkflowId(),
inFavorOfRunId,
key);
cache.invalidate(key);
metricsScope.counter(MetricsType.STICKY_CACHE_THREAD_FORCED_EVICTION).inc(1);
metricsScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
return true;
} finally {
runLockManager.unlock(key);
}
}
}

log.trace(
"Failed to evict from Workflow Execution cache, cache size is {}, inProcessing collection size is {}",
cache.size(),
inProcessing.size());
log.trace("Failed to evict from Workflow Execution cache, cache size is {}", cache.size());
return false;
} finally {
cacheLock.unlock();
this.metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
}
}

public void invalidate(
WorkflowExecution execution, Scope workflowTypeScope, String reason, Throwable cause) {
cacheLock.lock();
try {
String runId = execution.getRunId();
if (log.isTraceEnabled()) {
@@ -203,10 +189,8 @@ public void invalidate(
cause);
}
cache.invalidate(runId);
inProcessing.remove(runId);
workflowTypeScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
} finally {
cacheLock.unlock();
this.metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size());
}
}
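As a reading aid for the hunk above (an illustrative condensation, not the SDK source): eviction now relies on the per-run locks owned by WorkflowRunLockManager instead of the removed cacheLock/inProcessing bookkeeping, so a cached run is evictable exactly when its lock can be taken without blocking. Package and method names are taken from this diff; everything else is made up for the sketch.

```java
import io.temporal.internal.worker.WorkflowRunLockManager;

// Condensed sketch of the new eviction rule: a cached run may be evicted only if its
// per-run lock can be acquired non-blockingly, i.e. no worker thread of this
// WorkerFactory is currently processing a workflow task for it.
final class EvictionSketch {
  private final WorkflowRunLockManager runLocks;

  EvictionSketch(WorkflowRunLockManager runLocks) {
    this.runLocks = runLocks;
  }

  boolean evictFirstIdleRun(Iterable<String> cachedRunIds, String inFavorOfRunId) {
    for (String runId : cachedRunIds) {
      if (runId.equals(inFavorOfRunId)) {
        continue; // never evict the run we are trying to make room for
      }
      if (runLocks.tryLock(runId)) { // lock acquired => run is not in processing
        try {
          // the real code invalidates the cache entry here and bumps the
          // STICKY_CACHE_*_FORCED_EVICTION counters
          return true;
        } finally {
          runLocks.unlock(runId);
        }
      }
    }
    return false; // every other cached run is currently being processed
  }
}
```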
@@ -27,20 +27,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

final class WorkflowRunLockManager {
public final class WorkflowRunLockManager {
private final Map<String, RefCountedLock> runIdLock = new ConcurrentHashMap<>();
Reviewer comment:

I know this isn't changed in this PR, but I would change the declared type to ConcurrentMap for documentation/expectation purposes. It confused me at first read.

In fact, after review, maybe the documented type for this should be ConcurrentHashMap or at least document the expectations around compute, because Map.compute isn't thread-safe obviously and ConcurrentMap.compute can run the closure multiple times (which is bad for us because we have a side-effecting closure there). But ConcurrentHashMap.compute seems to guarantee a single execution.
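As an editorial illustration of this point (a minimal sketch with made-up names, not the SDK's RefCountedLock): when the field is declared as ConcurrentHashMap, compute() is performed atomically and the remapping function runs at most once per call, which is the guarantee the reviewer refers to; the ConcurrentMap default implementation is explicitly allowed to retry the mapping function.

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative stand-in for the ref-counted lock bookkeeping discussed above.
final class RefCountDemo {
  private static final class Counted {
    int refCount;
  }

  // Declared as ConcurrentHashMap (not Map/ConcurrentMap) to document that compute()
  // is atomic here; the ConcurrentMap default implementation may invoke the mapping
  // function multiple times, which would double-apply the side effect below.
  private final ConcurrentHashMap<String, Counted> perRun = new ConcurrentHashMap<>();

  int acquire(String runId) {
    return perRun.compute(
            runId,
            (id, existing) -> {
              Counted value = existing == null ? new Counted() : existing;
              value.refCount++; // side-effecting closure, analogous to obtainLock in this PR
              return value;
            })
        .refCount;
  }
}
```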


public boolean tryLock(String runId, long timeout, TimeUnit unit) throws InterruptedException {
RefCountedLock runLock =
runIdLock.compute(
runId,
(id, lock) -> {
if (lock == null) {
lock = new RefCountedLock();
}
lock.refCount++;
return lock;
});
RefCountedLock runLock = obtainLock(runId);

boolean obtained = false;
try {
@@ -53,10 +44,36 @@ public boolean tryLock(String runId, long timeout, TimeUnit unit) throws InterruptedException {
}
}

public boolean tryLock(String runId) {
RefCountedLock runLock = obtainLock(runId);

boolean obtained = false;
try {
obtained = runLock.lock.tryLock();
return obtained;
} finally {
if (!obtained) {
derefAndUnlock(runId, false);
}
}
}

public void unlock(String runId) {
derefAndUnlock(runId, true);
}

private RefCountedLock obtainLock(String runId) {
return runIdLock.compute(
runId,
(id, lock) -> {
if (lock == null) {
lock = new RefCountedLock();
}
lock.refCount++;
return lock;
});
}

private void derefAndUnlock(String runId, boolean unlock) {
runIdLock.compute(
runId,
@@ -30,7 +30,6 @@
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.workflowservice.v1.*;
import io.temporal.internal.logging.LoggerTag;
import io.temporal.internal.replay.WorkflowExecutorCache;
import io.temporal.internal.retryer.GrpcRetryer;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.RpcRetryOptions;
@@ -51,7 +50,7 @@
final class WorkflowWorker implements SuspendableWorker {
private static final Logger log = LoggerFactory.getLogger(WorkflowWorker.class);

private final WorkflowRunLockManager runLocks = new WorkflowRunLockManager();
private final WorkflowRunLockManager runLocks;

private final WorkflowServiceStubs service;
private final String namespace;
@@ -75,6 +74,7 @@ public WorkflowWorker(
@Nonnull String taskQueue,
@Nullable String stickyTaskQueueName,
@Nonnull SingleWorkerOptions options,
@Nonnull WorkflowRunLockManager runLocks,
@Nonnull WorkflowExecutorCache cache,
@Nonnull WorkflowTaskHandler handler,
@Nonnull EagerActivityDispatcher eagerActivityDispatcher) {
@@ -86,6 +86,7 @@ public WorkflowWorker(
this.pollerOptions = getPollerOptions(options);
this.workerMetricsScope =
MetricsTag.tagged(options.getMetricsScope(), WorkerMetricsTag.WorkerType.WORKFLOW_WORKER);
this.runLocks = Objects.requireNonNull(runLocks);
this.cache = Objects.requireNonNull(cache);
this.handler = Objects.requireNonNull(handler);
this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities());
@@ -217,18 +218,16 @@ public void handle(WorkflowTask task) throws Exception {
//
// Throws interrupted exception which is propagated. It's a correct way to handle it here.
//
// TODO 1: "1 second" timeout looks potentially too strict, especially if we are talking
// about workflow tasks with local activities.
// This could lead to unexpected fail of queries and reset of sticky queue.
// Should it be connected to a workflow task timeout / query timeout?
// TODO 1: 5 seconds is chosen as a half of normal workflow task timeout.
// This value should be dynamically configured.
// TODO 2: Does "consider increasing workflow task timeout" advice in this exception makes
// any sense?
// This MAYBE makes sense only if a previous workflow task timed out, it's still in
// progress on the worker and the next workflow task got picked up by the same exact
// worker from the general non-sticky task queue.
// Even in this case, this advice looks misleading, something else is going on
// (like an extreme network latency).
locked = runLocks.tryLock(runId, 1, TimeUnit.SECONDS);
locked = runLocks.tryLock(runId, 5, TimeUnit.SECONDS);

if (!locked) {
throw new UnableToAcquireLockException(
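To round out the picture, here is an illustrative worker-side counterpart (hypothetical wiring, not the SDK's actual handler): the task handler blocks up to 5 seconds for the run lock — half of the default 10-second workflow task timeout, per the TODO above — while the cache's eviction path uses the non-blocking tryLock(runId) overload and simply skips runs that are still locked.

```java
import io.temporal.internal.worker.WorkflowRunLockManager;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the acquisition pattern in handle(): hold the run lock for the
// whole workflow task so cache eviction and other tasks for the same run are kept out.
final class HandleTaskSketch {
  private final WorkflowRunLockManager runLocks;

  HandleTaskSketch(WorkflowRunLockManager runLocks) {
    this.runLocks = runLocks;
  }

  void handle(String runId) throws InterruptedException {
    // Block up to 5 seconds, mirroring the timeout chosen in this hunk.
    boolean locked = runLocks.tryLock(runId, 5, TimeUnit.SECONDS);
    if (!locked) {
      // the real code throws UnableToAcquireLockException here
      throw new IllegalStateException("Unable to acquire run lock for " + runId);
    }
    try {
      // process the workflow task while no other thread can touch this run's cached executor
    } finally {
      runLocks.unlock(runId);
    }
  }
}
```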