diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpointTests-context.xml b/spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpointTests-context.xml
index 38b1ec6a637..a090e239e81 100644
--- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpointTests-context.xml
+++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpointTests-context.xml
@@ -23,6 +23,7 @@
channel="fromChannel"
expect-message="true"
auto-startup="false"
+ receive-timeout="10"
serializer="testSerializer"/>
@@ -35,6 +36,7 @@
diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpointTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpointTests.java
index 37f0dd6a882..21285795bb4 100644
--- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpointTests.java
+++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpointTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2013-2019 the original author or authors.
+ * Copyright 2013-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -30,7 +30,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@@ -43,7 +42,7 @@
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
-import org.springframework.context.Lifecycle;
+import org.springframework.context.SmartLifecycle;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.RedisConnectionFactory;
@@ -81,7 +80,7 @@
@DirtiesContext
public class RedisQueueMessageDrivenEndpointTests extends RedisAvailableTests {
- public static final String TEST_QUEUE = "testQueue";
+ public static final String TEST_QUEUE = UUID.randomUUID().toString();
@Autowired
private RedisConnectionFactory connectionFactory;
@@ -90,29 +89,29 @@ public class RedisQueueMessageDrivenEndpointTests extends RedisAvailableTests {
private PollableChannel fromChannel;
@Autowired
- private Lifecycle fromChannelEndpoint;
+ private SmartLifecycle fromChannelEndpoint;
@Autowired
private MessageChannel symmetricalInputChannel;
@Autowired
- private Lifecycle symmetricalRedisChannelEndpoint;
+ private SmartLifecycle symmetricalRedisChannelEndpoint;
@Autowired
private PollableChannel symmetricalOutputChannel;
@Before
- @After
public void setUpTearDown() {
RedisTemplate redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(this.connectionFactory);
+ redisTemplate.afterPropertiesSet();
redisTemplate.delete(TEST_QUEUE);
}
@Test
@RedisAvailable
@SuppressWarnings("unchecked")
- public void testInt3014Default() {
+ public void testInt3014Default() throws InterruptedException {
RedisTemplate redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(this.connectionFactory);
redisTemplate.setEnableDefaultSerializer(false);
@@ -147,13 +146,15 @@ public void testInt3014Default() {
assertThat(receive).isNotNull();
assertThat(receive.getPayload()).isEqualTo(payload2);
- endpoint.stop();
+ CountDownLatch stopLatch = new CountDownLatch(1);
+ endpoint.stop(stopLatch::countDown);
+ assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
}
@Test
@RedisAvailable
@SuppressWarnings("unchecked")
- public void testInt3014ExpectMessageTrue() {
+ public void testInt3014ExpectMessageTrue() throws InterruptedException {
RedisTemplate redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(this.connectionFactory);
redisTemplate.setEnableDefaultSerializer(false);
@@ -175,6 +176,7 @@ public void testInt3014ExpectMessageTrue() {
new RedisQueueMessageDrivenEndpoint(TEST_QUEUE, this.connectionFactory);
endpoint.setBeanFactory(Mockito.mock(BeanFactory.class));
endpoint.setExpectMessage(true);
+ endpoint.setSerializer(new JdkSerializationRedisSerializer());
endpoint.setOutputChannel(channel);
endpoint.setErrorChannel(errorChannel);
endpoint.setReceiveTimeout(10);
@@ -195,12 +197,14 @@ public void testInt3014ExpectMessageTrue() {
assertThat(((Exception) receive.getPayload()).getCause().getMessage())
.contains("java.lang.String cannot be cast to org.springframework.messaging.Message");
- endpoint.stop();
+ CountDownLatch stopLatch = new CountDownLatch(1);
+ endpoint.stop(stopLatch::countDown);
+ assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
}
@Test
@RedisAvailable
- public void testInt3017IntegrationInbound() {
+ public void testInt3017IntegrationInbound() throws InterruptedException {
this.fromChannelEndpoint.start();
String payload = new Date().toString();
@@ -214,12 +218,14 @@ public void testInt3017IntegrationInbound() {
Message> receive = this.fromChannel.receive(10000);
assertThat(receive).isNotNull();
assertThat(receive.getPayload()).isEqualTo(payload);
- this.fromChannelEndpoint.stop();
+ CountDownLatch stopLatch = new CountDownLatch(1);
+ this.fromChannelEndpoint.stop(stopLatch::countDown);
+ assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
}
@Test
@RedisAvailable
- public void testInt3017IntegrationSymmetrical() {
+ public void testInt3017IntegrationSymmetrical() throws InterruptedException {
this.symmetricalRedisChannelEndpoint.start();
UUID payload = UUID.randomUUID();
Message message = MessageBuilder.withPayload(payload)
@@ -231,7 +237,10 @@ public void testInt3017IntegrationSymmetrical() {
Message> receive = this.symmetricalOutputChannel.receive(10000);
assertThat(receive).isNotNull();
assertThat(receive.getPayload()).isEqualTo(payload);
- this.symmetricalRedisChannelEndpoint.stop();
+
+ CountDownLatch stopLatch = new CountDownLatch(1);
+ this.symmetricalRedisChannelEndpoint.stop(stopLatch::countDown);
+ assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
}
@Test
@@ -337,13 +346,15 @@ public void testInt3196Recovery() throws Exception {
assertThat(receive).isNotNull();
assertThat(receive.getPayload()).isEqualTo(payload);
- endpoint.stop();
+ CountDownLatch stopLatch = new CountDownLatch(1);
+ endpoint.stop(stopLatch::countDown);
+ assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
}
@Test
@RedisAvailable
@SuppressWarnings("unchecked")
- public void testInt3932ReadFromLeft() {
+ public void testInt3932ReadFromLeft() throws InterruptedException {
RedisTemplate redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(this.connectionFactory);
redisTemplate.setEnableDefaultSerializer(false);
@@ -379,7 +390,9 @@ public void testInt3932ReadFromLeft() {
assertThat(receive).isNotNull();
assertThat(receive.getPayload()).isEqualTo(payload2);
- endpoint.stop();
+ CountDownLatch stopLatch = new CountDownLatch(1);
+ endpoint.stop(stopLatch::countDown);
+ assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
}
private void waitListening(RedisQueueMessageDrivenEndpoint endpoint) throws InterruptedException {
diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/rules/RedisAvailableRule.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/rules/RedisAvailableRule.java
index 3076d30c644..294847f122c 100644
--- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/rules/RedisAvailableRule.java
+++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/rules/RedisAvailableRule.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -78,11 +78,11 @@ public void evaluate() throws Throwable {
if (connectionFactory != null) {
try {
connectionFactory.getConnection();
- base.evaluate();
}
catch (Exception e) {
Assume.assumeTrue("Skipping test due to Redis not being available on port: " + REDIS_PORT + ": " + e, false);
}
+ base.evaluate();
}
}
}