Skip to content

Commit

Permalink
Some docs clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
artembilan committed Nov 15, 2021
1 parent db61102 commit 6f3fdc7
Show file tree
Hide file tree
Showing 23 changed files with 64 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.springframework.util.Assert;

/**
* The base {@link AbstractMessageChannel} implementation for AMQP.
*
* @author Mark Fisher
* @author Artem Bilan
* @author Gary Russell
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.springframework.util.Assert;

/**
* The base {@link AbstractAmqpChannel} extension for a {@link SubscribableChannel} contract.
*
* @author Mark Fisher
* @author Gary Russell
* @author Artem Bilan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
import org.springframework.integration.dispatcher.UnicastingDispatcher;

/**
* The {@link AbstractSubscribableAmqpChannel} implementation for one-to-one subscription
* over AMQP queue.
* <p>
* If queue name is not provided, the channel bean name is used internally to declare
* a queue via provided {@link AmqpAdmin} (if any).
*
* @author Mark Fisher
* @author Artem Bilan
*
Expand All @@ -45,6 +51,7 @@ public class PointToPointSubscribableAmqpChannel extends AbstractSubscribableAmq
*/
public PointToPointSubscribableAmqpChannel(String channelName, AbstractMessageListenerContainer container,
AmqpTemplate amqpTemplate) {

super(channelName, container, amqpTemplate);
}

