Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,23 @@ Spring Cloud Stream supports publishing error messages received by the Spring In
error channel. Error messages sent to the `errorChannel` can be published to a specific destination
at the broker by configuring a binding for the outbound target named `error`. For example, to
publish error messages to a broker destination named "myErrors", provide the following property:
`spring.cloud.stream.bindings.error.destination=myErrors`
`spring.cloud.stream.bindings.error.destination=myErrors`.

[[binder-error-channels]]
===== Message Channel Binders and Error Channels

Starting with _version 1.3_, some `MessageChannel` - based binders publish errors to a discrete error channel for each destination.
In addition, these error channels are bridged to the global Spring Integration `errorChannel` mentioned above.
You can therefore consume errors for specific destinations and/or for all destinations, using a standard Spring Integration flow (`IntegrationFlow`, `@ServiceActivator`, etc).

On the consumer side, the listener thread catches any exceptions and forwards an `ErrorMessage` to the destination's error channel.
The payload of the message is a `MessagingException` with the normal `failedMessage` and `cause` properties.
Usually, the raw data received from the broker is included in a header.
For binders that support (and are configured with) a dead letter destination; a `MessagePublishingErrorHandler` is subscribed to the channel, and the raw data is forwarded to the dead letter destination.

On the producer side; for binders that support some kind of async result after publishing messages (e.g. RabbitMQ, Kafka), you can enable an error channel by setting the `...producer.errorChannelEnabled` to `true`.
The payload of the `ErrorMessage` depends on the binder implementation but will be a `MessagingException` with the normal `failedMessage` property, as well as additional properties about the failure.
Refer to the binder documentation for complete details.

===== Using @StreamListener for Automatic Content Type Handling

Expand Down Expand Up @@ -1231,6 +1247,11 @@ When native encoding is used, it is the responsibility of the consumer to use ap
Also, when native encoding/decoding is used the `headerMode` property is ignored and headers will not be embedded into the message.
+
Default: `false`.
errorChannelEnabled::
When set to `true`, if the binder supports async send results; send failures will be sent to an error channel for the destination.
See <<binder-error-channels>> for more information.
+
Default: `false`.

