Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ ext {
// grpcVersion = '[1.34.0,)!!1.44.1'
// jacksonVersion = '[2.9.0,)!!2.13.1'
// micrometerVersion = '[1.0.0,)!!1.8.3'
grpcVersion = '1.44.1'
grpcVersion = '1.45.0'
jacksonVersion = '2.13.1'
micrometerVersion = '1.8.3'

logbackVersion = '1.2.11'
protoVersion = '[3.10.0,4.0)!!3.19.4'
guavaVersion = '[10.0,)!!31.0.1-jre'
guavaVersion = '[10.0,)!!31.1-jre'
jsonPathVersion = '2.7.0'
mockitoVersion = '4.3.1'
mockitoVersion = '4.4.0'
tallyVersion = '[0.4.0,)!!0.11.1'
gsonVersion = '[2.0,)!!2.9.0'
slf4jVersion = '[1.4.0,)!!1.7.36'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeFalse;

import com.uber.m3.tally.NoopScope;
import com.uber.m3.tally.RootScopeBuilder;
Expand All @@ -38,26 +39,27 @@
import io.temporal.api.query.v1.WorkflowQuery;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
import io.temporal.common.reporter.TestStatsReporter;
import io.temporal.internal.testservice.TestWorkflowService;
import io.temporal.internal.worker.SingleWorkerOptions;
import io.temporal.internal.worker.WorkflowExecutionException;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.testUtils.HistoryUtils;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.worker.MetricsType;
import io.temporal.worker.WorkflowImplementationOptions;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public class ReplayWorkflowRunTaskHandlerCacheTests {

private Scope metricsScope;
private TestStatsReporter reporter;

@Rule public SDKTestWorkflowRule testWorkflowRule = SDKTestWorkflowRule.newBuilder().build();

@Before
public void setUp() {
reporter = new TestStatsReporter();
Expand Down Expand Up @@ -88,22 +90,24 @@ public void whenHistoryIsFullNewWorkflowExecutorIsReturnedAndCached_InitiallyEmp

@Test
public void whenHistoryIsFullNewWorkflowExecutorIsReturned_InitiallyCached() throws Exception {
TestWorkflowService testService = new TestWorkflowService();
WorkflowServiceStubs service = testService.newClientStub();
assumeFalse("skipping for docker tests", SDKTestWorkflowRule.useExternalService);

// Arrange
WorkflowExecutorCache cache = new WorkflowExecutorCache(10, new NoopScope());
PollWorkflowTaskQueueResponse workflowTask1 =
HistoryUtils.generateWorkflowTaskWithInitialHistory(
"namespace", "taskQueue", "workflowType", service);
"namespace", "taskQueue", "workflowType", testWorkflowRule.getWorkflowServiceStubs());

WorkflowRunTaskHandler workflowRunTaskHandler =
cache.getOrCreate(workflowTask1, metricsScope, () -> createFakeExecutor(workflowTask1));
cache.addToCache(workflowTask1.getWorkflowExecution(), workflowRunTaskHandler);

PollWorkflowTaskQueueResponse workflowTask2 =
HistoryUtils.generateWorkflowTaskWithPartialHistoryFromExistingTask(
workflowTask1, "namespace", "stickyTaskQueue", service);
workflowTask1,
"namespace",
"stickyTaskQueue",
testWorkflowRule.getWorkflowServiceStubs());

assertEquals(
workflowRunTaskHandler,
Expand All @@ -119,11 +123,12 @@ public void whenHistoryIsFullNewWorkflowExecutorIsReturned_InitiallyCached() thr
workflowRunTaskHandler2,
cache.getOrCreate(workflowTask2, metricsScope, () -> createFakeExecutor(workflowTask2)));
assertSame(workflowRunTaskHandler2, workflowRunTaskHandler);
testService.close();
}

@Test(timeout = 2000)
public void whenHistoryIsPartialCachedEntryIsReturned() throws Exception {
assumeFalse("skipping for docker tests", SDKTestWorkflowRule.useExternalService);

// Arrange
Map<String, String> tags =
new ImmutableMap.Builder<String, String>(2)
Expand All @@ -133,35 +138,30 @@ public void whenHistoryIsPartialCachedEntryIsReturned() throws Exception {
Scope scope = metricsScope.tagged(tags);

WorkflowExecutorCache cache = new WorkflowExecutorCache(10, scope);
TestWorkflowService testService = new TestWorkflowService(true);
WorkflowServiceStubs service = testService.newClientStub();
try {
PollWorkflowTaskQueueResponse workflowTask =
HistoryUtils.generateWorkflowTaskWithInitialHistory(
"namespace", "taskQueue", "workflowType", service);
PollWorkflowTaskQueueResponse workflowTask =
HistoryUtils.generateWorkflowTaskWithInitialHistory(
"namespace", "taskQueue", "workflowType", testWorkflowRule.getWorkflowServiceStubs());

WorkflowRunTaskHandler workflowRunTaskHandler =
cache.getOrCreate(workflowTask, scope, () -> createFakeExecutor(workflowTask));
cache.addToCache(workflowTask.getWorkflowExecution(), workflowRunTaskHandler);
WorkflowRunTaskHandler workflowRunTaskHandler =
cache.getOrCreate(workflowTask, scope, () -> createFakeExecutor(workflowTask));
cache.addToCache(workflowTask.getWorkflowExecution(), workflowRunTaskHandler);

// Act
PollWorkflowTaskQueueResponse workflowTask2 =
HistoryUtils.generateWorkflowTaskWithPartialHistoryFromExistingTask(
workflowTask, "namespace", "stickyTaskQueue", service);
WorkflowRunTaskHandler workflowRunTaskHandler2 =
cache.getOrCreate(workflowTask2, scope, () -> doNotCreateFakeExecutor(workflowTask2));
// Act
PollWorkflowTaskQueueResponse workflowTask2 =
HistoryUtils.generateWorkflowTaskWithPartialHistoryFromExistingTask(
workflowTask,
"namespace",
"stickyTaskQueue",
testWorkflowRule.getWorkflowServiceStubs());
WorkflowRunTaskHandler workflowRunTaskHandler2 =
cache.getOrCreate(workflowTask2, scope, () -> doNotCreateFakeExecutor(workflowTask2));

// Assert
// Wait for reporter
Thread.sleep(100);
reporter.assertCounter(MetricsType.STICKY_CACHE_HIT, tags, 1);
// Assert
// Wait for reporter
Thread.sleep(100);
reporter.assertCounter(MetricsType.STICKY_CACHE_HIT, tags, 1);

assertEquals(workflowRunTaskHandler, workflowRunTaskHandler2);
} finally {
service.shutdownNow();
service.awaitTermination(1, TimeUnit.SECONDS);
testService.close();
}
assertEquals(workflowRunTaskHandler, workflowRunTaskHandler2);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static junit.framework.TestCase.assertNotNull;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -31,38 +32,23 @@
import com.uber.m3.tally.NoopScope;
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
import io.temporal.internal.testservice.TestWorkflowService;
import io.temporal.internal.worker.SingleWorkerOptions;
import io.temporal.internal.worker.WorkflowTaskHandler;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.testUtils.HistoryUtils;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public class ReplayWorkflowRunTaskHandlerTaskHandlerTests {

private TestWorkflowService testService;
private WorkflowServiceStubs service;

@Before
public void setUp() {
testService = new TestWorkflowService(true);
service = testService.newClientStub();
}

@After
public void tearDown() {
service.shutdownNow();
service.awaitTermination(1, TimeUnit.SECONDS);
testService.close();
}
@Rule public SDKTestWorkflowRule testWorkflowRule = SDKTestWorkflowRule.newBuilder().build();

@Test
public void ifStickyExecutionAttributesAreNotSetThenWorkflowsAreNotCached() throws Throwable {
assumeFalse("skipping for docker tests", SDKTestWorkflowRule.useExternalService);

// Arrange
WorkflowExecutorCache cache = new WorkflowExecutorCache(10, new NoopScope());
WorkflowTaskHandler taskHandler =
Expand All @@ -73,7 +59,7 @@ public void ifStickyExecutionAttributesAreNotSetThenWorkflowsAreNotCached() thro
SingleWorkerOptions.newBuilder().build(),
null,
Duration.ofSeconds(5),
service,
testWorkflowRule.getWorkflowServiceStubs(),
null);

// Act
Expand All @@ -88,6 +74,8 @@ public void ifStickyExecutionAttributesAreNotSetThenWorkflowsAreNotCached() thro

@Test
public void ifStickyExecutionAttributesAreSetThenWorkflowsAreCached() throws Throwable {
assumeFalse("skipping for docker tests", SDKTestWorkflowRule.useExternalService);

// Arrange
WorkflowExecutorCache cache = new WorkflowExecutorCache(10, new NoopScope());
WorkflowTaskHandler taskHandler =
Expand All @@ -98,7 +86,7 @@ public void ifStickyExecutionAttributesAreSetThenWorkflowsAreCached() throws Thr
SingleWorkerOptions.newBuilder().build(),
"sticky",
Duration.ofSeconds(5),
service,
testWorkflowRule.getWorkflowServiceStubs(),
null);

PollWorkflowTaskQueueResponse workflowTask =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ public void testChildSimulatedTimeout() throws Throwable {
.build();
ListOpenWorkflowExecutionsResponse listResponse =
testEnvironment
.getWorkflowService()
.getWorkflowServiceStubs()
.blockingStub()
.listOpenWorkflowExecutions(listRequest);
List<WorkflowExecutionInfo> executions = listResponse.getExecutionsList();
Expand All @@ -422,7 +422,7 @@ public void testChildSimulatedTimeout() throws Throwable {
.build();
ListClosedWorkflowExecutionsResponse listResponse =
testEnvironment
.getWorkflowService()
.getWorkflowServiceStubs()
.blockingStub()
.listClosedWorkflowExecutions(listRequest);
List<WorkflowExecutionInfo> executions = listResponse.getExecutionsList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,11 @@
import io.temporal.api.taskqueue.v1.TaskQueue;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
import io.temporal.internal.testservice.TestWorkflowService;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.Functions;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -56,30 +52,18 @@ public class PollWorkflowTaskDispatcherTests {
LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
ch.qos.logback.classic.Logger logger = context.getLogger(Logger.ROOT_LOGGER_NAME);

private TestWorkflowService testService;
private WorkflowServiceStubs service;
private final Scope metricsScope = new NoopScope();

@Before
public void setUp() {
testService = new TestWorkflowService();
service = testService.newClientStub();
}

@After
public void tearDown() {
service.shutdownNow();
service.awaitTermination(1, TimeUnit.SECONDS);
testService.close();
}
@Rule public SDKTestWorkflowRule testWorkflowRule = SDKTestWorkflowRule.newBuilder().build();

@Test
public void pollWorkflowTasksAreDispatchedBasedOnTaskQueueName() {
AtomicBoolean handled = new AtomicBoolean(false);
Functions.Proc1<PollWorkflowTaskQueueResponse> handler = r -> handled.set(true);

PollWorkflowTaskDispatcher dispatcher =
new PollWorkflowTaskDispatcher(service, "default", metricsScope);
new PollWorkflowTaskDispatcher(
testWorkflowRule.getWorkflowServiceStubs(), "default", metricsScope);
dispatcher.subscribe("taskqueue1", handler);

PollWorkflowTaskQueueResponse response = CreatePollWorkflowTaskQueueResponse("taskqueue1");
Expand All @@ -97,7 +81,8 @@ public void pollWorkflowTasksAreDispatchedToTheCorrectHandler() {
Functions.Proc1<PollWorkflowTaskQueueResponse> handler2 = r -> handled2.set(true);

PollWorkflowTaskDispatcher dispatcher =
new PollWorkflowTaskDispatcher(service, "default", metricsScope);
new PollWorkflowTaskDispatcher(
testWorkflowRule.getWorkflowServiceStubs(), "default", metricsScope);
dispatcher.subscribe("taskqueue1", handler);
dispatcher.subscribe("taskqueue2", handler2);

Expand All @@ -117,7 +102,8 @@ public void handlersGetOverwrittenWhenRegisteredForTheSameTaskQueue() {
Functions.Proc1<PollWorkflowTaskQueueResponse> handler2 = r -> handled2.set(true);

PollWorkflowTaskDispatcher dispatcher =
new PollWorkflowTaskDispatcher(service, "default", metricsScope);
new PollWorkflowTaskDispatcher(
testWorkflowRule.getWorkflowServiceStubs(), "default", metricsScope);
dispatcher.subscribe("taskqueue1", handler);
dispatcher.subscribe("taskqueue1", handler2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@

import static io.temporal.internal.common.InternalUtils.createNormalTaskQueue;
import static io.temporal.internal.common.InternalUtils.createStickyTaskQueue;
import static io.temporal.testUtils.TestServiceUtils.*;
import static io.temporal.testing.internal.TestServiceUtils.*;

import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
import io.temporal.internal.testservice.TestWorkflowService;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.serviceclient.WorkflowServiceStubsOptions;
import io.temporal.testserver.TestServer;
import java.util.concurrent.TimeUnit;

public class HistoryUtils {
Expand All @@ -38,14 +39,17 @@ private HistoryUtils() {}

public static PollWorkflowTaskQueueResponse generateWorkflowTaskWithInitialHistory()
throws Exception {
TestWorkflowService testService = new TestWorkflowService(true);
WorkflowServiceStubs service = testService.newClientStub();
try {
return generateWorkflowTaskWithInitialHistory(NAMESPACE, TASK_QUEUE, WORKFLOW_TYPE, service);
} finally {
service.shutdownNow();
service.awaitTermination(1, TimeUnit.SECONDS);
testService.close();
try (TestServer.InProcessTestServer server = TestServer.createServer(true)) {
WorkflowServiceStubs workflowServiceStubs =
WorkflowServiceStubs.newInstance(
WorkflowServiceStubsOptions.newBuilder().setChannel(server.getChannel()).build());
try {
return generateWorkflowTaskWithInitialHistory(
NAMESPACE, TASK_QUEUE, WORKFLOW_TYPE, workflowServiceStubs);
} finally {
workflowServiceStubs.shutdownNow();
workflowServiceStubs.awaitTermination(1, TimeUnit.SECONDS);
}
}
}

Expand All @@ -63,17 +67,20 @@ public static PollWorkflowTaskQueueResponse generateWorkflowTaskWithPartialHisto

public static PollWorkflowTaskQueueResponse generateWorkflowTaskWithPartialHistory(
String namespace, String taskqueueName, String workflowType) throws Exception {
TestWorkflowService testService = new TestWorkflowService(true);
WorkflowServiceStubs service = testService.newClientStub();
try {
PollWorkflowTaskQueueResponse response =
generateWorkflowTaskWithInitialHistory(namespace, taskqueueName, workflowType, service);
return generateWorkflowTaskWithPartialHistoryFromExistingTask(
response, namespace, HOST_TASK_QUEUE, service);
} finally {
service.shutdownNow();
service.awaitTermination(1, TimeUnit.SECONDS);
testService.close();
try (TestServer.InProcessTestServer server = TestServer.createServer(true)) {
WorkflowServiceStubs workflowServiceStubs =
WorkflowServiceStubs.newInstance(
WorkflowServiceStubsOptions.newBuilder().setChannel(server.getChannel()).build());
try {
PollWorkflowTaskQueueResponse response =
generateWorkflowTaskWithInitialHistory(
namespace, taskqueueName, workflowType, workflowServiceStubs);
return generateWorkflowTaskWithPartialHistoryFromExistingTask(
response, namespace, HOST_TASK_QUEUE, workflowServiceStubs);
} finally {
workflowServiceStubs.shutdownNow();
workflowServiceStubs.awaitTermination(1, TimeUnit.SECONDS);
}
}
}

Expand Down
Loading