Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: rapportive-oss/storm-amqp-spout
base: f101e913bf
...
head fork: rapportive-oss/storm-amqp-spout
compare: 0537043f70
  • 4 commits
  • 4 files changed
  • 0 commit comments
  • 1 contributor
View
3  .classpath
@@ -1,9 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java"/>
- <classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources"/>
- <classpathentry kind="src" output="target/test-classes" path="src/test/java"/>
- <classpathentry excluding="**" kind="src" output="target/test-classes" path="src/test/resources"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6"/>
<classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER"/>
<classpathentry kind="output" path="target/classes"/>
View
2  AUTHORS
@@ -0,0 +1,2 @@
+Sam Stokes <sam@rapportive.com>
+Olga Gorun
View
2  pom.xml
@@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.rapportive</groupId>
<artifactId>storm-amqp-spout</artifactId>
- <version>0.1.0</version>
+ <version>0.1.1</version>
<repositories>
<repository>
<id>clojars.org</id>
View
32 src/main/java/com/rapportive/storm/spout/AMQPSpout.java
@@ -11,6 +11,7 @@
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
+import com.rabbitmq.client.ShutdownSignalException;
import com.rapportive.storm.amqp.QueueDeclaration;
import backtype.storm.spout.Scheme;
@@ -19,6 +20,8 @@
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.utils.Utils;
+
/**
* Spout to feed messages into Storm from an AMQP queue. Each message routed
* to the queue will be emitted as a Storm tuple. The message will be acked or
@@ -82,6 +85,12 @@
*/
public static final long WAIT_FOR_NEXT_MESSAGE = 1L;
+ /**
+ * Time in milliseconds to wait after losing connection to the AMQP broker
+ * before attempting to reconnect.
+ */
+ public static final long WAIT_AFTER_SHUTDOWN_SIGNAL = 10000L;
+
private final String amqpHost;
private final int amqpPort;
private final String amqpUsername;
@@ -99,6 +108,8 @@
private SpoutOutputCollector collector;
+ private int prefetchCount;
+
/**
* Create a new AMQP spout. When
@@ -225,6 +236,10 @@ public void nextTuple() {
* Avoid infinite retry!
* Maybe we should output them on a separate stream.
*/
+ } catch (ShutdownSignalException e) {
+ log.warn("AMQP connection dropped, will attempt to reconnect...");
+ Utils.sleep(WAIT_AFTER_SHUTDOWN_SIGNAL);
+ reconnect();
} catch (InterruptedException e) {
// interrupted while waiting for message, big deal
}
@@ -245,18 +260,21 @@ public void open(@SuppressWarnings("rawtypes") Map config, TopologyContext conte
} else if (prefetchCount < 1) {
throw new IllegalArgumentException(CONFIG_PREFETCH_COUNT + " must be at least 1");
}
+ this.prefetchCount = prefetchCount.intValue();
try {
this.collector = collector;
- setupAMQP(prefetchCount.intValue());
+ setupAMQP();
} catch (IOException e) {
log.error("AMQP setup failed", e);
}
}
- private void setupAMQP(int prefetchCount) throws IOException {
+ private void setupAMQP() throws IOException {
+ final int prefetchCount = this.prefetchCount;
+
final ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(amqpHost);
@@ -280,6 +298,16 @@ private void setupAMQP(int prefetchCount) throws IOException {
}
+ private void reconnect() {
+ log.info("Reconnecting to AMQP broker...");
+ try {
+ setupAMQP();
+ } catch (IOException e) {
+ log.warn("Failed to reconnect to AMQP broker", e);
+ }
+ }
+
+
/**
* Declares the output fields of this spout according to the provided
* {@link backtype.storm.spout.Scheme}.

No commit comments for this range

Something went wrong with that request. Please try again.