Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INT-4471: PubSubChannel: Add errorHandler warn #2459

Merged
merged 2 commits into from May 31, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,19 +35,18 @@
*/
public class PublishSubscribeChannel extends AbstractExecutorChannel {

private volatile ErrorHandler errorHandler;
private ErrorHandler errorHandler;

private volatile boolean ignoreFailures;
private boolean ignoreFailures;

private volatile boolean applySequence;
private boolean applySequence;

private volatile int minSubscribers;
private int minSubscribers;

/**
* Create a PublishSubscribeChannel that will use an {@link Executor}
* to invoke the handlers. If this is null, each invocation will occur in
* the message sender's thread.
*
* @param executor The executor.
*/
public PublishSubscribeChannel(Executor executor) {
Expand Down Expand Up @@ -79,9 +78,7 @@ public String getComponentType() {
* a {@link MessagePublishingErrorHandler} that sends error messages to
* the failed request Message's error channel header if available or to
* the default 'errorChannel' otherwise.
*
* @param errorHandler The error handler.
*
* @see #PublishSubscribeChannel(Executor)
*/
public void setErrorHandler(ErrorHandler errorHandler) {
Expand Down Expand Up @@ -149,6 +146,11 @@ public final void onInit() throws Exception {
getDispatcher().setApplySequence(this.applySequence);
getDispatcher().setMinSubscribers(this.minSubscribers);
}
else if (this.errorHandler != null) {
logger.warn("When 'executor' is not provided the 'errorHandler' is ignored and " +
"exceptions are thrown directly within the sending Thread");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps include the componentName in the message?


if (this.maxSubscribers == null) {
Integer maxSubscribers =
getIntegrationProperty(IntegrationProperties.CHANNELS_MAX_BROADCAST_SUBSCRIBERS, Integer.class);
Expand Down
Expand Up @@ -19,7 +19,7 @@

<publish-subscribe-channel id="channelWithApplySequenceEnabledAndTaskExecutor" apply-sequence="true" task-executor="pool"/>

<publish-subscribe-channel id="channelWithErrorHandler" error-handler="testErrorHandler"/>
<publish-subscribe-channel id="channelWithErrorHandler" error-handler="testErrorHandler" task-executor="pool"/>

<task:executor id="pool" pool-size="1"/>

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,76 +26,71 @@
import java.util.concurrent.Executor;

import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.dispatcher.BroadcastingDispatcher;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.ErrorHandler;

/**
* @author Mark Fisher
* @author Gary Russell
* @author Artem Bilan
*/
@RunWith(SpringRunner.class)
public class PublishSubscribeChannelParserTests {

@Autowired
private ApplicationContext context;

@Test
public void defaultChannel() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"publishSubscribeChannelParserTests.xml", this.getClass());
PublishSubscribeChannel channel = (PublishSubscribeChannel)
context.getBean("defaultChannel");
PublishSubscribeChannel channel = this.context.getBean("defaultChannel", PublishSubscribeChannel.class);
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
accessor.getPropertyValue("dispatcher");
dispatcher.setApplySequence(true);
dispatcher.addHandler(message -> { });
dispatcher.dispatch(new GenericMessage<String>("foo"));
dispatcher.dispatch(new GenericMessage<>("foo"));
DirectFieldAccessor dispatcherAccessor = new DirectFieldAccessor(dispatcher);
assertNull(dispatcherAccessor.getPropertyValue("executor"));
assertFalse((Boolean) dispatcherAccessor.getPropertyValue("ignoreFailures"));
assertTrue((Boolean) dispatcherAccessor.getPropertyValue("applySequence"));
Object mbf = context.getBean(IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME);
Object mbf = this.context.getBean(IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME);
assertSame(mbf, dispatcherAccessor.getPropertyValue("messageBuilderFactory"));
context.close();
}

@Test
public void ignoreFailures() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"publishSubscribeChannelParserTests.xml", this.getClass());
PublishSubscribeChannel channel = (PublishSubscribeChannel)
context.getBean("channelWithIgnoreFailures");
PublishSubscribeChannel channel =
this.context.getBean("channelWithIgnoreFailures", PublishSubscribeChannel.class);
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
accessor.getPropertyValue("dispatcher");
assertTrue((Boolean) new DirectFieldAccessor(dispatcher).getPropertyValue("ignoreFailures"));
context.close();
}

@Test
public void applySequenceEnabled() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"publishSubscribeChannelParserTests.xml", this.getClass());
PublishSubscribeChannel channel = (PublishSubscribeChannel)
context.getBean("channelWithApplySequenceEnabled");
PublishSubscribeChannel channel =
this.context.getBean("channelWithApplySequenceEnabled", PublishSubscribeChannel.class);
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
accessor.getPropertyValue("dispatcher");
assertTrue((Boolean) new DirectFieldAccessor(dispatcher).getPropertyValue("applySequence"));
context.close();
}

@Test
public void channelWithTaskExecutor() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"publishSubscribeChannelParserTests.xml", this.getClass());
PublishSubscribeChannel channel = (PublishSubscribeChannel)
context.getBean("channelWithTaskExecutor");
PublishSubscribeChannel channel =
this.context.getBean("channelWithTaskExecutor", PublishSubscribeChannel.class);
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
accessor.getPropertyValue("dispatcher");
Expand All @@ -106,15 +101,12 @@ public void channelWithTaskExecutor() {
DirectFieldAccessor executorAccessor = new DirectFieldAccessor(executor);
Executor innerExecutor = (Executor) executorAccessor.getPropertyValue("executor");
assertEquals(context.getBean("pool"), innerExecutor);
context.close();
}

