Skip to content
This repository has been archived by the owner on Mar 30, 2023. It is now read-only.

Commit

Permalink
Remove deprecated methods
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell authored and artembilan committed Mar 17, 2020
1 parent cdfcc25 commit c3179d5
Show file tree
Hide file tree
Showing 4 changed files with 2 additions and 291 deletions.
64 changes: 0 additions & 64 deletions src/main/java/org/springframework/integration/kafka/dsl/Kafka.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,70 +73,6 @@ public static <K, V> KafkaProducerMessageHandlerSpec.KafkaProducerMessageHandler
return new KafkaProducerMessageHandlerSpec.KafkaProducerMessageHandlerTemplateSpec<>(producerFactory);
}

/**
* Create an initial {@link KafkaInboundChannelAdapterSpec} with the consumer factory and
* topics.
* @param consumerFactory the consumer factory.
* @param topics the topic(s).
* @param <K> the Kafka message key type.
* @param <V> the Kafka message value type.
* @return the spec.
* @since 3.0.1
* @deprecated in favor of {@link #inboundChannelAdapter(ConsumerFactory, ConsumerProperties)}
*/
@Deprecated
public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
ConsumerFactory<K, V> consumerFactory, String... topics) {

return inboundChannelAdapter(consumerFactory, new ConsumerProperties(topics), false);
}

/**
* Create an initial {@link KafkaInboundChannelAdapterSpec} with the consumer factory and
* topics with a custom ack callback factory.
* @param consumerFactory the consumer factory.
* @param ackCallbackFactory the callback factory.
* @param topics the topic(s).
* @param <K> the Kafka message key type.
* @param <V> the Kafka message value type.
* @return the spec.
* @since 3.0.1
* @deprecated in favor of
* {@code #inboundChannelAdapter(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory)}
*/
@Deprecated
public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
ConsumerFactory<K, V> consumerFactory,
KafkaAckCallbackFactory<K, V> ackCallbackFactory, String... topics) {

return inboundChannelAdapter(consumerFactory, ackCallbackFactory, false, topics);
}

/**
* Create an initial {@link KafkaInboundChannelAdapterSpec} with the consumer factory and
* topics with a custom ack callback factory.
* @param consumerFactory the consumer factory.
* @param ackCallbackFactory the callback factory.
* @param allowMultiFetch true to fetch multiple records on each poll.
* @param topics the topic(s).
* @param <K> the Kafka message key type.
* @param <V> the Kafka message value type.
* @return the spec.
* @since 3.0.1
* @deprecated in favor of
* {@code #inboundChannelAdapter(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)}
*/
@Deprecated
public static <K, V> KafkaInboundChannelAdapterSpec<K, V> inboundChannelAdapter(
ConsumerFactory<K, V> consumerFactory,
KafkaAckCallbackFactory<K, V> ackCallbackFactory,
boolean allowMultiFetch,
String... topics) {

return new KafkaInboundChannelAdapterSpec<>(consumerFactory, new ConsumerProperties(topics),
ackCallbackFactory, allowMultiFetch);
}

/**
* Create an initial {@link KafkaInboundChannelAdapterSpec} with the consumer factory and
* topics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.springframework.integration.kafka.dsl;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.integration.IntegrationMessageHeaderAccessor;
Expand Down Expand Up @@ -44,38 +43,6 @@
public class KafkaInboundChannelAdapterSpec<K, V>
extends MessageSourceSpec<KafkaInboundChannelAdapterSpec<K, V>, KafkaMessageSource<K, V>> {

/**
* Create an initial {@link KafkaMessageSource} with the consumer factory and
* topics.
* @param consumerFactory the consumer factory.
* @param allowMultiFetch true to allow {@code max.poll.records > 1}.
* @param topics the topics.
* @deprecated in favor of
* {@link #KafkaInboundChannelAdapterSpec(ConsumerFactory, ConsumerProperties, boolean)}
*/
@Deprecated
KafkaInboundChannelAdapterSpec(ConsumerFactory<K, V> consumerFactory, boolean allowMultiFetch, String... topics) {
this.target = new KafkaMessageSource<>(consumerFactory, new ConsumerProperties(topics), allowMultiFetch);
}

/**
* Create an initial {@link KafkaMessageSource} with the consumer factory and
* topics with a custom ack callback factory.
* @param consumerFactory the consumer factory.
* @param ackCallbackFactory the callback factory.
* @param allowMultiFetch true to allow {@code max.poll.records > 1}.
* @param topics the topics.
* @deprecated in favor of
* {@link #KafkaInboundChannelAdapterSpec(ConsumerFactory, ConsumerProperties,
* KafkaAckCallbackFactory, boolean)}
*/
@Deprecated
KafkaInboundChannelAdapterSpec(ConsumerFactory<K, V> consumerFactory,
KafkaAckCallbackFactory<K, V> ackCallbackFactory, boolean allowMultiFetch, String... topics) {

this.target = new KafkaMessageSource<>(consumerFactory, new ConsumerProperties(topics), ackCallbackFactory, allowMultiFetch);
}

