Skip to content
Browse files

Merge branch 'master' of https://github.com/ptgoetz/storm-jms

Conflicts:
	src/main/java/backtype/storm/contrib/jms/spout/JmsSpout.java
  • Loading branch information...
2 parents cbafa76 + 4fcddea commit 97b5c70d02587fbe603052ba5e598679718385c8 @boneill42 boneill42 committed May 2, 2012
View
5 pom.xml
@@ -1,5 +1,4 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.sonatype.oss</groupId>
@@ -11,7 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.ptgoetz</groupId>
<artifactId>storm-jms</artifactId>
- <version>0.1.1-SNAPSHOT</version>
+ <version>0.2.1-SNAPSHOT</version>
<name>Storm JMS</name>
<description>Storm JMS Components</description>
View
43 src/main/java/backtype/storm/contrib/jms/spout/JmsSpout.java
@@ -3,6 +3,7 @@
import java.io.Serializable;
import java.util.Map;
import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
@@ -67,9 +68,9 @@
private transient Session session;
private boolean hasFailures = false;
- public Serializable recoveryMutex = "RECOVERY_MUTEX";
+ public final Serializable recoveryMutex = "RECOVERY_MUTEX";
private Timer recoveryTimer = null;
- private long recoveryPeriod = 30*1000; // Default to 30 seconds
+ private long recoveryPeriod = -1; // default to disabled
/**
* Sets the JMS Session acknowledgement mode for the JMS seesion associated with this spout.
@@ -156,7 +157,16 @@ public void open(Map conf, TopologyContext context,
if(this.tupleProducer == null){
throw new IllegalStateException("JMS Tuple Producer has not been set.");
}
- queue = new LinkedBlockingQueue<Message>();
+ Integer topologyTimeout = (Integer)conf.get("topology.message.timeout.secs");
+ // TODO fine a way to get the default timeout from storm, so we're not hard-coding to 30 seconds (it could change)
+ topologyTimeout = topologyTimeout == null ? 30 : topologyTimeout;
+ if( (topologyTimeout.intValue() * 1000 )> this.recoveryPeriod){
+ LOG.warn("*** WARNING *** : " +
+ "Recovery period ("+ this.recoveryPeriod + " ms.) is less then the configured " +
+ "'topology.message.timeout.secs' of " + topologyTimeout +
+ " secs. This could lead to a message replay flood!");
+ }
+ this.queue = new LinkedBlockingQueue<Message>();
this.pendingMessages = new ConcurrentHashMap<String, Message>();
this.collector = collector;
try {
@@ -167,10 +177,10 @@ public void open(Map conf, TopologyContext context,
this.jmsAcknowledgeMode);
MessageConsumer consumer = session.createConsumer(dest);
consumer.setMessageListener(this);
- connection.start();
- if (this.isDurableSubscription()){
+ this.connection.start();
+ if (this.isDurableSubscription() && this.recoveryPeriod > 0){
this.recoveryTimer = new Timer();
- this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(this), 10, this.recoveryPeriod);
+ this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(), 10, this.recoveryPeriod);
}
} catch (Exception e) {
@@ -200,7 +210,6 @@ public void nextTuple() {
// get the tuple from the handler
try {
Values vals = this.tupleProducer.toTuple(msg);
- // if we're transactional, always ack, otherwise
// ack if we're not in AUTO_ACKNOWLEDGE mode, or the message requests ACKNOWLEDGE
LOG.debug("Requested deliveryMode: " + toDeliveryModeString(msg.getJMSDeliveryMode()));
LOG.debug("Our deliveryMode: " + toDeliveryModeString(this.jmsAcknowledgeMode));
@@ -325,4 +334,24 @@ protected Session getSession(){
private boolean isDurableSubscription(){
return (this.jmsAcknowledgeMode != Session.AUTO_ACKNOWLEDGE);
}
+
+
+ private class RecoveryTask extends TimerTask {
+ private final Logger LOG = LoggerFactory.getLogger(RecoveryTask.class);
+
+ public void run() {
+ synchronized (JmsSpout.this.recoveryMutex) {
+ if (JmsSpout.this.hasFailures()) {
+ try {
+ LOG.info("Recovering from a message failure.");
+ JmsSpout.this.getSession().recover();
+ JmsSpout.this.recovered();
+ } catch (JMSException e) {
+ LOG.warn("Could not recover jms session.", e);
+ }
+ }
+ }
+ }
+
+ }
}
View
32 src/main/java/backtype/storm/contrib/jms/spout/RecoveryTask.java
@@ -1,32 +0,0 @@
-package backtype.storm.contrib.jms.spout;
-
-import java.util.TimerTask;
-
-import javax.jms.JMSException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RecoveryTask extends TimerTask {
- private static final Logger LOG = LoggerFactory.getLogger(RecoveryTask.class);
- private JmsSpout spout;
-
- public RecoveryTask(JmsSpout spout) {
- this.spout = spout;
- }
-
- public void run() {
- synchronized (spout.recoveryMutex) {
- if (spout.hasFailures()) {
- try {
- LOG.info("Recovering from a message failure.");
- spout.getSession().recover();
- spout.recovered();
- } catch (JMSException e) {
- LOG.warn("Could not recover jms session.", e);
- }
- }
- }
- }
-
-}

0 comments on commit 97b5c70

Please sign in to comment.
Something went wrong with that request. Please try again.