Skip to content

Commit

Permalink
GH-2357: Switch to CompletableFuture
Browse files Browse the repository at this point in the history
Resolves #2357

Spring Framework is planning to deprecate `ListenableFuture` in 6.0.

Add methods to the `KafkaOperations` (`KafkaTemplate`) that return
`CompletableFuture` instead; the `ListenableFuture` methods have now
been removed in 3.0.

* Remove KafkaOperations2; doc polishing.

* Fix import.
  • Loading branch information
garyrussell committed Jul 21, 2022
1 parent a9ec919 commit ed6673e
Show file tree
Hide file tree
Showing 20 changed files with 502 additions and 235 deletions.
9 changes: 9 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/changes-since-1.0.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,22 @@ You can now configure which inbound headers should be mapped.
Also available in version 2.8.8 or later.
See <<headers>> for more information.

[[x29-template-changes]]
==== `KafkaTemplate` Changes

In 3.0, the futures returned by this class will be `CompletableFuture` s instead of `ListenableFuture` s.
See <<kafka-template>> for assistance in transitioning when using this release.

[[x29-rkt-changes]]
==== `ReplyingKafkaTemplate` Changes

The template now provides a method to wait for assignment on the reply container, to avoid a race when sending a request before the reply container is initialized.
Also available in version 2.8.8 or later.
See <<replying-template>>.

In 3.0, the futures returned by this class will be `CompletableFuture` s instead of `ListenableFuture` s.
See <<replying-template>> and <<exchanging-messages>> for assistance in transitioning when using this release.

=== What's New in 2.8 Since 2.7

This section covers the changes made from version 2.7 to version 2.8.
Expand Down
105 changes: 30 additions & 75 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -170,25 +170,25 @@ The following listing shows the relevant methods from `KafkaTemplate`:
====
[source, java]
----
ListenableFuture<SendResult<K, V>> sendDefault(V data);
CompletableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
CompletableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
CompletableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
Expand All @@ -210,6 +210,9 @@ interface ProducerCallback<K, V, T> {

See the https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/KafkaTemplate.html[Javadoc] for more detail.

IMPORTANT: In version 3.0, the methods that previously returned `ListenableFuture` have been changed to return `CompletableFuture`.
To facilitate the migration, the 2.9 version added a method `.usingCompletableFuture()` which provided the same methods with `CompletableFuture` return types; this method is no longer available.

The `sendDefault` API requires that a default topic has been provided to the template.

The API takes in a `timestamp` as a parameter and stores this timestamp in the record.
Expand Down Expand Up @@ -302,75 +305,27 @@ By default, the template is configured with a `LoggingProducerListener`, which l

For convenience, default method implementations are provided in case you want to implement only one of the methods.

Notice that the send methods return a `ListenableFuture<SendResult>`.
Notice that the send methods return a `CompletableFuture<SendResult>`.
You can register a callback with the listener to receive the result of the send asynchronously.
The following example shows how to do so:

====
[source, java]
----
ListenableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
...
}
@Override
public void onFailure(Throwable ex) {
...
}
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
...
});
----
====

`SendResult` has two properties, a `ProducerRecord` and `RecordMetadata`.
See the Kafka API documentation for information about those objects.

The `Throwable` in `onFailure` can be cast to a `KafkaProducerException`; its `failedProducerRecord` property contains the failed record.

Starting with version 2.5, you can use a `KafkaSendCallback` instead of a `ListenableFutureCallback`, making it easier to extract the failed `ProducerRecord`, avoiding the need to cast the `Throwable`:

====
[source, java]
----
ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
future.addCallback(new KafkaSendCallback<Integer, String>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
...
}
@Override
public void onFailure(KafkaProducerException ex) {
ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
...
}
});
----
====

You can also use a pair of lambdas:

====
[source, java]
----
ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
future.addCallback(result -> {
...
}, (KafkaFailureCallback<Integer, String>) ex -> {
ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
...
});
----
====
The `Throwable` can be cast to a `KafkaProducerException`; its `failedProducerRecord` property contains the failed record.

If you wish to block the sending thread to await the result, you can invoke the future's `get()` method; using the method with a timeout is recommended.
You may wish to invoke `flush()` before waiting or, for convenience, the template has a constructor with an `autoFlush` parameter that causes the template to `flush()` on each send.
If you have set a `linger.ms`, you may wish to invoke `flush()` before waiting or, for convenience, the template has a constructor with an `autoFlush` parameter that causes the template to `flush()` on each send.
Flushing is only needed if you have set the `linger.ms` producer property and want to immediately send a partial batch.

