Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.concurrent.Executor;

import org.aopalliance.aop.Advice;
import org.jspecify.annotations.Nullable;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessagePostProcessor;
Expand Down Expand Up @@ -56,7 +55,7 @@ public AbstractMessageListenerContainerSpec(C listenerContainer) {
}

@Override
public S id(@Nullable String id) { // NOSONAR - not useless, increases visibility
public S id(String id) {
return super.id(id);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package org.springframework.integration.amqp.dsl;

import org.jspecify.annotations.Nullable;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
Expand Down Expand Up @@ -274,7 +272,7 @@ public static AmqpAsyncOutboundGatewaySpec asyncOutboundGateway(AsyncRabbitTempl
public static AmqpPollableMessageChannelSpec<?, PollableAmqpChannel> pollableChannel(
ConnectionFactory connectionFactory) {

return pollableChannel(null, connectionFactory);
return new AmqpPollableMessageChannelSpec<>(connectionFactory);
}

/**
Expand All @@ -283,7 +281,7 @@ public static AmqpPollableMessageChannelSpec<?, PollableAmqpChannel> pollableCha
* @param connectionFactory the connectionFactory.
* @return the AmqpPollableMessageChannelSpec.
*/
public static AmqpPollableMessageChannelSpec<?, PollableAmqpChannel> pollableChannel(@Nullable String id,
public static AmqpPollableMessageChannelSpec<?, PollableAmqpChannel> pollableChannel(String id,
ConnectionFactory connectionFactory) {

AmqpPollableMessageChannelSpec<?, PollableAmqpChannel> spec =
Expand All @@ -297,7 +295,7 @@ public static AmqpPollableMessageChannelSpec<?, PollableAmqpChannel> pollableCha
* @return the AmqpMessageChannelSpec.
*/
public static AmqpMessageChannelSpec<?, ?> channel(ConnectionFactory connectionFactory) {
return channel(null, connectionFactory);
return new AmqpMessageChannelSpec<>(connectionFactory);
}

/**
Expand All @@ -306,7 +304,7 @@ public static AmqpPollableMessageChannelSpec<?, PollableAmqpChannel> pollableCha
* @param connectionFactory the connectionFactory.
* @return the AmqpMessageChannelSpec.
*/
public static AmqpMessageChannelSpec<?, ?> channel(@Nullable String id, ConnectionFactory connectionFactory) {
public static AmqpMessageChannelSpec<?, ?> channel(String id, ConnectionFactory connectionFactory) {
return new AmqpMessageChannelSpec<>(connectionFactory)
.id(id);
}
Expand All @@ -317,7 +315,7 @@ public static AmqpPollableMessageChannelSpec<?, PollableAmqpChannel> pollableCha
* @return the AmqpPublishSubscribeMessageChannelSpec.
*/
public static AmqpPublishSubscribeMessageChannelSpec publishSubscribeChannel(ConnectionFactory connectionFactory) {
return publishSubscribeChannel(null, connectionFactory);
return new AmqpPublishSubscribeMessageChannelSpec(connectionFactory);
}

/**
Expand All @@ -326,7 +324,7 @@ public static AmqpPublishSubscribeMessageChannelSpec publishSubscribeChannel(Con
* @param connectionFactory the connectionFactory.
* @return the AmqpPublishSubscribeMessageChannelSpec.
*/
public static AmqpPublishSubscribeMessageChannelSpec publishSubscribeChannel(@Nullable String id,
public static AmqpPublishSubscribeMessageChannelSpec publishSubscribeChannel(String id,
ConnectionFactory connectionFactory) {

return new AmqpPublishSubscribeMessageChannelSpec(connectionFactory).id(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package org.springframework.integration.amqp.dsl;

import org.jspecify.annotations.Nullable;

import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
Expand All @@ -29,7 +27,6 @@
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.dsl.MessageChannelSpec;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
* A {@link MessageChannelSpec} for a {@link AbstractAmqpChannel}s.
Expand Down Expand Up @@ -66,10 +63,8 @@ protected AmqpPollableMessageChannelSpec(ConnectionFactory connectionFactory) {
}

@Override
protected S id(@Nullable String id) {
if (StringUtils.hasText(id)) {
this.amqpChannelFactoryBean.setBeanName(id);
}
protected S id(String id) {
this.amqpChannelFactoryBean.setBeanName(id);
return super.id(id);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
*/
public class AggregatorSpec extends CorrelationHandlerSpec<AggregatorSpec, AggregatingMessageHandler> {

private Function<MessageGroup, Map<String, Object>> headersFunction;
private @Nullable Function<MessageGroup, Map<String, Object>> headersFunction;

protected AggregatorSpec() {
super(new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor()));
Expand Down Expand Up @@ -123,7 +123,7 @@ public AggregatorSpec headersFunction(Function<MessageGroup, Map<String, Object>
}

@Override
public Map<Object, String> getComponentsToRegister() {
public Map<Object, @Nullable String> getComponentsToRegister() {
if (this.headersFunction != null) {
MessageGroupProcessor outputProcessor = this.handler.getOutputProcessor();
if (outputProcessor instanceof AbstractAggregatingMessageGroupProcessor abstractAggregatingMessageGroupProcessor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,13 @@ public class BarrierSpec extends ConsumerEndpointSpec<BarrierSpec, BarrierMessag
private CorrelationStrategy correlationStrategy =
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID);

@Nullable
private MessageChannel discardChannel;
private @Nullable MessageChannel discardChannel;

@Nullable
private String discardChannelName;
private @Nullable String discardChannelName;

@Nullable
private Long triggerTimeout;
private @Nullable Long triggerTimeout;

protected BarrierSpec(long timeout) {
super(null);
this.timeout = timeout;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;

import org.springframework.aop.framework.Advised;
import org.springframework.aop.framework.AopInfrastructureBean;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
Expand Down Expand Up @@ -130,15 +129,15 @@ public abstract class BaseIntegrationFlowDefinition<B extends BaseIntegrationFlo

protected static final SpelExpressionParser PARSER = new SpelExpressionParser(); //NOSONAR - final

protected final Map<Object, String> integrationComponents = new LinkedHashMap<>(); //NOSONAR - final
protected final Map<Object, @Nullable String> integrationComponents = new LinkedHashMap<>(); //NOSONAR - final

private MessageChannel currentMessageChannel;
private @Nullable MessageChannel currentMessageChannel;

private Object currentComponent;
private @Nullable Object currentComponent;

private boolean implicitChannel;

private StandardIntegrationFlow integrationFlow;
private @Nullable StandardIntegrationFlow integrationFlow;

protected BaseIntegrationFlowDefinition() {
}
Expand All @@ -152,14 +151,14 @@ protected B addComponent(Object component, @Nullable String beanName) {
return _this();
}

protected B addComponents(Map<Object, String> components) {
protected B addComponents(Map<Object, @Nullable String> components) {
if (!CollectionUtils.isEmpty(components)) {
this.integrationComponents.putAll(components);
}
return _this();
}

protected Map<Object, String> getIntegrationComponents() {
protected Map<Object, @Nullable String> getIntegrationComponents() {
return this.integrationComponents;
}

Expand All @@ -168,8 +167,7 @@ protected B currentComponent(@Nullable Object component) {
return _this();
}

@Nullable
protected Object getCurrentComponent() {
protected @Nullable Object getCurrentComponent() {
return this.currentComponent;
}

Expand All @@ -178,8 +176,7 @@ protected B currentMessageChannel(@Nullable MessageChannel currentMessageChannel
return _this();
}

@Nullable
protected MessageChannel getCurrentMessageChannel() {
protected @Nullable MessageChannel getCurrentMessageChannel() {
return this.currentMessageChannel;
}

Expand Down Expand Up @@ -1591,7 +1588,7 @@ public B resequence(@Nullable Consumer<ResequencerSpec> resequencer) {
* @return the current {@link BaseIntegrationFlowDefinition}.
*/
public B aggregate() {
return aggregate((Consumer<AggregatorSpec>) null);
return aggregate(null);
}

/**
Expand Down Expand Up @@ -1829,8 +1826,8 @@ protected <R extends AbstractMessageRouter, S extends AbstractRouterSpec<? super
BridgeHandler bridgeHandler = new BridgeHandler();
boolean registerSubflowBridge = false;

Map<Object, String> componentsToRegister = null;
Map<Object, String> routerComponents = routerSpec.getComponentsToRegister();
Map<Object, @Nullable String> componentsToRegister = null;
Map<Object, @Nullable String> routerComponents = routerSpec.getComponentsToRegister();
if (!CollectionUtils.isEmpty(routerComponents)) {
componentsToRegister = new LinkedHashMap<>(routerComponents);
routerComponents.clear();
Expand Down Expand Up @@ -2571,7 +2568,7 @@ protected <T> Publisher<Message<T>> toReactivePublisher() {
protected <T> Publisher<Message<T>> toReactivePublisher(boolean autoStartOnSubscribe) {
MessageChannel channelForPublisher = getCurrentMessageChannel();
Publisher<Message<T>> publisher;
Map<Object, String> components = getIntegrationComponents();
Map<Object, @Nullable String> components = getIntegrationComponents();
if (channelForPublisher instanceof Publisher) {
publisher = (Publisher<Message<T>>) channelForPublisher;
}
Expand Down Expand Up @@ -2700,7 +2697,7 @@ protected StandardIntegrationFlow get() {
"EIP-method in the 'IntegrationFlow' definition.");
}

Map<Object, String> components = getIntegrationComponents();
Map<Object, @Nullable String> components = getIntegrationComponents();
if (components.size() == 1) {
Object currComponent = getCurrentComponent();
if (currComponent != null) {
Expand Down Expand Up @@ -2736,17 +2733,8 @@ protected void checkReuse(MessageProducer replyHandler) {
REFERENCED_REPLY_PRODUCERS.add(replyHandler);
}

@Nullable
protected static Object extractProxyTarget(@Nullable Object target) {
if (!(target instanceof Advised advised)) {
return target;
}
try {
return extractProxyTarget(advised.getTargetSource().getTarget());
}
catch (Exception e) {
throw new BeanCreationException("Could not extract target", e);
}
protected static @Nullable Object extractProxyTarget(@Nullable Object target) {
return IntegrationFlow.extractProxyTarget(target);
}

public static final class ReplyProducerCleaner implements DestructionAwareBeanPostProcessor, AopInfrastructureBean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.LinkedHashMap;
import java.util.Map;

import org.jspecify.annotations.Nullable;

import org.springframework.integration.channel.BroadcastCapableChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
Expand All @@ -36,7 +38,7 @@ public class BroadcastPublishSubscribeSpec
extends IntegrationComponentSpec<BroadcastPublishSubscribeSpec, BroadcastCapableChannel>
implements ComponentsRegistration {

private final Map<Object, String> subscriberFlows = new LinkedHashMap<>();
private final Map<Object, @Nullable String> subscriberFlows = new LinkedHashMap<>();

private int order;

Expand Down Expand Up @@ -74,7 +76,7 @@ public BroadcastPublishSubscribeSpec subscribe(IntegrationFlow subFlow) {
}

@Override
public Map<Object, String> getComponentsToRegister() {
public Map<Object, @Nullable String> getComponentsToRegister() {
return this.subscriberFlows;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,13 @@ public abstract class ConsumerEndpointSpec<S extends ConsumerEndpointSpec<S, H>,
@Nullable
private Boolean async;

@Nullable
private String[] notPropagatedHeaders;
private String @Nullable [] notPropagatedHeaders;

protected ConsumerEndpointSpec() {
super(new ConsumerEndpointFactoryBean());
}

protected ConsumerEndpointSpec(@Nullable H messageHandler) {
protected ConsumerEndpointSpec(H messageHandler) {
super(messageHandler, new ConsumerEndpointFactoryBean());
}

Expand Down Expand Up @@ -373,6 +376,7 @@ protected Tuple2<ConsumerEndpointFactoryBean, H> doGet() {
.acceptIfNotEmpty(this.adviceChain, producingMessageHandler::setAdviceChain);
}

Assert.state(this.handler != null, "'this.handler' must not be null.");
this.endpointFactoryBean.setHandler(this.handler);
return super.doGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,25 @@ public abstract class EndpointSpec<S extends EndpointSpec<S, F, H>, F extends Be
extends IntegrationComponentSpec<S, Tuple2<F, H>>
implements ComponentsRegistration {

protected final Map<Object, String> componentsToRegister = new LinkedHashMap<>(); // NOSONAR final
protected final Map<Object, @Nullable String> componentsToRegister = new LinkedHashMap<>(); // NOSONAR final

protected final F endpointFactoryBean; // NOSONAR final

protected H handler; // NOSONAR
@SuppressWarnings("NullAway.Init")
protected H handler;

protected EndpointSpec(@Nullable H handler, F endpointFactoryBean) {
protected EndpointSpec(F endpointFactoryBean) {
this.endpointFactoryBean = endpointFactoryBean;
}

protected EndpointSpec(H handler, F endpointFactoryBean) {
this.endpointFactoryBean = endpointFactoryBean;
this.handler = handler;
}

@Override
public S id(@Nullable String id) {
if (id != null) {
this.endpointFactoryBean.setBeanName(id);
}
public S id(String id) {
this.endpointFactoryBean.setBeanName(id);
return super.id(id);
}

Expand All @@ -85,7 +88,7 @@ public S poller(Function<PollerFactory, PollerSpec> pollers) {
* @see PollerSpec
*/
public S poller(PollerSpec pollerMetadataSpec) {
Map<Object, String> components = pollerMetadataSpec.getComponentsToRegister();
Map<Object, @Nullable String> components = pollerMetadataSpec.getComponentsToRegister();
this.componentsToRegister.putAll(components);
return poller(pollerMetadataSpec.getObject());
}
Expand Down Expand Up @@ -122,17 +125,14 @@ public S poller(PollerSpec pollerMetadataSpec) {
public abstract S role(String role);

@Override
public Map<Object, String> getComponentsToRegister() {
public Map<Object, @Nullable String> getComponentsToRegister() {
return this.componentsToRegister;
}

@Override
protected Tuple2<F, H> doGet() {
return Tuples.of(this.endpointFactoryBean, this.handler);
}

protected void assertHandler() {
Assert.state(this.handler != null, "'this.handler' must not be null.");
return Tuples.of(this.endpointFactoryBean, this.handler);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ protected FilterEndpointSpec(MessageFilter messageFilter) {
/**
* The default value is <code>false</code> meaning that rejected
* Messages will be quietly dropped or sent to the discard channel if
* available. Typically this value would not be <code>true</code> when
* available. Typically, this value would not be <code>true</code> when
* a discard channel is provided, but if so, it will still apply
* (in such a case, the Message will be sent to the discard channel,
* and <em>then</em> the exception will be thrown).
Expand Down
Loading