/**
* Create an initial {@link KafkaMessageSource} with the consumer factory and
* topics with a custom ack callback factory.
Expand Down Expand Up @@ -103,45 +70,6 @@ public class KafkaInboundChannelAdapterSpec<K, V>
this.target = new KafkaMessageSource<>(consumerFactory, consumerProperties, ackCallbackFactory, allowMultiFetch);
}

/**
* Set the group.id property for the consumer.
* @param groupId the group id.
* @return the spec.
* @see ConsumerProperties
* @deprecated in favor of using {@link ConsumerProperties}
*/
@Deprecated
public KafkaInboundChannelAdapterSpec<K, V> groupId(String groupId) {
this.target.setGroupId(groupId);
return this;
}

/**
* Set the client.id property for the consumer.
* @param clientId the client id.
* @return the spec.
* @see ConsumerProperties
* @deprecated in favor of using {@link ConsumerProperties}
*/
@Deprecated
public KafkaInboundChannelAdapterSpec<K, V> clientId(String clientId) {
this.target.setClientId(clientId);
return this;
}

/**
* Set the pollTimeout for the poll() operations.
* @param pollTimeout the poll timeout.
* @return the spec.
* @see ConsumerProperties
* @deprecated in favor of using {@link ConsumerProperties}
*/
@Deprecated
public KafkaInboundChannelAdapterSpec<K, V> pollTimeout(long pollTimeout) {
this.target.setPollTimeout(pollTimeout);
return this;
}

/**
* Set the message converter to replace the default.
* {@link MessagingMessageConverter}.
Expand All @@ -164,19 +92,6 @@ public KafkaInboundChannelAdapterSpec<K, V> payloadType(Class<?> type) {
return this;
}

/**
* Set a rebalance listener.
* @param rebalanceListener the rebalance listener.
* @return the spec.
* @see ConsumerProperties
* @deprecated in favor of using {@link ConsumerProperties}
*/
@Deprecated
public KafkaInboundChannelAdapterSpec<K, V> rebalanceListener(ConsumerRebalanceListener rebalanceListener) {
this.target.setRebalanceListener(rebalanceListener);
return this;
}

/**
* Set to true to include the raw {@link ConsumerRecord} as headers with keys
* {@link KafkaHeaders#RAW_DATA} and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,6 @@ public ReplyingKafkaTemplateSpec<K, V, R> taskScheduler(TaskScheduler scheduler)
return this;
}

/**
* Default reply timeout.
* @param replyTimeout the timeout.
* @return the spec.
* @deprecated in favor of {@link #defaultReplyTimeout(Duration)}.
*/
@Deprecated
@SuppressWarnings("unchecked")
public ReplyingKafkaTemplateSpec<K, V, R> replyTimeout(long replyTimeout) {
((ReplyingKafkaTemplate<K, V, R>) this.target).setDefaultReplyTimeout(Duration.ofMillis(replyTimeout));
return this;
}

/**
* Default reply timeout.
* @param replyTimeout the timeout.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,36 +144,6 @@ public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object> impl

private volatile Iterator<ConsumerRecord<K, V>> recordsIterator;

/**
* Construct an instance with the supplied parameters. Fetching multiple
* records per poll will be disabled.
* @param consumerFactory the consumer factory.
* @param topics the topics.
* @see #KafkaMessageSource(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)
* @deprecated in favor of {@link #KafkaMessageSource(ConsumerFactory, ConsumerProperties)}
*/
@Deprecated
public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory, String... topics) {
this(consumerFactory, new ConsumerProperties(topics), new KafkaAckCallbackFactory<>(), false);
}

/**
* Construct an instance with the supplied parameters. Fetching multiple
* records per poll will be disabled.
* @param consumerFactory the consumer factory.
* @param ackCallbackFactory the ack callback factory.
* @param topics the topics.
* @see #KafkaMessageSource(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)
* @deprecated in favor of
* {@link #KafkaMessageSource(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory)}
*/
@Deprecated
public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory,
KafkaAckCallbackFactory<K, V> ackCallbackFactory, String... topics) {

this(consumerFactory, new ConsumerProperties(topics), ackCallbackFactory, false);
}

