Skip to content

Commit

Permalink
Some miscellaneous fixes
Browse files Browse the repository at this point in the history
Related to https://build.spring.io/browse/INT-MASTERSPRING40-JOB1-1271

* Fix race condition with subscription in the `FluxMessageChannel`:
rely on the `doOnRequest()` to start producing from upstream publishers
* Remove `SourcePollingChannelAdapterTests` since
`Class.getDeclaredConstructor().newInstance()` doesn't rethrow an exception as is
but wrap it into the `InvocationTargetException`.
This is probably the main reason to deprecate a regular `Class.newInstance()`
* Use `LettuceConnectionFactory.setEagerInitialization(true)` in the `RedisAvailableRule`
to avoid possible dead lock with lazy init
* Adjust Redis tests for new `RedisAvailableRule` behavior
  • Loading branch information
artembilan committed Nov 25, 2020
1 parent 63bd3e4 commit efcc018
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,10 @@ private boolean tryEmitMessage(Message<?> message) {
@Override
public void subscribe(Subscriber<? super Message<?>> subscriber) {
this.sink.asFlux()
.doOnRequest((r) -> this.subscribedSignal.tryEmitNext(true))
.doFinally((s) -> this.subscribedSignal.tryEmitNext(this.sink.currentSubscriberCount() > 0))
.share()
.subscribe(subscriber);

this.subscribedSignal.tryEmitNext(this.sink.currentSubscriberCount() > 0);
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -73,10 +73,12 @@ public void tearDown() {
}

@Test
@RedisAvailable
public void testPubSubChannelConfig() {
RedisConnectionFactory connectionFactory =
TestUtils.getPropertyValue(this.redisChannel, "connectionFactory", RedisConnectionFactory.class);
RedisSerializer<?> redisSerializer = TestUtils.getPropertyValue(redisChannel, "serializer", RedisSerializer.class);
RedisSerializer<?> redisSerializer = TestUtils.getPropertyValue(redisChannel, "serializer",
RedisSerializer.class);
assertThat(this.context.getBean("redisConnectionFactory")).isEqualTo(connectionFactory);
assertThat(this.context.getBean("redisSerializer")).isEqualTo(redisSerializer);
assertThat(TestUtils.getPropertyValue(redisChannel, "topicName")).isEqualTo("si.test.topic.parser");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
id="adapter" topics="foo" topic-patterns="f*, b*" channel="receiveChannel" error-channel="testErrorChannel"
message-converter="testConverter"
serializer="serializer"
task-executor="executor"/>
task-executor="executor"
auto-startup="false"/>

<bean id="executor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="5"/>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,24 +32,24 @@
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.redis.inbound.RedisInboundChannelAdapter;
import org.springframework.integration.redis.rules.RedisAvailable;
import org.springframework.integration.redis.rules.RedisAvailableRule;
import org.springframework.integration.redis.rules.RedisAvailableTests;
import org.springframework.integration.support.converter.SimpleMessageConverter;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit4.SpringRunner;

/**
* @author Oleg Zhurakousky
* @author Mark Fisher
* @author Gary Russell
* @author Gunnar Hillert
* @author Venil Noronha
* @author Artem Bilan
*/
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@RunWith(SpringRunner.class)
@DirtiesContext
public class RedisInboundChannelAdapterParserTests extends RedisAvailableTests {

Expand Down Expand Up @@ -91,10 +91,11 @@ public void validateConfiguration() {
@RedisAvailable
public void testInboundChannelAdapterMessaging() throws Exception {
RedisInboundChannelAdapter adapter = context.getBean("adapter", RedisInboundChannelAdapter.class);
this.awaitContainerSubscribedWithPatterns(TestUtils.getPropertyValue(adapter, "container",
adapter.start();
awaitContainerSubscribedWithPatterns(TestUtils.getPropertyValue(adapter, "container",
RedisMessageListenerContainer.class));

RedisConnectionFactory connectionFactory = this.getConnectionFactoryForTest();
RedisConnectionFactory connectionFactory = RedisAvailableRule.connectionFactory;

connectionFactory.getConnection().publish("foo".getBytes(), "Hello Redis from foo".getBytes());
connectionFactory.getConnection().publish("bar".getBytes(), "Hello Redis from bar".getBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,20 @@
</int-redis:request-handler-advice-chain>
</int-redis:outbound-channel-adapter>

<int-redis:inbound-channel-adapter id="fooInbound" channel="receiveChannel" topics="foo"/>
<int-redis:inbound-channel-adapter id="fooInbound" channel="receiveChannel" topics="foo" auto-startup="false"/>

<int:channel id="receiveChannel">
<int:queue/>
</int:channel>

<int-redis:inbound-channel-adapter id="barInbound" channel="barChannel" topics="bar"/>
<int-redis:inbound-channel-adapter id="barInbound" channel="barChannel" topics="bar" auto-startup="false"/>

<int:channel id="barChannel">
<int:queue/>
</int:channel>

<bean id="testConverter" class="org.springframework.integration.redis.config.RedisOutboundChannelAdapterParserTests$TestMessageConverter"/>
<bean id="testConverter"
class="org.springframework.integration.redis.config.RedisOutboundChannelAdapterParserTests$TestMessageConverter"/>

<int:chain input-channel="redisOutboundChain">
<int-redis:outbound-channel-adapter topic="foo"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public final class RedisAvailableRule implements MethodRule {

public static LettuceConnectionFactory connectionFactory;

private static volatile boolean initialized;

protected static void setupConnectionFactory() {
RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
redisStandaloneConfiguration.setPort(REDIS_PORT);
Expand All @@ -59,29 +61,34 @@ protected static void setupConnectionFactory() {
.build();

connectionFactory = new LettuceConnectionFactory(redisStandaloneConfiguration, clientConfiguration);
connectionFactory.afterPropertiesSet();
connectionFactory.setEagerInitialization(true);
}


public static void cleanUpConnectionFactoryIfAny() {
if (connectionFactory != null) {
if (initialized) {
connectionFactory.destroy();
initialized = false;
}
}


public Statement apply(final Statement base, final FrameworkMethod method, Object target) {
return new Statement() {

@Override
public void evaluate() throws Throwable {
RedisAvailable redisAvailable = method.getAnnotation(RedisAvailable.class);
if (redisAvailable != null) {
if (connectionFactory != null) {
try {
connectionFactory.getConnection();
connectionFactory.afterPropertiesSet();
initialized = true;
}
catch (Exception e) {
Assume.assumeTrue("Skipping test due to Redis not being available on port: " + REDIS_PORT + ": " + e, false);
Assume.assumeTrue(
"Skipping test due to Redis not being available on port: " + REDIS_PORT + ": " + e,
false);
}
base.evaluate();
}
Expand Down

0 comments on commit efcc018

Please sign in to comment.