Permalink
Browse files

Ack/fail messages when Storm acks/fails corresponding tuples

  • Loading branch information...
1 parent 05a971f commit 24d61e45f0ecd56208016fcb7324d8453af092bb @samstokes samstokes committed Oct 2, 2011
Showing with 39 additions and 32 deletions.
  1. +39 −32 src/main/java/com/rapportive/storm/spout/AMQPSpout.java
@@ -19,28 +19,19 @@
* Spout to feed messages into Storm from an AMQP exchange.
*
* Each message published to the exchange that matches the supplied routing key
- * will be emitted as a Storm tuple.
+ * will be emitted as a Storm tuple. The message will be acked or rejected
+ * once the topology has respectively fully processed or failed the
+ * corresponding tuple.
*
* This should not currently be used where guaranteed message processing is
- * required, because of two limitations:
+ * required, because it binds to the exchange using a temporary queue when the
+ * topology calls <tt>open()</tt> on the spout. This means it will only
+ * receive messages published to the exchange after the call to
+ * <tt>open()</tt>, and if the spout worker restarts or the topology is killed,
+ * it will not receive any messages published while the worker or topology is
+ * down.
*
- * <ol>
- * <li>
- * Uses a temporary queue to bind to the specified exchange when the topology
- * calls <tt>open()</tt> on the spout, so it will only receive messages
- * published to the exchange after the call to <tt>open()</tt>, and if the
- * spout worker restarts or the topology is killed, it will not receive any
- * messages published while the worker or topology is down.
- * </li>
- *
- * <li>
- * Currently auto-acks all consumed messages with the AMQP broker, and does not
- * implement Storm's reliability API, so if processing a message fails it will
- * simply be discarded.
- * </li>
- * </ol>
- *
- * Limitation 1 also means this spout cannot currently be distributed among
+ * For the same reason, this spout cannot currently be distributed among
* multiple workers (each worker gets its own exclusive queue, so multiple
* workers would each receive their own copy of every message).
*
@@ -108,9 +99,19 @@ public AMQPSpout(String host, int port, String username, String password, String
@Override
- public void ack(Object arg0) {
- // TODO Auto-generated method stub
-
+ public void ack(Object msgId) {
+ if (msgId instanceof Long) {
+ final long deliveryTag = (Long) msgId;
+ if (amqpChannel != null) {
+ try {
+ amqpChannel.basicAck(deliveryTag, false /* not multiple */);
+ } catch (IOException e) {
+ log.warn("Failed to ack delivery-tag " + deliveryTag, e);
+ }
+ }
+ } else {
+ log.warn(String.format("don't know how to ack(%s: %s)", msgId.getClass().getName(), msgId));
+ }
}
@@ -139,9 +140,19 @@ public void close() {
@Override
- public void fail(Object arg0) {
- // TODO Auto-generated method stub
-
+ public void fail(Object msgId) {
+ if (msgId instanceof Long) {
+ final long deliveryTag = (Long) msgId;
+ if (amqpChannel != null) {
+ try {
+ amqpChannel.basicReject(deliveryTag, false /* don't requeue */);
+ } catch (IOException e) {
+ log.warn("Failed to reject delivery-tag " + deliveryTag, e);
+ }
+ }
+ } else {
+ log.warn(String.format("don't know how to reject(%s: %s)", msgId.getClass().getName(), msgId));
+ }
}
@@ -152,8 +163,9 @@ public void nextTuple() {
try {
final QueueingConsumer.Delivery delivery = amqpConsumer.nextDelivery(WAIT_FOR_NEXT_MESSAGE);
if (delivery == null) break;
+ final long deliveryTag = delivery.getEnvelope().getDeliveryTag();
final byte[] message = delivery.getBody();
- collector.emit(serialisationScheme.deserialize(message));
+ collector.emit(serialisationScheme.deserialize(message), deliveryTag);
} catch (InterruptedException e) {
break;
}
@@ -213,12 +225,7 @@ private void setupAMQP() throws IOException {
amqpChannel.queueBind(queue, amqpExchange, amqpRoutingKey);
this.amqpConsumer = new QueueingConsumer(amqpChannel);
- /*
- * This tells the consumer to auto-ack every message as soon as we get
- * it. For reliability, we probably want to manually ack or reject
- * (e.g. in ack() and fail()).
- */
- this.amqpConsumerTag = amqpChannel.basicConsume(queue, true, amqpConsumer);
+ this.amqpConsumerTag = amqpChannel.basicConsume(queue, false /* no auto-ack */, amqpConsumer);
}

0 comments on commit 24d61e4

Please sign in to comment.