Skip to content

Commit

Permalink
GH-716: Null Payload Doc Improvements
Browse files Browse the repository at this point in the history
Fixes #716

Improve documentation - `null` is not necessarily just for tombstone records.
  • Loading branch information
garyrussell authored and artembilan committed Jun 26, 2018
1 parent 7d85c56 commit 9ac47f6
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
Expand Up @@ -257,12 +257,16 @@ public void testSimple() throws Exception {
.isEqualTo("clientIdViaAnnotation-0");

template.send("annotated11", 0, "foo");
template.flush();
assertThat(this.listener.latch7.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(this.consumerRef.get()).isNotNull();
assertThat(this.listener.latch7String).isEqualTo("foo");

assertThat(this.recordFilter.called).isTrue();

template.send("annotated11", 0, null);
assertThat(this.listener.latch7a.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(this.listener.latch7String).isNull();

MessageListenerContainer rebalanceConcurrentContainer = registry.getListenerContainer("rebalanceListener");
assertThat(rebalanceConcurrentContainer).isNotNull();
MessageListenerContainer rebalanceContainer = (MessageListenerContainer) KafkaTestUtils
Expand Down Expand Up @@ -1114,6 +1118,10 @@ static class Listener implements ConsumerSeekAware {

private final CountDownLatch latch7 = new CountDownLatch(1);

private final CountDownLatch latch7a = new CountDownLatch(2);

private volatile String latch7String;

private final CountDownLatch latch8 = new CountDownLatch(1);

private final CountDownLatch latch9 = new CountDownLatch(1);
Expand Down Expand Up @@ -1262,8 +1270,10 @@ public void jsonHeaders(Bar foo) { // should be mapped to Foo via Headers

@KafkaListener(id = "rebalanceListener", topics = "annotated11", idIsGroup = false,
containerFactory = "kafkaRebalanceListenerContainerFactory")
public void listen7(String foo) {
public void listen7(@Payload(required = false) String foo) {
this.latch7String = foo;
this.latch7.countDown();
this.latch7a.countDown();
}

@KafkaListener(id = "quux", topics = "annotated12")
Expand Down
10 changes: 6 additions & 4 deletions src/reference/asciidoc/kafka.adoc
Expand Up @@ -1686,11 +1686,11 @@ If the converter has no converter (either because Jackson is not present, or it
IMPORTANT: The Jackson `ObjectMapper` (even if provided) will be enhanced to support deserializing `org.springframework.util.MimeType` objects, often used in the `spring-messaging` `contentType` header.
If you don't wish your mapper to be enhanced in this way, for some reason, you should subclass the `DefaultKafkaHeaderMapper` and override `getObjectMapper()` to return your mapper.

==== Log Compaction
==== Null Payloads and Log Compaction 'Tombstone' Records

When using https://kafka.apache.org/documentation/#compaction[Log Compaction], it is possible to send and receive messages with `null` payloads which identifies the deletion of a key.

Starting with _version 1.0.3_, this is now fully supported.
It is also possible to receive `null` values for other reasons - such as a `Deserializer` that might return `null` when it can't deserialize a value.

To send a `null` payload using the `KafkaTemplate` simply pass null into the value argument of the `send()` methods.
One exception to this is the `send(Message<?> message)` variant.
Expand All @@ -1699,7 +1699,7 @@ For convenience, the static `KafkaNull.INSTANCE` is provided.

When using a message listener container, the received `ConsumerRecord` will have a `null` `value()`.

To configure the `@KafkaListener` to handle `null` payloads, you must use the `@Payload` annotation with `required = false`; you will usually also need the key so your application knows which key was "deleted":
To configure the `@KafkaListener` to handle `null` payloads, you must use the `@Payload` annotation with `required = false`; if it's a tombstone message for a compacted log, you will usually also need the key so your application can determine which key was "deleted":

[source, java]
----
Expand All @@ -1709,7 +1709,7 @@ public void listen(@Payload(required = false) String value, @Header(KafkaHeaders
}
----

When using a class-level `@KafkaListener`, some additional configuration is needed - a `@KafkaHandler` method with a `KafkaNull` payload:
When using a class-level `@KafkaListener` with multiple `@KafkaHandler` methods, some additional configuration is needed - a `@KafkaHandler` method with a `KafkaNull` payload:

[source, java]
----
Expand All @@ -1734,6 +1734,8 @@ static class MultiListenerBean {
}
----

Note that the argument will be `null` not a `KafkaNull`.

[[annotation-error-handling]]
==== Handling Exceptions

Expand Down

0 comments on commit 9ac47f6

Please sign in to comment.