Skip to content

Commit

Permalink
INT-4471: PubSubChannel: Add errorHandler warn (#2459)
Browse files Browse the repository at this point in the history
* INT-4471: PubSubChannel: Add errorHandler warn

JIRA: https://jira.spring.io/browse/INT-4471

* When an `Executor` is not provided, log warn that the provided
`ErrorHandler` is ignored.

**Cherry-pick to 5.0.x and 4.3.x**

* * Polish warn message
  • Loading branch information
artembilan authored and garyrussell committed May 31, 2018
1 parent 9a88140 commit 17e794d
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 51 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,19 +35,18 @@
*/
public class PublishSubscribeChannel extends AbstractExecutorChannel {

private volatile ErrorHandler errorHandler;
private ErrorHandler errorHandler;

private volatile boolean ignoreFailures;
private boolean ignoreFailures;

private volatile boolean applySequence;
private boolean applySequence;

private volatile int minSubscribers;
private int minSubscribers;

/**
* Create a PublishSubscribeChannel that will use an {@link Executor}
* to invoke the handlers. If this is null, each invocation will occur in
* the message sender's thread.
*
* @param executor The executor.
*/
public PublishSubscribeChannel(Executor executor) {
Expand Down Expand Up @@ -79,9 +78,7 @@ public String getComponentType() {
* a {@link MessagePublishingErrorHandler} that sends error messages to
* the failed request Message's error channel header if available or to
* the default 'errorChannel' otherwise.
*
* @param errorHandler The error handler.
*
* @see #PublishSubscribeChannel(Executor)
*/
public void setErrorHandler(ErrorHandler errorHandler) {
Expand Down Expand Up @@ -149,6 +146,14 @@ public final void onInit() throws Exception {
getDispatcher().setApplySequence(this.applySequence);
getDispatcher().setMinSubscribers(this.minSubscribers);
}
else if (this.errorHandler != null) {
if (this.logger.isWarnEnabled()) {
this.logger.warn("The 'errorHandler' is ignored for the '" + getComponentName() +
"' (an 'executor' is not provided) and exceptions will be thrown " +
"directly within the sending Thread");
}
}

if (this.maxSubscribers == null) {
Integer maxSubscribers =
getIntegrationProperty(IntegrationProperties.CHANNELS_MAX_BROADCAST_SUBSCRIBERS, Integer.class);
Expand Down
Expand Up @@ -19,7 +19,7 @@

<publish-subscribe-channel id="channelWithApplySequenceEnabledAndTaskExecutor" apply-sequence="true" task-executor="pool"/>

<publish-subscribe-channel id="channelWithErrorHandler" error-handler="testErrorHandler"/>
<publish-subscribe-channel id="channelWithErrorHandler" error-handler="testErrorHandler" task-executor="pool"/>

<task:executor id="pool" pool-size="1"/>

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,76 +26,71 @@
import java.util.concurrent.Executor;

import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.dispatcher.BroadcastingDispatcher;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.ErrorHandler;

/**
* @author Mark Fisher
* @author Gary Russell
* @author Artem Bilan
*/
@RunWith(SpringRunner.class)
public class PublishSubscribeChannelParserTests {

@Autowired
private ApplicationContext context;

@Test
public void defaultChannel() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"publishSubscribeChannelParserTests.xml", this.getClass());
PublishSubscribeChannel channel = (PublishSubscribeChannel)
context.getBean("defaultChannel");
PublishSubscribeChannel channel = this.context.getBean("defaultChannel", PublishSubscribeChannel.class);
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
accessor.getPropertyValue("dispatcher");
dispatcher.setApplySequence(true);
dispatcher.addHandler(message -> { });
dispatcher.dispatch(new GenericMessage<String>("foo"));
dispatcher.dispatch(new GenericMessage<>("foo"));
DirectFieldAccessor dispatcherAccessor = new DirectFieldAccessor(dispatcher);
assertNull(dispatcherAccessor.getPropertyValue("executor"));
assertFalse((Boolean) dispatcherAccessor.getPropertyValue("ignoreFailures"));
assertTrue((Boolean) dispatcherAccessor.getPropertyValue("applySequence"));
Object mbf = context.getBean(IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME);
Object mbf = this.context.getBean(IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME);
assertSame(mbf, dispatcherAccessor.getPropertyValue("messageBuilderFactory"));
context.close();
}

@Test
public void ignoreFailures() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"publishSubscribeChannelParserTests.xml", this.getClass());
PublishSubscribeChannel channel = (PublishSubscribeChannel)
context.getBean("channelWithIgnoreFailures");
PublishSubscribeChannel channel =
this.context.getBean("channelWithIgnoreFailures", PublishSubscribeChannel.class);
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
accessor.getPropertyValue("dispatcher");
assertTrue((Boolean) new DirectFieldAccessor(dispatcher).getPropertyValue("ignoreFailures"));
context.close();
}

@Test
public void applySequenceEnabled() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"publishSubscribeChannelParserTests.xml", this.getClass());
PublishSubscribeChannel channel = (PublishSubscribeChannel)
context.getBean("channelWithApplySequenceEnabled");
PublishSubscribeChannel channel =
this.context.getBean("channelWithApplySequenceEnabled", PublishSubscribeChannel.class);
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
accessor.getPropertyValue("dispatcher");
assertTrue((Boolean) new DirectFieldAccessor(dispatcher).getPropertyValue("applySequence"));
context.close();
}

@Test
public void channelWithTaskExecutor() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"publishSubscribeChannelParserTests.xml", this.getClass());
PublishSubscribeChannel channel = (PublishSubscribeChannel)
context.getBean("channelWithTaskExecutor");
PublishSubscribeChannel channel =
this.context.getBean("channelWithTaskExecutor", PublishSubscribeChannel.class);
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
accessor.getPropertyValue("dispatcher");
Expand All @@ -106,15 +101,12 @@ public void channelWithTaskExecutor() {
DirectFieldAccessor executorAccessor = new DirectFieldAccessor(executor);
Executor innerExecutor = (Executor) executorAccessor.getPropertyValue("executor");
assertEquals(context.getBean("pool"), innerExecutor);
context.close();
}

