Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ public void onMessageBatch(List<Message> messages, @Nullable Channel channel) {
else {
converted = convertPayloads(messages, channel);
if (BatchMode.EXTRACT_PAYLOADS_WITH_HEADERS.equals(AmqpInboundChannelAdapter.this.batchMode)) {
List<Map<String, @Nullable Object>> listHeaders = new ArrayList<>();
List<Map<String, @Nullable Object>> listHeaders = new ArrayList<>(messages.size());
messages.forEach(msg -> listHeaders.add(AmqpInboundChannelAdapter.this.headerMapper
.toHeadersFromRequest(msg.getMessageProperties())));
headers = listHeaders;
Expand Down Expand Up @@ -553,7 +553,7 @@ public void onMessageBatch(List<Message> messages, @Nullable Channel channel) {
private @Nullable List<org.springframework.messaging.Message<?>> convertMessages(List<Message> messages,
@Nullable Channel channel) {

List<org.springframework.messaging.Message<?>> converted = new ArrayList<>();
List<org.springframework.messaging.Message<?>> converted = new ArrayList<>(messages.size());
try {
messages.forEach(message -> converted.add(createMessageFromAmqp(message, channel)));
return converted;
Expand All @@ -574,7 +574,7 @@ public void onMessageBatch(List<Message> messages, @Nullable Channel channel) {
}

private @Nullable List<?> convertPayloads(List<Message> messages, @Nullable Channel channel) {
List<Object> converted = new ArrayList<>();
List<Object> converted = new ArrayList<>(messages.size());
try {
messages.forEach(message -> converted.add(this.converter.fromMessage(message)));
return converted;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.springframework.integration.amqp.support;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -62,36 +61,33 @@
*/
public class DefaultAmqpHeaderMapper extends AbstractHeaderMapper<MessageProperties> implements AmqpHeaderMapper {

private static final List<String> STANDARD_HEADER_NAMES = new ArrayList<>();

static {
STANDARD_HEADER_NAMES.add(AmqpHeaders.APP_ID);
STANDARD_HEADER_NAMES.add(AmqpHeaders.CLUSTER_ID);
STANDARD_HEADER_NAMES.add(AmqpHeaders.CONTENT_ENCODING);
STANDARD_HEADER_NAMES.add(AmqpHeaders.CONTENT_LENGTH);
STANDARD_HEADER_NAMES.add(AmqpHeaders.CONTENT_TYPE);
STANDARD_HEADER_NAMES.add(AmqpHeaders.CORRELATION_ID);
STANDARD_HEADER_NAMES.add(AmqpHeaders.DELAY);
STANDARD_HEADER_NAMES.add(AmqpHeaders.DELIVERY_MODE);
STANDARD_HEADER_NAMES.add(AmqpHeaders.DELIVERY_TAG);
STANDARD_HEADER_NAMES.add(AmqpHeaders.EXPIRATION);
STANDARD_HEADER_NAMES.add(AmqpHeaders.MESSAGE_COUNT);
STANDARD_HEADER_NAMES.add(AmqpHeaders.MESSAGE_ID);
STANDARD_HEADER_NAMES.add(AmqpHeaders.RECEIVED_DELAY);
STANDARD_HEADER_NAMES.add(AmqpHeaders.RECEIVED_DELIVERY_MODE);
STANDARD_HEADER_NAMES.add(AmqpHeaders.RECEIVED_EXCHANGE);
STANDARD_HEADER_NAMES.add(AmqpHeaders.RECEIVED_ROUTING_KEY);
STANDARD_HEADER_NAMES.add(AmqpHeaders.REDELIVERED);
STANDARD_HEADER_NAMES.add(AmqpHeaders.REPLY_TO);
STANDARD_HEADER_NAMES.add(AmqpHeaders.TIMESTAMP);
STANDARD_HEADER_NAMES.add(AmqpHeaders.TYPE);
STANDARD_HEADER_NAMES.add(AmqpHeaders.USER_ID);
STANDARD_HEADER_NAMES.add(JsonHeaders.TYPE_ID);
STANDARD_HEADER_NAMES.add(JsonHeaders.CONTENT_TYPE_ID);
STANDARD_HEADER_NAMES.add(JsonHeaders.KEY_TYPE_ID);
STANDARD_HEADER_NAMES.add(AmqpHeaders.SPRING_REPLY_CORRELATION);
STANDARD_HEADER_NAMES.add(AmqpHeaders.SPRING_REPLY_TO_STACK);
}
private static final List<String> STANDARD_HEADER_NAMES =
List.of(AmqpHeaders.APP_ID,
AmqpHeaders.CLUSTER_ID,
AmqpHeaders.CONTENT_ENCODING,
AmqpHeaders.CONTENT_LENGTH,
AmqpHeaders.CONTENT_TYPE,
AmqpHeaders.CORRELATION_ID,
AmqpHeaders.DELAY,
AmqpHeaders.DELIVERY_MODE,
AmqpHeaders.DELIVERY_TAG,
AmqpHeaders.EXPIRATION,
AmqpHeaders.MESSAGE_COUNT,
AmqpHeaders.MESSAGE_ID,
AmqpHeaders.RECEIVED_DELAY,
AmqpHeaders.RECEIVED_DELIVERY_MODE,
AmqpHeaders.RECEIVED_EXCHANGE,
AmqpHeaders.RECEIVED_ROUTING_KEY,
AmqpHeaders.REDELIVERED,
AmqpHeaders.REPLY_TO,
AmqpHeaders.TIMESTAMP,
AmqpHeaders.TYPE,
AmqpHeaders.USER_ID,
JsonHeaders.TYPE_ID,
JsonHeaders.CONTENT_TYPE_ID,
JsonHeaders.KEY_TYPE_ID,
AmqpHeaders.SPRING_REPLY_CORRELATION,
AmqpHeaders.SPRING_REPLY_TO_STACK);

@SuppressWarnings("this-escape")
protected DefaultAmqpHeaderMapper(String @Nullable [] requestHeaderNames, String @Nullable [] replyHeaderNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public Object processMessageGroup(MessageGroup group) {
if (!messages.isEmpty()) {
List<Message<?>> sorted = new ArrayList<>(messages);
sorted.sort(this.comparator);
ArrayList<Message<?>> partialSequence = new ArrayList<>();
ArrayList<Message<?>> partialSequence = new ArrayList<>(messages.size());
int previousSequence = extractSequenceNumber(sorted.get(0));
int currentSequence = previousSequence;
for (Message<?> message : sorted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ public List<Message<?>> purge(@Nullable MessageSelector selector) {
if (selector == null) {
return this.clear();
}
List<Message<?>> purgedMessages = new ArrayList<>();
Object[] array = this.queue.toArray();
List<Message<?>> purgedMessages = new ArrayList<>(array.length);
for (Object o : array) {
Message<?> message = (Message<?>) o;
if (!selector.accept(message) && this.queue.remove(message)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@
* A {@link KryoRegistrar} that delegates and validates registrations across all components.
*
* @author David Turanski
* @author Artem Bilan
*
* @since 4.2
*/
public class CompositeKryoRegistrar extends AbstractKryoRegistrar {

private final List<KryoRegistrar> delegates;

public CompositeKryoRegistrar(List<KryoRegistrar> delegates) {
this.delegates = new ArrayList<KryoRegistrar>(delegates);
this.delegates = new ArrayList<>(delegates);

if (!CollectionUtils.isEmpty(this.delegates)) {
validateRegistrations();
Expand All @@ -52,18 +54,19 @@ public void registerTypes(Kryo kryo) {

@Override
public final List<Registration> getRegistrations() {
List<Registration> registrations = new ArrayList<Registration>();
List<Registration> registrations = new ArrayList<>();
for (KryoRegistrar registrar : this.delegates) {
registrations.addAll(registrar.getRegistrations());
}
return registrations;
}

private void validateRegistrations() {
List<Integer> ids = new ArrayList<Integer>();
List<Class<?>> types = new ArrayList<Class<?>>();
List<Registration> registrations = getRegistrations();
List<Integer> ids = new ArrayList<>(registrations.size());
List<Class<?>> types = new ArrayList<>(registrations.size());

for (Registration registration : getRegistrations()) {
for (Registration registration : registrations) {
Assert.isTrue(registration.getId() >= MIN_REGISTRATION_VALUE,
"registration ID must be >= " + MIN_REGISTRATION_VALUE);
if (ids.contains(registration.getId())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,18 @@ public KryoClassListRegistrar(List<Class<?>> classes) {
* @param initialValue the initial value
*/
public void setInitialValue(int initialValue) {
Assert.isTrue(initialValue >= MIN_REGISTRATION_VALUE, "'initialValue' must be >= " + MIN_REGISTRATION_VALUE);
Assert.isTrue(initialValue >= MIN_REGISTRATION_VALUE,
() -> "'initialValue' must be >= " + MIN_REGISTRATION_VALUE);
this.initialValue = initialValue;
}

@Override
public List<Registration> getRegistrations() {
List<Registration> registrations = new ArrayList<>();
List<Registration> registrations = new ArrayList<>(this.registeredClasses.size());
if (!CollectionUtils.isEmpty(this.registeredClasses)) {
for (int i = 0; i < this.registeredClasses.size(); i++) {
registrations.add(new Registration(this.registeredClasses.get(i),
KRYO.getSerializer(this.registeredClasses.get(i)), i + this.initialValue));
Class<?> type = this.registeredClasses.get(i);
registrations.add(new Registration(type, KRYO.getSerializer(type), i + this.initialValue));
}
}
return registrations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,25 @@
* used to explicitly set the registration ID for each class.
*
* @author David Turanski
* @author Artem Bilan
*
* @since 4.2
*/
public class KryoClassMapRegistrar extends AbstractKryoRegistrar {

private final Map<Integer, Class<?>> registeredClasses;

public KryoClassMapRegistrar(Map<Integer, Class<?>> kryoRegisteredClasses) {
this.registeredClasses = new HashMap<Integer, Class<?>>(kryoRegisteredClasses);
this.registeredClasses = new HashMap<>(kryoRegisteredClasses);
}

@Override
public List<Registration> getRegistrations() {
List<Registration> registrations = new ArrayList<Registration>();
List<Registration> registrations = new ArrayList<>(this.registeredClasses.size());
if (!CollectionUtils.isEmpty(this.registeredClasses)) {
for (Map.Entry<Integer, Class<?>> entry : this.registeredClasses.entrySet()) {
registrations.add(
new Registration(entry.getValue(), KRYO.getSerializer(entry.getValue()), entry.getKey()));
Class<?> type = entry.getValue();
registrations.add(new Registration(type, KRYO.getSerializer(type), entry.getKey()));
}
}
return registrations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@
* A {@link KryoRegistrar} implementation backed by a List of {@link Registration}.
*
* @author David Turanski
* @author Artem Bilan
*
* @since 4.2
*/
public class KryoRegistrationRegistrar extends AbstractKryoRegistrar {

private final List<Registration> registrations;

public KryoRegistrationRegistrar(List<Registration> registrations) {
this.registrations = registrations != null
? new ArrayList<Registration>(registrations)
: new ArrayList<Registration>();
this.registrations = new ArrayList<>(registrations);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void addMatchingInterceptors(InterceptableChannel channel, String beanNam
LOGGER.debug("Applying global interceptors on channel '" + beanName + "'");
}

List<GlobalChannelInterceptorWrapper> tempInterceptors = new ArrayList<>();
List<GlobalChannelInterceptorWrapper> tempInterceptors = new ArrayList<>(this.positiveOrderInterceptors.size());

this.positiveOrderInterceptors
.forEach(interceptorWrapper ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,14 @@ public class IntegrationComponentScanRegistrar implements ImportBeanDefinitionRe

private static final String BEAN_NAME = IntegrationComponentScanRegistrar.class.getName();

private final List<TypeFilter> defaultFilters = new ArrayList<>();
private final List<TypeFilter> defaultFilters = List.of(new AnnotationTypeFilter(MessagingGateway.class, true));

@SuppressWarnings("NullAway.Init")
private ResourceLoader resourceLoader;

@SuppressWarnings("NullAway.Init")
private Environment environment;

public IntegrationComponentScanRegistrar() {
this.defaultFilters.add(new AnnotationTypeFilter(MessagingGateway.class, true));
}

@Override
public void setResourceLoader(ResourceLoader resourceLoader) {
this.resourceLoader = resourceLoader;
Expand Down Expand Up @@ -132,7 +128,7 @@ protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {

};

filter(registry, componentScan, scanner); // NOSONAR - never null
filter(registry, componentScan, scanner);

scanner.setResourceLoader(this.resourceLoader);
scanner.setEnvironment(this.environment);
Expand All @@ -151,7 +147,7 @@ protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
private void filter(BeanDefinitionRegistry registry, AnnotationAttributes componentScan,
ClassPathScanningCandidateComponentProvider scanner) {

if (componentScan.getBoolean("useDefaultFilters")) { // NOSONAR - never null
if (componentScan.getBoolean("useDefaultFilters")) {
for (TypeFilter typeFilter : this.defaultFilters) {
scanner.addIncludeFilter(typeFilter);
}
Expand Down Expand Up @@ -188,10 +184,11 @@ protected Collection<String> getBasePackages(AnnotationAttributes componentScan,
}

private List<TypeFilter> typeFiltersFor(AnnotationAttributes filter, BeanDefinitionRegistry registry) {
List<TypeFilter> typeFilters = new ArrayList<>();
Class<?>[] classes = filter.getClassArray("classes");
List<TypeFilter> typeFilters = new ArrayList<>(classes.length);
FilterType filterType = filter.getEnum("type");

for (Class<?> filterClass : filter.getClassArray("classes")) {
for (Class<?> filterClass : classes) {
switch (filterType) {
case ANNOTATION -> {
Assert.isAssignable(Annotation.class, filterClass,
Expand Down Expand Up @@ -230,23 +227,23 @@ private static void invokeAwareMethods(Object parserStrategyBean, Environment en
ResourceLoader resourceLoader, BeanDefinitionRegistry registry) {

if (parserStrategyBean instanceof Aware) {
if (parserStrategyBean instanceof BeanClassLoaderAware) {
if (parserStrategyBean instanceof BeanClassLoaderAware beanClassLoaderAware) {
ClassLoader classLoader =
registry instanceof ConfigurableBeanFactory
? ((ConfigurableBeanFactory) registry).getBeanClassLoader()
: resourceLoader.getClassLoader();
if (classLoader != null) {
((BeanClassLoaderAware) parserStrategyBean).setBeanClassLoader(classLoader);
beanClassLoaderAware.setBeanClassLoader(classLoader);
}
}
if (parserStrategyBean instanceof BeanFactoryAware && registry instanceof BeanFactory) {
((BeanFactoryAware) parserStrategyBean).setBeanFactory((BeanFactory) registry);
if (parserStrategyBean instanceof BeanFactoryAware beanFactoryAware && registry instanceof BeanFactory) {
beanFactoryAware.setBeanFactory((BeanFactory) registry);
}
if (parserStrategyBean instanceof EnvironmentAware) {
((EnvironmentAware) parserStrategyBean).setEnvironment(environment);
if (parserStrategyBean instanceof EnvironmentAware environmentAware) {
environmentAware.setEnvironment(environment);
}
if (parserStrategyBean instanceof ResourceLoaderAware) {
((ResourceLoaderAware) parserStrategyBean).setResourceLoader(resourceLoader);
if (parserStrategyBean instanceof ResourceLoaderAware resourceLoaderAware) {
resourceLoaderAware.setResourceLoader(resourceLoader);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class MessagingAnnotationBeanPostProcessor

private final Map<Class<? extends Annotation>, MethodAnnotationPostProcessor<?>> postProcessors;

private final List<Runnable> methodsToPostProcessAfterContextInitialization = new ArrayList<>();
private final List<Runnable> methodsToPostProcessAfterContextInitialization;

@SuppressWarnings("NullAway.Init")
private ConfigurableListableBeanFactory beanFactory;
Expand All @@ -74,6 +74,7 @@ public MessagingAnnotationBeanPostProcessor(
Map<Class<? extends Annotation>, MethodAnnotationPostProcessor<?>> postProcessors) {

this.postProcessors = postProcessors;
this.methodsToPostProcessAfterContextInitialization = new ArrayList<>(this.postProcessors.size());
}

@Override
Expand Down Expand Up @@ -217,7 +218,7 @@ public void afterSingletonsInstantiated() {
protected static List<MessagingMetaAnnotation> obtainMessagingAnnotations(
Set<Class<? extends Annotation>> postProcessors, MergedAnnotations annotations, String identified) {

List<MessagingMetaAnnotation> messagingAnnotations = new ArrayList<>();
List<MessagingMetaAnnotation> messagingAnnotations = new ArrayList<>(postProcessors.size());

for (Class<? extends Annotation> annotationType : postProcessors) {
annotations.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@
*/
public class PartitionedDispatcher extends AbstractDispatcher {

private final List<UnicastingDispatcher> partitions = new ArrayList<>();
private final List<UnicastingDispatcher> partitions;

private final List<ExecutorService> executors = new ArrayList<>();
private final List<ExecutorService> executors;

private final int partitionCount;

Expand All @@ -79,7 +79,7 @@ public class PartitionedDispatcher extends AbstractDispatcher {
private final Lock lock = new ReentrantLock();

/**
* Instantiate based on a provided number of partitions and function for partition key against
* Instantiate based on a provided number of partitions and function for a partition key against
* the message to dispatch.
* @param partitionCount the number of partitions in this channel.
* @param partitionKeyFunction the function to resolve a partition key against the message
Expand All @@ -90,6 +90,8 @@ public PartitionedDispatcher(int partitionCount, Function<Message<?>, Object> pa
Assert.notNull(partitionKeyFunction, "'partitionKeyFunction' must not be null");
this.partitionKeyFunction = partitionKeyFunction;
this.partitionCount = partitionCount;
this.partitions = new ArrayList<>(partitionCount);
this.executors = new ArrayList<>(partitionCount);
}

/**
Expand Down Expand Up @@ -143,7 +145,7 @@ public void setErrorHandler(ErrorHandler errorHandler) {

/**
* Set a {@link MessageHandlingTaskDecorator} to wrap a message handling task into some
* addition logic, e.g. message channel may provide an interception for its operations.
* addition logic, e.g., a message channel may provide an interception for its operations.
* @param messageHandlingTaskDecorator the {@link MessageHandlingTaskDecorator} to use.
*/
public void setMessageHandlingTaskDecorator(MessageHandlingTaskDecorator messageHandlingTaskDecorator) {
Expand All @@ -153,7 +155,7 @@ public void setMessageHandlingTaskDecorator(MessageHandlingTaskDecorator message

/**
* Shutdown this dispatcher on application close.
* The partition executors are shutdown and internal state of this instance is cleared.
* The partition executors are shutdown and the internal state of this instance is cleared.
*/
public void shutdown() {
this.executors.forEach(ExecutorService::shutdown);
Expand Down
Loading