From c3179d5c9cbbd25874ae3be7bea0ff3e1e30bdcb Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 17 Mar 2020 13:09:25 -0400 Subject: [PATCH] Remove deprecated methods --- .../integration/kafka/dsl/Kafka.java | 64 --------- .../dsl/KafkaInboundChannelAdapterSpec.java | 85 ------------ .../kafka/dsl/KafkaOutboundGatewaySpec.java | 13 -- .../kafka/inbound/KafkaMessageSource.java | 131 +----------------- 4 files changed, 2 insertions(+), 291 deletions(-) diff --git a/src/main/java/org/springframework/integration/kafka/dsl/Kafka.java b/src/main/java/org/springframework/integration/kafka/dsl/Kafka.java index b35592c4..34aa81e3 100644 --- a/src/main/java/org/springframework/integration/kafka/dsl/Kafka.java +++ b/src/main/java/org/springframework/integration/kafka/dsl/Kafka.java @@ -73,70 +73,6 @@ public static KafkaProducerMessageHandlerSpec.KafkaProducerMessageHandler return new KafkaProducerMessageHandlerSpec.KafkaProducerMessageHandlerTemplateSpec<>(producerFactory); } - /** - * Create an initial {@link KafkaInboundChannelAdapterSpec} with the consumer factory and - * topics. - * @param consumerFactory the consumer factory. - * @param topics the topic(s). - * @param the Kafka message key type. - * @param the Kafka message value type. - * @return the spec. - * @since 3.0.1 - * @deprecated in favor of {@link #inboundChannelAdapter(ConsumerFactory, ConsumerProperties)} - */ - @Deprecated - public static KafkaInboundChannelAdapterSpec inboundChannelAdapter( - ConsumerFactory consumerFactory, String... topics) { - - return inboundChannelAdapter(consumerFactory, new ConsumerProperties(topics), false); - } - - /** - * Create an initial {@link KafkaInboundChannelAdapterSpec} with the consumer factory and - * topics with a custom ack callback factory. - * @param consumerFactory the consumer factory. - * @param ackCallbackFactory the callback factory. - * @param topics the topic(s). - * @param the Kafka message key type. - * @param the Kafka message value type. - * @return the spec. - * @since 3.0.1 - * @deprecated in favor of - * {@code #inboundChannelAdapter(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory)} - */ - @Deprecated - public static KafkaInboundChannelAdapterSpec inboundChannelAdapter( - ConsumerFactory consumerFactory, - KafkaAckCallbackFactory ackCallbackFactory, String... topics) { - - return inboundChannelAdapter(consumerFactory, ackCallbackFactory, false, topics); - } - - /** - * Create an initial {@link KafkaInboundChannelAdapterSpec} with the consumer factory and - * topics with a custom ack callback factory. - * @param consumerFactory the consumer factory. - * @param ackCallbackFactory the callback factory. - * @param allowMultiFetch true to fetch multiple records on each poll. - * @param topics the topic(s). - * @param the Kafka message key type. - * @param the Kafka message value type. - * @return the spec. - * @since 3.0.1 - * @deprecated in favor of - * {@code #inboundChannelAdapter(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)} - */ - @Deprecated - public static KafkaInboundChannelAdapterSpec inboundChannelAdapter( - ConsumerFactory consumerFactory, - KafkaAckCallbackFactory ackCallbackFactory, - boolean allowMultiFetch, - String... topics) { - - return new KafkaInboundChannelAdapterSpec<>(consumerFactory, new ConsumerProperties(topics), - ackCallbackFactory, allowMultiFetch); - } - /** * Create an initial {@link KafkaInboundChannelAdapterSpec} with the consumer factory and * topics. diff --git a/src/main/java/org/springframework/integration/kafka/dsl/KafkaInboundChannelAdapterSpec.java b/src/main/java/org/springframework/integration/kafka/dsl/KafkaInboundChannelAdapterSpec.java index e5e879f8..a6c2943d 100644 --- a/src/main/java/org/springframework/integration/kafka/dsl/KafkaInboundChannelAdapterSpec.java +++ b/src/main/java/org/springframework/integration/kafka/dsl/KafkaInboundChannelAdapterSpec.java @@ -16,7 +16,6 @@ package org.springframework.integration.kafka.dsl; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.integration.IntegrationMessageHeaderAccessor; @@ -44,38 +43,6 @@ public class KafkaInboundChannelAdapterSpec extends MessageSourceSpec, KafkaMessageSource> { - /** - * Create an initial {@link KafkaMessageSource} with the consumer factory and - * topics. - * @param consumerFactory the consumer factory. - * @param allowMultiFetch true to allow {@code max.poll.records > 1}. - * @param topics the topics. - * @deprecated in favor of - * {@link #KafkaInboundChannelAdapterSpec(ConsumerFactory, ConsumerProperties, boolean)} - */ - @Deprecated - KafkaInboundChannelAdapterSpec(ConsumerFactory consumerFactory, boolean allowMultiFetch, String... topics) { - this.target = new KafkaMessageSource<>(consumerFactory, new ConsumerProperties(topics), allowMultiFetch); - } - - /** - * Create an initial {@link KafkaMessageSource} with the consumer factory and - * topics with a custom ack callback factory. - * @param consumerFactory the consumer factory. - * @param ackCallbackFactory the callback factory. - * @param allowMultiFetch true to allow {@code max.poll.records > 1}. - * @param topics the topics. - * @deprecated in favor of - * {@link #KafkaInboundChannelAdapterSpec(ConsumerFactory, ConsumerProperties, - * KafkaAckCallbackFactory, boolean)} - */ - @Deprecated - KafkaInboundChannelAdapterSpec(ConsumerFactory consumerFactory, - KafkaAckCallbackFactory ackCallbackFactory, boolean allowMultiFetch, String... topics) { - - this.target = new KafkaMessageSource<>(consumerFactory, new ConsumerProperties(topics), ackCallbackFactory, allowMultiFetch); - } - /** * Create an initial {@link KafkaMessageSource} with the consumer factory and * topics with a custom ack callback factory. @@ -103,45 +70,6 @@ public class KafkaInboundChannelAdapterSpec this.target = new KafkaMessageSource<>(consumerFactory, consumerProperties, ackCallbackFactory, allowMultiFetch); } - /** - * Set the group.id property for the consumer. - * @param groupId the group id. - * @return the spec. - * @see ConsumerProperties - * @deprecated in favor of using {@link ConsumerProperties} - */ - @Deprecated - public KafkaInboundChannelAdapterSpec groupId(String groupId) { - this.target.setGroupId(groupId); - return this; - } - - /** - * Set the client.id property for the consumer. - * @param clientId the client id. - * @return the spec. - * @see ConsumerProperties - * @deprecated in favor of using {@link ConsumerProperties} - */ - @Deprecated - public KafkaInboundChannelAdapterSpec clientId(String clientId) { - this.target.setClientId(clientId); - return this; - } - - /** - * Set the pollTimeout for the poll() operations. - * @param pollTimeout the poll timeout. - * @return the spec. - * @see ConsumerProperties - * @deprecated in favor of using {@link ConsumerProperties} - */ - @Deprecated - public KafkaInboundChannelAdapterSpec pollTimeout(long pollTimeout) { - this.target.setPollTimeout(pollTimeout); - return this; - } - /** * Set the message converter to replace the default. * {@link MessagingMessageConverter}. @@ -164,19 +92,6 @@ public KafkaInboundChannelAdapterSpec payloadType(Class type) { return this; } - /** - * Set a rebalance listener. - * @param rebalanceListener the rebalance listener. - * @return the spec. - * @see ConsumerProperties - * @deprecated in favor of using {@link ConsumerProperties} - */ - @Deprecated - public KafkaInboundChannelAdapterSpec rebalanceListener(ConsumerRebalanceListener rebalanceListener) { - this.target.setRebalanceListener(rebalanceListener); - return this; - } - /** * Set to true to include the raw {@link ConsumerRecord} as headers with keys * {@link KafkaHeaders#RAW_DATA} and diff --git a/src/main/java/org/springframework/integration/kafka/dsl/KafkaOutboundGatewaySpec.java b/src/main/java/org/springframework/integration/kafka/dsl/KafkaOutboundGatewaySpec.java index 79ae5a7e..0b01d01c 100644 --- a/src/main/java/org/springframework/integration/kafka/dsl/KafkaOutboundGatewaySpec.java +++ b/src/main/java/org/springframework/integration/kafka/dsl/KafkaOutboundGatewaySpec.java @@ -125,19 +125,6 @@ public ReplyingKafkaTemplateSpec taskScheduler(TaskScheduler scheduler) return this; } - /** - * Default reply timeout. - * @param replyTimeout the timeout. - * @return the spec. - * @deprecated in favor of {@link #defaultReplyTimeout(Duration)}. - */ - @Deprecated - @SuppressWarnings("unchecked") - public ReplyingKafkaTemplateSpec replyTimeout(long replyTimeout) { - ((ReplyingKafkaTemplate) this.target).setDefaultReplyTimeout(Duration.ofMillis(replyTimeout)); - return this; - } - /** * Default reply timeout. * @param replyTimeout the timeout. diff --git a/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java b/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java index 9c573339..aa12332d 100644 --- a/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java +++ b/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java @@ -144,36 +144,6 @@ public class KafkaMessageSource extends AbstractMessageSource impl private volatile Iterator> recordsIterator; - /** - * Construct an instance with the supplied parameters. Fetching multiple - * records per poll will be disabled. - * @param consumerFactory the consumer factory. - * @param topics the topics. - * @see #KafkaMessageSource(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean) - * @deprecated in favor of {@link #KafkaMessageSource(ConsumerFactory, ConsumerProperties)} - */ - @Deprecated - public KafkaMessageSource(ConsumerFactory consumerFactory, String... topics) { - this(consumerFactory, new ConsumerProperties(topics), new KafkaAckCallbackFactory<>(), false); - } - - /** - * Construct an instance with the supplied parameters. Fetching multiple - * records per poll will be disabled. - * @param consumerFactory the consumer factory. - * @param ackCallbackFactory the ack callback factory. - * @param topics the topics. - * @see #KafkaMessageSource(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean) - * @deprecated in favor of - * {@link #KafkaMessageSource(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory)} - */ - @Deprecated - public KafkaMessageSource(ConsumerFactory consumerFactory, - KafkaAckCallbackFactory ackCallbackFactory, String... topics) { - - this(consumerFactory, new ConsumerProperties(topics), ackCallbackFactory, false); - } - /** * Construct an instance with the supplied parameters. Fetching multiple * records per poll will be disabled. @@ -284,48 +254,14 @@ protected String getGroupId() { return this.consumerProperties.getGroupId(); } - /** - * Set the group.id property for the consumer. - * @param groupId the group id. - * @see ConsumerProperties - * @deprecated in favor of using {@link ConsumerProperties} - */ - @Deprecated - public void setGroupId(String groupId) { - this.consumerProperties.setGroupId(groupId); - } - protected String getClientId() { return this.consumerProperties.getClientId(); } - /** - * Set the client.id property for the consumer. - * @param clientId the client id. - * @see ConsumerProperties - * @deprecated in favor of using {@link ConsumerProperties} - */ - @Deprecated - public void setClientId(String clientId) { - this.consumerProperties.setClientId(clientId); - } - protected long getPollTimeout() { return this.pollTimeout.toMillis(); } - /** - * Set the pollTimeout for the poll() operations. - * @param pollTimeout the poll timeout. - * @see ConsumerProperties - * @deprecated in favor of using {@link ConsumerProperties} - */ - @Deprecated - public void setPollTimeout(long pollTimeout) { - this.pollTimeout = Duration.ofMillis(pollTimeout); - this.assignTimeout = this.minTimeoutProvider.get(); - } - protected RecordMessageConverter getMessageConverter() { return this.messageConverter; } @@ -356,17 +292,6 @@ protected ConsumerRebalanceListener getRebalanceListener() { return this.consumerProperties.getConsumerRebalanceListener(); } - /** - * Set a rebalance listener. - * @param rebalanceListener the rebalance listener. - * @see ConsumerProperties - * @deprecated in favor of using {@link ConsumerProperties} - */ - @Deprecated - public void setRebalanceListener(ConsumerRebalanceListener rebalanceListener) { - this.consumerProperties.setConsumerRebalanceListener(rebalanceListener); - } - @Override public String getComponentType() { return "kafka:message-source"; @@ -660,21 +585,6 @@ private void stopConsumer() { } } - /* - * TODO: Remove when deprecated CTORs below are removed. - */ - private static ConsumerProperties dummyProperties(@Nullable Duration commitTimeout2) { - if (commitTimeout2 == null) { - return null; - } - else { - ConsumerProperties consumerProperties = new ConsumerProperties(new String[0]); - consumerProperties.setSyncCommitTimeout(commitTimeout2); - return consumerProperties; - } - } - - /** * AcknowledgmentCallbackFactory for KafkaAckInfo. * @param the key type. @@ -686,29 +596,13 @@ public static class KafkaAckCallbackFactory implements AcknowledgmentCallb private final ConsumerProperties consumerProperties; /** - * Deprecated constructor. - * @deprecated in favor of - * {@code #KafkaAckCallbackFactory(ConsumerProperties)}. + * Construct an instance with the provided properties. + * @param consumerProperties the properties. */ - @Deprecated - public KafkaAckCallbackFactory() { - this(dummyProperties(null)); - } - public KafkaAckCallbackFactory(ConsumerProperties consumerProperties) { this.consumerProperties = consumerProperties; } - /** - * Deprecated setter. - * @param commitTimeout the commit timeout. - * @deprecated in favor of {@code #KafkaAckCallbackFactory(ConsumerProperties)}. - */ - @Deprecated - public void setCommitTimeout(Duration commitTimeout) { - this.consumerProperties.setSyncCommitTimeout(commitTimeout); - } - @Override public AcknowledgmentCallback createCallback(KafkaAckInfo info) { return new KafkaAckCallback<>(info, this.consumerProperties); @@ -740,27 +634,6 @@ public static class KafkaAckCallback implements AcknowledgmentCallback, Ac private boolean autoAckEnabled = true; - /** - * Deprecated constructor. - * @param ackInfo the ack info. - * @deprecated in favor of {@code #KafkaAckCallback(KafkaAckInfo, ConsumerProperties)} - */ - @Deprecated - public KafkaAckCallback(KafkaAckInfo ackInfo) { - this(ackInfo, (ConsumerProperties) null); - } - - /** - * Deprecated constructor. - * @param ackInfo the ack info. - * @param commitTimeout the commit timeout. - * @deprecated in favor of {@code #KafkaAckCallback(KafkaAckInfo, ConsumerProperties)} - */ - @Deprecated - public KafkaAckCallback(KafkaAckInfo ackInfo, @Nullable Duration commitTimeout) { - this(ackInfo, dummyProperties(commitTimeout)); - } - /** * Construct an instance with the provided properties. * @param ackInfo the ack info.