Skip to content

Commit

Permalink
GH-1494: Fix NPE in DeadLetterPublishingRecoverer
Browse files Browse the repository at this point in the history
Resolves #1494
  • Loading branch information
garyrussell authored and artembilan committed May 28, 2020
1 parent 0281b77 commit 96efd3f
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 6 deletions.
Expand Up @@ -129,7 +129,9 @@ public DeadLetterPublishingRecoverer(KafkaTemplate<? extends Object, ? extends O
* original topic. The templates map keys are classes and the value the corresponding
* template to use for objects (producer record values) of that type. A
* {@link java.util.LinkedHashMap} is recommended when there is more than one
* template, to ensure the map is traversed in order.
* template, to ensure the map is traversed in order. To send records with a null
* value, add a template with the {@link Void} class as a key; otherwise the first
* template from the map values iterator will be used.
* @param templates the {@link KafkaOperations}s to use for publishing.
*/
public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates) {
Expand All @@ -143,7 +145,9 @@ public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Obj
* 0, no partition is set when publishing to the topic. The templates map keys are
* classes and the value the corresponding template to use for objects (producer
* record values) of that type. A {@link java.util.LinkedHashMap} is recommended when
* there is more than one template, to ensure the map is traversed in order.
* there is more than one template, to ensure the map is traversed in order. To send
* records with a null value, add a template with the {@link Void} class as a key;
* otherwise the first template from the map values iterator will be used.
* @param templates the {@link KafkaOperations}s to use for publishing.
* @param destinationResolver the resolving function.
*/
Expand Down Expand Up @@ -212,10 +216,19 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
}

@SuppressWarnings("unchecked")
private KafkaOperations<Object, Object> findTemplateForValue(Object value) {
private KafkaOperations<Object, Object> findTemplateForValue(@Nullable Object value) {
if (this.template != null) {
return this.template;
}
if (value == null) {
KafkaOperations<?, ?> operations = this.templates.get(Void.class);
if (operations == null) {
return (KafkaOperations<Object, Object>) this.templates.values().iterator().next();
}
else {
return (KafkaOperations<Object, Object>) operations;
}
}
Optional<Class<?>> key = this.templates.keySet()
.stream()
.filter((k) -> k.isAssignableFrom(value.getClass()))
Expand Down
Expand Up @@ -29,6 +29,8 @@
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand All @@ -52,38 +54,41 @@
*/
public class DeadLetterPublishingRecovererTests {

// NOTE(review): the two @SuppressWarnings lines below are a diff-view artifact
// (old line followed by its replacement); only the second exists in the real file.
@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
void testTxNoTx() {
// Template is transactional but not currently in a transaction, and allows
// non-transactional sends: the recoverer should publish directly via send()
// rather than wrapping the publish in executeInTransaction().
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
given(template.isTransactional()).willReturn(true);
given(template.inTransaction()).willReturn(false);
given(template.isAllowNonTransactional()).willReturn(true);
// send() must return a future; the recoverer inspects the send result.
given(template.send(any(ProducerRecord.class))).willReturn(new SettableListenableFuture());
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
recoverer.accept(record, new RuntimeException());
// No new transaction should be started; the record goes out as a plain send.
verify(template, never()).executeInTransaction(any());
verify(template).send(any(ProducerRecord.class));
}

// NOTE(review): the two @SuppressWarnings lines below are a diff-view artifact
// (old line followed by its replacement); only the second exists in the real file.
@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
void testTxExisting() {
// Template is transactional and already inside a transaction: the recoverer
// should send within the existing transaction, not start a new one via
// executeInTransaction().
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
given(template.isTransactional()).willReturn(true);
given(template.inTransaction()).willReturn(true);
// send() must return a future; the recoverer inspects the send result.
given(template.send(any(ProducerRecord.class))).willReturn(new SettableListenableFuture());
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
recoverer.accept(record, new RuntimeException());
// Participate in the caller's transaction rather than creating a nested one.
verify(template, never()).executeInTransaction(any());
verify(template).send(any(ProducerRecord.class));
}

@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
void testNonTx() {
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
given(template.isTransactional()).willReturn(false);
given(template.send(any(ProducerRecord.class))).willReturn(new SettableListenableFuture());
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
recoverer.accept(record, new RuntimeException());
Expand All @@ -103,6 +108,7 @@ void testTxNewTx() {
((OperationsCallback) inv.getArgument(0)).doInOperations(template);
return null;
}).given(template).executeInTransaction(any());
given(template.send(any(ProducerRecord.class))).willReturn(new SettableListenableFuture());
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
recoverer.accept(record, new RuntimeException());
Expand Down Expand Up @@ -166,6 +172,36 @@ void headersNotStripped() {
assertThat(headers.lastHeader(ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER)).isNotNull();
}

// GH-1494 regression test: a tombstone record (null value) with multiple
// templates and NO Void.class mapping must not throw an NPE; the recoverer
// falls back to the first template from the map's values iterator.
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
void tombstoneWithMultiTemplates() {
KafkaOperations<?, ?> template1 = mock(KafkaOperations.class);
given(template1.send(any(ProducerRecord.class))).willReturn(new SettableListenableFuture());
KafkaOperations<?, ?> template2 = mock(KafkaOperations.class);
Map<Class<?>, KafkaOperations<?, ?>> templates = new HashMap<>();
templates.put(String.class, template1);
templates.put(Integer.class, template2);
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(templates);
// Record value is null — a tombstone; value-type-based lookup cannot apply.
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", null);
recoverer.accept(record, new RuntimeException());
// The first template from the values iterator is used for the null value.
// NOTE(review): this assumes String.class hashes before Integer.class in the
// HashMap's iteration order — a LinkedHashMap would make the intent explicit.
verify(template1).send(any(ProducerRecord.class));
}

// GH-1494: when a template is explicitly registered under Void.class, a
// tombstone record (null value) must be routed to that template rather than
// to the first template in the map.
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
void tombstoneWithMultiTemplatesExplicit() {
KafkaOperations<?, ?> template1 = mock(KafkaOperations.class);
KafkaOperations<?, ?> template2 = mock(KafkaOperations.class);
given(template2.send(any(ProducerRecord.class))).willReturn(new SettableListenableFuture());
Map<Class<?>, KafkaOperations<?, ?>> templates = new HashMap<>();
templates.put(String.class, template1);
// Void.class is the documented key for publishing records with a null value.
templates.put(Void.class, template2);
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(templates);
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", null);
recoverer.accept(record, new RuntimeException());
// The Void-mapped template — not the first map entry — handles the tombstone.
verify(template2).send(any(ProducerRecord.class));
}

private byte[] header(boolean isKey) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
Expand Down
2 changes: 2 additions & 0 deletions src/reference/asciidoc/kafka.adoc
Expand Up @@ -4859,6 +4859,8 @@ public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplat
The publisher uses the map keys to locate a template that is suitable for the `value()` about to be published.
A `LinkedHashMap` is recommended so that the keys are examined in order.

When publishing `null` values and there are multiple templates, the recoverer looks for a template registered for the `Void` class; if none is present, the first template from the `values().iterator()` is used.

IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery.

Starting with version 2.3, the recoverer can also be used with Kafka Streams - see <<streams-deser-recovery>> for more information.
Expand Down

0 comments on commit 96efd3f

Please sign in to comment.