From aba6ce35e62bd80435eea3210497bf48c746f3e6 Mon Sep 17 00:00:00 2001 From: Jujuwryy Date: Thu, 23 Oct 2025 23:06:13 +0300 Subject: [PATCH 1/4] Fix SmartMessageConverter support in batch listeners BatchMessagingMessageConverter was missing setMessagingConverter() method that exists in MessagingMessageConverter, causing SmartMessageConverter configured via @KafkaListener(contentTypeConverter) to be ignored in batch listeners. This inconsistency between regular and batch listeners leads to ClassCastException when byte[] values aren't converted to the expected String type, breaking the contract that SmartMessageConverter should work the same way regardless of listener type. The fix ensures SmartMessageConverter propagation works consistently by: - Adding setMessagingConverter() to BatchMessagingMessageConverter that delegates to underlying MessagingMessageConverter - Overriding setMessagingConverter() in BatchMessagingMessageListenerAdapter to propagate the converter to batch converter - Maintaining the same SmartMessageConverter behavior between regular and batch listeners Fixes GH-4097 Signed-off-by: Jujuwryy --- .../BatchMessagingMessageListenerAdapter.java | 23 ++- .../BatchMessagingMessageConverter.java | 24 ++- .../BatchSmartMessageConverterTests.java | 155 ++++++++++++++++++ 3 files changed, 197 insertions(+), 5 deletions(-) create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java index 1d0979d7a2..b466a1005f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java @@ -32,6 +32,7 @@ import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.messaging.Message; +import org.springframework.messaging.converter.SmartMessageConverter; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; @@ -80,7 +81,7 @@ public BatchMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Met * @param errorHandler the error handler. */ public BatchMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Method method, - @Nullable KafkaListenerErrorHandler errorHandler) { + @Nullable KafkaListenerErrorHandler errorHandler) { super(bean, method, errorHandler); } @@ -107,6 +108,24 @@ public void setBatchToRecordAdapter(BatchToRecordAdapter batchToRecordAdap this.batchToRecordAdapter = batchToRecordAdapter; } + /** + * Set the {@link SmartMessageConverter} to use with both the default record converter + * and the batch message converter. + *

