From 94956859a27674ce5d00750d8c3011fbad743127 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 23 Jun 2020 16:27:45 -0400 Subject: [PATCH] Fix `RedisAvailableRule` for the proper evaluate * Fix `RedisQueueMessageDrivenEndpointTests` for missed options and for proper stop when the polling from the test queue is over --- ...ueueMessageDrivenEndpointTests-context.xml | 2 + .../RedisQueueMessageDrivenEndpointTests.java | 49 ++++++++++++------- .../redis/rules/RedisAvailableRule.java | 4 +- 3 files changed, 35 insertions(+), 20 deletions(-) diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpointTests-context.xml b/spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpointTests-context.xml index 38b1ec6a637..a090e239e81 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpointTests-context.xml +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpointTests-context.xml @@ -23,6 +23,7 @@ channel="fromChannel" expect-message="true" auto-startup="false" + receive-timeout="10" serializer="testSerializer"/> @@ -35,6 +36,7 @@ diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpointTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpointTests.java index 37f0dd6a882..21285795bb4 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpointTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpointTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2019 the original author or authors. + * Copyright 2013-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,7 +30,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import org.junit.After; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; @@ -43,7 +42,7 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEvent; -import org.springframework.context.Lifecycle; +import org.springframework.context.SmartLifecycle; import org.springframework.data.redis.RedisConnectionFailureException; import org.springframework.data.redis.RedisSystemException; import org.springframework.data.redis.connection.RedisConnectionFactory; @@ -81,7 +80,7 @@ @DirtiesContext public class RedisQueueMessageDrivenEndpointTests extends RedisAvailableTests { - public static final String TEST_QUEUE = "testQueue"; + public static final String TEST_QUEUE = UUID.randomUUID().toString(); @Autowired private RedisConnectionFactory connectionFactory; @@ -90,29 +89,29 @@ public class RedisQueueMessageDrivenEndpointTests extends RedisAvailableTests { private PollableChannel fromChannel; @Autowired - private Lifecycle fromChannelEndpoint; + private SmartLifecycle fromChannelEndpoint; @Autowired private MessageChannel symmetricalInputChannel; @Autowired - private Lifecycle symmetricalRedisChannelEndpoint; + private SmartLifecycle symmetricalRedisChannelEndpoint; @Autowired private PollableChannel symmetricalOutputChannel; @Before - @After public void setUpTearDown() { RedisTemplate redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(this.connectionFactory); + redisTemplate.afterPropertiesSet(); redisTemplate.delete(TEST_QUEUE); } @Test @RedisAvailable @SuppressWarnings("unchecked") - public void testInt3014Default() { + public void testInt3014Default() throws InterruptedException { RedisTemplate redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(this.connectionFactory); redisTemplate.setEnableDefaultSerializer(false); @@ -147,13 +146,15 @@ public void testInt3014Default() { assertThat(receive).isNotNull(); assertThat(receive.getPayload()).isEqualTo(payload2); - endpoint.stop(); + CountDownLatch stopLatch = new CountDownLatch(1); + endpoint.stop(stopLatch::countDown); + assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue(); } @Test @RedisAvailable @SuppressWarnings("unchecked") - public void testInt3014ExpectMessageTrue() { + public void testInt3014ExpectMessageTrue() throws InterruptedException { RedisTemplate redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(this.connectionFactory); redisTemplate.setEnableDefaultSerializer(false); @@ -175,6 +176,7 @@ public void testInt3014ExpectMessageTrue() { new RedisQueueMessageDrivenEndpoint(TEST_QUEUE, this.connectionFactory); endpoint.setBeanFactory(Mockito.mock(BeanFactory.class)); endpoint.setExpectMessage(true); + endpoint.setSerializer(new JdkSerializationRedisSerializer()); endpoint.setOutputChannel(channel); endpoint.setErrorChannel(errorChannel); endpoint.setReceiveTimeout(10); @@ -195,12 +197,14 @@ public void testInt3014ExpectMessageTrue() { assertThat(((Exception) receive.getPayload()).getCause().getMessage()) .contains("java.lang.String cannot be cast to org.springframework.messaging.Message"); - endpoint.stop(); + CountDownLatch stopLatch = new CountDownLatch(1); + endpoint.stop(stopLatch::countDown); + assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue(); } @Test @RedisAvailable - public void testInt3017IntegrationInbound() { + public void testInt3017IntegrationInbound() throws InterruptedException { this.fromChannelEndpoint.start(); String payload = new Date().toString(); @@ -214,12 +218,14 @@ public void testInt3017IntegrationInbound() { Message receive = this.fromChannel.receive(10000); assertThat(receive).isNotNull(); assertThat(receive.getPayload()).isEqualTo(payload); - this.fromChannelEndpoint.stop(); + CountDownLatch stopLatch = new CountDownLatch(1); + this.fromChannelEndpoint.stop(stopLatch::countDown); + assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue(); } @Test @RedisAvailable - public void testInt3017IntegrationSymmetrical() { + public void testInt3017IntegrationSymmetrical() throws InterruptedException { this.symmetricalRedisChannelEndpoint.start(); UUID payload = UUID.randomUUID(); Message message = MessageBuilder.withPayload(payload) @@ -231,7 +237,10 @@ public void testInt3017IntegrationSymmetrical() { Message receive = this.symmetricalOutputChannel.receive(10000); assertThat(receive).isNotNull(); assertThat(receive.getPayload()).isEqualTo(payload); - this.symmetricalRedisChannelEndpoint.stop(); + + CountDownLatch stopLatch = new CountDownLatch(1); + this.symmetricalRedisChannelEndpoint.stop(stopLatch::countDown); + assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue(); } @Test @@ -337,13 +346,15 @@ public void testInt3196Recovery() throws Exception { assertThat(receive).isNotNull(); assertThat(receive.getPayload()).isEqualTo(payload); - endpoint.stop(); + CountDownLatch stopLatch = new CountDownLatch(1); + endpoint.stop(stopLatch::countDown); + assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue(); } @Test @RedisAvailable @SuppressWarnings("unchecked") - public void testInt3932ReadFromLeft() { + public void testInt3932ReadFromLeft() throws InterruptedException { RedisTemplate redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(this.connectionFactory); redisTemplate.setEnableDefaultSerializer(false); @@ -379,7 +390,9 @@ public void testInt3932ReadFromLeft() { assertThat(receive).isNotNull(); assertThat(receive.getPayload()).isEqualTo(payload2); - endpoint.stop(); + CountDownLatch stopLatch = new CountDownLatch(1); + endpoint.stop(stopLatch::countDown); + assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue(); } private void waitListening(RedisQueueMessageDrivenEndpoint endpoint) throws InterruptedException { diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/rules/RedisAvailableRule.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/rules/RedisAvailableRule.java index 3076d30c644..294847f122c 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/rules/RedisAvailableRule.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/rules/RedisAvailableRule.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -78,11 +78,11 @@ public void evaluate() throws Throwable { if (connectionFactory != null) { try { connectionFactory.getConnection(); - base.evaluate(); } catch (Exception e) { Assume.assumeTrue("Skipping test due to Redis not being available on port: " + REDIS_PORT + ": " + e, false); } + base.evaluate(); } } }