diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java index 0508bb13ffa..cde44307ce8 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java @@ -43,6 +43,7 @@ import org.springframework.integration.channel.NullChannel; import org.springframework.integration.expression.ExpressionUtils; import org.springframework.integration.handler.AbstractMessageProducingHandler; +import org.springframework.integration.handler.DiscardingMessageHandler; import org.springframework.integration.store.MessageGroup; import org.springframework.integration.store.MessageGroupStore; import org.springframework.integration.store.MessageGroupStore.MessageGroupCallback; @@ -85,7 +86,7 @@ * @since 2.0 */ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageProducingHandler - implements DisposableBean, ApplicationEventPublisherAware { + implements DiscardingMessageHandler, DisposableBean, ApplicationEventPublisherAware { private static final Log logger = LogFactory.getLog(AbstractCorrelatingMessageHandler.class); @@ -329,7 +330,16 @@ protected ReleaseStrategy getReleaseStrategy() { return this.releaseStrategy; } - protected MessageChannel getDiscardChannel() { + @Override + public MessageChannel getDiscardChannel() { + if (this.discardChannelName != null) { + synchronized (this) { + if (this.discardChannelName != null) { + this.discardChannel = getChannelResolver().resolveDestination(this.discardChannelName); + this.discardChannelName = null; + } + } + } return this.discardChannel; } @@ -476,15 +486,8 @@ private void processForceRelease(Object groupId) { } private void discardMessage(Message message) { - if (this.discardChannelName != null) { - synchronized (this) { - if (this.discardChannelName != null) { - this.discardChannel = getChannelResolver().resolveDestination(this.discardChannelName); - this.discardChannelName = null; - } - } - } - this.messagingTemplate.send(this.discardChannel, message); + MessageChannel discardChannel = getDiscardChannel(); + this.messagingTemplate.send(discardChannel, message); } /** diff --git a/spring-integration-core/src/main/java/org/springframework/integration/filter/MessageFilter.java b/spring-integration-core/src/main/java/org/springframework/integration/filter/MessageFilter.java index d2d2940ebe1..dfadaca8b80 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/filter/MessageFilter.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/filter/MessageFilter.java @@ -21,6 +21,7 @@ import org.springframework.integration.MessageRejectedException; import org.springframework.integration.core.MessageSelector; import org.springframework.integration.handler.AbstractReplyProducingPostProcessingMessageHandler; +import org.springframework.integration.handler.DiscardingMessageHandler; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.util.Assert; @@ -40,7 +41,8 @@ * @author Artem Bilan * @author David Liu */ -public class MessageFilter extends AbstractReplyProducingPostProcessingMessageHandler implements Lifecycle { +public class MessageFilter extends AbstractReplyProducingPostProcessingMessageHandler + implements DiscardingMessageHandler, Lifecycle { private final MessageSelector selector; @@ -104,6 +106,20 @@ public void setDiscardWithinAdvice(boolean discardWithinAdvice) { this.setPostProcessWithinAdvice(discardWithinAdvice); } + @Override + public MessageChannel getDiscardChannel() { + if (this.discardChannelName != null) { + synchronized (this) { + if (this.discardChannelName != null) { + this.discardChannel = getChannelResolver().resolveDestination(this.discardChannelName); + this.discardChannelName = null; + } + } + } + return this.discardChannel; + } + + @Override public String getComponentType() { return "filter"; @@ -153,16 +169,9 @@ protected Object doHandleRequestMessage(Message message) { @Override public Object postProcess(Message message, Object result) { if (result == null) { - if (this.discardChannelName != null) { - synchronized (this) { - if (this.discardChannelName != null) { - this.discardChannel = getChannelResolver().resolveDestination(this.discardChannelName); - this.discardChannelName = null; - } - } - } - if (this.discardChannel != null) { - this.messagingTemplate.send(this.discardChannel, message); + MessageChannel discardChannel = getDiscardChannel(); + if (discardChannel != null) { + this.messagingTemplate.send(discardChannel, message); } if (this.throwExceptionOnRejection) { throw new MessageRejectedException(message, "MessageFilter '" + this.getComponentName() diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/CompositeMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/CompositeMessageHandler.java new file mode 100644 index 00000000000..1a971a47d45 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/CompositeMessageHandler.java @@ -0,0 +1,38 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.handler; + +import java.util.List; + +import org.springframework.messaging.MessageHandler; + +/** + * Classes implementing this interface delegate to a list of handlers. + * + * @author Gary Russell + * @since 4.3 + * + */ +public interface CompositeMessageHandler extends MessageHandler { + + /** + * Return an unmodifiable list of handlers. + * @return the handlers. + */ + List getHandlers(); + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/DiscardingMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/DiscardingMessageHandler.java new file mode 100644 index 00000000000..f0bf0ccba05 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/DiscardingMessageHandler.java @@ -0,0 +1,37 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.handler; + +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; + +/** + * Classes implementing this interface are capable of discarding messages. + * + * @author Gary Russell + * @since 4.3 + * + */ +public interface DiscardingMessageHandler extends MessageHandler { + + /** + * Return the discard channel. + * @return the channel. + */ + MessageChannel getDiscardChannel(); + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/MessageHandlerChain.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/MessageHandlerChain.java index f08ebc2a8bf..e8b1dfa701e 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/MessageHandlerChain.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/MessageHandlerChain.java @@ -16,6 +16,7 @@ package org.springframework.integration.handler; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.concurrent.locks.ReentrantLock; @@ -62,7 +63,8 @@ * @author Gary Russell * @author Artem Bilan */ -public class MessageHandlerChain extends AbstractMessageProducingHandler implements MessageProducer, Lifecycle { +public class MessageHandlerChain extends AbstractMessageProducingHandler implements MessageProducer, + CompositeMessageHandler, Lifecycle { private volatile List handlers; @@ -78,6 +80,11 @@ public void setHandlers(List handlers) { this.handlers = handlers; } + @Override + public List getHandlers() { + return Collections.unmodifiableList(this.handlers); + } + @Override public String getComponentType() { return "chain"; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/management/graph/CompositeMessageHandlerNode.java b/spring-integration-core/src/main/java/org/springframework/integration/support/management/graph/CompositeMessageHandlerNode.java new file mode 100644 index 00000000000..1ff334aacfd --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/management/graph/CompositeMessageHandlerNode.java @@ -0,0 +1,66 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.support.management.graph; + +import java.util.ArrayList; +import java.util.List; + +import org.springframework.messaging.MessageHandler; + +/** + * Represents a composite message handler. + * + * @author Gary Russell + * @since 4.3 + * + */ +public class CompositeMessageHandlerNode extends MessageHandlerNode { + + private final List handlers = new ArrayList(); + + public CompositeMessageHandlerNode(int nodeId, String name, MessageHandler handler, String input, String output, + List handlers) { + super(nodeId, name, handler, input, output); + this.handlers.addAll(handlers); + } + + public List getHandlers() { + return this.handlers; + } + + public static class InnerHandler { + + private final String name; + + private final String type; + + public InnerHandler(String name, String type) { + this.name = name; + this.type = type; + } + + public String getName() { + return this.name; + } + + public String getType() { + return this.type; + } + + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/management/graph/DiscardingMessageHandlerNode.java b/spring-integration-core/src/main/java/org/springframework/integration/support/management/graph/DiscardingMessageHandlerNode.java new file mode 100644 index 00000000000..da57ee30922 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/management/graph/DiscardingMessageHandlerNode.java @@ -0,0 +1,42 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.support.management.graph; + +import org.springframework.messaging.MessageHandler; + +/** + * Represents an endpoint that has a discard channel. + * + * @author Gary Russell + * @since 4.3 + * + */ +public class DiscardingMessageHandlerNode extends MessageHandlerNode { + + private final String discards; + + public DiscardingMessageHandlerNode(int nodeId, String name, MessageHandler handler, String input, String output, + String discards) { + super(nodeId, name, handler, input, output); + this.discards = discards; + } + + public String getDiscards() { + return this.discards; + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/management/graph/ErrorCapableCompositeMessageHandlerNode.java b/spring-integration-core/src/main/java/org/springframework/integration/support/management/graph/ErrorCapableCompositeMessageHandlerNode.java new file mode 100644 index 00000000000..db9fd307f22 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/management/graph/ErrorCapableCompositeMessageHandlerNode.java @@ -0,0 +1,46 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.support.management.graph; + +import java.util.List; + +import org.springframework.integration.handler.CompositeMessageHandler; + +/** + * Represents a composite message handler that can emit error messages + * (pollable endpoint). + * + * @author Gary Russell + * @since 4.3 + * + */ +public class ErrorCapableCompositeMessageHandlerNode extends CompositeMessageHandlerNode implements ErrorCapableNode { + + private final String errors; + + public ErrorCapableCompositeMessageHandlerNode(int nodeId, String name, CompositeMessageHandler handler, String input, + String output, String errors, List handlers) { + super(nodeId, name, handler, input, output, handlers); + this.errors = errors; + } + + @Override + public String getErrors() { + return this.errors; + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/management/graph/ErrorCapableDiscardingMessageHandlerNode.java b/spring-integration-core/src/main/java/org/springframework/integration/support/management/graph/ErrorCapableDiscardingMessageHandlerNode.java new file mode 100644 index 00000000000..705c924311c --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/management/graph/ErrorCapableDiscardingMessageHandlerNode.java @@ -0,0 +1,44 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.support.management.graph; + +import org.springframework.messaging.MessageHandler; + +/** + * Represents an endpoint that has a discard channel and can emit errors + * (pollable endpoint). + * + * @author Gary Russell + * @since 4.3 + * + */ +public class ErrorCapableDiscardingMessageHandlerNode extends DiscardingMessageHandlerNode implements ErrorCapableNode { + + private final String errors; + + public ErrorCapableDiscardingMessageHandlerNode(int nodeId, String name, MessageHandler handler, String input, + String output, String discards, String errors) { + super(nodeId, name, handler, input, output, discards); + this.errors = errors; + } + + @Override + public String getErrors() { + return this.errors; + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/management/graph/IntegrationGraphServer.java b/spring-integration-core/src/main/java/org/springframework/integration/support/management/graph/IntegrationGraphServer.java index 45d007c2f58..69d1bab6693 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/management/graph/IntegrationGraphServer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/management/graph/IntegrationGraphServer.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicInteger; @@ -33,7 +34,11 @@ import org.springframework.integration.endpoint.PollingConsumer; import org.springframework.integration.endpoint.SourcePollingChannelAdapter; import org.springframework.integration.gateway.MessagingGatewaySupport; +import org.springframework.integration.handler.CompositeMessageHandler; +import org.springframework.integration.handler.DiscardingMessageHandler; +import org.springframework.integration.support.context.NamedComponent; import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; /** * Builds the runtime object model graph. @@ -203,6 +208,12 @@ private void producerLink(Collection links, Map handlers = handler.getHandlers(); + List innerHandlers = + new ArrayList(); + for (MessageHandler innerHandler : handlers) { + if (innerHandler instanceof NamedComponent) { + NamedComponent named = (NamedComponent) innerHandler; + innerHandlers.add(new CompositeMessageHandlerNode.InnerHandler(named.getComponentName(), + named.getComponentType())); + } + } + return polled + ? new ErrorCapableCompositeMessageHandlerNode(this.nodeId.incrementAndGet(), name, handler, + consumer.getInputChannel().toString(), output, errors, innerHandlers) + : new CompositeMessageHandlerNode(this.nodeId.incrementAndGet(), name, handler, + consumer.getInputChannel().toString(), output, innerHandlers); + } + + private MessageHandlerNode discardingHandler(String name, IntegrationConsumer consumer, + DiscardingMessageHandler handler, String output, String errors, boolean polled) { + return polled + ? new ErrorCapableDiscardingMessageHandlerNode(this.nodeId.incrementAndGet(), name, handler, + consumer.getInputChannel().toString(), output, handler.getDiscardChannel().toString(), errors) + : new DiscardingMessageHandlerNode(this.nodeId.incrementAndGet(), name, handler, + consumer.getInputChannel().toString(), output, handler.getDiscardChannel().toString()); } private void reset() { diff --git a/spring-integration-core/src/test/java/org/springframework/integration/support/management/graph/IntegrationGraphServerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/support/management/graph/IntegrationGraphServerTests.java index fd7071bd880..48a94f38986 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/support/management/graph/IntegrationGraphServerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/support/management/graph/IntegrationGraphServerTests.java @@ -31,6 +31,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.ImportResource; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.MessagePublishingErrorHandler; @@ -48,6 +49,7 @@ import org.springframework.messaging.PollableChannel; import org.springframework.messaging.SubscribableChannel; import org.springframework.scheduling.support.PeriodicTrigger; +import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import com.fasterxml.jackson.databind.ObjectMapper; @@ -59,6 +61,7 @@ * */ @RunWith(SpringJUnit4ClassRunner.class) +@DirtiesContext public class IntegrationGraphServerTests { @Autowired @@ -77,16 +80,17 @@ public void test() throws Exception { @SuppressWarnings("unchecked") List> nodes = (List>) map.get("nodes"); assertThat(nodes, is(notNullValue())); - assertThat(nodes.size(), is(equalTo(13))); + assertThat(nodes.size(), is(equalTo(19))); @SuppressWarnings("unchecked") List> links = (List>) map.get("links"); assertThat(links, is(notNullValue())); - assertThat(links.size(), is(equalTo(9))); + assertThat(links.size(), is(equalTo(17))); } @Configuration @EnableIntegration @EnableIntegrationManagement + @ImportResource("org/springframework/integration/support/management/graph/integration-graph-context.xml") public static class Config { @Bean @@ -129,7 +133,7 @@ public PollingConsumer polling() { } @Bean - public PollableChannel two() { + public PollableChannel polledChannel() { return new QueueChannel(); } @@ -162,12 +166,12 @@ public PollerMetadata defaultPoller() { public static class Services { - @ServiceActivator(inputChannel = "one", outputChannel = "two") + @ServiceActivator(inputChannel = "one", outputChannel = "polledChannel") public String foo(String foo) { return foo.toUpperCase(); } - @ServiceActivator(inputChannel = "two") + @ServiceActivator(inputChannel = "polledChannel") public void bar(String foo) { } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/support/management/graph/integration-graph-context.xml b/spring-integration-core/src/test/java/org/springframework/integration/support/management/graph/integration-graph-context.xml new file mode 100644 index 00000000000..24014aef706 --- /dev/null +++ b/spring-integration-core/src/test/java/org/springframework/integration/support/management/graph/integration-graph-context.xml @@ -0,0 +1,22 @@ + + + + + + + + + + + + + + + + + +