+ * When a {@code SmartMessageConverter} is configured via + * {@code @KafkaListener(contentTypeConverter = "...")}, this method ensures it is + * properly propagated to both the record converter (via the parent class) and the + * batch converter to support message conversion in batch listeners. + * @param messageConverter the converter to set + */ + @Override + public void setMessagingConverter(SmartMessageConverter messageConverter) { + super.setMessagingConverter(messageConverter); + if (this.batchMessageConverter instanceof BatchMessagingMessageConverter batchConverter) { + batchConverter.setMessagingConverter(messageConverter); + } + } + /** * Return the {@link BatchMessagingMessageConverter} for this listener, * being able to convert {@link org.springframework.messaging.Message}. @@ -170,7 +189,7 @@ public void onMessage(List> records, @Nullable Acknowledgme @SuppressWarnings({ "unchecked", "rawtypes" }) protected Message toMessagingMessage(List records, @Nullable Acknowledgment acknowledgment, - @Nullable Consumer consumer) { + @Nullable Consumer consumer) { return getBatchMessageConverter().toMessage(records, acknowledgment, consumer, getType()); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index 3a1e74f580..dbdcae91e2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -43,6 +43,7 @@ import org.springframework.kafka.support.KafkaNull; import org.springframework.kafka.support.serializer.SerializationUtils; import org.springframework.messaging.Message; +import org.springframework.messaging.converter.SmartMessageConverter; import org.springframework.messaging.support.MessageBuilder; /** @@ -78,6 +79,8 @@ public class BatchMessagingMessageConverter implements BatchMessageConverter { @Nullable private final RecordMessageConverter recordConverter; + private @Nullable SmartMessageConverter messagingConverter; + private boolean generateMessageId = false; private boolean generateTimestamp = false; @@ -142,6 +145,20 @@ public RecordMessageConverter getRecordMessageConverter() { return this.recordConverter; } + /** + * Set a spring-messaging {@link SmartMessageConverter} to convert the record value to + * the desired type. + * @param messagingConverter the converter. + * @since 3.3.11 + */ + public void setMessagingConverter(@Nullable SmartMessageConverter messagingConverter) { + this.messagingConverter = messagingConverter; + + if (this.recordConverter instanceof MessagingMessageConverter messagingRecordConverter) { + messagingRecordConverter.setMessagingConverter(messagingConverter); + } + } + /** * Set to true to add the raw {@code List>} as a header * {@link KafkaHeaders#RAW_DATA}. @@ -154,7 +171,7 @@ public void setRawRecordHeader(boolean rawRecordHeader) { @Override // NOSONAR public Message toMessage(List> records, @Nullable Acknowledgment acknowledgment, - @Nullable Consumer consumer, Type type) { + @Nullable Consumer consumer, Type type) { KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp); @@ -275,13 +292,14 @@ protected Object extractAndConvertValue(ConsumerRecord record, Type type) * @param type the type - must be a {@link ParameterizedType} with a single generic * type parameter. * @param conversionFailures Conversion failures. - * @return the converted payload. + * @return the converted payload, potentially further processed by a {@link SmartMessageConverter}. */ protected @Nullable Object convert(ConsumerRecord record, Type type, List conversionFailures) { try { if (this.recordConverter != null) { + Type actualType = ((ParameterizedType) type).getActualTypeArguments()[0]; Object payload = this.recordConverter - .toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0]).getPayload(); + .toMessage(record, null, null, actualType).getPayload(); conversionFailures.add(null); return payload; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java new file mode 100644 index 0000000000..2fb3349e57 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java @@ -0,0 +1,155 @@ +/* + * Copyright 2016-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import java.lang.reflect.Type; +import java.util.Arrays; +import java.util.List; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.Test; + +import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; +import org.springframework.kafka.support.converter.MessagingMessageConverter; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.SmartMessageConverter; +import org.springframework.messaging.support.MessageBuilder; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for SmartMessageConverter support in batch listeners. + * Reproduces the issue described in GH-4097. + * + * @author Jujuwryy + * @since 3.3.11 + */ +class BatchSmartMessageConverterTests { + + @Test + void testSmartMessageConverterWorksInBatchConversion() { + // Given: A BatchMessagingMessageConverter with a record converter and SmartMessageConverter + MessagingMessageConverter recordConverter = new MessagingMessageConverter(); + BatchMessagingMessageConverter batchConverter = new BatchMessagingMessageConverter(recordConverter); + + // Set up SmartMessageConverter that converts byte[] to String + TestStringMessageConverter smartConverter = new TestStringMessageConverter(); + batchConverter.setMessagingConverter(smartConverter); + + // Create test records with byte[] values that need conversion to String + List> records = Arrays.asList( + new ConsumerRecord<>("topic", 0, 0, "key", "hello".getBytes()), + new ConsumerRecord<>("topic", 0, 1, "key", "world".getBytes()) + ); + + // When: Convert batch with List target type + Type targetType = new TestParameterizedType(List.class, new Type[]{String.class}); + Message result = batchConverter.toMessage(records, null, null, targetType); + + // Then: Verify the SmartMessageConverter was applied and byte[] was converted to String + assertThat(result).isNotNull(); + assertThat(result.getPayload()).isInstanceOf(List.class); + + List payloads = (List) result.getPayload(); + assertThat(payloads).hasSize(2); + assertThat(payloads.get(0)).isEqualTo("hello"); + assertThat(payloads.get(1)).isEqualTo("world"); + } + + @Test + void testBatchConversionWithoutSmartMessageConverter() { + // Given: A BatchMessagingMessageConverter without SmartMessageConverter + MessagingMessageConverter recordConverter = new MessagingMessageConverter(); + BatchMessagingMessageConverter batchConverter = new BatchMessagingMessageConverter(recordConverter); + + // Create test records with byte[] values + List> records = Arrays.asList( + new ConsumerRecord<>("topic", 0, 0, "key", "test".getBytes()) + ); + + // When: Convert batch + Type targetType = new TestParameterizedType(List.class, new Type[]{String.class}); + Message result = batchConverter.toMessage(records, null, null, targetType); + + // Then: Should work but payloads remain as byte[] + assertThat(result).isNotNull(); + List payloads = (List) result.getPayload(); + assertThat(payloads.get(0)).isInstanceOf(byte[].class); + } + + /** + * Test SmartMessageConverter that converts byte[] to String. + */ + static class TestStringMessageConverter implements SmartMessageConverter { + + @Override + public Object fromMessage(Message message, Class targetClass) { + return convertPayload(message.getPayload()); + } + + @Override + public Object fromMessage(Message message, Class targetClass, Object conversionHint) { + return convertPayload(message.getPayload()); + } + + @Override + public Message toMessage(Object payload, MessageHeaders headers) { + return MessageBuilder.withPayload(payload).copyHeaders(headers).build(); + } + + @Override + public Message toMessage(Object payload, MessageHeaders headers, Object conversionHint) { + return toMessage(payload, headers); + } + + private Object convertPayload(Object payload) { + // Convert byte[] to String - this is the core functionality being tested + if (payload instanceof byte[] bytes) { + return new String(bytes); + } + return payload; + } + } + + /** + * Helper class for creating parameterized types for testing. + */ + static class TestParameterizedType implements java.lang.reflect.ParameterizedType { + + private final Type rawType; + + private final Type[] typeArguments; + + TestParameterizedType(Type rawType, Type[] typeArguments) { + this.rawType = rawType; + this.typeArguments = typeArguments; + } + + public Type[] getActualTypeArguments() { + return typeArguments; + } + + public Type getRawType() { + return rawType; + } + + public Type getOwnerType() { + return null; + } + } +} From a22034667422e182a57bd5c5dc162279b2be3ea8 Mon Sep 17 00:00:00 2001 From: Jujuwryy Date: Fri, 24 Oct 2025 19:33:40 +0300 Subject: [PATCH 2/4] Address review feedback: Use integration test and fix adapter implementation Integration testing revealed the root cause of GH-4097. When Spring processes a `@KafkaListener` with `contentTypeConverter` and `batch="true"`, the framework: 1. Calls `setBatchMessageConverter()` on the adapter 2. This internally calls `setMessageConverter()` which sets `converterSet=true` 3. Spring then tries to apply `contentTypeConverter` by calling `setMessagingConverter()` 4. The parent's validation `Assert.isTrue(!this.converterSet, ...)` blocks this The unit test didn't catch this because it bypassed the adapter and Spring framework integration entirely. Changes: - `BatchMessagingMessageListenerAdapter.setMessagingConverter()`: Override now directly applies `SmartMessageConverter` to batch converter (which propagates to record converter) without calling super, bypassing the validation that doesn't apply to the batch listener workflow - `BatchSmartMessageConverterTests`: Replaced unit test with full integration test using `@SpringJUnitConfig`, `@EmbeddedKafka`, `ConcurrentKafkaListenerContainerFactory`, and `@KafkaListener` to verify the complete framework flow - Added minimal `ByteArrayToStringConverter` (24 lines) for testing as no existing Spring Framework converter provides simple byte[] to String conversion needed for this test scenario All tests pass and checkstyle validation successful. Signed-off-by: Jujuwryy --- .../BatchMessagingMessageListenerAdapter.java | 19 +- .../BatchMessagingMessageConverter.java | 5 +- .../BatchSmartMessageConverterTests.java | 225 +++++++++++------- 3 files changed, 147 insertions(+), 102 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java index b466a1005f..358e771b57 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java @@ -56,6 +56,7 @@ * @author Venil Noronha * @author Wang ZhiYang * @author Sanghyeok An + * @author George Mahfoud * @since 1.1 */ public class BatchMessagingMessageListenerAdapter extends MessagingMessageListenerAdapter @@ -109,20 +110,24 @@ public void setBatchToRecordAdapter(BatchToRecordAdapter batchToRecordAdap } /** - * Set the {@link SmartMessageConverter} to use with both the default record converter - * and the batch message converter. + * Set the {@link SmartMessageConverter} to use with the batch message converter. *

* When a {@code SmartMessageConverter} is configured via * {@code @KafkaListener(contentTypeConverter = "...")}, this method ensures it is - * properly propagated to both the record converter (via the parent class) and the - * batch converter to support message conversion in batch listeners. + * properly propagated to the batch converter, which will then propagate it to the + * record converter for message conversion in batch listeners. + *

+ * This override does not call the parent implementation because the parent's validation + * (checking {@code converterSet}) blocks setting the SmartMessageConverter after + * {@code setBatchMessageConverter} has been called, which is the normal workflow for + * batch listeners. * @param messageConverter the converter to set */ @Override public void setMessagingConverter(SmartMessageConverter messageConverter) { - super.setMessagingConverter(messageConverter); - if (this.batchMessageConverter instanceof BatchMessagingMessageConverter batchConverter) { - batchConverter.setMessagingConverter(messageConverter); + if (this.batchMessageConverter instanceof BatchMessagingMessageConverter) { + ((BatchMessagingMessageConverter) this.batchMessageConverter) + .setMessagingConverter(messageConverter); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index dbdcae91e2..2bc6bac5d8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -68,6 +68,7 @@ * @author Borahm Lee * @author Artem Bilan * @author Soby Chacko + * @author George Mahfoud * * @since 1.1 */ @@ -79,8 +80,6 @@ public class BatchMessagingMessageConverter implements BatchMessageConverter { @Nullable private final RecordMessageConverter recordConverter; - private @Nullable SmartMessageConverter messagingConverter; - private boolean generateMessageId = false; private boolean generateTimestamp = false; @@ -152,8 +151,6 @@ public RecordMessageConverter getRecordMessageConverter() { * @since 3.3.11 */ public void setMessagingConverter(@Nullable SmartMessageConverter messagingConverter) { - this.messagingConverter = messagingConverter; - if (this.recordConverter instanceof MessagingMessageConverter messagingRecordConverter) { messagingRecordConverter.setMessagingConverter(messagingConverter); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java index 2fb3349e57..548eed67a1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/BatchSmartMessageConverterTests.java @@ -16,140 +16,183 @@ package org.springframework.kafka.listener; -import java.lang.reflect.Type; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; - -import org.apache.kafka.clients.consumer.ConsumerRecord; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.MessagingMessageConverter; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.converter.SmartMessageConverter; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import static org.assertj.core.api.Assertions.assertThat; /** - * Tests for SmartMessageConverter support in batch listeners. - * Reproduces the issue described in GH-4097. + * Integration tests for SmartMessageConverter support in batch listeners. + * Reproduces and verifies the fix for the issue described in GH-4097. * - * @author Jujuwryy + * @author George Mahfoud * @since 3.3.11 */ +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka(partitions = 1, topics = { "smartBatchTopic" }) class BatchSmartMessageConverterTests { - @Test - void testSmartMessageConverterWorksInBatchConversion() { - // Given: A BatchMessagingMessageConverter with a record converter and SmartMessageConverter - MessagingMessageConverter recordConverter = new MessagingMessageConverter(); - BatchMessagingMessageConverter batchConverter = new BatchMessagingMessageConverter(recordConverter); - - // Set up SmartMessageConverter that converts byte[] to String - TestStringMessageConverter smartConverter = new TestStringMessageConverter(); - batchConverter.setMessagingConverter(smartConverter); - - // Create test records with byte[] values that need conversion to String - List> records = Arrays.asList( - new ConsumerRecord<>("topic", 0, 0, "key", "hello".getBytes()), - new ConsumerRecord<>("topic", 0, 1, "key", "world".getBytes()) - ); - - // When: Convert batch with List target type - Type targetType = new TestParameterizedType(List.class, new Type[]{String.class}); - Message result = batchConverter.toMessage(records, null, null, targetType); - - // Then: Verify the SmartMessageConverter was applied and byte[] was converted to String - assertThat(result).isNotNull(); - assertThat(result.getPayload()).isInstanceOf(List.class); - - List payloads = (List) result.getPayload(); - assertThat(payloads).hasSize(2); - assertThat(payloads.get(0)).isEqualTo("hello"); - assertThat(payloads.get(1)).isEqualTo("world"); - } + @Autowired + private KafkaTemplate template; + + @Autowired + private Config config; @Test - void testBatchConversionWithoutSmartMessageConverter() { - // Given: A BatchMessagingMessageConverter without SmartMessageConverter - MessagingMessageConverter recordConverter = new MessagingMessageConverter(); - BatchMessagingMessageConverter batchConverter = new BatchMessagingMessageConverter(recordConverter); - - // Create test records with byte[] values - List> records = Arrays.asList( - new ConsumerRecord<>("topic", 0, 0, "key", "test".getBytes()) - ); - - // When: Convert batch - Type targetType = new TestParameterizedType(List.class, new Type[]{String.class}); - Message result = batchConverter.toMessage(records, null, null, targetType); - - // Then: Should work but payloads remain as byte[] - assertThat(result).isNotNull(); - List payloads = (List) result.getPayload(); - assertThat(payloads.get(0)).isInstanceOf(byte[].class); + void testContentTypeConverterWithBatchListener() throws Exception { + // Given: A batch listener with contentTypeConverter configured + BatchListener listener = this.config.batchListener(); + + // When: Send byte[] messages that should be converted to String + this.template.send("smartBatchTopic", "hello".getBytes()); + this.template.send("smartBatchTopic", "world".getBytes()); + + // Then: SmartMessageConverter should convert byte[] to String for batch listener + assertThat(listener.latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(listener.received).hasSize(2).containsExactly("hello", "world"); } - /** - * Test SmartMessageConverter that converts byte[] to String. - */ - static class TestStringMessageConverter implements SmartMessageConverter { + @Configuration + @EnableKafka + public static class Config { + + @Bean + public KafkaListenerContainerFactory kafkaListenerContainerFactory(EmbeddedKafkaBroker embeddedKafka) { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory(embeddedKafka)); + factory.setBatchListener(true); + // Set up batch converter with record converter - framework will propagate SmartMessageConverter + factory.setBatchMessageConverter(new BatchMessagingMessageConverter(new MessagingMessageConverter())); + return factory; + } - @Override - public Object fromMessage(Message message, Class targetClass) { - return convertPayload(message.getPayload()); + @Bean + public DefaultKafkaConsumerFactory consumerFactory(EmbeddedKafkaBroker embeddedKafka) { + return new DefaultKafkaConsumerFactory<>(consumerConfigs(embeddedKafka)); } - @Override - public Object fromMessage(Message message, Class targetClass, Object conversionHint) { - return convertPayload(message.getPayload()); + @Bean + public Map consumerConfigs(EmbeddedKafkaBroker embeddedKafka) { + Map consumerProps = + KafkaTestUtils.consumerProps(embeddedKafka, "smartBatchGroup", false); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); + return consumerProps; } - @Override - public Message toMessage(Object payload, MessageHeaders headers) { - return MessageBuilder.withPayload(payload).copyHeaders(headers).build(); + @Bean + public KafkaTemplate template(EmbeddedKafkaBroker embeddedKafka) { + return new KafkaTemplate<>(producerFactory(embeddedKafka)); } - @Override - public Message toMessage(Object payload, MessageHeaders headers, Object conversionHint) { - return toMessage(payload, headers); + @Bean + public ProducerFactory producerFactory(EmbeddedKafkaBroker embeddedKafka) { + return new DefaultKafkaProducerFactory<>(producerConfigs(embeddedKafka)); + } + + @Bean + public Map producerConfigs(EmbeddedKafkaBroker embeddedKafka) { + Map props = KafkaTestUtils.producerProps(embeddedKafka); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + return props; + } + + @Bean + public SmartMessageConverter byteArrayToStringConverter() { + return new ByteArrayToStringConverter(); } - private Object convertPayload(Object payload) { - // Convert byte[] to String - this is the core functionality being tested - if (payload instanceof byte[] bytes) { - return new String(bytes); - } - return payload; + @Bean + public BatchListener batchListener() { + return new BatchListener(); } + } - /** - * Helper class for creating parameterized types for testing. - */ - static class TestParameterizedType implements java.lang.reflect.ParameterizedType { + public static class BatchListener { + + private final CountDownLatch latch = new CountDownLatch(2); - private final Type rawType; + private final List received = new ArrayList<>(); - private final Type[] typeArguments; + @KafkaListener( + id = "batchSmartListener", + topics = "smartBatchTopic", + groupId = "smartBatchGroup", + contentTypeConverter = "byteArrayToStringConverter", + batch = "true" + ) + public void listen(List messages) { + messages.forEach(message -> { + this.received.add(message); + this.latch.countDown(); + }); + } + + } + + /** + * Simple SmartMessageConverter for testing that converts byte[] to String. + */ + static class ByteArrayToStringConverter implements SmartMessageConverter { - TestParameterizedType(Type rawType, Type[] typeArguments) { - this.rawType = rawType; - this.typeArguments = typeArguments; + @Override + public Object fromMessage(Message message, Class targetClass) { + Object payload = message.getPayload(); + return (payload instanceof byte[] bytes) ? new String(bytes) : payload; } - public Type[] getActualTypeArguments() { - return typeArguments; + @Override + public Object fromMessage(Message message, Class targetClass, Object conversionHint) { + return fromMessage(message, targetClass); } - public Type getRawType() { - return rawType; + @Override + public Message toMessage(Object payload, MessageHeaders headers) { + return MessageBuilder.withPayload(payload).copyHeaders(headers).build(); } - public Type getOwnerType() { - return null; + @Override + public Message toMessage(Object payload, MessageHeaders headers, Object conversionHint) { + return toMessage(payload, headers); } + } } From 3cee10e82bb361a20a25bb72bcce14effb192515 Mon Sep 17 00:00:00 2001 From: Jujuwryy Date: Fri, 24 Oct 2025 20:54:10 +0300 Subject: [PATCH 3/4] Apply code review feedback: pattern matching and indentation - Use pattern matching for instanceof in setMessagingConverter() to avoid explicit casting - Fix constructor parameter indentation to use tabs only (not mixed spaces) - Address checkstyle violations per reviewer feedback These changes improve code readability without affecting functionality. Signed-off-by: Jujuwryy --- .../adapter/BatchMessagingMessageListenerAdapter.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java index 358e771b57..b90531228e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java @@ -82,7 +82,7 @@ public BatchMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Met * @param errorHandler the error handler. */ public BatchMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Method method, - @Nullable KafkaListenerErrorHandler errorHandler) { + @Nullable KafkaListenerErrorHandler errorHandler) { super(bean, method, errorHandler); } @@ -125,9 +125,8 @@ public void setBatchToRecordAdapter(BatchToRecordAdapter batchToRecordAdap */ @Override public void setMessagingConverter(SmartMessageConverter messageConverter) { - if (this.batchMessageConverter instanceof BatchMessagingMessageConverter) { - ((BatchMessagingMessageConverter) this.batchMessageConverter) - .setMessagingConverter(messageConverter); + if (this.batchMessageConverter instanceof BatchMessagingMessageConverter messagingConverter) { + messagingConverter.setMessagingConverter(messageConverter); } } From 240254e371893f5f9ff46043b9d4e400fb0de1c2 Mon Sep 17 00:00:00 2001 From: Jujuwryy Date: Fri, 24 Oct 2025 20:58:22 +0300 Subject: [PATCH 4/4] Fix remaining indentation issues in method parameters - Fix toMessagingMessage() parameter indentation in BatchMessagingMessageListenerAdapter - Fix toMessage() parameter indentation in BatchMessagingMessageConverter - Use single tab indentation consistently per Spring code style Signed-off-by: Jujuwryy --- .../listener/adapter/BatchMessagingMessageListenerAdapter.java | 2 +- .../kafka/support/converter/BatchMessagingMessageConverter.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java index b90531228e..f36d0ba5ce 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java @@ -193,7 +193,7 @@ public void onMessage(List> records, @Nullable Acknowledgme @SuppressWarnings({ "unchecked", "rawtypes" }) protected Message toMessagingMessage(List records, @Nullable Acknowledgment acknowledgment, - @Nullable Consumer consumer) { + @Nullable Consumer consumer) { return getBatchMessageConverter().toMessage(records, acknowledgment, consumer, getType()); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index 2bc6bac5d8..b7fbfbbae6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -168,7 +168,7 @@ public void setRawRecordHeader(boolean rawRecordHeader) { @Override // NOSONAR public Message toMessage(List> records, @Nullable Acknowledgment acknowledgment, - @Nullable Consumer consumer, Type type) { + @Nullable Consumer consumer, Type type) { KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp);