
Commit 86c39a3

STORM-2413: Make new Kafka spout respect tuple retry limit

Author: Stig Rohde Døssing
Parent: 2cc1015

5 files changed: +54, -7 lines


external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java (1 addition, 1 deletion)

@@ -330,7 +330,7 @@ private void emit() {
     //@return true if tuple was emitted
     private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
         final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
-        final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+        final KafkaSpoutMessageId msgId = retryService.getMessageId(record);
         if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) { // has been acked
             LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
         } else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
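Context for this change: previously, every emit built a fresh KafkaSpoutMessageId, whose failure counter starts at zero, so a tuple replayed after a fail could never accumulate failures and the configured retry limit was never reached. Fetching the id from the retry service returns the instance that was scheduled on fail, preserving its failure count. A minimal sketch of the difference, assuming that KafkaSpoutMessageId implements equals()/hashCode() on topic, partition and offset (which the retry service's lookup relies on) and exposes incrementNumFails()/numFails():

    // Hypothetical illustration, not part of the commit.
    KafkaSpoutMessageId first = retryService.getMessageId(record); // not yet scheduled: fresh id, numFails == 0
    first.incrementNumFails();                                     // the tuple fails once...
    retryService.schedule(first);                                  // ...and is scheduled for retry

    // On replay, the spout now gets back the scheduled instance, so the count survives:
    KafkaSpoutMessageId replayed = retryService.getMessageId(record);
    assert replayed.numFails() == 1; // with `new KafkaSpoutMessageId(record)` this would always be 0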

external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java (2 additions, 2 deletions)

@@ -22,8 +22,8 @@
 import org.apache.kafka.common.TopicPartition;

 public class KafkaSpoutMessageId {
-    private transient TopicPartition topicPart;
-    private transient long offset;
+    private final transient TopicPartition topicPart;
+    private final transient long offset;
     private transient int numFails = 0;

     public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord) {

external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java (14 additions, 0 deletions)

@@ -30,6 +30,7 @@
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.utils.Time;

 /**
@@ -269,6 +270,19 @@ public boolean schedule(KafkaSpoutMessageId msgId) {
             return true;
         }
     }
+
+    @Override
+    public KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record) {
+        KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+        if(toRetryMsgs.contains(msgId)) {
+            for(KafkaSpoutMessageId originalMsgId : toRetryMsgs) {
+                if(originalMsgId.equals(msgId)) {
+                    return originalMsgId;
+                }
+            }
+        }
+        return msgId;
+    }

     // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
     private long nextTime(KafkaSpoutMessageId msgId) {
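Note on the implementation: java.util.Set has no get-by-equality operation, so after the contains() check the method scans toRetryMsgs linearly to find the stored instance, once per emitted record. A hypothetical alternative, not part of the commit, that keeps O(1) lookup by mapping each scheduled id to itself (assuming the same equals()/hashCode() contract):

    // Sketch only: replace the Set scan with a map keyed by the id itself.
    private final Map<KafkaSpoutMessageId, KafkaSpoutMessageId> scheduledIds = new HashMap<>();

    public KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record) {
        KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
        KafkaSpoutMessageId scheduled = scheduledIds.get(msgId); // O(1) instead of a linear scan
        return scheduled != null ? scheduled : msgId;
    }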

external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java (9 additions, 0 deletions)

@@ -22,7 +22,9 @@

 import java.io.Serializable;
 import java.util.Collection;
+import java.util.Optional;
 import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRecord;

 /**
  * Represents the logic that manages the retrial of failed tuples.
@@ -75,4 +77,11 @@ public interface KafkaSpoutRetryService extends Serializable {
      * Returns false is this message is not scheduled for retrial
      */
     boolean isScheduled(KafkaSpoutMessageId msgId);
+
+    /**
+     * Gets the {@link KafkaSpoutMessageId} for the given record.
+     * @param record The record to fetch the id for
+     * @return The id the record was scheduled for retry with, or a new {@link KafkaSpoutMessageId} if the record was not scheduled for retry.
+     */
+    KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record);
 }
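Because KafkaSpoutRetryService is a public interface, adding this method is a breaking change for custom retry services: they must now implement getMessageId. A minimal sketch of a compliant implementation for a service that keeps no failure state on the ids it schedules (hypothetical, not part of the commit); a stateful service should instead return the stored instance, as KafkaSpoutRetryExponentialBackoff does above:

    @Override
    public KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record) {
        // Safe only if this service never reads numFails from ids it handed out earlier.
        return new KafkaSpoutMessageId(record);
    }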

external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java (28 additions, 4 deletions)

@@ -17,7 +17,6 @@
  */
 package org.apache.storm.kafka.spout;

-
 import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;

 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -41,6 +40,7 @@
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -57,7 +57,6 @@
 import org.mockito.Captor;
 import org.mockito.MockitoAnnotations;

-
 public class SingleTopicKafkaSpoutTest {

     @Rule
@@ -73,12 +72,15 @@ public class SingleTopicKafkaSpoutTest {
     private KafkaConsumer<String, String> consumerSpy;
     private KafkaConsumerFactory<String, String> consumerFactory;
     private KafkaSpout<String, String> spout;
+    private int maxRetries = 3;

     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
         KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
             .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+            .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
+                maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
             .build();
         this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
         this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy;
@@ -90,8 +92,8 @@ void populateTopicData(String topicName, int msgCount) throws InterruptedException

         for (int i = 0; i < msgCount; i++) {
             ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
-                topicName, Integer.toString(i),
-                Integer.toString(i));
+                    topicName, Integer.toString(i),
+                    Integer.toString(i));
             kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord);
         }
     }
@@ -266,4 +268,26 @@ public void shouldReplayFirstTupleFailedOutOfOrder() throws Exception {
             verifyAllMessagesCommitted(messageCount);
         }
     }
+
+    @Test
+    public void shouldDropMessagesAfterMaxRetriesAreReached() throws Exception {
+        //Check that if one message fails repeatedly, the retry cap limits how many times the message can be reemitted
+        int messageCount = 1;
+        initializeSpout(messageCount);
+
+        //Emit and fail the same tuple until we've reached retry limit
+        for (int i = 0; i <= maxRetries; i++) {
+            ArgumentCaptor<Object> messageIdFailed = ArgumentCaptor.forClass(Object.class);
+            spout.nextTuple();
+            verify(collector).emit(anyObject(), anyObject(), messageIdFailed.capture());
+            KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageIdFailed.getValue();
+            spout.fail(msgId);
+            assertThat("Expected message id number of failures to match the number of times the message has failed", msgId.numFails(), is(i + 1));
+            reset(collector);
+        }
+
+        //Verify that the tuple is not emitted again
+        spout.nextTuple();
+        verify(collector, never()).emit(anyObject(), anyObject(), anyObject());
+    }
 }
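For reference, the retry cap exercised by the new test is wired up through the spout configuration, as the setUp() change above shows. A usage sketch, assuming the constructor's argument order is initial delay, delay period, max retries, max delay (the values here are illustrative, not recommendations):

    // Drop a tuple after it has failed 5 times; all delays given via TimeInterval.seconds,
    // the factory method used in the test above.
    KafkaSpoutRetryService retry = new KafkaSpoutRetryExponentialBackoff(
            KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(1),   // initial delay
            KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(2),   // delay period
            5,                                                           // max retries
            KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10)); // max delay

    KafkaSpoutConfig<String, String> spoutConfig = getKafkaSpoutConfigBuilder(kafkaPort) // test helper from this file; kafkaPort assumed
            .setRetry(retry)
            .build();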
