Skip to content
Permalink
Browse files

JBTM-1934 use our own threading to handle dequeing messages as hornet…

…q doesn't like us suspending our own session from a message listener
  • Loading branch information...
tomjenkinson committed Sep 19, 2013
1 parent f8138f2 commit e0e5d7ab58e377ad6eb43e1737baa716d1c4836e
@@ -23,7 +23,6 @@
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.naming.NamingException;

import org.apache.commons.logging.Log;
@@ -37,25 +36,52 @@
*
* @version $Revision: 50 $
*/
public class StompSubscription implements MessageListener {
public class StompSubscription {
public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
private static final transient Log log = LogFactory.getLog(StompSubscription.class);
private final StompSession session;
private final String subscriptionId;
private MessageConsumer consumer;
private Map<String, Object> headers;
private boolean closed;

public StompSubscription(StompSession session, String subscriptionId, StompFrame frame) throws JMSException,
ProtocolException, NamingException {
this.subscriptionId = subscriptionId;
this.session = session;
this.headers = frame.getHeaders();
this.consumer = session.createConsumer(headers);
this.consumer.setMessageListener(this);
(new Thread() {

@Override
public void run() {
while (!closed) {
try {
log.debug("Dequeuing from HQ");
Message receive = consumer.receive();
log.debug("Dequeued from HQ");
if (receive != null) {
onMessage(receive);
} else {
log.debug("Must be closing");
if (!closed) {
log.fatal("Fatal issue, receive returned null before connection closed!");
}
}
} catch (JMSException e) {
if (!closed) {
log.warn("Could not receive a message", e);
}
}
}
}

}).start();
}

public void close() throws JMSException {
closed = true;
consumer.close();
}

0 comments on commit e0e5d7a

Please sign in to comment.
You can’t perform that action at this time.