The processing mode. Can be either "at_least_once" (default)
- or "exactly_once_v2" (for EOS version 2, requires broker version 2.5+). Deprecated config options are
- "exactly_once" (for EOS version 1) and "exactly_once_beta" (for EOS version 2, requires broker version 2.5+)
.
+ or "exactly_once_v2" (for EOS version 2, requires broker version 2.5+). Config options
+ "exactly_once" and "exactly_once_beta" are not supported anymore since 4.0..
+ We removed the StreamsConfig processing.guarantee configuration values "exactly_once" and "exactly_once_beta",
+ which were deprecated in 3.0. Now, there is only one implementation of exactly-once semantics available, which is enabled by setting
+ processing.guarantee to "exactly_once_v2" (which is equivalent to the mode formerly enabled by exactly_once_beta).
+ Note that eos-v2 requires broker version 2.5 or higher, so users need to upgrade their Kafka cluster if necessary.
+ Users still using "exactly_once" on version 3.0 or higher, or using "exactly_once_beta", can perform a rolling bounce to switch to the
+ "exactly_once_v2" setting.
+ For users still using "exactly_once" on a version older than 3.0, please see Notable compatibility changes in past releases
+ for detailed upgrade instructions.
+
We improved the semantics of
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 325cbec90d9d..c5b151465f92 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -129,8 +129,7 @@
*
*
* If {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} is set to {@link #EXACTLY_ONCE_V2 "exactly_once_v2"},
- * {@link #EXACTLY_ONCE "exactly_once"} (deprecated), or {@link #EXACTLY_ONCE_BETA "exactly_once_beta"} (deprecated), Kafka Streams does not
- * allow users to overwrite the following properties (Streams setting shown in parentheses):
+ * Kafka Streams does not allow users to overwrite the following properties (Streams setting shown in parentheses):
*
*
{@link ConsumerConfig#ISOLATION_LEVEL_CONFIG "isolation.level"} (read_committed) - Consumers will always read committed data only
*
{@link ProducerConfig#ENABLE_IDEMPOTENCE_CONFIG "enable.idempotence"} (true) - Producer will always have idempotency enabled
@@ -357,32 +356,6 @@ public class StreamsConfig extends AbstractConfig {
@SuppressWarnings("WeakerAccess")
public static final String AT_LEAST_ONCE = "at_least_once";
- /**
- * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for exactly-once processing guarantees.
- *
- * Enabling exactly-once processing semantics requires broker version 0.11.0 or higher.
- * If you enable this feature Kafka Streams will use more resources (like broker connections)
- * compared to {@link #AT_LEAST_ONCE "at_least_once"} and {@link #EXACTLY_ONCE_V2 "exactly_once_v2"}.
- *
- * @deprecated Since 3.0.0, will be removed in 4.0. Use {@link #EXACTLY_ONCE_V2 "exactly_once_v2"} instead.
- */
- @SuppressWarnings("WeakerAccess")
- @Deprecated
- public static final String EXACTLY_ONCE = "exactly_once";
-
- /**
- * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for exactly-once processing guarantees.
- *
- * Enabling exactly-once (beta) requires broker version 2.5 or higher.
- * If you enable this feature Kafka Streams will use fewer resources (like broker connections)
- * compared to the {@link #EXACTLY_ONCE} (deprecated) case.
- *
- * @deprecated Since 3.0.0, will be removed in 4.0. Use {@link #EXACTLY_ONCE_V2 "exactly_once_v2"} instead.
- */
- @SuppressWarnings("WeakerAccess")
- @Deprecated
- public static final String EXACTLY_ONCE_BETA = "exactly_once_beta";
-
/**
* Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for exactly-once processing guarantees.
*
@@ -443,7 +416,7 @@ public class StreamsConfig extends AbstractConfig {
private static final String COMMIT_INTERVAL_MS_DOC = "The frequency in milliseconds with which to commit processing progress." +
" For at-least-once processing, committing means to save the position (ie, offsets) of the processor." +
" For exactly-once processing, it means to commit the transaction which includes to save the position and to make the committed data in the output topic visible to consumers with isolation level read_committed." +
- " (Note, if processing.guarantee is set to " + EXACTLY_ONCE_V2 + ", " + EXACTLY_ONCE + ",the default value is " + EOS_DEFAULT_COMMIT_INTERVAL_MS + "," +
+ " (Note, if processing.guarantee is set to " + EXACTLY_ONCE_V2 + ", the default value is " + EOS_DEFAULT_COMMIT_INTERVAL_MS + "," +
" otherwise the default value is " + DEFAULT_COMMIT_INTERVAL_MS + ".";
/** {@code repartition.purge.interval.ms} */
@@ -584,8 +557,6 @@ public class StreamsConfig extends AbstractConfig {
private static final String PROCESSING_GUARANTEE_DOC = "The processing guarantee that should be used. " +
"Possible values are " + AT_LEAST_ONCE + " (default) " +
"and " + EXACTLY_ONCE_V2 + " (requires brokers version 2.5 or higher). " +
- "Deprecated options are " + EXACTLY_ONCE + " (requires brokers version 0.11.0 or higher) " +
- "and " + EXACTLY_ONCE_BETA + " (requires brokers version 2.5 or higher). " +
"Note that exactly-once processing requires a cluster of at least three brokers by default what is the " +
"recommended setting for production; for development you can change this, by adjusting broker setting " +
"transaction.state.log.replication.factor and transaction.state.log.min.isr.";
@@ -816,7 +787,7 @@ public class StreamsConfig extends AbstractConfig {
.define(PROCESSING_GUARANTEE_CONFIG,
Type.STRING,
AT_LEAST_ONCE,
- in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2),
+ (name, value) -> validateProcessingConfiguration((String) value),
Importance.MEDIUM,
PROCESSING_GUARANTEE_DOC)
.define(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
@@ -1247,17 +1218,6 @@ protected StreamsConfig(final Map, ?> props,
super(CONFIG, props, doLog);
eosEnabled = StreamsConfigUtils.eosEnabled(this);
- final String processingModeConfig = getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
- if (processingModeConfig.equals(EXACTLY_ONCE)) {
- log.warn("Configuration parameter `{}` is deprecated and will be removed in the 4.0.0 release. " +
- "Please use `{}` instead. Note that this requires broker version 2.5+ so you should prepare "
- + "to upgrade your brokers if necessary.", EXACTLY_ONCE, EXACTLY_ONCE_V2);
- }
- if (processingModeConfig.equals(EXACTLY_ONCE_BETA)) {
- log.warn("Configuration parameter `{}` is deprecated and will be removed in the 4.0.0 release. " +
- "Please use `{}` instead.", EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2);
- }
-
if (props.containsKey(RETRIES_CONFIG)) {
log.warn("Configuration parameter `{}` is deprecated and will be removed in the 4.0.0 release.", RETRIES_CONFIG);
}
@@ -1331,6 +1291,21 @@ private void validateRackAwarenessConfiguration() {
});
}
+ private static void validateProcessingConfiguration(final String processingModeConfig) {
+ if (processingModeConfig.equals("exactly_once")) {
+ throw new ConfigException(String.format("Configuration parameter `exactly_once` was removed in the 4.0.0 release. " +
+ "Please use `%s` instead. Refer to the Kafka Streams upgrade guide on how to upgrade your application " +
+ "to use the new parameter. Note that this requires broker version 2.5+ so you should prepare "
+ + "to upgrade your brokers if necessary.", EXACTLY_ONCE_V2));
+ }
+ if (processingModeConfig.equals("exactly_once_beta")) {
+ throw new ConfigException(String.format("Configuration parameter `exactly_once_beta` was removed in the 4.0.0 release. " +
+ "Please use `%s` instead, which is the new name for the same processing semantics.",
+ EXACTLY_ONCE_V2));
+ }
+ in(AT_LEAST_ONCE, EXACTLY_ONCE_V2).ensureValid(PROCESSING_GUARANTEE_CONFIG, processingModeConfig);
+ }
+
private Map getCommonConsumerConfigs() {
final Map clientProvidedProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
@@ -1580,11 +1555,6 @@ public Map getProducerConfigs(final String clientId) {
props.putAll(getClientCustomProps());
props.putAll(clientProvidedProps);
- // When using EOS alpha, stream should auto-downgrade the transactional commit protocol to be compatible with older brokers.
- if (StreamsConfigUtils.processingMode(this) == StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA) {
- props.put("internal.auto.downgrade.txn.commit", true);
- }
-
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG));
// add client id with stream client id prefix
props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java b/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java
index e271a42ab891..7733942089e7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java
@@ -23,8 +23,6 @@ public class StreamsConfigUtils {
public enum ProcessingMode {
AT_LEAST_ONCE("AT_LEAST_ONCE"),
- EXACTLY_ONCE_ALPHA("EXACTLY_ONCE_ALPHA"),
-
EXACTLY_ONCE_V2("EXACTLY_ONCE_V2");
public final String name;
@@ -34,25 +32,17 @@ public enum ProcessingMode {
}
}
- @SuppressWarnings("deprecation")
public static ProcessingMode processingMode(final StreamsConfig config) {
- if (StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) {
- return ProcessingMode.EXACTLY_ONCE_ALPHA;
- } else if (StreamsConfig.EXACTLY_ONCE_BETA.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) {
- return ProcessingMode.EXACTLY_ONCE_V2;
- } else if (StreamsConfig.EXACTLY_ONCE_V2.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) {
+ if (StreamsConfig.EXACTLY_ONCE_V2.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG))) {
return ProcessingMode.EXACTLY_ONCE_V2;
} else {
return ProcessingMode.AT_LEAST_ONCE;
}
}
- @SuppressWarnings("deprecation")
public static String processingModeString(final ProcessingMode processingMode) {
if (processingMode == ProcessingMode.EXACTLY_ONCE_V2) {
return StreamsConfig.EXACTLY_ONCE_V2;
- } else if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA) {
- return StreamsConfig.EXACTLY_ONCE;
} else {
return StreamsConfig.AT_LEAST_ONCE;
}
@@ -63,7 +53,6 @@ public static boolean eosEnabled(final StreamsConfig config) {
}
public static boolean eosEnabled(final ProcessingMode processingMode) {
- return processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA ||
- processingMode == ProcessingMode.EXACTLY_ONCE_V2;
+ return processingMode == ProcessingMode.EXACTLY_ONCE_V2;
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index d28d0d4444c7..a7bfae095003 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -36,18 +36,15 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
-import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.processingMode;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.getTaskProducerClientId;
import static org.apache.kafka.streams.processor.internals.ClientUtils.getThreadProducerClientId;
class ActiveTaskCreator {
@@ -58,12 +55,10 @@ class ActiveTaskCreator {
private final ChangelogReader storeChangelogReader;
private final ThreadCache cache;
private final Time time;
- private final KafkaClientSupplier clientSupplier;
private final String threadId;
private final Logger log;
private final Sensor createTaskSensor;
private final StreamsProducer threadProducer;
- private final Map taskProducers;
private final ProcessingMode processingMode;
ActiveTaskCreator(final TopologyMetadata topologyMetadata,
@@ -84,50 +79,30 @@ class ActiveTaskCreator {
this.storeChangelogReader = storeChangelogReader;
this.cache = cache;
this.time = time;
- this.clientSupplier = clientSupplier;
this.threadId = threadId;
this.log = log;
createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
processingMode = processingMode(applicationConfig);
- if (processingMode == EXACTLY_ONCE_ALPHA) {
- threadProducer = null;
- taskProducers = new HashMap<>();
- } else { // non-eos and eos-v2
- log.info("Creating thread producer client");
+ log.info("Creating thread producer client");
- final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
- final LogContext logContext = new LogContext(threadIdPrefix);
+ final String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
+ final LogContext logContext = new LogContext(threadIdPrefix);
- threadProducer = new StreamsProducer(
- applicationConfig,
- threadId,
- clientSupplier,
- null,
- processId,
- logContext,
- time);
- taskProducers = Collections.emptyMap();
- }
+ threadProducer = new StreamsProducer(
+ applicationConfig,
+ threadId,
+ clientSupplier,
+ processId,
+ logContext,
+ time);
}
public void reInitializeThreadProducer() {
threadProducer.resetProducer();
}
- StreamsProducer streamsProducerForTask(final TaskId taskId) {
- if (processingMode != EXACTLY_ONCE_ALPHA) {
- throw new IllegalStateException("Expected EXACTLY_ONCE to be enabled, but the processing mode was " + processingMode);
- }
-
- final StreamsProducer taskProducer = taskProducers.get(taskId);
- if (taskProducer == null) {
- throw new IllegalStateException("Unknown TaskId: " + taskId);
- }
- return taskProducer;
- }
-
StreamsProducer threadProducer() {
if (processingMode != EXACTLY_ONCE_V2) {
throw new IllegalStateException("Expected EXACTLY_ONCE_V2 to be enabled, but the processing mode was " + processingMode);
@@ -183,27 +158,10 @@ public Collection createTasks(final Consumer consumer,
private RecordCollector createRecordCollector(final TaskId taskId,
final LogContext logContext,
final ProcessorTopology topology) {
- final StreamsProducer streamsProducer;
- if (processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA) {
- log.info("Creating producer client for task {}", taskId);
- streamsProducer = new StreamsProducer(
- applicationConfig,
- threadId,
- clientSupplier,
- taskId,
- null,
- logContext,
- time
- );
- taskProducers.put(taskId, streamsProducer);
- } else {
- streamsProducer = threadProducer;
- }
-
return new RecordCollectorImpl(
logContext,
taskId,
- streamsProducer,
+ threadProducer,
applicationConfig.defaultProductionExceptionHandler(),
streamsMetrics,
topology
@@ -278,45 +236,22 @@ private StreamTask createActiveTask(final TaskId taskId,
}
void closeThreadProducerIfNeeded() {
- if (threadProducer != null) {
- try {
- threadProducer.close();
- } catch (final RuntimeException e) {
- throw new StreamsException("Thread producer encounter error trying to close.", e);
- }
- }
- }
-
- void closeAndRemoveTaskProducerIfNeeded(final TaskId id) {
- final StreamsProducer taskProducer = taskProducers.remove(id);
- if (taskProducer != null) {
- try {
- taskProducer.close();
- } catch (final RuntimeException e) {
- throw new StreamsException("[" + id + "] task producer encounter error trying to close.", e, id);
- }
+ try {
+ threadProducer.close();
+ } catch (final RuntimeException e) {
+ throw new StreamsException("Thread producer encounter error trying to close.", e);
}
}
Map producerMetrics() {
- // When EOS is turned on, each task will have its own producer client
- // and the producer object passed in here will be null. We would then iterate through
- // all the active tasks and add their metrics to the output metrics map.
- final Collection producers = threadProducer != null ?
- Collections.singleton(threadProducer) :
- taskProducers.values();
- return ClientUtils.producerMetrics(producers);
+ return threadProducer.metrics()
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> (Metric) e.getValue()));
}
Set producerClientIds() {
- if (threadProducer != null) {
- return Collections.singleton(getThreadProducerClientId(threadId));
- } else {
- return taskProducers.keySet()
- .stream()
- .map(taskId -> getTaskProducerClientId(threadId, taskId))
- .collect(Collectors.toSet());
- }
+ return Collections.singleton(getThreadProducerClientId(threadId));
}
private LogContext getLogContext(final TaskId taskId) {
@@ -326,11 +261,6 @@ private LogContext getLogContext(final TaskId taskId) {
}
public double totalProducerBlockedTime() {
- if (threadProducer != null) {
- return threadProducer.totalBlockedTime();
- }
- return taskProducers.values().stream()
- .mapToDouble(StreamsProducer::totalBlockedTime)
- .sum();
+ return threadProducer.totalBlockedTime();
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
index 1177e29d8259..8f54673d940e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
@@ -34,7 +34,6 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,10 +79,6 @@ public static String getThreadProducerClientId(final String threadClientId) {
return threadClientId + "-producer";
}
- public static String getTaskProducerClientId(final String threadClientId, final TaskId taskId) {
- return threadClientId + "-" + taskId + "-producer";
- }
-
public static Map consumerMetrics(final Consumer mainConsumer,
final Consumer restoreConsumer) {
final Map consumerMetrics = mainConsumer.metrics();
@@ -99,17 +94,6 @@ public static Map adminClientMetrics(final Admin adminClient
return new LinkedHashMap<>(adminClientMetrics);
}
- public static Map producerMetrics(final Collection producers) {
- final Map result = new LinkedHashMap<>();
- for (final StreamsProducer producer : producers) {
- final Map producerMetrics = producer.metrics();
- if (producerMetrics != null) {
- result.putAll(producerMetrics);
- }
- }
- return result;
- }
-
/**
* @throws StreamsException if the consumer throws an exception
* @throws org.apache.kafka.common.errors.TimeoutException if the request times out
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 0bc19cefc30b..61743bba0e2e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -583,7 +583,6 @@ public void run() {
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
*/
- @SuppressWarnings("deprecation") // Needed to include StreamsConfig.EXACTLY_ONCE_BETA in error log for UnsupportedVersionException
boolean runLoop() {
subscribeConsumer();
@@ -632,9 +631,9 @@ boolean runLoop() {
errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) {
log.error("Shutting down because the Kafka cluster seems to be on a too old version. " +
- "Setting {}=\"{}\"/\"{}\" requires broker version 2.5 or higher.",
- StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
- StreamsConfig.EXACTLY_ONCE_V2, StreamsConfig.EXACTLY_ONCE_BETA);
+ "Setting {}=\"{}\" requires broker version 2.5 or higher.",
+ StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
+ StreamsConfig.EXACTLY_ONCE_V2);
}
failedStreamThreadSensor.record();
this.streamsUncaughtExceptionHandler.accept(new StreamsException(e), false);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index a1b68ff79085..8365d94f6c20 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -43,7 +43,6 @@
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
-import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import java.util.List;
@@ -52,8 +51,6 @@
import java.util.UUID;
import java.util.concurrent.Future;
-import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
-import static org.apache.kafka.streams.processor.internals.ClientUtils.getTaskProducerClientId;
import static org.apache.kafka.streams.processor.internals.ClientUtils.getThreadProducerClientId;
/**
@@ -81,7 +78,6 @@ public class StreamsProducer {
public StreamsProducer(final StreamsConfig config,
final String threadId,
final KafkaClientSupplier clientSupplier,
- final TaskId taskId,
final UUID processId,
final LogContext logContext,
final Time time) {
@@ -102,21 +98,6 @@ public StreamsProducer(final StreamsConfig config,
break;
}
- case EXACTLY_ONCE_ALPHA: {
- producerConfigs = config.getProducerConfigs(
- getTaskProducerClientId(
- threadId,
- Objects.requireNonNull(taskId, "taskId cannot be null for exactly-once alpha")
- )
- );
-
- final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
- producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + taskId);
-
- eosV2ProducerConfigs = null;
-
- break;
- }
case EXACTLY_ONCE_V2: {
producerConfigs = config.getProducerConfigs(getThreadProducerClientId(threadId));
@@ -182,7 +163,7 @@ void initTransaction() {
}
public void resetProducer() {
- if (processingMode != EXACTLY_ONCE_V2) {
+ if (!eosEnabled()) {
throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + processingMode);
}
@@ -290,10 +271,7 @@ protected void commitTransaction(final Map of
}
maybeBeginTransaction();
try {
- // EOS-v2 assumes brokers are on version 2.5+ and thus can understand the full set of consumer group metadata
- // Thus if we are using EOS-v1 and can't make this assumption, we must downgrade the request to include only the group id metadata
- final ConsumerGroupMetadata maybeDowngradedGroupMetadata = processingMode == EXACTLY_ONCE_V2 ? consumerGroupMetadata : new ConsumerGroupMetadata(consumerGroupMetadata.groupId());
- producer.sendOffsetsToTransaction(offsets, maybeDowngradedGroupMetadata);
+ producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
producer.commitTransaction();
transactionInFlight = false;
} catch (final ProducerFencedException | InvalidProducerEpochException | CommitFailedException error) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index b839c22f5aa0..0a0e04145ab4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -37,7 +37,6 @@
import java.util.stream.Collectors;
import org.slf4j.Logger;
-import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
/**
@@ -176,64 +175,47 @@ void commitOffsetsOrTransaction(final Map corruptedTasks = new HashSet<>();
if (!offsetsPerTask.isEmpty()) {
- if (executionMetadata.processingMode() == EXACTLY_ONCE_ALPHA) {
- for (final Map.Entry> taskToCommit : offsetsPerTask.entrySet()) {
- final Task task = taskToCommit.getKey();
- try {
- taskManager.streamsProducerForTask(task.id())
- .commitTransaction(taskToCommit.getValue(), taskManager.mainConsumer().groupMetadata());
- updateTaskCommitMetadata(taskToCommit.getValue());
- } catch (final TimeoutException timeoutException) {
- log.error(
- String.format("Committing task %s failed.", task.id()),
- timeoutException
- );
- corruptedTasks.add(task.id());
- }
+ final Map allOffsets = offsetsPerTask.values().stream()
+ .flatMap(e -> e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ if (executionMetadata.processingMode() == EXACTLY_ONCE_V2) {
+ try {
+ taskManager.threadProducer().commitTransaction(allOffsets, taskManager.mainConsumer().groupMetadata());
+ updateTaskCommitMetadata(allOffsets);
+ } catch (final TimeoutException timeoutException) {
+ log.error(
+ String.format("Committing task(s) %s failed.",
+ offsetsPerTask
+ .keySet()
+ .stream()
+ .map(t -> t.id().toString())
+ .collect(Collectors.joining(", "))),
+ timeoutException
+ );
+ offsetsPerTask
+ .keySet()
+ .forEach(task -> corruptedTasks.add(task.id()));
}
} else {
- final Map allOffsets = offsetsPerTask.values().stream()
- .flatMap(e -> e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-
- if (executionMetadata.processingMode() == EXACTLY_ONCE_V2) {
- try {
- taskManager.threadProducer().commitTransaction(allOffsets, taskManager.mainConsumer().groupMetadata());
- updateTaskCommitMetadata(allOffsets);
- } catch (final TimeoutException timeoutException) {
- log.error(
- String.format("Committing task(s) %s failed.",
- offsetsPerTask
- .keySet()
- .stream()
- .map(t -> t.id().toString())
- .collect(Collectors.joining(", "))),
- timeoutException
- );
- offsetsPerTask
- .keySet()
- .forEach(task -> corruptedTasks.add(task.id()));
- }
- } else {
- try {
- taskManager.mainConsumer().commitSync(allOffsets);
- updateTaskCommitMetadata(allOffsets);
- } catch (final CommitFailedException error) {
- throw new TaskMigratedException("Consumer committing offsets failed, " +
- "indicating the corresponding thread is no longer part of the group", error);
- } catch (final TimeoutException timeoutException) {
- log.error(
- String.format("Committing task(s) %s failed.",
- offsetsPerTask
- .keySet()
- .stream()
- .map(t -> t.id().toString())
- .collect(Collectors.joining(", "))),
- timeoutException
- );
- throw timeoutException;
- } catch (final KafkaException error) {
- throw new StreamsException("Error encountered committing offsets via consumer", error);
- }
+ try {
+ taskManager.mainConsumer().commitSync(allOffsets);
+ updateTaskCommitMetadata(allOffsets);
+ } catch (final CommitFailedException error) {
+ throw new TaskMigratedException("Consumer committing offsets failed, " +
+ "indicating the corresponding thread is no longer part of the group", error);
+ } catch (final TimeoutException timeoutException) {
+ log.error(
+ String.format("Committing task(s) %s failed.",
+ offsetsPerTask
+ .keySet()
+ .stream()
+ .map(t -> t.id().toString())
+ .collect(Collectors.joining(", "))),
+ timeoutException
+ );
+ throw timeoutException;
+ } catch (final KafkaException error) {
+ throw new StreamsException("Error encountered committing offsets via consumer", error);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 31eba40ae617..10e66ffc0526 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -152,10 +152,6 @@ Consumer mainConsumer() {
return mainConsumer;
}
- StreamsProducer streamsProducerForTask(final TaskId taskId) {
- return activeTaskCreator.streamsProducerForTask(taskId);
- }
-
StreamsProducer threadProducer() {
return activeTaskCreator.threadProducer();
}
@@ -617,9 +613,7 @@ private Map closeAndRecycleTasks(final Map partitions) {
- final StandbyTask standbyTask = standbyTaskCreator.createStandbyTaskFromActive(activeTask, partitions);
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTask.id());
- return standbyTask;
+ return standbyTaskCreator.createStandbyTaskFromActive(activeTask, partitions);
}
private StreamTask convertStandbyToActive(final StandbyTask standbyTask, final Set partitions) {
@@ -743,9 +737,6 @@ private void closeTaskClean(final Task task,
try {
task.suspend();
task.closeClean();
- if (task.isActive()) {
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
- }
} catch (final RuntimeException e) {
final String uncleanMessage = String.format("Failed to close task %s cleanly. " +
"Attempting to close remaining tasks before re-throwing:", task.id());
@@ -1195,10 +1186,6 @@ private void closeTaskDirty(final Task task, final boolean removeFromTasksRegist
if (removeFromTasksRegistry) {
tasks.removeTask(task);
}
-
- if (task.isActive()) {
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
- }
} catch (final RuntimeException swallow) {
log.error("Error removing dirty task {}: {}", task.id(), swallow.getMessage());
}
@@ -1207,9 +1194,6 @@ private void closeTaskDirty(final Task task, final boolean removeFromTasksRegist
private void closeTaskClean(final Task task) {
task.closeClean();
tasks.removeTask(task);
- if (task.isActive()) {
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
- }
}
void shutdown(final boolean clean) {
@@ -1281,14 +1265,6 @@ private void closeFailedTasksFromStateUpdater() {
}
task.closeDirty();
-
- try {
- if (task.isActive()) {
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(task.id());
- }
- } catch (final RuntimeException swallow) {
- log.error("Error closing dirty task {}: {}", task.id(), swallow.getMessage());
- }
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 435dd249f2f6..47f636b903ac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -55,8 +55,6 @@
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG;
-import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
-import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2;
import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH;
import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH;
@@ -409,18 +407,10 @@ public void shouldOverrideStreamsDefaultProducerConfigs() {
assertEquals("30000", producerConfigs.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG));
}
- @SuppressWarnings("deprecation")
- @Test
- public void shouldThrowIfTransactionTimeoutSmallerThanCommitIntervalForEOSAlpha() {
- assertThrows(IllegalArgumentException.class,
- () -> testTransactionTimeoutSmallerThanCommitInterval(EXACTLY_ONCE));
- }
-
- @SuppressWarnings("deprecation")
@Test
- public void shouldThrowIfTransactionTimeoutSmallerThanCommitIntervalForEOSBeta() {
+ public void shouldThrowIfTransactionTimeoutSmallerThanCommitIntervalForEosV2() {
assertThrows(IllegalArgumentException.class,
- () -> testTransactionTimeoutSmallerThanCommitInterval(EXACTLY_ONCE_BETA));
+ () -> testTransactionTimeoutSmallerThanCommitInterval(EXACTLY_ONCE_V2));
}
@Test
@@ -529,24 +519,6 @@ public void shouldNotSetInternalThrowOnFetchStableOffsetUnsupportedConfigToFalse
assertThat(consumerConfigs.get("internal.throw.on.fetch.stable.offset.unsupported"), is(nullValue()));
}
- @SuppressWarnings("deprecation")
- @Test
- public void shouldNotSetInternalThrowOnFetchStableOffsetUnsupportedConfigToFalseInConsumerForEosAlpha() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
- final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
- assertThat(consumerConfigs.get("internal.throw.on.fetch.stable.offset.unsupported"), is(nullValue()));
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void shouldNotSetInternalThrowOnFetchStableOffsetUnsupportedConfigToFalseInConsumerForEosBeta() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA);
- final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
- assertThat(consumerConfigs.get("internal.throw.on.fetch.stable.offset.unsupported"), is(true));
- }
-
@Test
public void shouldNotSetInternalThrowOnFetchStableOffsetUnsupportedConfigToFalseInConsumerForEosV2() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2);
@@ -561,24 +533,6 @@ public void shouldNotSetInternalAutoDowngradeTxnCommitToTrueInProducerForEosDisa
assertThat(producerConfigs.get("internal.auto.downgrade.txn.commit"), is(nullValue()));
}
- @SuppressWarnings("deprecation")
- @Test
- public void shouldSetInternalAutoDowngradeTxnCommitToTrueInProducerForEosAlpha() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
- final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map producerConfigs = streamsConfig.getProducerConfigs(clientId);
- assertThat(producerConfigs.get("internal.auto.downgrade.txn.commit"), is(true));
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void shouldNotSetInternalAutoDowngradeTxnCommitToTrueInProducerForEosBeta() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA);
- final StreamsConfig streamsConfig = new StreamsConfig(props);
- final Map producerConfigs = streamsConfig.getProducerConfigs(clientId);
- assertThat(producerConfigs.get("internal.auto.downgrade.txn.commit"), is(nullValue()));
- }
-
@Test
public void shouldNotSetInternalAutoDowngradeTxnCommitToTrueInProducerForEosV2() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2);
@@ -594,20 +548,6 @@ public void shouldAcceptAtLeastOnce() {
new StreamsConfig(props);
}
- @Test
- public void shouldAcceptExactlyOnce() {
- // don't use `StreamsConfig.EXACLTY_ONCE` to actually do a useful test
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
- new StreamsConfig(props);
- }
-
- @Test
- public void shouldAcceptExactlyOnceBeta() {
- // don't use `StreamsConfig.EXACLTY_ONCE_BETA` to actually do a useful test
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once_beta");
- new StreamsConfig(props);
- }
-
@Test
public void shouldThrowExceptionIfNotAtLeastOnceOrExactlyOnce() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "bad_value");
@@ -638,27 +578,9 @@ public void shouldThrowIfBuiltInMetricsVersionInvalid() {
);
}
- @SuppressWarnings("deprecation")
- @Test
- public void shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosAlphaEnabled() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
- shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnabled();
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosBetaEnabled() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA);
- shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnabled();
- }
-
@Test
public void shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosV2Enabled() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2);
- shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnabled();
- }
-
- private void shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnabled() {
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue");
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
@@ -679,27 +601,9 @@ public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() {
);
}
- @SuppressWarnings("deprecation")
- @Test
- public void shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosAlphaEnabled() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
- shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosEnabled();
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosBetaEnabled() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA);
- shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosEnabled();
- }
-
@Test
public void shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosV2Enabled() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2);
- shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosEnabled();
- }
-
- private void shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosEnabled() {
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "anyValue");
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map producerConfigs = streamsConfig.getProducerConfigs(clientId);
@@ -714,27 +618,9 @@ public void shouldAllowSettingProducerEnableIdempotenceIfEosDisabled() {
assertThat(producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), equalTo(false));
}
- @SuppressWarnings("deprecation")
- @Test
- public void shouldSetDifferentDefaultsIfEosAlphaEnabled() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
- shouldSetDifferentDefaultsIfEosEnabled();
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void shouldSetDifferentDefaultsIfEosBetaEnabled() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA);
- shouldSetDifferentDefaultsIfEosEnabled();
- }
-
@Test
public void shouldSetDifferentDefaultsIfEosV2Enabled() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2);
- shouldSetDifferentDefaultsIfEosEnabled();
- }
-
- private void shouldSetDifferentDefaultsIfEosEnabled() {
final StreamsConfig streamsConfig = new StreamsConfig(props);
final Map consumerConfigs = streamsConfig.getMainConsumerConfigs(groupId, clientId, threadIdx);
@@ -750,27 +636,9 @@ private void shouldSetDifferentDefaultsIfEosEnabled() {
assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), equalTo(100L));
}
- @SuppressWarnings("deprecation")
- @Test
- public void shouldOverrideUserConfigTransactionalIdIfEosAlphaEnabled() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
- shouldOverrideUserConfigTransactionalIdIfEosEnable();
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void shouldOverrideUserConfigTransactionalIdIfEosBetaEnabled() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA);
- shouldOverrideUserConfigTransactionalIdIfEosEnable();
- }
-
@Test
public void shouldOverrideUserConfigTransactionalIdIfEosV2Enabled() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2);
- shouldOverrideUserConfigTransactionalIdIfEosEnable();
- }
-
- private void shouldOverrideUserConfigTransactionalIdIfEosEnable() {
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "user-TxId");
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -779,27 +647,9 @@ private void shouldOverrideUserConfigTransactionalIdIfEosEnable() {
assertThat(producerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), is(nullValue()));
}
- @SuppressWarnings("deprecation")
- @Test
- public void shouldNotOverrideUserConfigRetriesIfExactlyAlphaOnceEnabled() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
- shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled();
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void shouldNotOverrideUserConfigRetriesIfExactlyBetaOnceEnabled() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA);
- shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled();
- }
-
@Test
public void shouldNotOverrideUserConfigRetriesIfExactlyV2OnceEnabled() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2);
- shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled();
- }
-
- private void shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled() {
final int numberOfRetries = 42;
props.put(ProducerConfig.RETRIES_CONFIG, numberOfRetries);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -809,27 +659,9 @@ private void shouldNotOverrideUserConfigRetriesIfExactlyOnceEnabled() {
assertThat(producerConfigs.get(ProducerConfig.RETRIES_CONFIG), equalTo(numberOfRetries));
}
- @SuppressWarnings("deprecation")
- @Test
- public void shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceAlphaEnabled() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
- shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled();
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceBetaEnabled() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA);
- shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled();
- }
-
@Test
public void shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceV2Enabled() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2);
- shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled();
- }
-
- private void shouldNotOverrideUserConfigCommitIntervalMsIfExactlyOnceEnabled() {
final long commitIntervalMs = 73L;
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitIntervalMs);
final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -906,27 +738,9 @@ public void shouldSpecifyCorrectValueSerdeClassOnError() {
}
}
- @SuppressWarnings("deprecation")
- @Test
- public void shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosAlphaEnabled() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
- shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnabled();
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosBetaEnabled() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA);
- shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnabled();
- }
-
@Test
public void shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosV2Enabled() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2);
- shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnabled();
- }
-
- private void shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnabled() {
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 7);
final StreamsConfig streamsConfig = new StreamsConfig(props);
try {
@@ -941,53 +755,17 @@ private void shouldThrowExceptionIfMaxInFlightRequestsGreaterThanFiveIfEosEnable
}
}
- @SuppressWarnings("deprecation")
- @Test
- public void shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosAlphaEnabled() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
- shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosEnabled();
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosBetaEnabled() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA);
- shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosEnabled();
- }
-
@Test
public void shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosV2Enabled() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2);
- shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosEnabled();
- }
-
- private void shouldAllowToSpecifyMaxInFlightRequestsPerConnectionAsStringIfEosEnabled() {
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");
new StreamsConfig(props).getProducerConfigs(clientId);
}
- @SuppressWarnings("deprecation")
- @Test
- public void shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosAlphaEnabled() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
- shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosEnabled();
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosBetaEnabled() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_BETA);
- shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosEnabled();
- }
-
@Test
public void shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosV2Enabled() {
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE_V2);
- shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosEnabled();
- }
-
- private void shouldThrowConfigExceptionIfMaxInFlightRequestsPerConnectionIsInvalidStringIfEosEnabled() {
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "not-a-number");
try {
@@ -1053,42 +831,28 @@ public void shouldThrowConfigExceptionWhenStoreTypeConfigNotValueInRange() {
assertThrows(ConfigException.class, () -> new StreamsConfig(props));
}
- @SuppressWarnings("deprecation")
@Test
- public void shouldLogWarningWhenEosAlphaIsUsed() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
-
- LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class);
- try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
- new StreamsConfig(props);
-
- assertThat(
- appender.getMessages(),
- hasItem("Configuration parameter `" + StreamsConfig.EXACTLY_ONCE +
- "` is deprecated and will be removed in the 4.0.0 release. " +
- "Please use `" + StreamsConfig.EXACTLY_ONCE_V2 + "` instead. " +
- "Note that this requires broker version 2.5+ so you should prepare " +
- "to upgrade your brokers if necessary.")
- );
- }
+ public void shouldThrowWhenEosAlphaIsUsed() {
+ props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");
+ final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+ assertEquals(
+ "Configuration parameter `exactly_once` was removed in the 4.0.0 release. " +
+ "Please use `exactly_once_v2` instead. Refer to the Kafka Streams upgrade guide on how to upgrade your application "
+ + "to use the new parameter. Note that this requires broker version 2.5+ so you should prepare "
+ + "to upgrade your brokers if necessary.",
+ exception.getMessage()
+ );
}
- @SuppressWarnings("deprecation")
@Test
- public void shouldLogWarningWhenEosBetaIsUsed() {
- props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);
-
- LogCaptureAppender.setClassLoggerToDebug(StreamsConfig.class);
- try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
- new StreamsConfig(props);
-
- assertThat(
- appender.getMessages(),
- hasItem("Configuration parameter `" + StreamsConfig.EXACTLY_ONCE_BETA +
- "` is deprecated and will be removed in the 4.0.0 release. " +
- "Please use `" + StreamsConfig.EXACTLY_ONCE_V2 + "` instead.")
- );
- }
+ public void shouldThrowWhenEosBetaIsUsed() {
+ props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once_beta");
+ final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+ assertEquals(
+ "Configuration parameter `exactly_once_beta` was removed in the 4.0.0 release. " +
+ "Please use `exactly_once_v2` instead, which is the new name for the same processing semantics.",
+ exception.getMessage()
+ );
}
@SuppressWarnings("deprecation")
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
index 718f162a18c0..395ffa2ba3d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java
@@ -41,13 +41,9 @@
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
@@ -65,24 +61,11 @@
/**
* Test the unclean shutdown behavior around state store cleanup.
*/
-@RunWith(Parameterized.class)
@Category(IntegrationTest.class)
public class EOSUncleanShutdownIntegrationTest {
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
- @SuppressWarnings("deprecation")
- @Parameterized.Parameters(name = "{0}")
- public static Collection data() {
- return Arrays.asList(new String[][] {
- {StreamsConfig.EXACTLY_ONCE},
- {StreamsConfig.EXACTLY_ONCE_V2}
- });
- }
-
- @Parameterized.Parameter
- public String eosConfig;
-
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
@BeforeClass
@@ -114,7 +97,7 @@ public static void closeCluster() {
public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedException {
final String appId = "shouldWorkWithUncleanShutdownWipeOutStateStore";
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
- STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
+ STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
final String input = "input-topic";
cleanStateBeforeTest(CLUSTER, input);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 12cb0bf9563e..b7776d0be51b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -148,12 +148,10 @@ public static void closeCluster() {
private String stateTmpDir;
- @SuppressWarnings("deprecation")
@Parameters(name = "{0}")
public static Collection data() {
return Arrays.asList(new String[][]{
{StreamsConfig.AT_LEAST_ONCE},
- {StreamsConfig.EXACTLY_ONCE},
{StreamsConfig.EXACTLY_ONCE_V2}
});
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
deleted file mode 100644
index 7f652f0c7f57..000000000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
+++ /dev/null
@@ -1,1213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Partitioner;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.IsolationLevel;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KafkaStreams.State;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StoreQueryParameters;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsConfig.InternalConfig;
-import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils.StableAssignmentListener;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.Transformer;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.StreamsTestUtils;
-import org.apache.kafka.test.TestUtils;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.Timeout;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-
-import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.apache.kafka.test.TestUtils.waitForCondition;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertFalse;
-
-@RunWith(Parameterized.class)
-@Category({IntegrationTest.class})
-public class EosV2UpgradeIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
-
- @Parameterized.Parameters(name = "{0}")
- public static Collection data() {
- return Arrays.asList(new Boolean[][] {
- {false},
- {true}
- });
- }
-
- @Parameterized.Parameter
- public boolean injectError;
-
- private static final int NUM_BROKERS = 3;
- private static final int MAX_POLL_INTERVAL_MS = (int) Duration.ofSeconds(100L).toMillis();
- private static final long MAX_WAIT_TIME_MS = Duration.ofMinutes(1L).toMillis();
-
- private static final List> CLOSE =
- Collections.unmodifiableList(
- Arrays.asList(
- KeyValue.pair(KafkaStreams.State.RUNNING, KafkaStreams.State.PENDING_SHUTDOWN),
- KeyValue.pair(KafkaStreams.State.PENDING_SHUTDOWN, KafkaStreams.State.NOT_RUNNING)
- )
- );
- private static final List> CRASH =
- Collections.unmodifiableList(
- Collections.singletonList(
- KeyValue.pair(State.PENDING_ERROR, State.ERROR)
- )
- );
-
- public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
- NUM_BROKERS,
- Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "false"))
- );
-
-
- @BeforeClass
- public static void startCluster() throws IOException {
- CLUSTER.start();
- }
-
- @AfterClass
- public static void closeCluster() {
- CLUSTER.stop();
- }
-
- private static String applicationId;
- private final static int NUM_TOPIC_PARTITIONS = 4;
- private final static String CONSUMER_GROUP_ID = "readCommitted";
- private final static String MULTI_PARTITION_INPUT_TOPIC = "multiPartitionInputTopic";
- private final static String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic";
- private final static String APP_DIR_1 = "appDir1";
- private final static String APP_DIR_2 = "appDir2";
- private final static String UNEXPECTED_EXCEPTION_MSG = "Fail the test since we got an unexpected exception, or " +
- "there are too many exceptions thrown, please check standard error log for more info.";
- private final String storeName = "store";
-
- private final StableAssignmentListener assignmentListener = new StableAssignmentListener();
-
- private final AtomicBoolean errorInjectedClient1 = new AtomicBoolean(false);
- private final AtomicBoolean errorInjectedClient2 = new AtomicBoolean(false);
- private final AtomicBoolean commitErrorInjectedClient1 = new AtomicBoolean(false);
- private final AtomicBoolean commitErrorInjectedClient2 = new AtomicBoolean(false);
- private final AtomicInteger commitCounterClient1 = new AtomicInteger(-1);
- private final AtomicInteger commitCounterClient2 = new AtomicInteger(-1);
- private final AtomicInteger commitRequested = new AtomicInteger(0);
-
- private int testNumber = 0;
- private Map exceptionCounts = new HashMap() {
- {
- put(APP_DIR_1, 0);
- put(APP_DIR_2, 0);
- }
- };
-
- private volatile boolean hasUnexpectedError = false;
-
- @Before
- public void createTopics() throws Exception {
- applicationId = "appId-" + ++testNumber;
- CLUSTER.deleteTopicsAndWait(
- MULTI_PARTITION_INPUT_TOPIC,
- MULTI_PARTITION_OUTPUT_TOPIC,
- applicationId + "-" + storeName + "-changelog"
- );
-
- CLUSTER.createTopic(MULTI_PARTITION_INPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
- CLUSTER.createTopic(MULTI_PARTITION_OUTPUT_TOPIC, NUM_TOPIC_PARTITIONS, 1);
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void shouldUpgradeFromEosAlphaToEosV2() throws Exception {
- // We use two KafkaStreams clients that we upgrade from eos-alpha to eos-V2. During the upgrade,
- // we ensure that there are pending transaction and verify that data is processed correctly.
- //
- // We either close clients cleanly (`injectError = false`) or let them crash (`injectError = true`) during
- // the upgrade. For both cases, EOS should not be violated.
- //
- // Additionally, we inject errors while one client is on eos-alpha while the other client is on eos-V2:
- // For this case, we inject the error during task commit phase, i.e., after offsets are appended to a TX,
- // and before the TX is committed. The goal is to verify that the written but uncommitted offsets are not
- // picked up, i.e., GroupCoordinator fencing works correctly.
- //
- // The commit interval is set to MAX_VALUE and the used `Processor` request commits manually so we have full
- // control when a commit actually happens. We use an input topic with 4 partitions and each task will request
- // a commit after processing 10 records.
- //
- // 1. start both clients and wait until rebalance stabilizes
- // 2. write 10 records per input topic partition and verify that the result was committed
- // 3. write 5 records per input topic partition to get pending transactions (verified via "read_uncommitted" mode)
- // - all 4 pending transactions are based on task producers
- // - we will get only 4 pending writes for one partition for the crash case as we crash processing the 5th record
- // 4. stop/crash the first client, wait until rebalance stabilizes:
- // - stop case:
- // * verify that the stopped client did commit its pending transaction during shutdown
- // * the second client will still have two pending transaction
- // - crash case:
- // * the pending transactions of the crashed client got aborted
- // * the second client will have four pending transactions
- // 5. restart the first client with eos-V2 enabled and wait until rebalance stabilizes
- // - the rebalance should result in a commit of all tasks
- // 6. write 5 record per input topic partition
- // - stop case:
- // * verify that the result was committed
- // - crash case:
- // * fail the second (i.e., eos-alpha) client during commit
- // * the eos-V2 client should not pickup the pending offsets
- // * verify uncommitted and committed result
- // 7. only for crash case:
- // 7a. restart the second client in eos-alpha mode and wait until rebalance stabilizes
- // 7b. write 10 records per input topic partition
- // * fail the first (i.e., eos-V2) client during commit
- // * the eos-alpha client should not pickup the pending offsets
- // * verify uncommitted and committed result
- // 7c. restart the first client in eos-V2 mode and wait until rebalance stabilizes
- // 8. write 5 records per input topic partition to get pending transactions (verified via "read_uncommitted" mode)
- // - 2 transaction are base on a task producer; one transaction is based on a thread producer
- // - we will get 4 pending writes for the crash case as we crash processing the 5th record
- // 9. stop/crash the second client and wait until rebalance stabilizes:
- // - stop only:
- // * verify that the stopped client did commit its pending transaction during shutdown
- // * the first client will still have one pending transaction
- // - crash case:
- // * the pending transactions of the crashed client got aborted
- // * the first client will have one pending transactions
- // 10. restart the second client with eos-V2 enabled and wait until rebalance stabilizes
- // - the rebalance should result in a commit of all tasks
- // 11. write 5 record per input topic partition and verify that the result was committed
-
- final List> stateTransitions1 = new LinkedList<>();
- KafkaStreams streams1Alpha = null;
- KafkaStreams streams1V2 = null;
- KafkaStreams streams1V2Two = null;
-
- final List> stateTransitions2 = new LinkedList<>();
- KafkaStreams streams2Alpha = null;
- KafkaStreams streams2AlphaTwo = null;
- KafkaStreams streams2V2 = null;
-
- try {
- // phase 1: start both clients
- streams1Alpha = getKafkaStreams(APP_DIR_1, StreamsConfig.EXACTLY_ONCE);
- streams1Alpha.setStateListener(
- (newState, oldState) -> stateTransitions1.add(KeyValue.pair(oldState, newState))
- );
-
- assignmentListener.prepareForRebalance();
- streams1Alpha.cleanUp();
- streams1Alpha.start();
- assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
- waitForRunning(stateTransitions1);
-
- streams2Alpha = getKafkaStreams(APP_DIR_2, StreamsConfig.EXACTLY_ONCE);
- streams2Alpha.setStateListener(
- (newState, oldState) -> stateTransitions2.add(KeyValue.pair(oldState, newState))
- );
- stateTransitions1.clear();
-
- assignmentListener.prepareForRebalance();
- streams2Alpha.cleanUp();
- streams2Alpha.start();
- assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
- waitForRunning(stateTransitions1);
- waitForRunning(stateTransitions2);
-
- // in all phases, we write comments that assume that p-0/p-1 are assigned to the first client
- // and p-2/p-3 are assigned to the second client (in reality the assignment might be different though)
-
- // phase 2: (write first batch of data)
- // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
- //
- // p-0: ---> 10 rec + C
- // p-1: ---> 10 rec + C
- // p-2: ---> 10 rec + C
- // p-3: ---> 10 rec + C
- final List> committedInputDataBeforeUpgrade =
- prepareData(0L, 10L, 0L, 1L, 2L, 3L);
- writeInputData(committedInputDataBeforeUpgrade);
-
- waitForCondition(
- () -> commitRequested.get() == 4,
- MAX_WAIT_TIME_MS,
- "SteamsTasks did not request commit."
- );
-
- final Map committedState = new HashMap<>();
- final List> expectedUncommittedResult =
- computeExpectedResult(committedInputDataBeforeUpgrade, committedState);
- verifyCommitted(expectedUncommittedResult);
-
- // phase 3: (write partial second batch of data)
- // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
- //
- // stop case:
- // p-0: 10 rec + C ---> 5 rec (pending)
- // p-1: 10 rec + C ---> 5 rec (pending)
- // p-2: 10 rec + C ---> 5 rec (pending)
- // p-3: 10 rec + C ---> 5 rec (pending)
- // crash case: (we just assumes that we inject the error for p-0; in reality it might be a different partition)
- // (we don't crash right away and write one record less)
- // p-0: 10 rec + C ---> 4 rec (pending)
- // p-1: 10 rec + C ---> 5 rec (pending)
- // p-2: 10 rec + C ---> 5 rec (pending)
- // p-3: 10 rec + C ---> 5 rec (pending)
- final Set cleanKeys = mkSet(0L, 1L, 2L, 3L);
- final Set keysFirstClientAlpha = keysFromInstance(streams1Alpha);
- final long firstFailingKeyForCrashCase = keysFirstClientAlpha.iterator().next();
- cleanKeys.remove(firstFailingKeyForCrashCase);
-
- final List> uncommittedInputDataBeforeFirstUpgrade = new LinkedList<>();
- final HashMap uncommittedState = new HashMap<>(committedState);
- if (!injectError) {
- uncommittedInputDataBeforeFirstUpgrade.addAll(
- prepareData(10L, 15L, 0L, 1L, 2L, 3L)
- );
- writeInputData(uncommittedInputDataBeforeFirstUpgrade);
-
- expectedUncommittedResult.addAll(
- computeExpectedResult(uncommittedInputDataBeforeFirstUpgrade, uncommittedState)
- );
- verifyUncommitted(expectedUncommittedResult);
- } else {
- final List> uncommittedInputDataWithoutFailingKey = new LinkedList<>();
- for (final long key : cleanKeys) {
- uncommittedInputDataWithoutFailingKey.addAll(prepareData(10L, 15L, key));
- }
- uncommittedInputDataWithoutFailingKey.addAll(
- prepareData(10L, 14L, firstFailingKeyForCrashCase)
- );
- uncommittedInputDataBeforeFirstUpgrade.addAll(uncommittedInputDataWithoutFailingKey);
- writeInputData(uncommittedInputDataWithoutFailingKey);
-
- expectedUncommittedResult.addAll(
- computeExpectedResult(uncommittedInputDataWithoutFailingKey, new HashMap<>(committedState))
- );
- verifyUncommitted(expectedUncommittedResult);
- }
-
- // phase 4: (stop first client)
- // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
- //
- // stop case: (client 1 will commit its two tasks on close())
- // p-0: 10 rec + C + 5 rec ---> C
- // p-1: 10 rec + C + 5 rec ---> C
- // p-2: 10 rec + C + 5 rec (pending)
- // p-3: 10 rec + C + 5 rec (pending)
- // crash case: (we write the last record that will trigger the crash; both TX from client 1 will be aborted
- // during fail over by client 2 and retried)
- // p-0: 10 rec + C + 4 rec + A + 5 rec (pending)
- // p-1: 10 rec + C + 5 rec + A + 5 rec (pending)
- // p-2: 10 rec + C + 5 rec (pending)
- // p-3: 10 rec + C + 5 rec (pending)
- stateTransitions2.clear();
- assignmentListener.prepareForRebalance();
-
- if (!injectError) {
- stateTransitions1.clear();
- streams1Alpha.close();
- waitForStateTransition(stateTransitions1, CLOSE);
- } else {
- errorInjectedClient1.set(true);
-
- final List> dataPotentiallyFirstFailingKey =
- prepareData(14L, 15L, firstFailingKeyForCrashCase);
- uncommittedInputDataBeforeFirstUpgrade.addAll(dataPotentiallyFirstFailingKey);
- writeInputData(dataPotentiallyFirstFailingKey);
- }
- assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
- waitForRunning(stateTransitions2);
-
- if (!injectError) {
- final List> committedInputDataDuringFirstUpgrade =
- uncommittedInputDataBeforeFirstUpgrade
- .stream()
- .filter(pair -> keysFirstClientAlpha.contains(pair.key))
- .collect(Collectors.toList());
-
- final List> expectedCommittedResult =
- computeExpectedResult(committedInputDataDuringFirstUpgrade, committedState);
- verifyCommitted(expectedCommittedResult);
- } else {
- // retrying TX
- expectedUncommittedResult.addAll(computeExpectedResult(
- uncommittedInputDataBeforeFirstUpgrade
- .stream()
- .filter(pair -> keysFirstClientAlpha.contains(pair.key))
- .collect(Collectors.toList()),
- new HashMap<>(committedState)
- ));
- verifyUncommitted(expectedUncommittedResult);
- waitForStateTransitionContains(stateTransitions1, CRASH);
-
- errorInjectedClient1.set(false);
- stateTransitions1.clear();
- streams1Alpha.close();
- assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError);
- }
-
- // phase 5: (restart first client)
- // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
- //
- // stop case: (client 2 (alpha) will commit the two revoked task that migrate back to client 1)
- // (note: we may or may not get newly committed data, depending if the already committed tasks
- // migrate back to client 1, or different tasks)
- // (below we show the case for which we don't get newly committed data)
- // p-0: 10 rec + C + 5 rec ---> C
- // p-1: 10 rec + C + 5 rec ---> C
- // p-2: 10 rec + C + 5 rec (pending)
- // p-3: 10 rec + C + 5 rec (pending)
- // crash case: (client 2 (alpha) will commit all tasks even only two tasks are revoked and migrate back to client 1)
- // (note: because nothing was committed originally, we always get newly committed data)
- // p-0: 10 rec + C + 4 rec + A + 5 rec ---> C
- // p-1: 10 rec + C + 5 rec + A + 5 rec ---> C
- // p-2: 10 rec + C + 5 rec ---> C
- // p-3: 10 rec + C + 5 rec ---> C
- commitRequested.set(0);
- stateTransitions1.clear();
- stateTransitions2.clear();
- streams1V2 = getKafkaStreams(APP_DIR_1, StreamsConfig.EXACTLY_ONCE_V2);
- streams1V2.setStateListener((newState, oldState) -> stateTransitions1.add(KeyValue.pair(oldState, newState)));
- assignmentListener.prepareForRebalance();
- streams1V2.start();
- assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
- waitForRunning(stateTransitions1);
- waitForRunning(stateTransitions2);
-
- final Set newlyCommittedKeys;
- if (!injectError) {
- newlyCommittedKeys = keysFromInstance(streams1V2);
- newlyCommittedKeys.removeAll(keysFirstClientAlpha);
- } else {
- newlyCommittedKeys = mkSet(0L, 1L, 2L, 3L);
- }
-
- final List> expectedCommittedResultAfterRestartFirstClient = computeExpectedResult(
- uncommittedInputDataBeforeFirstUpgrade
- .stream()
- .filter(pair -> newlyCommittedKeys.contains(pair.key))
- .collect(Collectors.toList()),
- committedState
- );
- verifyCommitted(expectedCommittedResultAfterRestartFirstClient);
-
- // phase 6: (complete second batch of data; crash: let second client fail on commit)
- // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
- //
- // stop case: (both client commit regularly)
- // (depending on the task movement in phase 5, we may or may not get newly committed data;
- // we show the case for which p-2 and p-3 are newly committed below)
- // p-0: 10 rec + C + 5 rec + C ---> 5 rec + C
- // p-1: 10 rec + C + 5 rec + C ---> 5 rec + C
- // p-2: 10 rec + C + 5 rec ---> 5 rec + C
- // p-3: 10 rec + C + 5 rec ---> 5 rec + C
- // crash case: (second/alpha client fails and both TX are aborted)
- // (first/V2 client reprocessed the 10 records and commits TX)
- // p-0: 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
- // p-1: 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
- // p-2: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C
- // p-3: 10 rec + C + 5 rec + C ---> 5 rec + A + 5 rec + C
- commitCounterClient1.set(0);
-
- if (!injectError) {
- final List> finishSecondBatch = prepareData(15L, 20L, 0L, 1L, 2L, 3L);
- writeInputData(finishSecondBatch);
-
- final List> committedInputDataDuringUpgrade = uncommittedInputDataBeforeFirstUpgrade
- .stream()
- .filter(pair -> !keysFirstClientAlpha.contains(pair.key))
- .filter(pair -> !newlyCommittedKeys.contains(pair.key))
- .collect(Collectors.toList());
- committedInputDataDuringUpgrade.addAll(
- finishSecondBatch
- );
-
- expectedUncommittedResult.addAll(
- computeExpectedResult(finishSecondBatch, uncommittedState)
- );
- final List> expectedCommittedResult =
- computeExpectedResult(committedInputDataDuringUpgrade, committedState);
- verifyCommitted(expectedCommittedResult);
- } else {
- final Set keysFirstClientV2 = keysFromInstance(streams1V2);
- final Set keysSecondClientAlpha = keysFromInstance(streams2Alpha);
-
- final List> committedInputDataAfterFirstUpgrade =
- prepareData(15L, 20L, keysFirstClientV2.toArray(new Long[0]));
- writeInputData(committedInputDataAfterFirstUpgrade);
-
- final List> expectedCommittedResultBeforeFailure =
- computeExpectedResult(committedInputDataAfterFirstUpgrade, committedState);
- verifyCommitted(expectedCommittedResultBeforeFailure);
- expectedUncommittedResult.addAll(expectedCommittedResultBeforeFailure);
-
- commitCounterClient2.set(0);
-
- final Iterator it = keysSecondClientAlpha.iterator();
- final Long otherKey = it.next();
- final Long failingKey = it.next();
-
- final List> uncommittedInputDataAfterFirstUpgrade =
- prepareData(15L, 19L, keysSecondClientAlpha.toArray(new Long[0]));
- uncommittedInputDataAfterFirstUpgrade.addAll(prepareData(19L, 20L, otherKey));
- writeInputData(uncommittedInputDataAfterFirstUpgrade);
-
- uncommittedState.putAll(committedState);
- expectedUncommittedResult.addAll(
- computeExpectedResult(uncommittedInputDataAfterFirstUpgrade, uncommittedState)
- );
- verifyUncommitted(expectedUncommittedResult);
-
- stateTransitions1.clear();
- stateTransitions2.clear();
- assignmentListener.prepareForRebalance();
-
- commitCounterClient1.set(0);
- commitErrorInjectedClient2.set(true);
-
- final List> dataFailingKey = prepareData(19L, 20L, failingKey);
- uncommittedInputDataAfterFirstUpgrade.addAll(dataFailingKey);
- writeInputData(dataFailingKey);
-
- expectedUncommittedResult.addAll(
- computeExpectedResult(dataFailingKey, uncommittedState)
- );
- verifyUncommitted(expectedUncommittedResult);
-
- assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
-
- waitForStateTransitionContains(stateTransitions2, CRASH);
-
- commitErrorInjectedClient2.set(false);
- stateTransitions2.clear();
- streams2Alpha.close();
- assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError);
-
- final List> expectedCommittedResultAfterFailure =
- computeExpectedResult(uncommittedInputDataAfterFirstUpgrade, committedState);
- verifyCommitted(expectedCommittedResultAfterFailure);
- expectedUncommittedResult.addAll(expectedCommittedResultAfterFailure);
- }
-
- // 7. only for crash case:
- // 7a. restart the failed second client in eos-alpha mode and wait until rebalance stabilizes
- // 7b. write third batch of input data
- // * fail the first (i.e., eos-V2) client during commit
- // * the eos-alpha client should not pickup the pending offsets
- // * verify uncommitted and committed result
- // 7c. restart the first client in eos-V2 mode and wait until rebalance stabilizes
- //
- // crash case:
- // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C ---> 10 rec + A + 10 rec + C
- // p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C ---> 10 rec + A + 10 rec + C
- // p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C ---> 10 rec + C
- // p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C ---> 10 rec + C
- if (!injectError) {
- streams2AlphaTwo = streams2Alpha;
- } else {
- // 7a restart the second client in eos-alpha mode and wait until rebalance stabilizes
- commitCounterClient1.set(0);
- commitCounterClient2.set(-1);
- stateTransitions1.clear();
- stateTransitions2.clear();
- streams2AlphaTwo = getKafkaStreams(APP_DIR_2, StreamsConfig.EXACTLY_ONCE);
- streams2AlphaTwo.setStateListener(
- (newState, oldState) -> stateTransitions2.add(KeyValue.pair(oldState, newState))
- );
- assignmentListener.prepareForRebalance();
- streams2AlphaTwo.start();
- assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
- waitForRunning(stateTransitions1);
- waitForRunning(stateTransitions2);
-
- // 7b. write third batch of input data
- final Set keysFirstClientV2 = keysFromInstance(streams1V2);
- final Set keysSecondClientAlphaTwo = keysFromInstance(streams2AlphaTwo);
-
- final List> committedInputDataBetweenUpgrades =
- prepareData(20L, 30L, keysSecondClientAlphaTwo.toArray(new Long[0]));
- writeInputData(committedInputDataBetweenUpgrades);
-
- final List> expectedCommittedResultBeforeFailure =
- computeExpectedResult(committedInputDataBetweenUpgrades, committedState);
- verifyCommitted(expectedCommittedResultBeforeFailure);
- expectedUncommittedResult.addAll(expectedCommittedResultBeforeFailure);
-
- commitCounterClient2.set(0);
-
- final Iterator it = keysFirstClientV2.iterator();
- final Long otherKey = it.next();
- final Long failingKey = it.next();
-
- final List> uncommittedInputDataBetweenUpgrade =
- prepareData(20L, 29L, keysFirstClientV2.toArray(new Long[0]));
- uncommittedInputDataBetweenUpgrade.addAll(prepareData(29L, 30L, otherKey));
- writeInputData(uncommittedInputDataBetweenUpgrade);
-
- uncommittedState.putAll(committedState);
- expectedUncommittedResult.addAll(
- computeExpectedResult(uncommittedInputDataBetweenUpgrade, uncommittedState)
- );
- verifyUncommitted(expectedUncommittedResult);
-
- stateTransitions1.clear();
- stateTransitions2.clear();
- assignmentListener.prepareForRebalance();
- commitCounterClient2.set(0);
- commitErrorInjectedClient1.set(true);
-
- final List> dataFailingKey = prepareData(29L, 30L, failingKey);
- uncommittedInputDataBetweenUpgrade.addAll(dataFailingKey);
- writeInputData(dataFailingKey);
-
- expectedUncommittedResult.addAll(
- computeExpectedResult(dataFailingKey, uncommittedState)
- );
- verifyUncommitted(expectedUncommittedResult);
-
- assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
-
- waitForStateTransitionContains(stateTransitions1, CRASH);
-
- commitErrorInjectedClient1.set(false);
- stateTransitions1.clear();
- streams1V2.close();
- assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError);
-
- final List> expectedCommittedResultAfterFailure =
- computeExpectedResult(uncommittedInputDataBetweenUpgrade, committedState);
- verifyCommitted(expectedCommittedResultAfterFailure);
- expectedUncommittedResult.addAll(expectedCommittedResultAfterFailure);
-
- // 7c. restart the first client in eos-V2 mode and wait until rebalance stabilizes
- stateTransitions1.clear();
- stateTransitions2.clear();
- streams1V2Two = getKafkaStreams(APP_DIR_1, StreamsConfig.EXACTLY_ONCE_V2);
- streams1V2Two.setStateListener((newState, oldState) -> stateTransitions1.add(KeyValue.pair(oldState, newState)));
- assignmentListener.prepareForRebalance();
- streams1V2Two.start();
- assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
- waitForRunning(stateTransitions1);
- waitForRunning(stateTransitions2);
- }
-
- // phase 8: (write partial last batch of data)
- // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
- //
- // stop case:
- // p-0: 10 rec + C + 5 rec + C + 5 rec + C ---> 5 rec (pending)
- // p-1: 10 rec + C + 5 rec + C + 5 rec + C ---> 5 rec (pending)
- // p-2: 10 rec + C + 5 rec + C + 5 rec + C ---> 5 rec (pending)
- // p-3: 10 rec + C + 5 rec + C + 5 rec + C ---> 5 rec (pending)
- // crash case: (we just assumes that we inject the error for p-2; in reality it might be a different partition)
- // (we don't crash right away and write one record less)
- // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C ---> 5 rec (pending)
- // p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C ---> 5 rec (pending)
- // p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C ---> 4 rec (pending)
- // p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C ---> 5 rec (pending)
- cleanKeys.addAll(mkSet(0L, 1L, 2L, 3L));
- final Set keysSecondClientAlphaTwo = keysFromInstance(streams2AlphaTwo);
- final long secondFailingKeyForCrashCase = keysSecondClientAlphaTwo.iterator().next();
- cleanKeys.remove(secondFailingKeyForCrashCase);
-
- final List> uncommittedInputDataBeforeSecondUpgrade = new LinkedList<>();
- if (!injectError) {
- uncommittedInputDataBeforeSecondUpgrade.addAll(
- prepareData(30L, 35L, 0L, 1L, 2L, 3L)
- );
- writeInputData(uncommittedInputDataBeforeSecondUpgrade);
-
- expectedUncommittedResult.addAll(
- computeExpectedResult(uncommittedInputDataBeforeSecondUpgrade, new HashMap<>(committedState))
- );
- verifyUncommitted(expectedUncommittedResult);
- } else {
- final List> uncommittedInputDataWithoutFailingKey = new LinkedList<>();
- for (final long key : cleanKeys) {
- uncommittedInputDataWithoutFailingKey.addAll(prepareData(30L, 35L, key));
- }
- uncommittedInputDataWithoutFailingKey.addAll(
- prepareData(30L, 34L, secondFailingKeyForCrashCase)
- );
- uncommittedInputDataBeforeSecondUpgrade.addAll(uncommittedInputDataWithoutFailingKey);
- writeInputData(uncommittedInputDataWithoutFailingKey);
-
- expectedUncommittedResult.addAll(
- computeExpectedResult(uncommittedInputDataWithoutFailingKey, new HashMap<>(committedState))
- );
- verifyUncommitted(expectedUncommittedResult);
- }
-
- // phase 9: (stop/crash second client)
- // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
- //
- // stop case: (client 2 (alpha) will commit its two tasks on close())
- // p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec (pending)
- // p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec (pending)
- // p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec ---> C
- // p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec ---> C
- // crash case: (we write the last record that will trigger the crash; both TX from client 2 will be aborted
- // during fail over by client 1 and retried)
- // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec (pending)
- // p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec (pending)
- // p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 4 rec ---> A + 5 rec (pending)
- // p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 5 rec ---> A + 5 rec (pending)
- stateTransitions1.clear();
- assignmentListener.prepareForRebalance();
- if (!injectError) {
- stateTransitions2.clear();
- streams2AlphaTwo.close();
- waitForStateTransition(stateTransitions2, CLOSE);
- } else {
- errorInjectedClient2.set(true);
-
- final List> dataPotentiallySecondFailingKey =
- prepareData(34L, 35L, secondFailingKeyForCrashCase);
- uncommittedInputDataBeforeSecondUpgrade.addAll(dataPotentiallySecondFailingKey);
- writeInputData(dataPotentiallySecondFailingKey);
- }
- assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
- waitForRunning(stateTransitions1);
-
- if (!injectError) {
- final List> committedInputDataDuringSecondUpgrade =
- uncommittedInputDataBeforeSecondUpgrade
- .stream()
- .filter(pair -> keysSecondClientAlphaTwo.contains(pair.key))
- .collect(Collectors.toList());
-
- final List> expectedCommittedResult =
- computeExpectedResult(committedInputDataDuringSecondUpgrade, committedState);
- verifyCommitted(expectedCommittedResult);
- } else {
- // retrying TX
- expectedUncommittedResult.addAll(computeExpectedResult(
- uncommittedInputDataBeforeSecondUpgrade
- .stream()
- .filter(pair -> keysSecondClientAlphaTwo.contains(pair.key))
- .collect(Collectors.toList()),
- new HashMap<>(committedState)
- ));
- verifyUncommitted(expectedUncommittedResult);
- waitForStateTransitionContains(stateTransitions2, CRASH);
-
- errorInjectedClient2.set(false);
- stateTransitions2.clear();
- streams2AlphaTwo.close();
- assertFalse(UNEXPECTED_EXCEPTION_MSG, hasUnexpectedError);
- }
-
- // phase 10: (restart second client)
- // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
- //
- // the state below indicate the case for which the "original" tasks of client2 are migrated back to client2
- // if a task "switch" happens, we might get additional commits (omitted in the comment for brevity)
- //
- // stop case: (client 1 (V2) will commit all four tasks if at least one revoked and migrate task needs committing back to client 2)
- // p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec ---> C
- // p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec ---> C
- // p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C
- // p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C
- // crash case: (client 1 (V2) will commit all four tasks even only two are migrate back to client 2)
- // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec ---> C
- // p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec ---> C
- // p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 4 rec + A + 5 rec ---> C
- // p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 5 rec + A + 5 rec ---> C
- commitRequested.set(0);
- stateTransitions1.clear();
- stateTransitions2.clear();
- streams2V2 = getKafkaStreams(APP_DIR_1, StreamsConfig.EXACTLY_ONCE_V2);
- streams2V2.setStateListener(
- (newState, oldState) -> stateTransitions2.add(KeyValue.pair(oldState, newState))
- );
- assignmentListener.prepareForRebalance();
- streams2V2.start();
- assignmentListener.waitForNextStableAssignment(MAX_WAIT_TIME_MS);
- waitForRunning(stateTransitions1);
- waitForRunning(stateTransitions2);
-
- newlyCommittedKeys.clear();
- if (!injectError) {
- newlyCommittedKeys.addAll(keysFromInstance(streams2V2));
- newlyCommittedKeys.removeAll(keysSecondClientAlphaTwo);
- } else {
- newlyCommittedKeys.addAll(mkSet(0L, 1L, 2L, 3L));
- }
-
- final List> expectedCommittedResultAfterRestartSecondClient = computeExpectedResult(
- uncommittedInputDataBeforeSecondUpgrade
- .stream()
- .filter(pair -> newlyCommittedKeys.contains(pair.key))
- .collect(Collectors.toList()),
- committedState
- );
- verifyCommitted(expectedCommittedResultAfterRestartSecondClient);
-
- // phase 11: (complete fourth batch of data)
- // expected end state per output partition (C == COMMIT; A == ABORT; ---> indicate the changes):
- //
- // stop case:
- // p-0: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
- // p-1: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
- // p-2: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
- // p-3: 10 rec + C + 5 rec + C + 5 rec + C + 5 rec + C ---> 5 rec + C
- // crash case: (we just assumes that we inject the error for p-2; in reality it might be a different partition)
- // p-0: 10 rec + C + 4 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
- // p-1: 10 rec + C + 5 rec + A + 5 rec + C + 5 rec + C + 10 rec + A + 10 rec + C + 5 rec + C ---> 5 rec + C
- // p-2: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 4 rec + A + 5 rec + C ---> 5 rec + C
- // p-3: 10 rec + C + 5 rec + C + 5 rec + A + 5 rec + C + 10 rec + C + 5 rec + A + 5 rec + C ---> 5 rec + C
- commitCounterClient1.set(-1);
- commitCounterClient2.set(-1);
-
- final List> finishLastBatch =
- prepareData(35L, 40L, 0L, 1L, 2L, 3L);
- writeInputData(finishLastBatch);
-
- final Set uncommittedKeys = mkSet(0L, 1L, 2L, 3L);
- uncommittedKeys.removeAll(keysSecondClientAlphaTwo);
- uncommittedKeys.removeAll(newlyCommittedKeys);
- final List> committedInputDataDuringUpgrade = uncommittedInputDataBeforeSecondUpgrade
- .stream()
- .filter(pair -> uncommittedKeys.contains(pair.key))
- .collect(Collectors.toList());
- committedInputDataDuringUpgrade.addAll(
- finishLastBatch
- );
-
- final List> expectedCommittedResult =
- computeExpectedResult(committedInputDataDuringUpgrade, committedState);
- verifyCommitted(expectedCommittedResult);
- } finally {
- if (streams1Alpha != null) {
- streams1Alpha.close();
- }
- if (streams1V2 != null) {
- streams1V2.close();
- }
- if (streams1V2Two != null) {
- streams1V2Two.close();
- }
- if (streams2Alpha != null) {
- streams2Alpha.close();
- }
- if (streams2AlphaTwo != null) {
- streams2AlphaTwo.close();
- }
- if (streams2V2 != null) {
- streams2V2.close();
- }
- }
- }
-
- @SuppressWarnings("deprecation")
- private KafkaStreams getKafkaStreams(final String appDir,
- final String processingGuarantee) {
- final StreamsBuilder builder = new StreamsBuilder();
-
- final String[] storeNames = new String[] {storeName};
- final StoreBuilder> storeBuilder = Stores
- .keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.Long(), Serdes.Long())
- .withCachingEnabled();
-
- builder.addStateStore(storeBuilder);
-
- final KStream input = builder.stream(MULTI_PARTITION_INPUT_TOPIC);
- input.transform(new TransformerSupplier>() {
- @Override
- public Transformer> get() {
- return new Transformer>() {
- ProcessorContext context;
- KeyValueStore state = null;
- AtomicBoolean crash;
- AtomicInteger sharedCommit;
-
- @Override
- public void init(final ProcessorContext context) {
- this.context = context;
- state = context.getStateStore(storeName);
- final String clientId = context.appConfigs().get(StreamsConfig.CLIENT_ID_CONFIG).toString();
- if (APP_DIR_1.equals(clientId)) {
- crash = errorInjectedClient1;
- sharedCommit = commitCounterClient1;
- } else {
- crash = errorInjectedClient2;
- sharedCommit = commitCounterClient2;
- }
- }
-
- @Override
- public KeyValue transform(final Long key, final Long value) {
- if ((value + 1) % 10 == 0) {
- if (sharedCommit.get() < 0 ||
- sharedCommit.incrementAndGet() == 2) {
-
- context.commit();
- }
- commitRequested.incrementAndGet();
- }
-
- Long sum = state.get(key);
- if (sum == null) {
- sum = value;
- } else {
- sum += value;
- }
- state.put(key, sum);
- state.flush();
-
- if (value % 10 == 4 && // potentially crash when processing 5th, 15th, or 25th record (etc.)
- crash != null && crash.compareAndSet(true, false)) {
- // only crash a single task
- throw new RuntimeException("Injected test exception.");
- }
-
- return new KeyValue<>(key, state.get(key));
- }
-
- @Override
- public void close() {}
- };
- } }, storeNames)
- .to(MULTI_PARTITION_OUTPUT_TOPIC);
-
- final Properties properties = new Properties();
- properties.put(StreamsConfig.CLIENT_ID_CONFIG, appDir);
- properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
- final long commitInterval = Duration.ofMinutes(1L).toMillis();
- properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval);
- properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), Duration.ofSeconds(1L).toMillis());
- properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
- properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).toMillis());
- properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), (int) Duration.ofSeconds(5L).minusMillis(1L).toMillis());
- properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MAX_POLL_INTERVAL_MS);
- properties.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), (int) commitInterval);
- properties.put(StreamsConfig.producerPrefix(ProducerConfig.PARTITIONER_CLASS_CONFIG), KeyPartitioner.class);
- properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
- properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir);
- properties.put(InternalConfig.ASSIGNMENT_LISTENER, assignmentListener);
-
- final Properties config = StreamsTestUtils.getStreamsConfig(
- applicationId,
- CLUSTER.bootstrapServers(),
- Serdes.LongSerde.class.getName(),
- Serdes.LongSerde.class.getName(),
- properties
- );
-
- final KafkaStreams streams = new KafkaStreams(builder.build(), config, new TestKafkaClientSupplier());
- streams.setUncaughtExceptionHandler(e -> {
- if (!injectError) {
- // we don't expect any exception thrown in stop case
- e.printStackTrace(System.err);
- hasUnexpectedError = true;
- } else {
- int exceptionCount = (int) exceptionCounts.get(appDir);
- // should only have our injected exception or commit exception, and 2 exceptions for each stream
- if (++exceptionCount > 2 || !(e instanceof RuntimeException) ||
- !(e.getMessage().contains("test exception"))) {
- // The exception won't cause the test fail since we actually "expected" exception thrown and failed the stream.
- // So, log to stderr for debugging when the exception is not what we expected, and fail in the main thread
- e.printStackTrace(System.err);
- hasUnexpectedError = true;
- }
- exceptionCounts.put(appDir, exceptionCount);
- }
- return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
- });
-
- return streams;
- }
-
- private void waitForRunning(final List> observed) throws Exception {
- waitForCondition(
- () -> !observed.isEmpty() && observed.get(observed.size() - 1).value.equals(State.RUNNING),
- MAX_WAIT_TIME_MS,
- () -> "Client did not startup on time. Observers transitions: " + observed
- );
- }
-
- private void waitForStateTransition(final List> observed,
- final List> expected)
- throws Exception {
-
- waitForCondition(
- () -> observed.equals(expected),
- MAX_WAIT_TIME_MS,
- () -> "Client did not have the expected state transition on time. Observers transitions: " + observed
- + "Expected transitions: " + expected
- );
- }
-
- private void waitForStateTransitionContains(final List> observed,
- final List> expected)
- throws Exception {
-
- waitForCondition(
- () -> observed.containsAll(expected),
- MAX_WAIT_TIME_MS,
- () -> "Client did not have the expected state transition on time. Observers transitions: " + observed
- + "Expected transitions: " + expected
- );
- }
-
- private List> prepareData(final long fromInclusive,
- final long toExclusive,
- final Long... keys) {
- final List> data = new ArrayList<>();
-
- for (final Long k : keys) {
- for (long v = fromInclusive; v < toExclusive; ++v) {
- data.add(new KeyValue<>(k, v));
- }
- }
-
- return data;
- }
-
- private void writeInputData(final List> records) {
- final Properties config = TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- LongSerializer.class,
- LongSerializer.class
- );
- config.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyPartitioner.class.getName());
- IntegrationTestUtils.produceKeyValuesSynchronously(
- MULTI_PARTITION_INPUT_TOPIC,
- records,
- config,
- CLUSTER.time
- );
- }
-
- private void verifyCommitted(final List> expectedResult) throws Exception {
- final List> committedOutput = readResult(expectedResult.size(), true);
- checkResultPerKey(committedOutput, expectedResult);
- }
-
- private void verifyUncommitted(final List> expectedResult) throws Exception {
- final List> uncommittedOutput = readResult(expectedResult.size(), false);
- checkResultPerKey(uncommittedOutput, expectedResult);
- }
-
- private List> readResult(final int numberOfRecords,
- final boolean readCommitted) throws Exception {
- if (readCommitted) {
- return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- TestUtils.consumerConfig(
- CLUSTER.bootstrapServers(),
- CONSUMER_GROUP_ID,
- LongDeserializer.class,
- LongDeserializer.class,
- Utils.mkProperties(Collections.singletonMap(
- ConsumerConfig.ISOLATION_LEVEL_CONFIG,
- IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT))
- )
- ),
- MULTI_PARTITION_OUTPUT_TOPIC,
- numberOfRecords,
- MAX_WAIT_TIME_MS
- );
- }
-
- // read uncommitted
- return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
- TestUtils.consumerConfig(CLUSTER.bootstrapServers(), LongDeserializer.class, LongDeserializer.class),
- MULTI_PARTITION_OUTPUT_TOPIC,
- numberOfRecords
- );
- }
-
- private void checkResultPerKey(final List> result,
- final List> expectedResult) {
- final Set allKeys = new HashSet<>();
- addAllKeys(allKeys, result);
- addAllKeys(allKeys, expectedResult);
-
- for (final Long key : allKeys) {
- try {
- assertThat(getAllRecordPerKey(key, result), equalTo(getAllRecordPerKey(key, expectedResult)));
- } catch (final AssertionError error) {
- throw new AssertionError(
- "expected result: " + expectedResult.stream().map(KeyValue::toString).collect(Collectors.joining(", ")) +
- "\nreceived records: " + result.stream().map(KeyValue::toString).collect(Collectors.joining(", ")),
- error
- );
- }
- }
- }
-
- private void addAllKeys(final Set allKeys, final List> records) {
- for (final KeyValue record : records) {
- allKeys.add(record.key);
- }
- }
-
- private List> getAllRecordPerKey(final Long key, final List> records) {
- final List> recordsPerKey = new ArrayList<>(records.size());
-
- for (final KeyValue record : records) {
- if (record.key.equals(key)) {
- recordsPerKey.add(record);
- }
- }
-
- return recordsPerKey;
- }
-
- private List> computeExpectedResult(final List> input,
- final Map currentState) {
- final List> expectedResult = new ArrayList<>(input.size());
-
- for (final KeyValue record : input) {
- final long sum = currentState.getOrDefault(record.key, 0L);
- currentState.put(record.key, sum + record.value);
- expectedResult.add(new KeyValue<>(record.key, sum + record.value));
- }
-
- return expectedResult;
- }
-
- private Set keysFromInstance(final KafkaStreams streams) throws Exception {
- final Set keys = new HashSet<>();
- waitForCondition(
- () -> {
- final ReadOnlyKeyValueStore store = streams.store(
- StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())
- );
-
- keys.clear();
- try (final KeyValueIterator it = store.all()) {
- while (it.hasNext()) {
- final KeyValue row = it.next();
- keys.add(row.key);
- }
- }
-
- return true;
- },
- MAX_WAIT_TIME_MS,
- "Could not get keys from store: " + storeName
- );
-
- return keys;
- }
-
- // must be public to allow KafkaProducer to instantiate it
- public static class KeyPartitioner implements Partitioner {
- private final static LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
-
- @Override
- public int partition(final String topic,
- final Object key,
- final byte[] keyBytes,
- final Object value,
- final byte[] valueBytes,
- final Cluster cluster) {
- return LONG_DESERIALIZER.deserialize(topic, keyBytes).intValue() % NUM_TOPIC_PARTITIONS;
- }
-
- @Override
- public void close() {}
-
- @Override
- public void configure(final Map configs) {}
- }
-
- private class TestKafkaClientSupplier extends DefaultKafkaClientSupplier {
- @Override
- public Producer getProducer(final Map config) {
- return new ErrorInjector(config);
- }
- }
-
- private class ErrorInjector extends KafkaProducer {
- private final AtomicBoolean crash;
-
- public ErrorInjector(final Map configs) {
- super(configs, new ByteArraySerializer(), new ByteArraySerializer());
- final String clientId = configs.get(ProducerConfig.CLIENT_ID_CONFIG).toString();
- if (clientId.contains(APP_DIR_1)) {
- crash = commitErrorInjectedClient1;
- } else {
- crash = commitErrorInjectedClient2;
- }
- }
-
- @Override
- public void commitTransaction() {
- super.flush(); // we flush to ensure that the offsets are written
- if (!crash.compareAndSet(true, false)) {
- super.commitTransaction();
- } else {
- throw new RuntimeException("Injected producer commit test exception.");
- }
- }
- }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index 3ac94ad96834..00e70cbfac7a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -56,12 +56,9 @@
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -71,7 +68,6 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class GlobalKTableEOSIntegrationTest {
@Rule
@@ -98,18 +94,6 @@ public static void closeCluster() {
CLUSTER.stop();
}
- @SuppressWarnings("deprecation")
- @Parameterized.Parameters(name = "{0}")
- public static Collection data() {
- return Arrays.asList(new String[][] {
- {StreamsConfig.EXACTLY_ONCE},
- {StreamsConfig.EXACTLY_ONCE_V2}
- });
- }
-
- @Parameterized.Parameter
- public String eosConfig;
-
private final MockTime mockTime = CLUSTER.time;
private final KeyValueMapper keyMapper = (key, value) -> value;
private final ValueJoiner joiner = (value1, value2) -> value1 + "+" + value2;
@@ -138,7 +122,7 @@ public void before() throws Exception {
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
- streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
+ streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
streamsConfiguration.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1L);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index be583f17a7e2..f6ab148ff8db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -142,20 +142,9 @@ public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDi
}
}
- @SuppressWarnings("deprecation")
- @Test
- public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled() throws Exception {
- STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
- shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled();
- }
-
@Test
public void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosV2Enabled() throws Exception {
STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
- shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled();
- }
-
- private void shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled() throws Exception {
try {
streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
streams.start();
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
index 7fe905ae7d4f..0eeb0b9d07cd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
@@ -94,12 +94,10 @@ public static void closeCluster() {
private static final int DEFAULT_TIMEOUT = 100;
private static long lastRecordedTimestamp = -2L;
- @SuppressWarnings("deprecation")
@Parameterized.Parameters(name = "{0}")
public static Collection data() {
return Arrays.asList(new String[][] {
{StreamsConfig.AT_LEAST_ONCE},
- {StreamsConfig.EXACTLY_ONCE},
{StreamsConfig.EXACTLY_ONCE_V2}
});
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index 17610c8450dd..61cfa2774958 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -137,12 +137,10 @@ public static void closeCluster() {
private static final String ESTIMATED_MEMORY_OF_TABLE_READERS = "estimate-table-readers-mem";
private static final String NUMBER_OF_BACKGROUND_ERRORS = "background-errors";
- @SuppressWarnings("deprecation")
@Parameters(name = "{0}")
public static Collection