From e3f9e8c77bb26dfe8563a8470206c7044303825e Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 25 Mar 2019 17:26:46 -0400 Subject: [PATCH 1/3] GH-1026: Support KafkaHeaders.GROUP_ID Resolves https://github.com/spring-projects/spring-kafka/issues/1026 Also improve `DefaultKafkaHeaderMapper` with a `NeverMatchHeaderMatcher`. --- .../kafka/annotation/KafkaListener.java | 9 + ...kaListenerAnnotationBeanPostProcessor.java | 3 + ...AbstractKafkaListenerContainerFactory.java | 77 +++----- .../config/AbstractKafkaListenerEndpoint.java | 16 ++ .../kafka/config/KafkaListenerEndpoint.java | 8 + .../config/KafkaListenerEndpointAdapter.java | 5 + .../config/MethodKafkaListenerEndpoint.java | 20 +- .../kafka/core/KafkaOperations.java | 3 +- .../kafka/core/KafkaTemplate.java | 3 +- .../kafka/core/ProducerFactoryUtils.java | 19 +- .../KafkaMessageListenerContainer.java | 8 +- .../BatchMessagingMessageListenerAdapter.java | 4 +- .../MessagingMessageListenerAdapter.java | 50 +++-- .../support/AbstractKafkaHeaderMapper.java | 105 ++++++++--- .../support/DefaultKafkaHeaderMapper.java | 14 +- .../kafka/support/JavaUtils.java | 175 ++++++++++++++++++ .../kafka/support/KafkaHeaders.java | 6 + .../kafka/support/KafkaUtils.java | 28 +++ .../converter/BatchMessageConverter.java | 3 +- .../BatchMessagingMessageConverter.java | 8 +- .../converter/MessagingMessageConverter.java | 6 +- .../converter/RecordMessageConverter.java | 5 +- .../EnableKafkaIntegrationTests.java | 17 +- .../kafka/core/KafkaTemplateTests.java | 2 +- ...hMessagingMessageListenerAdapterTests.java | 14 +- .../MessagingMessageListenerAdapterTests.java | 4 +- .../converter/BatchMessageConverterTests.java | 6 +- src/reference/asciidoc/kafka.adoc | 23 +++ src/reference/asciidoc/whats-new.adoc | 4 + 29 files changed, 499 insertions(+), 146 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/support/JavaUtils.java diff --git 
a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java index 8a24074476..1c8a6a534b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java @@ -241,4 +241,13 @@ */ String[] properties() default {}; + /** + * When set to true, enable the listener code to access the consumer's {@code + * group.id} property using a {@code @Header(KafkaHeaders.GROUP_ID) String groupId} + * method parameter. Useful if you use the same listener code in multiple containers. + * @return true to provide the header. + * @since 2.3 + */ + String exposeGroupId() default "false"; + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index b341b2bb10..92a5f6b353 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -446,6 +446,9 @@ protected void processListener(MethodKafkaListenerEndpoint endpoint, Kafka endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup")); } resolveKafkaProperties(endpoint, kafkaListener.properties()); + if (StringUtils.hasText(kafkaListener.exposeGroupId())) { + endpoint.setExposeGroupId(resolveExpressionAsBoolean(kafkaListener.exposeGroupId(), "exposeGroupId")); + } KafkaListenerContainerFactory factory = null; String containerFactoryBeanName = resolve(kafkaListener.containerFactory()); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java
b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java index 4deb3c65da..24a5877eca 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java @@ -39,6 +39,7 @@ import org.springframework.kafka.listener.adapter.RecordFilterStrategy; import org.springframework.kafka.listener.adapter.ReplyHeadersConfigurer; import org.springframework.kafka.requestreply.ReplyingKafkaOperations; +import org.springframework.kafka.support.JavaUtils; import org.springframework.kafka.support.TopicPartitionInitialOffset; import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.retry.RecoveryCallback; @@ -286,10 +287,8 @@ public void afterPropertiesSet() { @Override public C createListenerContainer(KafkaListenerEndpoint endpoint) { C instance = createContainerInstance(endpoint); - - if (endpoint.getId() != null) { - instance.setBeanName(endpoint.getId()); - } + JavaUtils.INSTANCE + .acceptIfNotNull(endpoint.getId(), instance::setBeanName); if (endpoint instanceof AbstractKafkaListenerEndpoint) { configureEndpoint((AbstractKafkaListenerEndpoint) endpoint); } @@ -301,30 +300,15 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) { } private void configureEndpoint(AbstractKafkaListenerEndpoint aklEndpoint) { - if (this.recordFilterStrategy != null) { - aklEndpoint.setRecordFilterStrategy(this.recordFilterStrategy); - } - if (this.ackDiscarded != null) { - aklEndpoint.setAckDiscarded(this.ackDiscarded); - } - if (this.retryTemplate != null) { - aklEndpoint.setRetryTemplate(this.retryTemplate); - } - if (this.recoveryCallback != null) { - aklEndpoint.setRecoveryCallback(this.recoveryCallback); - } - if (this.statefulRetry != null) { - aklEndpoint.setStatefulRetry(this.statefulRetry); - } - if (this.batchListener != null) { - 
aklEndpoint.setBatchListener(this.batchListener); - } - if (this.replyTemplate != null) { - aklEndpoint.setReplyTemplate(this.replyTemplate); - } - if (this.replyHeadersConfigurer != null) { - aklEndpoint.setReplyHeadersConfigurer(this.replyHeadersConfigurer); - } + JavaUtils.INSTANCE + .acceptIfNotNull(this.recordFilterStrategy, aklEndpoint::setRecordFilterStrategy) + .acceptIfNotNull(this.ackDiscarded, aklEndpoint::setAckDiscarded) + .acceptIfNotNull(this.retryTemplate, aklEndpoint::setRetryTemplate) + .acceptIfNotNull(this.recoveryCallback, aklEndpoint::setRecoveryCallback) + .acceptIfNotNull(this.statefulRetry, aklEndpoint::setStatefulRetry) + .acceptIfNotNull(this.batchListener, aklEndpoint::setBatchListener) + .acceptIfNotNull(this.replyTemplate, aklEndpoint::setReplyTemplate) + .acceptIfNotNull(this.replyHeadersConfigurer, aklEndpoint::setReplyHeadersConfigurer); } /** @@ -345,35 +329,26 @@ protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) { ContainerProperties properties = instance.getContainerProperties(); BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern", "messageListener", "ackCount", "ackTime"); - if (this.afterRollbackProcessor != null) { - instance.setAfterRollbackProcessor(this.afterRollbackProcessor); - } - if (this.containerProperties.getAckCount() > 0) { - properties.setAckCount(this.containerProperties.getAckCount()); - } - if (this.containerProperties.getAckTime() > 0) { - properties.setAckTime(this.containerProperties.getAckTime()); - } - if (this.errorHandler != null) { - instance.setGenericErrorHandler(this.errorHandler); - } + JavaUtils.INSTANCE + .acceptIfNotNull(this.afterRollbackProcessor, instance::setAfterRollbackProcessor) + .acceptIfCondition(this.containerProperties.getAckCount() > 0, this.containerProperties.getAckCount(), + properties::setAckCount) + .acceptIfCondition(this.containerProperties.getAckTime() > 0, 
this.containerProperties.getAckTime(), + properties::setAckTime) + .acceptIfNotNull(this.errorHandler, instance::setGenericErrorHandler); if (endpoint.getAutoStartup() != null) { instance.setAutoStartup(endpoint.getAutoStartup()); } else if (this.autoStartup != null) { instance.setAutoStartup(this.autoStartup); } - if (this.phase != null) { - instance.setPhase(this.phase); - } - if (this.applicationEventPublisher != null) { - instance.setApplicationEventPublisher(this.applicationEventPublisher); - } - instance.getContainerProperties().setGroupId(endpoint.getGroupId()); - instance.getContainerProperties().setClientId(endpoint.getClientIdPrefix()); - if (endpoint.getConsumerProperties() != null) { - instance.getContainerProperties().setConsumerProperties(endpoint.getConsumerProperties()); - } + JavaUtils.INSTANCE + .acceptIfNotNull(this.phase, instance::setPhase) + .acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher) + .acceptIfNotNull(endpoint.getGroupId(), instance.getContainerProperties()::setGroupId) + .acceptIfNotNull(endpoint.getClientIdPrefix(), instance.getContainerProperties()::setClientId) + .acceptIfNotNull(endpoint.getConsumerProperties(), + instance.getContainerProperties()::setConsumerProperties); } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index 703e0c4d22..7db28d5ef4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -113,6 +113,8 @@ public abstract class AbstractKafkaListenerEndpoint private Properties consumerProperties; + private Boolean exposeGroupId; + @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; @@ -411,6 
+413,20 @@ public void setConsumerProperties(Properties consumerProperties) { this.consumerProperties = consumerProperties; } + @Override + public Boolean getExposeGroupId() { + return this.exposeGroupId; + } + + /** + * Set to true to expose the {@code group.id} property in a header. + * @param exposeGroupId true to expose. + * @since 2.3 + */ + public void setExposeGroupId(Boolean exposeGroupId) { + this.exposeGroupId = exposeGroupId; + } + @Override public void afterPropertiesSet() { boolean topicsEmpty = getTopics().isEmpty(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java index c3ec65c0b7..c5f0293d8f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java @@ -115,6 +115,14 @@ default Properties getConsumerProperties() { return null; } + /** + * Whether or not to expose the {@code group.id} as a header. + * @return true to expose. + * @since 2.3 + */ + @Nullable + Boolean getExposeGroupId(); + /** * Setup the specified message listener container with the model * defined by this endpoint. 
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointAdapter.java index c81ab4fc57..b16abe4d05 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointAdapter.java @@ -82,6 +82,11 @@ public Boolean getAutoStartup() { // NOSONAR return null; // NOSONAR null check by caller } + @Override + public Boolean getExposeGroupId() { + return Boolean.FALSE; + } + @Override public void setupListenerContainer(MessageListenerContainer listenerContainer, MessageConverter messageConverter) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java index 190e7b755d..228bcc2efe 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java @@ -30,6 +30,7 @@ import org.springframework.kafka.listener.adapter.HandlerAdapter; import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter; import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter; +import org.springframework.kafka.support.JavaUtils; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; @@ -147,19 +148,20 @@ protected MessageHandlerMethodFactory getMessageHandlerMethodFactory() { @Override protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container, MessageConverter messageConverter) { + 
Assert.state(this.messageHandlerMethodFactory != null, "Could not create message listener - MessageHandlerMethodFactory not set"); MessagingMessageListenerAdapter messageListener = createMessageListenerInstance(messageConverter); messageListener.setHandlerMethod(configureListenerAdapter(messageListener)); - String replyTopic = getReplyTopic(); - if (replyTopic != null) { - Assert.state(getMethod().getReturnType().equals(void.class) - || getReplyTemplate() != null, "a KafkaTemplate is required to support replies"); - messageListener.setReplyTopic(replyTopic); - } - if (getReplyTemplate() != null) { - messageListener.setReplyTemplate(getReplyTemplate()); - } + JavaUtils.INSTANCE + .acceptIfNotNull(getReplyTopic(), replyTopic -> { + Assert.state(getMethod().getReturnType().equals(void.class) + || getReplyTemplate() != null, "a KafkaTemplate is required to support replies"); + messageListener.setReplyTopic(replyTopic); + }) + .acceptIfNotNull(getReplyTemplate(), messageListener::setReplyTemplate) + .acceptIfNotNull(getExposeGroupId(), messageListener::setExposeGroupId); + return messageListener; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java index 1b9a78fd92..db1d1e6101 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java @@ -186,7 +186,8 @@ public interface KafkaOperations { /** * When running in a transaction, send the consumer offset(s) to the transaction. The - * group id is obtained from {@link ProducerFactoryUtils#getConsumerGroupId()}. It is + * group id is obtained from + * {@link org.springframework.kafka.support.KafkaUtils#getConsumerGroupId()}. 
It is * not necessary to call this method if the operations are invoked on a listener * container thread (and the listener container is configured with a * {@link org.springframework.kafka.transaction.KafkaAwareTransactionManager}) since diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 5cf590ca55..4bfd1e3e51 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.TopicPartition; import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.KafkaUtils; import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.SendResult; @@ -329,7 +330,7 @@ public void flush() { @Override public void sendOffsetsToTransaction(Map offsets) { - sendOffsetsToTransaction(offsets, ProducerFactoryUtils.getConsumerGroupId()); + sendOffsetsToTransaction(offsets, KafkaUtils.getConsumerGroupId()); } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java index 97d699966a..e4207dfaf2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java @@ -18,6 +18,7 @@ import org.apache.kafka.clients.producer.Producer; +import org.springframework.kafka.support.KafkaUtils; import org.springframework.lang.Nullable; import org.springframework.transaction.support.ResourceHolderSynchronization; import org.springframework.transaction.support.TransactionSynchronization; @@ -35,8 +36,6 @@ */ public final class 
ProducerFactoryUtils { - private static ThreadLocal groupIds = new ThreadLocal<>(); - private ProducerFactoryUtils() { super(); } @@ -82,27 +81,33 @@ public static void releaseResources(@Nullable KafkaResourceHolder r /** * Set the group id for the consumer bound to this thread. * @param groupId the group id. + * @deprecated in favor of {@link KafkaUtils#setConsumerGroupId(String)}. * @since 1.3 */ + @Deprecated public static void setConsumerGroupId(String groupId) { - groupIds.set(groupId); + KafkaUtils.setConsumerGroupId(groupId); } /** * Get the group id for the consumer bound to this thread. * @return the group id. + * @deprecated in favor of {@link KafkaUtils#getConsumerGroupId()}. * @since 1.3 */ + @Deprecated public static String getConsumerGroupId() { - return groupIds.get(); + return KafkaUtils.getConsumerGroupId(); } /** * Clear the group id for the consumer bound to this thread. + * @deprecated in favor of {@link KafkaUtils#clearConsumerGroupId()}. * @since 1.3 */ + @Deprecated public static void clearConsumerGroupId() { - groupIds.remove(); + KafkaUtils.clearConsumerGroupId(); } private static void bindResourceToTransaction(KafkaResourceHolder resourceHolder, @@ -151,8 +156,8 @@ public void afterCompletion(int status) { } @Override - protected void releaseResource(KafkaResourceHolder resourceHolder, Object resourceKey) { - ProducerFactoryUtils.releaseResources(resourceHolder); + protected void releaseResource(KafkaResourceHolder holder, Object resourceKey) { + ProducerFactoryUtils.releaseResources(holder); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 3634197a8d..7de59fcaed 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java 
@@ -56,7 +56,6 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.KafkaResourceHolder; import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.core.ProducerFactoryUtils; import org.springframework.kafka.event.ConsumerPausedEvent; import org.springframework.kafka.event.ConsumerResumedEvent; import org.springframework.kafka.event.ConsumerStoppedEvent; @@ -66,6 +65,7 @@ import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback; import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.KafkaUtils; import org.springframework.kafka.support.LogIfLevelEnabled; import org.springframework.kafka.support.TopicPartitionInitialOffset; import org.springframework.kafka.support.TopicPartitionInitialOffset.SeekPosition; @@ -743,9 +743,7 @@ public void run() { if (this.genericListener instanceof ConsumerSeekAware) { ((ConsumerSeekAware) this.genericListener).registerSeekCallback(this); } - if (this.transactionManager != null) { - ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId); - } + KafkaUtils.setConsumerGroupId(this.consumerGroupId); this.count = 0; this.last = System.currentTimeMillis(); initAssignedPartitions(); @@ -864,7 +862,7 @@ private void checkIdle() { } public void wrapUp() { - ProducerFactoryUtils.clearConsumerGroupId(); + KafkaUtils.clearConsumerGroupId(); publishConsumerStoppingEvent(this.consumer); if (!this.fatalError) { if (this.kafkaTxManager == null) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java index a2b070d954..556bc42ebc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java +++ 
b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java @@ -29,6 +29,7 @@ import org.springframework.kafka.listener.ListenerExecutionFailedException; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaNull; +import org.springframework.kafka.support.KafkaUtils; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.messaging.Message; @@ -173,7 +174,8 @@ protected void invoke(Object records, Acknowledgment acknowledgment, Consumer toMessagingMessage(List records, Acknowledgment acknowledgment, Consumer consumer) { - return getBatchMessageConverter().toMessage(records, acknowledgment, consumer, getType()); + return getBatchMessageConverter().toMessage(records, acknowledgment, consumer, getType(), + isExposeGroupId() ? KafkaUtils.getConsumerGroupId() : null); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 508c1ce9cd..de1e70ca5e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -57,6 +57,7 @@ import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessagingException; import org.springframework.messaging.converter.MessageConversionException; +import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; @@ -113,6 +114,8 @@ public abstract class 
MessagingMessageListenerAdapter implements ConsumerS private ReplyHeadersConfigurer replyHeadersConfigurer; + private boolean exposeGroupId; + public MessagingMessageListenerAdapter(Object bean, Method method) { this.bean = bean; this.inferredType = determineInferredType(method); // NOSONAR = intentionally not final @@ -238,6 +241,14 @@ public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigu this.replyHeadersConfigurer = replyHeadersConfigurer; } + public boolean isExposeGroupId() { + return this.exposeGroupId; + } + + public void setExposeGroupId(boolean exposeGroupId) { + this.exposeGroupId = exposeGroupId; + } + @Override public void registerSeekCallback(ConsumerSeekCallback callback) { if (this.bean instanceof ConsumerSeekAware) { @@ -261,7 +272,8 @@ public void onIdleContainer(Map assignments, ConsumerSeekC protected Message toMessagingMessage(ConsumerRecord record, Acknowledgment acknowledgment, Consumer consumer) { - return getMessageConverter().toMessage(record, acknowledgment, consumer, getType()); + return getMessageConverter().toMessage(record, acknowledgment, consumer, getType(), + this.exposeGroupId ? 
KafkaUtils.getConsumerGroupId() : null); } /** @@ -475,7 +487,7 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity } Type genericParameterType = null; - boolean hasConsumerParameter = false; + int allowedBatchParameters = 1; for (int i = 0; i < method.getParameterCount(); i++) { MethodParameter methodParameter = new MethodParameter(method, i); @@ -529,23 +541,33 @@ else if (parameterizedType.getRawType().equals(List.class) } else if (methodParameter.getGenericParameterType().equals(Acknowledgment.class)) { this.hasAckParameter = true; + allowedBatchParameters++; + } + else if (methodParameter.hasParameterAnnotation(Header.class)) { + Header header = methodParameter.getParameterAnnotation(Header.class); + if (header.value().equals(KafkaHeaders.GROUP_ID)) { + allowedBatchParameters++; + } } else { if (methodParameter.getGenericParameterType().equals(Consumer.class)) { - hasConsumerParameter = true; + allowedBatchParameters++; } else { Type parameterType = methodParameter.getGenericParameterType(); - hasConsumerParameter = parameterType instanceof ParameterizedType - && ((ParameterizedType) parameterType).getRawType().equals(Consumer.class); + if (parameterType instanceof ParameterizedType + && ((ParameterizedType) parameterType).getRawType().equals(Consumer.class)) { + allowedBatchParameters++; + } } } } - boolean validParametersForBatch = validParametersForBatch(method.getGenericParameterTypes().length, - this.hasAckParameter, hasConsumerParameter); + boolean validParametersForBatch = method.getGenericParameterTypes().length <= allowedBatchParameters; + if (!validParametersForBatch) { String stateMessage = "A parameter of type '%s' must be the only parameter " - + "(except for an optional 'Acknowledgment' and/or 'Consumer')"; + + "(except for an optional 'Acknowledgment' and/or 'Consumer' " + + "and/or '@Header(KafkaHeaders.GROUP_ID) String groupId')"; Assert.state(!this.isConsumerRecords, () -> String.format(stateMessage, 
"ConsumerRecords")); Assert.state(!this.isConsumerRecordList, @@ -557,18 +579,6 @@ else if (methodParameter.getGenericParameterType().equals(Acknowledgment.class)) return genericParameterType; } - private boolean validParametersForBatch(int parameterCount, boolean hasAck, boolean hasConsumer) { - if (hasAck) { - return parameterCount == 2 || (hasConsumer && parameterCount == 3); // NOSONAR magic # - } - else if (hasConsumer) { - return parameterCount == 2; // NOSONAR magic # - } - else { - return parameterCount == 1; // NOSONAR magic # - } - } - /* * Don't consider parameter types that are available after conversion. * Acknowledgment, ConsumerRecord, Consumer, ConsumerRecord<...>, Consumer<...>, and Message. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java index bdece396e8..46a335d950 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java @@ -21,9 +21,12 @@ import java.text.MessageFormat; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,26 +47,29 @@ */ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper { - protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR - - private static final List NEVER_MAPPED = Arrays.asList( - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.ACKNOWLEDGMENT), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.CONSUMER), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.MESSAGE_KEY), - new SimplePatternBasedHeaderMatcher("!" 
+ KafkaHeaders.OFFSET), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.PARTITION_ID), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.RAW_DATA), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.RECEIVED_MESSAGE_KEY), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.RECEIVED_PARTITION_ID), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.RECEIVED_TIMESTAMP), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.RECEIVED_TOPIC), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.TIMESTAMP), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.TIMESTAMP_TYPE), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.BATCH_CONVERTED_HEADERS), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.NATIVE_HEADERS), - new SimplePatternBasedHeaderMatcher("!" + KafkaHeaders.TOPIC)); - - protected final List matchers = new ArrayList<>(NEVER_MAPPED); // NOSONAR + protected final Log LOGGER = LogFactory.getLog(getClass()); // NOSONAR + + private static final List NEVER_MAPPED = Collections.singletonList( + new NeverMatchHeaderMatcher(Arrays.stream(new String[] { + KafkaHeaders.ACKNOWLEDGMENT, + KafkaHeaders.CONSUMER, + KafkaHeaders.MESSAGE_KEY, + KafkaHeaders.OFFSET, + KafkaHeaders.PARTITION_ID, + KafkaHeaders.RAW_DATA, + KafkaHeaders.RECEIVED_MESSAGE_KEY, + KafkaHeaders.RECEIVED_PARTITION_ID, + KafkaHeaders.RECEIVED_TIMESTAMP, + KafkaHeaders.RECEIVED_TOPIC, + KafkaHeaders.TIMESTAMP, + KafkaHeaders.TIMESTAMP_TYPE, + KafkaHeaders.BATCH_CONVERTED_HEADERS, + KafkaHeaders.NATIVE_HEADERS, + KafkaHeaders.TOPIC, + KafkaHeaders.GROUP_ID + }).collect(Collectors.toSet()))); + + private final List matchers = new ArrayList<>(NEVER_MAPPED); private final Map rawMappedtHeaders = new HashMap<>(); @@ -125,8 +131,8 @@ protected boolean matches(String header, Object value) { if (matches(header)) { if ((header.equals(MessageHeaders.REPLY_CHANNEL) || header.equals(MessageHeaders.ERROR_CHANNEL)) && !(value instanceof String)) { - if 
(this.logger.isDebugEnabled()) { - this.logger.debug("Cannot map " + header + " when type is [" + value.getClass() + if (this.LOGGER.isDebugEnabled()) { + this.LOGGER.debug("Cannot map " + header + " when type is [" + value.getClass() + "]; it must be a String"); } return false; @@ -137,13 +143,13 @@ protected boolean matches(String header, Object value) { } protected boolean matches(String header) { - for (SimplePatternBasedHeaderMatcher matcher : this.matchers) { + for (HeaderMatcher matcher : this.matchers) { if (matcher.matchHeader(header)) { return !matcher.isNegated(); } } - if (this.logger.isDebugEnabled()) { - this.logger.debug(MessageFormat.format("headerName=[{0}] WILL NOT be mapped; matched no patterns", + if (this.LOGGER.isDebugEnabled()) { + this.LOGGER.debug(MessageFormat.format("headerName=[{0}] WILL NOT be mapped; matched no patterns", header)); } return false; @@ -200,13 +206,58 @@ private String mapRawIn(String header, byte[] value) { } + /** + * A matcher for headers. + * @since 2.3 + */ + protected interface HeaderMatcher { + + /** + * Return true if the header matches. + * @param headerName the header name. + * @return true for a match. + */ + boolean matchHeader(String headerName); + + /** + * Return true if this matcher is a negative matcher. + * @return true for a negative matcher. + */ + boolean isNegated(); + + } + + /** + * A matcher that never matches a set of headers. + * @since 2.3 + */ + protected static class NeverMatchHeaderMatcher implements HeaderMatcher { + + private final Set neverMatchHeaders; + + protected NeverMatchHeaderMatcher(Set neverMatchHeaders) { + this.neverMatchHeaders = neverMatchHeaders; + } + + @Override + public boolean matchHeader(String headerName) { + return this.neverMatchHeaders.contains(headerName); + } + + @Override + public boolean isNegated() { + return true; + } + + } + /** * A pattern-based header matcher that matches if the specified * header matches the specified simple pattern. *

The {@code negate == true} state indicates if the matching should be treated as "not matched". * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String) */ - protected static class SimplePatternBasedHeaderMatcher { + protected static class SimplePatternBasedHeaderMatcher implements HeaderMatcher { private static final Log logger = LogFactory.getLog(SimplePatternBasedHeaderMatcher.class); // NOSONAR @@ -224,6 +275,7 @@ public SimplePatternBasedHeaderMatcher(String pattern) { this.negate = negate; } + @Override public boolean matchHeader(String headerName) { String header = headerName.toLowerCase(); if (PatternMatchUtils.simpleMatch(this.pattern, header)) { @@ -239,6 +291,7 @@ public boolean matchHeader(String headerName) { return false; } + @Override public boolean isNegated() { return this.negate; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java index 6751a6adc6..c27e3f8f17 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java @@ -231,8 +231,8 @@ public void fromHeaders(MessageHeaders headers, Headers target) { jsonHeaders.put(key, className); } catch (@SuppressWarnings("unused") Exception e) { - if (logger.isDebugEnabled()) { - logger.debug("Could not map " + key + " with type " + valueToAdd.getClass().getName()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Could not map " + key + " with type " + valueToAdd.getClass().getName()); } } } @@ -243,7 +243,7 @@ public void fromHeaders(MessageHeaders headers, Headers target) { target.add(new RecordHeader(JSON_TYPES, headerObjectMapper.writeValueAsBytes(jsonHeaders))); } catch (IllegalStateException | JsonProcessingException e) { - logger.error("Could not add json types header", e); + LOGGER.error("Could 
not add json types header", e); } } } @@ -264,7 +264,7 @@ public void toHeaders(Headers source, final Map headers) { } } catch (Exception e) { - logger.error("Could not load class for header: " + header.key(), e); + LOGGER.error("Could not load class for header: " + header.key(), e); } if (trusted) { try { @@ -272,7 +272,7 @@ public void toHeaders(Headers source, final Map headers) { headers.put(header.key(), value); } catch (IOException e) { - logger.error("Could not decode json type: " + new String(header.value()) + " for key: " + header + LOGGER.error("Could not decode json type: " + new String(header.value()) + " for key: " + header .key(), e); headers.put(header.key(), header.value()); @@ -303,7 +303,7 @@ private Object decodeValue(Header h, Class type) ClassUtils.forName(nth.getUntrustedType(), null)); } catch (Exception e) { - logger.error("Could not decode header: " + nth, e); + LOGGER.error("Could not decode header: " + nth, e); } } } @@ -323,7 +323,7 @@ private Map decodeJsonTypes(Headers source) { types = headerObjectMapper.readValue(next.value(), Map.class); } catch (IOException e) { - logger.error("Could not decode json types: " + new String(next.value()), e); + LOGGER.error("Could not decode json types: " + new String(next.value()), e); } break; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/JavaUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/support/JavaUtils.java new file mode 100644 index 0000000000..16b38ce824 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/JavaUtils.java @@ -0,0 +1,175 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.support; + +import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import org.springframework.util.CollectionUtils; +import org.springframework.util.ObjectUtils; +import org.springframework.util.StringUtils; + +/** + * Chained utility methods to simplify some Java repetitive code. Obtain a reference to + * the singleton {@link #INSTANCE} and then chain calls to the utility methods. + * + * @author Gary Russell + * @author Artem Bilan + * + * @since 2.3 + */ +public final class JavaUtils { + + /** + * The singleton instance of this utility class. + */ + public static final JavaUtils INSTANCE = new JavaUtils(); + + private JavaUtils() { + super(); + } + + /** + * Invoke {@link Consumer#accept(Object)} with the value if the condition is true. + * @param condition the condition. + * @param value the value. + * @param consumer the consumer. + * @param the value type. + * @return this. + */ + public JavaUtils acceptIfCondition(boolean condition, T value, Consumer consumer) { + if (condition) { + consumer.accept(value); + } + return this; + } + + /** + * Invoke {@link Consumer#accept(Object)} with the value if it is not null. + * @param value the value. + * @param consumer the consumer. + * @param the value type. + * @return this. + */ + public JavaUtils acceptIfNotNull(T value, Consumer consumer) { + if (value != null) { + consumer.accept(value); + } + return this; + } + + /** + * Invoke {@link Consumer#accept(Object)} with the value if it is not null or empty. 
+ * @param value the value. + * @param consumer the consumer. + * @return this. + * @since 5.2 + */ + public JavaUtils acceptIfHasText(String value, Consumer consumer) { + if (StringUtils.hasText(value)) { + consumer.accept(value); + } + return this; + } + + /** + * Invoke {@link Consumer#accept(Object)} with the value if it is not null or empty. + * @param value the value. + * @param consumer the consumer. + * @param the value type. + * @return this. + * @since 5.2 + */ + public JavaUtils acceptIfNotEmpty(List value, Consumer> consumer) { + if (!CollectionUtils.isEmpty(value)) { + consumer.accept(value); + } + return this; + } + + /** + * Invoke {@link Consumer#accept(Object)} with the value if it is not null or empty. + * @param value the value. + * @param consumer the consumer. + * @param the value type. + * @return this. + * @since 5.2 + */ + public JavaUtils acceptIfNotEmpty(T[] value, Consumer consumer) { + if (!ObjectUtils.isEmpty(value)) { + consumer.accept(value); + } + return this; + } + + /** + * Invoke {@link BiConsumer#accept(Object, Object)} with the arguments if the + * condition is true. + * @param condition the condition. + * @param t1 the first consumer argument + * @param t2 the second consumer argument + * @param consumer the consumer. + * @param the first argument type. + * @param the second argument type. + * @return this. + * @since 5.2 + */ + public JavaUtils acceptIfCondition(boolean condition, T1 t1, T2 t2, BiConsumer consumer) { + if (condition) { + consumer.accept(t1, t2); + } + return this; + } + + /** + * Invoke {@link BiConsumer#accept(Object, Object)} with the arguments if the t2 + * argument is not null. + * @param t1 the first argument + * @param t2 the second consumer argument + * @param consumer the consumer. + * @param the first argument type. + * @param the second argument type. + * @return this. 
+ * @since 5.2 + */ + public JavaUtils acceptIfNotNull(T1 t1, T2 t2, BiConsumer consumer) { + if (t2 != null) { + consumer.accept(t1, t2); + } + return this; + } + + /** + * Invoke {@link BiConsumer#accept(Object, Object)} with the arguments if the value + * argument is not null or empty. + * @param t1 the first consumer argument. + * @param value the second consumer argument + * @param the first argument type. + * @param consumer the consumer. + * @return this. + * @since 5.2 + */ + public JavaUtils acceptIfHasText(T t1, String value, BiConsumer consumer) { + if (StringUtils.hasText(value)) { + consumer.accept(t1, value); + } + return this; + } + + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java index 67f96cdfbf..46d4b363e5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaHeaders.java @@ -189,4 +189,10 @@ public abstract class KafkaHeaders { */ public static final String DLT_ORIGINAL_TIMESTAMP_TYPE = PREFIX + "dlt-original-timestamp-type"; + /** + * For inbound messages, the container's {@code group.id} consumer property. + * @since 2.3 + */ + public static final String GROUP_ID = PREFIX + "groupId"; + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java index 07afc8d991..22a02ea740 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java @@ -33,6 +33,8 @@ */ public final class KafkaUtils { + private static ThreadLocal groupIds = new ThreadLocal<>(); + /** * Return true if the method return type is {@link Message} or * {@code Collection>}. 
@@ -63,6 +65,32 @@ public static boolean returnTypeMessageOrCollectionOf(Method method) { } + /** + * Set the group id for the consumer bound to this thread. + * @param groupId the group id. + * @since 2.3 + */ + public static void setConsumerGroupId(String groupId) { + KafkaUtils.groupIds.set(groupId); + } + + /** + * Get the group id for the consumer bound to this thread. + * @return the group id. + * @since 2.3 + */ + public static String getConsumerGroupId() { + return KafkaUtils.groupIds.get(); + } + + /** + * Clear the group id for the consumer bound to this thread. + * @since 2.3 + */ + public static void clearConsumerGroupId() { + KafkaUtils.groupIds.remove(); + } + private KafkaUtils() { super(); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessageConverter.java index 78cb48a239..498b1044b2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessageConverter.java @@ -41,10 +41,11 @@ public interface BatchMessageConverter extends MessageConverter { * @param acknowledgment the acknowledgment. * @param consumer the consumer. * @param payloadType the required payload type. + * @param groupId the {@code group.id} consumer property. * @return the message. */ Message toMessage(List> records, Acknowledgment acknowledgment, Consumer consumer, - Type payloadType); + Type payloadType, String groupId); /** * Convert a message to a producer record. 
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index 9fc484dced..d85982ab9a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -124,7 +124,8 @@ public RecordMessageConverter getRecordMessageConverter() { @Override public Message toMessage(List> records, Acknowledgment acknowledgment, - Consumer consumer, Type type) { + Consumer consumer, Type type, String groupId) { + KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp); @@ -157,6 +158,9 @@ public Message toMessage(List> records, Acknowledgment a if (consumer != null) { rawHeaders.put(KafkaHeaders.CONSUMER, consumer); } + if (groupId != null) { + rawHeaders.put(KafkaHeaders.GROUP_ID, groupId); + } boolean logged = false; for (ConsumerRecord record : records) { @@ -214,7 +218,7 @@ protected Object extractAndConvertValue(ConsumerRecord record, Type type) */ protected Object convert(ConsumerRecord record, Type type) { return this.recordConverter - .toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0]).getPayload(); + .toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0], null).getPayload(); } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java index c17254b6ad..2f4dceb57e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java +++ 
b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java @@ -100,7 +100,8 @@ public void setHeaderMapper(KafkaHeaderMapper headerMapper) { @Override public Message toMessage(ConsumerRecord record, Acknowledgment acknowledgment, Consumer consumer, - Type type) { + Type type, String groupId) { + KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp); @@ -130,6 +131,9 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle if (consumer != null) { rawHeaders.put(KafkaHeaders.CONSUMER, consumer); } + if (groupId != null) { + rawHeaders.put(KafkaHeaders.GROUP_ID, groupId); + } return MessageBuilder.createMessage(extractAndConvertValue(record, type), kafkaMessageHeaders); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/RecordMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/RecordMessageConverter.java index fe08cf313d..bf5bdd919e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/RecordMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/RecordMessageConverter.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.kafka.support.Acknowledgment; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; /** @@ -39,10 +40,12 @@ public interface RecordMessageConverter extends MessageConverter { * @param acknowledgment the acknowledgment. * @param consumer the consumer * @param payloadType the required payload type. + * @param groupId the {@code group.id} consumer property. * @return the message. + * @since 2.3 */ Message toMessage(ConsumerRecord record, Acknowledgment acknowledgment, Consumer consumer, - Type payloadType); + Type payloadType, @Nullable String groupId); /** * Convert a message to a producer record. 
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index 707f3f3a3b..cf86b0dfd6 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -215,6 +215,7 @@ public void testSimple() throws Exception { template.flush(); assertThat(this.listener.latch1.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(this.config.globalErrorThrowable).isNotNull(); + assertThat(this.listener.receivedGroupId).isNull(); template.send("annotated2", 0, 123, "foo"); template.flush(); @@ -222,6 +223,7 @@ public void testSimple() throws Exception { assertThat(this.listener.key).isEqualTo(123); assertThat(this.listener.partition).isNotNull(); assertThat(this.listener.topic).isEqualTo("annotated2"); + assertThat(this.listener.receivedGroupId).isEqualTo("bar"); template.send("annotated3", 0, "foo"); template.flush(); @@ -442,6 +444,7 @@ public void testBatch() throws Exception { assertThat(list.get(0)).isInstanceOf(String.class); assertThat(this.recordFilter.called).isTrue(); assertThat(this.config.listen10Exception).isNotNull(); + assertThat(this.listener.receivedGroupId).isEqualTo("list1"); assertThat(this.config.spyLatch.await(30, TimeUnit.SECONDS)).isTrue(); } @@ -1375,6 +1378,8 @@ static class Listener implements ConsumerSeekAware { private volatile ConsumerRecords consumerRecords; + private volatile String receivedGroupId; + @KafkaListener(id = "manualStart", topics = "manualStart", containerFactory = "kafkaAutoStartFalseListenerContainerFactory") public void manualStart(String foo) { @@ -1383,21 +1388,24 @@ public void manualStart(String foo) { private final AtomicBoolean reposition1 = new AtomicBoolean(); @KafkaListener(id = "foo", topics = 
"#{'${topicOne:annotated1,foo}'.split(',')}") - public void listen1(String foo) { + public void listen1(String foo, @Header(value = KafkaHeaders.GROUP_ID, required = false) String groupId) { + this.receivedGroupId = groupId; if (this.reposition1.compareAndSet(false, true)) { throw new RuntimeException("reposition"); } this.latch1.countDown(); } - @KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}") + @KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}", exposeGroupId = "${always:true}") public void listen2(@Payload String foo, + @Header(KafkaHeaders.GROUP_ID) String groupId, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { this.key = key; this.partition = partition; this.topic = topic; + this.receivedGroupId = groupId; this.latch2.countDown(); if (this.latch2.getCount() > 0) { this.seekCallBack.get().seek(topic, partition, 0); @@ -1484,12 +1492,13 @@ public void listen9(Object payload) { private final AtomicBoolean reposition10 = new AtomicBoolean(); @KafkaListener(id = "list1", topics = "annotated14", containerFactory = "batchSpyFactory", - errorHandler = "listen10ErrorHandler") - public void listen10(List list) { + errorHandler = "listen10ErrorHandler", exposeGroupId = "true") + public void listen10(List list, @Header(KafkaHeaders.GROUP_ID) String groupId) { if (this.reposition10.compareAndSet(false, true)) { throw new RuntimeException("reposition"); } this.payload = list; + this.receivedGroupId = groupId; this.latch10.countDown(); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java index 9ccbe6d06d..504b40104e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java 
@@ -231,7 +231,7 @@ public void testWithMessage() throws Exception { Acknowledgment ack = mock(Acknowledgment.class); Consumer mockConsumer = mock(Consumer.class); - Message recordToMessage = messageConverter.toMessage(r2, ack, mockConsumer, String.class); + Message recordToMessage = messageConverter.toMessage(r2, ack, mockConsumer, String.class, "test.group.id"); assertThat(recordToMessage.getHeaders().get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME"); assertThat(recordToMessage.getHeaders().get(KafkaHeaders.RECEIVED_TIMESTAMP)).isEqualTo(1487694048615L); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java index 8bed127fee..11bde7e730 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java @@ -33,6 +33,9 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.KafkaUtils; +import org.springframework.messaging.handler.annotation.Header; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -51,19 +54,24 @@ public void testKafkaNullInList(@Autowired KafkaListenerEndpointRegistry registr BatchMessagingMessageListenerAdapter adapter = (BatchMessagingMessageListenerAdapter) registry .getListenerContainer("foo").getContainerProperties().getMessageListener(); + KafkaUtils.setConsumerGroupId("test.group"); 
adapter.onMessage(Collections.singletonList(new ConsumerRecord<>("foo", 0, 0L, null, null)), null, null); assertThat(foo.value).isNull(); + assertThat(foo.group).isEqualTo("test.group"); } public static class Foo { - public String value = "someValue"; + public volatile String value = "someValue"; - @KafkaListener(id = "foo", topics = "foo", autoStartup = "false") - public void listen(List list) { + public volatile String group; + + @KafkaListener(id = "foo", topics = "foo", autoStartup = "false", exposeGroupId = "true") + public void listen(List list, @Header(KafkaHeaders.GROUP_ID) String groupId) { list.forEach(s -> { this.value = s; }); + this.group = groupId; } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java index 177f782516..c08b500270 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java @@ -55,10 +55,10 @@ public void onMessage(ConsumerRecord data, Acknowledgment acknow RecordMessageConverter converter = mock(RecordMessageConverter.class); ConsumerRecord cr = new ConsumerRecord<>("foo", 1, 1L, null, null); Acknowledgment ack = mock(Acknowledgment.class); - willReturn(new GenericMessage<>("foo")).given(converter).toMessage(cr, ack, null, String.class); + willReturn(new GenericMessage<>("foo")).given(converter).toMessage(cr, ack, null, String.class, null); adapter.setMessageConverter(converter); adapter.onMessage(cr, ack); - verify(converter).toMessage(cr, ack, null, String.class); + verify(converter).toMessage(cr, ack, null, String.class, null); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java 
b/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java index 11a79dd839..80fbf8585a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java @@ -46,7 +46,7 @@ public class BatchMessageConverterTests { @Test - public void testBatchConverters() throws Exception { + public void testBatchConverters() { BatchMessageConverter batchMessageConverter = new BatchMessagingMessageConverter(); MessageHeaders headers = testGuts(batchMessageConverter); @@ -89,8 +89,7 @@ private MessageHeaders testGuts(BatchMessageConverter batchMessageConverter) { Acknowledgment ack = mock(Acknowledgment.class); Consumer consumer = mock(Consumer.class); - Message message = batchMessageConverter.toMessage(consumerRecords, ack, consumer, - String.class); + Message message = batchMessageConverter.toMessage(consumerRecords, ack, consumer, String.class, "test.g"); assertThat(message.getPayload()) .isEqualTo(Arrays.asList("value1", "value2", "value3")); @@ -109,6 +108,7 @@ private MessageHeaders testGuts(BatchMessageConverter batchMessageConverter) { .isEqualTo(Arrays.asList(1487694048607L, 1487694048608L, 1487694048609L)); assertThat(headers.get(KafkaHeaders.ACKNOWLEDGMENT)).isSameAs(ack); assertThat(headers.get(KafkaHeaders.CONSUMER)).isSameAs(consumer); + assertThat(headers.get(KafkaHeaders.GROUP_ID)).isEqualTo("test.g"); return headers; } diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index 6ab0862d98..aaa0fe54d5 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -1219,6 +1219,29 @@ The properties are specified as individual strings with the normal Java `Propert ---- ==== +[[listener-group-id]] +===== Obtaining the Consumer `group.id` + +When running the same listener code in multiple containers, it may be 
useful to be able to determine which container (identified by its `group.id` consumer property) that a record came from. + +You can call `KafkaUtils.getConsumerGroupId()` on the listener thread to do this. +Alternatively, you can set the `exposeGroupId` property on the annotation and then access the group id in a method parameter. + +==== +[source, java] +---- +@KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}", exposeGroupId = "${always:true}") +public void listener(@Payload String foo, + @Header(KafkaHeaders.GROUP_ID) String groupId) { +... +} +---- +==== + +IMPORTANT: This is available in record listeners and batch listeners that receive a `List` of records. +It is **not** available in a batch listener that receives a `ConsumerRecords` argument. +Use the `KafkaUtils` mechanism in that case. + ===== Container Thread Naming Listener containers currently use two task executors, one to invoke the consumer and another that is used to invoke the listener when the kafka consumer property `enable.auto.commit` is `false`. diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 65a8e1fd11..c91007bfc3 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -17,6 +17,10 @@ Because the listener container has it's own mechanism for committing offsets, it It now sets it to false automatically unless specifically set in the consumer factory or the container's consumer property overrides. The `ackOnError` property is now `false` by default. +See <> for more information. + +It is now possible to obtain the consumer's `group.id` property in the listener method. +See <> for more information. ==== ErrorHandler Changes From 870d33bb14651b8d65c258e6f75ae3891a86a411 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 26 Mar 2019 12:26:16 -0400 Subject: [PATCH 2/3] Always add GROUP_ID header; polish header mapper per PR comments. 
--- .../kafka/annotation/KafkaListener.java | 9 ----- ...kaListenerAnnotationBeanPostProcessor.java | 3 -- .../config/AbstractKafkaListenerEndpoint.java | 16 --------- .../kafka/config/KafkaListenerEndpoint.java | 8 ----- .../config/KafkaListenerEndpointAdapter.java | 5 --- .../config/MethodKafkaListenerEndpoint.java | 3 +- .../BatchMessagingMessageListenerAdapter.java | 4 +-- .../MessagingMessageListenerAdapter.java | 13 +------- .../support/AbstractKafkaHeaderMapper.java | 33 +++++++++---------- .../kafka/support/JavaUtils.java | 7 ---- .../converter/BatchMessageConverter.java | 3 +- .../BatchMessagingMessageConverter.java | 23 +++---------- .../support/converter/MessageConverter.java | 32 ++++++++++++++++++ .../converter/MessagingMessageConverter.java | 21 ++---------- .../converter/RecordMessageConverter.java | 5 +-- .../EnableKafkaIntegrationTests.java | 6 ++-- .../kafka/core/KafkaTemplateTests.java | 7 ++-- ...hMessagingMessageListenerAdapterTests.java | 2 +- .../MessagingMessageListenerAdapterTests.java | 4 +-- .../converter/BatchMessageConverterTests.java | 5 ++- src/reference/asciidoc/kafka.adoc | 2 +- 21 files changed, 75 insertions(+), 136 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java index 1c8a6a534b..8a24074476 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java @@ -241,13 +241,4 @@ */ String[] properties() default {}; - /** - * When set to true, enable the listener code to access the consumer's {@link code - * group.id} property using a {@code @Header(KafkaHeaders.GROUP_ID} String groupId) - * method parameter. Useful if you use the same listener code in multiple containers. - * @return true to provide the header. 
- * @since 2.3 - */ - String exposeGroupId() default "false"; - } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 92a5f6b353..b341b2bb10 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -446,9 +446,6 @@ protected void processListener(MethodKafkaListenerEndpoint endpoint, Kafka endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup")); } resolveKafkaProperties(endpoint, kafkaListener.properties()); - if (StringUtils.hasText(kafkaListener.exposeGroupId())) { - endpoint.setExposeGroupId(resolveExpressionAsBoolean(kafkaListener.exposeGroupId(), "exposeGroupId")); - } KafkaListenerContainerFactory factory = null; String containerFactoryBeanName = resolve(kafkaListener.containerFactory()); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index 7db28d5ef4..703e0c4d22 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -113,8 +113,6 @@ public abstract class AbstractKafkaListenerEndpoint private Properties consumerProperties; - private Boolean exposeGroupId; - @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; @@ -413,20 +411,6 @@ public void setConsumerProperties(Properties consumerProperties) { this.consumerProperties = consumerProperties; } - @Override - public Boolean getExposeGroupId() { - 
return this.exposeGroupId; - } - - /** - * Set to true to expose the {@code group.id} property in a header. - * @param exposeGroupId true to expose. - * @since 2.3 - */ - public void setExposeGroupId(Boolean exposeGroupId) { - this.exposeGroupId = exposeGroupId; - } - @Override public void afterPropertiesSet() { boolean topicsEmpty = getTopics().isEmpty(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java index c5f0293d8f..c3ec65c0b7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java @@ -115,14 +115,6 @@ default Properties getConsumerProperties() { return null; } - /** - * Whether or not to expose the {@code group.id} as a header. - * @return true to expose. - * @since 2.3 - */ - @Nullable - Boolean getExposeGroupId(); - /** * Setup the specified message listener container with the model * defined by this endpoint. 
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointAdapter.java index b16abe4d05..c81ab4fc57 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointAdapter.java @@ -82,11 +82,6 @@ public Boolean getAutoStartup() { // NOSONAR return null; // NOSONAR null check by caller } - @Override - public Boolean getExposeGroupId() { - return Boolean.FALSE; - } - @Override public void setupListenerContainer(MessageListenerContainer listenerContainer, MessageConverter messageConverter) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java index 228bcc2efe..440026b15e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java @@ -159,8 +159,7 @@ protected MessagingMessageListenerAdapter createMessageListener(MessageLis || getReplyTemplate() != null, "a KafkaTemplate is required to support replies"); messageListener.setReplyTopic(replyTopic); }) - .acceptIfNotNull(getReplyTemplate(), messageListener::setReplyTemplate) - .acceptIfNotNull(getExposeGroupId(), messageListener::setExposeGroupId); + .acceptIfNotNull(getReplyTemplate(), messageListener::setReplyTemplate); return messageListener; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java index 556bc42ebc..a2b070d954 100644 --- 
a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java @@ -29,7 +29,6 @@ import org.springframework.kafka.listener.ListenerExecutionFailedException; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaNull; -import org.springframework.kafka.support.KafkaUtils; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.messaging.Message; @@ -174,8 +173,7 @@ protected void invoke(Object records, Acknowledgment acknowledgment, Consumer toMessagingMessage(List records, Acknowledgment acknowledgment, Consumer consumer) { - return getBatchMessageConverter().toMessage(records, acknowledgment, consumer, getType(), - isExposeGroupId() ? KafkaUtils.getConsumerGroupId() : null); + return getBatchMessageConverter().toMessage(records, acknowledgment, consumer, getType()); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index de1e70ca5e..89a99ca4ae 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -114,8 +114,6 @@ public abstract class MessagingMessageListenerAdapter implements ConsumerS private ReplyHeadersConfigurer replyHeadersConfigurer; - private boolean exposeGroupId; - public MessagingMessageListenerAdapter(Object bean, Method method) { this.bean = bean; this.inferredType = determineInferredType(method); // NOSONAR = intentionally not final @@ -241,14 +239,6 @@ 
public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigu this.replyHeadersConfigurer = replyHeadersConfigurer; } - public boolean isExposeGroupId() { - return this.exposeGroupId; - } - - public void setExposeGroupId(boolean exposeGroupId) { - this.exposeGroupId = exposeGroupId; - } - @Override public void registerSeekCallback(ConsumerSeekCallback callback) { if (this.bean instanceof ConsumerSeekAware) { @@ -272,8 +262,7 @@ public void onIdleContainer(Map assignments, ConsumerSeekC protected Message toMessagingMessage(ConsumerRecord record, Acknowledgment acknowledgment, Consumer consumer) { - return getMessageConverter().toMessage(record, acknowledgment, consumer, getType(), - this.exposeGroupId ? KafkaUtils.getConsumerGroupId() : null); + return getMessageConverter().toMessage(record, acknowledgment, consumer, getType()); } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java index 46a335d950..2446fb44d4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java @@ -21,7 +21,6 @@ import java.text.MessageFormat; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -49,8 +48,17 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper { protected final Log LOGGER = LogFactory.getLog(getClass()); // NOSONAR - private static final List NEVER_MAPPED = Collections.singletonList( - new NeverMatchHeaderMatcher(Arrays.stream(new String[] { + private final List matchers = new ArrayList<>(); + + private final Map rawMappedtHeaders = new HashMap<>(); + + private boolean mapAllStringsOut; + + private Charset charset = 
StandardCharsets.UTF_8; + + public AbstractKafkaHeaderMapper(String... patterns) { + Assert.notNull(patterns, "'patterns' must not be null"); + this.matchers.add(new NeverMatchHeaderMatcher( KafkaHeaders.ACKNOWLEDGMENT, KafkaHeaders.CONSUMER, KafkaHeaders.MESSAGE_KEY, @@ -66,19 +74,7 @@ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper { KafkaHeaders.BATCH_CONVERTED_HEADERS, KafkaHeaders.NATIVE_HEADERS, KafkaHeaders.TOPIC, - KafkaHeaders.GROUP_ID - }).collect(Collectors.toSet()))); - - private final List matchers = new ArrayList<>(NEVER_MAPPED); - - private final Map rawMappedtHeaders = new HashMap<>(); - - private boolean mapAllStringsOut; - - private Charset charset = StandardCharsets.UTF_8; - - public AbstractKafkaHeaderMapper(String... patterns) { - Assert.notNull(patterns, "'patterns' must not be null"); + KafkaHeaders.GROUP_ID)); for (String pattern : patterns) { this.matchers.add(new SimplePatternBasedHeaderMatcher(pattern)); } @@ -235,8 +231,9 @@ protected static class NeverMatchHeaderMatcher implements HeaderMatcher { private final Set neverMatchHeaders; - protected NeverMatchHeaderMatcher(Set neverMatchHeaders) { - this.neverMatchHeaders = neverMatchHeaders; + protected NeverMatchHeaderMatcher(String... headers) { + this.neverMatchHeaders = Arrays.stream(headers) + .collect(Collectors.toSet()); } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/JavaUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/support/JavaUtils.java index 16b38ce824..e37d6b5fc9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/JavaUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/JavaUtils.java @@ -78,7 +78,6 @@ public JavaUtils acceptIfNotNull(T value, Consumer consumer) { * @param value the value. * @param consumer the consumer. * @return this. 
- * @since 5.2 */ public JavaUtils acceptIfHasText(String value, Consumer consumer) { if (StringUtils.hasText(value)) { @@ -93,7 +92,6 @@ public JavaUtils acceptIfHasText(String value, Consumer consumer) { * @param consumer the consumer. * @param the value type. * @return this. - * @since 5.2 */ public JavaUtils acceptIfNotEmpty(List value, Consumer> consumer) { if (!CollectionUtils.isEmpty(value)) { @@ -108,7 +106,6 @@ public JavaUtils acceptIfNotEmpty(List value, Consumer> consumer) * @param consumer the consumer. * @param the value type. * @return this. - * @since 5.2 */ public JavaUtils acceptIfNotEmpty(T[] value, Consumer consumer) { if (!ObjectUtils.isEmpty(value)) { @@ -127,7 +124,6 @@ public JavaUtils acceptIfNotEmpty(T[] value, Consumer consumer) { * @param the first argument type. * @param the second argument type. * @return this. - * @since 5.2 */ public JavaUtils acceptIfCondition(boolean condition, T1 t1, T2 t2, BiConsumer consumer) { if (condition) { @@ -145,7 +141,6 @@ public JavaUtils acceptIfCondition(boolean condition, T1 t1, T2 t2, BiC * @param the first argument type. * @param the second argument type. * @return this. - * @since 5.2 */ public JavaUtils acceptIfNotNull(T1 t1, T2 t2, BiConsumer consumer) { if (t2 != null) { @@ -162,7 +157,6 @@ public JavaUtils acceptIfNotNull(T1 t1, T2 t2, BiConsumer consu * @param the first argument type. * @param consumer the consumer. * @return this. 
- * @since 5.2 */ public JavaUtils acceptIfHasText(T t1, String value, BiConsumer consumer) { if (StringUtils.hasText(value)) { @@ -171,5 +165,4 @@ public JavaUtils acceptIfHasText(T t1, String value, BiConsumer c return this; } - } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessageConverter.java index 498b1044b2..78cb48a239 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessageConverter.java @@ -41,11 +41,10 @@ public interface BatchMessageConverter extends MessageConverter { * @param acknowledgment the acknowledgment. * @param consumer the consumer. * @param payloadType the required payload type. - * @param groupId the {@code group.id} consumer property. * @return the message. */ Message toMessage(List> records, Acknowledgment acknowledgment, Consumer consumer, - Type payloadType, String groupId); + Type payloadType); /** * Convert a message to a producer record. 
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index d85982ab9a..57788f23e6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -124,8 +124,7 @@ public RecordMessageConverter getRecordMessageConverter() { @Override public Message toMessage(List> records, Acknowledgment acknowledgment, - Consumer consumer, Type type, String groupId) { - + Consumer consumer, Type type) { KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp); @@ -139,28 +138,14 @@ public Message toMessage(List> records, Acknowledgment a List timestamps = new ArrayList<>(); List> convertedHeaders = new ArrayList<>(); List natives = new ArrayList<>(); - rawHeaders.put(KafkaHeaders.RECEIVED_MESSAGE_KEY, keys); - rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, topics); - rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION_ID, partitions); - rawHeaders.put(KafkaHeaders.OFFSET, offsets); - rawHeaders.put(KafkaHeaders.TIMESTAMP_TYPE, timestampTypes); - rawHeaders.put(KafkaHeaders.RECEIVED_TIMESTAMP, timestamps); if (this.headerMapper != null) { rawHeaders.put(KafkaHeaders.BATCH_CONVERTED_HEADERS, convertedHeaders); } else { rawHeaders.put(KafkaHeaders.NATIVE_HEADERS, natives); } - - if (acknowledgment != null) { - rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment); - } - if (consumer != null) { - rawHeaders.put(KafkaHeaders.CONSUMER, consumer); - } - if (groupId != null) { - rawHeaders.put(KafkaHeaders.GROUP_ID, groupId); - } + commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets, timestampTypes, + timestamps); boolean logged = false; for (ConsumerRecord 
record : records) { @@ -218,7 +203,7 @@ protected Object extractAndConvertValue(ConsumerRecord record, Type type) */ protected Object convert(ConsumerRecord record, Type type) { return this.recordConverter - .toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0], null).getPayload(); + .toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0]).getPayload(); } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java index 6822776d8b..aa640e71c7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java @@ -16,6 +16,16 @@ package org.springframework.kafka.support.converter; +import java.util.Map; + +import org.apache.kafka.clients.consumer.Consumer; + +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.kafka.support.JavaUtils; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.KafkaUtils; +import org.springframework.lang.Nullable; + /** * A top level interface for message converters. * @@ -25,4 +35,26 @@ */ public interface MessageConverter { + @Nullable + static String getGroupid() { + String groupId = KafkaUtils.getConsumerGroupId(); + return groupId == null ? 
null : groupId; + } + + default void commonHeaders(Acknowledgment acknowledgment, Consumer consumer, Map rawHeaders, + Object theKey, Object topic, Object partition, Object offset, Object timestampType, Object timestamp) { + + rawHeaders.put(KafkaHeaders.RECEIVED_MESSAGE_KEY, theKey); + rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, topic); + rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION_ID, partition); + rawHeaders.put(KafkaHeaders.OFFSET, offset); + rawHeaders.put(KafkaHeaders.TIMESTAMP_TYPE, timestampType); + rawHeaders.put(KafkaHeaders.RECEIVED_TIMESTAMP, timestamp); + JavaUtils.INSTANCE + .acceptIfNotNull(KafkaHeaders.GROUP_ID, MessageConverter.getGroupid(), + (key, val) -> rawHeaders.put(key, val)) + .acceptIfNotNull(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment, (key, val) -> rawHeaders.put(key, val)) + .acceptIfNotNull(KafkaHeaders.CONSUMER, consumer, (key, val) -> rawHeaders.put(key, val)); + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java index 2f4dceb57e..6720be0ed8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java @@ -100,8 +100,7 @@ public void setHeaderMapper(KafkaHeaderMapper headerMapper) { @Override public Message toMessage(ConsumerRecord record, Acknowledgment acknowledgment, Consumer consumer, - Type type, String groupId) { - + Type type) { KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp); @@ -118,22 +117,8 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle } rawHeaders.put(KafkaHeaders.NATIVE_HEADERS, record.headers()); } - rawHeaders.put(KafkaHeaders.RECEIVED_MESSAGE_KEY, record.key()); - 
rawHeaders.put(KafkaHeaders.RECEIVED_TOPIC, record.topic()); - rawHeaders.put(KafkaHeaders.RECEIVED_PARTITION_ID, record.partition()); - rawHeaders.put(KafkaHeaders.OFFSET, record.offset()); - rawHeaders.put(KafkaHeaders.TIMESTAMP_TYPE, record.timestampType().name()); - rawHeaders.put(KafkaHeaders.RECEIVED_TIMESTAMP, record.timestamp()); - - if (acknowledgment != null) { - rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment); - } - if (consumer != null) { - rawHeaders.put(KafkaHeaders.CONSUMER, consumer); - } - if (groupId != null) { - rawHeaders.put(KafkaHeaders.GROUP_ID, groupId); - } + commonHeaders(acknowledgment, consumer, rawHeaders, record.key(), record.topic(), record.partition(), + record.offset(), record.timestampType().name(), record.timestamp()); return MessageBuilder.createMessage(extractAndConvertValue(record, type), kafkaMessageHeaders); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/RecordMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/RecordMessageConverter.java index bf5bdd919e..fe08cf313d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/RecordMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/RecordMessageConverter.java @@ -23,7 +23,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.kafka.support.Acknowledgment; -import org.springframework.lang.Nullable; import org.springframework.messaging.Message; /** @@ -40,12 +39,10 @@ public interface RecordMessageConverter extends MessageConverter { * @param acknowledgment the acknowledgment. * @param consumer the consumer * @param payloadType the required payload type. - * @param groupId the {@code group.id} consumer property. * @return the message. 
- * @since 2.3 */ Message toMessage(ConsumerRecord record, Acknowledgment acknowledgment, Consumer consumer, - Type payloadType, @Nullable String groupId); + Type payloadType); /** * Convert a message to a producer record. diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index cf86b0dfd6..38fce828bc 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -215,7 +215,7 @@ public void testSimple() throws Exception { template.flush(); assertThat(this.listener.latch1.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(this.config.globalErrorThrowable).isNotNull(); - assertThat(this.listener.receivedGroupId).isNull(); + assertThat(this.listener.receivedGroupId).isEqualTo("foo"); template.send("annotated2", 0, 123, "foo"); template.flush(); @@ -1396,7 +1396,7 @@ public void listen1(String foo, @Header(value = KafkaHeaders.GROUP_ID, required this.latch1.countDown(); } - @KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}", exposeGroupId = "${always:true}") + @KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}") public void listen2(@Payload String foo, @Header(KafkaHeaders.GROUP_ID) String groupId, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, @@ -1492,7 +1492,7 @@ public void listen9(Object payload) { private final AtomicBoolean reposition10 = new AtomicBoolean(); @KafkaListener(id = "list1", topics = "annotated14", containerFactory = "batchSpyFactory", - errorHandler = "listen10ErrorHandler", exposeGroupId = "true") + errorHandler = "listen10ErrorHandler") public void listen10(List list, @Header(KafkaHeaders.GROUP_ID) String groupId) { if (this.reposition10.compareAndSet(false, true)) { throw new 
RuntimeException("reposition"); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java index 504b40104e..f0bbf922c0 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java @@ -55,6 +55,7 @@ import org.springframework.kafka.support.CompositeProducerListener; import org.springframework.kafka.support.DefaultKafkaHeaderMapper; import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.KafkaUtils; import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.support.converter.MessagingMessageConverter; @@ -231,7 +232,8 @@ public void testWithMessage() throws Exception { Acknowledgment ack = mock(Acknowledgment.class); Consumer mockConsumer = mock(Consumer.class); - Message recordToMessage = messageConverter.toMessage(r2, ack, mockConsumer, String.class, "test.group.id"); + KafkaUtils.setConsumerGroupId("test.group.id"); + Message recordToMessage = messageConverter.toMessage(r2, ack, mockConsumer, String.class); assertThat(recordToMessage.getHeaders().get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME"); assertThat(recordToMessage.getHeaders().get(KafkaHeaders.RECEIVED_TIMESTAMP)).isEqualTo(1487694048615L); @@ -240,7 +242,8 @@ public void testWithMessage() throws Exception { assertThat(recordToMessage.getHeaders().get(KafkaHeaders.CONSUMER)).isSameAs(mockConsumer); assertThat(recordToMessage.getHeaders().get("foo")).isEqualTo("bar"); assertThat(recordToMessage.getPayload()).isEqualTo("foo-message-2"); - + assertThat(recordToMessage.getHeaders().get(KafkaHeaders.GROUP_ID)).isEqualTo("test.group.id"); + KafkaUtils.clearConsumerGroupId(); pf.destroy(); } diff --git 
a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java index 11bde7e730..d19d1bca26 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java @@ -66,7 +66,7 @@ public static class Foo { public volatile String group; - @KafkaListener(id = "foo", topics = "foo", autoStartup = "false", exposeGroupId = "true") + @KafkaListener(id = "foo", topics = "foo", autoStartup = "false") public void listen(List list, @Header(KafkaHeaders.GROUP_ID) String groupId) { list.forEach(s -> { this.value = s; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java index c08b500270..177f782516 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java @@ -55,10 +55,10 @@ public void onMessage(ConsumerRecord data, Acknowledgment acknow RecordMessageConverter converter = mock(RecordMessageConverter.class); ConsumerRecord cr = new ConsumerRecord<>("foo", 1, 1L, null, null); Acknowledgment ack = mock(Acknowledgment.class); - willReturn(new GenericMessage<>("foo")).given(converter).toMessage(cr, ack, null, String.class, null); + willReturn(new GenericMessage<>("foo")).given(converter).toMessage(cr, ack, null, String.class); adapter.setMessageConverter(converter); adapter.onMessage(cr, ack); - verify(converter).toMessage(cr, ack, null, String.class, null); + 
verify(converter).toMessage(cr, ack, null, String.class); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java index 80fbf8585a..25cdc12629 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java @@ -36,6 +36,7 @@ import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.KafkaUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; @@ -89,7 +90,8 @@ private MessageHeaders testGuts(BatchMessageConverter batchMessageConverter) { Acknowledgment ack = mock(Acknowledgment.class); Consumer consumer = mock(Consumer.class); - Message message = batchMessageConverter.toMessage(consumerRecords, ack, consumer, String.class, "test.g"); + KafkaUtils.setConsumerGroupId("test.g"); + Message message = batchMessageConverter.toMessage(consumerRecords, ack, consumer, String.class); assertThat(message.getPayload()) .isEqualTo(Arrays.asList("value1", "value2", "value3")); @@ -109,6 +111,7 @@ private MessageHeaders testGuts(BatchMessageConverter batchMessageConverter) { assertThat(headers.get(KafkaHeaders.ACKNOWLEDGMENT)).isSameAs(ack); assertThat(headers.get(KafkaHeaders.CONSUMER)).isSameAs(consumer); assertThat(headers.get(KafkaHeaders.GROUP_ID)).isEqualTo("test.g"); + KafkaUtils.clearConsumerGroupId(); return headers; } diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index aaa0fe54d5..017f893c44 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -1225,7 +1225,7 @@ The properties are specified as individual strings with the normal Java 
`Propert When running the same listener code in multiple containers, it may be useful to be able to determine which container (identified by its `group.id` consumer property) that a record came from. You can call `KafkaUtils.getConsumerGroupId()` on the listener thread to do this. -Alternatively, you can set the `exposeGroupId` property on the annotation and then access the group id in a method parameter. +Alternatively, you can access the group id in a method parameter. ==== [source, java] From 222516bca3574d59734caf7660eb001cdac4e690 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 26 Mar 2019 12:36:16 -0400 Subject: [PATCH 3/3] Support custom matchers in DefaultHeaderMapper. --- .../support/AbstractKafkaHeaderMapper.java | 29 ++++++++++++++----- .../support/DefaultKafkaHeaderMapper.java | 14 ++++----- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java index 2446fb44d4..a395ba1ffc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java @@ -46,7 +46,7 @@ */ public abstract class AbstractKafkaHeaderMapper implements KafkaHeaderMapper { - protected final Log LOGGER = LogFactory.getLog(getClass()); // NOSONAR + protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR private final List matchers = new ArrayList<>(); @@ -80,6 +80,19 @@ public AbstractKafkaHeaderMapper(String... patterns) { } } + /** + * Subclasses can invoke this to add custom {@link HeaderMatcher}s. + * @param matchersToAdd the matchers to add. + * @since 2.3 + */ + protected final void addMatchers(HeaderMatcher... 
matchersToAdd) { + Assert.notNull(matchersToAdd, "'matchersToAdd' cannot be null"); + Assert.noNullElements(matchersToAdd, "'matchersToAdd' cannot have null elements"); + for (HeaderMatcher matcher : matchersToAdd) { + this.matchers.add(matcher); + } + } + /** * Set to true to map all {@code String} valued outbound headers to {@code byte[]}. * To map to a {@code String} for inbound, there must be an entry in the rawMappedHeaders map. @@ -127,8 +140,8 @@ protected boolean matches(String header, Object value) { if (matches(header)) { if ((header.equals(MessageHeaders.REPLY_CHANNEL) || header.equals(MessageHeaders.ERROR_CHANNEL)) && !(value instanceof String)) { - if (this.LOGGER.isDebugEnabled()) { - this.LOGGER.debug("Cannot map " + header + " when type is [" + value.getClass() + if (this.logger.isDebugEnabled()) { + this.logger.debug("Cannot map " + header + " when type is [" + value.getClass() + "]; it must be a String"); } return false; @@ -144,8 +157,8 @@ protected boolean matches(String header) { return !matcher.isNegated(); } } - if (this.LOGGER.isDebugEnabled()) { - this.LOGGER.debug(MessageFormat.format("headerName=[{0}] WILL NOT be mapped; matched no patterns", + if (this.logger.isDebugEnabled()) { + this.logger.debug(MessageFormat.format("headerName=[{0}] WILL NOT be mapped; matched no patterns", header)); } return false; @@ -256,7 +269,7 @@ public boolean isNegated() { */ protected static class SimplePatternBasedHeaderMatcher implements HeaderMatcher { - private static final Log logger = LogFactory.getLog(SimplePatternBasedHeaderMatcher.class); // NOSONAR + private static final Log LOGGER = LogFactory.getLog(SimplePatternBasedHeaderMatcher.class); private final String pattern; @@ -276,8 +289,8 @@ public SimplePatternBasedHeaderMatcher(String pattern) { public boolean matchHeader(String headerName) { String header = headerName.toLowerCase(); if (PatternMatchUtils.simpleMatch(this.pattern, header)) { - if (logger.isDebugEnabled()) { - logger.debug( + if 
(LOGGER.isDebugEnabled()) { + LOGGER.debug( MessageFormat.format( "headerName=[{0}] WILL " + (this.negate ? "NOT " : "") + "be mapped, matched pattern=" + (this.negate ? "!" : "") + "{1}", diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java index c27e3f8f17..6751a6adc6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java @@ -231,8 +231,8 @@ public void fromHeaders(MessageHeaders headers, Headers target) { jsonHeaders.put(key, className); } catch (@SuppressWarnings("unused") Exception e) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Could not map " + key + " with type " + valueToAdd.getClass().getName()); + if (logger.isDebugEnabled()) { + logger.debug("Could not map " + key + " with type " + valueToAdd.getClass().getName()); } } } @@ -243,7 +243,7 @@ public void fromHeaders(MessageHeaders headers, Headers target) { target.add(new RecordHeader(JSON_TYPES, headerObjectMapper.writeValueAsBytes(jsonHeaders))); } catch (IllegalStateException | JsonProcessingException e) { - LOGGER.error("Could not add json types header", e); + logger.error("Could not add json types header", e); } } } @@ -264,7 +264,7 @@ public void toHeaders(Headers source, final Map headers) { } } catch (Exception e) { - LOGGER.error("Could not load class for header: " + header.key(), e); + logger.error("Could not load class for header: " + header.key(), e); } if (trusted) { try { @@ -272,7 +272,7 @@ public void toHeaders(Headers source, final Map headers) { headers.put(header.key(), value); } catch (IOException e) { - LOGGER.error("Could not decode json type: " + new String(header.value()) + " for key: " + header + logger.error("Could not decode json type: " + new String(header.value()) + " for key: " + 
header .key(), e); headers.put(header.key(), header.value()); @@ -303,7 +303,7 @@ private Object decodeValue(Header h, Class type) ClassUtils.forName(nth.getUntrustedType(), null)); } catch (Exception e) { - LOGGER.error("Could not decode header: " + nth, e); + logger.error("Could not decode header: " + nth, e); } } } @@ -323,7 +323,7 @@ private Map decodeJsonTypes(Headers source) { types = headerObjectMapper.readValue(next.value(), Map.class); } catch (IOException e) { - LOGGER.error("Could not decode json types: " + new String(next.value()), e); + logger.error("Could not decode json types: " + new String(next.value()), e); } break; }