Skip to content
This repository has been archived by the owner on Dec 5, 2023. It is now read-only.

Commit

Permalink
Make queue declaration configurable; now supports reliable processing.
Browse files Browse the repository at this point in the history
  • Loading branch information
samstokes committed Oct 31, 2011
1 parent 307d4d0 commit c9a5544
Show file tree
Hide file tree
Showing 5 changed files with 270 additions and 82 deletions.
47 changes: 24 additions & 23 deletions README.markdown
@@ -1,13 +1,19 @@
# storm-amqp-spout: AMQP input source for Storm #

storm-amqp-spout allows a [Storm](https://github.com/nathanmarz/storm) topology
to consume an AMQP exchange as an input source. It currently provides one
class:
storm-amqp-spout allows a [Storm][] topology to consume an AMQP queue as an
input source. It currently provides:

* <tt>[AMQPSpout][]</tt>: an implementation of
[`backtype.storm.topology.IRichSpout`][IRichSpout] that connects to an AMQP
broker, consumes the messages published to a specified AMQP exchange and
emits them as Storm tuples.
broker, consumes the messages routed to a specified AMQP queue and emits them
as Storm tuples.
* <tt>[QueueDeclaration][]</tt>: an interface that encapsulates declaring an
AMQP queue and setting up any exchange bindings it requires, used by
AMQPSpout to set up the queue to consume.
* <tt>[ExclusiveQueueWithBinding][]</tt>: a QueueDeclaration suitable for
prototyping and one-off analytics use cases.
* <tt>[SharedQueueWithBinding][]</tt>: a QueueDeclaration suitable for
production use cases needing guaranteed processing.

You'll need to provide a [Scheme][] to tell AMQPSpout how to interpret the
messages and turn them into Storm tuples. See e.g. [storm-json][] if your
Expand Down Expand Up @@ -51,22 +57,13 @@ This is very early software. It may break and the API is liable to change
completely between releases. Pull requests, patches and bug reports are very
welcome.

This should not currently be used where guaranteed message processing is
required, because it binds to the exchange using a temporary queue when the
topology calls <tt>[open()][]</tt> on the spout. This means it will only
receive messages published to the exchange after the call to
<tt>[open()][]</tt>, and if the spout worker restarts or the topology is
killed, it will not receive any messages published while the worker or topology
is down.

For the same reason, this spout cannot currently be distributed among
multiple workers (each worker gets its own exclusive queue, so multiple
workers would each receive their own copy of every message).

Improvements are planned to overcome both these limitations and support
guaranteed message processing, distributed across any number of workers.
These improvements may require API changes (e.g. to specify the name of an
existing queue to consume, rather than an exchange to bind to).
**N.B.** if you need to guarantee all messages are reliably processed, you
should have AMQPSpout consume from a queue that is *not* set as 'exclusive' or
'auto-delete': otherwise if the spout task crashes or is restarted, the queue
will be deleted and any messages in it lost, as will any messages published
while the task remains down. See [SharedQueueWithBinding][] to declare a
shared queue that allows for guaranteed processing. (For prototyping, an
[ExclusiveQueueWithBinding][] may be simpler to manage.)

## Compatibility ##

Expand All @@ -78,11 +75,15 @@ versions and other AMQP brokers.
"Storm project homepage"
[IRichSpout]: <http://nathanmarz.github.com/storm/doc/backtype/storm/topology/IRichSpout.html>
"Javadoc for backtype.storm.topology.IRichSpout"
[open()]: <http://nathanmarz.github.com/storm/doc/backtype/storm/spout/ISpout.html#open(java.util.Map,%20backtype.storm.task.TopologyContext,%20backtype.storm.spout.SpoutOutputCollector\)>
"Javadoc for backtype.storm.spout.ISpout.open()"
[Scheme]: <http://nathanmarz.github.com/storm/doc/backtype/storm/spout/Scheme.html>
"Javadoc for backtype.storm.spout.Scheme"
[AMQPSpout]: <http://code.rapportive.com/storm-amqp-spout/doc/com/rapportive/storm/spout/AMQPSpout.html>
"Javadoc for AMQPSpout"
[QueueDeclaration]: <http://code.rapportive.com/storm-amqp-spout/doc/com/rapportive/storm/amqp/QueueDeclaration.html>
"Javadoc for QueueDeclaration"
[ExclusiveQueueWithBinding]: <http://code.rapportive.com/storm-amqp-spout/doc/com/rapportive/storm/amqp/ExclusiveQueueWithBinding.html>
"Javadoc for ExclusiveQueueWithBinding"
[SharedQueueWithBinding]: <http://code.rapportive.com/storm-amqp-spout/doc/com/rapportive/storm/amqp/SharedQueueWithBinding.html>
"Javadoc for SharedQueueWithBinding"
[storm-json]: <https://github.com/rapportive-oss/storm-json>
"JSON {,de}serialisation support for Storm"
@@ -0,0 +1,77 @@
package com.rapportive.storm.amqp;

import java.io.IOException;

import com.rabbitmq.client.AMQP.Queue;

import com.rabbitmq.client.Channel;

/**
* Declares an exclusive, server-named queue and binds it to an existing
* exchange. This is probably the easiest way to start prototyping with an
* {@link com.rapportive.storm.spout.AMQPSpout}: if your app already publishes
* to an exchange, you can just point this at the exchange and start consuming
* messages.
*
* <p>However <strong>N.B.</strong> this queue setup <em>is not reliable</em>,
* in that if the spout task crashes or restarts, messages published while the
* spout is down will be lost (because the spout creates the queue when it
* starts up, and the server deletes the queue when the spout closes).</p>
*
* <p>It also cannot scale out to multiple parallel spout tasks. The semantics
* of an exclusive queue mean that each spout task would get its own queue
* bound to the exchange. That means each task would receive a copy of every
* message, so messages would get processed multiple times.</p>
*
* <p>If you need guaranteed processing or a horizontally scalable spout,
* consider {@link SharedQueueWithBinding}.</p>
*/
public class ExclusiveQueueWithBinding implements QueueDeclaration {
private static final long serialVersionUID = 7923072289071634425L;

private final String exchange;
private final String routingKey;

/**
* Create a declaration of an exclusive server-named queue bound to the
* specified exchange.
*
* @param exchange exchange to bind the queue to.
* @param routingKey routing key for the exchange binding. Use "#" to
* receive all messages published to the exchange.
*/
public ExclusiveQueueWithBinding(String exchange, String routingKey) {
this.exchange = exchange;
this.routingKey = routingKey;
}

/**
* Verifies the exchange exists, creates an exclusive, server-named queue
* and binds it to the exchange.
*
* @return the server's response to the successful queue declaration (you
* can use this to discover the name of the queue).
*
* @throws IOException if the exchange does not exist, or if the AMQP
* connection drops.
*/
@Override
public Queue.DeclareOk declare(Channel channel) throws IOException {
channel.exchangeDeclarePassive(exchange);

final Queue.DeclareOk queue = channel.queueDeclare();

channel.queueBind(queue.getQueue(), exchange, routingKey);

return queue;
}

/**
* Returns <tt>false</tt> as this queue is <em>not</em> safe for parallel
* consumers.
*/
@Override
public boolean isParallelConsumable() {
return false;
}
}
49 changes: 49 additions & 0 deletions src/main/java/com/rapportive/storm/amqp/QueueDeclaration.java
@@ -0,0 +1,49 @@
package com.rapportive.storm.amqp;

import java.io.IOException;
import java.io.Serializable;

import com.rabbitmq.client.AMQP.Queue;

import com.rabbitmq.client.Channel;

/**
* Declaration of a queue to consume, and any exchange bindings the queue needs.
*
* <p>Depending on the queue parameters (exclusive, auto_delete, server-named)
* and exchange bindings, it may or may not be safe to start several consumers
* in parallel using a given queue declaration. For example, an exclusive
* named queue bound to an exchange is not safe because only one of the
* consumers will succeed in declaring the queue; an exclusive
* <em>server-named</em> queue does not have that problem, but is still
* probably not safe, because most exchange types will send a copy of every
* message to every queue bound to them, so you will end up consuming each
* message several times.</p>
*
* <p>For that reason, to implement this interface you must implement
* {@link #isParallelConsumable} to indicate whether or not this queue is safe
* for parallel consumers.</p>
*/
public interface QueueDeclaration extends Serializable {
/**
* Declare the queue, and any exchanges and bindings that it needs. Called
* once to determine the queue to consume from.
*
* @param channel An open AMQP channel which can be used to send the
* declarations.
*
* @return the server's response to the successful queue declaration (used
* to determine the queue name to subscribe to).
*
* @throws IOException if a declaration fails or the AMQP connection drops.
*/
Queue.DeclareOk declare(Channel channel) throws IOException;

/**
* Indicate whether this queue is safe for parallel consumers.
*
* @return <tt>true</tt> if safe for parallel consumers, otherwise
* <tt>false</tt>.
*/
boolean isParallelConsumable();
}
@@ -0,0 +1,80 @@
package com.rapportive.storm.amqp;

import java.io.IOException;

import com.rabbitmq.client.AMQP.Queue;

import com.rabbitmq.client.Channel;

/**
* Declares a named, durable queue and binds it to an existing exchange. This
* is a good choice for production use as the queue will survive spout
* restarts, so you won't miss messages if your spout crashes.
*
* <p><strong>N.B.</strong> this could be risky under some circumstances. e.g.
* if while prototyping you set a development topology consuming from a
* production AMQP server, then kill your topology and go home for the night;
* messages will continue to be queued up, which could threaten the stability
* of the AMQP server if the exchange is high-volume. For prototyping consider
* {@link ExclusiveQueueWithBinding}.</p>
*
* <p>This queue is safe for multiple parallel spout tasks: as they all consume
* the same named queue, the AMQP broker will round-robin messages between
* them, so each message will get processed only once (barring redelivery due
* to outages).</p>
*/
public class SharedQueueWithBinding implements QueueDeclaration {
private static final long serialVersionUID = 2364833412534518859L;

private final String queueName;
private final String exchange;
private final String routingKey;

/**
* Create a declaration of a named, durable, non-exclusive queue bound to
* the specified exchange.
*
* @param queueName name of the queue to be declared.
* @param exchange exchange to bind the queue to.
* @param routingKey routing key for the exchange binding. Use "#" to
* receive all messages published to the exchange.
*/
public SharedQueueWithBinding(String queueName, String exchange, String routingKey) {
this.queueName = queueName;
this.exchange = exchange;
this.routingKey = routingKey;
}

/**
* Verifies the exchange exists, creates the named queue if it does not
* exist, and binds it to the exchange.
*
* @return the server's response to the successful queue declaration.
*
* @throws IOException if the exchange does not exist, the queue could not
* be declared, or if the AMQP connection drops.
*/
@Override
public Queue.DeclareOk declare(Channel channel) throws IOException {
channel.exchangeDeclarePassive(exchange);

final Queue.DeclareOk queue = channel.queueDeclare(
queueName,
/* durable */ true,
/* non-exclusive */ false,
/* non-auto-delete */ false,
/* no arguments */ null);

channel.queueBind(queue.getQueue(), exchange, routingKey);

return queue;
}

/**
* Returns <tt>true</tt> as this queue is safe for parallel consumers.
*/
@Override
public boolean isParallelConsumable() {
return true;
}
}

0 comments on commit c9a5544

Please sign in to comment.