Skip to content
Browse files

Batch ack failed items.

  • Loading branch information...
1 parent 8750403 commit 34e1a86b76779c3941de7323f4bc6150854bfb7f @sritchie committed
Showing with 12 additions and 0 deletions.
  1. +12 −0 src/jvm/backtype/storm/spout/KestrelThriftSpout.java
View
12 src/jvm/backtype/storm/spout/KestrelThriftSpout.java
@@ -170,6 +170,8 @@ public boolean bufferKestrelGet(int index) {
// LOG.info("Kestrel batch get fetched " + items.size() + " items. (batchSize= " + BATCH_SIZE +
// " queueName=" + _queueName + ", index=" + index + ", host=" + info.host + ")");
+ HashSet toAck = new HashSet();
+
for(Item item : items) {
List<Object> retItems = _scheme.deserialize(item.get_data());
@@ -179,6 +181,16 @@ public boolean bufferKestrelGet(int index) {
if(!_emitBuffer.offer(emitItem)) {
throw new RuntimeException("KestrelThriftSpout's Internal Buffer Enqeueue Failed.");
}
+ } else {
+ toAck.add(item.get_id());
+ }
+ }
+
+ if(toAck.size() > 0) {
+ try {
+ info.client.abort(_queueName, toAck);
+ } catch(TException e) {
+ blacklist(info, e);
}
}

0 comments on commit 34e1a86

Please sign in to comment.
Something went wrong with that request. Please try again.