Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2024 the original author or authors.
* Copyright 2018-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,16 +44,6 @@ public ConsumerPartitionPausedEvent(Object source, Object container, TopicPartit
this.partition = partition;
}

/**
* Return the paused partition.
* @return the partition.
* @deprecated replaced by {@link #getPartition()}
*/
@Deprecated(since = "3.3", forRemoval = true)
public TopicPartition getPartitions() {
return this.partition;
}

/**
* Return the paused partition.
* @return the partition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,10 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicPartition;
import org.jspecify.annotations.Nullable;

import org.springframework.util.CollectionUtils;

/**
* Manages the {@link ConsumerSeekAware.ConsumerSeekCallback} s for the listener. If the
* listener subclasses this class, it can easily seek arbitrary topics/partitions without
Expand Down Expand Up @@ -89,22 +86,6 @@ public void unregisterSeekCallback() {
this.callbackForThread.remove(Thread.currentThread());
}

/**
* Return the callback for the specified topic/partition.
* @param topicPartition the topic/partition.
* @return the callback (or null if there is no assignment).
* @deprecated Replaced by {@link #getSeekCallbacksFor(TopicPartition)}
*/
@Deprecated(since = "3.3", forRemoval = true)
@Nullable
protected ConsumerSeekCallback getSeekCallbackFor(TopicPartition topicPartition) {
List<ConsumerSeekCallback> callbacks = getSeekCallbacksFor(topicPartition);
if (CollectionUtils.isEmpty(callbacks)) {
return null;
}
return callbacks.get(0);
}

/**
* Return the callbacks for the specified topic/partition.
* @param topicPartition the topic/partition.
Expand All @@ -116,22 +97,6 @@ protected List<ConsumerSeekCallback> getSeekCallbacksFor(TopicPartition topicPar
return this.topicToCallbacks.get(topicPartition);
}

/**
* The map of callbacks for all currently assigned partitions.
* @return the map.
* @deprecated Replaced by {@link #getTopicsAndCallbacks()}
*/
@Deprecated(since = "3.3", forRemoval = true)
protected Map<TopicPartition, ConsumerSeekCallback> getSeekCallbacks() {
Map<TopicPartition, List<ConsumerSeekCallback>> topicsAndCallbacks = getTopicsAndCallbacks();
return topicsAndCallbacks.entrySet().stream()
.filter(entry -> !entry.getValue().isEmpty())
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().get(0)
));
}

/**
* The map of callbacks for all currently assigned partitions.
* @return the map.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implement

private final EnumSet<HeaderNames.HeadersToAdd> whichHeaders = EnumSet.allOf(HeaderNames.HeadersToAdd.class);

@SuppressWarnings("this-escape")
private @Nullable HeaderNames headerNames = getHeaderNames();
private @Nullable HeaderNames headerNames;

private boolean retainExceptionHeader;

Expand Down Expand Up @@ -877,20 +876,6 @@ private String getStackTraceAsString(Throwable cause) {
return stringWriter.getBuffer().toString();
}

/**
* Override this if you want different header names to be used
* in the sent record.
* @return the header names.
* @since 2.7
* @deprecated since 3.0.9 - provide a supplier instead.
* @see #setHeaderNamesSupplier(Supplier)
*/
@Nullable
@Deprecated(since = "3.0.9", forRemoval = true) // 3.2
protected HeaderNames getHeaderNames() {
return null;
}

/**
* Set a {@link Supplier} for {@link HeaderNames}.
* @param supplier the supplier.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,42 +77,6 @@ else if (listener instanceof GenericMessageListener) {
return listenerType;
}

/**
* Sleep according to the {@link BackOff}; when the {@link BackOffExecution} returns
* {@link BackOffExecution#STOP} sleep for the previous backOff.
* @param backOff the {@link BackOff} to create a new {@link BackOffExecution}.
* @param executions a thread local containing the {@link BackOffExecution} for this
* thread.
* @param lastIntervals a thread local containing the previous {@link BackOff}
* interval for this thread.
* @param container the container or parent container.
* @throws InterruptedException if the thread is interrupted.
* @since 2.7
* @deprecated in favor of
* {@link #unrecoverableBackOff(BackOff, Map, Map, MessageListenerContainer)}.
*/
@Deprecated(since = "3.1", forRemoval = true) // 3.2
public static void unrecoverableBackOff(BackOff backOff, ThreadLocal<BackOffExecution> executions,
ThreadLocal<Long> lastIntervals, MessageListenerContainer container) throws InterruptedException {

BackOffExecution backOffExecution = executions.get();
if (backOffExecution == null) {
backOffExecution = backOff.start();
executions.set(backOffExecution);
}
Long interval = backOffExecution.nextBackOff();
if (interval == BackOffExecution.STOP) {
interval = lastIntervals.get();
if (interval == null) {
interval = Long.valueOf(0);
}
}
lastIntervals.set(interval);
if (interval > 0) {
stoppableSleep(container, interval);
}
}

/**
* Sleep according to the {@link BackOff}; when the {@link BackOffExecution} returns
* {@link BackOffExecution#STOP} sleep for the previous backOff.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,6 @@ public interface DestinationTopicContainer {
*/
DestinationTopic getNextDestinationTopicFor(String mainListenerId, String topicName);

/**
* Returns the {@link DestinationTopic} instance registered as
* DLT for the given topic, or null if none is found.
* @param mainListenerId the listener id.
* @param topicName the topic name for which to look the DLT for
* @return The {@link DestinationTopic} instance corresponding to the DLT.
* @deprecated Replaced by {@link #getDltFor(String, String, Exception)}
*/
@Nullable
@Deprecated(since = "3.2", forRemoval = true)
default DestinationTopic getDltFor(String mainListenerId, String topicName) {
return getDltFor(mainListenerId, topicName, null);
}

/**
* Returns the {@link DestinationTopic} instance registered as
* DLT for the given topic taking into consideration the exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,6 @@ else if (dt instanceof String str) {
min));
}

/**
* Set to true to only log record metadata.
* @param onlyMeta true to only log record metadata.
* @since 2.7.12
* @deprecated - no longer used.
*/
@Deprecated(since = "3.1", forRemoval = true) // 3.2
public static void setLogOnlyMetadata(boolean onlyMeta) {
}

/**
* Set a formatter for logging {@link ConsumerRecord}s.
* @param formatter a function to format the record as a String
Expand Down