Skip to content

Commit

Permalink
[JBPM-9567] Stabilize Kafka Consumer/Producer tests (#2374)
Browse files Browse the repository at this point in the history
  • Loading branch information
gmunozfe committed Jan 23, 2021
1 parent 4ae7b1d commit 0c40c76
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 13 deletions.
Expand Up @@ -21,6 +21,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;

import java.util.Optional;

Expand Down Expand Up @@ -60,7 +61,7 @@ public class KafkaEmitterBrokerDownTest extends KafkaFixture {

@BeforeClass
public static void beforeClass() {
checkRightOSForTestContainers();
assumeTrue(isDockerAvailable());
generalSetup(false);
System.setProperty("org.kie.jbpm.event.emitters.kafka.max.block.ms", "500");
}
Expand Down
Expand Up @@ -20,6 +20,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import static org.kie.api.runtime.process.ProcessInstance.STATE_ABORTED;
import static org.kie.api.runtime.process.ProcessInstance.STATE_ACTIVE;
import static org.kie.api.task.model.Status.Completed;
Expand Down Expand Up @@ -87,7 +88,7 @@ public class KafkaEmitterHappyPathTest extends KafkaFixture {

@BeforeClass
public static void beforeClass() {
checkRightOSForTestContainers();
assumeTrue(isDockerAvailable());
kafka.start();
bootstrapServers = kafka.getBootstrapServers();
generalSetup(true);
Expand Down
Expand Up @@ -33,7 +33,6 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;

import java.io.IOException;
import java.text.SimpleDateFormat;
Expand Down Expand Up @@ -65,6 +64,7 @@
import org.kie.server.api.model.cases.CaseFile;
import org.kie.server.springboot.utils.KieJarBuildHelper;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
Expand Down Expand Up @@ -115,10 +115,13 @@ public class KafkaFixture {

protected KModuleDeploymentUnit unit = null;

public static void checkRightOSForTestContainers() {
// Currently testcontainers are not supported out-of-the-box on Windows and RHEL8
assumeTrue(!System.getProperty("os.name").toLowerCase().contains("win")
&& !System.getProperty("os.version").toLowerCase().contains("el8"));
public static boolean isDockerAvailable() {
try {
DockerClientFactory.instance().client();
return true;
} catch (Throwable ex) {
return false;
}
}

public static void generalSetup(boolean configure) {
Expand Down
Expand Up @@ -19,6 +19,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import static org.kie.api.runtime.process.ProcessInstance.STATE_ABORTED;
import static org.kie.api.runtime.process.ProcessInstance.STATE_ACTIVE;

Expand Down Expand Up @@ -71,7 +72,7 @@ public class ProxyAwareKafkaEmitterTest extends KafkaFixture{

@BeforeClass
public static void beforeClass() {
checkRightOSForTestContainers();
assumeTrue(isDockerAvailable());
System.setProperty("org.kie.jbpm.event.emitters.kafka.topic.processes", CUSTOM_PROCESSES_TOPIC);
toxiproxy.start();
kafkaProxy = toxiproxy.getProxy(kafka, TOXY_PROXY_PORT);
Expand Down
Expand Up @@ -144,6 +144,7 @@ public void setup() {

@After
public void cleanup() {
abortAllProcesses(runtimeDataService, processService);
cleanup(deploymentService, unit);
}

Expand Down
Expand Up @@ -32,12 +32,14 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
import static org.kie.api.runtime.process.ProcessInstance.STATE_ACTIVE;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -64,9 +66,13 @@
import org.jbpm.kie.services.impl.KModuleDeploymentUnit;
import org.jbpm.runtime.manager.impl.jpa.EntityManagerFactoryManager;
import org.jbpm.services.api.DeploymentService;
import org.jbpm.services.api.ProcessService;
import org.jbpm.services.api.RuntimeDataService;
import org.jbpm.services.api.model.ProcessInstanceDesc;
import org.kie.server.services.jbpm.kafka.KafkaServerExtension;
import org.kie.server.springboot.samples.utils.KieJarBuildHelper;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.KafkaContainer;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -146,9 +152,8 @@ public abstract class KafkaFixture {
protected static Properties props = new Properties();

public static void generalSetup() {
// Currently testcontainers are not supported out-of-the-box on Windows and RHEL8
assumeTrue(!System.getProperty("os.name").toLowerCase().contains("win")
&& !System.getProperty("os.version").toLowerCase().contains("el8"));
// Currently, Docker is needed for testcontainers
assumeTrue(isDockerAvailable());

//for the transactional tests
kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1");
Expand Down Expand Up @@ -195,13 +200,25 @@ public KModuleDeploymentUnit setup(DeploymentService ds, String artifactId) {
}

protected void cleanup(DeploymentService ds, KModuleDeploymentUnit unit) {
if (ds!=null) {
if (ds!=null && unit!=null) {
ds.undeploy(unit);
}
System.clearProperty(MESSAGE_MAPPING_PROPERTY);
System.clearProperty(SIGNAL_MAPPING_PROPERTY);
}

protected void abortAllProcesses(RuntimeDataService runtimeDataService, ProcessService processService) {
if (runtimeDataService == null || processService == null) {
return;
}
Collection<ProcessInstanceDesc> activeInstances = runtimeDataService.getProcessInstances(singletonList(STATE_ACTIVE), null, null);
if (activeInstances != null) {
for (ProcessInstanceDesc instance : activeInstances) {
processService.abortProcessInstance(instance.getDeploymentId(), instance.getId());
}
}
}

protected void waitForConsumerGroupToBeReady() {
await().atMost(3, SECONDS).pollDelay(1, SECONDS).until(() -> !listConsumerGroups().isEmpty());
}
Expand Down Expand Up @@ -330,4 +347,13 @@ protected Properties consumerProperties() {
props.setProperty(AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}

private static boolean isDockerAvailable() {
try {
DockerClientFactory.instance().client();
return true;
} catch (Throwable ex) {
return false;
}
}
}
Expand Up @@ -143,6 +143,7 @@ public void setupRestClient() {

@After
public void cleanup() {
abortAllProcesses(runtimeDataService, processService);
cleanup(deploymentService, unit);
if (kieServicesClient != null) {
kieServicesClient.disposeContainer(SEND_PROJECT);
Expand Down
Expand Up @@ -5,7 +5,8 @@
<encoder>
<!-- %l lowers performance -->
<!--<pattern>%d [%t] %-5p %l%n %m%n</pattern>-->
<pattern>%d [%t] %-5p %m%n</pattern>
<!-- limiting maximum size to 3000 per KafkaProducerHappyPathTest#testEndMessageRecordTooLargeException -->
<pattern>%d [%t] %-5p %.-3000m%n</pattern>
</encoder>
</appender>

Expand Down

0 comments on commit 0c40c76

Please sign in to comment.