====== Examples
Expand All @@ -384,19 +339,14 @@ This section shows examples of sending messages to Kafka:
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
ListenableFuture<SendResult<Integer, String>> future = template.send(record);
future.addCallback(new KafkaSendCallback<Integer, String>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
CompletableFuture<SendResult<Integer, String>> future = template.send(record);
future.whenComplete((result, ex) -> {
if (ex == null) {
handleSuccess(data);
}
@Override
public void onFailure(KafkaProducerException ex) {
else {
handleFailure(data, record, ex);
}
});
}
----
Expand Down Expand Up @@ -549,10 +499,12 @@ RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,

(Also see <<exchanging-messages>>).

The result is a `ListenableFuture` that is asynchronously populated with the result (or an exception, for a timeout).
The result is a `CompletableFuture` that is asynchronously populated with the result (or an exception, for a timeout).
The result also has a `sendFuture` property, which is the result of calling `KafkaTemplate.send()`.
You can use this future to determine the result of the send operation.

IMPORTANT: In version 3.0, the futures returned by these methods (and their `sendFuture` properties) have been changed to `CompletableFuture` s instead of `ListenableFuture` s.

If the first method is used, or the `replyTimeout` argument is `null`, the template's `defaultReplyTimeout` property is used (5 seconds by default).

Starting with version 2.8.8, the template has a new method `waitForAssignment`.
Expand Down Expand Up @@ -791,6 +743,8 @@ RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);

These will use the template's default `replyTimeout`, there are also overloaded versions that can take a timeout in the method call.

IMPORTANT: In version 3.0, the futures returned by these methods (and their `sendFuture` properties) have been changed to `CompletableFuture` s instead of `ListenableFuture` s.

Use the first method if the consumer's `Deserializer` or the template's `MessageConverter` can convert the payload without any additional information, either via configuration or type metadata in the reply message.

Use the second method if you need to provide type information for the return type, to assist the message converter.
Expand Down Expand Up @@ -2236,6 +2190,7 @@ public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerCon
====

When you use `@SendTo`, you must configure the `ConcurrentKafkaListenerContainerFactory` with a `KafkaTemplate` in its `replyTemplate` property to perform the send.
Spring Boot will automatically wire in its auto configured template (or any if a single instance is present).

NOTE: Unless you use <<replying-template,request/reply semantics>> only the simple `send(topic, value)` method is used, so you may wish to create a subclass to generate the partition or key.
The following example shows how to do so:
Expand All @@ -2248,7 +2203,7 @@ public KafkaTemplate<String, String> myReplyingTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory()) {
@Override
public ListenableFuture<SendResult<String, String>> send(String topic, String data) {
public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
return super.send(topic, partitionForData(data), keyForData(data), data);
}
Expand Down
12 changes: 12 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,15 @@ See <<retry-config>> for more information.

Events related to consumer authentication and authorization failures are now published by the container.
See <<events>> for more information.

[[x30-template-changes]]
==== `KafkaTemplate` Changes

The futures returned by this class are now `CompletableFuture` s instead of `ListenableFuture` s.
See <<kafka-template>>.

[[x30-rkt-changes]]
==== `ReplyingKafkaTemplate` Changes

The futures returned by this class are now `CompletableFuture` s instead of `ListenableFuture` s.
See <<replying-template>> and <<exchanging-messages>>.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -36,10 +37,9 @@
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.concurrent.ListenableFuture;

/**
* The basic Kafka operations contract returning {@link ListenableFuture}s.
* The basic Kafka operations contract returning {@link CompletableFuture}s.
*
* @param <K> the key type.
* @param <V> the value type.
Expand Down Expand Up @@ -68,15 +68,15 @@ public interface KafkaOperations<K, V> {
* @param data The data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> sendDefault(V data);
CompletableFuture<SendResult<K, V>> sendDefault(V data);

/**
* Send the data to the default topic with the provided key and no partition.
* @param key the key.
* @param data The data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

/**
* Send the data to the default topic with the provided key and partition.
Expand All @@ -85,7 +85,7 @@ public interface KafkaOperations<K, V> {
* @param data the data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

/**
* Send the data to the default topic with the provided key and partition.
Expand All @@ -96,15 +96,15 @@ public interface KafkaOperations<K, V> {
* @return a Future for the {@link SendResult}.
* @since 1.3
*/
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

/**
* Send the data to the provided topic with no key or partition.
* @param topic the topic.
* @param data The data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> send(String topic, V data);
CompletableFuture<SendResult<K, V>> send(String topic, V data);

/**
* Send the data to the provided topic with the provided key and no partition.
Expand All @@ -113,7 +113,7 @@ public interface KafkaOperations<K, V> {
* @param data The data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

/**
* Send the data to the provided topic with the provided key and partition.
Expand All @@ -123,7 +123,7 @@ public interface KafkaOperations<K, V> {
* @param data the data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

/**
* Send the data to the provided topic with the provided key and partition.
Expand All @@ -135,15 +135,15 @@ public interface KafkaOperations<K, V> {
* @return a Future for the {@link SendResult}.
* @since 1.3
*/
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

/**
* Send the provided {@link ProducerRecord}.
* @param record the record.
* @return a Future for the {@link SendResult}.
* @since 1.3
*/
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

/**
* Send a message with routing information in message headers. The message payload
Expand All @@ -154,7 +154,7 @@ public interface KafkaOperations<K, V> {
* @see org.springframework.kafka.support.KafkaHeaders#PARTITION
* @see org.springframework.kafka.support.KafkaHeaders#KEY
*/
ListenableFuture<SendResult<K, V>> send(Message<?> message);
CompletableFuture<SendResult<K, V>> send(Message<?> message);

/**
* See {@link Producer#partitionsFor(String)}.
Expand Down
Loading

0 comments on commit ed6673e

Please sign in to comment.