Skip to content

Commit

Permalink
GH-3067: Use default NullChannel instance if poss.
Browse files Browse the repository at this point in the history
Resolves #3067

Also add missing receive counter.

* Late binding of null discard channel; checkstyle

* Fix test; fall back to new NullChannel(); always evaluate tx expressions
  • Loading branch information
garyrussell authored and artembilan committed Sep 27, 2019
1 parent 0c7cae1 commit 9994997
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.springframework.expression.Expression;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.integration.handler.DiscardingMessageHandler;
Expand All @@ -55,6 +56,7 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.core.DestinationResolutionException;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

Expand Down Expand Up @@ -315,7 +317,7 @@ protected void onInit() {
super.onInit();
Assert.state(!(this.discardChannelName != null && this.discardChannel != null),
"'discardChannelName' and 'discardChannel' are mutually exclusive.");
BeanFactory beanFactory = this.getBeanFactory();
BeanFactory beanFactory = getBeanFactory();
if (beanFactory != null) {
if (this.outputProcessor instanceof BeanFactoryAware) {
((BeanFactoryAware) this.outputProcessor).setBeanFactory(beanFactory);
Expand All @@ -328,10 +330,6 @@ protected void onInit() {
}
}

if (this.discardChannel == null) {
this.discardChannel = new NullChannel();
}

if (this.releasePartialSequences) {
Assert.isInstanceOf(SequenceSizeReleaseStrategy.class, this.releaseStrategy, () ->
"Release strategy of type [" + this.releaseStrategy.getClass().getSimpleName() +
Expand Down Expand Up @@ -392,8 +390,21 @@ protected ReleaseStrategy getReleaseStrategy() {
@Override
public MessageChannel getDiscardChannel() {
String channelName = this.discardChannelName;
if (channelName == null && this.discardChannel == null) {
channelName = IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME;
}
if (channelName != null) {
this.discardChannel = getChannelResolver().resolveDestination(channelName);
try {
this.discardChannel = getChannelResolver().resolveDestination(channelName);
}
catch (DestinationResolutionException ex) {
if (channelName.equals(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME)) {
this.discardChannel = new NullChannel();
}
else {
throw ex;
}
}
this.discardChannelName = null;
}
return this.discardChannel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.springframework.integration.support.management.ConfigurableMetricsAware;
import org.springframework.integration.support.management.DefaultMessageChannelMetrics;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.integration.support.management.metrics.CounterFacade;
import org.springframework.integration.support.management.metrics.MetricsCaptor;
import org.springframework.integration.support.management.metrics.TimerFacade;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -68,6 +69,8 @@ public class NullChannel implements PollableChannel,

private TimerFacade successTimer;

private CounterFacade receiveCounter;

@Override
public void setBeanName(String beanName) {
this.beanName = beanName;
Expand Down Expand Up @@ -268,6 +271,7 @@ public Message<?> receive() {
if (this.loggingEnabled) {
this.logger.debug("receive called on null channel");
}
incrementReceiveCounter();
return null;
}

Expand All @@ -276,6 +280,27 @@ public Message<?> receive(long timeout) {
return receive();
}

private void incrementReceiveCounter() {
if (this.metricsCaptor != null) {
if (this.receiveCounter == null) {
this.receiveCounter = buildReceiveCounter();
}
this.receiveCounter.increment();
}
}

private CounterFacade buildReceiveCounter() {
CounterFacade counterFacade = this.metricsCaptor
.counterBuilder(RECEIVE_COUNTER_NAME)
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
.tag("type", "channel")
.tag("result", "success")
.tag("exception", "none")
.description("Messages received")
.build();
return counterFacade;
}

@Override
public String toString() {
return (this.beanName != null) ? this.beanName : super.toString();
Expand All @@ -286,6 +311,9 @@ public void destroy() {
if (this.successTimer != null) {
this.successTimer.remove();
}
if (this.receiveCounter != null) {
this.receiveCounter.remove();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -66,11 +66,11 @@ public class ExpressionEvaluatingTransactionSynchronizationProcessor extends Int

private volatile Expression afterRollbackExpression;

private volatile MessageChannel beforeCommitChannel = new NullChannel();
private volatile MessageChannel beforeCommitChannel;

private volatile MessageChannel afterCommitChannel = new NullChannel();
private volatile MessageChannel afterCommitChannel;

private volatile MessageChannel afterRollbackChannel = new NullChannel();
private volatile MessageChannel afterRollbackChannel;

public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
this.evaluationContext = evaluationContext;
Expand Down Expand Up @@ -129,8 +129,8 @@ public void processAfterRollback(IntegrationResourceHolder holder) {
doProcess(holder, this.afterRollbackExpression, this.afterRollbackChannel, "afterRollback");
}

private void doProcess(IntegrationResourceHolder holder, Expression expression, MessageChannel messageChannel,
String expressionType) {
private void doProcess(IntegrationResourceHolder holder, Expression expression,
@Nullable MessageChannel messageChannel, String expressionType) {

Message<?> message = holder.getMessage();
if (message != null) {
Expand All @@ -141,7 +141,7 @@ private void doProcess(IntegrationResourceHolder holder, Expression expression,
}
EvaluationContext evaluationContextToUse = prepareEvaluationContextToUse(holder);
Object value = expression.getValue(evaluationContextToUse, message);
if (value != null) {
if (value != null && messageChannel != null) {
if (logger.isDebugEnabled()) {
logger.debug("Sending expression result message to " + messageChannel + " " +
"as part of '" + expressionType + "' transaction synchronization");
Expand Down Expand Up @@ -171,7 +171,7 @@ private void doProcess(IntegrationResourceHolder holder, Expression expression,
}
}
}
else {
else if (messageChannel != null) {
if (logger.isDebugEnabled()) {
logger.debug("Sending received message to " + messageChannel + " as part of '" +
expressionType + "' transaction synchronization");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.springframework.integration.aggregator.MethodInvokingReleaseStrategy;
import org.springframework.integration.aggregator.ReleaseStrategy;
import org.springframework.integration.aggregator.ResequencingMessageHandler;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.SimpleMessageGroup;
Expand Down Expand Up @@ -60,7 +59,6 @@ void testDefaultResequencerProperties() {
ResequencingMessageHandler resequencer = TestUtils.getPropertyValue(endpoint, "handler",
ResequencingMessageHandler.class);
assertThat(getPropertyValue(resequencer, "outputChannel")).isNull();
assertThat(getPropertyValue(resequencer, "discardChannel") instanceof NullChannel).isTrue();
assertThat(getPropertyValue(
resequencer, "messagingTemplate.sendTimeout"))
.as("The ResequencerEndpoint is not set with the appropriate timeout value").isEqualTo(-1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.springframework.integration.aggregator.MethodInvokingCorrelationStrategy;
import org.springframework.integration.aggregator.MethodInvokingReleaseStrategy;
import org.springframework.integration.aggregator.SimpleSequenceSizeReleaseStrategy;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.MessageHandler;
Expand All @@ -39,6 +38,7 @@
* @author Marius Bogoevici
* @author Mark Fisher
* @author Artem Bilan
* @author Gary Russell
*/
public class AggregatorAnnotationTests {

Expand All @@ -51,7 +51,6 @@ public void testAnnotationWithDefaultSettings() {
assertThat(getPropertyValue(aggregator, "releaseStrategy") instanceof SimpleSequenceSizeReleaseStrategy)
.isTrue();
assertThat(getPropertyValue(aggregator, "outputChannel")).isNull();
assertThat(getPropertyValue(aggregator, "discardChannel") instanceof NullChannel).isTrue();
assertThat(getPropertyValue(aggregator, "messagingTemplate.sendTimeout")).isEqualTo(-1L);
assertThat(getPropertyValue(aggregator, "sendPartialResultOnExpiry")).isEqualTo(false);
context.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,8 @@ public Message<String> receive() {
}
});

MessageChannel afterCommitChannel = TestUtils.getPropertyValue(syncProcessor, "afterCommitChannel",
MessageChannel.class);
assertThat(afterCommitChannel).isInstanceOf(NullChannel.class);

MessageChannel afterCommitChannel = new NullChannel();
syncProcessor.setAfterCommitChannel(afterCommitChannel);
Log logger = TestUtils.getPropertyValue(afterCommitChannel, "logger", Log.class);

logger = Mockito.spy(logger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,39 +45,43 @@
public class FileTailInboundChannelAdapterFactoryBean extends AbstractFactoryBean<FileTailingMessageProducerSupport>
implements BeanNameAware, SmartLifecycle, ApplicationEventPublisherAware {

private volatile String nativeOptions;
private String nativeOptions;

private volatile boolean enableStatusReader = true;
private boolean enableStatusReader = true;

private volatile Long idleEventInterval;
private Long idleEventInterval;

private volatile File file;
private File file;

private volatile TaskExecutor taskExecutor;
private TaskExecutor taskExecutor;

private volatile TaskScheduler taskScheduler;
private TaskScheduler taskScheduler;

private volatile Long delay;
private Long delay;

private volatile Long fileDelay;
private Long fileDelay;

private volatile Boolean end;
private Boolean end;

private volatile Boolean reopen;
private Boolean reopen;

private volatile FileTailingMessageProducerSupport tailAdapter;
private FileTailingMessageProducerSupport tailAdapter;

private volatile String beanName;
private String beanName;

private volatile MessageChannel outputChannel;
private MessageChannel outputChannel;

private volatile MessageChannel errorChannel;
private MessageChannel errorChannel;

private volatile Boolean autoStartup;
private String outputChannelName;

private volatile Integer phase;
private String errorChannelName;

private volatile ApplicationEventPublisher applicationEventPublisher;
private Boolean autoStartup;

private Integer phase;

private ApplicationEventPublisher applicationEventPublisher;

public void setNativeOptions(String nativeOptions) {
if (StringUtils.hasText(nativeOptions)) {
Expand Down Expand Up @@ -141,10 +145,18 @@ public void setOutputChannel(MessageChannel outputChannel) {
this.outputChannel = outputChannel;
}

public void setOutputChannelName(String outputChannelName) {
this.outputChannelName = outputChannelName;
}

public void setErrorChannel(MessageChannel errorChannel) {
this.errorChannel = errorChannel;
}

public void setErrorChannelName(String errorChannelName) {
this.errorChannelName = errorChannelName;
}

public void setAutoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
}
Expand Down Expand Up @@ -238,6 +250,8 @@ protected FileTailingMessageProducerSupport createInstance() {
.acceptIfNotNull(this.autoStartup, adapter::setAutoStartup)
.acceptIfNotNull(this.phase, adapter::setPhase)
.acceptIfNotNull(this.applicationEventPublisher, adapter::setApplicationEventPublisher)
.acceptIfNotNull(this.outputChannelName, adapter::setOutputChannelName)
.acceptIfNotNull(this.errorChannelName, adapter::setErrorChannelName)
.acceptIfNotNull(beanFactory, adapter::setBeanFactory);
adapter.afterPropertiesSet();
this.tailAdapter = adapter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.dsl.MessageProducerSpec;
import org.springframework.integration.file.config.FileTailInboundChannelAdapterFactoryBean;
import org.springframework.integration.file.tail.FileTailingMessageProducerSupport;
Expand All @@ -32,6 +31,7 @@
* A {@link MessageProducerSpec} for file tailing adapters.
*
* @author Artem Bilan
* @author Gary Russell
*
* @since 5.0
*/
Expand Down Expand Up @@ -186,9 +186,6 @@ public TailAdapterSpec errorChannel(MessageChannel errorChannel) {

@Override
protected FileTailingMessageProducerSupport doGet() {
if (this.outputChannel == null) {
this.factoryBean.setOutputChannel(new NullChannel());
}
FileTailingMessageProducerSupport tailingMessageProducerSupport = null;
try {
this.factoryBean.afterPropertiesSet();
Expand Down

0 comments on commit 9994997

Please sign in to comment.