From 9e7b6f1645b9aa421a7900d81b7943fdb3d3b265 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 15 Aug 2017 17:11:40 -0400 Subject: [PATCH 1/5] GH-916: MC Binder Producer Error Infrastructure Initial Commit for GH-916. - register a pubsub error channel. - register and subscribe a bridge handler to bridge it to the global error channel. - pass the error channel to the implementation so it can wire it into the outbound endpoint. - destroy the infrastructure when unbinding. --- .../binder/AbstractMessageChannelBinder.java | 80 ++++++++++++++++++- .../stream/binder/ProducerProperties.java | 10 +++ .../AbstractMessageChannelBinderTests.java | 2 +- 3 files changed, 88 insertions(+), 4 deletions(-) diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java index 22600d2030..f264c52491 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java @@ -29,6 +29,7 @@ import org.springframework.expression.ExpressionParser; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.integration.channel.FixedSubscriberChannel; +import org.springframework.integration.channel.PublishSubscribeChannel; import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.endpoint.EventDrivenConsumer; @@ -117,7 +118,9 @@ public final Binding doBindProducer(final String destination, Me try { producerDestination = this.provisioningProvider.provisionProducerDestination(destination, producerProperties); - producerMessageHandler = createProducerMessageHandler(producerDestination, producerProperties); + SubscribableChannel errorChannel = registerErrorInfrastructure(producerDestination); + producerMessageHandler = createProducerMessageHandler(producerDestination, producerProperties, + errorChannel); if (producerMessageHandler instanceof InitializingBean) { ((InitializingBean) producerMessageHandler).afterPropertiesSet(); } @@ -147,6 +150,7 @@ else if (e instanceof ProvisioningException) { @Override public void afterUnbind() { try { + destroyErrorInfrastructure(producerDestination); if (producerMessageHandler instanceof DisposableBean) { ((DisposableBean) producerMessageHandler).destroy(); } @@ -175,11 +179,14 @@ public void afterUnbind() { * * @param destination the name of the target destination * @param producerProperties the producer properties + * @param errorChannel the error channel (if enabled, otherwise null). If not null, + * the binder must wire this channel into the producer endpoint so that errors + * are forwarded to it. * @return the message handler for sending data to the target middleware * @throws Exception */ protected abstract MessageHandler createProducerMessageHandler(ProducerDestination destination, - P producerProperties) + P producerProperties, MessageChannel errorChannel) throws Exception; /** @@ -294,7 +301,47 @@ protected void afterUnbindConsumer(ConsumerDestination destination, String group } /** - * Build an errorChannelRecoverer that writes to a pub/sub channel for the destination. + * Build an errorChannelRecoverer that writes to a pub/sub channel for the destination + * when an async send error is received. + * @param destination the destination. + * @return the channel. + */ + private SubscribableChannel registerErrorInfrastructure(ProducerDestination destination) { + ConfigurableListableBeanFactory beanFactory = getApplicationContext().getBeanFactory(); + String errorChannelName = errorsBaseName(destination); + SubscribableChannel errorChannel = null; + if (getApplicationContext().containsBean(errorChannelName)) { + Object errorChannelObject = getApplicationContext().getBean(errorChannelName); + if (!(errorChannelObject instanceof SubscribableChannel)) { + throw new IllegalStateException( + "Error channel '" + errorChannelName + "' must be a SubscribableChannel"); + } + errorChannel = (SubscribableChannel) errorChannelObject; + } + else { + errorChannel = new PublishSubscribeChannel(); + beanFactory.registerSingleton(errorChannelName, errorChannel); + errorChannel = (PublishSubscribeChannel) beanFactory.initializeBean(errorChannel, errorChannelName); + } + MessageChannel defaultErrorChannel = null; + if (getApplicationContext().containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)) { + defaultErrorChannel = getApplicationContext().getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, + MessageChannel.class); + } + if (defaultErrorChannel != null) { + BridgeHandler errorBridge = new BridgeHandler(); + errorBridge.setOutputChannel(defaultErrorChannel); + errorChannel.subscribe(errorBridge); + String errorBridgeHandlerName = getErrorBridgeName(destination); + beanFactory.registerSingleton(errorBridgeHandlerName, errorBridge); + beanFactory.initializeBean(errorBridge, errorBridgeHandlerName); + } + return errorChannel; + } + + /** + * Build an errorChannelRecoverer that writes to a pub/sub channel for the destination + * when an exception is thrown to a consumer. * @param destination the destination. * @param group the group. * @param consumerProperties the properties. @@ -356,6 +403,25 @@ protected final ErrorInfrastructure registerErrorInfrastructure(ConsumerDestinat return new ErrorInfrastructure(errorChannel, recoverer, handler); } + private void destroyErrorInfrastructure(ProducerDestination destination) { + String errorChannelName = errorsBaseName(destination); + String errorBridgeHandlerName = getErrorBridgeName(destination); + MessageHandler bridgeHandler = null; + if (getApplicationContext().containsBean(errorBridgeHandlerName)) { + bridgeHandler = getApplicationContext().getBean(errorBridgeHandlerName, MessageHandler.class); + } + if (getApplicationContext().containsBean(errorChannelName)) { + SubscribableChannel channel = getApplicationContext().getBean(errorChannelName, SubscribableChannel.class); + if (bridgeHandler != null) { + channel.unsubscribe(bridgeHandler); + ((DefaultSingletonBeanRegistry) getApplicationContext().getBeanFactory()) + .destroySingleton(errorBridgeHandlerName); + } + ((DefaultSingletonBeanRegistry) getApplicationContext().getBeanFactory()) + .destroySingleton(errorChannelName); + } + } + private void destroyErrorInfrastructure(ConsumerDestination destination, String group, C properties) { try { String recoverer = getErrorRecovererName(destination, group, properties); @@ -450,6 +516,14 @@ protected String errorsBaseName(ConsumerDestination destination, String group, return destination.getName() + "." + group + ".errors"; } + protected String getErrorBridgeName(ProducerDestination destination) { + return errorsBaseName(destination) + ".bridge"; + } + + protected String errorsBaseName(ProducerDestination destination) { + return destination.getName() + ".errors"; + } + private final class ReceivingHandler extends AbstractReplyProducingMessageHandler { private final boolean extractEmbeddedHeaders; diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/ProducerProperties.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/ProducerProperties.java index 5b4f5fd38e..1ce7eb6167 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/ProducerProperties.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/ProducerProperties.java @@ -52,6 +52,8 @@ public class ProducerProperties { private boolean useNativeEncoding = false; + private boolean errorChannelEnabled = false; + public Expression getPartitionKeyExpression() { return partitionKeyExpression; } @@ -131,4 +133,12 @@ public void setUseNativeEncoding(boolean useNativeEncoding) { this.useNativeEncoding = useNativeEncoding; } + public boolean isErrorChannelEnabled() { + return this.errorChannelEnabled; + } + + public void setErrorChannelEnabled(boolean errorChannelEnabled) { + this.errorChannelEnabled = errorChannelEnabled; + } + } diff --git a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinderTests.java b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinderTests.java index a71f28da23..7723250812 100644 --- a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinderTests.java +++ b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinderTests.java @@ -170,7 +170,7 @@ public SimpleConsumerDestination answer(final InvocationOnMock invocation) throw @Override protected MessageHandler createProducerMessageHandler(ProducerDestination destination, - ProducerProperties producerProperties) throws Exception { + ProducerProperties producerProperties, MessageChannel errorChannel) throws Exception { MessageHandler mock = Mockito.mock(MessageHandler.class, Mockito.withSettings() .extraInterfaces(Lifecycle.class, InitializingBean.class, DisposableBean.class)); return mock; From 64c1945abe85e396d7fb1a4d138277219f72e630 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 15 Aug 2017 17:25:55 -0400 Subject: [PATCH 2/5] Javadoc Polishing --- .../cloud/stream/binder/AbstractMessageChannelBinder.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java index f264c52491..67f1660c02 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java @@ -301,8 +301,8 @@ protected void afterUnbindConsumer(ConsumerDestination destination, String group } /** - * Build an errorChannelRecoverer that writes to a pub/sub channel for the destination - * when an async send error is received. + * Register an error channel for the destination when an async send error is received. + * Bridge the channel to the global error channel (if present). * @param destination the destination. * @return the channel. */ From 6050359c4b27c6eed680a175f5ff2e365d87c3ce Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 16 Aug 2017 09:27:52 -0400 Subject: [PATCH 3/5] Add test case. --- .../binder/AbstractMessageChannelBinder.java | 3 +- .../AbstractMessageChannelBinderTests.java | 43 +++++++++++++++++-- 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java index 67f1660c02..6100a21361 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java @@ -118,7 +118,8 @@ public final Binding doBindProducer(final String destination, Me try { producerDestination = this.provisioningProvider.provisionProducerDestination(destination, producerProperties); - SubscribableChannel errorChannel = registerErrorInfrastructure(producerDestination); + SubscribableChannel errorChannel = producerProperties.isErrorChannelEnabled() + ? registerErrorInfrastructure(producerDestination) : null; producerMessageHandler = createProducerMessageHandler(producerDestination, producerProperties, errorChannel); if (producerMessageHandler instanceof InitializingBean) { diff --git a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinderTests.java b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinderTests.java index 7723250812..16f956fcaf 100644 --- a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinderTests.java +++ b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinderTests.java @@ -20,7 +20,6 @@ import java.util.Set; import org.junit.Test; -import org.mockito.Matchers; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -45,6 +44,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; @@ -94,17 +94,22 @@ public void testEndpointLifecycle() throws Exception { Mockito.verify((DisposableBean) messageProducer).destroy(); Mockito.verifyNoMoreInteractions(messageProducer); - Binding producerBinding = binder.bindProducer("bar", new DirectChannel(), - new ProducerProperties()); + ProducerProperties producerProps = new ProducerProperties(); + producerProps.setErrorChannelEnabled(true); + Binding producerBinding = binder.bindProducer("bar", new DirectChannel(), producerProps); DirectFieldAccessor producerBindingAccessor = new DirectFieldAccessor(producerBinding); Object messageHandler = producerBindingAccessor.getPropertyValue("lifecycle"); Mockito.verify((Lifecycle) messageHandler).start(); Mockito.verify((InitializingBean) messageHandler).afterPropertiesSet(); Mockito.verifyNoMoreInteractions(messageHandler); + assertThat(context.containsBean("bar.errors")).isTrue(); + assertThat(context.containsBean("bar.errors.bridge")).isTrue(); producerBinding.unbind(); Mockito.verify((Lifecycle) messageHandler).stop(); Mockito.verify((DisposableBean) messageHandler).destroy(); Mockito.verifyNoMoreInteractions(messageHandler); + assertThat(context.containsBean("bar.errors")).isFalse(); + assertThat(context.containsBean("bar.errors.bridge")).isFalse(); } @Test @@ -165,7 +170,16 @@ public SimpleConsumerDestination answer(final InvocationOnMock invocation) throw } }).given(this.provisioningProvider).provisionConsumerDestination(anyString(), anyString(), - Matchers.any(ConsumerProperties.class)); + any(ConsumerProperties.class)); + willAnswer(new Answer() { + + @Override + public SimpleProducerDestination answer(final InvocationOnMock invocation) throws Throwable { + return new SimpleProducerDestination(invocation.getArgumentAt(0, String.class)); + } + + }).given(this.provisioningProvider).provisionProducerDestination(anyString(), + any(ProducerProperties.class)); } @Override @@ -214,4 +228,25 @@ public String getName() { } + private static class SimpleProducerDestination implements ProducerDestination { + + private final String name; + + + SimpleProducerDestination(String name) { + this.name = name; + } + + @Override + public String getName() { + return this.name; + } + + @Override + public String getNameForPartition(int partition) { + return getName() + partition; + } + + } + } From 7adbe64dcbebeb7e125d936aee8d1bb35c826c14 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 17 Aug 2017 09:53:18 -0400 Subject: [PATCH 4/5] temp update to SI 4.3.12 --- spring-cloud-stream/pom.xml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spring-cloud-stream/pom.xml b/spring-cloud-stream/pom.xml index efb74c64bf..135797e363 100644 --- a/spring-cloud-stream/pom.xml +++ b/spring-cloud-stream/pom.xml @@ -33,10 +33,12 @@ org.springframework.integration spring-integration-core + 4.3.12.BUILD-SNAPSHOT org.springframework.integration spring-integration-jmx + 4.3.12.BUILD-SNAPSHOT org.springframework From 00a67fbaade63719bec8da4beb5a5bbeb2f6cbd5 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 17 Aug 2017 10:35:48 -0400 Subject: [PATCH 5/5] GH-802 - Error Handling Documentation Resolves #802 --- .../spring-cloud-stream-overview.adoc | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc b/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc index 7ec249f00a..a13a854d42 100644 --- a/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc +++ b/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc @@ -432,7 +432,23 @@ Spring Cloud Stream supports publishing error messages received by the Spring In error channel. Error messages sent to the `errorChannel` can be published to a specific destination at the broker by configuring a binding for the outbound target named `error`. For example, to publish error messages to a broker destination named "myErrors", provide the following property: -`spring.cloud.stream.bindings.error.destination=myErrors` +`spring.cloud.stream.bindings.error.destination=myErrors`. + +[[binder-error-channels]] +===== Message Channel Binders and Error Channels + +Starting with _version 1.3_, some `MessageChannel` - based binders publish errors to a discrete error channel for each destination. +In addition, these error channels are bridged to the global Spring Integration `errorChannel` mentioned above. +You can therefore consume errors for specific destinations and/or for all destinations, using a standard Spring Integration flow (`IntegrationFlow`, `@ServiceActivator`, etc). + +On the consumer side, the listener thread catches any exceptions and forwards an `ErrorMessage` to the destination's error channel. +The payload of the message is a `MessagingException` with the normal `failedMessage` and `cause` properties. +Usually, the raw data received from the broker is included in a header. +For binders that support (and are configured with) a dead letter destination; a `MessagePublishingErrorHandler` is subscribed to the channel, and the raw data is forwarded to the dead letter destination. + +On the producer side; for binders that support some kind of async result after publishing messages (e.g. RabbitMQ, Kafka), you can enable an error channel by setting the `...producer.errorChannelEnabled` to `true`. +The payload of the `ErrorMessage` depends on the binder implementation but will be a `MessagingException` with the normal `failedMessage` property, as well as additional properties about the failure. +Refer to the binder documentation for complete details. ===== Using @StreamListener for Automatic Content Type Handling @@ -1231,6 +1247,11 @@ When native encoding is used, it is the responsibility of the consumer to use ap Also, when native encoding/decoding is used the `headerMode` property is ignored and headers will not be embedded into the message. + Default: `false`. +errorChannelEnabled:: + When set to `true`, if the binder supports async send results; send failures will be sent to an error channel for the destination. + See <> for more information. ++ +Default: `false`. [[dynamicdestination]] === Using dynamically bound destinations