Skip to content
This repository has been archived by the owner on Mar 30, 2023. It is now read-only.

Commit

Permalink
Polishing JavaDocs and possible NPEs
Browse files Browse the repository at this point in the history
  • Loading branch information
artembilan committed Aug 29, 2019
1 parent 90108c1 commit a39c2b6
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
* @return the spec.
* @since 3.0.1
* @deprecated in favor of
* {@link #inboundChannelAdapter(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory)}
* {@code #inboundChannelAdapter(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory)}
*/
@Deprecated
public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
Expand All @@ -123,7 +123,7 @@ public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
* @return the spec.
* @since 3.0.1
* @deprecated in favor of
* {@link #inboundChannelAdapter(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)}
* {@code #inboundChannelAdapter(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)}
*/
@Deprecated
public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
Expand All @@ -132,7 +132,8 @@ public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
boolean allowMultiFetch,
String... topics) {

return new KafkaInboundChannelAdapterSpec<>(consumerFactory, new ConsumerProperties(topics), ackCallbackFactory, allowMultiFetch);
return new KafkaInboundChannelAdapterSpec<>(consumerFactory, new ConsumerProperties(topics),
ackCallbackFactory, allowMultiFetch);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.springframework.integration.acks.AcknowledgmentCallbackFactory;
import org.springframework.integration.core.Pausable;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.integration.kafka.inbound.KafkaMessageSource.KafkaAckInfo;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
Expand Down Expand Up @@ -251,8 +250,8 @@ public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory,
Assert.notNull(ackCallbackFactory, "'ackCallbackFactory' must not be null");
Assert.isTrue(
!ObjectUtils.isEmpty(consumerProperties.getTopics())
|| !ObjectUtils.isEmpty(consumerProperties.getTopicPartitionsToAssign())
|| consumerProperties.getTopicPattern() != null,
|| !ObjectUtils.isEmpty(consumerProperties.getTopicPartitionsToAssign())
|| consumerProperties.getTopicPattern() != null,
"topics, topicPattern, or topicPartitions must be provided"
);
this.consumerProperties = consumerProperties;
Expand Down Expand Up @@ -623,7 +622,8 @@ else if (partition.isRelativeToCurrent()) {
}
catch (Exception e) {
this.logger.error("Failed to set initial offset for " + topicPartition
+ " at " + newOffset + ". Position is " + this.consumer.position(topicPartition), e);
+ " at " + newOffset + ". Position is " + this.consumer
.position(topicPartition), e);
}
}
}
Expand Down Expand Up @@ -678,7 +678,7 @@ public static class KafkaAckCallbackFactory<K, V> implements AcknowledgmentCallb
/**
* Deprecated constructor.
* @deprecated in favor of
* {@link #KafkaMessageSource$KafkaAckCallbackFactory(ConsumerProperties)}.
* {@code #KafkaAckCallbackFactory(ConsumerProperties)}.
*/
@Deprecated
public KafkaAckCallbackFactory() {
Expand All @@ -691,9 +691,8 @@ public KafkaAckCallbackFactory(ConsumerProperties consumerProperties) {

/**
* Deprecated setter.
* @deprecated in favor of
* {@link #KafkaMessageSource$KafkaAckCallbackFactory(ConsumerProperties)}.
* @param commitTimeout the commit timeout.
* @deprecated in favor of {@code #KafkaAckCallbackFactory(ConsumerProperties)}.
*/
@Deprecated
public void setCommitTimeout(Duration commitTimeout) {
Expand Down Expand Up @@ -734,8 +733,7 @@ public static class KafkaAckCallback<K, V> implements AcknowledgmentCallback, Ac
/**
* Deprecated constructor.
* @param ackInfo the ack info.
* @deprecated in favor of
* {@link #KafkaMessageSource$KafkaAckCallback(KafkaAckInfo, ConsumerProperties)}
* @deprecated in favor of {@code #KafkaAckCallback(KafkaAckInfo, ConsumerProperties)}
*/
@Deprecated
public KafkaAckCallback(KafkaAckInfo<K, V> ackInfo) {
Expand All @@ -746,8 +744,7 @@ public KafkaAckCallback(KafkaAckInfo<K, V> ackInfo) {
* Deprecated constructor.
* @param ackInfo the ack info.
* @param commitTimeout the commit timeout.
* @deprecated in favor of
* {@link #KafkaMessageSource4KafkaAckCallback(KafkaAckInfo, ConsumerProperties)}
* @deprecated in favor of {@code #KafkaAckCallback(KafkaAckInfo, ConsumerProperties)}
*/
@Deprecated
public KafkaAckCallback(KafkaAckInfo<K, V> ackInfo, @Nullable Duration commitTimeout) {
Expand All @@ -761,16 +758,18 @@ public KafkaAckCallback(KafkaAckInfo<K, V> ackInfo, @Nullable Duration commitTim
* properties are used.
*/
public KafkaAckCallback(KafkaAckInfo<K, V> ackInfo, @Nullable ConsumerProperties consumerProperties) {

Assert.notNull(ackInfo, "'ackInfo' cannot be null");
Assert.notNull(ackInfo, "'ackInfo' cannot be null");
this.ackInfo = ackInfo;
this.commitTimeout = consumerProperties.getSyncCommitTimeout();
this.isSyncCommits = consumerProperties == null ? true : consumerProperties.isSyncCommits();
this.commitCallback = consumerProperties != null && consumerProperties.getCommitCallback() != null
? consumerProperties.getCommitCallback()
: new LoggingCommitCallback();
this.commitTimeout = consumerProperties != null ? consumerProperties.getSyncCommitTimeout() : null;
this.isSyncCommits = consumerProperties == null || consumerProperties.isSyncCommits();
this.commitCallback =
consumerProperties != null && consumerProperties.getCommitCallback() != null
? consumerProperties.getCommitCallback()
: new LoggingCommitCallback();
this.commitLogger = new LogIfLevelEnabled(this.logger,
consumerProperties.getCommitLogLevel());
consumerProperties != null
? consumerProperties.getCommitLogLevel()
: LogIfLevelEnabled.Level.DEBUG);
}

@Override
Expand Down

0 comments on commit a39c2b6

Please sign in to comment.