diff --git a/api/pom.xml b/api/pom.xml index 2b51fbc..ab03251 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -1,34 +1,48 @@ - - 4.0.0 + + 4.0.0 - - org.jboss.seam.jms - seam-jms-parent - 3.0.0-SNAPSHOT - ../pom.xml - + + org.jboss.seam.jms + seam-jms-parent + 3.0.0-SNAPSHOT + ../pom.xml + - seam-jms-api - jar + seam-jms-api + jar - Seam JMS API - Client View of the Seam JMS Module - - ${project.parent.url} + Seam JMS API + Client View of the Seam JMS Module + + ${project.parent.url} - - - javax.enterprise - cdi-api - provided - + + + javax.enterprise + cdi-api + provided + + + + org.jboss.spec.javax.jms + jboss-jms-api_1.1_spec + provided + + + org.jboss.logging + jboss-logging + 3.0.0.Beta4 + provided + + + + + org.jboss.spec.javax.servlet + jboss-servlet-api_3.0_spec + provided + + - - org.jboss.spec.javax.jms - jboss-jms-api_1.1_spec - provided - - - diff --git a/api/src/main/java/org/jboss/seam/jms/AbstractMessageListener.java b/api/src/main/java/org/jboss/seam/jms/AbstractMessageListener.java new file mode 100644 index 0000000..86beafe --- /dev/null +++ b/api/src/main/java/org/jboss/seam/jms/AbstractMessageListener.java @@ -0,0 +1,58 @@ +package org.jboss.seam.jms; + +import javax.enterprise.inject.spi.BeanManager; +import javax.jms.JMSException; +import javax.jms.Message; + +import org.jboss.logging.Logger; + +/** + * Supporting base MessageListener for working in CDI enabled environments. + * This is useful for having a MessageListener + * + * @author johnament + * + */ +public abstract class AbstractMessageListener implements javax.jms.MessageListener { + private Logger logger; + + protected BeanManager beanManager; + protected ClassLoader classLoader; + + protected AbstractMessageListener(BeanManager beanManager, ClassLoader classLoader) { + this.logger = Logger.getLogger(AbstractMessageListener.class); + this.beanManager = beanManager; + this.classLoader = classLoader; + logger.debug("Creating new AbstractMessageListener."); + } + + /** + * AbstractMessageListener implements the basic on message functionality to + * handle classloader behavior for working in CDI environments. + * + * @param message The JMS Message that is being received. + */ + public final void onMessage(Message message) { + logger.info("Received a message"); + ClassLoader prevCl = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(classLoader); + try{ + handleMessage(message); + } catch (JMSException e) { + logger.warn("A JMS Exception occurred during processing.",e); + } finally { + Thread.currentThread().setContextClassLoader(prevCl); + } + } + + /** + * Implementations should override this method and + * perform necessary business logic in here. + * + * A BeanManager reference is available, for looking up beans. + * + * @param message The message to be handled. + * @throws JMSException The method can throw this exception if an error occurred. + */ + protected abstract void handleMessage(Message message) throws JMSException; +} diff --git a/api/src/main/java/org/jboss/seam/jms/MessageBuilder.java b/api/src/main/java/org/jboss/seam/jms/MessageBuilder.java index 7f586f2..16fd584 100644 --- a/api/src/main/java/org/jboss/seam/jms/MessageBuilder.java +++ b/api/src/main/java/org/jboss/seam/jms/MessageBuilder.java @@ -7,10 +7,15 @@ import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; +import javax.jms.QueueReceiver; +import javax.jms.QueueSender; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.TopicPublisher; +import javax.jms.TopicSubscriber; /** * The MessageBuilder interface defines an abstraction layer over the JMS APIs @@ -140,19 +145,82 @@ public interface MessageBuilder { */ public Session getSession(); + /** + * Creates a new MessageProducer that will be managed by the used session + * + * @param destination JNDI Location of Destination in use + * @return a new MessageProducer that is ready to work. + */ + public MessageProducer createMessageProducer(String destination); + + /** + * Creates a TopicPublisher for the given topic. + * + * @param destination JNDI Location of Destination in use + * @return a new TopicPublisher that is ready to work. + */ + public TopicPublisher createTopicPublisher(String destination); + + /** + * Creates a QueueSender for the given queue. + * + * @param destination JNDI Location of Destination in use + * @return a new QueueSender that is ready to work. + */ + public QueueSender createQueueSender(String destination); + /** * Creates a new MessageConsumer that will be managed by the used session * * @param destination JNDI Location of Destination in use + * @param listeners optional list of MessageListeners that will be bound to the consumer. * @return a new MessageConsumer that is ready to work. */ - public MessageConsumer createMessageConsumer(String destination); + public MessageConsumer createMessageConsumer(String destination, MessageListener... listeners); /** - * Creates a new MessageProducer that will be managed by the used session + * Creates a new MessageConsumer that will be managed by the used session * * @param destination JNDI Location of Destination in use - * @return a new MessageProducer that is ready to work. + * @param listeners optional list of MessageListeners that will be bound to the consumer. + * @return a new MessageConsumer that is ready to work. */ - public MessageProducer createMessageProducer(String destination); + public MessageConsumer createMessageConsumer(Destination destination, MessageListener... listeners); + + /** + * Creates a new TopicSubscriber that will be managed by the used session + * + * @param destination JNDI Location of Topic in use + * @param listeners optional list of MessageListeners that will be bound to the subscriber. + * @return a new TopicSubscriber that is ready to work. + */ + public TopicSubscriber createTopicSubscriber(String destination, MessageListener... listeners); + + /** + * Creates a new QueueReceiver that will be managed by the used session + * + * @param destination JNDI Location of Queue in use + * @param listeners optional list of MessageListeners that will be bound to the receiver. + * @return a new QueueReceiver that is ready to work. + */ + public QueueReceiver createQueueReceiver(String destination, MessageListener... listeners); + + /** + * Creates a topic subscriber with the given ID and binds a message listener to it, if valid. + * + * {@see MessageBuilder.createDurableSubscriber} + * + * @param destination JNDI Location of the topic to subscribe to. + * @param id the client id for the subscriber. This ID should be unique, and should be used to shutdown the listener. + * @param listener The Message Listeners to be bound, if any. + * @return the resulting TopicSubscriber or null if an error occurred. + */ + public TopicSubscriber createDurableSubscriber(String destination, String id, MessageListener... listeners); + + /** + * Unsubscribes a durable subscriber from the topic, with the given id. + * + * @param id the id of the subscriber. + */ + public void unsubscribe(String id); } diff --git a/api/src/main/java/org/jboss/seam/jms/bridge/RouteBuilder.java b/api/src/main/java/org/jboss/seam/jms/bridge/RouteBuilder.java new file mode 100644 index 0000000..0f00034 --- /dev/null +++ b/api/src/main/java/org/jboss/seam/jms/bridge/RouteBuilder.java @@ -0,0 +1,25 @@ +package org.jboss.seam.jms.bridge; + +import javax.annotation.PostConstruct; +import javax.enterprise.event.Observes; +import javax.jms.JMSException; +import javax.servlet.ServletContext; + +/** + * RouteBuilder is a start up component responsible for loading the finalized + * BeanManager into the Seam3JmsExtension and then loading all destinations + * that will be used by the ingress routes. + * + * Loading the BeanManager into the Seam3JmsExtension has the result of doing + * the same thing to the egress routes. + * + * Implementations of RouteBuilder should be singleton, and defines start up + * capabilities in handleStartup (servlet containers) and init. + * + * @author johnament + */ +public interface RouteBuilder { + public void handleStartup(@Observes ServletContext servletContext); + @PostConstruct + public void init() throws JMSException; +} diff --git a/docs/reference/src/main/docbook/en-US/mapping-interfaces.xml b/docs/reference/src/main/docbook/en-US/mapping-interfaces.xml index a5a54a8..87667eb 100644 --- a/docs/reference/src/main/docbook/en-US/mapping-interfaces.xml +++ b/docs/reference/src/main/docbook/en-US/mapping-interfaces.xml @@ -23,8 +23,7 @@ This chapter is meant to describe the behavior of mapping interfaces, where event mapping to data flowing through - JMS Queues and Topics are handled via events. Currently, the mapping interfaces only support single direction - mappings, a route defined can only be one of egress or ingress, never both. + JMS Queues and Topics are handled via events. You should never create a bi-directional route via the API. The results will not be what you expect. diff --git a/docs/reference/src/main/docbook/en-US/messaging.xml b/docs/reference/src/main/docbook/en-US/messaging.xml new file mode 100644 index 0000000..6a0603d --- /dev/null +++ b/docs/reference/src/main/docbook/en-US/messaging.xml @@ -0,0 +1,33 @@ + + + + + + Messaging API + + The Seam JMS Messaging API is a higher level abstraction of the JMS API to provide a number of convenience methods for creating consumers, producers, etc. + + +
+ MessageBuilder + + The MessageBuilder interface (org.jboss.seam.jms.MessageBuilder) is the main consolidated API for Seam JMS. It provides almost all of the background functionality for Seam JMS's features (Observer Interfaces, Routing API). In later releases there will be multiple implementations of this interface. The default implementation works against javax.naming.Context assuming running within the same local application server. + +
+
diff --git a/impl/pom.xml b/impl/pom.xml index 26072cf..372fc50 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -125,6 +125,7 @@ org.jboss.arquillian.container arquillian-jbossas-managed-6 + 1.0.0.Alpha4 org.jboss.jbossas diff --git a/impl/src/main/java/org/jboss/seam/jms/MessageBuilderImpl.java b/impl/src/main/java/org/jboss/seam/jms/MessageBuilderImpl.java index 85c58a8..2572c1a 100644 --- a/impl/src/main/java/org/jboss/seam/jms/MessageBuilderImpl.java +++ b/impl/src/main/java/org/jboss/seam/jms/MessageBuilderImpl.java @@ -6,21 +6,25 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import javax.annotation.Resource; import javax.enterprise.context.Dependent; import javax.inject.Inject; import javax.jms.BytesMessage; import javax.jms.Connection; -import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; +import javax.jms.QueueReceiver; +import javax.jms.QueueSender; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicPublisher; +import javax.jms.TopicSubscriber; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; @@ -30,10 +34,7 @@ @Dependent public class MessageBuilderImpl implements MessageBuilder { - @Resource(mappedName = "ConnectionFactory") - private ConnectionFactory cf; - - private Connection connection; + @Inject Connection connection; private Session session; private Logger logger = Logger.getLogger(MessageBuilderImpl.class); @@ -41,8 +42,6 @@ public class MessageBuilderImpl implements MessageBuilder { @PostConstruct public void init() { try{ - connection = cf.createConnection(); - connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { } } @@ -146,7 +145,7 @@ private void sendMessage(String jndiName, Message message) { private void sendMessage(Destination destination, Message message) { try { logger.info("Routing destionation "+destination+" with message " +message); - session.createProducer(destination).send(message); + this.createMessageProducer(destination).send(message); } catch (JMSException e) { logger.warn("Problem attempting to send message "+message+" to destination "+destination,e); } @@ -193,21 +192,122 @@ public Session getSession() { } @Override - public MessageConsumer createMessageConsumer(String destination) { + public MessageConsumer createMessageConsumer(String destination, MessageListener... listeners) { + try { + MessageConsumer mc = this.session.createConsumer(lookupDestination(destination)); + if(mc != null && listeners != null) { + for(MessageListener listener : listeners) + try { + mc.setMessageListener(listener); + } catch (JMSException e) { + logger.warn("Unable to map listener "+listener+" to consumer "+mc,e); + } + } + return mc; + } catch (JMSException e) { + logger.warn("Unable to create message consumer",e); + return null; + } + } + + protected MessageConsumer createMessageConsumer(Destination destination) { try { - return this.session.createConsumer(lookupDestination(destination)); + return this.session.createConsumer(destination); } catch (JMSException e) { + logger.warn("Problem creating message consumer for "+destination,e); return null; } } + + protected MessageProducer createMessageProducer(Destination destination) { + try { + return this.session.createProducer(destination); + } catch (JMSException e) { + logger.warn("Problem creating message producer for "+destination,e); + return null; + } + } + + private TopicSubscriber createDurableSubscriber(String destination, String id) { + try { + return + this.session.createDurableSubscriber((Topic)this.lookupDestination(destination), id); + } catch (JMSException e) { + logger.warn("Unable to create durable subscriber",e); + return null; + } + } + + @Override + public TopicSubscriber createDurableSubscriber(String destination, String id, MessageListener... listeners) { + TopicSubscriber ts = createDurableSubscriber(destination,id); + if(ts != null && listeners != null && listeners.length > 0) { + for(MessageListener listener : listeners) + try { + ts.setMessageListener(listener); + } catch (JMSException e) { + logger.warn("Unable to map listener "+listener+" to subscriber "+ts,e); + } + } + return ts; + } + + @Override + public void unsubscribe(String id) { + try { + session.unsubscribe(id); + } catch (JMSException e) { + logger.warn("Unable to unsubscribe for id: "+id,e); + } + } @Override public MessageProducer createMessageProducer(String destination) { try { return this.session.createProducer(lookupDestination(destination)); } catch (JMSException e) { + logger.warn("Unable to create message producer",e); return null; } } + + @Override + public TopicPublisher createTopicPublisher(String destination) { + return (TopicPublisher)this.createMessageProducer(destination); + } + + @Override + public QueueSender createQueueSender(String destination) { + return (QueueSender)this.createMessageProducer(destination); + } + + @Override + public TopicSubscriber createTopicSubscriber(String destination, + MessageListener... listeners) { + MessageConsumer mc = this.createMessageConsumer(destination, listeners); + return (TopicSubscriber)mc; + } + + @Override + public QueueReceiver createQueueReceiver(String destination, + MessageListener... listeners) { + MessageConsumer mc = this.createMessageConsumer(destination, listeners); + return (QueueReceiver)mc; + } + + @Override + public MessageConsumer createMessageConsumer(Destination destination, + MessageListener... listeners) { + MessageConsumer mc = this.createMessageConsumer(destination); + if(mc != null && listeners != null) { + for(MessageListener listener : listeners) + try { + mc.setMessageListener(listener); + } catch (JMSException e) { + logger.warn("Unable to set listener "+listener+" on to destination "+destination); + } + } + return mc; + } } diff --git a/impl/src/main/java/org/jboss/seam/jms/Seam3JmsExtension.java b/impl/src/main/java/org/jboss/seam/jms/Seam3JmsExtension.java index 0dd6740..b7162e6 100644 --- a/impl/src/main/java/org/jboss/seam/jms/Seam3JmsExtension.java +++ b/impl/src/main/java/org/jboss/seam/jms/Seam3JmsExtension.java @@ -23,36 +23,29 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import javax.annotation.Resource; +import javax.annotation.Resource; import javax.enterprise.context.spi.CreationalContext; import javax.enterprise.event.Observes; import javax.enterprise.inject.spi.AfterBeanDiscovery; -import javax.enterprise.inject.spi.AfterDeploymentValidation; import javax.enterprise.inject.spi.AnnotatedMember; import javax.enterprise.inject.spi.AnnotatedMethod; import javax.enterprise.inject.spi.AnnotatedParameter; import javax.enterprise.inject.spi.Bean; import javax.enterprise.inject.spi.BeanManager; import javax.enterprise.inject.spi.Extension; -import javax.enterprise.inject.spi.ObserverMethod; import javax.enterprise.inject.spi.ProcessAnnotatedType; -import javax.enterprise.inject.spi.ProcessObserverMethod; import javax.enterprise.inject.spi.ProcessProducer; import javax.inject.Qualifier; import javax.jms.Destination; -import javax.naming.InitialContext; -import javax.naming.NamingException; import org.jboss.logging.Logger; -import org.jboss.seam.jms.annotations.JmsDestination; import org.jboss.seam.jms.annotations.Routing; import org.jboss.seam.jms.bridge.EgressRoutingObserver; import org.jboss.seam.jms.bridge.Route; import org.jboss.seam.jms.bridge.RouteImpl; import org.jboss.seam.jms.bridge.RouteType; import org.jboss.seam.jms.impl.wrapper.JmsAnnotatedTypeWrapper; -import org.jboss.seam.solder.bean.ImmutableInjectionPoint; /** * Seam 3 JMS Portable Extension diff --git a/impl/src/main/java/org/jboss/seam/jms/annotations/RoutingLiteral.java b/impl/src/main/java/org/jboss/seam/jms/annotations/RoutingLiteral.java index e55cde1..49ff9f8 100644 --- a/impl/src/main/java/org/jboss/seam/jms/annotations/RoutingLiteral.java +++ b/impl/src/main/java/org/jboss/seam/jms/annotations/RoutingLiteral.java @@ -39,7 +39,6 @@ public RoutingLiteral(RouteType routeType) { @Override public RouteType value() { - // TODO Auto-generated method stub return routeType; } diff --git a/impl/src/main/java/org/jboss/seam/jms/bridge/EgressRoutingObserver.java b/impl/src/main/java/org/jboss/seam/jms/bridge/EgressRoutingObserver.java index d18c174..e9bf8e0 100644 --- a/impl/src/main/java/org/jboss/seam/jms/bridge/EgressRoutingObserver.java +++ b/impl/src/main/java/org/jboss/seam/jms/bridge/EgressRoutingObserver.java @@ -16,15 +16,14 @@ */ package org.jboss.seam.jms.bridge; -import java.io.Serializable; +import static org.jboss.seam.jms.annotations.RoutingLiteral.EGRESS; + import java.lang.annotation.Annotation; import java.lang.reflect.Type; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.logging.Level; import javax.enterprise.context.ApplicationScoped; import javax.enterprise.event.Reception; @@ -34,20 +33,15 @@ import javax.enterprise.inject.spi.BeanManager; import javax.enterprise.inject.spi.ObserverMethod; import javax.inject.Named; -import javax.jms.Connection; import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; + import org.jboss.logging.Logger; import org.jboss.seam.jms.MessageBuilder; import org.jboss.seam.jms.Seam3JmsExtension; import org.jboss.seam.solder.bean.ImmutableInjectionPoint; -import org.jboss.seam.solder.literal.DefaultLiteral; -import static org.jboss.seam.jms.annotations.RoutingLiteral.EGRESS; /** * Forwards CDI events that match the provided {@link Route} configuration to diff --git a/impl/src/main/java/org/jboss/seam/jms/bridge/IngressMessageListener.java b/impl/src/main/java/org/jboss/seam/jms/bridge/IngressMessageListener.java index 0b07077..94c8c30 100644 --- a/impl/src/main/java/org/jboss/seam/jms/bridge/IngressMessageListener.java +++ b/impl/src/main/java/org/jboss/seam/jms/bridge/IngressMessageListener.java @@ -16,98 +16,88 @@ */ package org.jboss.seam.jms.bridge; +import static org.jboss.seam.jms.annotations.RoutingLiteral.INGRESS; + import java.io.Serializable; import java.lang.annotation.Annotation; import java.util.HashSet; import java.util.Set; + import javax.enterprise.inject.spi.BeanManager; import javax.jms.JMSException; import javax.jms.Message; -import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.TextMessage; import org.jboss.logging.Logger; -import static org.jboss.seam.jms.annotations.RoutingLiteral.INGRESS; +import org.jboss.seam.jms.AbstractMessageListener; /** - * + * * @author johnament */ -public class IngressMessageListener implements MessageListener { +public class IngressMessageListener extends AbstractMessageListener { - private BeanManager beanManager; - private Annotation[] qualifiers = null; - private Logger logger; - private ClassLoader classLoader; - private Class payload; + private Annotation[] qualifiers = null; + private Logger logger; + private Class payload; - public IngressMessageListener(BeanManager beanManager, ClassLoader classLoader) { - this.logger = Logger.getLogger(IngressMessageListener.class); - this.beanManager = beanManager; - this.classLoader = classLoader; - logger.info("Creating new IngressMessageListener."); - } + public IngressMessageListener(BeanManager beanManager, + ClassLoader classLoader, Route route) { + super(beanManager, classLoader); + this.logger = Logger.getLogger(IngressMessageListener.class); + logger.debug("Creating new IngressMessageListener."); + setRoute(route); + } - public void setRoute(Route route) { - logger.info("Setting route. "+route); - Set annotations = new HashSet(); - if (!route.getQualifiers().isEmpty()) { - annotations.addAll(route.getQualifiers()); - } - this.payload = (Class)route.getPayloadType(); - annotations.add(INGRESS); - logger.info("Qualifiers: "+annotations); - this.qualifiers = annotations.toArray(new Annotation[]{}); - } + public void setRoute(Route route) { + logger.info("Setting route. " + route); + Set annotations = new HashSet(); + if (!route.getQualifiers().isEmpty()) { + annotations.addAll(route.getQualifiers()); + } + this.payload = (Class) route.getPayloadType(); + annotations.add(INGRESS); + logger.info("Qualifiers: " + annotations); + this.qualifiers = annotations.toArray(new Annotation[] {}); + } - public BeanManager getBeanManager() { - return beanManager; - } + private boolean isMessagePayload() { + return this.payload.isAssignableFrom(Message.class); + } - public Annotation[] getAnnotations() { - return qualifiers; - } - - private boolean isMessagePayload() { - return this.payload.isAssignableFrom(Message.class); - } + @Override + protected void handleMessage(Message msg) throws JMSException { + if (isMessagePayload()) { + beanManager.fireEvent(msg, qualifiers); + } else { + // then the result is an object message, and we're going to + // send the object. + if (msg instanceof ObjectMessage) { + ObjectMessage om = (ObjectMessage) msg; + try { + Serializable data = (Serializable) om.getObject(); + logger.debug("data was: " + om.getObject() + " of type " + + data.getClass().getCanonicalName()); + beanManager.fireEvent(data, qualifiers); + } catch (JMSException ex) { + logger.warn("Unable to read data in message " + msg); + } + } else if (msg instanceof TextMessage + && this.payload.isAssignableFrom(String.class)) { + TextMessage tm = (TextMessage) msg; + try { + String data = tm.getText(); + logger.debug("data was: " + data + " of type " + + data.getClass().getCanonicalName()); + beanManager.fireEvent(data, qualifiers); + } catch (JMSException e) { + logger.warn("Unable to read data in message " + msg); + } + } else { + logger.warn("Received the wrong type of message " + msg); + } + } - public void onMessage(Message msg) { - logger.info("Received a message"); - ClassLoader prevCl = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(classLoader); - try{ - if(isMessagePayload()) { - beanManager.fireEvent(msg,getAnnotations()); - } else { - // then the result is an object message, and we're going to - // send the object. - if (msg instanceof ObjectMessage) { - ObjectMessage om = (ObjectMessage) msg; - try { - Serializable data = (Serializable)om.getObject(); - logger.debug("data was: " + om.getObject()+" of type "+data.getClass().getCanonicalName()); - beanManager.fireEvent(data,getAnnotations()); - } catch (JMSException ex) { - logger.warn("Unable to read data in message " + msg); - } - } else if(msg instanceof TextMessage && - this.payload.isAssignableFrom(String.class)){ - TextMessage tm = (TextMessage)msg; - try { - String data = tm.getText(); - logger.debug("data was: " + data+" of type "+data.getClass().getCanonicalName()); - beanManager.fireEvent(data,getAnnotations()); - } catch (JMSException e) { - logger.warn("Unable to read data in message " + msg); - } - } else { - logger.warn("Received the wrong type of message " + msg); - } - } - } finally { - Thread.currentThread().setContextClassLoader(prevCl); - } - } + } } diff --git a/impl/src/main/java/org/jboss/seam/jms/bridge/RouteBuilderImpl.java b/impl/src/main/java/org/jboss/seam/jms/bridge/RouteBuilderImpl.java new file mode 100644 index 0000000..d8b808e --- /dev/null +++ b/impl/src/main/java/org/jboss/seam/jms/bridge/RouteBuilderImpl.java @@ -0,0 +1,82 @@ +/* + * JBoss, Home of Professional Open Source + * Copyright 2011, Red Hat, Inc. and/or its affiliates, and individual + * contributors by the @authors tag. See the copyright.txt in the + * distribution for a full listing of individual contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jboss.seam.jms.bridge; + +import java.util.List; + +import javax.annotation.PostConstruct; +import javax.enterprise.event.Observes; +import javax.enterprise.inject.spi.BeanManager; +import javax.inject.Inject; +import javax.inject.Singleton; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.servlet.ServletContext; + +import org.jboss.logging.Logger; +import org.jboss.seam.jms.MessageBuilder; +import org.jboss.seam.jms.Seam3JmsExtension; + +@Singleton +public class RouteBuilderImpl implements RouteBuilder, java.io.Serializable { + + private static final long serialVersionUID = -6782656668733696386L; + private Logger log = Logger.getLogger(RouteBuilderImpl.class); + private List ingressRoutes; + @Inject + Seam3JmsExtension extension; + @Inject + MessageBuilder messageBuilder; + @Inject + BeanManager beanManager; + + public RouteBuilderImpl() { + Logger log = Logger.getLogger(RouteBuilderImpl.class); + log.debug("Creating a new RouteBuilder()"); + } + + @Override + public void handleStartup(@Observes ServletContext servletContext) { + log.debug("Starting up Seam JMS via ServletContext callback."); + } + + @PostConstruct + @Override + public void init() throws JMSException { + log.debug("Calling RouteBuilder.init"); + extension.setBeanManager(beanManager); + ingressRoutes = extension.getIngressRoutes(); + log.debug("Ingress routes size: (" + ingressRoutes.size() + ") " + + ingressRoutes); + for (Route ingressRoute : ingressRoutes) { + ingressRoute.build(beanManager); + createListener(ingressRoute); + } + } + + private void createListener(Route ingressRoute) { + ClassLoader prevCl = Thread.currentThread().getContextClassLoader(); + log.debug("About to create listener for route " + ingressRoute); + log.debug("Routes: " + ingressRoute.getDestinations()); + for (Destination d : ingressRoute.getDestinations()) { + IngressMessageListener listener = new IngressMessageListener( + beanManager, prevCl, ingressRoute); + this.messageBuilder.createMessageConsumer(d, listener); + } + } + +} diff --git a/impl/src/main/java/org/jboss/seam/jms/impl/inject/ConnectionProducer.java b/impl/src/main/java/org/jboss/seam/jms/impl/inject/ConnectionProducer.java index f65a87a..f43cc63 100644 --- a/impl/src/main/java/org/jboss/seam/jms/impl/inject/ConnectionProducer.java +++ b/impl/src/main/java/org/jboss/seam/jms/impl/inject/ConnectionProducer.java @@ -54,10 +54,10 @@ public ConnectionFactory getConnectionFactory() @Produces @ApplicationScoped - public Connection getConnection() throws Exception + public Connection getConnection(@Module ConnectionFactory connectionFactory) throws Exception { log.debug("Creating a new connection."); - Connection conn = cf.createConnection(); + Connection conn = connectionFactory.createConnection(); conn.start(); return conn; } diff --git a/impl/src/test/java/org/jboss/seam/jms/test/bridge/intf/BidirectionalTest.java b/impl/src/test/java/org/jboss/seam/jms/test/bridge/intf/BidirectionalTest.java index ac75659..68a6a1b 100644 --- a/impl/src/test/java/org/jboss/seam/jms/test/bridge/intf/BidirectionalTest.java +++ b/impl/src/test/java/org/jboss/seam/jms/test/bridge/intf/BidirectionalTest.java @@ -10,6 +10,7 @@ import org.jboss.arquillian.junit.Arquillian; import org.jboss.seam.jms.annotations.Routing; import org.jboss.seam.jms.bridge.RouteBuilder; +import org.jboss.seam.jms.bridge.RouteBuilderImpl; import org.jboss.seam.jms.bridge.RouteType; import org.jboss.seam.jms.impl.inject.ConnectionProducer; import org.jboss.seam.jms.impl.inject.DestinationProducer; @@ -27,7 +28,7 @@ public class BidirectionalTest { public static Archive createDeployment() { return Util.createDeployment(ObserverInterface.class, ImmutableInjectionPoint.class, DestinationProducer.class, MessagePubSubProducer.class, - RouteBuilder.class, ConnectionProducer.class); + RouteBuilderImpl.class, ConnectionProducer.class); } @Inject @Routing(RouteType.EGRESS) diff --git a/impl/src/test/java/org/jboss/seam/jms/test/bridge/intf/IngressTest.java b/impl/src/test/java/org/jboss/seam/jms/test/bridge/intf/IngressTest.java index f05cef0..46322c5 100644 --- a/impl/src/test/java/org/jboss/seam/jms/test/bridge/intf/IngressTest.java +++ b/impl/src/test/java/org/jboss/seam/jms/test/bridge/intf/IngressTest.java @@ -30,9 +30,11 @@ import javax.jms.Topic; import org.jboss.arquillian.api.Deployment; import org.jboss.arquillian.junit.Arquillian; +import org.jboss.seam.jms.MessageBuilder; import org.jboss.seam.jms.annotations.JmsDestination; import org.jboss.seam.jms.annotations.Routing; import org.jboss.seam.jms.bridge.RouteBuilder; +import org.jboss.seam.jms.bridge.RouteBuilderImpl; import org.jboss.seam.jms.bridge.RouteType; import org.jboss.seam.jms.impl.inject.ConnectionProducer; import org.jboss.seam.jms.impl.inject.DestinationProducer; @@ -54,26 +56,19 @@ public class IngressTest { @Deployment public static Archive createDeployment() { return Util.createDeployment(ObserverInterface.class, ImmutableInjectionPoint.class, - DestinationProducer.class, MessagePubSubProducer.class, RouteBuilder.class, ConnectionProducer.class); + DestinationProducer.class, MessagePubSubProducer.class, RouteBuilderImpl.class, ConnectionProducer.class); } @Inject RouteBuilder builder; - @Inject Connection conn; - @Inject Session session; + @Inject MessageBuilder messageBuilder; @Inject @JmsDestination(jndiName="jms/T2") Topic t; private static boolean received = false; @Test public void testObserveMessage() throws JMSException, InterruptedException { - //conn.start(); - MessageProducer mp = session.createProducer(t); - ObjectMessage om = session.createObjectMessage(); - om.setObject(7l); - mp.send(om); + messageBuilder.sendObjectToDestinations(7L, t); Thread.sleep(5 * 1000); - mp.close(); - //conn.stop(); Assert.assertTrue(received); } diff --git a/impl/src/test/java/org/jboss/seam/jms/test/bridge/intf/ObserverTest.java b/impl/src/test/java/org/jboss/seam/jms/test/bridge/intf/ObserverTest.java index ccaf1c7..0f4f6b3 100644 --- a/impl/src/test/java/org/jboss/seam/jms/test/bridge/intf/ObserverTest.java +++ b/impl/src/test/java/org/jboss/seam/jms/test/bridge/intf/ObserverTest.java @@ -16,31 +16,30 @@ */ package org.jboss.seam.jms.test.bridge.intf; -import org.jboss.seam.jms.bridge.RouteBuilder; -import org.jboss.seam.jms.bridge.RouteType; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; -import javax.jms.ConnectionFactory; +import javax.enterprise.event.Event; +import javax.inject.Inject; +import javax.jms.Connection; +import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.Topic; -import org.jboss.logging.Logger; -import javax.jms.Connection; -import javax.jms.JMSException; -import org.jboss.seam.jms.impl.inject.MessagePubSubProducer; -import org.jboss.seam.jms.annotations.JmsDestination; -import org.jboss.seam.jms.annotations.Routing; -import org.jboss.seam.solder.bean.ImmutableInjectionPoint; -import javax.annotation.Resource; -import javax.enterprise.event.Event; -import javax.inject.Inject; import org.jboss.arquillian.api.Deployment; -import org.junit.runner.RunWith; import org.jboss.arquillian.junit.Arquillian; +import org.jboss.logging.Logger; +import org.jboss.seam.jms.annotations.JmsDestination; +import org.jboss.seam.jms.annotations.Routing; +import org.jboss.seam.jms.bridge.RouteBuilder; +import org.jboss.seam.jms.bridge.RouteType; +import org.jboss.seam.jms.impl.inject.MessagePubSubProducer; import org.jboss.seam.jms.test.Util; +import org.jboss.seam.solder.bean.ImmutableInjectionPoint; import org.jboss.shrinkwrap.api.Archive; import org.junit.Test; -import static org.junit.Assert.*; +import org.junit.runner.RunWith; /** *