Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest;
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
import io.temporal.client.WorkflowFailedException;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.EncodedValues;
Expand Down Expand Up @@ -865,4 +866,10 @@ public static WorkflowExecutionHistory readHistory(File historyFile) throws IOEx
return WorkflowExecutionHistory.fromJson(jsonHistory);
}
}

public static boolean isFullHistory(PollWorkflowTaskQueueResponseOrBuilder workflowTask) {
return workflowTask.getHistory() != null
&& workflowTask.getHistory().getEventsCount() > 0
&& workflowTask.getHistory().getEvents(0).getEventId() == 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package io.temporal.internal.replay;

import static io.temporal.internal.common.InternalUtils.createStickyTaskQueue;
import static io.temporal.internal.common.WorkflowExecutionUtils.isFullHistory;
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;

import com.uber.m3.tally.Scope;
Expand All @@ -38,6 +39,7 @@
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
import io.temporal.api.workflowservice.v1.ResetStickyTaskQueueRequest;
import io.temporal.api.workflowservice.v1.RespondQueryTaskCompletedRequest;
import io.temporal.api.workflowservice.v1.RespondWorkflowTaskCompletedRequest;
import io.temporal.api.workflowservice.v1.RespondWorkflowTaskFailedRequest;
Expand Down Expand Up @@ -216,6 +218,11 @@ private Result handleWorkflowTaskWithEmbeddedQuery(

if (stickyTaskQueueName != null) {
cache.invalidate(execution, metricsScope);
// If history if full and exception occurred then sticky session hasn't been established
// yet and we can avoid doing a reset.
if (!isFullHistory(workflowTask)) {
resetStickyTaskList(execution);
}
}
throw e;
} finally {
Expand All @@ -227,6 +234,16 @@ private Result handleWorkflowTaskWithEmbeddedQuery(
}
}

private void resetStickyTaskList(WorkflowExecution execution) {
service
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comment to explain why it is async

.futureStub()
.resetStickyTaskQueue(
ResetStickyTaskQueueRequest.newBuilder()
.setNamespace(namespace)
.setExecution(execution)
.build());
}

private Result handleQueryOnlyWorkflowTask(
PollWorkflowTaskQueueResponse.Builder workflowTask, Scope metricsScope) {
RespondQueryTaskCompletedRequest.Builder queryCompletedRequest =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@

package io.temporal.internal.replay;

import static io.temporal.internal.common.WorkflowExecutionUtils.isFullHistory;

import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.uber.m3.tally.Scope;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
import io.temporal.api.workflowservice.v1.ResetStickyTaskQueueRequest;
import io.temporal.internal.metrics.MetricsType;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
Expand All @@ -38,17 +38,12 @@
import java.util.concurrent.locks.ReentrantLock;

public final class WorkflowExecutorCache {
private final WorkflowServiceStubs service;
private final String namespace;
private final Scope metricsScope;
private final LoadingCache<String, WorkflowRunTaskHandler> cache;
private final Lock cacheLock = new ReentrantLock();
private final Set<String> inProcessing = new HashSet<>();

public WorkflowExecutorCache(
WorkflowServiceStubs service, String namespace, int workflowCacheSize, Scope scope) {
this.service = service;
this.namespace = namespace;
public WorkflowExecutorCache(int workflowCacheSize, Scope scope) {
Preconditions.checkArgument(workflowCacheSize > 0, "Max cache size must be greater than 0");
this.metricsScope = Objects.requireNonNull(scope);
this.cache =
Expand Down Expand Up @@ -145,16 +140,6 @@ void invalidate(WorkflowExecution execution, Scope metricsScope) {
cache.invalidate(runId);
inProcessing.remove(runId);
metricsScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
if (service != null) {
// Execute asynchronously
service
.futureStub()
.resetStickyTaskQueue(
ResetStickyTaskQueueRequest.newBuilder()
.setNamespace(namespace)
.setExecution(execution)
.build());
}
} finally {
cacheLock.unlock();
}
Expand All @@ -164,12 +149,6 @@ public long size() {
return cache.size();
}

private boolean isFullHistory(PollWorkflowTaskQueueResponseOrBuilder workflowTask) {
return workflowTask.getHistory() != null
&& workflowTask.getHistory().getEventsCount() > 0
&& workflowTask.getHistory().getEvents(0).getEventId() == 1;
}

public void invalidateAll() {
cache.invalidateAll();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,7 @@ private WorkerFactory(WorkflowClient workflowClient, WorkerFactoryOptions factor
.tagged(MetricsTag.defaultTags(workflowClient.getOptions().getNamespace()));

this.cache =
new WorkflowExecutorCache(
this.workflowClient.getWorkflowServiceStubs(),
workflowClient.getOptions().getNamespace(),
this.factoryOptions.getWorkflowCacheSize(),
metricsScope);
new WorkflowExecutorCache(this.factoryOptions.getWorkflowCacheSize(), metricsScope);
Scope stickyScope =
metricsScope.tagged(
new ImmutableMap.Builder<String, String>(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void setUp() {
public void whenHistoryIsFullNewWorkflowExecutorIsReturnedAndCached_InitiallyEmpty()
throws Exception {
// Arrange
WorkflowExecutorCache cache = new WorkflowExecutorCache(null, "default", 10, new NoopScope());
WorkflowExecutorCache cache = new WorkflowExecutorCache(10, new NoopScope());
PollWorkflowTaskQueueResponse workflowTask =
HistoryUtils.generateWorkflowTaskWithInitialHistory();

Expand All @@ -92,7 +92,7 @@ public void whenHistoryIsFullNewWorkflowExecutorIsReturned_InitiallyCached() thr
WorkflowServiceStubs service = testService.newClientStub();

// Arrange
WorkflowExecutorCache cache = new WorkflowExecutorCache(null, "default", 10, new NoopScope());
WorkflowExecutorCache cache = new WorkflowExecutorCache(10, new NoopScope());
PollWorkflowTaskQueueResponse workflowTask1 =
HistoryUtils.generateWorkflowTaskWithInitialHistory(
"namespace", "taskQueue", "workflowType", service);
Expand Down Expand Up @@ -137,7 +137,7 @@ public void whenHistoryIsPartialCachedEntryIsReturned() throws Exception {
.build();
Scope scope = metricsScope.tagged(tags);

WorkflowExecutorCache cache = new WorkflowExecutorCache(null, "default", 10, scope);
WorkflowExecutorCache cache = new WorkflowExecutorCache(10, scope);
TestWorkflowService testService = new TestWorkflowService(true);
WorkflowServiceStubs service = testService.newClientStub();
try {
Expand Down Expand Up @@ -178,7 +178,7 @@ public void whenHistoryIsPartialAndCacheIsEmptyThenExceptionIsThrown() throws Ex
.put(MetricsTag.TASK_QUEUE, "stickyTaskQueue")
.build();
Scope scope = metricsScope.tagged(tags);
WorkflowExecutorCache cache = new WorkflowExecutorCache(null, "default", 10, scope);
WorkflowExecutorCache cache = new WorkflowExecutorCache(10, scope);

// Act
PollWorkflowTaskQueueResponse workflowTask =
Expand Down Expand Up @@ -207,7 +207,7 @@ public void evictAnyWillInvalidateAnEntryRandomlyFromTheCache() throws Exception
Scope scope = metricsScope.tagged(tags);

// Arrange
WorkflowExecutorCache cache = new WorkflowExecutorCache(null, "default", 50, scope);
WorkflowExecutorCache cache = new WorkflowExecutorCache(50, scope);
PollWorkflowTaskQueueResponse workflowTask1 =
HistoryUtils.generateWorkflowTaskWithInitialHistory();
PollWorkflowTaskQueueResponse workflowTask2 =
Expand Down Expand Up @@ -242,7 +242,7 @@ public void evictAnyWillInvalidateAnEntryRandomlyFromTheCache() throws Exception
@Test
public void evictAnyWillNotInvalidateItself() throws Exception {
// Arrange
WorkflowExecutorCache cache = new WorkflowExecutorCache(null, "default", 50, new NoopScope());
WorkflowExecutorCache cache = new WorkflowExecutorCache(50, new NoopScope());
PollWorkflowTaskQueueResponse workflowTask1 =
HistoryUtils.generateWorkflowTaskWithInitialHistory();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ public void tearDown() {
@Test
public void ifStickyExecutionAttributesAreNotSetThenWorkflowsAreNotCached() throws Throwable {
// Arrange
WorkflowExecutorCache cache =
new WorkflowExecutorCache(service, "default", 10, new NoopScope());
WorkflowExecutorCache cache = new WorkflowExecutorCache(10, new NoopScope());
WorkflowTaskHandler taskHandler =
new ReplayWorkflowTaskHandler(
"namespace",
Expand All @@ -95,8 +94,7 @@ public void ifStickyExecutionAttributesAreNotSetThenWorkflowsAreNotCached() thro
@Test
public void ifStickyExecutionAttributesAreSetThenWorkflowsAreCached() throws Throwable {
// Arrange
WorkflowExecutorCache cache =
new WorkflowExecutorCache(service, "default", 10, new NoopScope());
WorkflowExecutorCache cache = new WorkflowExecutorCache(10, new NoopScope());
WorkflowTaskHandler taskHandler =
new ReplayWorkflowTaskHandler(
"namespace",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ public void workflowThreadsWillEvictCacheWhenMaxThreadCountIsHit() throws Throwa
new ThreadPoolExecutor(1, 3, 1, TimeUnit.SECONDS, new SynchronousQueue<>());
AtomicReference<String> status = new AtomicReference<>();

WorkflowExecutorCache cache = new WorkflowExecutorCache(null, "default", 3, scope);
WorkflowExecutorCache cache = new WorkflowExecutorCache(3, scope);
ReplayWorkflowContext replayWorkflowContext = mock(ReplayWorkflowContext.class);
when(replayWorkflowContext.getMetricsScope()).thenReturn(scope);
when(replayWorkflowContext.getWorkflowExecution())
Expand Down Expand Up @@ -706,7 +706,7 @@ public void workflowThreadsWillNotEvictCacheWhenMaxThreadCountIsHit() throws Thr
new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, new SynchronousQueue<>());
AtomicReference<String> status = new AtomicReference<>();

WorkflowExecutorCache cache = new WorkflowExecutorCache(null, "default", 3, new NoopScope());
WorkflowExecutorCache cache = new WorkflowExecutorCache(3, new NoopScope());

DeterministicRunnerImpl d =
new DeterministicRunnerImpl(
Expand Down