Skip to content
Permalink
Browse files

GH-1032: Add consumer-side batching support

Resolves #1032

* Missing Javadocs
* Still De-Batch producer batches if so configured
* Polishing; detect incorrect configuration.
* Revert to generic message for incorrect configuration.
  • Loading branch information...
garyrussell authored and artembilan committed Jun 26, 2019
1 parent c1e3179 commit 3aed80fbd02def9f2ae59c2ccde8aea6e2f685d2
Showing with 492 additions and 120 deletions.
  1. +39 −0 spring-amqp/src/main/java/org/springframework/amqp/core/BatchMessageListener.java
  2. +15 −0 spring-amqp/src/main/java/org/springframework/amqp/core/MessageListener.java
  3. +54 −5 spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerFactoryBean.java
  4. +1 −1 ...it/src/main/java/org/springframework/amqp/rabbit/config/SimpleRabbitListenerContainerFactory.java
  5. +76 −32 ...bbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java
  6. +192 −54 ...rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java
  7. +42 −0 .../src/main/java/org/springframework/amqp/rabbit/listener/api/ChannelAwareBatchMessagelistener.java
  8. +7 −0 ...abbit/src/main/java/org/springframework/amqp/rabbit/listener/api/ChannelAwareMessageListener.java
  9. +1 −1 spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerParserTests.java
  10. +1 −1 ...bit/src/test/java/org/springframework/amqp/rabbit/config/RabbitListenerContainerFactoryTests.java
  11. +1 −1 spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/BatchingRabbitTemplateTests.java
  12. +1 −1 ...rg/springframework/amqp/rabbit/listener/MessageListenerContainerErrorHandlerIntegrationTests.java
  13. +1 −1 ...a/org/springframework/amqp/rabbit/listener/MessageListenerContainerLifecycleIntegrationTests.java
  14. +1 −1 .../java/org/springframework/amqp/rabbit/listener/MessageListenerContainerRetryIntegrationTests.java
  15. +1 −1 .../test/java/org/springframework/amqp/rabbit/listener/MessageListenerManualAckIntegrationTests.java
  16. +1 −1 .../java/org/springframework/amqp/rabbit/listener/MessageListenerRecoveryRepeatIntegrationTests.java
  17. +1 −1 ...src/test/java/org/springframework/amqp/rabbit/listener/MessageListenerTxSizeIntegrationTests.java
  18. +24 −1 ...java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerIntegrationTests.java
  19. +4 −4 ...t/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java
  20. +1 −1 ...bit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerWithRabbitMQ.java
  21. +25 −13 src/reference/asciidoc/amqp.adoc
  22. +3 −0 src/reference/asciidoc/whats-new.adoc
@@ -0,0 +1,39 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.core;

import java.util.List;

/**
* Used to receive a batch of messages if the container supports it.
*
* @author Gary Russell
* @since 2.2
*
*/
public interface BatchMessageListener extends MessageListener {

@Override
default void onMessage(Message message) {
throw new UnsupportedOperationException("Should never be called by the container");
}

@Override
void onMessageBatch(List<Message> messages);


}
@@ -16,6 +16,8 @@

package org.springframework.amqp.core;

import java.util.List;

/**
* Listener interface to receive asynchronous delivery of Amqp Messages.
*
@@ -25,6 +27,10 @@
@FunctionalInterface
public interface MessageListener {

/**
* Delivers a single message.
* @param message the message.
*/
void onMessage(Message message);

/**
@@ -37,4 +43,13 @@ default void containerAckMode(AcknowledgeMode mode) {
// NOSONAR - empty
}

/**
* Delivers a batch of messages.
* @param messages the messages.
* @since 2.2
*/
default void onMessageBatch(List<Message> messages) {
throw new UnsupportedOperationException("This listener does not support message batches");
}

}
@@ -161,12 +161,14 @@

private Long receiveTimeout;

private Integer txSize;
private Integer batchSize;

private Integer declarationRetries;

private Long retryDeclarationInterval;

private Boolean consumerBatchEnabled;

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
@@ -366,8 +368,53 @@ public void setReceiveTimeout(long receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}

/**
* This property has several functions.
* <p>
* When the channel is transacted, it determines how many messages to process in a
* single transaction. It should be less than or equal to
* {@link #setPrefetchCount(int) the prefetch count}.
* <p>
* It also affects how often acks are sent when using
* {@link org.springframework.amqp.core.AcknowledgeMode#AUTO} - one ack per BatchSize.
* <p>
* Finally, when {@link #setConsumerBatchEnabled(boolean)} is true, it determines how
* many records to include in the batch as long as sufficient messages arrive within
* {@link #setReceiveTimeout(long)}.
* <p>
* <b>IMPORTANT</b> The batch size represents the number of physical messages
* received. If {@link #setDeBatchingEnabled(boolean)} is true and a message is a
* batch created by a producer, the actual number of messages received by the listener
* will be larger than this batch size.
* <p>
*
* Default is 1.
* @param batchSize the batch size
* @since 2.2
*/
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}

/**
* Set the txSize.
* @param txSize the txSize.
* @deprecated in favor of {@link #setBatchSize(int)}.
*/
@Deprecated
public void setTxSize(int txSize) {
this.txSize = txSize;
setBatchSize(txSize);
}

/**
* Set to true to present a list of messages based on the {@link #setBatchSize(int)},
* if the container and listener support it.
* @param consumerBatchEnabled true to create message batches in the container.
* @since 2.2
* @see #setBatchSize(int)
*/
public void setConsumerBatchEnabled(boolean consumerBatchEnabled) {
this.consumerBatchEnabled = consumerBatchEnabled;
}

public void setDeclarationRetries(int declarationRetries) {
@@ -380,8 +427,9 @@ public void setRetryDeclarationInterval(long retryDeclarationInterval) {

@Override
public Class<?> getObjectType() {
return this.listenerContainer == null ? AbstractMessageListenerContainer.class : this.listenerContainer
.getClass();
return this.listenerContainer == null
? AbstractMessageListenerContainer.class
: this.listenerContainer.getClass();
}

@SuppressWarnings("deprecation")
@@ -446,7 +494,8 @@ private AbstractMessageListenerContainer createContainer() {
.acceptIfNotNull(this.consecutiveActiveTrigger, container::setConsecutiveActiveTrigger)
.acceptIfNotNull(this.consecutiveIdleTrigger, container::setConsecutiveIdleTrigger)
.acceptIfNotNull(this.receiveTimeout, container::setReceiveTimeout)
.acceptIfNotNull(this.txSize, container::setTxSize)
.acceptIfNotNull(this.batchSize, container::setBatchSize)
.acceptIfNotNull(this.consumerBatchEnabled, container::setConsumerBatchEnabled)
.acceptIfNotNull(this.declarationRetries, container::setDeclarationRetries)
.acceptIfNotNull(this.retryDeclarationInterval, container::setRetryDeclarationInterval);
return container;
@@ -140,7 +140,7 @@ protected void initializeContainer(SimpleMessageListenerContainer instance, Rabb
super.initializeContainer(instance, endpoint);

JavaUtils javaUtils = JavaUtils.INSTANCE
.acceptIfNotNull(this.txSize, instance::setTxSize);
.acceptIfNotNull(this.txSize, instance::setBatchSize);
String concurrency = null;
if (endpoint != null) {
concurrency = endpoint.getConcurrency();

0 comments on commit 3aed80f

Please sign in to comment.
You can’t perform that action at this time.