[[dynamicdestination]]
=== Using dynamically bound destinations
Expand Down
2 changes: 2 additions & 0 deletions spring-cloud-stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>4.3.12.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jmx</artifactId>
<version>4.3.12.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.channel.FixedSubscriberChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.endpoint.EventDrivenConsumer;
Expand Down Expand Up @@ -117,7 +118,10 @@ public final Binding<MessageChannel> doBindProducer(final String destination, Me
try {
producerDestination = this.provisioningProvider.provisionProducerDestination(destination,
producerProperties);
producerMessageHandler = createProducerMessageHandler(producerDestination, producerProperties);
SubscribableChannel errorChannel = producerProperties.isErrorChannelEnabled()
? registerErrorInfrastructure(producerDestination) : null;
producerMessageHandler = createProducerMessageHandler(producerDestination, producerProperties,
errorChannel);
if (producerMessageHandler instanceof InitializingBean) {
((InitializingBean) producerMessageHandler).afterPropertiesSet();
}
Expand Down Expand Up @@ -147,6 +151,7 @@ else if (e instanceof ProvisioningException) {
@Override
public void afterUnbind() {
try {
destroyErrorInfrastructure(producerDestination);
if (producerMessageHandler instanceof DisposableBean) {
((DisposableBean) producerMessageHandler).destroy();
}
Expand Down Expand Up @@ -175,11 +180,14 @@ public void afterUnbind() {
*
* @param destination the name of the target destination
* @param producerProperties the producer properties
* @param errorChannel the error channel (if enabled, otherwise null). If not null,
* the binder must wire this channel into the producer endpoint so that errors
* are forwarded to it.
* @return the message handler for sending data to the target middleware
* @throws Exception
*/
protected abstract MessageHandler createProducerMessageHandler(ProducerDestination destination,
P producerProperties)
P producerProperties, MessageChannel errorChannel)
throws Exception;

/**
Expand Down Expand Up @@ -294,7 +302,47 @@ protected void afterUnbindConsumer(ConsumerDestination destination, String group
}

/**
* Build an errorChannelRecoverer that writes to a pub/sub channel for the destination.
* Register an error channel for the destination when an async send error is received.
* Bridge the channel to the global error channel (if present).
* @param destination the destination.
* @return the channel.
*/
private SubscribableChannel registerErrorInfrastructure(ProducerDestination destination) {
ConfigurableListableBeanFactory beanFactory = getApplicationContext().getBeanFactory();
String errorChannelName = errorsBaseName(destination);
SubscribableChannel errorChannel = null;
if (getApplicationContext().containsBean(errorChannelName)) {
Object errorChannelObject = getApplicationContext().getBean(errorChannelName);
if (!(errorChannelObject instanceof SubscribableChannel)) {
throw new IllegalStateException(
"Error channel '" + errorChannelName + "' must be a SubscribableChannel");
}
errorChannel = (SubscribableChannel) errorChannelObject;
}
else {
errorChannel = new PublishSubscribeChannel();
beanFactory.registerSingleton(errorChannelName, errorChannel);
errorChannel = (PublishSubscribeChannel) beanFactory.initializeBean(errorChannel, errorChannelName);
}
MessageChannel defaultErrorChannel = null;
if (getApplicationContext().containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)) {
defaultErrorChannel = getApplicationContext().getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME,
MessageChannel.class);
}
if (defaultErrorChannel != null) {
BridgeHandler errorBridge = new BridgeHandler();
errorBridge.setOutputChannel(defaultErrorChannel);
errorChannel.subscribe(errorBridge);
String errorBridgeHandlerName = getErrorBridgeName(destination);
beanFactory.registerSingleton(errorBridgeHandlerName, errorBridge);
beanFactory.initializeBean(errorBridge, errorBridgeHandlerName);
}
return errorChannel;
}

/**
* Build an errorChannelRecoverer that writes to a pub/sub channel for the destination
* when an exception is thrown to a consumer.
* @param destination the destination.
* @param group the group.
* @param consumerProperties the properties.
Expand Down Expand Up @@ -356,6 +404,25 @@ protected final ErrorInfrastructure registerErrorInfrastructure(ConsumerDestinat
return new ErrorInfrastructure(errorChannel, recoverer, handler);
}

private void destroyErrorInfrastructure(ProducerDestination destination) {
String errorChannelName = errorsBaseName(destination);
String errorBridgeHandlerName = getErrorBridgeName(destination);
MessageHandler bridgeHandler = null;
if (getApplicationContext().containsBean(errorBridgeHandlerName)) {
bridgeHandler = getApplicationContext().getBean(errorBridgeHandlerName, MessageHandler.class);
}
if (getApplicationContext().containsBean(errorChannelName)) {
SubscribableChannel channel = getApplicationContext().getBean(errorChannelName, SubscribableChannel.class);
if (bridgeHandler != null) {
channel.unsubscribe(bridgeHandler);
((DefaultSingletonBeanRegistry) getApplicationContext().getBeanFactory())
.destroySingleton(errorBridgeHandlerName);
}
((DefaultSingletonBeanRegistry) getApplicationContext().getBeanFactory())
.destroySingleton(errorChannelName);
}
}

private void destroyErrorInfrastructure(ConsumerDestination destination, String group, C properties) {
try {
String recoverer = getErrorRecovererName(destination, group, properties);
Expand Down Expand Up @@ -450,6 +517,14 @@ protected String errorsBaseName(ConsumerDestination destination, String group,
return destination.getName() + "." + group + ".errors";
}

protected String getErrorBridgeName(ProducerDestination destination) {
return errorsBaseName(destination) + ".bridge";
}

protected String errorsBaseName(ProducerDestination destination) {
return destination.getName() + ".errors";
}

private final class ReceivingHandler extends AbstractReplyProducingMessageHandler {

private final boolean extractEmbeddedHeaders;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class ProducerProperties {

private boolean useNativeEncoding = false;

private boolean errorChannelEnabled = false;

public Expression getPartitionKeyExpression() {
return partitionKeyExpression;
}
Expand Down Expand Up @@ -131,4 +133,12 @@ public void setUseNativeEncoding(boolean useNativeEncoding) {
this.useNativeEncoding = useNativeEncoding;
}

public boolean isErrorChannelEnabled() {
return this.errorChannelEnabled;
}

public void setErrorChannelEnabled(boolean errorChannelEnabled) {
this.errorChannelEnabled = errorChannelEnabled;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.Set;

import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand All @@ -45,6 +44,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -94,17 +94,22 @@ public void testEndpointLifecycle() throws Exception {
Mockito.verify((DisposableBean) messageProducer).destroy();
Mockito.verifyNoMoreInteractions(messageProducer);

Binding<MessageChannel> producerBinding = binder.bindProducer("bar", new DirectChannel(),
new ProducerProperties());
ProducerProperties producerProps = new ProducerProperties();
producerProps.setErrorChannelEnabled(true);
Binding<MessageChannel> producerBinding = binder.bindProducer("bar", new DirectChannel(), producerProps);
DirectFieldAccessor producerBindingAccessor = new DirectFieldAccessor(producerBinding);
Object messageHandler = producerBindingAccessor.getPropertyValue("lifecycle");
Mockito.verify((Lifecycle) messageHandler).start();
Mockito.verify((InitializingBean) messageHandler).afterPropertiesSet();
Mockito.verifyNoMoreInteractions(messageHandler);
assertThat(context.containsBean("bar.errors")).isTrue();
assertThat(context.containsBean("bar.errors.bridge")).isTrue();
producerBinding.unbind();
Mockito.verify((Lifecycle) messageHandler).stop();
Mockito.verify((DisposableBean) messageHandler).destroy();
Mockito.verifyNoMoreInteractions(messageHandler);
assertThat(context.containsBean("bar.errors")).isFalse();
assertThat(context.containsBean("bar.errors.bridge")).isFalse();
}

@Test
Expand Down Expand Up @@ -165,12 +170,21 @@ public SimpleConsumerDestination answer(final InvocationOnMock invocation) throw
}

}).given(this.provisioningProvider).provisionConsumerDestination(anyString(), anyString(),
Matchers.any(ConsumerProperties.class));
any(ConsumerProperties.class));
willAnswer(new Answer<SimpleProducerDestination>() {

@Override
public SimpleProducerDestination answer(final InvocationOnMock invocation) throws Throwable {
return new SimpleProducerDestination(invocation.getArgumentAt(0, String.class));
}

}).given(this.provisioningProvider).provisionProducerDestination(anyString(),
any(ProducerProperties.class));
}

@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ProducerProperties producerProperties) throws Exception {
ProducerProperties producerProperties, MessageChannel errorChannel) throws Exception {
MessageHandler mock = Mockito.mock(MessageHandler.class, Mockito.withSettings()
.extraInterfaces(Lifecycle.class, InitializingBean.class, DisposableBean.class));
return mock;
Expand Down Expand Up @@ -214,4 +228,25 @@ public String getName() {

}

private static class SimpleProducerDestination implements ProducerDestination {

private final String name;


SimpleProducerDestination(String name) {
this.name = name;
}

@Override
public String getName() {
return this.name;
}

@Override
public String getNameForPartition(int partition) {
return getName() + partition;
}

}

}