@Test
public void ignoreFailuresWithTaskExecutor() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"publishSubscribeChannelParserTests.xml", this.getClass());
PublishSubscribeChannel channel = (PublishSubscribeChannel)
context.getBean("channelWithIgnoreFailuresAndTaskExecutor");
PublishSubscribeChannel channel =
this.context.getBean("channelWithIgnoreFailuresAndTaskExecutor", PublishSubscribeChannel.class);
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
accessor.getPropertyValue("dispatcher");
Expand All @@ -125,16 +117,13 @@ public void ignoreFailuresWithTaskExecutor() {
assertEquals(ErrorHandlingTaskExecutor.class, executor.getClass());
DirectFieldAccessor executorAccessor = new DirectFieldAccessor(executor);
Executor innerExecutor = (Executor) executorAccessor.getPropertyValue("executor");
assertEquals(context.getBean("pool"), innerExecutor);
context.close();
assertEquals(this.context.getBean("pool"), innerExecutor);
}

@Test
public void applySequenceEnabledWithTaskExecutor() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"publishSubscribeChannelParserTests.xml", this.getClass());
PublishSubscribeChannel channel = (PublishSubscribeChannel)
context.getBean("channelWithApplySequenceEnabledAndTaskExecutor");
PublishSubscribeChannel channel =
this.context.getBean("channelWithApplySequenceEnabledAndTaskExecutor", PublishSubscribeChannel.class);
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
accessor.getPropertyValue("dispatcher");
Expand All @@ -145,21 +134,17 @@ public void applySequenceEnabledWithTaskExecutor() {
assertEquals(ErrorHandlingTaskExecutor.class, executor.getClass());
DirectFieldAccessor executorAccessor = new DirectFieldAccessor(executor);
Executor innerExecutor = (Executor) executorAccessor.getPropertyValue("executor");
assertEquals(context.getBean("pool"), innerExecutor);
context.close();
assertEquals(this.context.getBean("pool"), innerExecutor);
}

@Test
public void channelWithErrorHandler() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"publishSubscribeChannelParserTests.xml", this.getClass());
PublishSubscribeChannel channel = (PublishSubscribeChannel)
context.getBean("channelWithErrorHandler");
PublishSubscribeChannel channel =
this.context.getBean("channelWithErrorHandler", PublishSubscribeChannel.class);
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
ErrorHandler errorHandler = (ErrorHandler) accessor.getPropertyValue("errorHandler");
assertNotNull(errorHandler);
assertEquals(context.getBean("testErrorHandler"), errorHandler);
context.close();
assertEquals(this.context.getBean("testErrorHandler"), errorHandler);
}

}
4 changes: 4 additions & 0 deletions src/reference/asciidoc/channel.adoc
Expand Up @@ -572,6 +572,10 @@ When using this element, you can also specify the `task-executor` used for publi
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>
----

Alongside with the `Executor`, an `ErrorHandler` can be configured as well.
By default the `PublishSubscribeChannel` uses a `MessagePublishingErrorHandler` implementation to send error to the `MessageChannel` from the `errorChannel` header or a global `errorChannel` instance.
If an `Executor` is not configured, the `ErrorHandler` is ignored and exceptions are thrown directly to the caller's Thread.

If you are providing a _Resequencer_ or _Aggregator_ downstream from a `PublishSubscribeChannel`, then you can set the 'apply-sequence' property on the channel to `true`.
That will indicate that the channel should set the sequence-size and sequence-number Message headers as well as the correlation id prior to passing the Messages along.
For example, if there are 5 subscribers, the sequence-size would be set to 5, and the Messages would have sequence-number header values ranging from 1 to 5.
Expand Down

0 comments on commit 17e794d

Please sign in to comment.