diff --git a/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc b/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc index 7ec249f00a..a13a854d42 100644 --- a/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc +++ b/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc @@ -432,7 +432,23 @@ Spring Cloud Stream supports publishing error messages received by the Spring In error channel. Error messages sent to the `errorChannel` can be published to a specific destination at the broker by configuring a binding for the outbound target named `error`. For example, to publish error messages to a broker destination named "myErrors", provide the following property: -`spring.cloud.stream.bindings.error.destination=myErrors` +`spring.cloud.stream.bindings.error.destination=myErrors`. + +[[binder-error-channels]] +===== Message Channel Binders and Error Channels + +Starting with _version 1.3_, some `MessageChannel` - based binders publish errors to a discrete error channel for each destination. +In addition, these error channels are bridged to the global Spring Integration `errorChannel` mentioned above. +You can therefore consume errors for specific destinations and/or for all destinations, using a standard Spring Integration flow (`IntegrationFlow`, `@ServiceActivator`, etc). + +On the consumer side, the listener thread catches any exceptions and forwards an `ErrorMessage` to the destination's error channel. +The payload of the message is a `MessagingException` with the normal `failedMessage` and `cause` properties. +Usually, the raw data received from the broker is included in a header. +For binders that support (and are configured with) a dead letter destination; a `MessagePublishingErrorHandler` is subscribed to the channel, and the raw data is forwarded to the dead letter destination. + +On the producer side; for binders that support some kind of async result after publishing messages (e.g. RabbitMQ, Kafka), you can enable an error channel by setting the `...producer.errorChannelEnabled` to `true`. +The payload of the `ErrorMessage` depends on the binder implementation but will be a `MessagingException` with the normal `failedMessage` property, as well as additional properties about the failure. +Refer to the binder documentation for complete details. ===== Using @StreamListener for Automatic Content Type Handling @@ -1231,6 +1247,11 @@ When native encoding is used, it is the responsibility of the consumer to use ap Also, when native encoding/decoding is used the `headerMode` property is ignored and headers will not be embedded into the message. + Default: `false`. +errorChannelEnabled:: + When set to `true`, if the binder supports async send results; send failures will be sent to an error channel for the destination. + See <> for more information. ++ +Default: `false`. [[dynamicdestination]] === Using dynamically bound destinations diff --git a/spring-cloud-stream/pom.xml b/spring-cloud-stream/pom.xml index efb74c64bf..135797e363 100644 --- a/spring-cloud-stream/pom.xml +++ b/spring-cloud-stream/pom.xml @@ -33,10 +33,12 @@ org.springframework.integration spring-integration-core + 4.3.12.BUILD-SNAPSHOT org.springframework.integration spring-integration-jmx + 4.3.12.BUILD-SNAPSHOT org.springframework diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java index 22600d2030..6100a21361 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java @@ -29,6 +29,7 @@ import org.springframework.expression.ExpressionParser; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.integration.channel.FixedSubscriberChannel; +import org.springframework.integration.channel.PublishSubscribeChannel; import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.endpoint.EventDrivenConsumer; @@ -117,7 +118,10 @@ public final Binding doBindProducer(final String destination, Me try { producerDestination = this.provisioningProvider.provisionProducerDestination(destination, producerProperties); - producerMessageHandler = createProducerMessageHandler(producerDestination, producerProperties); + SubscribableChannel errorChannel = producerProperties.isErrorChannelEnabled() + ? registerErrorInfrastructure(producerDestination) : null; + producerMessageHandler = createProducerMessageHandler(producerDestination, producerProperties, + errorChannel); if (producerMessageHandler instanceof InitializingBean) { ((InitializingBean) producerMessageHandler).afterPropertiesSet(); } @@ -147,6 +151,7 @@ else if (e instanceof ProvisioningException) { @Override public void afterUnbind() { try { + destroyErrorInfrastructure(producerDestination); if (producerMessageHandler instanceof DisposableBean) { ((DisposableBean) producerMessageHandler).destroy(); } @@ -175,11 +180,14 @@ public void afterUnbind() { * * @param destination the name of the target destination * @param producerProperties the producer properties + * @param errorChannel the error channel (if enabled, otherwise null). If not null, + * the binder must wire this channel into the producer endpoint so that errors + * are forwarded to it. * @return the message handler for sending data to the target middleware * @throws Exception */ protected abstract MessageHandler createProducerMessageHandler(ProducerDestination destination, - P producerProperties) + P producerProperties, MessageChannel errorChannel) throws Exception; /** @@ -294,7 +302,47 @@ protected void afterUnbindConsumer(ConsumerDestination destination, String group } /** - * Build an errorChannelRecoverer that writes to a pub/sub channel for the destination. + * Register an error channel for the destination when an async send error is received. + * Bridge the channel to the global error channel (if present). + * @param destination the destination. + * @return the channel. + */ + private SubscribableChannel registerErrorInfrastructure(ProducerDestination destination) { + ConfigurableListableBeanFactory beanFactory = getApplicationContext().getBeanFactory(); + String errorChannelName = errorsBaseName(destination); + SubscribableChannel errorChannel = null; + if (getApplicationContext().containsBean(errorChannelName)) { + Object errorChannelObject = getApplicationContext().getBean(errorChannelName); + if (!(errorChannelObject instanceof SubscribableChannel)) { + throw new IllegalStateException( + "Error channel '" + errorChannelName + "' must be a SubscribableChannel"); + } + errorChannel = (SubscribableChannel) errorChannelObject; + } + else { + errorChannel = new PublishSubscribeChannel(); + beanFactory.registerSingleton(errorChannelName, errorChannel); + errorChannel = (PublishSubscribeChannel) beanFactory.initializeBean(errorChannel, errorChannelName); + } + MessageChannel defaultErrorChannel = null; + if (getApplicationContext().containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)) { + defaultErrorChannel = getApplicationContext().getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, + MessageChannel.class); + } + if (defaultErrorChannel != null) { + BridgeHandler errorBridge = new BridgeHandler(); + errorBridge.setOutputChannel(defaultErrorChannel); + errorChannel.subscribe(errorBridge); + String errorBridgeHandlerName = getErrorBridgeName(destination); + beanFactory.registerSingleton(errorBridgeHandlerName, errorBridge); + beanFactory.initializeBean(errorBridge, errorBridgeHandlerName); + } + return errorChannel; + } + + /** + * Build an errorChannelRecoverer that writes to a pub/sub channel for the destination + * when an exception is thrown to a consumer. * @param destination the destination. * @param group the group. * @param consumerProperties the properties. @@ -356,6 +404,25 @@ protected final ErrorInfrastructure registerErrorInfrastructure(ConsumerDestinat return new ErrorInfrastructure(errorChannel, recoverer, handler); } + private void destroyErrorInfrastructure(ProducerDestination destination) { + String errorChannelName = errorsBaseName(destination); + String errorBridgeHandlerName = getErrorBridgeName(destination); + MessageHandler bridgeHandler = null; + if (getApplicationContext().containsBean(errorBridgeHandlerName)) { + bridgeHandler = getApplicationContext().getBean(errorBridgeHandlerName, MessageHandler.class); + } + if (getApplicationContext().containsBean(errorChannelName)) { + SubscribableChannel channel = getApplicationContext().getBean(errorChannelName, SubscribableChannel.class); + if (bridgeHandler != null) { + channel.unsubscribe(bridgeHandler); + ((DefaultSingletonBeanRegistry) getApplicationContext().getBeanFactory()) + .destroySingleton(errorBridgeHandlerName); + } + ((DefaultSingletonBeanRegistry) getApplicationContext().getBeanFactory()) + .destroySingleton(errorChannelName); + } + } + private void destroyErrorInfrastructure(ConsumerDestination destination, String group, C properties) { try { String recoverer = getErrorRecovererName(destination, group, properties); @@ -450,6 +517,14 @@ protected String errorsBaseName(ConsumerDestination destination, String group, return destination.getName() + "." + group + ".errors"; } + protected String getErrorBridgeName(ProducerDestination destination) { + return errorsBaseName(destination) + ".bridge"; + } + + protected String errorsBaseName(ProducerDestination destination) { + return destination.getName() + ".errors"; + } + private final class ReceivingHandler extends AbstractReplyProducingMessageHandler { private final boolean extractEmbeddedHeaders; diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/ProducerProperties.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/ProducerProperties.java index 5b4f5fd38e..1ce7eb6167 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/ProducerProperties.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/ProducerProperties.java @@ -52,6 +52,8 @@ public class ProducerProperties { private boolean useNativeEncoding = false; + private boolean errorChannelEnabled = false; + public Expression getPartitionKeyExpression() { return partitionKeyExpression; } @@ -131,4 +133,12 @@ public void setUseNativeEncoding(boolean useNativeEncoding) { this.useNativeEncoding = useNativeEncoding; } + public boolean isErrorChannelEnabled() { + return this.errorChannelEnabled; + } + + public void setErrorChannelEnabled(boolean errorChannelEnabled) { + this.errorChannelEnabled = errorChannelEnabled; + } + } diff --git a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinderTests.java b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinderTests.java index a71f28da23..16f956fcaf 100644 --- a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinderTests.java +++ b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinderTests.java @@ -20,7 +20,6 @@ import java.util.Set; import org.junit.Test; -import org.mockito.Matchers; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -45,6 +44,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; @@ -94,17 +94,22 @@ public void testEndpointLifecycle() throws Exception { Mockito.verify((DisposableBean) messageProducer).destroy(); Mockito.verifyNoMoreInteractions(messageProducer); - Binding producerBinding = binder.bindProducer("bar", new DirectChannel(), - new ProducerProperties()); + ProducerProperties producerProps = new ProducerProperties(); + producerProps.setErrorChannelEnabled(true); + Binding producerBinding = binder.bindProducer("bar", new DirectChannel(), producerProps); DirectFieldAccessor producerBindingAccessor = new DirectFieldAccessor(producerBinding); Object messageHandler = producerBindingAccessor.getPropertyValue("lifecycle"); Mockito.verify((Lifecycle) messageHandler).start(); Mockito.verify((InitializingBean) messageHandler).afterPropertiesSet(); Mockito.verifyNoMoreInteractions(messageHandler); + assertThat(context.containsBean("bar.errors")).isTrue(); + assertThat(context.containsBean("bar.errors.bridge")).isTrue(); producerBinding.unbind(); Mockito.verify((Lifecycle) messageHandler).stop(); Mockito.verify((DisposableBean) messageHandler).destroy(); Mockito.verifyNoMoreInteractions(messageHandler); + assertThat(context.containsBean("bar.errors")).isFalse(); + assertThat(context.containsBean("bar.errors.bridge")).isFalse(); } @Test @@ -165,12 +170,21 @@ public SimpleConsumerDestination answer(final InvocationOnMock invocation) throw } }).given(this.provisioningProvider).provisionConsumerDestination(anyString(), anyString(), - Matchers.any(ConsumerProperties.class)); + any(ConsumerProperties.class)); + willAnswer(new Answer() { + + @Override + public SimpleProducerDestination answer(final InvocationOnMock invocation) throws Throwable { + return new SimpleProducerDestination(invocation.getArgumentAt(0, String.class)); + } + + }).given(this.provisioningProvider).provisionProducerDestination(anyString(), + any(ProducerProperties.class)); } @Override protected MessageHandler createProducerMessageHandler(ProducerDestination destination, - ProducerProperties producerProperties) throws Exception { + ProducerProperties producerProperties, MessageChannel errorChannel) throws Exception { MessageHandler mock = Mockito.mock(MessageHandler.class, Mockito.withSettings() .extraInterfaces(Lifecycle.class, InitializingBean.class, DisposableBean.class)); return mock; @@ -214,4 +228,25 @@ public String getName() { } + private static class SimpleProducerDestination implements ProducerDestination { + + private final String name; + + + SimpleProducerDestination(String name) { + this.name = name; + } + + @Override + public String getName() { + return this.name; + } + + @Override + public String getNameForPartition(int partition) { + return getName() + partition; + } + + } + }