/**
* Construct an instance with the supplied parameters. Fetching multiple
* records per poll will be disabled.
Expand Down Expand Up @@ -284,48 +254,14 @@ protected String getGroupId() {
return this.consumerProperties.getGroupId();
}

/**
* Set the group.id property for the consumer.
* @param groupId the group id.
* @see ConsumerProperties
* @deprecated in favor of using {@link ConsumerProperties}
*/
@Deprecated
public void setGroupId(String groupId) {
this.consumerProperties.setGroupId(groupId);
}

protected String getClientId() {
return this.consumerProperties.getClientId();
}

/**
* Set the client.id property for the consumer.
* @param clientId the client id.
* @see ConsumerProperties
* @deprecated in favor of using {@link ConsumerProperties}
*/
@Deprecated
public void setClientId(String clientId) {
this.consumerProperties.setClientId(clientId);
}

protected long getPollTimeout() {
return this.pollTimeout.toMillis();
}

/**
* Set the pollTimeout for the poll() operations.
* @param pollTimeout the poll timeout.
* @see ConsumerProperties
* @deprecated in favor of using {@link ConsumerProperties}
*/
@Deprecated
public void setPollTimeout(long pollTimeout) {
this.pollTimeout = Duration.ofMillis(pollTimeout);
this.assignTimeout = this.minTimeoutProvider.get();
}

protected RecordMessageConverter getMessageConverter() {
return this.messageConverter;
}
Expand Down Expand Up @@ -356,17 +292,6 @@ protected ConsumerRebalanceListener getRebalanceListener() {
return this.consumerProperties.getConsumerRebalanceListener();
}

/**
* Set a rebalance listener.
* @param rebalanceListener the rebalance listener.
* @see ConsumerProperties
* @deprecated in favor of using {@link ConsumerProperties}
*/
@Deprecated
public void setRebalanceListener(ConsumerRebalanceListener rebalanceListener) {
this.consumerProperties.setConsumerRebalanceListener(rebalanceListener);
}

@Override
public String getComponentType() {
return "kafka:message-source";
Expand Down Expand Up @@ -660,21 +585,6 @@ private void stopConsumer() {
}
}

/*
* TODO: Remove when deprecated CTORs below are removed.
*/
private static ConsumerProperties dummyProperties(@Nullable Duration commitTimeout2) {
if (commitTimeout2 == null) {
return null;
}
else {
ConsumerProperties consumerProperties = new ConsumerProperties(new String[0]);
consumerProperties.setSyncCommitTimeout(commitTimeout2);
return consumerProperties;
}
}


/**
* AcknowledgmentCallbackFactory for KafkaAckInfo.
* @param <K> the key type.
Expand All @@ -686,29 +596,13 @@ public static class KafkaAckCallbackFactory<K, V> implements AcknowledgmentCallb
private final ConsumerProperties consumerProperties;

/**
* Deprecated constructor.
* @deprecated in favor of
* {@code #KafkaAckCallbackFactory(ConsumerProperties)}.
* Construct an instance with the provided properties.
* @param consumerProperties the properties.
*/
@Deprecated
public KafkaAckCallbackFactory() {
this(dummyProperties(null));
}

public KafkaAckCallbackFactory(ConsumerProperties consumerProperties) {
this.consumerProperties = consumerProperties;
}

/**
* Deprecated setter.
* @param commitTimeout the commit timeout.
* @deprecated in favor of {@code #KafkaAckCallbackFactory(ConsumerProperties)}.
*/
@Deprecated
public void setCommitTimeout(Duration commitTimeout) {
this.consumerProperties.setSyncCommitTimeout(commitTimeout);
}

@Override
public AcknowledgmentCallback createCallback(KafkaAckInfo<K, V> info) {
return new KafkaAckCallback<>(info, this.consumerProperties);
Expand Down Expand Up @@ -740,27 +634,6 @@ public static class KafkaAckCallback<K, V> implements AcknowledgmentCallback, Ac

private boolean autoAckEnabled = true;

/**
* Deprecated constructor.
* @param ackInfo the ack info.
* @deprecated in favor of {@code #KafkaAckCallback(KafkaAckInfo, ConsumerProperties)}
*/
@Deprecated
public KafkaAckCallback(KafkaAckInfo<K, V> ackInfo) {
this(ackInfo, (ConsumerProperties) null);
}

/**
* Deprecated constructor.
* @param ackInfo the ack info.
* @param commitTimeout the commit timeout.
* @deprecated in favor of {@code #KafkaAckCallback(KafkaAckInfo, ConsumerProperties)}
*/
@Deprecated
public KafkaAckCallback(KafkaAckInfo<K, V> ackInfo, @Nullable Duration commitTimeout) {
this(ackInfo, dummyProperties(commitTimeout));
}

/**
* Construct an instance with the provided properties.
* @param ackInfo the ack info.
Expand Down

0 comments on commit c3179d5

Please sign in to comment.