Permalink
Browse files

Auto reconnect on connection drop (thanks Olga Gorun)

  • Loading branch information...
1 parent 506cdc8 commit 6e1a88f3188267fa55e400ce3d10aa2113cede84 @samstokes samstokes committed Dec 22, 2011
Showing with 30 additions and 2 deletions.
  1. +30 −2 src/main/java/com/rapportive/storm/spout/AMQPSpout.java
View
32 src/main/java/com/rapportive/storm/spout/AMQPSpout.java
@@ -11,6 +11,7 @@
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
+import com.rabbitmq.client.ShutdownSignalException;
import com.rapportive.storm.amqp.QueueDeclaration;
import backtype.storm.spout.Scheme;
@@ -19,6 +20,8 @@
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.utils.Utils;
+
/**
* Spout to feed messages into Storm from an AMQP queue. Each message routed
* to the queue will be emitted as a Storm tuple. The message will be acked or
@@ -82,6 +85,12 @@
*/
public static final long WAIT_FOR_NEXT_MESSAGE = 1L;
+ /**
+ * Time in milliseconds to wait after losing connection to the AMQP broker
+ * before attempting to reconnect.
+ */
+ public static final long WAIT_AFTER_SHUTDOWN_SIGNAL = 10000L;
+
private final String amqpHost;
private final int amqpPort;
private final String amqpUsername;
@@ -99,6 +108,8 @@
private SpoutOutputCollector collector;
+ private int prefetchCount;
+
/**
* Create a new AMQP spout. When
@@ -225,6 +236,10 @@ public void nextTuple() {
* Avoid infinite retry!
* Maybe we should output them on a separate stream.
*/
+ } catch (ShutdownSignalException e) {
+ log.warn("AMQP connection dropped, will attempt to reconnect...");
+ Utils.sleep(WAIT_AFTER_SHUTDOWN_SIGNAL);
+ reconnect();
} catch (InterruptedException e) {
// interrupted while waiting for message, big deal
}
@@ -245,18 +260,21 @@ public void open(@SuppressWarnings("rawtypes") Map config, TopologyContext conte
} else if (prefetchCount < 1) {
throw new IllegalArgumentException(CONFIG_PREFETCH_COUNT + " must be at least 1");
}
+ this.prefetchCount = prefetchCount.intValue();
try {
this.collector = collector;
- setupAMQP(prefetchCount.intValue());
+ setupAMQP();
} catch (IOException e) {
log.error("AMQP setup failed", e);
}
}
- private void setupAMQP(int prefetchCount) throws IOException {
+ private void setupAMQP() throws IOException {
+ final int prefetchCount = this.prefetchCount;
+
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(amqpHost);
@@ -280,6 +298,16 @@ private void setupAMQP(int prefetchCount) throws IOException {
}
+ private void reconnect() {
+ log.info("Reconnecting to AMQP broker...");
+ try {
+ setupAMQP();
+ } catch (IOException e) {
+ log.warn("Failed to reconnect to AMQP broker", e);
+ }
+ }
+
+
/**
* Declares the output fields of this spout according to the provided
* {@link backtype.storm.spout.Scheme}.

0 comments on commit 6e1a88f

Please sign in to comment.