Expand All @@ -62,6 +69,7 @@ public PointToPointSubscribableAmqpChannel(String channelName, AbstractMessageLi
*/
public PointToPointSubscribableAmqpChannel(String channelName, AbstractMessageListenerContainer container,
AmqpTemplate amqpTemplate, AmqpHeaderMapper outboundMapper, AmqpHeaderMapper inboundMapper) {

super(channelName, container, amqpTemplate, outboundMapper, inboundMapper);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public PollableAmqpChannel(String channelName, AmqpTemplate amqpTemplate) {
*/
public PollableAmqpChannel(String channelName, AmqpTemplate amqpTemplate, AmqpHeaderMapper outboundMapper,
AmqpHeaderMapper inboundMapper) {

super(amqpTemplate, outboundMapper, inboundMapper);
Assert.hasText(channelName, "channel name must not be empty");
this.channelName = channelName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.util.StringUtils;

/**
* Parser for the AMQP 'inbound-channel-adapter' element.
*
* @author Mark Fisher
* @author Gary Russell
* @author Artem Bilan
*
* @since 2.1
*/
Expand All @@ -42,7 +44,9 @@ public class AmqpInboundChannelAdapterParser extends AbstractAmqpInboundAdapterP
}

@Override
protected final String resolveId(Element element, AbstractBeanDefinition definition, ParserContext parserContext) throws BeanDefinitionStoreException {
protected final String resolveId(Element element, AbstractBeanDefinition definition, ParserContext parserContext)
throws BeanDefinitionStoreException {

String id = element.getAttribute("id");
if (!element.hasAttribute("channel")) {
// the created channel will get the 'id', so the adapter's bean name includes a suffix
Expand All @@ -69,8 +73,7 @@ private String createDirectChannel(Element element, ParserContext parserContext)
parserContext.getReaderContext().error("The channel-adapter's 'id' attribute is required when no 'channel' "
+ "reference has been provided, because that 'id' would be used for the created channel.", element);
}
BeanDefinitionBuilder channelBuilder = BeanDefinitionBuilder.genericBeanDefinition(
"org.springframework.integration.channel.DirectChannel");
BeanDefinitionBuilder channelBuilder = BeanDefinitionBuilder.genericBeanDefinition(DirectChannel.class);
BeanDefinitionHolder holder = new BeanDefinitionHolder(channelBuilder.getBeanDefinition(), channelId);
BeanDefinitionReaderUtils.registerBeanDefinition(holder, parserContext.getRegistry());
return channelId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,20 @@
*
* @author Mark Fisher
* @author Gary Russell
*
* @since 2.1
*/
public class AmqpNamespaceHandler extends AbstractIntegrationNamespaceHandler {

@Override
public void init() {
this.registerBeanDefinitionParser("channel", new AmqpChannelParser());
this.registerBeanDefinitionParser("publish-subscribe-channel", new AmqpChannelParser());
this.registerBeanDefinitionParser("inbound-channel-adapter", new AmqpInboundChannelAdapterParser());
this.registerBeanDefinitionParser("inbound-gateway", new AmqpInboundGatewayParser());
this.registerBeanDefinitionParser("outbound-channel-adapter", new AmqpOutboundChannelAdapterParser());
this.registerBeanDefinitionParser("outbound-gateway", new AmqpOutboundGatewayParser());
this.registerBeanDefinitionParser("outbound-async-gateway", new AmqpOutboundGatewayParser());
registerBeanDefinitionParser("channel", new AmqpChannelParser());
registerBeanDefinitionParser("publish-subscribe-channel", new AmqpChannelParser());
registerBeanDefinitionParser("inbound-channel-adapter", new AmqpInboundChannelAdapterParser());
registerBeanDefinitionParser("inbound-gateway", new AmqpInboundGatewayParser());
registerBeanDefinitionParser("outbound-channel-adapter", new AmqpOutboundChannelAdapterParser());
registerBeanDefinitionParser("outbound-gateway", new AmqpOutboundGatewayParser());
registerBeanDefinitionParser("outbound-async-gateway", new AmqpOutboundGatewayParser());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* @author Oleg Zhurakousky
* @author Gary Russell
* @author Artem Bilan
*
* @since 2.1
*/
public class AmqpOutboundChannelAdapterParser extends AbstractOutboundChannelAdapterParser {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public S defaultRequeueRejected(boolean defaultRequeueRejected) {
}

/**
* Determine whether or not the container should de-batch batched
* Determine whether the container should de-batch batched
* messages (true) or call the listener with the batch (false). Default: true.
* @param deBatchingEnabled the deBatchingEnabled to set.
* @return the spec.
Expand Down Expand Up @@ -300,7 +300,7 @@ public S messagePropertiesConverter(MessagePropertiesConverter messageProperties
}

/**
* If all of the configured queue(s) are not available on the broker, this setting
* If all the configured queue(s) are not available on the broker, this setting
* determines whether the condition is fatal. When true, and
* the queues are missing during startup, the context refresh() will fail.
* <p> When false, the condition is not considered fatal and the container will
Expand All @@ -316,7 +316,7 @@ public S missingQueuesFatal(boolean missingQueuesFatal) {

/**
* Prevent the container from starting if any of the queues defined in the context have
* mismatched arguments (TTL etc). Default false.
* mismatched arguments (TTL etc.). Default false.
* @param mismatchedQueuesFatal true to fail initialization when this condition occurs.
* @return the spec.
* @see AbstractMessageListenerContainer#setMismatchedQueuesFatal(boolean)
Expand All @@ -329,7 +329,7 @@ public S mismatchedQueuesFatal(boolean mismatchedQueuesFatal) {
/**
* Set to true to automatically declare elements (queues, exchanges, bindings)
* in the application context during container start().
* @param autoDeclare the boolean flag to indicate an declaration operation.
* @param autoDeclare the boolean flag to indicate a declaration operation.
* @return the spec.
* @see AbstractMessageListenerContainer#setAutoDeclare(boolean)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ public static AmqpAsyncOutboundGatewaySpec asyncOutboundGateway(AsyncRabbitTempl
* @param connectionFactory the connectionFactory.
* @return the AmqpPollableMessageChannelSpec.
*/
public static AmqpPollableMessageChannelSpec<?, PollableAmqpChannel> pollableChannel(ConnectionFactory connectionFactory) {
public static AmqpPollableMessageChannelSpec<?, PollableAmqpChannel> pollableChannel(
ConnectionFactory connectionFactory) {

return pollableChannel(null, connectionFactory);
}

Expand All @@ -284,7 +286,8 @@ public static AmqpPollableMessageChannelSpec<?, PollableAmqpChannel> pollableCha
public static AmqpPollableMessageChannelSpec<?, PollableAmqpChannel> pollableChannel(@Nullable String id,
ConnectionFactory connectionFactory) {

AmqpPollableMessageChannelSpec<?, PollableAmqpChannel> spec = new AmqpPollableMessageChannelSpec<>(connectionFactory);
AmqpPollableMessageChannelSpec<?, PollableAmqpChannel> spec =
new AmqpPollableMessageChannelSpec<>(connectionFactory);
return spec.id(id);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import org.springframework.util.concurrent.SettableListenableFuture;

/**
* A base {@link AbstractReplyProducingMessageHandler} extension for AMQP message handlers.
*
* @author Gary Russell
* @author Artem Bilan
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class AmqpMessageHeaderErrorMessageStrategy implements ErrorMessageStrate
public ErrorMessage buildErrorMessage(Throwable throwable, @Nullable AttributeAccessor context) {
Object inputMessage = context == null ? null
: context.getAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY);
Map<String, Object> headers = new HashMap<String, Object>();
Map<String, Object> headers = new HashMap<>();
if (context != null) {
headers.put(AMQP_RAW_MESSAGE, context.getAttribute(AMQP_RAW_MESSAGE));
headers.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, context.getAttribute(AMQP_RAW_MESSAGE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
* An advice that causes all downstream {@link RabbitOperations} operations to be executed
* on the same channel, as long as there are no thread handoffs, since the channel is
* bound to the thread. The same RabbitOperations must be used in this and all downstream
* components. Typically used with a splitter or some other mechanism that would cause
* components. Typically, used with a splitter or some other mechanism that would cause
* multiple messages to be sent. Optionally waits for publisher confirms if the channel is
* so configured.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -79,7 +79,6 @@ public static <K, V> KafkaProducerMessageHandlerSpec.KafkaProducerMessageHandler
* @param <K> the Kafka message key type.
* @param <V> the Kafka message value type.
* @return the spec.
* @since 3.2
*/
public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
ConsumerFactory<K, V> consumerFactory, ConsumerProperties consumerProperties) {
Expand All @@ -96,7 +95,6 @@ public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
* @param <K> the Kafka message key type.
* @param <V> the Kafka message value type.
* @return the spec.
* @since 3.2
*/
public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
ConsumerFactory<K, V> consumerFactory,
Expand All @@ -115,7 +113,6 @@ public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
* @param <K> the Kafka message key type.
* @param <V> the Kafka message value type.
* @return the spec.
* @since 3.2
*/
public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
ConsumerFactory<K, V> consumerFactory,
Expand All @@ -135,7 +132,6 @@ public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
* @param <K> the Kafka message key type.
* @param <V> the Kafka message value type.
* @return the spec.
* @since 3.2
*/
public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
ConsumerFactory<K, V> consumerFactory,
Expand Down Expand Up @@ -341,7 +337,6 @@ KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerCon
* @param <V> the Kafka message value type (request).
* @param <R> the Kafka message value type (reply).
* @return the KafkaGatewayMessageHandlerSpec.
* @since 3.0.2
*/
public static <K, V, R> KafkaOutboundGatewaySpec<K, V, R, ?> outboundGateway(
ReplyingKafkaTemplate<K, V, R> kafkaTemplate) {
Expand All @@ -357,7 +352,6 @@ KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerCon
* @param <V> the Kafka message value type (request).
* @param <R> the Kafka message value type (reply).
* @return the KafkaGatewayMessageHandlerSpec.
* @since 3.0.2
*/
public static <K, V, R> KafkaOutboundGatewaySpec.KafkaGatewayMessageHandlerTemplateSpec<K, V, R> outboundGateway(
ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> replyContainer) {
Expand All @@ -378,7 +372,6 @@ public static <K, V, R> KafkaOutboundGatewaySpec.KafkaGatewayMessageHandlerTempl
* @param <V> the Kafka message value type (request).
* @param <R> the Kafka message value type (reply).
* @return the spec.
* @since 3.0.2
*/
public static <K, V, R> KafkaInboundGatewaySpec<K, V, R, ?> inboundGateway(
AbstractMessageListenerContainer<K, V> container, KafkaTemplate<K, R> template) {
Expand All @@ -396,7 +389,6 @@ public static <K, V, R> KafkaOutboundGatewaySpec.KafkaGatewayMessageHandlerTempl
* @param <V> the Kafka message value type (request).
* @param <R> the Kafka message value type (reply).
* @return the spec.
* @since 3.0.2
*/
public static <K, V, R> KafkaInboundGatewaySpec.KafkaInboundGatewayListenerContainerSpec<K, V, R> inboundGateway(
ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties,
Expand All @@ -416,7 +408,6 @@ public static <K, V, R> KafkaInboundGatewaySpec.KafkaInboundGatewayListenerConta
* @param <V> the Kafka message value type (request).
* @param <R> the Kafka message value type (reply).
* @return the spec.
* @since 3.0.2
*/
public static <K, V, R> KafkaInboundGatewaySpec.KafkaInboundGatewayListenerContainerSpec<K, V, R> inboundGateway(
KafkaMessageListenerContainerSpec<K, V> containerSpec, KafkaTemplateSpec<K, R> templateSpec) {
Expand All @@ -430,7 +421,6 @@ public static <K, V, R> KafkaInboundGatewaySpec.KafkaInboundGatewayListenerConta
* @param containerFactory the container factory.
* @param topic the topic.
* @return the spec.
* @since 3.3
*/
public static KafkaPointToPointChannelSpec channel(KafkaTemplate<?, ?> template,
KafkaListenerContainerFactory<?> containerFactory, String topic) {
Expand All @@ -439,12 +429,11 @@ public static KafkaPointToPointChannelSpec channel(KafkaTemplate<?, ?> template,
}

/**
* Create a spec for a publish/subscribe channel with the provided parameters.
* Create a spec for a publish-subscribe channel with the provided parameters.
* @param template the template.
* @param containerFactory the container factory.
* @param topic the topic.
* @return the spec.
* @since 3.3
*/
public static KafkaPublishSubscribeChannelSpec publishSubscribeChannel(KafkaTemplate<?, ?> template,
KafkaListenerContainerFactory<?> containerFactory, String topic) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2020 the original author or authors.
* Copyright 2018-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -90,7 +90,7 @@ public S retryTemplate(RetryTemplate retryTemplate) {
* @param recoveryCallback the recovery callback.
* @return the spec
*/
public S recoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
public S recoveryCallback(RecoveryCallback<?> recoveryCallback) {
this.target.setRecoveryCallback(recoveryCallback);
return _this();
}
Expand All @@ -101,7 +101,6 @@ public S recoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
* call from the {@link org.springframework.kafka.listener.KafkaMessageListenerContainer}.
* @param onPartitionsAssignedCallback the {@link BiConsumer} to use
* @return the spec
* @since 3.0.4
*/
public S onPartitionsAssignedSeekCallback(
BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -136,7 +136,7 @@ public S retryTemplate(RetryTemplate retryTemplate) {
* @param recoveryCallback the recovery callback.
* @return the spec
*/
public S recoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
public S recoveryCallback(RecoveryCallback<?> recoveryCallback) {
this.target.setRecoveryCallback(recoveryCallback);
return _this();
}
Expand All @@ -146,7 +146,6 @@ public S recoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
* set the payload type the converter should create. Defaults to {@link Object}.
* @param payloadType the type.
* @return the spec
* @since 3.2.0
*/
public S payloadType(Class<?> payloadType) {
this.target.setPayloadType(payloadType);
Expand Down Expand Up @@ -178,7 +177,6 @@ public S filterInRetry(boolean filterInRetry) {
* call from the {@link org.springframework.kafka.listener.KafkaMessageListenerContainer}.
* @param onPartitionsAssignedCallback the {@link BiConsumer} to use
* @return the spec
* @since 3.0.4
*/
public S onPartitionsAssignedSeekCallback(
BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2020 the original author or authors.
* Copyright 2018-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -204,7 +204,7 @@ public KafkaMessageListenerContainerSpec<K, V> commitCallback(OffsetCommitCallba
}

/**
* Set whether or not to call consumer.commitSync() or commitAsync() when the
* Set whether to call consumer.commitSync() or commitAsync() when the
* container is responsible for commits. Default true. See
* https://github.com/spring-projects/spring-kafka/issues/62 At the time of
* writing, async commits are not entirely reliable.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2020 the original author or authors.
* Copyright 2018-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down

0 comments on commit 6f3fdc7

Please sign in to comment.