AbstractKafkaListenerContainerFactory.java
@@ -39,6 +39,7 @@
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.listener.adapter.ReplyHeadersConfigurer;
import org.springframework.kafka.requestreply.ReplyingKafkaOperations;
import org.springframework.kafka.support.JavaUtils;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.retry.RecoveryCallback;
@@ -286,10 +287,8 @@ public void afterPropertiesSet() {
@Override
public C createListenerContainer(KafkaListenerEndpoint endpoint) {
C instance = createContainerInstance(endpoint);

if (endpoint.getId() != null) {
instance.setBeanName(endpoint.getId());
}
JavaUtils.INSTANCE
.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
if (endpoint instanceof AbstractKafkaListenerEndpoint) {
configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
}
@@ -301,30 +300,15 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) {
}

private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> aklEndpoint) {
if (this.recordFilterStrategy != null) {
aklEndpoint.setRecordFilterStrategy(this.recordFilterStrategy);
}
if (this.ackDiscarded != null) {
aklEndpoint.setAckDiscarded(this.ackDiscarded);
}
if (this.retryTemplate != null) {
aklEndpoint.setRetryTemplate(this.retryTemplate);
}
if (this.recoveryCallback != null) {
aklEndpoint.setRecoveryCallback(this.recoveryCallback);
}
if (this.statefulRetry != null) {
aklEndpoint.setStatefulRetry(this.statefulRetry);
}
if (this.batchListener != null) {
aklEndpoint.setBatchListener(this.batchListener);
}
if (this.replyTemplate != null) {
aklEndpoint.setReplyTemplate(this.replyTemplate);
}
if (this.replyHeadersConfigurer != null) {
aklEndpoint.setReplyHeadersConfigurer(this.replyHeadersConfigurer);
}
JavaUtils.INSTANCE
.acceptIfNotNull(this.recordFilterStrategy, aklEndpoint::setRecordFilterStrategy)
.acceptIfNotNull(this.ackDiscarded, aklEndpoint::setAckDiscarded)
.acceptIfNotNull(this.retryTemplate, aklEndpoint::setRetryTemplate)
.acceptIfNotNull(this.recoveryCallback, aklEndpoint::setRecoveryCallback)
.acceptIfNotNull(this.statefulRetry, aklEndpoint::setStatefulRetry)
.acceptIfNotNull(this.batchListener, aklEndpoint::setBatchListener)
.acceptIfNotNull(this.replyTemplate, aklEndpoint::setReplyTemplate)
.acceptIfNotNull(this.replyHeadersConfigurer, aklEndpoint::setReplyHeadersConfigurer);
}

/**
@@ -345,35 +329,26 @@ protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) {
ContainerProperties properties = instance.getContainerProperties();
BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern",
"messageListener", "ackCount", "ackTime");
if (this.afterRollbackProcessor != null) {
instance.setAfterRollbackProcessor(this.afterRollbackProcessor);
}
if (this.containerProperties.getAckCount() > 0) {
properties.setAckCount(this.containerProperties.getAckCount());
}
if (this.containerProperties.getAckTime() > 0) {
properties.setAckTime(this.containerProperties.getAckTime());
}
if (this.errorHandler != null) {
instance.setGenericErrorHandler(this.errorHandler);
}
JavaUtils.INSTANCE
.acceptIfNotNull(this.afterRollbackProcessor, instance::setAfterRollbackProcessor)
.acceptIfCondition(this.containerProperties.getAckCount() > 0, this.containerProperties.getAckCount(),
properties::setAckCount)
.acceptIfCondition(this.containerProperties.getAckTime() > 0, this.containerProperties.getAckTime(),
properties::setAckTime)
.acceptIfNotNull(this.errorHandler, instance::setGenericErrorHandler);
if (endpoint.getAutoStartup() != null) {
instance.setAutoStartup(endpoint.getAutoStartup());
}
else if (this.autoStartup != null) {
instance.setAutoStartup(this.autoStartup);
}
if (this.phase != null) {
instance.setPhase(this.phase);
}
if (this.applicationEventPublisher != null) {
instance.setApplicationEventPublisher(this.applicationEventPublisher);
}
instance.getContainerProperties().setGroupId(endpoint.getGroupId());
instance.getContainerProperties().setClientId(endpoint.getClientIdPrefix());
if (endpoint.getConsumerProperties() != null) {
instance.getContainerProperties().setConsumerProperties(endpoint.getConsumerProperties());
}
JavaUtils.INSTANCE
.acceptIfNotNull(this.phase, instance::setPhase)
.acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)
.acceptIfNotNull(endpoint.getGroupId(), instance.getContainerProperties()::setGroupId)
.acceptIfNotNull(endpoint.getClientIdPrefix(), instance.getContainerProperties()::setClientId)
.acceptIfNotNull(endpoint.getConsumerProperties(),
instance.getContainerProperties()::setConsumerProperties);
}

@Override
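Note: the JavaUtils fluent helper that replaces the inline null checks above is imported from org.springframework.kafka.support, but its source is not part of this excerpt. A minimal sketch of the two methods used in this file, assuming the obvious shape:

import java.util.function.Consumer;

// Sketch only: assumed shape of org.springframework.kafka.support.JavaUtils,
// whose source is not shown in this diff.
public final class JavaUtils {

    public static final JavaUtils INSTANCE = new JavaUtils();

    private JavaUtils() {
    }

    // Run the consumer only when the value is non-null; return this to allow chaining.
    public <T> JavaUtils acceptIfNotNull(T value, Consumer<T> consumer) {
        if (value != null) {
            consumer.accept(value);
        }
        return this;
    }

    // Run the consumer with the value only when the condition is true.
    public <T> JavaUtils acceptIfCondition(boolean condition, T value, Consumer<T> consumer) {
        if (condition) {
            consumer.accept(value);
        }
        return this;
    }

}

Each chained call collapses one former if block without changing behavior.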
MethodKafkaListenerEndpoint.java
@@ -30,6 +30,7 @@
import org.springframework.kafka.listener.adapter.HandlerAdapter;
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
import org.springframework.kafka.support.JavaUtils;
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
@@ -147,19 +148,19 @@ protected MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
@Override
protected MessagingMessageListenerAdapter<K, V> createMessageListener(MessageListenerContainer container,
MessageConverter messageConverter) {

Assert.state(this.messageHandlerMethodFactory != null,
"Could not create message listener - MessageHandlerMethodFactory not set");
MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance(messageConverter);
messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
String replyTopic = getReplyTopic();
if (replyTopic != null) {
Assert.state(getMethod().getReturnType().equals(void.class)
|| getReplyTemplate() != null, "a KafkaTemplate is required to support replies");
messageListener.setReplyTopic(replyTopic);
}
if (getReplyTemplate() != null) {
messageListener.setReplyTemplate(getReplyTemplate());
}
JavaUtils.INSTANCE
.acceptIfNotNull(getReplyTopic(), replyTopic -> {
Assert.state(getMethod().getReturnType().equals(void.class)
|| getReplyTemplate() != null, "a KafkaTemplate is required to support replies");
messageListener.setReplyTopic(replyTopic);
})
.acceptIfNotNull(getReplyTemplate(), messageListener::setReplyTemplate);

return messageListener;
}

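Note: the Assert.state moved into the acceptIfNotNull lambda above still enforces that an endpoint declaring a reply topic has a KafkaTemplate to send replies with. An illustrative wiring sketch (bean, topic, and method names are examples, not part of this PR):

// Illustrative: a @SendTo listener needs a reply template on the container
// factory; without it the assertion above fails when the endpoint is created.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory, KafkaTemplate<String, String> template) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setReplyTemplate(template);
    return factory;
}

@KafkaListener(topics = "requests")
@SendTo("replies")
public String handle(String in) {
    return in.toUpperCase();
}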
KafkaOperations.java
@@ -186,7 +186,8 @@ public interface KafkaOperations<K, V> {

/**
* When running in a transaction, send the consumer offset(s) to the transaction. The
* group id is obtained from {@link ProducerFactoryUtils#getConsumerGroupId()}. It is
* group id is obtained from
* {@link org.springframework.kafka.support.KafkaUtils#getConsumerGroupId()}. It is
* not necessary to call this method if the operations are invoked on a listener
* container thread (and the listener container is configured with a
* {@link org.springframework.kafka.transaction.KafkaAwareTransactionManager}) since
KafkaTemplate.java
@@ -31,6 +31,7 @@
import org.apache.kafka.common.TopicPartition;

import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.SendResult;
@@ -329,7 +330,7 @@ public void flush() {

@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets) {
sendOffsetsToTransaction(offsets, ProducerFactoryUtils.getConsumerGroupId());
sendOffsetsToTransaction(offsets, KafkaUtils.getConsumerGroupId());
}

@Override
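Note: after this change the no-group-id overload resolves the group from the thread-local that the listener container binds (see KafkaMessageListenerContainer below). An illustrative use on a listener thread; names are examples and the sketch assumes a transactional producer factory:

// Illustrative: commit the consumed offset to a producer transaction; the
// group id need not be passed because it is bound to the listener thread.
@KafkaListener(id = "txListener", topics = "in")
public void listen(ConsumerRecord<String, String> record) {
    this.template.executeInTransaction(t -> {
        t.send("out", record.value().toUpperCase());
        t.sendOffsetsToTransaction(Collections.singletonMap(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1)));
        return null;
    });
}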
ProducerFactoryUtils.java
@@ -18,6 +18,7 @@

import org.apache.kafka.clients.producer.Producer;

import org.springframework.kafka.support.KafkaUtils;
import org.springframework.lang.Nullable;
import org.springframework.transaction.support.ResourceHolderSynchronization;
import org.springframework.transaction.support.TransactionSynchronization;
@@ -35,8 +36,6 @@
*/
public final class ProducerFactoryUtils {

private static ThreadLocal<String> groupIds = new ThreadLocal<>();

private ProducerFactoryUtils() {
super();
}
@@ -82,27 +81,33 @@ public static <K, V> void releaseResources(@Nullable KafkaResourceHolder<K, V> r
/**
* Set the group id for the consumer bound to this thread.
* @param groupId the group id.
* @deprecated in favor of {@link KafkaUtils#setConsumerGroupId(String)}.
* @since 1.3
*/
@Deprecated
public static void setConsumerGroupId(String groupId) {
groupIds.set(groupId);
KafkaUtils.setConsumerGroupId(groupId);
}

/**
* Get the group id for the consumer bound to this thread.
* @return the group id.
* @deprecated in favor of {@link KafkaUtils#getConsumerGroupId()}.
* @since 1.3
*/
@Deprecated
public static String getConsumerGroupId() {
return groupIds.get();
return KafkaUtils.getConsumerGroupId();
}

/**
* Clear the group id for the consumer bound to this thread.
* @deprecated in favor of {@link KafkaUtils#clearConsumerGroupId()}.
* @since 1.3
*/
@Deprecated
public static void clearConsumerGroupId() {
groupIds.remove();
KafkaUtils.clearConsumerGroupId();
}

private static <K, V> void bindResourceToTransaction(KafkaResourceHolder<K, V> resourceHolder,
Expand Down Expand Up @@ -151,8 +156,8 @@ public void afterCompletion(int status) {
}

@Override
protected void releaseResource(KafkaResourceHolder<K, V> resourceHolder, Object resourceKey) {
ProducerFactoryUtils.releaseResources(resourceHolder);
protected void releaseResource(KafkaResourceHolder<K, V> holder, Object resourceKey) {
ProducerFactoryUtils.releaseResources(holder);
}

}
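Note: KafkaUtils is not part of this excerpt; presumably the ThreadLocal removed from this class simply moved there, keeping the deprecated methods as thin delegates. Assumed shape:

// Sketch only: assumed shape of the relocated thread-local group id holder
// in org.springframework.kafka.support.KafkaUtils (not shown in this diff).
public final class KafkaUtils {

    private static final ThreadLocal<String> GROUP_IDS = new ThreadLocal<>();

    private KafkaUtils() {
    }

    public static void setConsumerGroupId(String groupId) {
        GROUP_IDS.set(groupId);
    }

    public static String getConsumerGroupId() {
        return GROUP_IDS.get();
    }

    public static void clearConsumerGroupId() {
        GROUP_IDS.remove();
    }

}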
KafkaMessageListenerContainer.java
@@ -56,7 +56,6 @@
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaResourceHolder;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.ProducerFactoryUtils;
import org.springframework.kafka.event.ConsumerPausedEvent;
import org.springframework.kafka.event.ConsumerResumedEvent;
import org.springframework.kafka.event.ConsumerStoppedEvent;
@@ -66,6 +65,7 @@
import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.kafka.support.LogIfLevelEnabled;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.kafka.support.TopicPartitionInitialOffset.SeekPosition;
@@ -743,9 +743,7 @@ public void run() {
if (this.genericListener instanceof ConsumerSeekAware) {
((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
}
if (this.transactionManager != null) {
ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);
}
KafkaUtils.setConsumerGroupId(this.consumerGroupId);
this.count = 0;
this.last = System.currentTimeMillis();
initAssignedPartitions();
@@ -864,7 +862,7 @@ private void checkIdle() {
}

public void wrapUp() {
ProducerFactoryUtils.clearConsumerGroupId();
KafkaUtils.clearConsumerGroupId();
publishConsumerStoppingEvent(this.consumer);
if (!this.fatalError) {
if (this.kafkaTxManager == null) {
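Note: the run() change above drops the old transaction-manager guard, so the consumer group id is now bound to the listener thread unconditionally. That is what allows a listener to receive the group id as a header with no transaction configuration; an illustrative listener (names are examples):

// Illustrative: the GROUP_ID header resolves from the thread-local that the
// container now always sets, with or without transactions.
@KafkaListener(id = "grouped", topics = "someTopic")
public void listen(String payload, @Header(KafkaHeaders.GROUP_ID) String groupId) {
    // groupId is the group of the container that received the record
}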
MessagingMessageListenerAdapter.java
@@ -57,6 +57,7 @@
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
@@ -475,7 +476,7 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity
}

Type genericParameterType = null;
boolean hasConsumerParameter = false;
int allowedBatchParameters = 1;

for (int i = 0; i < method.getParameterCount(); i++) {
MethodParameter methodParameter = new MethodParameter(method, i);
@@ -529,23 +530,33 @@ else if (parameterizedType.getRawType().equals(List.class)
}
else if (methodParameter.getGenericParameterType().equals(Acknowledgment.class)) {
this.hasAckParameter = true;
allowedBatchParameters++;
}
else if (methodParameter.hasParameterAnnotation(Header.class)) {
Header header = methodParameter.getParameterAnnotation(Header.class);
if (header.value().equals(KafkaHeaders.GROUP_ID)) {
allowedBatchParameters++;
}
}
else {
if (methodParameter.getGenericParameterType().equals(Consumer.class)) {
hasConsumerParameter = true;
allowedBatchParameters++;
}
else {
Type parameterType = methodParameter.getGenericParameterType();
hasConsumerParameter = parameterType instanceof ParameterizedType
&& ((ParameterizedType) parameterType).getRawType().equals(Consumer.class);
if (parameterType instanceof ParameterizedType
&& ((ParameterizedType) parameterType).getRawType().equals(Consumer.class)) {
allowedBatchParameters++;
}
}
}
}
boolean validParametersForBatch = validParametersForBatch(method.getGenericParameterTypes().length,
this.hasAckParameter, hasConsumerParameter);
boolean validParametersForBatch = method.getGenericParameterTypes().length <= allowedBatchParameters;

if (!validParametersForBatch) {
String stateMessage = "A parameter of type '%s' must be the only parameter "
+ "(except for an optional 'Acknowledgment' and/or 'Consumer')";
+ "(except for an optional 'Acknowledgment' and/or 'Consumer' "
+ "and/or '@Header(KafkaHeaders.GROUP_ID) String groupId'";
Assert.state(!this.isConsumerRecords,
() -> String.format(stateMessage, "ConsumerRecords"));
Assert.state(!this.isConsumerRecordList,
Expand All @@ -557,18 +568,6 @@ else if (methodParameter.getGenericParameterType().equals(Acknowledgment.class))
return genericParameterType;
}

private boolean validParametersForBatch(int parameterCount, boolean hasAck, boolean hasConsumer) {
if (hasAck) {
return parameterCount == 2 || (hasConsumer && parameterCount == 3); // NOSONAR magic #
}
else if (hasConsumer) {
return parameterCount == 2; // NOSONAR magic #
}
else {
return parameterCount == 1; // NOSONAR magic #
}
}

/*
* Don't consider parameter types that are available after conversion.
* Acknowledgment, ConsumerRecord, Consumer, ConsumerRecord<...>, Consumer<...>, and Message<?>.
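Note: the rewritten validation counts how many parameters a batch listener may declare: one for the payload, plus one each for an optional Acknowledgment, Consumer, and @Header(KafkaHeaders.GROUP_ID) String, replacing the removed validParametersForBatch case analysis. An illustrative signature the new count accepts and the old checks rejected (listener names are examples):

// Illustrative batch listener: the List payload plus all three optional
// parameters permitted by the updated validation.
@KafkaListener(id = "batch", topics = "someTopic", containerFactory = "batchFactory")
public void listen(List<String> data, Acknowledgment ack, Consumer<?, ?> consumer,
        @Header(KafkaHeaders.GROUP_ID) String groupId) {

    // process data, then acknowledge the whole batch
    ack.acknowledge();
}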