diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java index 4deb3c65da..24a5877eca 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java @@ -39,6 +39,7 @@ import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.kafka.listener.adapter.ReplyHeadersConfigurer; import org.springframework.kafka.requestreply.ReplyingKafkaOperations; +import org.springframework.kafka.support.JavaUtils; import org.springframework.kafka.support.TopicPartitionInitialOffset; import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.retry.RecoveryCallback; @@ -286,10 +287,8 @@ public void afterPropertiesSet() { @Override public C createListenerContainer(KafkaListenerEndpoint endpoint) { C instance = createContainerInstance(endpoint); - - if (endpoint.getId() != null) { - instance.setBeanName(endpoint.getId()); - } + JavaUtils.INSTANCE + .acceptIfNotNull(endpoint.getId(), instance::setBeanName); if (endpoint instanceof AbstractKafkaListenerEndpoint) { configureEndpoint((AbstractKafkaListenerEndpoint) endpoint); } @@ -301,30 +300,15 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) { } private void configureEndpoint(AbstractKafkaListenerEndpoint aklEndpoint) { - if (this.recordFilterStrategy != null) { - aklEndpoint.setRecordFilterStrategy(this.recordFilterStrategy); - } - if (this.ackDiscarded != null) { - aklEndpoint.setAckDiscarded(this.ackDiscarded); - } - if (this.retryTemplate != null) { - aklEndpoint.setRetryTemplate(this.retryTemplate); - } - if (this.recoveryCallback != null) { - aklEndpoint.setRecoveryCallback(this.recoveryCallback); - } - if 
(this.statefulRetry != null) { - aklEndpoint.setStatefulRetry(this.statefulRetry); - } - if (this.batchListener != null) { - aklEndpoint.setBatchListener(this.batchListener); - } - if (this.replyTemplate != null) { - aklEndpoint.setReplyTemplate(this.replyTemplate); - } - if (this.replyHeadersConfigurer != null) { - aklEndpoint.setReplyHeadersConfigurer(this.replyHeadersConfigurer); - } + JavaUtils.INSTANCE + .acceptIfNotNull(this.recordFilterStrategy, aklEndpoint::setRecordFilterStrategy) + .acceptIfNotNull(this.ackDiscarded, aklEndpoint::setAckDiscarded) + .acceptIfNotNull(this.retryTemplate, aklEndpoint::setRetryTemplate) + .acceptIfNotNull(this.recoveryCallback, aklEndpoint::setRecoveryCallback) + .acceptIfNotNull(this.statefulRetry, aklEndpoint::setStatefulRetry) + .acceptIfNotNull(this.batchListener, aklEndpoint::setBatchListener) + .acceptIfNotNull(this.replyTemplate, aklEndpoint::setReplyTemplate) + .acceptIfNotNull(this.replyHeadersConfigurer, aklEndpoint::setReplyHeadersConfigurer); } /** @@ -345,35 +329,26 @@ protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) { ContainerProperties properties = instance.getContainerProperties(); BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern", "messageListener", "ackCount", "ackTime"); - if (this.afterRollbackProcessor != null) { - instance.setAfterRollbackProcessor(this.afterRollbackProcessor); - } - if (this.containerProperties.getAckCount() > 0) { - properties.setAckCount(this.containerProperties.getAckCount()); - } - if (this.containerProperties.getAckTime() > 0) { - properties.setAckTime(this.containerProperties.getAckTime()); - } - if (this.errorHandler != null) { - instance.setGenericErrorHandler(this.errorHandler); - } + JavaUtils.INSTANCE + .acceptIfNotNull(this.afterRollbackProcessor, instance::setAfterRollbackProcessor) + .acceptIfCondition(this.containerProperties.getAckCount() > 0, this.containerProperties.getAckCount(), + 
properties::setAckCount) + .acceptIfCondition(this.containerProperties.getAckTime() > 0, this.containerProperties.getAckTime(), + properties::setAckTime) + .acceptIfNotNull(this.errorHandler, instance::setGenericErrorHandler); if (endpoint.getAutoStartup() != null) { instance.setAutoStartup(endpoint.getAutoStartup()); } else if (this.autoStartup != null) { instance.setAutoStartup(this.autoStartup); } - if (this.phase != null) { - instance.setPhase(this.phase); - } - if (this.applicationEventPublisher != null) { - instance.setApplicationEventPublisher(this.applicationEventPublisher); - } - instance.getContainerProperties().setGroupId(endpoint.getGroupId()); - instance.getContainerProperties().setClientId(endpoint.getClientIdPrefix()); - if (endpoint.getConsumerProperties() != null) { - instance.getContainerProperties().setConsumerProperties(endpoint.getConsumerProperties()); - } + JavaUtils.INSTANCE + .acceptIfNotNull(this.phase, instance::setPhase) + .acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher) + .acceptIfNotNull(endpoint.getGroupId(), instance.getContainerProperties()::setGroupId) + .acceptIfNotNull(endpoint.getClientIdPrefix(), instance.getContainerProperties()::setClientId) + .acceptIfNotNull(endpoint.getConsumerProperties(), + instance.getContainerProperties()::setConsumerProperties); } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java index 190e7b755d..440026b15e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java @@ -30,6 +30,7 @@ import org.springframework.kafka.listener.adapter.HandlerAdapter; import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter; import 
org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter; +import org.springframework.kafka.support.JavaUtils; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; @@ -147,19 +148,19 @@ protected MessageHandlerMethodFactory getMessageHandlerMethodFactory() { @Override protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container, MessageConverter messageConverter) { + Assert.state(this.messageHandlerMethodFactory != null, "Could not create message listener - MessageHandlerMethodFactory not set"); MessagingMessageListenerAdapter messageListener = createMessageListenerInstance(messageConverter); messageListener.setHandlerMethod(configureListenerAdapter(messageListener)); - String replyTopic = getReplyTopic(); - if (replyTopic != null) { - Assert.state(getMethod().getReturnType().equals(void.class) - || getReplyTemplate() != null, "a KafkaTemplate is required to support replies"); - messageListener.setReplyTopic(replyTopic); - } - if (getReplyTemplate() != null) { - messageListener.setReplyTemplate(getReplyTemplate()); - } + JavaUtils.INSTANCE + .acceptIfNotNull(getReplyTopic(), replyTopic -> { + Assert.state(getMethod().getReturnType().equals(void.class) + || getReplyTemplate() != null, "a KafkaTemplate is required to support replies"); + messageListener.setReplyTopic(replyTopic); + }) + .acceptIfNotNull(getReplyTemplate(), messageListener::setReplyTemplate); + return messageListener; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java index 1b9a78fd92..db1d1e6101 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java +++ 
b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java @@ -186,7 +186,8 @@ public interface KafkaOperations { /** * When running in a transaction, send the consumer offset(s) to the transaction. The - * group id is obtained from {@link ProducerFactoryUtils#getConsumerGroupId()}. It is + * group id is obtained from + * {@link org.springframework.kafka.support.KafkaUtils#getConsumerGroupId()}. It is * not necessary to call this method if the operations are invoked on a listener * container thread (and the listener container is configured with a * {@link org.springframework.kafka.transaction.KafkaAwareTransactionManager}) since diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 5cf590ca55..4bfd1e3e51 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.TopicPartition; import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.KafkaUtils; import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.SendResult; @@ -329,7 +330,7 @@ public void flush() { @Override public void sendOffsetsToTransaction(Map offsets) { - sendOffsetsToTransaction(offsets, ProducerFactoryUtils.getConsumerGroupId()); + sendOffsetsToTransaction(offsets, KafkaUtils.getConsumerGroupId()); } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java index 97d699966a..e4207dfaf2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java +++ 
b/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java @@ -18,6 +18,7 @@ import org.apache.kafka.clients.producer.Producer; +import org.springframework.kafka.support.KafkaUtils; import org.springframework.lang.Nullable; import org.springframework.transaction.support.ResourceHolderSynchronization; import org.springframework.transaction.support.TransactionSynchronization; @@ -35,8 +36,6 @@ */ public final class ProducerFactoryUtils { - private static ThreadLocal groupIds = new ThreadLocal<>(); - private ProducerFactoryUtils() { super(); } @@ -82,27 +81,33 @@ public static void releaseResources(@Nullable KafkaResourceHolder r /** * Set the group id for the consumer bound to this thread. * @param groupId the group id. + * @deprecated in favor of {@link KafkaUtils#setConsumerGroupId(String)}. * @since 1.3 */ + @Deprecated public static void setConsumerGroupId(String groupId) { - groupIds.set(groupId); + KafkaUtils.setConsumerGroupId(groupId); } /** * Get the group id for the consumer bound to this thread. * @return the group id. + * @deprecated in favor of {@link KafkaUtils#getConsumerGroupId()}. * @since 1.3 */ + @Deprecated public static String getConsumerGroupId() { - return groupIds.get(); + return KafkaUtils.getConsumerGroupId(); } /** * Clear the group id for the consumer bound to this thread. + * @deprecated in favor of {@link KafkaUtils#clearConsumerGroupId()}. 
* @since 1.3 */ + @Deprecated public static void clearConsumerGroupId() { - groupIds.remove(); + KafkaUtils.clearConsumerGroupId(); } private static void bindResourceToTransaction(KafkaResourceHolder resourceHolder, @@ -151,8 +156,8 @@ public void afterCompletion(int status) { } @Override - protected void releaseResource(KafkaResourceHolder resourceHolder, Object resourceKey) { - ProducerFactoryUtils.releaseResources(resourceHolder); + protected void releaseResource(KafkaResourceHolder holder, Object resourceKey) { + ProducerFactoryUtils.releaseResources(holder); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 3634197a8d..7de59fcaed 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -56,7 +56,6 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaResourceHolder; import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.core.ProducerFactoryUtils; import org.springframework.kafka.event.ConsumerPausedEvent; import org.springframework.kafka.event.ConsumerResumedEvent; import org.springframework.kafka.event.ConsumerStoppedEvent; @@ -66,6 +65,7 @@ import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback; import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.KafkaUtils; import org.springframework.kafka.support.LogIfLevelEnabled; import org.springframework.kafka.support.TopicPartitionInitialOffset; import org.springframework.kafka.support.TopicPartitionInitialOffset.SeekPosition; @@ -743,9 +743,7 @@ public void run() { if 
(this.genericListener instanceof ConsumerSeekAware) { ((ConsumerSeekAware) this.genericListener).registerSeekCallback(this); } - if (this.transactionManager != null) { - ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId); - } + KafkaUtils.setConsumerGroupId(this.consumerGroupId); this.count = 0; this.last = System.currentTimeMillis(); initAssignedPartitions(); @@ -864,7 +862,7 @@ private void checkIdle() { } public void wrapUp() { - ProducerFactoryUtils.clearConsumerGroupId(); + KafkaUtils.clearConsumerGroupId(); publishConsumerStoppingEvent(this.consumer); if (!this.fatalError) { if (this.kafkaTxManager == null) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 508c1ce9cd..89a99ca4ae 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -57,6 +57,7 @@ import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessagingException; import org.springframework.messaging.converter.MessageConversionException; +import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; @@ -475,7 +476,7 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity } Type genericParameterType = null; - boolean hasConsumerParameter = false; + int allowedBatchParameters = 1; for (int i = 0; i < method.getParameterCount(); i++) { MethodParameter methodParameter = new MethodParameter(method, i); @@ -529,23 +530,33 @@ else if (parameterizedType.getRawType().equals(List.class) } else if 
(methodParameter.getGenericParameterType().equals(Acknowledgment.class)) { this.hasAckParameter = true; + allowedBatchParameters++; + } + else if (methodParameter.hasParameterAnnotation(Header.class)) { + Header header = methodParameter.getParameterAnnotation(Header.class); + if (header.value().equals(KafkaHeaders.GROUP_ID)) { + allowedBatchParameters++; + } } else { if (methodParameter.getGenericParameterType().equals(Consumer.class)) { - hasConsumerParameter = true; + allowedBatchParameters++; } else { Type parameterType = methodParameter.getGenericParameterType(); - hasConsumerParameter = parameterType instanceof ParameterizedType - && ((ParameterizedType) parameterType).getRawType().equals(Consumer.class); + if (parameterType instanceof ParameterizedType + && ((ParameterizedType) parameterType).getRawType().equals(Consumer.class)) { + allowedBatchParameters++; + } } } } - boolean validParametersForBatch = validParametersForBatch(method.getGenericParameterTypes().length, - this.hasAckParameter, hasConsumerParameter); + boolean validParametersForBatch = method.getGenericParameterTypes().length <= allowedBatchParameters; + if (!validParametersForBatch) { String stateMessage = "A parameter of type '%s' must be the only parameter " - + "(except for an optional 'Acknowledgment' and/or 'Consumer')"; + + "(except for an optional 'Acknowledgment' and/or 'Consumer' " + + "and/or '@Header(KafkaHeaders.GROUP_ID) String groupId')"; Assert.state(!this.isConsumerRecords, () -> String.format(stateMessage, "ConsumerRecords")); Assert.state(!this.isConsumerRecordList, 
} - else { - return parameterCount == 1; // NOSONAR magic # - } - } - /* * Don't consider parameter types that are available after conversion. * Acknowledgment, ConsumerRecord, Consumer, ConsumerRecord<...>, Consumer<...>, and Message. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java index bdece396e8..a395ba1ffc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java @@ -24,6 +24,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -46,24 +48,7 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper { protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR - private static final List NEVER_MAPPED = Arrays.asList( - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.ACKNOWLEDGMENT), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.CONSUMER), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.MESSAGE_KEY), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.OFFSET), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.PARTITION_ID), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.RAW_DATA), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.RECEIVED_MESSAGE_KEY), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.RECEIVED_PARTITION_ID), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.RECEIVED_TIMESTAMP), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.RECEIVED_TOPIC), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.TIMESTAMP), - new SimplePatternBasedHeaderMatcher("!" 
+ KafkaHeaders.TIMESTAMP_TYPE), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.BATCH_CONVERTED_HEADERS), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.NATIVE_HEADERS), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.TOPIC)); - - protected final List matchers = new ArrayList<>(NEVER_MAPPED); // NOSONAR + private final List matchers = new ArrayList<>(); private final Map rawMappedtHeaders = new HashMap<>(); @@ -73,11 +58,41 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper { public AbstractKafkaHeaderMapper(String... patterns) { Assert.notNull(patterns, "'patterns' must not be null"); + this.matchers.add(new NeverMatchHeaderMatcher( + KafkaHeaders.ACKNOWLEDGMENT, + KafkaHeaders.CONSUMER, + KafkaHeaders.MESSAGE_KEY, + KafkaHeaders.OFFSET, + KafkaHeaders.PARTITION_ID, + KafkaHeaders.RAW_DATA, + KafkaHeaders.RECEIVED_MESSAGE_KEY, + KafkaHeaders.RECEIVED_PARTITION_ID, + KafkaHeaders.RECEIVED_TIMESTAMP, + KafkaHeaders.RECEIVED_TOPIC, + KafkaHeaders.TIMESTAMP, + KafkaHeaders.TIMESTAMP_TYPE, + KafkaHeaders.BATCH_CONVERTED_HEADERS, + KafkaHeaders.NATIVE_HEADERS, + KafkaHeaders.TOPIC, + KafkaHeaders.GROUP_ID)); for (String pattern : patterns) { this.matchers.add(new SimplePatternBasedHeaderMatcher(pattern)); } } + /** + * Subclasses can invoke this to add custom {@link HeaderMatcher}s. + * @param matchersToAdd the matchers to add. + * @since 2.3 + */ + protected final void addMatchers(HeaderMatcher... matchersToAdd) { + Assert.notNull(matchersToAdd, "'matchersToAdd' cannot be null"); + Assert.noNullElements(matchersToAdd, "'matchersToAdd' cannot have null elements"); + for (HeaderMatcher matcher : matchersToAdd) { + this.matchers.add(matcher); + } + } + /** * Set to true to map all {@code String} valued outbound headers to {@code byte[]}. * To map to a {@code String} for inbound, there must be an entry in the rawMappedHeaders map. 
@@ -137,7 +152,7 @@ protected boolean matches(String header, Object value) { } protected boolean matches(String header) { - for (SimplePatternBasedHeaderMatcher matcher : this.matchers) { + for (HeaderMatcher matcher : this.matchers) { if (matcher.matchHeader(header)) { return !matcher.isNegated(); } @@ -200,15 +215,61 @@ private String mapRawIn(String header, byte[] value) { } + /** + * A matcher for headers. + * @since 2.3 + */ + protected interface HeaderMatcher { + + /** + * Return true if the header matches. + * @param headerName the header name. + * @return true for a match. + */ + boolean matchHeader(String headerName); + + /** + * Return true if this matcher is a negative matcher. + * @return true for a negative matcher. + */ + boolean isNegated(); + + } + + /** + * A matcher that never matches a set of headers. + * @since 2.3 + */ + protected static class NeverMatchHeaderMatcher implements HeaderMatcher { + + private final Set neverMatchHeaders; + + protected NeverMatchHeaderMatcher(String... headers) { + this.neverMatchHeaders = Arrays.stream(headers) + .collect(Collectors.toSet()); + } + + @Override + public boolean matchHeader(String headerName) { + return this.neverMatchHeaders.contains(headerName); + } + + @Override + public boolean isNegated() { + return true; + } + + } + /** * A pattern-based header matcher that matches if the specified * header matches the specified simple pattern. *

The {@code negate == true} state indicates if the matching should be treated as "not matched". * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String) */ - protected static class SimplePatternBasedHeaderMatcher { + protected static class SimplePatternBasedHeaderMatcher implements HeaderMatcher { - private static final Log logger = LogFactory.getLog(SimplePatternBasedHeaderMatcher.class); // NOSONAR + private static final Log LOGGER = LogFactory.getLog(SimplePatternBasedHeaderMatcher.class); private final String pattern; @@ -224,11 +285,12 @@ public SimplePatternBasedHeaderMatcher(String pattern) { this.negate = negate; } + @Override public boolean matchHeader(String headerName) { String header = headerName.toLowerCase(); if (PatternMatchUtils.simpleMatch(this.pattern, header)) { - if (logger.isDebugEnabled()) { - logger.debug( + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( MessageFormat.format( "headerName=[{0}] WILL " + (this.negate ? "NOT " : "") + "be mapped, matched pattern=" + (this.negate ? "!" : "") + "{1}", @@ -239,6 +301,7 @@ public boolean matchHeader(String headerName) { return false; } + @Override public boolean isNegated() { return this.negate; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/JavaUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/support/JavaUtils.java new file mode 100644 index 0000000000..e37d6b5fc9 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/JavaUtils.java @@ -0,0 +1,168 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support; + +import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import org.springframework.util.CollectionUtils; +import org.springframework.util.ObjectUtils; +import org.springframework.util.StringUtils; + +/** + * Chained utility methods to simplify some Java repetitive code. Obtain a reference to + * the singleton {@link #INSTANCE} and then chain calls to the utility methods. + * + * @author Gary Russell + * @author Artem Bilan + * + * @since 2.3 + */ +public final class JavaUtils { + + /** + * The singleton instance of this utility class. + */ + public static final JavaUtils INSTANCE = new JavaUtils(); + + private JavaUtils() { + super(); + } + + /** + * Invoke {@link Consumer#accept(Object)} with the value if the condition is true. + * @param condition the condition. + * @param value the value. + * @param consumer the consumer. + * @param the value type. + * @return this. + */ + public JavaUtils acceptIfCondition(boolean condition, T value, Consumer consumer) { + if (condition) { + consumer.accept(value); + } + return this; + } + + /** + * Invoke {@link Consumer#accept(Object)} with the value if it is not null. + * @param value the value. + * @param consumer the consumer. + * @param the value type. + * @return this. + */ + public JavaUtils acceptIfNotNull(T value, Consumer consumer) { + if (value != null) { + consumer.accept(value); + } + return this; + } + + /** + * Invoke {@link Consumer#accept(Object)} with the value if it is not null or empty. 
+ * @param value the value. + * @param consumer the consumer. + * @return this. + */ + public JavaUtils acceptIfHasText(String value, Consumer consumer) { + if (StringUtils.hasText(value)) { + consumer.accept(value); + } + return this; + } + + /** + * Invoke {@link Consumer#accept(Object)} with the value if it is not null or empty. + * @param value the value. + * @param consumer the consumer. + * @param the value type. + * @return this. + */ + public JavaUtils acceptIfNotEmpty(List value, Consumer> consumer) { + if (!CollectionUtils.isEmpty(value)) { + consumer.accept(value); + } + return this; + } + + /** + * Invoke {@link Consumer#accept(Object)} with the value if it is not null or empty. + * @param value the value. + * @param consumer the consumer. + * @param the value type. + * @return this. + */ + public JavaUtils acceptIfNotEmpty(T[] value, Consumer consumer) { + if (!ObjectUtils.isEmpty(value)) { + consumer.accept(value); + } + return this; + } + + /** + * Invoke {@link BiConsumer#accept(Object, Object)} with the arguments if the + * condition is true. + * @param condition the condition. + * @param t1 the first consumer argument + * @param t2 the second consumer argument + * @param consumer the consumer. + * @param the first argument type. + * @param the second argument type. + * @return this. + */ + public JavaUtils acceptIfCondition(boolean condition, T1 t1, T2 t2, BiConsumer consumer) { + if (condition) { + consumer.accept(t1, t2); + } + return this; + } + + /** + * Invoke {@link BiConsumer#accept(Object, Object)} with the arguments if the t2 + * argument is not null. + * @param t1 the first argument + * @param t2 the second consumer argument + * @param consumer the consumer. + * @param the first argument type. + * @param the second argument type. + * @return this. 
+ */ + public JavaUtils acceptIfNotNull(T1 t1, T2 t2, BiConsumer consumer) { + if (t2 != null) { + consumer.accept(t1, t2); + } + return this; + } + + /** + * Invoke {@link BiConsumer#accept(Object, Object)} with the arguments if the value + * argument is not null or empty. + * @param t1 the first consumer argument. + * @param value the second consumer argument + * @param the first argument type. + * @param consumer the consumer. + * @return this. + */ + public JavaUtils acceptIfHasText(T t1, String value, BiConsumer consumer) { + if (StringUtils.hasText(value)) { + consumer.accept(t1, value); + } + return this; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java index 67f96cdfbf..46d4b363e5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java @@ -189,4 +189,10 @@ public abstract class KafkaHeaders { */ public static final String DLT_ORIGINAL_TIMESTAMP_TYPE = PREFIX + "dlt-original-timestamp-type"; + /** + * For inbound messages, the container's {@code group.id} consumer property. + * @since 2.3 + */ + public static final String GROUP_ID = PREFIX + "groupId"; + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java index 07afc8d991..22a02ea740 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java @@ -33,6 +33,8 @@ */ public final class KafkaUtils { + private static ThreadLocal groupIds = new ThreadLocal<>(); + /** * Return true if the method return type is {@link Message} or * {@code Collection>}. 
@@ -63,6 +65,32 @@ public static boolean returnTypeMessageOrCollectionOf(Method method) { } + /** + * Set the group id for the consumer bound to this thread. + * @param groupId the group id. + * @since 2.3 + */ + public static void setConsumerGroupId(String groupId) { + KafkaUtils.groupIds.set(groupId); + } + + /** + * Get the group id for the consumer bound to this thread. + * @return the group id. + * @since 2.3 + */ + public static String getConsumerGroupId() { + return KafkaUtils.groupIds.get(); + } + + /** + * Clear the group id for the consumer bound to this thread. + * @since 2.3 + */ + public static void clearConsumerGroupId() { + KafkaUtils.groupIds.remove(); + } + private KafkaUtils() { super(); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index 9fc484dced..57788f23e6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -138,25 +138,14 @@ public Message toMessage(List> records, Acknowledgment a List timestamps = new ArrayList<>(); List> convertedHeaders = new ArrayList<>(); List natives = new ArrayList<>(); - rawHeaders.put(KafkaHeaders.RECEIVED_MESSAGE_KEY, keys); - rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, topics); - rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION_ID, partitions); - rawHeaders.put(KafkaHeaders.OFFSET, offsets); - rawHeaders.put(KafkaHeaders.TIMESTAMP_TYPE, timestampTypes); - rawHeaders.put(KafkaHeaders.RECEIVED_TIMESTAMP, timestamps); if (this.headerMapper != null) { rawHeaders.put(KafkaHeaders.BATCH_CONVERTED_HEADERS, convertedHeaders); } else { rawHeaders.put(KafkaHeaders.NATIVE_HEADERS, natives); } - - if (acknowledgment != null) { - 
rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment); - } - if (consumer != null) { - rawHeaders.put(KafkaHeaders.CONSUMER, consumer); - } + commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets, timestampTypes, + timestamps); boolean logged = false; for (ConsumerRecord record : records) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java index 6822776d8b..aa640e71c7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java @@ -16,6 +16,16 @@ package org.springframework.kafka.support.converter; +import java.util.Map; + +import org.apache.kafka.clients.consumer.Consumer; + +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.JavaUtils; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.KafkaUtils; +import org.springframework.lang.Nullable; + /** * A top level interface for message converters. * @@ -25,4 +35,26 @@ */ public interface MessageConverter { + @Nullable + static String getGroupid() { + String groupId = KafkaUtils.getConsumerGroupId(); + return groupId == null ? 
null : groupId; + } + + default void commonHeaders(Acknowledgment acknowledgment, Consumer consumer, Map rawHeaders, + Object theKey, Object topic, Object partition, Object offset, Object timestampType, Object timestamp) { + + rawHeaders.put(KafkaHeaders.RECEIVED_MESSAGE_KEY, theKey); + rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, topic); + rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION_ID, partition); + rawHeaders.put(KafkaHeaders.OFFSET, offset); + rawHeaders.put(KafkaHeaders.TIMESTAMP_TYPE, timestampType); + rawHeaders.put(KafkaHeaders.RECEIVED_TIMESTAMP, timestamp); + JavaUtils.INSTANCE + .acceptIfNotNull(KafkaHeaders.GROUP_ID, MessageConverter.getGroupid(), + (key, val) -> rawHeaders.put(key, val)) + .acceptIfNotNull(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment, (key, val) -> rawHeaders.put(key, val)) + .acceptIfNotNull(KafkaHeaders.CONSUMER, consumer, (key, val) -> rawHeaders.put(key, val)); + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java index c17254b6ad..6720be0ed8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java @@ -117,19 +117,8 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle } rawHeaders.put(KafkaHeaders.NATIVE_HEADERS, record.headers()); } - rawHeaders.put(KafkaHeaders.RECEIVED_MESSAGE_KEY, record.key()); - rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, record.topic()); - rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION_ID, record.partition()); - rawHeaders.put(KafkaHeaders.OFFSET, record.offset()); - rawHeaders.put(KafkaHeaders.TIMESTAMP_TYPE, record.timestampType().name()); - rawHeaders.put(KafkaHeaders.RECEIVED_TIMESTAMP, record.timestamp()); - - if (acknowledgment != null) { - 
rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment); - } - if (consumer != null) { - rawHeaders.put(KafkaHeaders.CONSUMER, consumer); - } + commonHeaders(acknowledgment, consumer, rawHeaders, record.key(), record.topic(), record.partition(), + record.offset(), record.timestampType().name(), record.timestamp()); return MessageBuilder.createMessage(extractAndConvertValue(record, type), kafkaMessageHeaders); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index 707f3f3a3b..38fce828bc 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -215,6 +215,7 @@ public void testSimple() throws Exception { template.flush(); assertThat(this.listener.latch1.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(this.config.globalErrorThrowable).isNotNull(); + assertThat(this.listener.receivedGroupId).isEqualTo("foo"); template.send("annotated2", 0, 123, "foo"); template.flush(); @@ -222,6 +223,7 @@ public void testSimple() throws Exception { assertThat(this.listener.key).isEqualTo(123); assertThat(this.listener.partition).isNotNull(); assertThat(this.listener.topic).isEqualTo("annotated2"); + assertThat(this.listener.receivedGroupId).isEqualTo("bar"); template.send("annotated3", 0, "foo"); template.flush(); @@ -442,6 +444,7 @@ public void testBatch() throws Exception { assertThat(list.get(0)).isInstanceOf(String.class); assertThat(this.recordFilter.called).isTrue(); assertThat(this.config.listen10Exception).isNotNull(); + assertThat(this.listener.receivedGroupId).isEqualTo("list1"); assertThat(this.config.spyLatch.await(30, TimeUnit.SECONDS)).isTrue(); } @@ -1375,6 +1378,8 @@ static class Listener implements ConsumerSeekAware { private volatile 
ConsumerRecords consumerRecords; + private volatile String receivedGroupId; + @KafkaListener(id = "manualStart", topics = "manualStart", containerFactory = "kafkaAutoStartFalseListenerContainerFactory") public void manualStart(String foo) { @@ -1383,7 +1388,8 @@ public void manualStart(String foo) { private final AtomicBoolean reposition1 = new AtomicBoolean(); @KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}") - public void listen1(String foo) { + public void listen1(String foo, @Header(value = KafkaHeaders.GROUP_ID, required = false) String groupId) { + this.receivedGroupId = groupId; if (this.reposition1.compareAndSet(false, true)) { throw new RuntimeException("reposition"); } @@ -1392,12 +1398,14 @@ public void listen1(String foo) { @KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}") public void listen2(@Payload String foo, + @Header(KafkaHeaders.GROUP_ID) String groupId, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { this.key = key; this.partition = partition; this.topic = topic; + this.receivedGroupId = groupId; this.latch2.countDown(); if (this.latch2.getCount() > 0) { this.seekCallBack.get().seek(topic, partition, 0); @@ -1485,11 +1493,12 @@ public void listen9(Object payload) { @KafkaListener(id = "list1", topics = "annotated14", containerFactory = "batchSpyFactory", errorHandler = "listen10ErrorHandler") - public void listen10(List list) { + public void listen10(List list, @Header(KafkaHeaders.GROUP_ID) String groupId) { if (this.reposition10.compareAndSet(false, true)) { throw new RuntimeException("reposition"); } this.payload = list; + this.receivedGroupId = groupId; this.latch10.countDown(); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java index 
9ccbe6d06d..f0bbf922c0 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java @@ -55,6 +55,7 @@ import org.springframework.kafka.support.CompositeProducerListener; import org.springframework.kafka.support.DefaultKafkaHeaderMapper; import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.KafkaUtils; import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.support.converter.MessagingMessageConverter; @@ -231,6 +232,7 @@ public void testWithMessage() throws Exception { Acknowledgment ack = mock(Acknowledgment.class); Consumer mockConsumer = mock(Consumer.class); + KafkaUtils.setConsumerGroupId("test.group.id"); Message recordToMessage = messageConverter.toMessage(r2, ack, mockConsumer, String.class); assertThat(recordToMessage.getHeaders().get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME"); @@ -240,7 +242,8 @@ public void testWithMessage() throws Exception { assertThat(recordToMessage.getHeaders().get(KafkaHeaders.CONSUMER)).isSameAs(mockConsumer); assertThat(recordToMessage.getHeaders().get("foo")).isEqualTo("bar"); assertThat(recordToMessage.getPayload()).isEqualTo("foo-message-2"); - + assertThat(recordToMessage.getHeaders().get(KafkaHeaders.GROUP_ID)).isEqualTo("test.group.id"); + KafkaUtils.clearConsumerGroupId(); pf.destroy(); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java index 8bed127fee..d19d1bca26 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java +++ 
b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java @@ -33,6 +33,9 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.KafkaUtils; +import org.springframework.messaging.handler.annotation.Header; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -51,19 +54,24 @@ public void testKafkaNullInList(@Autowired KafkaListenerEndpointRegistry registr BatchMessagingMessageListenerAdapter adapter = (BatchMessagingMessageListenerAdapter) registry .getListenerContainer("foo").getContainerProperties().getMessageListener(); + KafkaUtils.setConsumerGroupId("test.group"); adapter.onMessage(Collections.singletonList(new ConsumerRecord<>("foo", 0, 0L, null, null)), null, null); assertThat(foo.value).isNull(); + assertThat(foo.group).isEqualTo("test.group"); } public static class Foo { - public String value = "someValue"; + public volatile String value = "someValue"; + + public volatile String group; @KafkaListener(id = "foo", topics = "foo", autoStartup = "false") - public void listen(List list) { + public void listen(List list, @Header(KafkaHeaders.GROUP_ID) String groupId) { list.forEach(s -> { this.value = s; }); + this.group = groupId; } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java index 11a79dd839..25cdc12629 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java +++ 
b/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java @@ -36,6 +36,7 @@ import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.KafkaUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; @@ -46,7 +47,7 @@ public class BatchMessageConverterTests { @Test - public void testBatchConverters() throws Exception { + public void testBatchConverters() { BatchMessageConverter batchMessageConverter = new BatchMessagingMessageConverter(); MessageHeaders headers = testGuts(batchMessageConverter); @@ -89,8 +90,8 @@ private MessageHeaders testGuts(BatchMessageConverter batchMessageConverter) { Acknowledgment ack = mock(Acknowledgment.class); Consumer consumer = mock(Consumer.class); - Message message = batchMessageConverter.toMessage(consumerRecords, ack, consumer, - String.class); + KafkaUtils.setConsumerGroupId("test.g"); + Message message = batchMessageConverter.toMessage(consumerRecords, ack, consumer, String.class); assertThat(message.getPayload()) .isEqualTo(Arrays.asList("value1", "value2", "value3")); @@ -109,6 +110,8 @@ private MessageHeaders testGuts(BatchMessageConverter batchMessageConverter) { .isEqualTo(Arrays.asList(1487694048607L, 1487694048608L, 1487694048609L)); assertThat(headers.get(KafkaHeaders.ACKNOWLEDGMENT)).isSameAs(ack); assertThat(headers.get(KafkaHeaders.CONSUMER)).isSameAs(consumer); + assertThat(headers.get(KafkaHeaders.GROUP_ID)).isEqualTo("test.g"); + KafkaUtils.clearConsumerGroupId(); return headers; } diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index 6ab0862d98..017f893c44 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -1219,6 +1219,29 @@ The properties are specified as individual strings with the normal Java `Propert ---- ==== +[[listener-group-id]] +===== Obtaining 
the Consumer `group.id` + +When running the same listener code in multiple containers, it may be useful to be able to determine which container (identified by its `group.id` consumer property) a record came from. + +You can call `KafkaUtils.getConsumerGroupId()` on the listener thread to do this. +Alternatively, you can access the group id in a method parameter. + +==== +[source, java] +---- +@KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}", exposeGroupId = "${always:true}") +public void listener(@Payload String foo, + @Header(KafkaHeaders.GROUP_ID) String groupId) { +... +} +---- +==== + +IMPORTANT: This is available in record listeners and batch listeners that receive a `List` of records. +It is **not** available in a batch listener that receives a `ConsumerRecords` argument. +Use the `KafkaUtils` mechanism in that case. + ===== Container Thread Naming Listener containers currently use two task executors, one to invoke the consumer and another that is used to invoke the listener when the kafka consumer property `enable.auto.commit` is `false`. diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 65a8e1fd11..c91007bfc3 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -17,6 +17,10 @@ Because the listener container has it's own mechanism for committing offsets, it It now sets it to false automatically unless specifically set in the consumer factory or the container's consumer property overrides. The `ackOnError` property is now `false` by default. +See <> for more information. + +It is now possible to obtain the consumer's `group.id` property in the listener method. +See <<listener-group-id>> for more information. ==== ErrorHandler Changes