Permalink
Browse files

fix overriding

  • Loading branch information...
1 parent 80c2b3e commit 3250e75a15af5cd602843f4e155b57a3e7c48fb6 @smaldini committed Apr 4, 2012
@@ -18,21 +18,19 @@
import org.grails.plugin.platform.events.EventObject
import org.grails.plugin.platform.events.dispatcher.PersistentContextInterceptor
+import org.grails.plugin.platform.events.publisher.EventsPublisher
import org.grails.plugin.platform.events.publisher.EventsPublisherGateway
import org.grails.plugin.platform.events.publisher.SpringIntegrationEventsPublisher
import org.grails.plugin.platform.events.publisher.SpringIntegrationRepliesAggregator
import org.grails.plugin.platform.events.registry.SpringIntegrationEventsRegistry
-import org.grails.plugin.platform.events.dispatcher.EventsDispatcher
-import org.grails.plugin.platform.events.EventObject
-import org.grails.plugin.platform.events.publisher.EventsPublisher
class EventsSiGrailsPlugin {
// the plugin version
def version = "1.0.M1"
// the version or versions of Grails the plugin is designed for
def grailsVersion = "2.0 > *"
// the other plugins this plugin depends on
- def dependsOn = ['platformCore':'0.1 > *']
+ //def dependsOn = ['platform-core']
// resources that are excluded from plugin packaging
def pluginExcludes = [
"grails-app/**"
@@ -79,6 +77,7 @@ This plugin is a Spring Integration implementation and uses its artefacts to map
def gormCancelChannel = gormChannel + 'Cancel'
//def grailsReplyChannel = "grailsReplyPipeline" //todo config
+
/* Declare main grails pipeline and its router to reach listeners */
channelPersistentContextInterceptor(PersistentContextInterceptor) {
persistenceInterceptor = ref("persistenceInterceptor")
@@ -97,7 +96,7 @@ This plugin is a Spring Integration implementation and uses its artefacts to map
si.chain('input-channel': grailsChannel) {
//si.transformer(expression: "payload.getData()")
si.'header-value-router'('header-name': EventsPublisherGateway.TARGET_CHANNEL,
- //'ignore-send-failures': true,
+ 'ignore-send-failures': true,
'resolution-required': false,
'default-output-channel': "nullChannel"
)
@@ -151,6 +150,15 @@ This plugin is a Spring Integration implementation and uses its artefacts to map
/* END GORM CONFIG */
+ /* Listeners config */
+
+ /*Events.eachListener(application.serviceClasses*.clazz) {String listenerId, Method m, Class c ->
+ si.'publish-subscribe-channel'(id: EventsRegistry.GRAILS_TOPIC_PREFIX + listenerId, 'apply-sequence': true) {
+ si.interceptors {
+ ref(bean: 'channelPersistentContextInterceptor')
+ }
+ }
+ }*/
}
@@ -37,7 +37,7 @@
public static final String TARGET_CHANNEL = "targetChannel";
public static final String EVENT_OBJECT_KEY = "EVENT_OBJECT_KEY";
- public Message<?> send(Object data, @Header(EVENT_OBJECT_KEY)EventObject event, @Header(TARGET_CHANNEL) String targetChannel);
+ public Message<?> send(Object data, @Header(EVENT_OBJECT_KEY) EventObject event, @Header(TARGET_CHANNEL) String targetChannel);
- public Future<Message<?>> sendAsync(Object data, @Header(EVENT_OBJECT_KEY)EventObject event, @Header(TARGET_CHANNEL) String targetChannel);
+ public Future<Message<?>> sendAsync(Object data, @Header(EVENT_OBJECT_KEY) EventObject event, @Header(TARGET_CHANNEL) String targetChannel);
}
@@ -32,6 +32,8 @@
import org.springframework.integration.MessageChannel;
import org.springframework.integration.channel.ChannelInterceptor;
import org.springframework.integration.channel.PublishSubscribeChannel;
+import org.springframework.integration.core.SubscribableChannel;
+import org.springframework.integration.handler.BridgeHandler;
import org.springframework.integration.handler.ServiceActivatingHandler;
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
import org.springframework.util.ReflectionUtils;
@@ -69,29 +71,15 @@ public void setOutputChannel(MessageChannel outputChannel) {
this.outputChannel = outputChannel;
}
- private PublishSubscribeChannel getOrCreateChannel(String topic) {
- if (topic == null || topic.isEmpty()) {
- throw new RuntimeException("topic name must not be null or empty");
- }
-
- String channelName = GRAILS_TOPIC_PREFIX + topic;
- PublishSubscribeChannel channel = null;
-
- try {
- channel = ctx.getBean(channelName, PublishSubscribeChannel.class);
- } catch (BeansException be) {
- log.debug("creating channel because " + be.getMessage());
- }
+ private SubscribableChannel createChannel(String channelName) {
+ PublishSubscribeChannel _channel = new PublishSubscribeChannel();
+ _channel.setApplySequence(true);
+ _channel.setBeanName(channelName);
+ _channel.setBeanFactory(beanFactory);
+ _channel.addInterceptor(interceptor);
+ beanFactory.registerSingleton(channelName, _channel);
- if (channel == null) {
- channel = new PublishSubscribeChannel();
- channel.setApplySequence(true);
- channel.setBeanName(channelName);
- channel.setBeanFactory(beanFactory);
- channel.addInterceptor(interceptor);
- beanFactory.registerSingleton(channelName, channel);
- }
- return channel;
+ return _channel;
}
private String registerHandler(Object bean, Method callback, String topic) {
@@ -107,38 +95,62 @@ private String registerHandler(Object bean, Method callback, String topic) {
}
ListenerId listener = ListenerId.build(topic, target, callback);
- String callBackId = listener.toString();
ServiceActivatingHandler serviceActivatingHandler =
- new GrailsServiceActivatingHandler(target, callback,listener);
+ new GrailsServiceActivatingHandler(target, callback, listener);
- initServiceActivatingHandler(serviceActivatingHandler, callBackId, topic);
+ initServiceActivatingHandler(serviceActivatingHandler, listener, topic);
- return callBackId;
+ return listener.toString();
}
private String registerHandler(Closure callback, String topic) {
ListenerId listener = ListenerId.build(topic, callback);
- String callBackId = listener.toString();
ServiceActivatingHandler serviceActivatingHandler =
new GrailsServiceActivatingHandler(callback, "call", listener);
- initServiceActivatingHandler(serviceActivatingHandler, callBackId, topic);
+ initServiceActivatingHandler(serviceActivatingHandler, listener, topic);
- return callBackId;
+ return listener.toString();
}
- private void initServiceActivatingHandler(ServiceActivatingHandler serviceActivatingHandler, String callBackId, String topic) {
+ private void initServiceActivatingHandler(ServiceActivatingHandler serviceActivatingHandler, ListenerId listener, String topic) {
+ if (topic == null || topic.isEmpty()) {
+ throw new RuntimeException("topic name must not be null or empty");
+ }
+
+ String callBackId = listener.toString();
serviceActivatingHandler.setBeanName(callBackId);
serviceActivatingHandler.setChannelResolver(resolver);
serviceActivatingHandler.setRequiresReply(true);
serviceActivatingHandler.setOutputChannel(outputChannel);
beanFactory.registerSingleton(callBackId, serviceActivatingHandler);
- PublishSubscribeChannel channel = getOrCreateChannel(topic);
+
+ SubscribableChannel bridgeChannel = null;
+ String channelName = listener.getTopic();
+
+ try {
+ bridgeChannel = ctx.getBean(channelName, SubscribableChannel.class);
+ } catch (BeansException be) {
+ log.debug("no overriding channel found " + be.getMessage());
+ }
+
+ if (bridgeChannel != null) {
+ channelName += "-plugin";
+ }
+
+ SubscribableChannel channel = createChannel(channelName);
channel.subscribe(serviceActivatingHandler);
+
+ if (bridgeChannel != null) {
+ BridgeHandler bridgeHandler = new BridgeHandler();
+ bridgeHandler.setOutputChannel(channel);
+ bridgeChannel.subscribe(bridgeHandler);
+ }
+
}
public String addListener(String topic, Closure callback) {
@@ -157,7 +169,7 @@ public String addListener(String topic, Object bean, Method callback) {
ListenerId listener = ListenerId.parse(callbackId);
List<GrailsServiceActivatingHandler> targetListeners = new ArrayList<GrailsServiceActivatingHandler>();
- if(listener == null)
+ if (listener == null)
return targetListeners;
Map<String, GrailsServiceActivatingHandler> grailsListeners = ctx.getBeansOfType(GrailsServiceActivatingHandler.class);

0 comments on commit 3250e75

Please sign in to comment.