Skip to content

Commit

Permalink
Created separate APIs for durable subscribers.
Browse files Browse the repository at this point in the history
  • Loading branch information
johnament committed Apr 1, 2011
1 parent 87eb743 commit 6c7afc5
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 78 deletions.
51 changes: 51 additions & 0 deletions api/src/main/java/org/jboss/seam/jms/DurableMessageManager.java
@@ -0,0 +1,51 @@
package org.jboss.seam.jms;

import java.io.Serializable;

import javax.jms.Destination;
import javax.jms.MessageListener;
import javax.jms.TopicSubscriber;

import org.jboss.seam.jms.annotations.Durable;

@Durable
public interface DurableMessageManager extends MessageManager {

/**
* Initializes the connection for this DurableMessageManager.
* Sets the ClientID for the underlying {@link javax.jms.Connection}
*
* @param clientId
*/
public void login(String clientId);
/**
* Creates a topic subscriber with the given ID and binds a message listener to it, if valid.
*
* {@see MessageBuilder.createDurableSubscriber}
*
* @param destination JNDI Location of the topic to subscribe to.
* @param id the client id for the subscriber. This ID should be unique, and should be used to shutdown the listener.
* @param listener The Message Listeners to be bound, if any.
* @return the resulting TopicSubscriber or null if an error occurred.
*/
public TopicSubscriber createDurableSubscriber(String destination, String id, MessageListener... listeners);

/**
* Creates a topic subscriber with the given ID and binds a message listener to it, if valid.
*
* {@see MessageBuilder.createDurableSubscriber}
*
* @param destination the existing destination to reference.
* @param id the client id for the subscriber. This ID should be unique, and should be used to shutdown the listener.
* @param listener The Message Listeners to be bound, if any.
* @return the resulting TopicSubscriber or null if an error occurred.
*/
public TopicSubscriber createDurableSubscriber(Destination destination, String id, MessageListener... listeners);
/**
* Unsubscribes a durable subscriber from the topic, with the given id.
*
* @param id the id of the subscriber.
*/
public void unsubscribe(String id);

}
30 changes: 0 additions & 30 deletions api/src/main/java/org/jboss/seam/jms/MessageManager.java
Expand Up @@ -205,34 +205,4 @@ public interface MessageManager extends Serializable {
* @return a new QueueReceiver that is ready to work.
*/
public QueueReceiver createQueueReceiver(String destination, MessageListener... listeners);

/**
* Creates a topic subscriber with the given ID and binds a message listener to it, if valid.
*
* {@see MessageBuilder.createDurableSubscriber}
*
* @param destination JNDI Location of the topic to subscribe to.
* @param id the client id for the subscriber. This ID should be unique, and should be used to shutdown the listener.
* @param listener The Message Listeners to be bound, if any.
* @return the resulting TopicSubscriber or null if an error occurred.
*/
public TopicSubscriber createDurableSubscriber(String destination, String id, MessageListener... listeners);

/**
* Creates a topic subscriber with the given ID and binds a message listener to it, if valid.
*
* {@see MessageBuilder.createDurableSubscriber}
*
* @param destination the existing destination to reference.
* @param id the client id for the subscriber. This ID should be unique, and should be used to shutdown the listener.
* @param listener The Message Listeners to be bound, if any.
* @return the resulting TopicSubscriber or null if an error occurred.
*/
public TopicSubscriber createDurableSubscriber(Destination destination, String id, MessageListener... listeners);
/**
* Unsubscribes a durable subscriber from the topic, with the given id.
*
* @param id the id of the subscriber.
*/
public void unsubscribe(String id);
}
25 changes: 25 additions & 0 deletions api/src/main/java/org/jboss/seam/jms/annotations/Durable.java
@@ -0,0 +1,25 @@
package org.jboss.seam.jms.annotations;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import javax.inject.Qualifier;

/** Durable represents durable subscribers.
*
* @author johnament
*/
@Qualifier
@Documented
@Target( { FIELD, METHOD, TYPE, PARAMETER })
@Retention(RUNTIME)
public @interface Durable {

}
4 changes: 2 additions & 2 deletions api/src/main/java/org/jboss/seam/jms/bridge/RouteBuilder.java
Expand Up @@ -23,6 +23,6 @@ public interface RouteBuilder extends Serializable {
public void handleStartup(@Observes ServletContext servletContext);
@PostConstruct
public void init() throws JMSException;
public void registerDurableIngressRoute(Route route, String clientId);
public void unregisterRoute(String clientId);
//public void registerDurableIngressRoute(Route route, String clientId);
//public void unregisterRoute(String clientId);
}
5 changes: 0 additions & 5 deletions impl/pom.xml
Expand Up @@ -47,11 +47,6 @@
<artifactId>junit</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>org.jboss.arquillian</groupId>
<artifactId>arquillian-junit</artifactId>
Expand Down
@@ -0,0 +1,93 @@
package org.jboss.seam.jms;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TopicSubscriber;

