Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
* @author Gary Russell
* @author Christian Tzolov
* @author Ngoc Nhan
* @author Artem Bilan
*
* @since 1.2
*
*/
Expand Down Expand Up @@ -104,16 +106,20 @@ public void setIgnoreDeclarationExceptions(boolean ignoreDeclarationExceptions)
@Override
@SuppressWarnings("NullAway") // Dataflow analysis limitation
public void setAdminsThatShouldDeclare(@Nullable Object @Nullable ... adminArgs) {
Collection<Object> admins = new ArrayList<>();
if (adminArgs != null) {
if (adminArgs.length > 1) {
Assert.noNullElements(adminArgs, "'admins' cannot contain null elements");
}
if (adminArgs.length > 0 && !(adminArgs.length == 1 && adminArgs[0] == null)) {
admins = Arrays.asList(adminArgs);
this.declaringAdmins = Arrays.asList(adminArgs);
}
else {
this.declaringAdmins = Collections.emptyList();
}
}
this.declaringAdmins = admins;
else {
this.declaringAdmins = Collections.emptyList();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,27 @@
*
* @author Gary Russell
* @author Björn Michael
* @author Artem Bilan
*
* @since 2.1
*/
public class Declarables {

private final Collection<Declarable> declarables = new ArrayList<>();
private final Collection<Declarable> declarables;

public Declarables(Declarable... declarables) {
if (!ObjectUtils.isEmpty(declarables)) {
this.declarables = new ArrayList<>(declarables.length);
this.declarables.addAll(Arrays.asList(declarables));
}
else {
this.declarables = new ArrayList<>();
}
}

public Declarables(Collection<? extends Declarable> declarables) {
Assert.notNull(declarables, "declarables cannot be null");
this.declarables = new ArrayList<>(declarables.size());
this.declarables.addAll(declarables);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@
public final class MessagePostProcessorUtils {

public static Collection<MessagePostProcessor> sort(Collection<MessagePostProcessor> processors) {
List<MessagePostProcessor> priorityOrdered = new ArrayList<>();
List<MessagePostProcessor> ordered = new ArrayList<>();
List<MessagePostProcessor> unOrdered = new ArrayList<>();
int potentialSize = processors.size();
List<MessagePostProcessor> priorityOrdered = new ArrayList<>(potentialSize);
List<MessagePostProcessor> ordered = new ArrayList<>(potentialSize);
List<MessagePostProcessor> unOrdered = new ArrayList<>(potentialSize);
for (MessagePostProcessor processor : processors) {
if (processor instanceof PriorityOrdered) {
priorityOrdered.add(processor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ private static Collection<Declarable> declarables(String name, int partitions,
BiFunction<String, Integer, List<String>> routingKeyStrategy,
Map<String, Object> arguments) {

List<Declarable> declarables = new ArrayList<>();
List<String> rks = routingKeyStrategy.apply(name, partitions);
Assert.state(rks.size() == partitions, () -> "Expected " + partitions + " routing keys, not " + rks.size());
List<Declarable> declarables = new ArrayList<>(partitions + 1);
declarables.add(
new DirectExchange(name, true, false, Map.<String, @Nullable Object>of("x-super-stream", true)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ else if (source instanceof Method method) {
private void processMultiMethodListeners(RabbitListener[] classLevelListeners, Method[] multiMethods,
Object bean, String beanName) {

List<Method> checkedMethods = new ArrayList<>();
List<Method> checkedMethods = new ArrayList<>(multiMethods.length);
Method defaultMethod = null;
for (Method method : multiMethods) {
Method checked = checkProxy(method, bean);
Expand Down Expand Up @@ -646,8 +646,8 @@ private List<Object> resolveQueues(RabbitListener rabbitListener, Collection<Dec
String[] queues = rabbitListener.queues();
QueueBinding[] bindings = rabbitListener.bindings();
org.springframework.amqp.rabbit.annotation.Queue[] queuesToDeclare = rabbitListener.queuesToDeclare();
List<String> queueNames = new ArrayList<>();
List<Queue> queueBeans = new ArrayList<>();
List<String> queueNames = new ArrayList<>(queues.length);
List<Queue> queueBeans = new ArrayList<>(queues.length);
for (String queue : queues) {
resolveQueues(queue, queueNames, queueBeans);
}
Expand Down Expand Up @@ -818,7 +818,7 @@ private void registerBindings(QueueBinding binding, String queueName, String exc
}
else {
final int length = binding.key().length;
routingKeys = new ArrayList<>();
routingKeys = new ArrayList<>(length);
for (int i = 0; i < length; ++i) {
resolveAsStringOrQueue(resolveExpression(binding.key()[i]), routingKeys, null, "@QueueBinding.key");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@
/**
* A simple batching strategy that supports only one exchange/routingKey; includes a batch
* size, a batched message size limit and a timeout. The message properties from the first
* message in the batch is used in the batch message. Each message is preceded by a 4 byte
* message in the batch are used in the batch message. Each message is preceded by a 4-byte
* length field.
*
* @author Gary Russell
* @author Ngoc Nhan
* @author Artem Bilan
*
* @since 1.4.1
*
*/
Expand All @@ -53,7 +55,7 @@ public class SimpleBatchingStrategy implements BatchingStrategy {

private final long timeout;

private final List<Message> messages = new ArrayList<>();
private final List<Message> messages;

private @Nullable String exchange;

Expand All @@ -68,6 +70,7 @@ public class SimpleBatchingStrategy implements BatchingStrategy {
* @param timeout the batch timeout.
*/
public SimpleBatchingStrategy(int batchSize, int bufferLimit, long timeout) {
this.messages = new ArrayList<>(batchSize);
this.batchSize = batchSize;
this.bufferLimit = bufferLimit;
this.timeout = timeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,21 @@
*
* @author Rene Felgentraeger
* @author Gary Russell
* @author Artem Bilan
*
* @since 2.4.8
*/
public class CompositeContainerCustomizer<C extends MessageListenerContainer> implements ContainerCustomizer<C> {

private final List<ContainerCustomizer<C>> customizers = new ArrayList<>();
private final List<ContainerCustomizer<C>> customizers;

/**
* Create an instance with the provided delegate customizers.
* @param customizers the customizers.
*/
public CompositeContainerCustomizer(List<ContainerCustomizer<C>> customizers) {
Assert.notNull(customizers, "At least one customizer must be present");
this.customizers.addAll(customizers);
this.customizers = new ArrayList<>(customizers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ public Collection<PendingConfirm> expire(Listener listener, long cutoffTime) {
return Collections.emptyList();
}

List<PendingConfirm> expired = new ArrayList<>();
List<PendingConfirm> expired = new ArrayList<>(pendingConfirmsForListener.size());
Iterator<Entry<Long, PendingConfirm>> iterator = pendingConfirmsForListener.entrySet().iterator();
while (iterator.hasNext()) {
PendingConfirm pendingConfirm = iterator.next().getValue();
Expand All @@ -926,7 +926,7 @@ public Collection<PendingConfirm> expire(Listener listener, long cutoffTime) {
iterator.remove();
CorrelationData correlationData = pendingConfirm.getCorrelationData();
if (correlationData != null && StringUtils.hasText(correlationData.getId())) {
this.pendingReturns.remove(correlationData.getId()); // NOSONAR never null
this.pendingReturns.remove(correlationData.getId());
}
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ public void setBeforePublishPostProcessors(MessagePostProcessor... beforePublish
public void addBeforePublishPostProcessors(MessagePostProcessor... beforePublishPostProcessors) {
Assert.notNull(beforePublishPostProcessors, "'beforePublishPostProcessors' cannot be null");
if (this.beforePublishPostProcessors == null) {
this.beforePublishPostProcessors = new ArrayList<>();
this.beforePublishPostProcessors = new ArrayList<>(beforePublishPostProcessors.length);
}
this.beforePublishPostProcessors.addAll(Arrays.asList(beforePublishPostProcessors));
this.beforePublishPostProcessors = MessagePostProcessorUtils.sort(this.beforePublishPostProcessors);
Expand Down Expand Up @@ -740,7 +740,7 @@ public void setAfterReceivePostProcessors(MessagePostProcessor... afterReceivePo
public void addAfterReceivePostProcessors(MessagePostProcessor... afterReceivePostProcessors) {
Assert.notNull(afterReceivePostProcessors, "'afterReceivePostProcessors' cannot be null");
if (this.afterReceivePostProcessors == null) {
this.afterReceivePostProcessors = new ArrayList<>();
this.afterReceivePostProcessors = new ArrayList<>(afterReceivePostProcessors.length);
}
this.afterReceivePostProcessors.addAll(Arrays.asList(afterReceivePostProcessors));
this.afterReceivePostProcessors = MessagePostProcessorUtils.sort(this.afterReceivePostProcessors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void setValidator(Validator validator) {

@Override
protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter messageListener) {
List<InvocableHandlerMethod> invocableHandlerMethods = new ArrayList<>();
List<InvocableHandlerMethod> invocableHandlerMethods = new ArrayList<>(this.methods.size());
InvocableHandlerMethod defaultHandler = null;
MessageHandlerMethodFactory messageHandlerMethodFactory = getMessageHandlerMethodFactory();
Assert.state(messageHandlerMethodFactory != null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void onMessageBatch(List<org.springframework.amqp.core.Message> messages,
converted = new GenericMessage<>(messages);
}
else {
List<Message<?>> messagingMessages = new ArrayList<>();
List<Message<?>> messagingMessages = new ArrayList<>(messages.size());
for (org.springframework.amqp.core.Message message : messages) {
try {
Message<?> messagingMessage = toMessagingMessage(message);
Expand All @@ -88,7 +88,7 @@ public void onMessageBatch(List<org.springframework.amqp.core.Message> messages,
converted = new GenericMessage<>(messagingMessages);
}
else {
List<Object> payloads = new ArrayList<>();
List<Object> payloads = new ArrayList<>(messagingMessages.size());
for (Message<?> message : messagingMessages) {
payloads.add(message.getPayload());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.rabbitmq.client.Channel;
Expand Down Expand Up @@ -116,7 +117,7 @@
* }
* </pre>
*
* For further examples and discussion please do refer to the Spring reference documentation which describes this class
* For further examples and discussion, please do refer to the Spring reference documentation which describes this class
* (and its attendant XML configuration) in detail.
*
* @author Juergen Hoeller
Expand Down Expand Up @@ -189,7 +190,7 @@ public MessageListenerAdapter(Object delegate, String defaultListenerMethod) {
}

/**
* Set a target object to delegate message listening to. Specified listener methods have to be present on this
* Set a target object to delegate a message listening to. Specified listener methods have to be present on this
* target object.
* <p> If no explicit delegate object has been specified, listener methods are expected to present on this adapter
* instance, that is, on a custom subclass of this adapter, defining listener methods.
Expand All @@ -205,7 +206,7 @@ private void doSetDelegate(Object delegate) {
}

/**
* @return The target object to delegate message listening to.
* @return The target object to delegate a message listening to.
*/
protected Object getDelegate() {
return this.delegate;
Expand All @@ -229,8 +230,8 @@ protected String getDefaultListenerMethod() {
}

/**
* Set the mapping of queue name or consumer tag to method name. The first lookup
* is by queue name, if that returns null, we lookup by consumer tag, if that
* Set the mapping of the queue name or consumer tag to the method name. The first lookup
* is by queue name, if that returns null, we look up by consumer tag, if that
* returns null, the {@link #setDefaultListenerMethod(String) defaultListenerMethod}
* is used.
* @param queueOrTagToMethodName the map.
Expand All @@ -242,7 +243,7 @@ public void setQueueOrTagToMethodName(Map<String, String> queueOrTagToMethodName

/**
* Add the mapping of a queue name or consumer tag to a method name. The first lookup
* is by queue name, if that returns null, we lookup by consumer tag, if that
* is by queue name, if that returns null, we look up by consumer tag, if that
* returns null, the {@link #setDefaultListenerMethod(String) defaultListenerMethod}
* is used.
* @param queueOrTag The queue name or consumer tag.
Expand All @@ -266,14 +267,14 @@ public String removeQueueOrTagToMethodName(String queueOrTag) {
/**
* Spring {@link ChannelAwareMessageListener} entry point.
* <p>
* Delegates the message to the target listener method, with appropriate conversion of the message argument. If the
* target method returns a non-null object, wrap in a Rabbit message and send it back.
* Delegates the message to the target listener method, with the appropriate conversion of the message argument.
* If the target method returns a non-null object, wrap in a Rabbit message and send it back.
* @param message the incoming Rabbit message
* @param channel the Rabbit channel to operate on
* @throws Exception if thrown by Rabbit API methods
*/
@Override
public void onMessage(Message message, @Nullable Channel channel) throws Exception { // NOSONAR
public void onMessage(Message message, @Nullable Channel channel) throws Exception {
// Check whether the delegate is a MessageListener impl itself.
// In that case, the adapter will simply act as a pass-through.
Object delegateListener = getDelegate();
Expand Down Expand Up @@ -335,12 +336,12 @@ protected String getListenerMethodName(Message originalMessage, Object extracted
* Build an array of arguments to be passed into the target listener method. Allows for multiple method arguments to
* be built from a single message object.
* <p>
* The default implementation builds an array with the given message object as sole element. This means that the
* The default implementation builds an array with the given message object as a sole element. This means that the
* extracted message will always be passed into a <i>single</i> method argument, even if it is an array, with the
* target method having a corresponding single argument of the array's type declared.
* <p>
* This can be overridden to treat special message content such as arrays differently, for example passing in each
* element of the message array as distinct method argument.
* This can be overridden to treat special message content such as arrays differently, for example, passing in each
* element of the message array as a distinct method argument.
* @param extractedMessage the content of the message
* @param channel the Rabbit channel to operate on
* @param message the incoming Rabbit message
Expand Down Expand Up @@ -376,16 +377,17 @@ protected Object[] buildListenerArguments(Object extractedMessage, @Nullable Cha
catch (InvocationTargetException ex) {
Throwable targetEx = ex.getTargetException();
if (targetEx instanceof IOException iox) {
throw new AmqpIOException(iox); // NOSONAR lost stack trace
throw new AmqpIOException(iox);
}
else {
throw new ListenerExecutionFailedException("Listener method '" // NOSONAR lost stack trace
throw new ListenerExecutionFailedException("Listener method '"
+ methodName + "' threw exception", targetEx, originalMessage);
}
}
catch (Exception ex) {
ArrayList<String> arrayClass = new ArrayList<>();
List<String> arrayClass = null;
if (arguments != null) {
arrayClass = new ArrayList<>(arguments.length);
for (Object argument : arguments) {
arrayClass.add(argument != null ? argument.getClass().toString() : " null");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.springframework.amqp.rabbit.support;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -27,7 +26,6 @@
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;


/**
* Exception to be thrown when the execution of a listener method failed.
*
Expand All @@ -40,7 +38,7 @@
@SuppressWarnings("serial")
public class ListenerExecutionFailedException extends AmqpException {

private final List<Message> failedMessages = new ArrayList<>();
private final List<Message> failedMessages;

/**
* Constructor for ListenerExecutionFailedException.
Expand All @@ -50,7 +48,7 @@ public class ListenerExecutionFailedException extends AmqpException {
*/
public ListenerExecutionFailedException(String msg, Throwable cause, Message... failedMessage) {
super(msg, cause);
this.failedMessages.addAll(Arrays.asList(failedMessage));
this.failedMessages = Arrays.asList(failedMessage);
}

public @Nullable Message getFailedMessage() {
Expand Down
Loading