diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java index b0f24d3fe3..f58f2a2791 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java @@ -244,4 +244,13 @@ */ String[] properties() default {}; + /** + * When false and the return type is a {@link Iterable} return the result as the value + * of a single reply record instead of individual records for each element. Default + * true. Ignored if the reply is of type {@code Iterable>}. + * @return false to create a single reply record. + * @since 2.3.5 + */ + boolean splitIterables() default true; + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 902992b4c4..0df3f20163 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -437,6 +437,7 @@ protected void processListener(MethodKafkaListenerEndpoint endpoint, Kafka endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup")); } resolveKafkaProperties(endpoint, kafkaListener.properties()); + endpoint.setSplitIterables(kafkaListener.splitIterables()); KafkaListenerContainerFactory factory = null; String containerFactoryBeanName = resolve(kafkaListener.containerFactory()); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index 3426187f9e..8fbf6820dc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -114,6 +114,8 @@ public abstract class AbstractKafkaListenerEndpoint private Properties consumerProperties; + private boolean splitIterables = true; + @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; @@ -432,6 +434,21 @@ public void setConsumerProperties(Properties consumerProperties) { this.consumerProperties = consumerProperties; } + @Override + public boolean isSplitIterables() { + return this.splitIterables; + } + + /** + * Set to false to disable splitting {@link Iterable} reply values into separate + * records. + * @param splitIterables false to disable; default true. + * @since 2.3.5 + */ + public void setSplitIterables(boolean splitIterables) { + this.splitIterables = splitIterables; + } + @Override public void afterPropertiesSet() { boolean topicsEmpty = getTopics().isEmpty(); @@ -470,6 +487,7 @@ private void setupMessageListener(MessageListenerContainer container, MessageCon if (this.replyHeadersConfigurer != null) { adapter.setReplyHeadersConfigurer(this.replyHeadersConfigurer); } + adapter.setSplitIterables(this.splitIterables); Object messageListener = adapter; Assert.state(messageListener != null, () -> "Endpoint [" + this + "] must provide a non null message listener"); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java index 73d05bc9d6..d30448f46a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java @@ -144,4 +144,11 @@ default Properties getConsumerProperties() { */ void setupListenerContainer(MessageListenerContainer listenerContainer, MessageConverter messageConverter); + /** + * When true, {@link Iterable} return results will be split into discrete records. + * @return true to split. + * @since 2.3.5 + */ + boolean isSplitIterables(); + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointAdapter.java index 0d9c059c24..da980b3bf9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointAdapter.java @@ -87,4 +87,9 @@ public void setupListenerContainer(MessageListenerContainer listenerContainer, MessageConverter messageConverter) { } + @Override + public boolean isSplitIterables() { + return true; + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index a86f975933..35b47d8bd5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -123,6 +124,8 @@ public abstract class MessagingMessageListenerAdapter implements ConsumerS private ReplyHeadersConfigurer replyHeadersConfigurer; + private boolean splitIterables = true; + public MessagingMessageListenerAdapter(Object bean, Method method) { this.bean = bean; this.inferredType = determineInferredType(method); // NOSONAR = intentionally not final @@ -252,6 +255,25 @@ public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigu this.replyHeadersConfigurer = replyHeadersConfigurer; } + /** + * When true, {@link Iterable} return results will be split into discrete records. + * @return true to split. + * @since 2.3.5 + */ + protected boolean isSplitIterables() { + return this.splitIterables; + } + + /** + * Set to false to disable splitting {@link Iterable} reply values into separate + * records. + * @param splitIterables false to disable; default true. + * @since 2.3.5 + */ + public void setSplitIterables(boolean splitIterables) { + this.splitIterables = splitIterables; + } + @Override public void registerSeekCallback(ConsumerSeekCallback callback) { if (this.bean instanceof ConsumerSeekAware) { @@ -406,15 +428,25 @@ else if (result instanceof Message) { this.replyTemplate.send((Message) result); } else { - if (result instanceof Collection) { - ((Collection) result).forEach(v -> { - if (v instanceof Message) { - this.replyTemplate.send((Message) v); - } - else { - this.replyTemplate.send(topic, v); - } - }); + if (result instanceof Iterable) { + Iterator iterator = ((Iterable) result).iterator(); + boolean iterableOfMessages = false; + if (iterator.hasNext()) { + iterableOfMessages = iterator.next() instanceof Message; + } + if (iterableOfMessages || this.splitIterables) { + ((Iterable) result).forEach(v -> { + if (v instanceof Message) { + this.replyTemplate.send((Message) v); + } + else { + this.replyTemplate.send(topic, v); + } + }); + } + else { + sendSingleResult(result, topic, source); + } } else { sendSingleResult(result, topic, source); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index 3658cacac0..5cb2f5fee7 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -107,6 +107,8 @@ import org.springframework.kafka.support.converter.ProjectingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.support.converter.StringJsonMessageConverter; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; @@ -153,7 +155,8 @@ "annotated22reply", "annotated23", "annotated23reply", "annotated24", "annotated24reply", "annotated25", "annotated25reply1", "annotated25reply2", "annotated26", "annotated27", "annotated28", "annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33", - "annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart", "seekOnIdle" }) + "annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart", "seekOnIdle", + "annotated38", "annotated38reply" }) public class EnableKafkaIntegrationTests { private static final String DEFAULT_TEST_GROUP_ID = "testAnnot"; @@ -802,6 +805,41 @@ public void testSeekToLastOnIdle() throws InterruptedException { assertThat(KafkaTestUtils.getPropertyValue(this.seekOnIdleListener, "callbacks", Map.class)).hasSize(0); } + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testReplyingBatchListenerReturnCollection() { + Map consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties()); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testReplyingBatchListenerReturnCollection"); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + ConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); + Consumer consumer = cf.createConsumer(); + this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "annotated38reply"); + template.send("annotated38", 0, 0, "FoO"); + template.send("annotated38", 0, 0, "BaR"); + template.flush(); + ConsumerRecords replies = KafkaTestUtils.getRecords(consumer); + assertThat(replies.count()).isGreaterThanOrEqualTo(1); + Iterator> iterator = replies.iterator(); + Object value = iterator.next().value(); + assertThat(value).isInstanceOf(List.class); + List list = (List) value; + assertThat(list).hasSizeGreaterThanOrEqualTo(1); + assertThat(list.get(0)).isEqualTo("FOO"); + if (list.size() > 1) { + assertThat(list.get(1)).isEqualTo("BAR"); + } + else { + replies = KafkaTestUtils.getRecords(consumer); + assertThat(replies.count()).isGreaterThanOrEqualTo(1); + iterator = replies.iterator(); + value = iterator.next().value(); + list = (List) value; + assertThat(list).hasSize(1); + assertThat(list.get(0)).isEqualTo("BAR"); + } + consumer.close(); + } + @Configuration @EnableKafka @EnableTransactionManagement(proxyTargetClass = true) @@ -850,7 +888,7 @@ public ChainedKafkaTransactionManager cktm() { new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setRecordFilterStrategy(recordFilter()); - factory.setReplyTemplate(partitionZeroReplyingTemplate()); + factory.setReplyTemplate(partitionZeroReplyTemplate()); factory.setErrorHandler((ConsumerAwareErrorHandler) (t, d, c) -> { this.globalErrorThrowable = t; c.seek(new org.apache.kafka.common.TopicPartition(d.topic(), d.partition()), d.offset()); @@ -867,7 +905,7 @@ public ChainedKafkaTransactionManager cktm() { new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setRecordFilterStrategy(recordFilter()); - factory.setReplyTemplate(partitionZeroReplyingTemplate()); + factory.setReplyTemplate(partitionZeroReplyTemplate()); factory.setErrorHandler((ConsumerAwareErrorHandler) (t, d, c) -> { this.globalErrorThrowable = t; c.seek(new org.apache.kafka.common.TopicPartition(d.topic(), d.partition()), d.offset()); @@ -987,7 +1025,19 @@ public KafkaListenerContainerFactory batchFactory() { factory.setBatchListener(true); factory.setRecordFilterStrategy(recordFilter()); // always send to the same partition so the replies are in order for the test - factory.setReplyTemplate(partitionZeroReplyingTemplate()); + factory.setReplyTemplate(partitionZeroReplyTemplate()); + return factory; + } + + @Bean + public KafkaListenerContainerFactory batchJsonReplyFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.setBatchListener(true); + factory.setRecordFilterStrategy(recordFilter()); + // always send to the same partition so the replies are in order for the test + factory.setReplyTemplate(partitionZeroReplyJsonTemplate()); return factory; } @@ -1017,7 +1067,7 @@ public KafkaListenerContainerFactory batchSpyFactory() { factory.setBatchListener(true); factory.setRecordFilterStrategy(recordFilter()); // always send to the same partition so the replies are in order for the test - factory.setReplyTemplate(partitionZeroReplyingTemplate()); + factory.setReplyTemplate(partitionZeroReplyTemplate()); factory.setMissingTopicsFatal(false); return factory; } @@ -1167,6 +1217,13 @@ public ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } + @Bean + public ProducerFactory jsonProducerFactory() { + Map producerConfigs = producerConfigs(); + producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + return new DefaultKafkaProducerFactory<>(producerConfigs); + } + @Bean public ProducerFactory txProducerFactory() { DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(producerConfigs()); @@ -1197,7 +1254,7 @@ public KafkaTemplate bytesKeyTemplate() { } @Bean - public KafkaTemplate partitionZeroReplyingTemplate() { + public KafkaTemplate partitionZeroReplyTemplate() { // reply always uses the no-partition, no-key method; subclasses can be used return new KafkaTemplate(producerFactory(), true) { @@ -1209,6 +1266,19 @@ public ListenableFuture> send(String topic, String d }; } + @Bean + public KafkaTemplate partitionZeroReplyJsonTemplate() { + // reply always uses the no-partition, no-key method; subclasses can be used + return new KafkaTemplate(jsonProducerFactory(), true) { + + @Override + public ListenableFuture> send(String topic, Object data) { + return super.send(topic, 0, null, data); + } + + }; + } + @Bean public KafkaTemplate kafkaJsonTemplate() { KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory()); @@ -1727,6 +1797,15 @@ public Collection replyingBatchListenerWithErrorHandler(List in) throw new RuntimeException("return this"); } + @KafkaListener(id = "replyingBatchListenerCollection", topics = "annotated38", + containerFactory = "batchJsonReplyFactory", splitIterables = false) + @SendTo("annotated38reply") + public Collection replyingBatchListenerCollection(List in) { + return in.stream() + .map(String::toUpperCase) + .collect(Collectors.toList()); + } + @KafkaListener(id = "batchAckListener", topics = { "annotated26", "annotated27" }, containerFactory = "batchFactory") public void batchAckListener(List in, diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index 7a5347ba3e..729a0ff1c1 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -1734,6 +1734,11 @@ public KafkaListenerErrorHandler voidSendToErrorHandler() { See <> for more information. ==== +NOTE: If a listener method returns an `Iterable`, by default a record for each element as the value is sent. +Starting with version 2.3.5, set the `splitIterables` property on `@KafkaListener` to `false` and the entire result will be sent as the value of a single `ProducerRecord`. +This requires a suitable serializer in the reply template's producer configuration. +However, if the reply is `Iterable>` the property is ignored and each message is sent separately. + ===== Filtering Messages In certain scenarios, such as rebalancing, a message that has already been processed may be redelivered. diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 6a501eacd7..9b23d17e92 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -78,94 +78,13 @@ See <> for more information The `ContainerProperties` provides an `authorizationExceptionRetryInterval` option to let the listener container to retry after any `AuthorizationException` is thrown by the `KafkaConsumer`. See its JavaDocs and <> for more information. -==== ErrorHandler Changes +==== @KafkaListener -The `SeekToCurrentErrorHandler` now treats certain exceptions as fatal and disables retry for those, invoking the recoverer on first failure. +The `@KafkaListener` annotation has a new property `splitIterables`; default true. +When a replying listener returns an `Iterable` this property controls whether the return result is sent as a single record or a record for each element is sent. +See <> for more information. -The `SeekToCurrentErrorHandler` and `SeekToCurrentBatchErrorHandler` can now be configured to apply a `BackOff` (thread sleep) between delivery attempts. - -Starting with version 2.3.2, recovered records' offsets will be committed when the error handler returns after recovering a failed record. - -See <> for more information. - -The `DeadLetterPublishingRecoverer`, when used in conjunction with an `ErrorHandlingDeserializer2`, now sets the payload of the message sent to the dead-letter topic, to the original value that could not be deserialized. -Previously, it was `null` and user code needed to extract the `DeserializationException` from the message headers. -See <> for more information. - -==== TopicBuilder - -A new class `TopicBuilder` is provided for more convenient creation of `NewTopic` `@Bean` s for automatic topic provisioning. -See <> for more information. - -==== Kafka Streams Changes - -You can now perform additional configuration of the `StreamsBuilderFactoryBean` created by `@EnableKafkaStreams`. -See <> for more information. - -A `RecoveringDeserializationExceptionHandler` is now provided which allows records with deserialization errors to be recovered. -It can be used in conjunction with a `DeadLetterPublishingRecoverer` to send these records to a dead-letter topic. -See <> for more information. - -The `HeaderEnricher` transformer has been provided, using SpEL to generate the header values. -See <> for more information. - -The `MessagingTransformer` has been provided. -This allows a Kafka streams topology to interact with a spring-messaging component, such as a Spring Integration flow. -See <> and <> for more information. - -==== JSON Component Changes - -Now all the JSON-aware components are configured by default with a Jackson `ObjectMapper` produced by the `JacksonUtils.enhancedObjectMapper()`. -The `JsonDeserializer` now provides `TypeReference`-based constructors for better handling of target generic container types. -Also a `JacksonMimeTypeModule` has been introduced for serialization of `org.springframework.util.MimeType` to plain string. -See its JavaDocs and <> for more information. - -A `ByteArrayJsonMessageConverter` has been provided as well as a new super class for all Json converters, `JsonMessageConverter`. -Also, a `StringOrBytesSerializer` is now available; it can serialize `byte[]`, `Bytes` and `String` values in `ProducerRecord` s. -See <> for more information. - -The `JsonSerializer`, `JsonDeserializer` and `JsonSerde` now have fluent APIs to make programmatic configuration simpler. -See the javadocs, <>, and <> for more informaion. - -==== ReplyingKafkaTemplate - -When a reply times out, the future is completed exceptionally with a `KafkaReplyTimeoutException` instead of a `KafkaException`. - -Also, an overloaded `sendAndReceive` method is now provided that allows specifying the reply timeout on a per message basis. - -==== AggregatingReplyingKafkaTemplate - -Extends the `ReplyingKafkaTemplate` by aggregating replies from multiple receivers. -The `releaseStrategy` is a `BiConsumer`. -It is called after a timeout (as well as when records arrive); the second parameter is `true` in the case of a call after a timeout. -See <> for more information. - -==== Transaction Changes - -You can now override the producer factory's `transactionIdPrefix` on the `KafkaTemplate` and `KafkaTransactionManager`. -See <> for more information. - -==== New Delegating Serializer/Deserializer - -The framework now provides a delegating `Serializer` and `Deserializer`, utilizing a header to enable producing and consuming records with multiple key/value types. -See <> for more information. - -==== New Retrying Deserializer - -The framework now provides a delegating `RetryingDeserializer`, to retry serialization when transient errors such as network problems might occur. -See <> for more information. - -==== New function for recovering from deserializing errors - -`ErrorHandlingDeserializer2` now uses a POJO (`FailedDeserializationInfo`) for passing all the contextual information around a deserialization error. -This enables the code to access to extra information that was missing in the old `BiFunction failedDeserializationFunction`. - -==== EmbeddedKafkaBroker Changes - -You can now override the default broker list property name in the annotation. -See <> for more information. - -==== ReplyingKafkaTemplate Changes +=== Migration Guide You can now customize the header names for correlation, reply topic and reply partition. See <> for more information.