Skip to content

Commit

Permalink
Create the Queue before the test to avoid erractic test behavior
Browse files Browse the repository at this point in the history
Also updates the CamelSinkAWSSQSITCase to use the blocking connector
runtime initialization.

This solves issue apache#215
  • Loading branch information
orpiske committed May 25, 2020
1 parent 396ff85 commit b15f64f
Showing 1 changed file with 13 additions and 1 deletion.
Expand Up @@ -34,6 +34,7 @@
import org.apache.camel.kafkaconnector.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.services.aws.AWSService;
import org.apache.camel.kafkaconnector.services.aws.AWSServiceFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
Expand All @@ -60,9 +61,20 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
@BeforeEach
public void setUp() {
awssqsClient = awsService.getClient();

String queueUrl = awssqsClient.getQueue(TestCommon.DEFAULT_SQS_QUEUE);
LOG.debug("Using queue {} for the test", queueUrl);

received = 0;
}

@AfterEach
public void tearDown() {
if (!awssqsClient.deleteQueue(TestCommon.DEFAULT_SQS_QUEUE)) {
fail("Failed to delete queue");
}
}

private boolean checkMessages(List<Message> messages) {
for (Message message : messages) {
LOG.info("Received: {}", message.getBody());
Expand Down Expand Up @@ -103,7 +115,7 @@ private void produceMessages() {

public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception {
connectorPropertyFactory.log();
getKafkaConnectService().initializeConnector(connectorPropertyFactory);
getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);

LOG.debug("Creating the consumer ...");
ExecutorService service = Executors.newCachedThreadPool();
Expand Down

0 comments on commit b15f64f

Please sign in to comment.