Skip to content
This repository has been archived by the owner on Dec 5, 2023. It is now read-only.

Commit

Permalink
AMQPSpout.CONFIG_PREFETCH_COUNT to set qos prefetch-count.
Browse files Browse the repository at this point in the history
  • Loading branch information
samstokes committed Oct 5, 2011
1 parent c06ea6b commit 56dc896
Showing 1 changed file with 34 additions and 3 deletions.
37 changes: 34 additions & 3 deletions src/main/java/com/rapportive/storm/spout/AMQPSpout.java
Expand Up @@ -23,6 +23,11 @@
* once the topology has respectively fully processed or failed the
* corresponding tuple.</p>
*
* <p>This consumes messages from AMQP asynchronously, so it may receive
* messages before Storm requests them as tuples; therefore it buffers messages
* in an internal queue. To avoid this buffer growing large and consuming too
* much RAM, set {@link #CONFIG_PREFETCH_COUNT}.</p>
*
* <p>This should not currently be used where guaranteed message processing is
* required, because it binds to the exchange using a temporary queue when the
* topology calls <tt>open()</tt> on the spout. This means it will only
Expand All @@ -47,6 +52,23 @@ public class AMQPSpout implements IRichSpout {

private static final Logger log = Logger.getLogger(AMQPSpout.class);

/**
* Storm config key to set the AMQP basic.qos prefetch-count parameter.
* Defaults to 100.
*
* <p>This caps the number of messages outstanding (i.e. unacked) at a time
* that will be sent to each spout worker. Increasing this will improve
* throughput if the network roundtrip time to the AMQP broker is
* significant compared to the time for the topology to process each
* message; this will also increase the RAM requirements as the internal
* message buffer grows.</p>
*
* <p>AMQP allows a prefetch-count of zero, indicating unlimited delivery,
* but that is not allowed here to avoid unbounded buffer growth.)</p>
*/
public static final String CONFIG_PREFETCH_COUNT = "amqp.prefetch.count";
private static final long DEFAULT_PREFETCH_COUNT = 100;

private static final long WAIT_FOR_NEXT_MESSAGE = 1L;

private final String amqpHost;
Expand Down Expand Up @@ -174,17 +196,25 @@ public void nextTuple() {

@Override
public void open(@SuppressWarnings("rawtypes") Map config, TopologyContext context, SpoutOutputCollector collector) {
Long prefetchCount = (Long) config.get(CONFIG_PREFETCH_COUNT);
if (prefetchCount == null) {
log.info("Using default prefetch-count");
prefetchCount = DEFAULT_PREFETCH_COUNT;
} else if (prefetchCount < 1) {
throw new IllegalArgumentException(CONFIG_PREFETCH_COUNT + " must be at least 1");
}

try {
this.collector = collector;

setupAMQP();
setupAMQP(prefetchCount.intValue());
} catch (IOException e) {
log.error("AMQP setup failed", e);
}
}


private void setupAMQP() throws IOException {
private void setupAMQP(int prefetchCount) throws IOException {
final ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.setHost(amqpHost);
Expand All @@ -196,7 +226,8 @@ private void setupAMQP() throws IOException {
this.amqpConnection = connectionFactory.newConnection();
this.amqpChannel = amqpConnection.createChannel();

// TODO qos (avoid huge internal queue)
log.info("Setting basic.qos prefetch-count to " + prefetchCount);
amqpChannel.basicQos(prefetchCount);

amqpChannel.exchangeDeclarePassive(amqpExchange);

Expand Down

0 comments on commit 56dc896

Please sign in to comment.