Skip to content

Commit

Permalink
INT-4009: Support Composite Handler in Graph
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/INT-4009

Adds:

    ...
    "handlers" : [ {
      "name" : "polledChain$child#0",
      "type" : "transformer"
    }, {
      "name" : "polledChain$child#1",
      "type" : "service-activator"
    } ],
    ...

to a chain node.

INT-4010: Add Discard Flows to Object Model

JIRA: https://jira.spring.io/browse/INT-4010
  • Loading branch information
garyrussell authored and artembilan committed Apr 26, 2016
1 parent 590398f commit 6a9a425
Show file tree
Hide file tree
Showing 12 changed files with 402 additions and 32 deletions.
Expand Up @@ -43,6 +43,7 @@
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.integration.handler.DiscardingMessageHandler;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.MessageGroupStore.MessageGroupCallback;
Expand Down Expand Up @@ -85,7 +86,7 @@
* @since 2.0
*/
public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageProducingHandler
implements DisposableBean, ApplicationEventPublisherAware {
implements DiscardingMessageHandler, DisposableBean, ApplicationEventPublisherAware {

private static final Log logger = LogFactory.getLog(AbstractCorrelatingMessageHandler.class);

Expand Down Expand Up @@ -329,7 +330,16 @@ protected ReleaseStrategy getReleaseStrategy() {
return this.releaseStrategy;
}

protected MessageChannel getDiscardChannel() {
@Override
public MessageChannel getDiscardChannel() {
if (this.discardChannelName != null) {
synchronized (this) {
if (this.discardChannelName != null) {
this.discardChannel = getChannelResolver().resolveDestination(this.discardChannelName);
this.discardChannelName = null;
}
}
}
return this.discardChannel;
}

Expand Down Expand Up @@ -476,15 +486,8 @@ private void processForceRelease(Object groupId) {
}

private void discardMessage(Message<?> message) {
if (this.discardChannelName != null) {
synchronized (this) {
if (this.discardChannelName != null) {
this.discardChannel = getChannelResolver().resolveDestination(this.discardChannelName);
this.discardChannelName = null;
}
}
}
this.messagingTemplate.send(this.discardChannel, message);
MessageChannel discardChannel = getDiscardChannel();
this.messagingTemplate.send(discardChannel, message);
}

/**
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.springframework.integration.MessageRejectedException;
import org.springframework.integration.core.MessageSelector;
import org.springframework.integration.handler.AbstractReplyProducingPostProcessingMessageHandler;
import org.springframework.integration.handler.DiscardingMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
Expand All @@ -40,7 +41,8 @@
* @author Artem Bilan
* @author David Liu
*/
public class MessageFilter extends AbstractReplyProducingPostProcessingMessageHandler implements Lifecycle {
public class MessageFilter extends AbstractReplyProducingPostProcessingMessageHandler
implements DiscardingMessageHandler, Lifecycle {

private final MessageSelector selector;

Expand Down Expand Up @@ -104,6 +106,20 @@ public void setDiscardWithinAdvice(boolean discardWithinAdvice) {
this.setPostProcessWithinAdvice(discardWithinAdvice);
}

@Override
public MessageChannel getDiscardChannel() {
if (this.discardChannelName != null) {
synchronized (this) {
if (this.discardChannelName != null) {
this.discardChannel = getChannelResolver().resolveDestination(this.discardChannelName);
this.discardChannelName = null;
}
}
}
return this.discardChannel;
}


@Override
public String getComponentType() {
return "filter";
Expand Down Expand Up @@ -153,16 +169,9 @@ protected Object doHandleRequestMessage(Message<?> message) {
@Override
public Object postProcess(Message<?> message, Object result) {
if (result == null) {
if (this.discardChannelName != null) {
synchronized (this) {
if (this.discardChannelName != null) {
this.discardChannel = getChannelResolver().resolveDestination(this.discardChannelName);
this.discardChannelName = null;
}
}
}
if (this.discardChannel != null) {
this.messagingTemplate.send(this.discardChannel, message);
MessageChannel discardChannel = getDiscardChannel();
if (discardChannel != null) {
this.messagingTemplate.send(discardChannel, message);
}
if (this.throwExceptionOnRejection) {
throw new MessageRejectedException(message, "MessageFilter '" + this.getComponentName()
Expand Down
@@ -0,0 +1,38 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.handler;

import java.util.List;

import org.springframework.messaging.MessageHandler;

/**
* Classes implementing this interface delegate to a list of handlers.
*
* @author Gary Russell
* @since 4.3
*
*/
public interface CompositeMessageHandler extends MessageHandler {

/**
* Return an unmodifiable list of handlers.
* @return the handlers.
*/
List<MessageHandler> getHandlers();

}
@@ -0,0 +1,37 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.handler;

import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
* Classes implementing this interface are capable of discarding messages.
*
* @author Gary Russell
* @since 4.3
*
*/
public interface DiscardingMessageHandler extends MessageHandler {

/**
* Return the discard channel.
* @return the channel.
*/
MessageChannel getDiscardChannel();

}
Expand Up @@ -16,6 +16,7 @@

package org.springframework.integration.handler;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -62,7 +63,8 @@
* @author Gary Russell
* @author Artem Bilan
*/
public class MessageHandlerChain extends AbstractMessageProducingHandler implements MessageProducer, Lifecycle {
public class MessageHandlerChain extends AbstractMessageProducingHandler implements MessageProducer,
CompositeMessageHandler, Lifecycle {

private volatile List<MessageHandler> handlers;

Expand All @@ -78,6 +80,11 @@ public void setHandlers(List<MessageHandler> handlers) {
this.handlers = handlers;
}

@Override
public List<MessageHandler> getHandlers() {
return Collections.unmodifiableList(this.handlers);
}

@Override
public String getComponentType() {
return "chain";
Expand Down
@@ -0,0 +1,66 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.support.management.graph;

import java.util.ArrayList;
import java.util.List;

import org.springframework.messaging.MessageHandler;

/**
* Represents a composite message handler.
*
* @author Gary Russell
* @since 4.3
*
*/
public class CompositeMessageHandlerNode extends MessageHandlerNode {

private final List<InnerHandler> handlers = new ArrayList<InnerHandler>();

public CompositeMessageHandlerNode(int nodeId, String name, MessageHandler handler, String input, String output,
List<InnerHandler> handlers) {
super(nodeId, name, handler, input, output);
this.handlers.addAll(handlers);
}

public List<InnerHandler> getHandlers() {
return this.handlers;
}

public static class InnerHandler {

private final String name;

private final String type;

public InnerHandler(String name, String type) {
this.name = name;
this.type = type;
}

public String getName() {
return this.name;
}

public String getType() {
return this.type;
}

}

}
@@ -0,0 +1,42 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.support.management.graph;

import org.springframework.messaging.MessageHandler;

/**
* Represents an endpoint that has a discard channel.
*
* @author Gary Russell
* @since 4.3
*
*/
public class DiscardingMessageHandlerNode extends MessageHandlerNode {

private final String discards;

public DiscardingMessageHandlerNode(int nodeId, String name, MessageHandler handler, String input, String output,
String discards) {
super(nodeId, name, handler, input, output);
this.discards = discards;
}

public String getDiscards() {
return this.discards;
}

}
@@ -0,0 +1,46 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.support.management.graph;

import java.util.List;

import org.springframework.integration.handler.CompositeMessageHandler;

/**
* Represents a composite message handler that can emit error messages
* (pollable endpoint).
*
* @author Gary Russell
* @since 4.3
*
*/
public class ErrorCapableCompositeMessageHandlerNode extends CompositeMessageHandlerNode implements ErrorCapableNode {

private final String errors;

public ErrorCapableCompositeMessageHandlerNode(int nodeId, String name, CompositeMessageHandler handler, String input,
String output, String errors, List<InnerHandler> handlers) {
super(nodeId, name, handler, input, output, handlers);
this.errors = errors;
}

@Override
public String getErrors() {
return this.errors;
}

}

0 comments on commit 6a9a425

Please sign in to comment.