Skip to content

Commit

Permalink
GH-2082: Fix RecordInterceptor
Browse files Browse the repository at this point in the history
Resolves #2082

Now that the `earlyRecordInterceptor` is always used unless explicitly disabled,
move delivery attempt header processing earlier so that the header is available
in the interceptor.

**cherry-pick to 2.8.x, 2.7.x**
  • Loading branch information
garyrussell authored and artembilan committed Jan 24, 2022
1 parent 7ddbf12 commit 0d430b9
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2427,16 +2427,28 @@ private ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg)
}

@Nullable
private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> nextArg) {
ConsumerRecord<K, V> next = nextArg;
private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> recordArg) {
deliveryAttemptHeader(recordArg);
ConsumerRecord<K, V> record = recordArg;
if (this.earlyRecordInterceptor != null) {
next = this.earlyRecordInterceptor.intercept(next, this.consumer);
if (next == null) {
record = this.earlyRecordInterceptor.intercept(record, this.consumer);
if (record == null) {
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
+ ListenerUtils.recordToString(nextArg));
+ ListenerUtils.recordToString(recordArg));
}
}
return next;
return record;
}

private void deliveryAttemptHeader(final ConsumerRecord<K, V> record) {
	// Only populated when a DeliveryAttemptAware error handler is configured.
	if (this.deliveryAttemptAware == null) {
		return;
	}
	int attempt = this.deliveryAttemptAware.deliveryAttempt(
			new TopicPartitionOffset(record.topic(), record.partition(), record.offset()));
	// Encode the attempt count as a 4-byte big-endian int header value.
	byte[] attemptBytes = ByteBuffer.allocate(4).putInt(attempt).array(); // NOSONAR (magic #)
	record.headers().add(new RecordHeader(KafkaHeaders.DELIVERY_ATTEMPT, attemptBytes));
}

private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecord<K, V> record) {
Expand Down Expand Up @@ -2553,14 +2565,6 @@ private void invokeOnMessage(final ConsumerRecord<K, V> record) {
if (record.key() == null && this.checkNullKeyForExceptions) {
checkDeser(record, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
}
if (this.deliveryAttemptAware != null) {
byte[] buff = new byte[4]; // NOSONAR (magic #)
ByteBuffer bb = ByteBuffer.wrap(buff);
bb.putInt(this.deliveryAttemptAware
.deliveryAttempt(
new TopicPartitionOffset(record.topic(), record.partition(), record.offset())));
record.headers().add(new RecordHeader(KafkaHeaders.DELIVERY_ATTEMPT, buff));
}
doInvokeOnMessage(record);
if (this.nackSleep < 0 && !this.isManualImmediateAck) {
ackCurrent(record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception {
assertThat(this.config.count).isEqualTo(8);
assertThat(this.config.contents).contains("foo", "bar", "baz", "qux", "qux", "qux", "fiz", "buz");
assertThat(this.config.deliveries).contains(1, 1, 1, 1, 2, 3, 1, 1);
assertThat(this.config.deliveryAttempt).isNotNull();
}

@Configuration
Expand All @@ -146,6 +147,8 @@ public static class Config {

int count;

volatile org.apache.kafka.common.header.Header deliveryAttempt;

@KafkaListener(groupId = "grp",
topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo",
partitions = "#{'0,1,2'.split(',')}",
Expand Down Expand Up @@ -234,6 +237,10 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.getContainerProperties().setDeliveryAttemptHeader(true);
factory.setRecordInterceptor(record -> {
Config.this.deliveryAttempt = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
return record;
});
return factory;
}

Expand Down

0 comments on commit 0d430b9

Please sign in to comment.