Permalink
Browse files

Javadoc; document limitations

  • Loading branch information...
1 parent a0e7a94 commit b167e4049c6553214d2da869b0d095c7f038d7db @samstokes samstokes committed Sep 29, 2011
Showing with 63 additions and 0 deletions.
  1. +11 −0 pom.xml
  2. +52 −0 src/main/java/com/rapportive/storm/spout/AMQPSpout.java
View
@@ -21,4 +21,15 @@
<version>2.6.1</version>
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.8</version>
+ <configuration>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
@@ -13,6 +13,40 @@
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
+
+/**
+ * Spout to feed messages into Storm from an AMQP exchange.
+ *
+ * This should not currently be used where guaranteed message processing is
+ * required, because of two limitations:
+ *
+ * <ol>
+ * <li>
+ * Uses a temporary queue to bind to the specified exchange when the topology
+ * calls <tt>open()</tt> on the spout, so it will only receive messages
+ * published to the exchange after the call to <tt>open()</tt>, and if the
+ * spout worker restarts or the topology is killed, it will not receive any
+ * messages published while the worker or topology is down.
+ * </li>
+ *
+ * <li>
+ * Currently auto-acks all consumed messages with the AMQP broker, and does not
+ * implement Storm's reliability API, so if processing a message fails it will
+ * simply be discarded.
+ * </li>
+ * </ol>
+ *
+ * Limitation 1 also means this spout cannot currently be distributed among
+ * multiple workers (each worker gets its own exclusive queue, so multiple
+ * workers would each receive their own copy of every message).
+ *
+ * Improvements are planned to overcome both these limitations and support
+ * guaranteed message processing, distributed across any number of workers.
+ * These improvements may require API changes (e.g. to specify the name of an
+ * existing queue to consume, rather than an exchange to bind to).
+ *
+ * @author Sam Stokes (sam@rapportive.com)
+ */
public class AMQPSpout implements IRichSpout {
private static final long serialVersionUID = 11258942292629263L;
@@ -36,6 +70,24 @@
private SpoutOutputCollector collector;
+ /**
+ * Create a new AMQP spout. When
+ * {@link #open(Map, TopologyContext, SpoutOutputCollector)} is called, it
+ * will create a new server-named, exclusive, auto-delete queue, bind it to
+ * the specified exchange on the specified server with the specified
+ * routing key, and start consuming messages. It will use the provided
+ * <tt>scheme</tt> to deserialise each AMQP message into a Storm tuple.
+ *
+ * @param host hostname of the AMQP broker node
+ * @param port port number of the AMQP broker node
+ * @param username username to log into to the broker
+ * @param password password to authenticate to the broker
+ * @param vhost vhost on the broker
+ * @param exchange exchange to bind to
+ * @param routingKey routing key for the binding
+ * @param scheme {@link backtype.storm.spout.Scheme} used to deserialise
+ * each AMQP message into a Storm tuple
+ */
public AMQPSpout(String host, int port, String username, String password, String vhost, String exchange, String routingKey, Scheme scheme) {
this.amqpHost = host;
this.amqpPort = port;

0 comments on commit b167e40

Please sign in to comment.