Skip to content

Commit

Permalink
GH-1766 - Use next topic's delay in retry topic (#1767)
Browse files Browse the repository at this point in the history
Fixes #1766
  • Loading branch information
tomazfernandes committed Apr 13, 2021
1 parent 99ff5cf commit 45be8f0
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 24 deletions.
Expand Up @@ -144,7 +144,8 @@ private long getNextExecutionTimestamp(ConsumerRecord<?, ?> consumerRecord, Exce
long originalTimestamp = new BigInteger(originalTimestampHeader).longValue();
long failureTimestamp = getFailureTimestamp(e);
long nextExecutionTimestamp = failureTimestamp + this.destinationTopicResolver
.getDestinationTopicByName(consumerRecord.topic()).getDestinationDelay();
.resolveDestinationTopic(consumerRecord.topic(), getAttempts(consumerRecord), e, originalTimestamp)
.getDestinationDelay();
LOGGER.debug(() -> String.format("FailureTimestamp: %s, Original timestamp: %s, nextExecutionTimestamp: %s",
failureTimestamp, originalTimestamp, nextExecutionTimestamp));
return nextExecutionTimestamp;
Expand Down
Expand Up @@ -79,9 +79,6 @@ class DeadLetterPublishingRecovererFactoryTests {
@Mock
private KafkaOperations<?, ?> kafkaOperations;

@Mock
private KafkaOperations<?, ?> kafkaOperations2;

@Mock
private ListenableFuture<?> listenableFuture;

Expand All @@ -106,11 +103,10 @@ void shouldSendMessage() {
given(destinationTopic.isNoOpsTopic()).willReturn(false);
given(destinationTopic.getDestinationName()).willReturn(testRetryTopic);
given(destinationTopic.getDestinationPartitions()).willReturn(3);
given(destinationTopicResolver.getDestinationTopicByName(testTopic)).willReturn(destinationTopic);
given(destinationTopicResolver.getDestinationTopicByName(testRetryTopic)).willReturn(destinationTopic);
given(destinationTopic.getDestinationDelay()).willReturn(1000L);
willReturn(this.kafkaOperations2).given(destinationTopic).getKafkaOperations();
given(kafkaOperations2.send(any(ProducerRecord.class))).willReturn(listenableFuture);
willReturn(this.kafkaOperations).given(destinationTopic).getKafkaOperations();
given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(listenableFuture);
this.consumerRecord.headers().add(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP, originalTimestampBytes);

DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(this.destinationTopicResolver);
Expand All @@ -120,7 +116,7 @@ void shouldSendMessage() {
deadLetterPublishingRecoverer.accept(this.consumerRecord, e);

// then
then(kafkaOperations2).should(times(1)).send(producerRecordCaptor.capture());
then(kafkaOperations).should(times(1)).send(producerRecordCaptor.capture());
ProducerRecord producerRecord = producerRecordCaptor.getValue();
assertThat(producerRecord.topic()).isEqualTo(testRetryTopic);
assertThat(producerRecord.value()).isEqualTo(value);
Expand Down Expand Up @@ -151,9 +147,8 @@ void shouldIncreaseAttempts() {
given(destinationTopic.getDestinationName()).willReturn(testRetryTopic);
given(destinationTopic.getDestinationPartitions()).willReturn(1);
given(destinationTopicResolver.getDestinationTopicByName(testRetryTopic)).willReturn(destinationTopic);
given(destinationTopicResolver.getDestinationTopicByName(testTopic)).willReturn(destinationTopic);
willReturn(kafkaOperations2).given(destinationTopic).getKafkaOperations();
given(kafkaOperations2.send(any(ProducerRecord.class))).willReturn(listenableFuture);
willReturn(kafkaOperations).given(destinationTopic).getKafkaOperations();
given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(listenableFuture);

DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(this.destinationTopicResolver);

Expand All @@ -162,7 +157,7 @@ void shouldIncreaseAttempts() {
deadLetterPublishingRecoverer.accept(consumerRecord, e);

// then
then(kafkaOperations2).should(times(1)).send(producerRecordCaptor.capture());
then(kafkaOperations).should(times(1)).send(producerRecordCaptor.capture());
ProducerRecord producerRecord = producerRecordCaptor.getValue();
Header attemptsHeader = producerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS);
assertThat(attemptsHeader).isNotNull();
Expand All @@ -182,10 +177,8 @@ void shouldAddOriginalTimestampHeader() {
given(destinationTopic.getDestinationName()).willReturn(testRetryTopic);
given(destinationTopic.getDestinationPartitions()).willReturn(1);
given(destinationTopicResolver.getDestinationTopicByName(testRetryTopic)).willReturn(destinationTopic);
long nextExecutionTimestamp = this.nowTimestamp + destinationTopic.getDestinationDelay();
given(destinationTopicResolver.getDestinationTopicByName(testTopic)).willReturn(destinationTopic);
willReturn(this.kafkaOperations2).given(destinationTopic).getKafkaOperations();
given(kafkaOperations2.send(any(ProducerRecord.class))).willReturn(listenableFuture);
willReturn(this.kafkaOperations).given(destinationTopic).getKafkaOperations();
given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(listenableFuture);

DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(this.destinationTopicResolver);

Expand All @@ -194,7 +187,7 @@ void shouldAddOriginalTimestampHeader() {
deadLetterPublishingRecoverer.accept(consumerRecord, e);

// then
then(kafkaOperations2).should(times(1)).send(producerRecordCaptor.capture());
then(kafkaOperations).should(times(1)).send(producerRecordCaptor.capture());
ProducerRecord producerRecord = producerRecordCaptor.getValue();
Header originalTimestampHeader = producerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP);
assertThat(originalTimestampHeader).isNotNull();
Expand All @@ -215,10 +208,8 @@ void shouldNotReplaceOriginalTimestampHeader() {
given(destinationTopic.getDestinationName()).willReturn(testRetryTopic);
given(destinationTopic.getDestinationPartitions()).willReturn(1);
given(destinationTopicResolver.getDestinationTopicByName(testRetryTopic)).willReturn(destinationTopic);
long nextExecutionTimestamp = this.nowTimestamp + destinationTopic.getDestinationDelay();
given(destinationTopicResolver.getDestinationTopicByName(testTopic)).willReturn(destinationTopic);
willReturn(this.kafkaOperations2).given(destinationTopic).getKafkaOperations();
given(kafkaOperations2.send(any(ProducerRecord.class))).willReturn(listenableFuture);
willReturn(this.kafkaOperations).given(destinationTopic).getKafkaOperations();
given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(listenableFuture);

DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(this.destinationTopicResolver);

Expand All @@ -227,7 +218,7 @@ void shouldNotReplaceOriginalTimestampHeader() {
deadLetterPublishingRecoverer.accept(consumerRecord, e);

// then
then(kafkaOperations2).should(times(1)).send(producerRecordCaptor.capture());
then(kafkaOperations).should(times(1)).send(producerRecordCaptor.capture());
ProducerRecord producerRecord = producerRecordCaptor.getValue();
Header originalTimestampHeader = producerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP);
assertThat(originalTimestampHeader).isNotNull();
Expand All @@ -249,7 +240,7 @@ void shouldNotSendMessageIfNoOpsDestination() {
deadLetterPublishingRecoverer.accept(this.consumerRecord, e);

// then
then(kafkaOperations2).should(times(0)).send(any(ProducerRecord.class));
then(kafkaOperations).should(times(0)).send(any(ProducerRecord.class));
}

@Test
Expand All @@ -264,7 +255,7 @@ void shouldThrowIfKafkaBackoffException() {
.isThrownBy(() -> deadLetterPublishingRecoverer.accept(this.consumerRecord, e));

// then
then(kafkaOperations2).should(times(0)).send(any(ProducerRecord.class));
then(kafkaOperations).should(times(0)).send(any(ProducerRecord.class));
}

@Test
Expand Down

0 comments on commit 45be8f0

Please sign in to comment.