Allows a Storm topology to consume an AMQP exchange as an input source.
Latest commit b369353 Oct 3, 2012 Sam Stokes Link to maintained forks
Failed to load latest commit information.
src/main/java/com/rapportive/storm Auto reconnect on connection drop (thanks Olga Gorun) Dec 22, 2011
.classpath Remove Eclipse classpath entries missing from git Dec 22, 2011
.gitignore gitignore target/ Sep 25, 2011
.project boilerplate (Eclipse and Maven) Sep 25, 2011
AUTHORS Add AUTHORS file crediting contributors Dec 22, 2011
LICENSE.MIT Add MIT license Sep 29, 2011
README.markdown Link to maintained forks Oct 3, 2012
pom.xml Bump version to 0.1.1 Dec 22, 2011


storm-amqp-spout: AMQP input source for Storm

This is out of date!

This is a rather old version of the spout that is not API-compatible with newer versions of Storm. Various people have forked it to bring it up to date and add features:

If you want your fork added to this list, or to contribute a description of your fork, please email


storm-amqp-spout allows a Storm topology to consume an AMQP queue as an input source. It currently provides:

  • AMQPSpout: an implementation of backtype.storm.topology.IRichSpout that connects to an AMQP broker, consumes the messages routed to a specified AMQP queue and emits them as Storm tuples.
  • QueueDeclaration: an interface that encapsulates declaring an AMQP queue and setting up any exchange bindings it requires, used by AMQPSpout to set up the queue to consume.
  • ExclusiveQueueWithBinding: a QueueDeclaration suitable for prototyping and one-off analytics use cases.
  • SharedQueueWithBinding: a QueueDeclaration suitable for production use cases needing guaranteed processing.

You'll need to provide a Scheme to tell AMQPSpout how to interpret the messages and turn them into Storm tuples. See e.g. storm-json if your messages are JSON.


The Javadocs can be found at


To produce a jar:

$ mvn package

To install in your local Maven repository:

$ mvn install

To use in your pom.xml:

  <!-- ... -->
    <!-- ... -->
    <!-- ... -->
  <!-- ... -->


This is early software. It has been used at production volumes, but not yet for business-critical use cases. It may break and the API is liable to change completely between releases. Pull requests, patches and bug reports are very welcome.

N.B. if you need to guarantee all messages are reliably processed, you should have AMQPSpout consume from a queue that is not set as 'exclusive' or 'auto-delete': otherwise if the spout task crashes or is restarted, the queue will be deleted and any messages in it lost, as will any messages published while the task remains down. See SharedQueueWithBinding to declare a shared queue that allows for guaranteed processing. (For prototyping, an ExclusiveQueueWithBinding may be simpler to manage.)

This does not currently handle malformed messages very well: the spout worker will crash if the provided Scheme fails to deserialise a message.

This does not currently support retrying messages in the event of transient failure to process: any message which the topology fails to process will simply be dropped. This is to prevent infinite redelivery in the event of non-transient failures (e.g. malformed messages, though see previous caveat!). This will probably be made configurable in a future release.


AMQPSpout has been tested with RabbitMQ 2.3.1, 2.6.1 and 2.7.0. It should probably work with other versions and other AMQP brokers.