Skip to content

Commit

Permalink
Fix RedisAvailableRule for the proper evaluate
Browse files Browse the repository at this point in the history
* Fix `RedisQueueMessageDrivenEndpointTests` for missed options
and for proper stop when the polling from the test queue is over
  • Loading branch information
artembilan committed Jun 23, 2020
1 parent 9367d7f commit 9495685
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 20 deletions.
Expand Up @@ -23,6 +23,7 @@
channel="fromChannel"
expect-message="true"
auto-startup="false"
receive-timeout="10"
serializer="testSerializer"/>

<bean id="testSerializer" class="org.springframework.integration.redis.util.CustomJsonSerializer"/>
Expand All @@ -35,6 +36,7 @@
<int-redis:queue-inbound-channel-adapter id="symmetricalRedisChannelEndpoint" queue="#{TEST_QUEUE}"
channel="symmetricalRedisChannel"
auto-startup="false"
receive-timeout="10"
serializer=""/>


Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2019 the original author or authors.
* Copyright 2013-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
Expand All @@ -43,7 +42,7 @@
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.Lifecycle;
import org.springframework.context.SmartLifecycle;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.RedisConnectionFactory;
Expand Down Expand Up @@ -81,7 +80,7 @@
@DirtiesContext
public class RedisQueueMessageDrivenEndpointTests extends RedisAvailableTests {

public static final String TEST_QUEUE = "testQueue";
public static final String TEST_QUEUE = UUID.randomUUID().toString();

@Autowired
private RedisConnectionFactory connectionFactory;
Expand All @@ -90,29 +89,29 @@ public class RedisQueueMessageDrivenEndpointTests extends RedisAvailableTests {
private PollableChannel fromChannel;

@Autowired
private Lifecycle fromChannelEndpoint;
private SmartLifecycle fromChannelEndpoint;

@Autowired
private MessageChannel symmetricalInputChannel;

@Autowired
private Lifecycle symmetricalRedisChannelEndpoint;
private SmartLifecycle symmetricalRedisChannelEndpoint;

@Autowired
private PollableChannel symmetricalOutputChannel;

@Before
@After
public void setUpTearDown() {
RedisTemplate<String, ?> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(this.connectionFactory);
redisTemplate.afterPropertiesSet();
redisTemplate.delete(TEST_QUEUE);
}

@Test
@RedisAvailable
@SuppressWarnings("unchecked")
public void testInt3014Default() {
public void testInt3014Default() throws InterruptedException {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(this.connectionFactory);
redisTemplate.setEnableDefaultSerializer(false);
Expand Down Expand Up @@ -147,13 +146,15 @@ public void testInt3014Default() {
assertThat(receive).isNotNull();
assertThat(receive.getPayload()).isEqualTo(payload2);

endpoint.stop();
CountDownLatch stopLatch = new CountDownLatch(1);
endpoint.stop(stopLatch::countDown);
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

@Test
@RedisAvailable
@SuppressWarnings("unchecked")
public void testInt3014ExpectMessageTrue() {
public void testInt3014ExpectMessageTrue() throws InterruptedException {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(this.connectionFactory);
redisTemplate.setEnableDefaultSerializer(false);
Expand All @@ -175,6 +176,7 @@ public void testInt3014ExpectMessageTrue() {
new RedisQueueMessageDrivenEndpoint(TEST_QUEUE, this.connectionFactory);
endpoint.setBeanFactory(Mockito.mock(BeanFactory.class));
endpoint.setExpectMessage(true);
endpoint.setSerializer(new JdkSerializationRedisSerializer());
endpoint.setOutputChannel(channel);
endpoint.setErrorChannel(errorChannel);
endpoint.setReceiveTimeout(10);
Expand All @@ -195,12 +197,14 @@ public void testInt3014ExpectMessageTrue() {
assertThat(((Exception) receive.getPayload()).getCause().getMessage())
.contains("java.lang.String cannot be cast to org.springframework.messaging.Message");

endpoint.stop();
CountDownLatch stopLatch = new CountDownLatch(1);
endpoint.stop(stopLatch::countDown);
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

@Test
@RedisAvailable
public void testInt3017IntegrationInbound() {
public void testInt3017IntegrationInbound() throws InterruptedException {
this.fromChannelEndpoint.start();
String payload = new Date().toString();

Expand All @@ -214,12 +218,14 @@ public void testInt3017IntegrationInbound() {
Message<?> receive = this.fromChannel.receive(10000);
assertThat(receive).isNotNull();
assertThat(receive.getPayload()).isEqualTo(payload);
this.fromChannelEndpoint.stop();
CountDownLatch stopLatch = new CountDownLatch(1);
this.fromChannelEndpoint.stop(stopLatch::countDown);
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

@Test
@RedisAvailable
public void testInt3017IntegrationSymmetrical() {
public void testInt3017IntegrationSymmetrical() throws InterruptedException {
this.symmetricalRedisChannelEndpoint.start();
UUID payload = UUID.randomUUID();
Message<UUID> message = MessageBuilder.withPayload(payload)
Expand All @@ -231,7 +237,10 @@ public void testInt3017IntegrationSymmetrical() {
Message<?> receive = this.symmetricalOutputChannel.receive(10000);
assertThat(receive).isNotNull();
assertThat(receive.getPayload()).isEqualTo(payload);
this.symmetricalRedisChannelEndpoint.stop();

CountDownLatch stopLatch = new CountDownLatch(1);
this.symmetricalRedisChannelEndpoint.stop(stopLatch::countDown);
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

@Test
Expand Down Expand Up @@ -337,13 +346,15 @@ public void testInt3196Recovery() throws Exception {
assertThat(receive).isNotNull();
assertThat(receive.getPayload()).isEqualTo(payload);

endpoint.stop();
CountDownLatch stopLatch = new CountDownLatch(1);
endpoint.stop(stopLatch::countDown);
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

@Test
@RedisAvailable
@SuppressWarnings("unchecked")
public void testInt3932ReadFromLeft() {
public void testInt3932ReadFromLeft() throws InterruptedException {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(this.connectionFactory);
redisTemplate.setEnableDefaultSerializer(false);
Expand Down Expand Up @@ -379,7 +390,9 @@ public void testInt3932ReadFromLeft() {
assertThat(receive).isNotNull();
assertThat(receive.getPayload()).isEqualTo(payload2);

endpoint.stop();
CountDownLatch stopLatch = new CountDownLatch(1);
endpoint.stop(stopLatch::countDown);
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

private void waitListening(RedisQueueMessageDrivenEndpoint endpoint) throws InterruptedException {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -78,11 +78,11 @@ public void evaluate() throws Throwable {
if (connectionFactory != null) {
try {
connectionFactory.getConnection();
base.evaluate();
}
catch (Exception e) {
Assume.assumeTrue("Skipping test due to Redis not being available on port: " + REDIS_PORT + ": " + e, false);
}
base.evaluate();
}
}
}
Expand Down

0 comments on commit 9495685

Please sign in to comment.