
Using Spring for Apache Kafka

This section offers detailed explanations of the various concerns that impact using Spring for Apache Kafka. For a quick but less detailed introduction, see [quick-tour].

Configuring Topics

If you define a KafkaAdmin bean in your application context, it can automatically add topics to the broker. To do so, you can add a NewTopic @Bean for each topic to the application context. The following example shows how to do so:

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
            StringUtils.arrayToCommaDelimitedString(embeddedKafka().getBrokerAddresses()));
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic1() {
    return new NewTopic("thing1", 10, (short) 2);
}

@Bean
public NewTopic topic2() {
    return new NewTopic("thing2", 10, (short) 2);
}

By default, if the broker is not available, a message is logged, but the context continues to load. You can programmatically invoke the admin’s initialize() method to try again later. If you wish this condition to be considered fatal, set the admin’s fatalIfBrokerNotAvailable property to true. The context then fails to initialize.
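For example, the following sketch (the broker address is a placeholder) makes a missing broker fatal:

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    KafkaAdmin admin = new KafkaAdmin(configs);
    admin.setFatalIfBrokerNotAvailable(true); // fail application context startup if the broker is unreachable
    return admin;
}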

Note
If the broker supports it (1.0.0 or higher), the admin increases the number of partitions if it is found that an existing topic has fewer partitions than the NewTopic.numPartitions.

For more advanced features, such as assigning partitions to replicas, you can use the AdminClient directly. The following example shows how to do so:

@Autowired
private KafkaAdmin admin;

...

    AdminClient client = AdminClient.create(admin.getConfig());
    ...
    client.close();

Sending Messages

This section covers how to send messages.

Using KafkaTemplate

This section covers how to use KafkaTemplate to send messages.

Overview

The KafkaTemplate wraps a producer and provides convenience methods to send data to Kafka topics. The following listing shows the relevant methods from KafkaTemplate:

ListenableFuture<SendResult<K, V>> sendDefault(V data);

ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);

ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, V data);

ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

ListenableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

// Flush the producer.

void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

See the Javadoc for more detail.

The sendDefault API requires that a default topic has been provided to the template.

The API takes a timestamp as a parameter and stores this timestamp in the record. How the user-provided timestamp is stored depends on the timestamp type configured on the Kafka topic. If the topic is configured to use CREATE_TIME, the user-specified timestamp is recorded (or generated if not specified). If the topic is configured to use LOG_APPEND_TIME, the user-specified timestamp is ignored and the broker adds the local broker time.

The metrics and partitionsFor methods delegate to the same methods on the underlying Producer. The execute method provides direct access to the underlying Producer.
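For example, once a template has been configured (as shown next), a sketch of using a default topic and the execute method might look like the following (the topic name is illustrative):

template.setDefaultTopic("myTopic");   // required before calling sendDefault()
template.sendDefault("thing");

// direct access to the underlying Producer through a ProducerCallback
List<PartitionInfo> partitions = template.execute(producer -> producer.partitionsFor("myTopic"));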

To use the template, you can configure a producer factory and provide it in the template’s constructor. The following example shows how to do so:

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

You can also configure the template by using standard <bean/> definitions.

Then, to use the template, you can invoke one of its methods.

When you use the methods with a Message<?> parameter, the topic, partition, and key information is provided in a message header that includes the following items:

  • KafkaHeaders.TOPIC

  • KafkaHeaders.PARTITION_ID

  • KafkaHeaders.MESSAGE_KEY

  • KafkaHeaders.TIMESTAMP

The message payload is the data.
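For example, the following sketch builds such a Message with Spring's MessageBuilder (the topic, partition, and key values are illustrative):

Message<String> message = MessageBuilder.withPayload("some payload")
        .setHeader(KafkaHeaders.TOPIC, "myTopic")
        .setHeader(KafkaHeaders.PARTITION_ID, 0)
        .setHeader(KafkaHeaders.MESSAGE_KEY, 1)
        .build();

template.send(message);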

Optionally, you can configure the KafkaTemplate with a ProducerListener to get an asynchronous callback with the results of the send (success or failure) instead of waiting for the Future to complete. The following listing shows the definition of the ProducerListener interface:

public interface ProducerListener<K, V> {

    void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);

    void onError(String topic, Integer partition, K key, V value, Exception exception);

    boolean isInterestedInSuccess();

}

By default, the template is configured with a LoggingProducerListener, which logs errors and does nothing when the send is successful.

onSuccess is called only if isInterestedInSuccess returns true.

For convenience, the abstract ProducerListenerAdapter is provided in case you want to implement only one of the methods. It returns false for isInterestedInSuccess.
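The following sketch shows one way to register a listener that reacts only to failures; the class name and the alerting call are hypothetical, and the adapter's isInterestedInSuccess already returns false:

public class FailureOnlyProducerListener extends ProducerListenerAdapter<Integer, String> {

    @Override
    public void onError(String topic, Integer partition, Integer key, String value, Exception exception) {
        alertOps(topic, exception); // hypothetical application-specific failure handling
    }

}

...

    template.setProducerListener(new FailureOnlyProducerListener());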

Notice that the send methods return a ListenableFuture<SendResult>. You can register a callback with the future to receive the result of the send asynchronously. The following example shows how to do so:

ListenableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {

    @Override
    public void onSuccess(SendResult<Integer, String> result) {
        ...
    }

    @Override
    public void onFailure(Throwable ex) {
        ...
    }

});

SendResult has two properties, a ProducerRecord and RecordMetadata. See the Kafka API documentation for information about those objects.

If you wish to block the sending thread to await the result, you can invoke the future’s get() method. You may wish to invoke flush() before waiting or, for convenience, the template has a constructor with an autoFlush parameter that causes the template to flush() on each send. Note, however, that flushing likely significantly reduces performance.

Examples

This section shows examples of sending messages to Kafka:

Example 1. Non Blocking (Async)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    ListenableFuture<SendResult<Integer, String>> future = template.send(record);
    future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {

        @Override
        public void onSuccess(SendResult<Integer, String> result) {
            handleSuccess(data);
        }

        @Override
        public void onFailure(Throwable ex) {
            handleFailure(data, record, ex);
        }

    });
}
Blocking (Sync)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}
Transactions

This section describes how Spring for Apache Kafka supports transactions.

Overview

The 0.11.0.0 client library added support for transactions. Spring for Apache Kafka adds support in the following ways:

  • KafkaTransactionManager: Used with normal Spring transaction support (@Transactional, TransactionTemplate etc).

  • Transactional KafkaMessageListenerContainer

  • Local transactions with KafkaTemplate

Transactions are enabled by providing the DefaultKafkaProducerFactory with a transactionIdPrefix. In that case, instead of managing a single shared Producer, the factory maintains a cache of transactional producers. When the user calls close() on a producer, it is returned to the cache for reuse instead of actually being closed. The transactional.id property of each producer is transactionIdPrefix + n, where n starts with 0 and is incremented for each new producer, unless the transaction is started by a listener container with a record-based listener. In that case, the transactional.id is <transactionIdPrefix>.<group.id>.<topic>.<partition>. This is to properly support fencing zombies, as described here. This new behavior was added in versions 1.3.7, 2.0.6, 2.1.10, and 2.2.0. If you wish to revert to the previous behavior, you can set the producerPerConsumerPartition property on the DefaultKafkaProducerFactory to false.
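For example, a minimal sketch of enabling transactions on the producer factory (the prefix value is illustrative):

@Bean
public ProducerFactory<String, String> producerFactory() {
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(producerConfigs());
    pf.setTransactionIdPrefix("tx-"); // switches the factory to managing transactional producers
    return pf;
}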

Note
While transactions are supported with batch listeners, zombie fencing cannot be supported because a batch may contain records from multiple topics or partitions.
Using KafkaTransactionManager

The KafkaTransactionManager is an implementation of Spring Framework’s PlatformTransactionManager. It is provided with a reference to the producer factory in its constructor. If you provide a custom producer factory, it must support transactions. See ProducerFactory.transactionCapable().

You can use the KafkaTransactionManager with normal Spring transaction support (@Transactional, TransactionTemplate, and others). If a transaction is active, any KafkaTemplate operations performed within the scope of the transaction use the transaction’s Producer. The manager commits or rolls back the transaction, depending on success or failure. You must configure the KafkaTemplate to use the same ProducerFactory as the transaction manager.
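A minimal sketch, assuming a KafkaTemplate field built from the same producer factory as the transaction manager:

@Bean
public KafkaTransactionManager<String, String> kafkaTransactionManager(ProducerFactory<String, String> pf) {
    return new KafkaTransactionManager<>(pf);
}

...

    @Transactional
    public void sendBoth(String in) {
        // both sends are committed or rolled back together with the surrounding transaction
        this.template.send("topic1", in.toUpperCase());
        this.template.send("topic2", in.toLowerCase());
    }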

Transactional Listener Container and Exactly Once Processing

You can provide a listener container with a KafkaAwareTransactionManager instance. When so configured, the container starts a transaction before invoking the listener. Any KafkaTemplate operations performed by the listener participate in the transaction. If the listener successfully processes the record (or multiple records, when using a BatchMessageListener), the container sends the offsets to the transaction by using producer.sendOffsetsToTransaction(), before the transaction manager commits the transaction. If the listener throws an exception, the transaction is rolled back and the consumer is repositioned so that the rolled-back record(s) can be retrieved on the next poll. See After-rollback Processor for more information and for handling records that repeatedly fail.

Transaction Synchronization

If you need to synchronize a Kafka transaction with some other transaction, configure the listener container with the appropriate transaction manager (one that supports synchronization, such as the DataSourceTransactionManager). Any operations performed on a transactional KafkaTemplate from the listener participate in a single transaction. The Kafka transaction is committed (or rolled back) immediately after the controlling transaction. Before exiting the listener, you should invoke one of the template’s sendOffsetsToTransaction methods (unless you use a ChainedKafkaTransactionManager). For convenience, the listener container binds its consumer group ID to the thread, so, generally, you can use the first method. The following listing shows the two method signatures:

void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets);

void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId);

The following example shows how to use the first signature of the sendOffsetsToTransaction method:

@Bean
KafkaMessageListenerContainer container(ConsumerFactory<String, String> cf,
            final KafkaTemplate template) {
    ContainerProperties props = new ContainerProperties("foo");
    props.setGroupId("group");
    props.setTransactionManager(new SomeOtherTransactionManager());
    ...
    props.setMessageListener((MessageListener<String, String>) m -> {
        template.send("foo", "bar");
        template.send("baz", "qux");
        template.sendOffsetsToTransaction(
            Collections.singletonMap(new TopicPartition(m.topic(), m.partition()),
                new OffsetAndMetadata(m.offset() + 1)));
    });
    return new KafkaMessageListenerContainer<>(cf, props);
}
Note
The offset to be committed is one greater than the offset of the records processed by the listener.
Important
You should call this method only when you use transaction synchronization. When a listener container is configured to use a KafkaTransactionManager, it takes care of sending the offsets to the transaction.
Using ChainedKafkaTransactionManager

The ChainedKafkaTransactionManager was introduced in version 2.1.3. This is a subclass of ChainedTransactionManager that can have exactly one KafkaTransactionManager. Since it is a KafkaAwareTransactionManager, the container can send the offsets to the transaction in the same way as when the container is configured with a simple KafkaTransactionManager. This provides another mechanism for synchronizing transactions without having to send the offsets to the transaction in the listener code. You should chain your transaction managers in the desired order and provide the ChainedTransactionManager in the ContainerProperties.
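The following sketch shows one possible arrangement; the DataSourceTransactionManager and the consumerFactory() reference are illustrative assumptions:

@Bean
public ChainedKafkaTransactionManager<Object, Object> chainedTm(
        KafkaTransactionManager<String, String> ktm, DataSourceTransactionManager dstm) {
    return new ChainedKafkaTransactionManager<>(ktm, dstm);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ChainedKafkaTransactionManager<Object, Object> chainedTm) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setTransactionManager(chainedTm);
    return factory;
}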

KafkaTemplate Local Transactions

You can use the KafkaTemplate to execute a series of operations within a local transaction. The following example shows how to do so:

boolean result = template.executeInTransaction(t -> {
    t.sendDefault("thing1", "thing2");
    t.sendDefault("cat", "hat");
    return true;
});

The argument in the callback is the template itself (this). If the callback exits normally, the transaction is committed. If an exception is thrown, the transaction is rolled back.

Note
If there is a KafkaTransactionManager (or synchronized) transaction in process, it is not used. Instead, a new "nested" transaction is used.
Using ReplyingKafkaTemplate

Version 2.1.3 introduced a subclass of KafkaTemplate to provide request/reply semantics. The class is named ReplyingKafkaTemplate and has one method (in addition to those in the superclass). The following listing shows the method’s signature:

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

The result is a ListenableFuture that is asynchronously populated with the result (or an exception, for a timeout). The result also has a sendFuture property, which is the result of calling KafkaTemplate.send(). You can use this future to determine the result of the send operation.

The following Spring Boot application shows an example of how to use the feature:

@SpringBootApplication
public class KRequestingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KRequestingApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get();
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get();
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("replies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public NewTopic kRequests() {
        return new NewTopic("kRequests", 10, (short) 2);
    }

    @Bean
    public NewTopic kReplies() {
        return new NewTopic("kReplies", 10, (short) 2);
    }

}

Note that we can use Boot’s auto-configured container factory to create the reply container.

The template sets a header called KafkaHeaders.CORRELATION_ID, which must be echoed back by the server side.

In this case, the following @KafkaListener application responds:

@SpringBootApplication
public class KReplyingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KReplyingApplication.class, args);
    }

    @KafkaListener(id="server", topics = "kRequests")
    @SendTo // use default replyTo expression
    public String listen(String in) {
        System.out.println("Server received: " + in);
        return in.toUpperCase();
    }

    @Bean
    public NewTopic kRequests() {
        return new NewTopic("kRequests", 10, (short) 2);
    }

    @Bean // not required if Jackson is on the classpath
    public MessagingMessageConverter simpleMapperConverter() {
        MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
        messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
        return messagingMessageConverter;
    }

}

The @KafkaListener infrastructure echoes the correlation ID and determines the reply topic.

See Forwarding Listener Results using @SendTo for more information about sending replies. The template uses the default header KafkaHeaders.REPLY_TOPIC to indicate the topic to which the reply goes.

Starting with version 2.2, the template tries to detect the reply topic or partition from the configured reply container. If the container is configured to listen to a single topic or a single TopicPartitionInitialOffset, it is used to set the reply headers. If the container is configured otherwise, the user must set up the reply headers. In this case, an INFO log message is written during initialization. The following example uses KafkaHeaders.REPLY_TOPIC:

record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));

When you configure with a single reply TopicPartitionInitialOffset, you can use the same reply topic for multiple templates, as long as each instance listens on a different partition. When configuring with a single reply topic, each instance must use a different group.id. In this case, all instances receive each reply, but only the instance that sent the request finds the correlation ID. This may be useful for auto-scaling, but with the overhead of additional network traffic and the small cost of discarding each unwanted reply. When you use this setting, we recommend that you set the template’s sharedReplyTopic to true, which reduces the logging level of unexpected replies to DEBUG instead of the default ERROR.

Important
If you have multiple client instances and you do not configure them as discussed in the preceding paragraph, each instance needs a dedicated reply topic. An alternative is to set the KafkaHeaders.REPLY_PARTITION and use a dedicated partition for each instance. The Header contains a four-byte int (big-endian). The server must use this header to route the reply to the correct topic (@KafkaListener does this). In this case, though, the reply container must not use Kafka’s group management feature and must be configured to listen on a fixed partition (by using a TopicPartitionInitialOffset in its ContainerProperties constructor).
Note
The DefaultKafkaHeaderMapper requires Jackson to be on the classpath (for the @KafkaListener). If it is not available, the message converter has no header mapper, so you must configure a MessagingMessageConverter with a SimpleKafkaHeaderMapper, as shown earlier.

Receiving Messages

You can receive messages by configuring a MessageListenerContainer and providing a message listener or by using the @KafkaListener annotation.

Message Listeners

When you use a message listener container, you must provide a listener to receive data. There are currently eight supported interfaces for message listeners. The following listing shows these interfaces:

public interface MessageListener<K, V> { (1)

    void onMessage(ConsumerRecord<K, V> data);

}

public interface AcknowledgingMessageListener<K, V> { (2)

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);

}

public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (3)

    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);

}

public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (4)

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}

public interface BatchMessageListener<K, V> { (5)

    void onMessage(List<ConsumerRecord<K, V>> data);

}

public interface BatchAcknowledgingMessageListener<K, V> { (6)

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);

}

public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (7)

    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);

}

public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (8)

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}
  1. Use this interface for processing individual ConsumerRecord instances received from the Kafka consumer poll() operation when using auto-commit or one of the container-managed commit methods.

  2. Use this interface for processing individual ConsumerRecord instances received from the Kafka consumer poll() operation when using one of the manual commit methods.

  3. Use this interface for processing individual ConsumerRecord instances received from the Kafka consumer poll() operation when using auto-commit or one of the container-managed commit methods. Access to the Consumer object is provided.

  4. Use this interface for processing individual ConsumerRecord instances received from the Kafka consumer poll() operation when using one of the manual commit methods. Access to the Consumer object is provided.

  5. Use this interface for processing all ConsumerRecord instances received from the Kafka consumer poll() operation when using auto-commit or one of the container-managed commit methods. AckMode.RECORD is not supported when you use this interface, since the listener is given the complete batch.

  6. Use this interface for processing all ConsumerRecord instances received from the Kafka consumer poll() operation when using one of the manual commit methods.

  7. Use this interface for processing all ConsumerRecord instances received from the Kafka consumer poll() operation when using auto-commit or one of the container-managed commit methods. AckMode.RECORD is not supported when you use this interface, since the listener is given the complete batch. Access to the Consumer object is provided.

  8. Use this interface for processing all ConsumerRecord instances received from the Kafka consumer poll() operation when using one of the manual commit methods. Access to the Consumer object is provided.

Important
The Consumer object is not thread-safe. You must only invoke its methods on the thread that calls the listener.
Message Listener Containers

Two MessageListenerContainer implementations are provided:

  • KafkaMessageListenerContainer

  • ConcurrentMessageListenerContainer

The KafkaMessageListenerContainer receives all messages from all topics or partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.

Using KafkaMessageListenerContainer

The following constructors are available:

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties,
                    TopicPartitionInitialOffset... topicPartitions)

Each takes a ConsumerFactory and information about topics and partitions, as well as other configuration in a ContainerProperties object. The second constructor is used by the ConcurrentMessageListenerContainer (described later) to distribute TopicPartitionInitialOffset across the consumer instances. ContainerProperties has the following constructors:

public ContainerProperties(TopicPartitionInitialOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)

The first constructor takes an array of TopicPartitionInitialOffset arguments to explicitly instruct the container about which partitions to use (using the consumer assign() method) and with an optional initial offset. A positive value is an absolute offset by default. A negative value is relative to the current last offset within a partition by default. A constructor for TopicPartitionInitialOffset that takes an additional boolean argument is provided. If this is true, the initial offsets (positive or negative) are relative to the current position for this consumer. The offsets are applied when the container is started. The second takes an array of topics, and Kafka allocates the partitions based on the group.id property — distributing partitions across the group. The third uses a regex Pattern to select the topics.
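For example, a sketch of the first constructor with explicit partitions and initial offsets (the topic names and offsets are illustrative):

ContainerProperties containerProps = new ContainerProperties(
        new TopicPartitionInitialOffset("topic1", 0),             // no explicit initial offset
        new TopicPartitionInitialOffset("topic1", 1, 100L),       // absolute offset 100
        new TopicPartitionInitialOffset("topic2", 0, -10L),       // 10 records back from the current last offset
        new TopicPartitionInitialOffset("topic2", 1, 5L, true));  // 5 records ahead of the current position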

To assign a MessageListener to a container, you can use the ContainerProperties.setMessageListener method when you create the container. The following example shows how to do so:

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<Integer, String>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

Refer to the Javadoc for ContainerProperties for more information about the various properties that you can set.

Since version 2.1.1, a new property called logContainerConfig is available. When true and INFO logging is enabled, each listener container writes a log message summarizing its configuration properties.

By default, logging of topic offset commits is performed at the DEBUG logging level. Starting with version 2.1.2, a property in ContainerProperties called commitLogLevel lets you specify the log level for these messages. For example, to change the log level to INFO, you can use containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);.

Starting with version 2.2, a new container property called missingTopicsFatal has been added (default: true). This prevents the container from starting if any of the configured topics are not present on the broker. It does not apply if the container is configured to listen to a topic pattern (regex). Previously, the container threads looped within the consumer.poll() method waiting for the topic to appear while logging many messages. Aside from the logs, there was no indication that there was a problem. To restore the previous behavior, you can set the property to false.

Using ConcurrentMessageListenerContainer

The single constructor is similar to the first KafkaMessageListenerContainer constructor. The following listing shows the constructor’s signature:

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

It also has a concurrency property. For example, container.setConcurrency(3) creates three KafkaMessageListenerContainer instances.

For the first constructor, Kafka distributes the partitions across the consumers using its group management capabilities.

Important

When listening to multiple topics, the default partition distribution may not be what you expect. For example, if you have three topics with five partitions each and you want to use concurrency=15, you see only five active consumers, each assigned one partition from each topic, with the other 10 consumers being idle. This is because the default Kafka PartitionAssignor is the RangeAssignor (see its Javadoc). For this scenario, you may want to consider using the RoundRobinAssignor instead, which distributes the partitions across all of the consumers. Then, each consumer is assigned one partition. To change the PartitionAssignor, you can set the partition.assignment.strategy consumer property (ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG) in the properties provided to the DefaultKafkaConsumerFactory.

When using Spring Boot, you can set the strategy as follows:

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

For the second constructor, the ConcurrentMessageListenerContainer distributes the TopicPartition instances across the delegate KafkaMessageListenerContainer instances.

If, say, six TopicPartition instances are provided and the concurrency is 3, each container gets two partitions. For five TopicPartition instances, two containers get two partitions, and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down such that each container gets one partition.

Note
The client.id property (if set) is appended with -n where n is the consumer instance that corresponds to the concurrency. This is required to provide unique names for MBeans when JMX is enabled.

Starting with version 1.3, the MessageListenerContainer provides access to the metrics of the underlying KafkaConsumer. In the case of ConcurrentMessageListenerContainer, the metrics() method returns the metrics for all the target KafkaMessageListenerContainer instances. The metrics are grouped into the Map<MetricName, ? extends Metric> by the client-id provided for the underlying KafkaConsumer.

Committing Offsets

Several options are provided for committing offsets. If the enable.auto.commit consumer property is true, Kafka auto-commits the offsets according to its configuration. If it is false, the containers support several AckMode settings (described in the next list).

The consumer poll() method returns one or more ConsumerRecords. The MessageListener is called for each record. The following list describes the action taken by the container for each AckMode:

  • RECORD: Commit the offset when the listener returns after processing the record.

  • BATCH: Commit the offset when all the records returned by the poll() have been processed.

  • TIME: Commit the offset when all the records returned by the poll() have been processed, as long as the ackTime since the last commit has been exceeded.

  • COUNT: Commit the offset when all the records returned by the poll() have been processed, as long as ackCount records have been received since the last commit.

  • COUNT_TIME: Similar to TIME and COUNT, but the commit is performed if either condition is true.

  • MANUAL: The message listener is responsible for calling acknowledge() on the Acknowledgment. After that, the same semantics as BATCH are applied.

  • MANUAL_IMMEDIATE: Commit the offset immediately when the Acknowledgment.acknowledge() method is called by the listener.

Note
MANUAL and MANUAL_IMMEDIATE require the listener to be an AcknowledgingMessageListener or a BatchAcknowledgingMessageListener. See Message Listeners.

Depending on the syncCommits container property, the commitSync() or commitAsync() method on the consumer is used.

The Acknowledgment has the following method:

public interface Acknowledgment {

    void acknowledge();

}

This method gives the listener control over when offsets are committed.
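For example, a sketch of configuring MANUAL_IMMEDIATE acknowledgment with a record listener (the process call is a hypothetical application method):

ContainerProperties containerProps = new ContainerProperties("topic1");
containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE);
containerProps.setMessageListener((AcknowledgingMessageListener<String, String>) (record, ack) -> {
    process(record);   // hypothetical application logic
    ack.acknowledge(); // commit this record's offset immediately
});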

Listener Container Auto Startup

The listener containers implement SmartLifecycle, and autoStartup is true by default. The containers are started in a late phase (Integer.MAX_VALUE - 100). Other components that implement SmartLifecycle, to handle data from listeners, should be started in an earlier phase. The - 100 leaves room for later phases to enable components to be auto-started after the containers.

@KafkaListener Annotation

The @KafkaListener annotation is used to designate a bean method as a listener for a listener container. The bean is wrapped in a MessagingMessageListenerAdapter configured with various features, such as converters to convert the data, if necessary, to match the method parameters.

You can configure most attributes on the annotation with SpEL by using #{…​} or property placeholders (${…​}). See the Javadoc for more information.

Record Listeners

The @KafkaListener annotation provides a mechanism for simple POJO listeners. The following example shows how to use it:

public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }

}

This mechanism requires an @EnableKafka annotation on one of your @Configuration classes and a listener container factory, which is used to configure the underlying ConcurrentMessageListenerContainer. By default, a bean with name kafkaListenerContainerFactory is expected. The following example shows how to use ConcurrentMessageListenerContainer:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

Notice that, to set container properties, you must use the getContainerProperties() method on the factory. It is used as a template for the actual properties injected into the container.

Starting with version 2.1.1, you can now set the client.id property for consumers created by the annotation. The clientIdPrefix is suffixed with -n, where n is an integer representing the container number when using concurrency.

Starting with version 2.2, you can now override the container factory’s concurrency and autoStartup properties by using properties on the annotation itself. The properties can be simple values, property placeholders, or SpEL expressions. The following example shows how to do so:

@KafkaListener(id = "myListener", topics = "myTopic",
        autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    ...
}

You can also configure POJO listeners with explicit topics and partitions (and, optionally, their initial offsets). The following example shows how to do so:

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

You can specify each partition in the partitions or partitionOffsets attribute but not both.

When using manual AckMode, you can also provide the listener with the Acknowledgment. The following example also shows how to use a different container factory.

@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

Finally, metadata about the message is available from message headers. You can use the following header names to retrieve the headers of the message:

  • KafkaHeaders.RECEIVED_MESSAGE_KEY

  • KafkaHeaders.RECEIVED_TOPIC

  • KafkaHeaders.RECEIVED_PARTITION_ID

  • KafkaHeaders.RECEIVED_TIMESTAMP

  • KafkaHeaders.TIMESTAMP_TYPE

The following example shows how to use the headers:

@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}
Batch listeners

Starting with version 1.1, you can configure @KafkaListener methods to receive the entire batch of consumer records received from the consumer poll. To configure the listener container factory to create batch listeners, you can set the batchListener property. The following example shows how to do so:

@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<
    return factory;
}

The following example shows how to receive a list of payloads:

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}

The topic, partition, offset, and so on are available in headers that parallel the payloads. The following example shows how to use the headers:

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

Alternatively, you can receive a List of Message<?> objects with each offset and other details in each message, but it must be the only parameter (aside from optional Acknowledgment, when using manual commits, and/or Consumer<?, ?> parameters) defined on the method. The following example shows how to do so:

@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
    ...
}

@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
    ...
}

@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
    ...
}

No conversion is performed on the payloads in this case.

If the BatchMessagingMessageConverter is configured with a RecordMessageConverter, you can also add a generic type to the Message parameter and the payloads are converted. See Payload Conversion with Batch Listeners for more information.

You can also receive a list of ConsumerRecord<?, ?> objects, but it must be the only parameter (aside from optional Acknowledgment, when using manual commits and Consumer<?, ?> parameters) defined on the method. The following example shows how to do so:

@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
    ...
}

@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
    ...
}

Starting with version 2.2, the listener can receive the complete ConsumerRecords<?, ?> object returned by the poll() method, letting the listener access additional methods, such as partitions() (which returns the TopicPartition instances in the list) and records(TopicPartition) (which gets selective records). Again, this must be the only parameter (aside from optional Acknowledgment, when using manual commits or Consumer<?, ?> parameters) on the method. The following example shows how to do so:

@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
    ...
}
Important
If the container factory has a RecordFilterStrategy configured, it is ignored for ConsumerRecords<?, ?> listeners, with a WARN log message emitted. Records can be filtered with a batch listener only if the List<ConsumerRecord<?, ?>> form of listener is used.
Annotation Properties

Starting with version 2.0, the id property (if present) is used as the Kafka consumer group.id property, overriding the configured property in the consumer factory, if present. You can also set groupId explicitly or set idIsGroup to false to restore the previous behavior of using the consumer factory group.id.
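For example (the group name is illustrative):

@KafkaListener(id = "myContainerId", groupId = "myExplicitGroup", topics = "myTopic")
public void listen(String in) {
    ...
}

// alternatively, keep the consumer factory's group.id:
@KafkaListener(id = "myContainerId", idIsGroup = false, topics = "myTopic")
public void listenWithFactoryGroup(String in) {
    ...
}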

You can use property placeholders or SpEL expressions within most annotation properties, as the following example shows:

@KafkaListener(topics = "${some.property}")

@KafkaListener(topics = "#{someBean.someProperty}",
    groupId = "#{someBean.someProperty}.group")

Starting with version 2.1.2, the SpEL expressions support a special token: __listener. It is a pseudo bean name that represents the current bean instance within which this annotation exists.

Consider the following example:

@Bean
public Listener listener1() {
    return new Listener("topic1");
}

@Bean
public Listener listener2() {
    return new Listener("topic2");
}

Given the beans in the previous example, we can then use the following:

public class Listener {

    private final String topic;

    public Listener(String topic) {
        this.topic = topic;
    }

    @KafkaListener(topics = "#{__listener.topic}",
        groupId = "#{__listener.topic}.group")
    public void listen(...) {
        ...
    }

    public String getTopic() {
        return this.topic;
    }

}

In the unlikely event that you have an actual bean called __listener, you can change the expression token by using the beanRef attribute. The following example shows how to do so:

@KafkaListener(beanRef = "__x", topics = "#{__x.topic}",
    groupId = "#{__x.topic}.group")

Starting with version 2.2.4, you can specify Kafka consumer properties directly on the annotation; these override any properties with the same name configured in the consumer factory. You cannot specify the group.id and client.id properties this way; they are ignored. Use the groupId and clientIdPrefix annotation properties for those.

The properties are specified as individual strings with the normal Java Properties file format: foo:bar, foo=bar, or foo bar.

@KafkaListener(topics = "myTopic", groupId="group", properties= {
    "max.poll.interval.ms:60000",
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
Container Thread Naming

Listener containers currently use two task executors: one to invoke the consumer and another to invoke the listener when the Kafka consumer property enable.auto.commit is false. You can provide custom executors by setting the consumerExecutor and listenerExecutor properties of the container’s ContainerProperties. When using pooled executors, be sure that enough threads are available to handle the concurrency across all the containers in which they are used. When using the ConcurrentMessageListenerContainer, a thread from each executor is used for each consumer (concurrency).

If you do not provide a consumer executor, a SimpleAsyncTaskExecutor is used. This executor creates threads with names similar to <beanName>-C-1 (consumer thread). For the ConcurrentMessageListenerContainer, the <beanName> part of the thread name becomes <beanName>-m, where m represents the consumer instance. The n in the -C-n suffix increments each time the container is started. So, with a bean name of container, threads in this container are named container-0-C-1, container-1-C-1, and so on, after the container is started the first time, and container-0-C-2, container-1-C-2, and so on, after a stop and subsequent start.
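If you need specific thread names, you can supply your own executor; the following is only a sketch, assuming the consumer executor is exposed through a setConsumerTaskExecutor setter on ContainerProperties (the name prefix is illustrative):

ContainerProperties containerProps = new ContainerProperties("topic1");
// consumer threads are created with the "my-consumer-" prefix
containerProps.setConsumerTaskExecutor(new SimpleAsyncTaskExecutor("my-consumer-"));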

@KafkaListener as a Meta Annotation

Starting with version 2.2, you can now use @KafkaListener as a meta annotation. The following example shows how to do so:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener
public @interface MyThreeConsumersListener {

    @AliasFor(annotation = KafkaListener.class, attribute = "id")
    String id();

    @AliasFor(annotation = KafkaListener.class, attribute = "topics")
    String[] topics();

    @AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
    String concurrency() default "3";

}

You must alias at least one of topics, topicPattern, or topicPartitions (and, usually, id or groupId unless you have specified a group.id in the consumer factory configuration). The following example shows how to do so:

@MyThreeConsumersListener(id = "my.group", topics = "my.topic")
public void listen1(String in) {
    ...
}
@KafkaListener on a Class

When you use @KafkaListener at the class-level, you must specify @KafkaHandler at the method level. When messages are delivered, the converted message payload type is used to determine which method to call. The following example shows how to do so:

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        ...
    }

}

Starting with version 2.1.3, you can designate a @KafkaHandler method as the default method that is invoked if there is no match on other methods. At most, one method can be so designated. When using @KafkaHandler methods, the payload must have already been converted to the domain object (so the match can be performed). Use a custom deserializer, the JsonDeserializer, or the (String|Bytes)JsonMessageConverter with its TypePrecedence set to TYPE_ID. See Serialization, Deserialization, and Message Conversion for more information.

@KafkaListener Lifecycle Management

The listener containers created for @KafkaListener annotations are not beans in the application context. Instead, they are registered with an infrastructure bean of type KafkaListenerEndpointRegistry. This bean is automatically declared by the framework and manages the containers' lifecycles; it will auto-start any containers that have autoStartup set to true. All containers created by all container factories must be in the same phase. See Listener Container Auto Startup for more information. You can manage the lifecycle programmatically by using the registry. Starting or stopping the registry will start or stop all the registered containers. Alternatively, you can get a reference to an individual container by using its id attribute. You can set autoStartup on the annotation, which overrides the default setting configured into the container factory. You can get a reference to the bean from the application context, such as auto-wiring, to manage its registered containers. The following examples show how to do so:

@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;

...

    this.registry.getListenerContainer("myContainer").start();

...

The registry only maintains the life cycle of containers it manages; containers declared as beans are not managed by the registry and can be obtained from the application context. A collection of managed containers can be obtained by calling the registry’s getListenerContainers() method. Version 2.2.5 added a convenience method getAllListenerContainers(), which returns a collection of all containers, including those managed by the registry and those declared as beans. The collection returned will include any prototype beans that have been initialized, but it will not initialize any lazy bean declarations.

@KafkaListener @Payload Validation

Starting with version 2.2, it is now easier to add a Validator to validate @KafkaListener @Payload arguments. Previously, you had to configure a custom DefaultMessageHandlerMethodFactory and add it to the registrar. Now, you can add the validator to the registrar itself. The following code shows how to do so:

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(new MyValidator());
    }
}
Note
When you use Spring Boot with the validation starter, a LocalValidatorFactoryBean is auto-configured, as the following example shows:
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;
    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(this.validator);
    }
}

The following examples show how to validate:

public static class ValidatedClass {

  @Max(10)
  private int bar;

  public int getBar() {
    return this.bar;
  }

  public void setBar(int bar) {
    this.bar = bar;
  }

}
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
      containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
}
Rebalancing Listeners

ContainerProperties has a property called consumerRebalanceListener, which takes an implementation of the Kafka client’s ConsumerRebalanceListener interface. If this property is not provided, the container configures a logging listener that logs rebalance events at the INFO level. The framework also adds a sub-interface ConsumerAwareRebalanceListener. The following listing shows the ConsumerAwareRebalanceListener interface definition:

public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {

    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

}

Notice that there are two callbacks when partitions are revoked. The first is called immediately. The second is called after any pending offsets are committed. This is useful if you wish to maintain offsets in some external repository, as the following example shows:

containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // acknowledge any pending Acknowledgments (if using manual acks)
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // ...
            store(consumer.position(partition));
        // ...
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // ...
            consumer.seek(partition, offsetTracker.getOffset() + 1);
        // ...
    }
});
Forwarding Listener Results using @SendTo

Starting with version 2.0, if you also annotate a @KafkaListener with a @SendTo annotation and the method invocation returns a result, the result is forwarded to the topic specified by the @SendTo.

The @SendTo value can have several forms:

  • @SendTo("someTopic") routes to the literal topic

  • @SendTo("#{someExpression}") routes to the topic determined by evaluating the expression once during application context initialization.

  • @SendTo("!{someExpression}") routes to the topic determined by evaluating the expression at runtime. The #root object for the evaluation has three properties:

    • request: The inbound ConsumerRecord (or ConsumerRecords object for a batch listener)

    • source: The org.springframework.messaging.Message<?> converted from the request.

    • result: The method return result.

  • @SendTo (no properties): This is treated as !{source.headers['kafka_replyTopic']} (since version 2.1.3).

Starting with versions 2.1.11 and 2.2.1, property placeholders are resolved within @SendTo values.

The result of the expression evaluation must be a String that represents the topic name. The following examples show the various ways to use @SendTo:

@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
    ...
}

@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
    ...
}

@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
    ...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {

    @KafkaHandler
    public String foo(String in) {
        ...
    }

    @KafkaHandler
    @SendTo("!{'annotated25reply2'}")
    public String bar(@Payload(required = false) KafkaNull nul,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
        ...
    }

}

Starting with version 2.2, you can add a ReplyHeadersConfigurer to the listener container factory. This is consulted to determine which headers you want to set in the reply message. The following example shows how to add a ReplyHeadersConfigurer:

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
    return factory;
}

You can also add more headers if you wish. The following example shows how to do so:

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {

      @Override
      public boolean shouldCopy(String headerName, Object headerValue) {
        return false;
      }

      @Override
      public Map<String, Object> additionalHeaders() {
        return Collections.singletonMap("qux", "fiz");
      }

    });
    return factory;
}

When you use @SendTo, you must configure the ConcurrentKafkaListenerContainerFactory with a KafkaTemplate in its replyTemplate property to perform the send.

Note
Unless you use request/reply semantics, only the simple send(topic, value) method is used, so you may wish to create a subclass to generate the partition or key. The following example shows how to do so:
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
    return new KafkaTemplate<String, String>(producerFactory()) {

        @Override
        public ListenableFuture<SendResult<String, String>> send(String topic, String data) {
            return super.send(topic, partitionForData(data), keyForData(data), data);
        }

        ...

    };
}
Important

If the listener method returns Message<?> or Collection<Message<?>>, the listener method is responsible for setting up the message headers for the reply. For example, when handling a request from a ReplyingKafkaTemplate, you might do the following:

@KafkaListener(id = "messageReturned", topics = "someTopic")
public Message<?> listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
        @Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.MESSAGE_KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .setHeader("someOtherHeader", "someValue")
            .build();
}

When using request/reply semantics, the target partition can be requested by the sender.

Note

You can annotate a @KafkaListener method with @SendTo even if no result is returned. This is to allow the configuration of an errorHandler that can forward information about a failed message delivery to some topic. The following example shows how to do so:

@KafkaListener(id = "voidListenerWithReplyingErrorHandler", topics = "someTopic",
        errorHandler = "voidSendToErrorHandler")
@SendTo("failures")
public void voidListenerWithReplyingErrorHandler(String in) {
    throw new RuntimeException("fail");
}

@Bean
public KafkaListenerErrorHandler voidSendToErrorHandler() {
    return (m, e) -> {
        return ... // some information about the failure and input data
    };
}

See Handling Exceptions for more information.

Filtering Messages

In certain scenarios, such as rebalancing, a message that has already been processed may be redelivered. The framework cannot know whether such a message has been processed or not. That is an application-level function. This is known as the Idempotent Receiver pattern and Spring Integration provides an implementation of it.

The Spring for Apache Kafka project also provides some assistance by means of the FilteringMessageListenerAdapter class, which can wrap your MessageListener. This class takes an implementation of RecordFilterStrategy in which you implement the filter method to signal that a message is a duplicate and should be discarded. This has an additional property called ackDiscarded, which indicates whether the adapter should acknowledge the discarded record. It is false by default.

When you use @KafkaListener, set the RecordFilterStrategy (and optionally ackDiscarded) on the container factory so that the listener is wrapped in the appropriate filtering adapter.
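The following listing is a minimal sketch of such a factory configuration; the consumerFactory() bean and the alreadyProcessed() duplicate check are placeholders for illustration only:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> filteringContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory()); // assumed to be defined elsewhere
    // return true to discard the record; real duplicate detection is application-specific
    factory.setRecordFilterStrategy(record -> alreadyProcessed(record.value()));
    factory.setAckDiscarded(true); // acknowledge discarded records
    return factory;
}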

In addition, a FilteringBatchMessageListenerAdapter is provided, for when you use a batch message listener.

Important
The FilteringBatchMessageListenerAdapter is ignored if your @KafkaListener receives a ConsumerRecords<?, ?> instead of List<ConsumerRecord<?, ?>>, because ConsumerRecords is immutable.
Retrying Deliveries

If your listener throws an exception, the default behavior is to invoke the ErrorHandler, if one is configured, or to log the exception otherwise.

Note
Two error handler interfaces (ErrorHandler and BatchErrorHandler) are provided. You must configure the appropriate type to match the message listener.

To retry deliveries, a convenient listener adapter RetryingMessageListenerAdapter is provided.

You can configure it with a RetryTemplate and RecoveryCallback<Void> - see the spring-retry project for information about these components. If a recovery callback is not provided, the exception is thrown to the container after retries are exhausted. In that case, the ErrorHandler is invoked, if configured, or the exception is logged otherwise.

When you use @KafkaListener, you can set the RetryTemplate (and optionally recoveryCallback) on the container factory. When you do so, the listener is wrapped in the appropriate retrying adapter.
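The following sketch shows one way to do this; the consumerFactory() bean and the recovery logic are placeholders, and the retry policy values are arbitrary:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> retryingContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory()); // assumed to be defined elsewhere
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));   // three attempts
    retryTemplate.setBackOffPolicy(new FixedBackOffPolicy()); // one second between attempts by default
    factory.setRetryTemplate(retryTemplate);
    factory.setRecoveryCallback(context -> {
        // the failed record is available under the adapter's context key
        ConsumerRecord<?, ?> failed = (ConsumerRecord<?, ?>)
                context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD);
        // recover it here (for example, publish it to a dead-letter topic)
        return null;
    });
    return factory;
}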

The contents of the RetryContext passed into the RecoveryCallback depend on the type of listener. The context always has a record attribute, which is the record for which the failure occurred. If your listener is acknowledging or consumer aware, additional acknowledgment or consumer attributes are available. For convenience, the RetryingMessageListenerAdapter provides static constants for these keys. See its Javadoc for more information.

A retry adapter is not provided for any of the batch message listeners, because the framework has no knowledge of where in a batch the failure occurred. If you need retry capabilities when you use a batch listener, we recommend that you use a RetryTemplate within the listener itself.

Stateful Retry

You should understand that the retry discussed in the preceding section suspends the consumer thread (if a BackOffPolicy is used). There are no calls to Consumer.poll() during the retries. Kafka has two properties to determine consumer health. The session.timeout.ms is used to determine if the consumer is active. Since version 0.10.1.0, heartbeats are sent on a background thread, so a slow consumer no longer affects that. max.poll.interval.ms (default: five minutes) is used to determine if a consumer appears to be hung (taking too long to process records from the last poll). If the time between poll() calls exceeds this, the broker revokes the assigned partitions and performs a rebalance. For lengthy retry sequences, with back off, this can easily happen.

Since version 2.1.3, you can avoid this problem by using stateful retry in conjunction with a SeekToCurrentErrorHandler. In this case, each delivery attempt throws the exception back to the container, the error handler re-seeks the unprocessed offsets, and the same message is redelivered by the next poll(). This avoids the problem of exceeding the max.poll.interval.ms property (as long as an individual delay between attempts does not exceed it). So, when you use an ExponentialBackOffPolicy, you must ensure that the maxInterval is less than the max.poll.interval.ms property. To enable stateful retry, you can use the RetryingMessageListenerAdapter constructor that takes a stateful boolean argument (set it to true). When you configure the listener container factory (for @KafkaListener), set the factory’s statefulRetry property to true.
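A minimal sketch of such a stateful retry configuration follows; consumerFactory() and retryTemplate() are assumed beans defined elsewhere:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> statefulRetryContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());             // assumed to be defined elsewhere
    factory.setRetryTemplate(retryTemplate());                 // keep maxInterval below max.poll.interval.ms
    factory.setStatefulRetry(true);                            // each failed attempt is thrown back to the container
    factory.setErrorHandler(new SeekToCurrentErrorHandler());  // re-seeks so the record is redelivered
    return factory;
}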

Detecting Idle and Non-Responsive Consumers

While efficient, one problem with asynchronous consumers is detecting when they are idle. You might want to take some action if no messages arrive for some period of time.

You can configure the listener container to publish a ListenerContainerIdleEvent when some time passes with no message delivery. While the container is idle, an event is published every idleEventInterval milliseconds.

To configure this feature, set the idleEventInterval on the container. The following example shows how to do so:

@Bean
public KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    ...
    containerProps.setIdleEventInterval(60000L);
    ...
    KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(...);
    return container;
}

The following example shows how to set the idleEventInterval for a @KafkaListener:

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.getContainerProperties().setIdleEventInterval(60000L);
    ...
    return factory;
}

In each of these cases, an event is published once per minute while the container is idle.

In addition, if the broker is unreachable, the consumer poll() method does not exit, so no messages are received and idle events cannot be generated. To solve this issue, the container publishes a NonResponsiveConsumerEvent if a poll does not return within 3x the pollTimeout property. By default, this check is performed once every 30 seconds in each container. You can modify this behavior by setting the monitorInterval and noPollThreshold properties in the ContainerProperties when configuring the listener container. Receiving such an event lets you stop the containers, thus waking the consumer so that it can terminate.
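For example, the following sketch tightens the check; the values shown are arbitrary:

ContainerProperties containerProps = new ContainerProperties("topic1");
containerProps.setMonitorInterval(10);   // run the check every 10 seconds instead of the default 30
containerProps.setNoPollThreshold(2.0f); // consider the consumer non-responsive sooner than the default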

Event Consumption

You can capture these events by implementing ApplicationListener — either a general listener or one narrowed to only receive this specific event. You can also use @EventListener, introduced in Spring Framework 4.2.

The next example combines @KafkaListener and @EventListener into a single class. You should understand that the application listener gets events for all containers, so you may need to check the listener ID if you want to take specific action based on which container is idle. You can also use the @EventListener condition for this purpose.

See Events for information about event properties.

The event is normally published on the consumer thread, so it is safe to interact with the Consumer object.

The following example uses both @KafkaListener and @EventListener:

public class Listener {

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen4(@Payload String foo, Acknowledgment ack) {
        ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        ...
    }

}
Important
Event listeners see events for all containers. Consequently, in the preceding example, we narrow the events received based on the listener ID. Since containers created for the @KafkaListener support concurrency, the actual containers are named id-n where the n is a unique value for each instance to support the concurrency. That is why we use startsWith in the condition.
Caution
If you wish to use the idle event to stop the listener container, you should not call container.stop() on the thread that calls the listener. Doing so causes delays and unnecessary log messages. Instead, you should hand off the event to a different thread that can then stop the container. Also, you should not stop() the container instance if it is a child container. You should stop the concurrent container instead.
Current Positions when Idle

Note that you can obtain the current positions when idle is detected by implementing ConsumerSeekAware in your listener. See onIdleContainer() in Seeking to a Specific Offset.

Topic/Partition Initial Offset

There are several ways to set the initial offset for a partition.

When manually assigning partitions, you can set the initial offset (if desired) in the configured TopicPartitionInitialOffset arguments (see Message Listener Containers). You can also seek to a specific offset at any time.

When you use group management where the broker assigns partitions:

  • For a new group.id, the initial offset is determined by the auto.offset.reset consumer property (earliest or latest).

  • For an existing group ID, the initial offset is the current offset for that group ID. You can, however, seek to a specific offset during initialization (or at any time thereafter).

Seeking to a Specific Offset

In order to seek, your listener must implement ConsumerSeekAware, which has the following methods:

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

The first method is called when the container is started. You should use this callback when seeking at some arbitrary time after initialization. You should save a reference to the callback. If you use the same listener in multiple containers (or in a ConcurrentMessageListenerContainer), you should store the callback in a ThreadLocal or some other structure keyed by the listener Thread.

When using group management, the second method is called when assignments change. You can use this method, for example, for setting initial offsets for the partitions, by calling the callback. You must use the callback argument, not the one passed into registerSeekCallback. This method is never called if you explicitly assign partitions yourself. Use the TopicPartitionInitialOffset in that case.

The callback has the following methods:

void seek(String topic, int partition, long offset);

void seekToBeginning(String topic, int partition);

void seekToEnd(String topic, int partition);

You can also perform seek operations from onIdleContainer() when an idle container is detected. See Detecting Idle and Non-Responsive Consumers for how to enable idle container detection.

To arbitrarily seek at runtime, use the callback reference from the registerSeekCallback for the appropriate thread.
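The following listener is a minimal sketch that stores the callback per thread and seeks each newly assigned partition to the beginning; the class name, listener ID, and topic are illustrative only:

public class SeekingListener implements ConsumerSeekAware {

    private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.callbackForThread.set(callback); // keep it for arbitrary seeks later, keyed by thread
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // seek each newly assigned partition to the beginning (use the callback argument, not the saved one)
        assignments.keySet().forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // optionally reposition when the container goes idle
    }

    @KafkaListener(id = "seeking", topics = "seekTopic")
    public void listen(String in) {
        // ...
    }

}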

Container factory

As discussed in @KafkaListener Annotation, a ConcurrentKafkaListenerContainerFactory is used to create containers for annotated methods.

Starting with version 2.2, you can use the same factory to create any ConcurrentMessageListenerContainer. This might be useful if you want to create several containers with similar properties or you wish to use some externally configured factory, such as the one provided by Spring Boot auto-configuration. Once the container is created, you can further modify its properties, many of which are set by using container.getContainerProperties(). The following example configures a ConcurrentMessageListenerContainer:

@Bean
public ConcurrentMessageListenerContainer<String, String> exampleContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> container =
        factory.createContainer("topic1", "topic2");
    container.setMessageListener(m -> { ... } );
    return container;
}
Important
Containers created this way are not added to the endpoint registry. They should be created as @Bean definitions so that they are registered with the application context.
Thread Safety

When using a concurrent message listener container, a single listener instance is invoked on all consumer threads. Listeners, therefore, need to be thread-safe, and it is preferable to use stateless listeners. If it is not possible to make your listener thread-safe or adding synchronization would significantly reduce the benefit of adding concurrency, you can use one of a few techniques:

  • Use n containers with concurrency=1 with a prototype scoped MessageListener bean so that each container gets its own instance (this is not possible when using @KafkaListener).

  • Keep the state in ThreadLocal<?> instances.

  • Have the singleton listener delegate to a bean that is declared in SimpleThreadScope (or a similar scope).

To facilitate cleaning up thread state (for the second and third items in the preceding list), starting with version 2.2, the listener container publishes a ConsumerStoppedEvent when each thread exits. You can consume these events with an ApplicationListener or @EventListener method to remove ThreadLocal<?> instances or remove() thread-scoped beans from the scope. Note that SimpleThreadScope does not destroy beans that have a destruction interface (such as DisposableBean), so you should destroy() the instance yourself.
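The following sketch shows such a cleanup listener; the thread-bound state it clears is a hypothetical ThreadLocal used by your listener:

@EventListener
public void consumerStopped(ConsumerStoppedEvent event) {
    // with the default synchronous multicaster, this runs on the exiting consumer thread,
    // so thread-bound state can be cleared here
    threadLocalState.remove(); // hypothetical ThreadLocal holding per-consumer state
}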

Important
By default, the application context’s event multicaster invokes event listeners on the calling thread. If you change the multicaster to use an async executor, thread cleanup is not effective.

Wiring Spring Beans into Producer/Consumer Interceptors

Apache Kafka provides a mechanism to add interceptors to producers and consumers. These objects are managed by Kafka, not Spring, and so normal Spring dependency injection won’t work for wiring in dependent Spring Beans. However, you can manually wire in those dependencies using the interceptor configure() method. The following Spring Boot application shows how to do this by overriding boot’s default factories to add some dependent bean into the configuration properties.

@SpringBootApplication
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties, SomeBean someBean) {
		Map<String, Object> consumerProperties = properties.buildConsumerProperties();
		consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
		consumerProperties.put("some.bean", someBean);
		return new DefaultKafkaConsumerFactory<>(consumerProperties);
	}

	@Bean
	public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties, SomeBean someBean) {
		Map<String, Object> producerProperties = properties.buildProducerProperties();
		producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
		producerProperties.put("some.bean", someBean);
		DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(producerProperties);
		String transactionIdPrefix = properties.getProducer()
				.getTransactionIdPrefix();
		if (transactionIdPrefix != null) {
			factory.setTransactionIdPrefix(transactionIdPrefix);
		}
		return factory;
	}

	@Bean
	public SomeBean someBean() {
		return new SomeBean();
	}

	@KafkaListener(id = "kgk897", topics = "kgh897")
	public void listen(String in) {
		System.out.println("Received " + in);
	}

	@Bean
	public ApplicationRunner runner(KafkaTemplate<String, String> template) {
		return args -> template.send("kgh897", "test");
	}

	@Bean
	public NewTopic topic() {
		return new NewTopic("kgh897", 1, (short) 1);
	}

}
public class SomeBean {

	public void someMethod(String what) {
		System.out.println(what + " in my foo bean");
	}

}
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

	private SomeBean bean;

	@Override
	public void configure(Map<String, ?> configs) {
		this.bean = (SomeBean) configs.get("some.bean");
	}

	@Override
	public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
		this.bean.someMethod("producer interceptor");
		return record;
	}

	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
	}

	@Override
	public void close() {
	}

}
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {

	private SomeBean bean;

	@Override
	public void configure(Map<String, ?> configs) {
		this.bean = (SomeBean) configs.get("some.bean");
	}

	@Override
	public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
		this.bean.someMethod("consumer interceptor");
		return records;
	}

	@Override
	public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
	}

	@Override
	public void close() {
	}

}

Result:

producer interceptor in my foo bean
consumer interceptor in my foo bean
Received test

Pausing and Resuming Listener Containers

Version 2.1.3 added pause() and resume() methods to listener containers. Previously, you could pause a consumer within a ConsumerAwareMessageListener and resume it by listening for a ListenerContainerIdleEvent, which provides access to the Consumer object. While you could pause a consumer in an idle container by using an event listener, in some cases, this was not thread-safe, since there is no guarantee that the event listener is invoked on the consumer thread. To safely pause and resume consumers, you should use the pause and resume methods on the listener containers. A pause() takes effect just before the next poll(); a resume() takes effect just after the current poll() returns. When a container is paused, it continues to poll() the consumer, avoiding a rebalance if group management is being used, but it does not retrieve any records. See the Kafka documentation for more information.

Starting with version 2.1.5, you can call isPauseRequested() to see if pause() has been called. However, the consumers might not have actually paused yet. isConsumerPaused() returns true if all Consumer instances have actually paused.

In addition (also since 2.1.5), ConsumerPausedEvent and ConsumerResumedEvent instances are published with the container as the source property and the TopicPartition instances involved in the partitions property.

The following simple Spring Boot application demonstrates using the container registry to get a reference to a @KafkaListener method’s container, pausing and resuming its consumers, and receiving the corresponding events:

@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Override
    public void onApplicationEvent(KafkaEvent event) {
        System.out.println(event);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            KafkaTemplate<String, String> template) {
        return args -> {
            template.send("pause.resume.topic", "thing1");
            Thread.sleep(10_000);
            System.out.println("pausing");
            registry.getListenerContainer("pause.resume").pause();
            Thread.sleep(10_000);
            template.send("pause.resume.topic", "thing2");
            Thread.sleep(10_000);
            System.out.println("resuming");
            registry.getListenerContainer("pause.resume").resume();
            Thread.sleep(10_000);
        };
    }

    @KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("pause.resume.topic", 2, (short) 1);
    }

}

The following listing shows the results of the preceding example:

partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2

Events

The following events are published by listener containers and their consumers:

  • ListenerContainerIdleEvent: Issued when no messages have been received in idleEventInterval (if configured).

  • NonResponsiveConsumerEvent: Issued when the consumer appears to be blocked in the poll method.

  • ConsumerPausedEvent: Issued by each consumer when the container is paused.

  • ConsumerResumedEvent: Issued by each consumer when the container is resumed.

  • ConsumerStoppingEvent: Issued by each consumer just before stopping.

  • ConsumerStoppedEvent: Issued after the consumer is closed. See Thread Safety.

  • ContainerStoppedEvent: Issued when all consumers have terminated.

Important
By default, the application context’s event multicaster invokes event listeners on the calling thread. If you change the multicaster to use an async executor, you must not invoke any Consumer methods when the event contains a reference to the consumer.

The ListenerContainerIdleEvent has the following properties:

  • source: The listener container instance that published the event.

  • container: The listener container or the parent listener container, if the source container is a child.

  • id: The listener ID (or container bean name).

  • idleTime: The time the container had been idle when the event was published.

  • topicPartitions: The topics and partitions that the container was assigned at the time the event was generated.

  • consumer: A reference to the Kafka Consumer object. For example, if the consumer’s pause() method was previously called, it can resume() when the event is received.

  • paused: Whether the container is currently paused. See Pausing and Resuming Listener Containers for more information.

The NonResponsiveConsumerEvent has the following properties:

  • source: The listener container instance that published the event.

  • container: The listener container or the parent listener container, if the source container is a child.

  • id: The listener ID (or container bean name).

  • timeSinceLastPoll: The time just before the container last called poll().

  • topicPartitions: The topics and partitions that the container was assigned at the time the event was generated.

  • consumer: A reference to the Kafka Consumer object. For example, if the consumer’s pause() method was previously called, it can resume() when the event is received.

  • paused: Whether the container is currently paused. See Pausing and Resuming Listener Containers for more information.

The ConsumerPausedEvent, ConsumerResumedEvent, and ConsumerStoppingEvent events have the following properties:

  • source: The listener container instance that published the event.

  • container: The listener container or the parent listener container, if the source container is a child.

  • partitions: The TopicPartition instances involved.

The ConsumerStoppedEvent and ContainerStoppedEvent events have the following properties:

  • source: The listener container instance that published the event.

  • container: The listener container or the parent listener container, if the source container is a child.

All containers (whether a child or a parent) publish ContainerStoppedEvent. For a parent container, the source and container properties are identical.

Serialization, Deserialization, and Message Conversion

Apache Kafka provides a high-level API for serializing and deserializing record values as well as their keys. It includes the org.apache.kafka.common.serialization.Serializer<T> and org.apache.kafka.common.serialization.Deserializer<T> abstractions, along with some built-in implementations. You can specify serializer and deserializer classes by using Producer or Consumer configuration properties. The following example shows how to do so:

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

For more complex or particular cases, the KafkaConsumer (and, therefore, KafkaProducer) provides overloaded constructors to accept Serializer and Deserializer instances for keys and values, respectively.

When you use this API, the DefaultKafkaProducerFactory and DefaultKafkaConsumerFactory also provide properties (through constructors or setter methods) to inject custom Serializer and Deserializer instances into the target Producer or Consumer.

Spring for Apache Kafka also provides JsonSerializer and JsonDeserializer implementations that are based on the Jackson JSON object mapper. The JsonSerializer allows writing any Java object as a JSON byte[]. The JsonDeserializer requires an additional Class<?> targetType argument to allow the deserialization of a consumed byte[] to the proper target object. The following example shows how to create a JsonDeserializer:

JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);

You can customize both JsonSerializer and JsonDeserializer with an ObjectMapper. You can also extend them to implement some particular configuration logic in the configure(Map<String, ?> configs, boolean isKey) method.
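For example, the following sketch passes a customized ObjectMapper to both; the specific Jackson setting is illustrative only:

ObjectMapper customMapper = new ObjectMapper();
customMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // illustrative setting

JsonSerializer<Thing> thingSerializer = new JsonSerializer<>(customMapper);
JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class, customMapper);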

Starting with version 2.1, you can convey type information in record Headers, allowing the handling of multiple types. In addition, you can configure the serializer and deserializer by using the following Kafka properties:

  • JsonSerializer.ADD_TYPE_INFO_HEADERS (default true): You can set it to false to disable this feature on the JsonSerializer (sets the addTypeInfo property).

  • JsonSerializer.TYPE_MAPPINGS (default empty): See Mapping Types.

  • JsonDeserializer.USE_TYPE_INFO_HEADERS (default true): You can set it to false to ignore headers set by the serializer.

  • JsonDeserializer.REMOVE_TYPE_INFO_HEADERS (default true): You can set it to false to retain headers set by the serializer.

  • JsonDeserializer.KEY_DEFAULT_TYPE: Fallback type for deserialization of keys if no header information is present.

  • JsonDeserializer.VALUE_DEFAULT_TYPE: Fallback type for deserialization of values if no header information is present.

  • JsonDeserializer.TRUSTED_PACKAGES (default java.util, java.lang): Comma-delimited list of package patterns allowed for deserialization. * means deserialize all.

  • JsonDeserializer.TYPE_MAPPINGS (default empty): See Mapping Types.

Starting with version 2.2, the type information headers (if added by the serializer) are removed by the deserializer. You can revert to the previous behavior by setting the removeTypeHeaders property to false, either directly on the deserializer or with the configuration property described earlier.

Mapping Types

Starting with version 2.2, you can now provide type mappings by using the properties in the preceding list. Previously, you had to customize the type mapper within the serializer and deserializer. Mappings consist of a comma-delimited list of token:className pairs. On outbound, the payload’s class name is mapped to the corresponding token. On inbound, the token in the type header is mapped to the corresponding class name.

The following example creates a set of mappings:

senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
Important
The corresponding objects must be compatible.

If you use Spring Boot, you can provide these properties in the application.properties (or yaml) file. The following example shows how to do so:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
Important

You can perform only simple configuration with properties. For more advanced configuration (such as using a custom ObjectMapper in the serializer and deserializer), you should use the producer and consumer factory constructors that accept a pre-built serializer and deserializer. The following Spring Boot example overrides the default factories:

@Bean
public ConsumerFactory<Foo, Bar> kafkaConsumerFactory(KafkaProperties properties,
    JsonDeserializer customDeserializer) {

    return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties(),
        customDeserializer, customDeserializer);
}

@Bean
public ProducerFactory<Foo, Bar> kafkaProducerFactory(KafkaProperties properties,
    JsonSerializer customSerializer) {

    return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(),
        customSerializer, customSerializer);
}

Setters are also provided, as an alternative to using these constructors.

Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean useHeadersIfPresent (which is true by default). The following example shows how to do so:

DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
        new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
Spring Messaging Message Conversion

Although the Serializer and Deserializer API is quite simple and flexible from the low-level Kafka Consumer and Producer perspective, you might need more flexibility at the Spring Messaging level, when using either @KafkaListener or Spring Integration. To let you easily convert to and from org.springframework.messaging.Message, Spring for Apache Kafka provides a MessageConverter abstraction with the MessagingMessageConverter implementation and its StringJsonMessageConverter and BytesJsonMessageConverter customizations. You can inject the MessageConverter into a KafkaTemplate instance directly, or by using the AbstractKafkaListenerContainerFactory bean definition for the @KafkaListener.containerFactory() property. The following example shows how to do so:

@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}
...
@KafkaListener(topics = "jsonData",
                containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}

When you use a @KafkaListener, the parameter type is provided to the message converter to assist with the conversion.

Note

This type inference can be achieved only when the @KafkaListener annotation is declared at the method level. With a class-level @KafkaListener, the payload type is used to select which @KafkaHandler method to invoke, so it must already have been converted before the method can be chosen.

Note
When you use the StringJsonMessageConverter, you should use a StringDeserializer in the Kafka consumer configuration and a StringSerializer in the Kafka producer configuration when you use Spring Integration or the KafkaTemplate.send(Message<?> message) method. When you use the BytesJsonMessageConverter, you should use a BytesDeserializer in the Kafka consumer configuration and BytesSerializer in the Kafka producer configuration when you use Spring Integration or the KafkaTemplate.send(Message<?> message) method (see Using KafkaTemplate). Generally, the BytesJsonMessageConverter is more efficient because it avoids a String to and from byte[] conversion.
Using ErrorHandlingDeserializer

When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the poll() returns. To solve this problem, version 2.2 introduced the ErrorHandlingDeserializer2. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer2 returns a null value and a DeserializationException in a header that contains the cause and the raw bytes. When you use a record-level MessageListener, if the ConsumerRecord contains a DeserializationException header for either the key or value, the container’s ErrorHandler is called with the failed ConsumerRecord. The record is not passed to the listener.

Alternatively, you can configure the ErrorHandlingDeserializer2 to create a custom value by providing a failedDeserializationFunction, which is a BiFunction<byte[], Headers, T>. This function is invoked to create an instance of T, which is passed to the listener in the usual fashion. The raw record value and headers are provided to the function. You can find the DeserializationException (as a serialized Java object) in headers. See the Javadoc for the ErrorHandlingDeserializer2 for more information.

Caution
When you use a BatchMessageListener, you must provide a failedDeserializationFunction. Otherwise, the batch of records is not type-safe.

You can use the DefaultKafkaConsumerFactory constructor that takes key and value Deserializer objects and wire in appropriate ErrorHandlingDeserializer2 instances that you have configured with the proper delegates. Alternatively, you can use consumer configuration properties (which are used by the ErrorHandlingDeserializer2) to instantiate the delegates. The property names are ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS and ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS. The property value can be a class or class name. The following example shows how to set these properties:

... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
props.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey");
props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");
return new DefaultKafkaConsumerFactory<>(props);

The following example uses a failedDeserializationFunction.

public class BadFoo extends Foo {

  private final byte[] failedDecode;

  public BadFoo(byte[] failedDecode) {
    this.failedDecode = failedDecode;
  }

  public byte[] getFailedDecode() {
    return this.failedDecode;
  }

}

public class FailedFooProvider implements BiFunction<byte[], Headers, Foo> {

  @Override
  public Foo apply(byte[] t, Headers u) {
    return new BadFoo(t);
  }

}

The preceding example uses the following configuration:

...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedFooProvider.class);
...
Payload Conversion with Batch Listeners

Starting with version 1.3.2, you can also use a StringJsonMessageConverter or BytesJsonMessageConverter within a BatchMessagingMessageConverter to convert batch messages when you use a batch listener container factory. See Serialization, Deserialization, and Message Conversion for more information.

By default, the type for the conversion is inferred from the listener argument. If you configure the (Bytes|String)JsonMessageConverter with a DefaultJackson2TypeMapper that has its TypePrecedence set to TYPE_ID (instead of the default INFERRED), the converter uses the type information in headers (if present) instead. This allows, for example, listener methods to be declared with interfaces instead of concrete classes. Also, the type converter supports mapping, so the deserialization can be to a different type than the source (as long as the data is compatible). This is also useful when you use class-level @KafkaListener instances where the payload must have already been converted to determine which method to invoke. The following example creates beans that use this method:

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
    return factory;
}

@Bean
public StringJsonMessageConverter converter() {
    return new StringJsonMessageConverter();
}

Note that, for this to work, the method signature for the conversion target must be a container object with a single generic parameter type, such as the following:

@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

Note that you can still access the batch headers.

If the batch converter has a record converter that supports it, you can also receive a list of messages where the payloads are converted according to the generic type. The following example shows how to do so:

@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen1(List<Message<Foo>> fooMessages) {
    ...
}
ConversionService Customization

Starting with version 2.1.1, the org.springframework.core.convert.ConversionService used by the default o.s.messaging.handler.annotation.support.MessageHandlerMethodFactory to resolve parameters for the invocation of a listener method is supplied with all beans that implement any of the following interfaces:

  • org.springframework.core.convert.converter.Converter

  • org.springframework.core.convert.converter.GenericConverter

  • org.springframework.format.Formatter

This lets you further customize listener deserialization without changing the default configuration for ConsumerFactory and KafkaListenerContainerFactory.
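For example, registering a bean such as the following (where Thing and its parsing logic are hypothetical) lets a listener method declare a Thing parameter while the record value arrives as a String:

@Bean
public Converter<String, Thing> thingConverter() {
    return new Converter<String, Thing>() {

        @Override
        public Thing convert(String source) {
            return Thing.parse(source); // hypothetical parsing logic
        }

    };
}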

Important
Setting a custom MessageHandlerMethodFactory on the KafkaListenerEndpointRegistrar through a KafkaListenerConfigurer bean disables this feature.

Message Headers

The 0.11.0.0 client introduced support for headers in messages. As of version 2.0, Spring for Apache Kafka now supports mapping these headers to and from spring-messaging MessageHeaders.

Note
Previous versions mapped ConsumerRecord and ProducerRecord to spring-messaging Message<?>, where the value property is mapped to and from the payload and other properties (topic, partition, and so on) are mapped to headers. This is still the case, but additional (arbitrary) headers can now be mapped.

Apache Kafka headers have a simple API, shown in the following interface definition:

public interface Header {

    String key();

    byte[] value();

}

The KafkaHeaderMapper strategy is provided to map header entries between Kafka Headers and MessageHeaders. Its interface definition is as follows:

public interface KafkaHeaderMapper {

    void fromHeaders(MessageHeaders headers, Headers target);

    void toHeaders(Headers source, Map<String, Object> target);

}

The DefaultKafkaHeaderMapper maps the key to the MessageHeaders header name and, in order to support rich header types for outbound messages, JSON conversion is performed. A “special” header (with a key of spring_json_header_types) contains a JSON map of <key>:<type>. This header is used on the inbound side to provide appropriate conversion of each header value to the original type.

On the inbound side, all Kafka Header instances are mapped to MessageHeaders. On the outbound side, by default, all MessageHeaders are mapped, except id, timestamp, and the headers that map to ConsumerRecord properties.

You can specify which headers are to be mapped for outbound messages, by providing patterns to the mapper. The following listing shows a number of example mappings:

public DefaultKafkaHeaderMapper() { (1)
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
    ...
}

public DefaultKafkaHeaderMapper(String... patterns) { (3)
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
    ...
}
  1. Uses a default Jackson ObjectMapper and maps most headers, as discussed before the example.

  2. Uses the provided Jackson ObjectMapper and maps most headers, as discussed before the example.

  3. Uses a default Jackson ObjectMapper and maps headers according to the provided patterns.

  4. Uses the provided Jackson ObjectMapper and maps headers according to the provided patterns.

Patterns are rather simple and can contain a leading wildcard (*), a trailing wildcard, or both (for example, *.cat.*). You can negate patterns with a leading !. The first pattern that matches a header name (whether positive or negative) wins.

When you provide your own patterns, we recommend including !id and !timestamp, since these headers are read-only on the inbound side.
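The following minimal sketch creates a mapper with such patterns and plugs it into a MessagingMessageConverter (which you would then use in your template or container factory):

DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper("!id", "!timestamp", "*");
MessagingMessageConverter converter = new MessagingMessageConverter();
converter.setHeaderMapper(mapper); // outbound headers are now mapped according to the patterns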

Important
By default, the mapper deserializes only classes in java.lang and java.util. You can trust other (or all) packages by adding trusted packages with the addTrustedPackages method. If you receive messages from untrusted sources, you may wish to add only those packages you trust. To trust all packages, you can use mapper.addTrustedPackages("*").

By default, the DefaultKafkaHeaderMapper is used in the MessagingMessageConverter and BatchMessagingMessageConverter, as long as Jackson is on the class path.

With the batch converter, the converted headers are available in the KafkaHeaders.BATCH_CONVERTED_HEADERS as a List<Map<String, Object>> where the map in a position of the list corresponds to the data position in the payload.

If there is no converter (either because Jackson is not present or it is explicitly set to null), the headers from the consumer record are provided unconverted in the KafkaHeaders.NATIVE_HEADERS header. This header is a Headers object (or a List<Headers> in the case of the batch converter), where the position in the list corresponds to the data position in the payload.

Important
Certain types are not suitable for JSON serialization, and a simple toString() serialization might be preferred for these types. The DefaultKafkaHeaderMapper has a method called addToStringClasses() that lets you supply the names of classes that should be treated this way for outbound mapping. During inbound mapping, they are mapped as String. By default, only org.springframework.util.MimeType and org.springframework.http.MediaType are mapped this way.

Null Payloads and Log Compaction of 'Tombstone' Records

When you use Log Compaction, you can send and receive messages with null payloads to identify the deletion of a key.

You can also receive null values for other reasons, such as a Deserializer that might return null when it cannot deserialize a value.

To send a null payload by using the KafkaTemplate, you can pass null into the value argument of the send() methods. One exception to this is the send(Message<?> message) variant. Since spring-messaging Message<?> cannot have a null payload, you can use a special payload type called KafkaNull, and the framework sends null. For convenience, the static KafkaNull.INSTANCE is provided.
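The following sketch shows both variants; the topic and key are illustrative, and template is assumed to be a KafkaTemplate<String, String>:

// a regular send with a null value
template.send("myTopic", "someKey", null);

// the Message<?> variant cannot carry a null payload, so use KafkaNull instead
template.send(MessageBuilder.withPayload(KafkaNull.INSTANCE)
        .setHeader(KafkaHeaders.TOPIC, "myTopic")
        .setHeader(KafkaHeaders.MESSAGE_KEY, "someKey")
        .build());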

When you use a message listener container, the received ConsumerRecord has a null value().

To configure the @KafkaListener to handle null payloads, you must use the @Payload annotation with required = false. If it is a tombstone message for a compacted log, you usually also need the key so that your application can determine which key was “deleted”. The following example shows such a configuration:

@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
    // value == null represents key deletion
}

When you use a class-level @KafkaListener with multiple @KafkaHandler methods, some additional configuration is needed. Specifically, you need a @KafkaHandler method with a KafkaNull payload. The following example shows how to configure one:

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String cat) {
        ...
    }

    @KafkaHandler
    public void listen(Integer hat) {
        ...
    }

    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
        ...
    }

}

Note that the argument is null, not KafkaNull.

Handling Exceptions

This section describes how to handle various exceptions that may arise when you use Spring for Apache Kafka.

Listener Error Handlers

Starting with version 2.0, the @KafkaListener annotation has a new attribute: errorHandler.

By default, this attribute is not configured.

You can use the errorHandler to provide the bean name of a KafkaListenerErrorHandler implementation. This functional interface has one method, as the following listing shows:

@FunctionalInterface
public interface KafkaListenerErrorHandler {

    Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;

}

You have access to the spring-messaging Message<?> object produced by the message converter and the exception that was thrown by the listener, which is wrapped in a ListenerExecutionFailedException. The error handler can throw the original or a new exception, which is thrown to the container. Anything returned by the error handler is ignored.

It has a sub-interface (ConsumerAwareListenerErrorHandler) that has access to the consumer object, through the following method:

Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);

If your error handler implements this interface, you can adjust the offsets accordingly. For example, to reset the offset to replay the failed message, you could do something like the following:

@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
    return (m, e, c) -> {
        this.listen3Exception = e;
        MessageHeaders headers = m.getHeaders();
        c.seek(new org.apache.kafka.common.TopicPartition(
                headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
                headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
                headers.get(KafkaHeaders.OFFSET, Long.class));
        return null;
    };
}

Similarly, you could do something like the following for a batch listener:

@Bean
public ConsumerAwareListenerErrorHandler listen10ErrorHandler() {
    return (m, e, c) -> {
        this.listen10Exception = e;
        MessageHeaders headers = m.getHeaders();
        List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
        List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
        List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
        Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
        for (int i = 0; i < topics.size(); i++) {
            int index = i;
            offsetsToReset.compute(new TopicPartition(topics.get(i), partitions.get(i)),
                    (k, v) -> v == null ? offsets.get(index) : Math.min(v, offsets.get(index)));
        }
        offsetsToReset.forEach((k, v) -> c.seek(k, v));
        return null;
    };
}

This resets each topic/partition in the batch to the lowest offset in the batch.

Note
The preceding two examples are simplistic implementations, and you would probably want more checking in the error handler.
Container Error Handlers

You can specify a global error handler to be used for all listeners in the container factory. The following example shows how to do so:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setErrorHandler(myErrorHandler);
    ...
    return factory;
}

Similarly, you can set a global batch error handler:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.getContainerProperties().setBatchErrorHandler(myBatchErrorHandler);
    ...
    return factory;
}

By default, if an annotated listener method throws an exception, it is thrown to the container, and the message is handled according to the container configuration.

Consumer-Aware Container Error Handlers

The container-level error handlers (ErrorHandler and BatchErrorHandler) have sub-interfaces called ConsumerAwareErrorHandler and ConsumerAwareBatchErrorHandler. The handle method of the ConsumerAwareErrorHandler has the following signature:

void handle(Exception thrownException, ConsumerRecord<?, ?> data, Consumer<?, ?> consumer);

The handle method of the ConsumerAwareBatchErrorHandler has the following signature:

void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer);

Similar to the @KafkaListener error handlers, you can reset the offsets as needed, based on the data that failed.

Note
Unlike the listener-level error handlers, however, you should set the ackOnError container property to false when making adjustments. Otherwise, any pending acks are applied after your repositioning.
Seek To Current Container Error Handlers

If an ErrorHandler implements RemainingRecordsErrorHandler, the error handler is provided with the failed record and any unprocessed records retrieved by the previous poll(). Those records are not passed to the listener after the handler exits. The following listing shows the RemainingRecordsErrorHandler interface definition:

@FunctionalInterface
public interface RemainingRecordsErrorHandler extends ConsumerAwareErrorHandler {

    void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer);

}

This interface lets implementations seek all unprocessed topics and partitions so that the current record (and the others remaining) are retrieved by the next poll. SeekToCurrentErrorHandler does exactly this.

The container commits any pending offset commits before calling the error handler.

To configure the listener container with this handler, add it to the ContainerProperties.

For example, with the @KafkaListener container factory, you can add SeekToCurrentErrorHandler as follows:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setErrorHandler(new SeekToCurrentErrorHandler());
    return factory;
}

As an example, if the poll returns six records (two from each of partitions 0, 1, and 2) and the listener throws an exception on the fourth record, the container acknowledges the first three messages by committing their offsets. The SeekToCurrentErrorHandler seeks to offset 1 for partition 1 and offset 0 for partition 2. The next poll() returns the three unprocessed records.

If the AckMode was BATCH, the container commits the offsets for the first two partitions before calling the error handler.

Starting with version 2.2, the SeekToCurrentErrorHandler can now recover (skip) a record that keeps failing. By default, after ten failures, the failed record is logged (at the ERROR level). You can configure the handler with a custom recoverer (BiConsumer) and maximum failures. Setting the maxFailures property to a negative number causes infinite retries. The following example configures recovery after three tries:

SeekToCurrentErrorHandler errorHandler =
    new SeekToCurrentErrorHandler((record, exception) -> {
        // recover after 3 failures - e.g. send to a dead-letter topic
    }, 3);

Starting with version 2.2.4, when the container is configured with AckMode.MANUAL_IMMEDIATE, the error handler can be configured to commit the offset of recovered records; set the commitRecovered property to true.
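A minimal sketch of that configuration follows; the recovery logic is a placeholder, and factory refers to a listener container factory configured as shown earlier:

SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
    // recover the record here, for example by publishing it to a dead-letter topic
}, 3);
errorHandler.setCommitRecovered(true); // commit the recovered record's offset
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setErrorHandler(errorHandler);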

When using transactions, similar functionality is provided by the DefaultAfterRollbackProcessor. See After-rollback Processor.

The SeekToCurrentBatchErrorHandler seeks each partition to the first record in each partition in the batch, so the whole batch is replayed. This error handler does not support recovery, because the framework cannot know which message in the batch is failing.

After seeking, an exception that wraps the ListenerExecutionFailedException is thrown. This is to cause the transaction to roll back (if transactions are enabled).

Container Stopping Error Handlers

The ContainerStoppingErrorHandler (used with record listeners) stops the container if the listener throws an exception. When the AckMode is RECORD, offsets for already processed records are committed. When the AckMode is any manual value, offsets for already acknowledged records are committed. When the AckMode is BATCH, the entire batch is replayed when the container is restarted (unless transactions are enabled — in which case, only the unprocessed records are re-fetched).

The ContainerStoppingBatchErrorHandler (used with batch listeners) stops the container, and the entire batch is replayed when the container is restarted.

After the container stops, an exception that wraps the ListenerExecutionFailedException is thrown. This is to cause the transaction to roll back (if transactions are enabled).

After-rollback Processor

When using transactions, if the listener throws an exception (and an error handler, if present, throws an exception), the transaction is rolled back. By default, any unprocessed records (including the failed record) are re-fetched on the next poll. This is achieved by performing seek operations in the DefaultAfterRollbackProcessor. With a batch listener, the entire batch of records is reprocessed (the container has no knowledge of which record in the batch failed). To modify this behavior, you can configure the listener container with a custom AfterRollbackProcessor. For example, with a record-based listener, you might want to keep track of the failed record and give up after some number of attempts, perhaps by publishing it to a dead-letter topic.

Starting with version 2.2, the DefaultAfterRollbackProcessor can now recover (skip) a record that keeps failing. By default, after ten failures, the failed record is logged (at the ERROR level). You can configure the processor with a custom recoverer (BiConsumer) and maximum failures. Setting the maxFailures property to a negative number causes infinite retries. The following example configures recovery after three tries:

AfterRollbackProcessor<String, String> processor =
    new DefaultAfterRollbackProcessor<>((record, exception) -> {
        // recover after 3 failures - e.g. send to a dead-letter topic
    }, 3);
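
To apply it, set the processor on the listener container. The following sketch assumes the container factory exposes a setAfterRollbackProcessor setter and reuses the consumerFactory() bean from the earlier example:

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// the DefaultAfterRollbackProcessor shown above; invoked only after a transaction is rolled back
factory.setAfterRollbackProcessor(processor);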

When you do not use transactions, you can achieve similar functionality by configuring a SeekToCurrentErrorHandler. See Seek To Current Container Error Handlers.

Important
Recovery is not possible with a batch listener, since the framework has no knowledge about which record in the batch keeps failing. In such cases, the application listener must handle a record that keeps failing.

Publishing Dead-letter Records

As discussed earlier, you can configure the SeekToCurrentErrorHandler and DefaultAfterRollbackProcessor with a record recoverer when the maximum number of failures is reached for a record. The framework provides the DeadLetterPublishingRecoverer, which publishes the failed message to another topic. The recoverer requires a KafkaTemplate<Object, Object>, which is used to send the record. You can also, optionally, configure it with a BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>, which is called to resolve the destination topic and partition. By default, the dead-letter record is sent to a topic named <originalTopic>.DLT (the original topic name suffixed with .DLT) and to the same partition as the original record. Therefore, when you use the default resolver, the dead-letter topic must have at least as many partitions as the original topic. If the returned TopicPartition has a negative partition, the partition is not set in the ProducerRecord, so the partition is selected by Kafka.

Starting with version 2.2.4, any ListenerExecutionFailedException (thrown, for example, when an exception is detected in a @KafkaListener method) is enhanced with the groupId property. This allows the destination resolver to use it, in addition to the information in the ConsumerRecord, to select the dead-letter topic.

The following example shows how to wire a custom destination resolver:

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (r, e) -> {
            if (e instanceof FooException) {
                return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
            }
            else {
                return new TopicPartition(r.topic() + ".other.failures", r.partition());
            }
        });
ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, 3);
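
Because the groupId is available on the ListenerExecutionFailedException (as described above), a resolver can also route by consumer group. The following is a sketch only; it assumes the exception exposes the group through a getGroupId() accessor and that the per-group dead-letter topics already exist:

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (record, exception) -> {
            String group = exception instanceof ListenerExecutionFailedException
                    ? ((ListenerExecutionFailedException) exception).getGroupId()
                    : "unknown";
            // a negative partition lets Kafka select the partition
            return new TopicPartition(record.topic() + "." + group + ".DLT", -1);
        });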

The record sent to the dead-letter topic is enhanced with the following headers:

  • KafkaHeaders.DLT_EXCEPTION_FQCN: The Exception class name.

  • KafkaHeaders.DLT_EXCEPTION_STACKTRACE: The Exception stack trace.

  • KafkaHeaders.DLT_EXCEPTION_MESSAGE: The Exception message.

  • KafkaHeaders.DLT_ORIGINAL_TOPIC: The original topic.

  • KafkaHeaders.DLT_ORIGINAL_PARTITION: The original partition.

  • KafkaHeaders.DLT_ORIGINAL_OFFSET: The original offset.

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP: The original timestamp.

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE: The original timestamp type.
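
A listener on the dead-letter topic can read these headers back. The following sketch (the topic name, group ID, and logger field are placeholders) extracts the original topic and exception message, which the recoverer stores as byte values:

@KafkaListener(topics = "someTopic.DLT", groupId = "dltGroup")
public void dltListen(ConsumerRecord<String, String> record) {
    // Header is org.apache.kafka.common.header.Header
    Header originalTopic = record.headers().lastHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC);
    Header exceptionMessage = record.headers().lastHeader(KafkaHeaders.DLT_EXCEPTION_MESSAGE);
    this.logger.error("Dead letter from topic {}: {}",
            originalTopic == null ? "?" : new String(originalTopic.value(), StandardCharsets.UTF_8),
            exceptionMessage == null ? "?" : new String(exceptionMessage.value(), StandardCharsets.UTF_8));
}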

Kerberos

Starting with version 2.0, a KafkaJaasLoginModuleInitializer class has been added to assist with Kerberos configuration. You can add this bean, with the desired configuration, to your application context. The following example configures such a bean:

@Bean
public KafkaJaasLoginModuleInitializer jaasConfig() throws IOException {
    KafkaJaasLoginModuleInitializer jaasConfig = new KafkaJaasLoginModuleInitializer();
    jaasConfig.setControlFlag("REQUIRED");
    Map<String, String> options = new HashMap<>();
    options.put("useKeyTab", "true");
    options.put("storeKey", "true");
    options.put("keyTab", "/etc/security/keytabs/kafka_client.keytab");
    options.put("principal", "kafka-client-1@EXAMPLE.COM");
    jaasConfig.setOptions(options);
    return jaasConfig;
}
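
The JAAS configuration is only part of the picture; the clients themselves still need the standard Kafka SASL properties in their producer and consumer configurations. The following fragment is illustrative only; the values depend on your broker setup:

Map<String, Object> configs = new HashMap<>();
// ... bootstrap servers, serializers/deserializers, and so on ...
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
configs.put(SaslConfigs.SASL_MECHANISM, "GSSAPI");
configs.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");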