import org.jboss.logging.Logger;
import org.jboss.seam.jms.annotations.Durable;
import org.jboss.seam.jms.annotations.Module;

@Dependent
@Durable
public class DurableMessageManagerImpl
extends MessageManagerImpl
implements DurableMessageManager
{

private Logger logger = Logger.getLogger(DurableMessageManagerImpl.class);
@Inject @Module ConnectionFactory connectionFactory;
private Connection connection;

@Override
@PostConstruct
public void init() {
try{
connection = connectionFactory.createConnection();
} catch (JMSException e) { logger.warn("Unable to create connection."); }
}

@PreDestroy
public void shutdown() {
if(connection != null) {
try{
connection.close();
} catch (JMSException e) { logger.warn("Unable to create connection."); }
}
}

public void login(String clientId) {
try{
connection.setClientID(clientId);
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (JMSException e) {
logger.warn("Unable to create connection.");
}
}

@Override
public TopicSubscriber createDurableSubscriber(String destination, String id, MessageListener... listeners) {
TopicSubscriber ts = createDurableSubscriber(destination,id);
if(ts != null && listeners != null && listeners.length > 0) {
for(MessageListener listener : listeners)
try {
ts.setMessageListener(listener);
} catch (JMSException e) {
logger.warn("Unable to map listener "+listener+" to subscriber "+ts,e);
}
}
return ts;
}

@Override
public TopicSubscriber createDurableSubscriber(Destination destination, String id, MessageListener... listeners) {
TopicSubscriber ts = createDurableSubscriber(destination,id);
if(ts != null && listeners != null && listeners.length > 0) {
for(MessageListener listener : listeners)
try {
ts.setMessageListener(listener);
} catch (JMSException e) {
logger.warn("Unable to map listener "+listener+" to subscriber "+ts,e);
}
}
return ts;
}

@Override
public void unsubscribe(String id) {
try {
session.unsubscribe(id);
} catch (JMSException e) {
logger.warn("Unable to unsubscribe for id: "+id,e);
}
}
}
39 changes: 1 addition & 38 deletions impl/src/main/java/org/jboss/seam/jms/MessageManagerImpl.java
Expand Up @@ -35,7 +35,7 @@
public class MessageManagerImpl implements MessageManager {

@Inject Connection connection;
private Session session;
protected Session session;

private Logger logger = Logger.getLogger(MessageManagerImpl.class);

Expand Down Expand Up @@ -237,43 +237,6 @@ private TopicSubscriber createDurableSubscriber(String destination, String id) {
return null;
}
}

@Override
public TopicSubscriber createDurableSubscriber(String destination, String id, MessageListener... listeners) {
TopicSubscriber ts = createDurableSubscriber(destination,id);
if(ts != null && listeners != null && listeners.length > 0) {
for(MessageListener listener : listeners)
try {
ts.setMessageListener(listener);
} catch (JMSException e) {
logger.warn("Unable to map listener "+listener+" to subscriber "+ts,e);
}
}
return ts;
}

@Override
public TopicSubscriber createDurableSubscriber(Destination destination, String id, MessageListener... listeners) {
TopicSubscriber ts = createDurableSubscriber(destination,id);
if(ts != null && listeners != null && listeners.length > 0) {
for(MessageListener listener : listeners)
try {
ts.setMessageListener(listener);
} catch (JMSException e) {
logger.warn("Unable to map listener "+listener+" to subscriber "+ts,e);
}
}
return ts;
}

@Override
public void unsubscribe(String id) {
try {
session.unsubscribe(id);
} catch (JMSException e) {
logger.warn("Unable to unsubscribe for id: "+id,e);
}
}

@Override
public MessageProducer createMessageProducer(String destination) {
Expand Down
Expand Up @@ -78,7 +78,7 @@ private void createListener(Route ingressRoute) {
this.messageBuilder.createMessageConsumer(d, listener);
}
}

/*
@Override
public void registerDurableIngressRoute(Route ingressRoute, String clientId) {
if(ingressRoute.getType() == RouteType.INGRESS) {
Expand All @@ -100,5 +100,5 @@ public void registerDurableIngressRoute(Route ingressRoute, String clientId) {
public void unregisterRoute(String clientId) {
this.messageBuilder.unsubscribe(clientId);
}

*/
}
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -18,7 +18,7 @@
<url>http://sfwk.org/Seam3/JMS</url>

<properties>
<seam.version>3.0.0.CR6</seam.version>
<seam.version>3.0.0.Final</seam.version>
<arquillian.version>1.0.0.Alpha4</arquillian.version>
<emma.version>2.0.5312</emma.version>
</properties>
Expand Down

0 comments on commit 6c7afc5

Please sign in to comment.