@Test
public void ignoreFailuresWithTaskExecutor() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"publishSubscribeChannelParserTests.xml", this.getClass());
PublishSubscribeChannel channel = (PublishSubscribeChannel)
context.getBean("channelWithIgnoreFailuresAndTaskExecutor");
PublishSubscribeChannel channel =
this.context.getBean("channelWithIgnoreFailuresAndTaskExecutor", PublishSubscribeChannel.class);
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
accessor.getPropertyValue("dispatcher");
Expand All @@ -125,16 +117,13 @@ public void ignoreFailuresWithTaskExecutor() {
assertEquals(ErrorHandlingTaskExecutor.class, executor.getClass());
DirectFieldAccessor executorAccessor = new DirectFieldAccessor(executor);
Executor innerExecutor = (Executor) executorAccessor.getPropertyValue("executor");
assertEquals(context.getBean("pool"), innerExecutor);
context.close();
assertEquals(this.context.getBean("pool"), innerExecutor);
}

@Test
public void applySequenceEnabledWithTaskExecutor() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"publishSubscribeChannelParserTests.xml", this.getClass());
PublishSubscribeChannel channel = (PublishSubscribeChannel)
context.getBean("channelWithApplySequenceEnabledAndTaskExecutor");
PublishSubscribeChannel channel =
this.context.getBean("channelWithApplySequenceEnabledAndTaskExecutor", PublishSubscribeChannel.class);
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
accessor.getPropertyValue("dispatcher");
Expand All @@ -145,21 +134,17 @@ public void applySequenceEnabledWithTaskExecutor() {
assertEquals(ErrorHandlingTaskExecutor.class, executor.getClass());
DirectFieldAccessor executorAccessor = new DirectFieldAccessor(executor);
Executor innerExecutor = (Executor) executorAccessor.getPropertyValue("executor");
assertEquals(context.getBean("pool"), innerExecutor);
context.close();
assertEquals(this.context.getBean("pool"), innerExecutor);
}

@Test
public void channelWithErrorHandler() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"publishSubscribeChannelParserTests.xml", this.getClass());
PublishSubscribeChannel channel = (PublishSubscribeChannel)
context.getBean("channelWithErrorHandler");
PublishSubscribeChannel channel =
this.context.getBean("channelWithErrorHandler", PublishSubscribeChannel.class);
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
ErrorHandler errorHandler = (ErrorHandler) accessor.getPropertyValue("errorHandler");
assertNotNull(errorHandler);
assertEquals(context.getBean("testErrorHandler"), errorHandler);
context.close();
assertEquals(this.context.getBean("testErrorHandler"), errorHandler);
}

}
4 changes: 4 additions & 0 deletions src/reference/asciidoc/channel.adoc
Expand Up @@ -572,6 +572,10 @@ When using this element, you can also specify the `task-executor` used for publi
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>
----

Alongside with the `Executor`, an `ErrorHandler` can be configured as well.
By default the `PublishSubscribeChannel` uses a `MessagePublishingErrorHandler` implementation to send error to the `MessageChannel` from the `errorChannel` header or a global `errorChannel` instance.
If an `Executor` is not configured, the `ErrorHandler` is ignored and exceptions are thrown directly to the caller's Thread.

If you are providing a _Resequencer_ or _Aggregator_ downstream from a `PublishSubscribeChannel`, then you can set the 'apply-sequence' property on the channel to `true`.
That will indicate that the channel should set the sequence-size and sequence-number Message headers as well as the correlation id prior to passing the Messages along.
For example, if there are 5 subscribers, the sequence-size would be set to 5, and the Messages would have sequence-number header values ranging from 1 to 5.
Expand Down