From 0a3f8e937df313c4ffc9515292bc358dcfc36647 Mon Sep 17 00:00:00 2001 From: k-apol Date: Sun, 13 Apr 2025 11:46:31 -0500 Subject: [PATCH 01/36] test commit --- .../src/main/java/org/apache/kafka/streams/AutoOffsetReset.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java b/streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java index f3f3a941d20f7..6a95bd4171b5f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java +++ b/streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java @@ -21,7 +21,7 @@ import org.apache.kafka.streams.kstream.KTable; import java.time.Duration; -import java.util.Optional; +import java.util.Optional; /** * Sets the {@code auto.offset.reset} configuration when From 1768707ee1ad0a7d0081605506b31acba6060496 Mon Sep 17 00:00:00 2001 From: k-apol Date: Sun, 13 Apr 2025 12:28:09 -0500 Subject: [PATCH 02/36] Add new internal topic setup config & related test --- .../org/apache/kafka/streams/StreamsConfig.java | 14 ++++++++++++++ .../apache/kafka/streams/StreamsConfigTest.java | 9 +++++++++ 2 files changed, 23 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index e19cb03b207b5..88a54d3d1c886 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -885,6 +885,14 @@ public class StreamsConfig extends AbstractConfig { ProducerConfig.TRANSACTIONAL_ID_CONFIG }; + public static final String INTERNAL_TOPIC_SETUP_CONFIG = "internal.topics.setup"; + public static final String INTERNAL_TOPIC_SETUP_AUTOMATIC = "automatic"; + public static final String INTERNAL_TOPIC_SETUP_MANUAL = "manual"; + public static final String INTERNAL_TOPIC_SETUP_DOC = + "Configures how internal topics (e.g., repartition or changelog topics) should be created. " + + "Set to 'automatic' to allow internal topics to be created during a rebalance (default). " + + "Set to 'manual' to disable automatic creation. Users must call KafkaStreams#init() instead."; + static { CONFIG = new ConfigDef() @@ -1012,6 +1020,12 @@ public class StreamsConfig extends AbstractConfig { LogAndFailProcessingExceptionHandler.class.getName(), Importance.MEDIUM, PROCESSING_EXCEPTION_HANDLER_CLASS_DOC) + .define(INTERNAL_TOPIC_SETUP_CONFIG, + ConfigDef.Type.STRING, + INTERNAL_TOPIC_SETUP_AUTOMATIC, + ConfigDef.ValidString.in(INTERNAL_TOPIC_SETUP_AUTOMATIC, INTERNAL_TOPIC_SETUP_MANUAL), + Importance.MEDIUM, + INTERNAL_TOPIC_SETUP_DOC) .define(PROCESSING_GUARANTEE_CONFIG, Type.STRING, AT_LEAST_ONCE, diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index a7f657ec54b70..a4ea4489a8c28 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -481,7 +481,16 @@ public void shouldOverrideStreamsDefaultProducerConfigs() { assertEquals("10000", producerConfigs.get(ProducerConfig.LINGER_MS_CONFIG)); assertEquals("30000", producerConfigs.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)); } + @Test + public void shouldParseInternalTopicSetupConfig() { + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test app"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); + props.put(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG, StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL); + + final StreamsConfig config = new StreamsConfig(props); + assertEquals(StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL, config.getString(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG)); + } @Test public void shouldNotThrowIfTransactionTimeoutSmallerThanCommitIntervalForAtLeastOnce() { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE); From 12f67af207a8408b06451ae1d2d1370236985e91 Mon Sep 17 00:00:00 2001 From: k-apol Date: Sun, 13 Apr 2025 12:31:09 -0500 Subject: [PATCH 03/36] Update new internal topic doc to be private member --- .../src/main/java/org/apache/kafka/streams/StreamsConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 88a54d3d1c886..b56134d5e14ef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -888,7 +888,7 @@ public class StreamsConfig extends AbstractConfig { public static final String INTERNAL_TOPIC_SETUP_CONFIG = "internal.topics.setup"; public static final String INTERNAL_TOPIC_SETUP_AUTOMATIC = "automatic"; public static final String INTERNAL_TOPIC_SETUP_MANUAL = "manual"; - public static final String INTERNAL_TOPIC_SETUP_DOC = + private static final String INTERNAL_TOPIC_SETUP_DOC = "Configures how internal topics (e.g., repartition or changelog topics) should be created. " + "Set to 'automatic' to allow internal topics to be created during a rebalance (default). " + "Set to 'manual' to disable automatic creation. Users must call KafkaStreams#init() instead."; From 160025d652cefca91351eaac856547e2f57a1789 Mon Sep 17 00:00:00 2001 From: k-apol Date: Sun, 13 Apr 2025 12:34:18 -0500 Subject: [PATCH 04/36] remove redundant lines in test case --- .../test/java/org/apache/kafka/streams/StreamsConfigTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index a4ea4489a8c28..ed205e0f579b5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -483,12 +483,8 @@ public void shouldOverrideStreamsDefaultProducerConfigs() { } @Test public void shouldParseInternalTopicSetupConfig() { - props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test app"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); props.put(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG, StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL); - final StreamsConfig config = new StreamsConfig(props); - assertEquals(StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL, config.getString(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG)); } @Test From 8dedaf938576a4708344c190ceb837dd375d5329 Mon Sep 17 00:00:00 2001 From: k-apol Date: Wed, 16 Apr 2025 06:37:58 -0500 Subject: [PATCH 05/36] Add boilerplate stub for init method and Javadoc comments --- .../apache/kafka/streams/KafkaStreams.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 4ee22f045564e..766991dbaf20c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -45,6 +45,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.errors.InvalidStateStorePartitionException; +import org.apache.kafka.streams.errors.MissingSourceTopicException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsNotStartedException; @@ -294,6 +295,27 @@ public boolean isValidTransition(final State newState) { private final Object stateLock = new Object(); protected volatile State state = State.CREATED; + public void init() { + init(Duration.ofSeconds(60)); + } + /** + * Initializes broker-side state such as internal topics (repartition and changelog topics). + * + * If {@code internal.topics.setup} is set to {@code manual}, this method must be called before {@link #start()}. + * + * @throws MissingSourceTopicException if a source topic is missing + * @throws MissingInternalTopicsException if one or more internal topics are missing + * @throws MisconfiguredInternalTopicException if an internal topic is misconfigured + * @throws InternalTopicsAlreadySetupException if all internal topics are already set up + * @throws TimeoutException if initialization exceeds the given timeout + */ + + public void init(final Duration timeout) { + // TODO: Validate internal topic setup + // TODO: throw relevant exceptions + throw new UnsupportedOperationException("Manual topic initialization not yet implemented"); + } + private boolean waitOnStates(final long waitMs, final State... targetStates) { final Set targetStateSet = Set.of(targetStates); final long begin = time.milliseconds(); From 71d32cb5b3521bb6dcf9f5b79599929bd2300064 Mon Sep 17 00:00:00 2001 From: k-apol Date: Fri, 18 Apr 2025 05:55:32 -0500 Subject: [PATCH 06/36] Add MissingInternalTopicsException per KIP --- .../MissingInternalTopicsException.java | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 streams/src/main/java/org/apache/kafka/streams/errors/MissingInternalTopicsException.java diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/MissingInternalTopicsException.java b/streams/src/main/java/org/apache/kafka/streams/errors/MissingInternalTopicsException.java new file mode 100644 index 0000000000000..48dd926ed2a09 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/MissingInternalTopicsException.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +import java.util.List; + +public class MissingInternalTopicsException extends StreamsException { + + private static final long serialVersionUID = 1L; + + private final List topics; + + /** + * Constructs a new MissingInternalTopicsException. + * + * @param message The detail message + * @param topics the list of missing internal topic names + */ + public MissingInternalTopicsException(final String message, final List topics) { + super(message); + this.topics = topics; + } + + /** + * Returns the list of missing internal topics that caused the exception. + */ + public List topics() { + return topics; + } +} From e081b0587cffcb8d4da77e33e314fe24b5316fbf Mon Sep 17 00:00:00 2001 From: k-apol Date: Fri, 18 Apr 2025 05:57:06 -0500 Subject: [PATCH 07/36] Add InternalTopicAlreadySetup exception --- .../InternalTopicAlreadySetupException.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 streams/src/main/java/org/apache/kafka/streams/errors/InternalTopicAlreadySetupException.java diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/InternalTopicAlreadySetupException.java b/streams/src/main/java/org/apache/kafka/streams/errors/InternalTopicAlreadySetupException.java new file mode 100644 index 0000000000000..7d86154afb83e --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/InternalTopicAlreadySetupException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +public class InternalTopicAlreadySetupException extends StreamsException { + + private static final long serialVersionUID = 1L; + + public InternalTopicAlreadySetupException(final String message) { + super(message); + } +} From e8aeaeb27d9d85793b3ce9f538fa0b4a9cbd3da7 Mon Sep 17 00:00:00 2001 From: k-apol Date: Fri, 18 Apr 2025 05:57:25 -0500 Subject: [PATCH 08/36] Add MisconfiguredInternalTopicException --- .../MisconfiguredInternalTopicException.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 streams/src/main/java/org/apache/kafka/streams/errors/MisconfiguredInternalTopicException.java diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/MisconfiguredInternalTopicException.java b/streams/src/main/java/org/apache/kafka/streams/errors/MisconfiguredInternalTopicException.java new file mode 100644 index 0000000000000..41a5bdbe4ab48 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/errors/MisconfiguredInternalTopicException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +public class MisconfiguredInternalTopicException extends StreamsException { + + private static final long serialVersionUID = 1L; + + public MisconfiguredInternalTopicException(final String message) { + super(message); + } +} From 375767271bf83dbcc5ab07c61de28323bf3afdcd Mon Sep 17 00:00:00 2001 From: k-apol Date: Fri, 18 Apr 2025 05:57:44 -0500 Subject: [PATCH 09/36] Add boilerplate class for Initparameters --- .../java/org/apache/kafka/streams/Initparameters.java | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 streams/src/main/java/org/apache/kafka/streams/Initparameters.java diff --git a/streams/src/main/java/org/apache/kafka/streams/Initparameters.java b/streams/src/main/java/org/apache/kafka/streams/Initparameters.java new file mode 100644 index 0000000000000..f3c721c4cc27d --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/Initparameters.java @@ -0,0 +1,11 @@ +package org.apache.kafka.streams; + +public class Initparameters { + + public static Initparameters initparameters; // specifies to disable the setup of internal topics if some topics are missing. Leverage validation introduced in + + public Initparameters enableSetupInternalTopicsIfIncomplete; // specifies to setup repartition and changelog topics if some are missing + public Initparameters disableSetupInternalTopicsIfIncomplete; // specifies to throw some buytn ot all repartition or changelog topics are missing + public boolean setupInternalTopicsIfIncompleteEnabled; // Getter + +} From 00c847b973b98decfea012a0660f623d7557458c Mon Sep 17 00:00:00 2001 From: k-apol Date: Fri, 18 Apr 2025 05:58:19 -0500 Subject: [PATCH 10/36] Add boilerplate init methods in KafkaStreams to support manual initialization + TODOs --- .../apache/kafka/streams/KafkaStreams.java | 58 ++++++++++++++++--- 1 file changed, 49 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 766991dbaf20c..68380c7cb3015 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -295,19 +295,26 @@ public boolean isValidTransition(final State newState) { private final Object stateLock = new Object(); protected volatile State state = State.CREATED; + /** + * Initializes broker-side state. + * + * @throw MissingSourceTopicException if a source topic is missing + * @throw MissingInternalTopicsException if some but not all of the internal topics are missing + * @throw MisconfiguredInternalTopicException if an internal topics is misconfigured + * @throw InternalTopicsAlreadySetupException if all internal topics are already setup + */ public void init() { - init(Duration.ofSeconds(60)); + // TODO: Validate internal topic setup + // TODO: throw relevant exceptions } /** - * Initializes broker-side state such as internal topics (repartition and changelog topics). + * Initializes broker-side state. * - * If {@code internal.topics.setup} is set to {@code manual}, this method must be called before {@link #start()}. - * - * @throws MissingSourceTopicException if a source topic is missing - * @throws MissingInternalTopicsException if one or more internal topics are missing - * @throws MisconfiguredInternalTopicException if an internal topic is misconfigured - * @throws InternalTopicsAlreadySetupException if all internal topics are already set up - * @throws TimeoutException if initialization exceeds the given timeout + * @throw MissingSourceTopicException if a source topic is missing + * @throw MissingInternalTopicsException if some but not all of the internal topics are missing + * @throw MisconfiguredInternalTopicException if an internal topics is misconfigured + * @throw InternalTopicsAlreadySetupException if all internal topics are already setup + * @throw TimeoutException if initialization exceeds the given timeout */ public void init(final Duration timeout) { @@ -316,6 +323,39 @@ public void init(final Duration timeout) { throw new UnsupportedOperationException("Manual topic initialization not yet implemented"); } + /** + * Initializes broker-side state. + * + * This methods takes parameters that specify which internal topics to setup if some + * but not all of them are absent. + * + * @throw MissingSourceTopicException if a source topic is missing + * @throw MissingInternalTopicsException if some but not all of the internal topics are missing + * and the given initialization parameters do not specify to setup them + * @throw MisconfiguredInternalTopicException if an internal topics is misconfigured + * @throw InternalTopicsAlreadySetupException if all internal topics are already setup + */ + public void init(final Initparameters initparameters) { + + } + /** + * Initializes broker-side state. + * + * This methods takes parameters that specify which internal topics to setup if some + * but not all of them are absent. + * + * @throw MissingSourceTopicException if a source topic is missing + * @throw MissingInternalTopicsException if some but not all of the internal topics are missing + * and the given initialization parameters do not specify to setup them + * @throw MisconfiguredInternalTopicException if an internal topics is misconfigured + * @throw InternalTopicsAlreadySetupException if all internal topics are already setup + * @throw TimeoutException if initialization exceeds the given timeout + */ + public void init(final InitParameters initParameters, final Duration timeout) { + + } + + private boolean waitOnStates(final long waitMs, final State... targetStates) { final Set targetStateSet = Set.of(targetStates); final long begin = time.milliseconds(); From 44b3860a4612a1390b8c9c3175f84a7656d36192 Mon Sep 17 00:00:00 2001 From: kapol Date: Thu, 1 May 2025 05:59:08 -0500 Subject: [PATCH 11/36] new setup, test commit to verify connection to IDE --- .../main/java/org/apache/kafka/streams/KafkaStreams.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 68380c7cb3015..bdae694a99c31 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -306,6 +306,8 @@ public boolean isValidTransition(final State newState) { public void init() { // TODO: Validate internal topic setup // TODO: throw relevant exceptions + // Test comment + } /** * Initializes broker-side state. @@ -351,9 +353,9 @@ public void init(final Initparameters initparameters) { * @throw InternalTopicsAlreadySetupException if all internal topics are already setup * @throw TimeoutException if initialization exceeds the given timeout */ - public void init(final InitParameters initParameters, final Duration timeout) { + // public void init(final InitParameters initParameters, final Duration timeout) { - } + // } private boolean waitOnStates(final long waitMs, final State... targetStates) { From a888e30736c80b444292983f1164fc543a6768bb Mon Sep 17 00:00:00 2001 From: kapol Date: Tue, 20 May 2025 07:03:25 -0500 Subject: [PATCH 12/36] Add init method to Kafka Streams --- .../apache/kafka/streams/Initparameters.java | 11 -- .../apache/kafka/streams/KafkaStreams.java | 163 ++++++++++++++---- .../apache/kafka/streams/StreamsConfig.java | 2 +- ... InternalTopicsAlreadySetupException.java} | 4 +- .../processor/internals/ChangelogTopics.java | 8 +- .../internals/InternalTopicManager.java | 30 +++- 6 files changed, 168 insertions(+), 50 deletions(-) delete mode 100644 streams/src/main/java/org/apache/kafka/streams/Initparameters.java rename streams/src/main/java/org/apache/kafka/streams/errors/{InternalTopicAlreadySetupException.java => InternalTopicsAlreadySetupException.java} (86%) diff --git a/streams/src/main/java/org/apache/kafka/streams/Initparameters.java b/streams/src/main/java/org/apache/kafka/streams/Initparameters.java deleted file mode 100644 index f3c721c4cc27d..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/Initparameters.java +++ /dev/null @@ -1,11 +0,0 @@ -package org.apache.kafka.streams; - -public class Initparameters { - - public static Initparameters initparameters; // specifies to disable the setup of internal topics if some topics are missing. Leverage validation introduced in - - public Initparameters enableSetupInternalTopicsIfIncomplete; // specifies to setup repartition and changelog topics if some are missing - public Initparameters disableSetupInternalTopicsIfIncomplete; // specifies to throw some buytn ot all repartition or changelog topics are missing - public boolean setupInternalTopicsIfIncompleteEnabled; // Getter - -} diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index bdae694a99c31..5004ea558efe8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -46,6 +46,9 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.errors.InvalidStateStorePartitionException; import org.apache.kafka.streams.errors.MissingSourceTopicException; +import org.apache.kafka.streams.errors.MisconfiguredInternalTopicException; +import org.apache.kafka.streams.errors.MissingInternalTopicsException; +import org.apache.kafka.streams.errors.InternalTopicsAlreadySetupException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsNotStartedException; @@ -62,6 +65,9 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ClientUtils; import org.apache.kafka.streams.processor.internals.GlobalStreamThread; +import org.apache.kafka.streams.processor.internals.InternalTopicManager; +import org.apache.kafka.streams.processor.internals.InternalTopicConfig; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.StateDirectory; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.StreamsMetadataState; @@ -71,6 +77,7 @@ import org.apache.kafka.streams.processor.internals.assignment.AssignorError; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology; +import org.apache.kafka.streams.processor.internals.InternalTopicManager.ValidationResult; import org.apache.kafka.streams.query.FailureReason; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.QueryConfig; @@ -113,6 +120,7 @@ import java.util.stream.Collectors; import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_INIT_TIMEOUT_MS; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration; @@ -298,31 +306,30 @@ public boolean isValidTransition(final State newState) { /** * Initializes broker-side state. * - * @throw MissingSourceTopicException if a source topic is missing - * @throw MissingInternalTopicsException if some but not all of the internal topics are missing - * @throw MisconfiguredInternalTopicException if an internal topics is misconfigured - * @throw InternalTopicsAlreadySetupException if all internal topics are already setup + * @throws MissingSourceTopicException if a source topic is missing + * @throws MissingInternalTopicsException if some but not all of the internal topics are missing + * @throws MisconfiguredInternalTopicException if an internal topics is misconfigured + * @throws InternalTopicsAlreadySetupException if all internal topics are already setup */ - public void init() { - // TODO: Validate internal topic setup - // TODO: throw relevant exceptions - // Test comment - + public void init(){ + this.init(DEFAULT_INIT_TIMEOUT_MS); } + /** * Initializes broker-side state. * - * @throw MissingSourceTopicException if a source topic is missing - * @throw MissingInternalTopicsException if some but not all of the internal topics are missing - * @throw MisconfiguredInternalTopicException if an internal topics is misconfigured - * @throw InternalTopicsAlreadySetupException if all internal topics are already setup - * @throw TimeoutException if initialization exceeds the given timeout + * @throws MissingSourceTopicException if a source topic is missing + * @throws MissingInternalTopicsException if some but not all of the internal topics are missing + * @throws MisconfiguredInternalTopicException if an internal topics is misconfigured + * @throws InternalTopicsAlreadySetupException if all internal topics are already setup + * @throws TimeoutException if initialization exceeds the given timeout */ public void init(final Duration timeout) { - // TODO: Validate internal topic setup - // TODO: throw relevant exceptions - throw new UnsupportedOperationException("Manual topic initialization not yet implemented"); + final InitParameters initParameters = InitParameters.initParameters(); + initParameters.setTimeout(timeout); + + this.doInit(InitParameters.initParameters()); } /** @@ -331,32 +338,130 @@ public void init(final Duration timeout) { * This methods takes parameters that specify which internal topics to setup if some * but not all of them are absent. * - * @throw MissingSourceTopicException if a source topic is missing - * @throw MissingInternalTopicsException if some but not all of the internal topics are missing + * @throws MissingSourceTopicException if a source topic is missing + * @throws MissingInternalTopicsException if some but not all of the internal topics are missing * and the given initialization parameters do not specify to setup them - * @throw MisconfiguredInternalTopicException if an internal topics is misconfigured - * @throw InternalTopicsAlreadySetupException if all internal topics are already setup + * @throws MisconfiguredInternalTopicException if an internal topics is misconfigured + * @throws InternalTopicsAlreadySetupException if all internal topics are already setup */ - public void init(final Initparameters initparameters) { + public void init(final InitParameters initParameters) { + this.doInit(initParameters); } + /** * Initializes broker-side state. * * This methods takes parameters that specify which internal topics to setup if some * but not all of them are absent. * - * @throw MissingSourceTopicException if a source topic is missing - * @throw MissingInternalTopicsException if some but not all of the internal topics are missing + * @throws MissingSourceTopicException if a source topic is missing + * @throws MissingInternalTopicsException if some but not all of the internal topics are missing * and the given initialization parameters do not specify to setup them - * @throw MisconfiguredInternalTopicException if an internal topics is misconfigured - * @throw InternalTopicsAlreadySetupException if all internal topics are already setup - * @throw TimeoutException if initialization exceeds the given timeout + * @throws MisconfiguredInternalTopicException if an internal topics is misconfigured + * @throws InternalTopicsAlreadySetupException if all internal topics are already setup + * @throws TimeoutException if initialization exceeds the given timeout */ - // public void init(final InitParameters initParameters, final Duration timeout) { + public void init(final InitParameters initParameters, final Duration timeout) { + initParameters.enableTimeout(); + initParameters.setTimeout(timeout); + + this.doInit(initParameters); + } + private void doInit(final InitParameters initParameters) { + + InternalTopicManager internalTopicManager = new InternalTopicManager(time, adminClient, applicationConfigs); + if (initParameters.hasTimeoutEnabled()) { + internalTopicManager.setInitTimeout(initParameters.getTimeout()); + } + + final Map allInternalTopics = new HashMap<>(); + final Set allSourceTopics = new HashSet<>(); + for (Map subtopologyMap : topologyMetadata.topologyToSubtopologyTopicsInfoMap().values()) { + for (InternalTopologyBuilder.TopicsInfo topicsInfo : subtopologyMap.values()) { + allInternalTopics.putAll(topicsInfo.stateChangelogTopics); + allInternalTopics.putAll(topicsInfo.repartitionSourceTopics); + allSourceTopics.addAll(topicsInfo.sourceTopics); + } + } + try { + final ValidationResult validationResult = internalTopicManager.validate(allInternalTopics); // can throw timeout + + final boolean noInternalTopicsExist = allInternalTopics.keySet() == validationResult.missingTopics(); + final boolean internalTopicsMisconfigured = !validationResult.misconfigurationsForTopics().isEmpty(); + final boolean allInternalTopicsExist = validationResult.missingTopics().isEmpty(); + final boolean missingSourceTopics = !Collections.disjoint( + validationResult.missingTopics(), + allSourceTopics + ); + if (internalTopicsMisconfigured) { + throw new MisconfiguredInternalTopicException("Misconfigured Internal Topics: " + validationResult.misconfigurationsForTopics()); + } + if (missingSourceTopics) { + allSourceTopics.retainAll(validationResult.missingTopics()); + throw new MissingSourceTopicException("Missing source topics: " + allSourceTopics); + } + if (noInternalTopicsExist) { + internalTopicManager.setup(allInternalTopics); + } else if (allInternalTopicsExist) { + throw new InternalTopicsAlreadySetupException("All internal topics have already been setup"); + } else { + if (initParameters.setupInternalTopicsIfIncompleteEnabled()) { + final Map topicsToCreate = new HashMap<>(); + for (String missingTopic : validationResult.missingTopics()) { + topicsToCreate.put(missingTopic, allInternalTopics.get(missingTopic)); + } + internalTopicManager.makeReady(topicsToCreate); // can throw timeout + } else { + throw new MissingInternalTopicsException("Missing Internal Topics: ", new ArrayList<>(validationResult.missingTopics())); + } + } + } catch (TimeoutException timeoutException) { + throw new TimeoutException(timeoutException.getMessage(), timeoutException); + } catch (StreamsException streamsException) { + throw new StreamsException(streamsException.getMessage(), streamsException); + } + } - // } + public static class InitParameters { + private boolean timeoutEnabled; + private final boolean setupInternalTopicsIfIncomplete; + private Duration timeout; + private InitParameters(final boolean setupInternalTopicsIfIncomplete) { + this.setupInternalTopicsIfIncomplete = setupInternalTopicsIfIncomplete; + } + + // Default: don't create missing topics if only some are missing + public static InitParameters initParameters() { + return new InitParameters(false); + } + + public InitParameters enableSetupInternalTopicsIfIncomplete() { + return new InitParameters(true); + } + + public InitParameters disableSetupInternalTopicsIfIncomplete() { + return new InitParameters(false); + } + + public boolean setupInternalTopicsIfIncompleteEnabled() { + return setupInternalTopicsIfIncomplete; + } + + public final void enableTimeout() { + this.timeoutEnabled = true; + } + public final boolean hasTimeoutEnabled() { + return timeoutEnabled; + } + + public final void setTimeout(final Duration timeout) { + this.timeout = timeout; + } + public final Duration getTimeout() { return this.timeout; } + + } private boolean waitOnStates(final long waitMs, final State... targetStates) { final Set targetStateSet = Set.of(targetStates); diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index b56134d5e14ef..8ca3558ce1151 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -166,13 +166,13 @@ public class StreamsConfig extends AbstractConfig { private static final long DEFAULT_COMMIT_INTERVAL_MS = 30000L; private static final long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L; private static final int DEFAULT_TRANSACTION_TIMEOUT = 10000; - @Deprecated @SuppressWarnings("unused") public static final int DUMMY_THREAD_INDEX = 1; public static final long MAX_TASK_IDLE_MS_DISABLED = -1; + public static final Duration DEFAULT_INIT_TIMEOUT_MS = Duration.ofMillis(30000); // We impose these limitations because client tags are encoded into the subscription info, // which is part of the group metadata message that is persisted into the internal topic. public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE = 5; diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/InternalTopicAlreadySetupException.java b/streams/src/main/java/org/apache/kafka/streams/errors/InternalTopicsAlreadySetupException.java similarity index 86% rename from streams/src/main/java/org/apache/kafka/streams/errors/InternalTopicAlreadySetupException.java rename to streams/src/main/java/org/apache/kafka/streams/errors/InternalTopicsAlreadySetupException.java index 7d86154afb83e..618e86cfdbd44 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/InternalTopicAlreadySetupException.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/InternalTopicsAlreadySetupException.java @@ -16,11 +16,11 @@ */ package org.apache.kafka.streams.errors; -public class InternalTopicAlreadySetupException extends StreamsException { +public class InternalTopicsAlreadySetupException extends StreamsException { private static final long serialVersionUID = 1L; - public InternalTopicAlreadySetupException(final String message) { + public InternalTopicsAlreadySetupException(final String message) { super(message); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java index aaf8ba16a51b2..7aedd56495359 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java @@ -19,14 +19,19 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.MisconfiguredInternalTopicException; +import org.apache.kafka.streams.processor.internals.InternalTopicManager.ValidationResult; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.slf4j.Logger; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -62,10 +67,11 @@ public void setup() { for (final Map.Entry entry : topicGroups.entrySet()) { final Subtopology subtopology = entry.getKey(); final TopicsInfo topicsInfo = entry.getValue(); +// final int topicGroupId = entry.entrySet(); final Set topicGroupTasks = tasksForTopicGroup.get(subtopology); if (topicGroupTasks == null) { - log.debug("No tasks found for subtopology {}", subtopology); + log.debug("No tasks found for topic group {}", subtopology.nodeGroupId); continue; } else if (topicsInfo.stateChangelogTopics.isEmpty()) { continue; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index a8a044edb6b3c..d47a77759bf6f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -47,6 +47,7 @@ import org.slf4j.Logger; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -79,6 +80,8 @@ public class InternalTopicManager { private final long windowChangeLogAdditionalRetention; private final long retryBackOffMs; private final long retryTimeoutMs; + private Duration initTimeoutMs; + private boolean isInitializing = false; private final Map defaultTopicConfigs = new HashMap<>(); @@ -99,6 +102,7 @@ public InternalTopicManager(final Time time, consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); retryTimeoutMs = new QuietConsumerConfig(consumerConfig).getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG) / 2L; + initTimeoutMs = StreamsConfig.DEFAULT_INIT_TIMEOUT_MS; log.debug("Configs:" + Utils.NL + "\t{} = {}" + Utils.NL + @@ -113,7 +117,7 @@ public InternalTopicManager(final Time time, } } - static class ValidationResult { + public static class ValidationResult { private final Set missingTopics = new HashSet<>(); private final Map> misconfigurationsForTopics = new HashMap<>(); @@ -463,6 +467,7 @@ public Set makeReady(final Map topics) { long currentWallClockMs = time.milliseconds(); final long deadlineMs = currentWallClockMs + retryTimeoutMs; + final long initDeadlineMs = currentWallClockMs + this.initTimeoutMs.toMillis(); Set topicsNotReady = new HashSet<>(topics.keySet()); final Set newlyCreatedTopics = new HashSet<>(); @@ -556,11 +561,16 @@ public Set makeReady(final Map topics) { if (!topicsNotReady.isEmpty()) { currentWallClockMs = time.milliseconds(); - - if (currentWallClockMs >= deadlineMs) { - final String timeoutError = String.format("Could not create topics within %d milliseconds. " + - "This can happen if the Kafka cluster is temporarily not available.", retryTimeoutMs); - log.error(timeoutError); + final boolean isInitializationTimeout = this.isInitializing() && currentWallClockMs >= initDeadlineMs; + if (isInitializationTimeout || currentWallClockMs >= deadlineMs) { + final String timeoutError; + if (!isInitializationTimeout) { + timeoutError = String.format("Could not create topics within %d milliseconds. " + + "This can happen if the Kafka cluster is temporarily not available.", retryTimeoutMs); + log.error(timeoutError); + } else { + timeoutError = String.format("Could not finish initialization within %d milliseconds.", initTimeoutMs.toMillis()); + } throw new TimeoutException(timeoutError); } log.info( @@ -627,6 +637,14 @@ protected Map> getTopicPartitionInfo(final Set< return topicPartitionInfo; } + /* Allow manual setting of initialization timeout when using Kafka Streams Init() */ + public void setInitTimeout(final Duration timeoutMs) { + this.isInitializing = true; + this.initTimeoutMs = timeoutMs; + } + private boolean isInitializing() { + return this.isInitializing; + } /** * Try to get the number of partitions for the given topics; return the number of partitions for topics that already exists. From f139ab11d94f26874cecf40b3de906828469f4fd Mon Sep 17 00:00:00 2001 From: kapol Date: Wed, 21 May 2025 06:17:06 -0500 Subject: [PATCH 13/36] Style fixes Add timeout handler method to internalTopicManager to handle both init timeouts and retry timeouts Cleanup ChangelogTopics --- .../apache/kafka/streams/KafkaStreams.java | 42 +++++++------ .../MisconfiguredInternalTopicException.java | 4 +- .../processor/internals/ChangelogTopics.java | 16 ++--- .../internals/InternalTopicManager.java | 62 ++++++++++++------- .../kafka/streams/KafkaStreamsTest.java | 7 +++ 5 files changed, 74 insertions(+), 57 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 5004ea558efe8..ac28ed0a9a559 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -311,7 +311,7 @@ public boolean isValidTransition(final State newState) { * @throws MisconfiguredInternalTopicException if an internal topics is misconfigured * @throws InternalTopicsAlreadySetupException if all internal topics are already setup */ - public void init(){ + public void init() { this.init(DEFAULT_INIT_TIMEOUT_MS); } @@ -362,23 +362,23 @@ public void init(final InitParameters initParameters) { * @throws InternalTopicsAlreadySetupException if all internal topics are already setup * @throws TimeoutException if initialization exceeds the given timeout */ - public void init(final InitParameters initParameters, final Duration timeout) { - initParameters.enableTimeout(); - initParameters.setTimeout(timeout); + public void init(final InitParameters initParameters, final Duration timeout) { + initParameters.enableTimeout(); + initParameters.setTimeout(timeout); - this.doInit(initParameters); - } - private void doInit(final InitParameters initParameters) { + this.doInit(initParameters); + } - InternalTopicManager internalTopicManager = new InternalTopicManager(time, adminClient, applicationConfigs); + private void doInit(final InitParameters initParameters) { + final InternalTopicManager internalTopicManager = new InternalTopicManager(time, adminClient, applicationConfigs); if (initParameters.hasTimeoutEnabled()) { internalTopicManager.setInitTimeout(initParameters.getTimeout()); } final Map allInternalTopics = new HashMap<>(); final Set allSourceTopics = new HashSet<>(); - for (Map subtopologyMap : topologyMetadata.topologyToSubtopologyTopicsInfoMap().values()) { - for (InternalTopologyBuilder.TopicsInfo topicsInfo : subtopologyMap.values()) { + for (final Map subtopologyMap : topologyMetadata.topologyToSubtopologyTopicsInfoMap().values()) { + for (final InternalTopologyBuilder.TopicsInfo topicsInfo : subtopologyMap.values()) { allInternalTopics.putAll(topicsInfo.stateChangelogTopics); allInternalTopics.putAll(topicsInfo.repartitionSourceTopics); allSourceTopics.addAll(topicsInfo.sourceTopics); @@ -390,10 +390,8 @@ private void doInit(final InitParameters initParameters) { final boolean noInternalTopicsExist = allInternalTopics.keySet() == validationResult.missingTopics(); final boolean internalTopicsMisconfigured = !validationResult.misconfigurationsForTopics().isEmpty(); final boolean allInternalTopicsExist = validationResult.missingTopics().isEmpty(); - final boolean missingSourceTopics = !Collections.disjoint( - validationResult.missingTopics(), - allSourceTopics - ); + final boolean missingSourceTopics = !Collections.disjoint(validationResult.missingTopics(), allSourceTopics); + if (internalTopicsMisconfigured) { throw new MisconfiguredInternalTopicException("Misconfigured Internal Topics: " + validationResult.misconfigurationsForTopics()); } @@ -408,25 +406,25 @@ private void doInit(final InitParameters initParameters) { } else { if (initParameters.setupInternalTopicsIfIncompleteEnabled()) { final Map topicsToCreate = new HashMap<>(); - for (String missingTopic : validationResult.missingTopics()) { + for (final String missingTopic : validationResult.missingTopics()) { topicsToCreate.put(missingTopic, allInternalTopics.get(missingTopic)); } - internalTopicManager.makeReady(topicsToCreate); // can throw timeout + internalTopicManager.makeReady(topicsToCreate); } else { throw new MissingInternalTopicsException("Missing Internal Topics: ", new ArrayList<>(validationResult.missingTopics())); } } - } catch (TimeoutException timeoutException) { + } catch (final TimeoutException timeoutException) { throw new TimeoutException(timeoutException.getMessage(), timeoutException); - } catch (StreamsException streamsException) { + } catch (final StreamsException streamsException) { throw new StreamsException(streamsException.getMessage(), streamsException); } } public static class InitParameters { private boolean timeoutEnabled; - private final boolean setupInternalTopicsIfIncomplete; private Duration timeout; + private final boolean setupInternalTopicsIfIncomplete; private InitParameters(final boolean setupInternalTopicsIfIncomplete) { this.setupInternalTopicsIfIncomplete = setupInternalTopicsIfIncomplete; @@ -452,6 +450,7 @@ public boolean setupInternalTopicsIfIncompleteEnabled() { public final void enableTimeout() { this.timeoutEnabled = true; } + public final boolean hasTimeoutEnabled() { return timeoutEnabled; } @@ -459,7 +458,10 @@ public final boolean hasTimeoutEnabled() { public final void setTimeout(final Duration timeout) { this.timeout = timeout; } - public final Duration getTimeout() { return this.timeout; } + + public final Duration getTimeout() { + return this.timeout; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/MisconfiguredInternalTopicException.java b/streams/src/main/java/org/apache/kafka/streams/errors/MisconfiguredInternalTopicException.java index 41a5bdbe4ab48..a82c9eb394d18 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/MisconfiguredInternalTopicException.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/MisconfiguredInternalTopicException.java @@ -21,6 +21,6 @@ public class MisconfiguredInternalTopicException extends StreamsException { private static final long serialVersionUID = 1L; public MisconfiguredInternalTopicException(final String message) { - super(message); - } + super(message); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java index 7aedd56495359..d1fd0b245d5ce 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java @@ -19,19 +19,14 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.errors.MisconfiguredInternalTopicException; -import org.apache.kafka.streams.processor.internals.InternalTopicManager.ValidationResult; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.slf4j.Logger; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -67,11 +62,10 @@ public void setup() { for (final Map.Entry entry : topicGroups.entrySet()) { final Subtopology subtopology = entry.getKey(); final TopicsInfo topicsInfo = entry.getValue(); -// final int topicGroupId = entry.entrySet(); final Set topicGroupTasks = tasksForTopicGroup.get(subtopology); if (topicGroupTasks == null) { - log.debug("No tasks found for topic group {}", subtopology.nodeGroupId); + log.debug("No tasks found for subtopology {}", subtopology); continue; } else if (topicsInfo.stateChangelogTopics.isEmpty()) { continue; @@ -79,10 +73,10 @@ public void setup() { for (final TaskId task : topicGroupTasks) { final Set changelogTopicPartitions = topicsInfo.stateChangelogTopics - .keySet() - .stream() - .map(topic -> new TopicPartition(topic, task.partition())) - .collect(Collectors.toSet()); + .keySet() + .stream() + .map(topic -> new TopicPartition(topic, task.partition())) + .collect(Collectors.toSet()); changelogPartitionsForStatefulTask.put(task, changelogTopicPartitions); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index d47a77759bf6f..d02ed19133da8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -80,7 +80,7 @@ public class InternalTopicManager { private final long windowChangeLogAdditionalRetention; private final long retryBackOffMs; private final long retryTimeoutMs; - private Duration initTimeoutMs; + private Duration initTimeout; private boolean isInitializing = false; private final Map defaultTopicConfigs = new HashMap<>(); @@ -102,7 +102,7 @@ public InternalTopicManager(final Time time, consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); retryTimeoutMs = new QuietConsumerConfig(consumerConfig).getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG) / 2L; - initTimeoutMs = StreamsConfig.DEFAULT_INIT_TIMEOUT_MS; + this.initTimeout = StreamsConfig.DEFAULT_INIT_TIMEOUT_MS; log.debug("Configs:" + Utils.NL + "\t{} = {}" + Utils.NL + @@ -465,9 +465,9 @@ public Set makeReady(final Map topics) { // have existed with the expected number of partitions, or some create topic returns fatal errors. log.debug("Starting to validate internal topics {} in partition assignor.", topics); - long currentWallClockMs = time.milliseconds(); + final long currentWallClockMs = time.milliseconds(); final long deadlineMs = currentWallClockMs + retryTimeoutMs; - final long initDeadlineMs = currentWallClockMs + this.initTimeoutMs.toMillis(); + final long initDeadlineMs = currentWallClockMs + this.initTimeout.toMillis(); Set topicsNotReady = new HashSet<>(topics.keySet()); final Set newlyCreatedTopics = new HashSet<>(); @@ -560,26 +560,12 @@ public Set makeReady(final Map topics) { } if (!topicsNotReady.isEmpty()) { - currentWallClockMs = time.milliseconds(); - final boolean isInitializationTimeout = this.isInitializing() && currentWallClockMs >= initDeadlineMs; - if (isInitializationTimeout || currentWallClockMs >= deadlineMs) { - final String timeoutError; - if (!isInitializationTimeout) { - timeoutError = String.format("Could not create topics within %d milliseconds. " + - "This can happen if the Kafka cluster is temporarily not available.", retryTimeoutMs); - log.error(timeoutError); - } else { - timeoutError = String.format("Could not finish initialization within %d milliseconds.", initTimeoutMs.toMillis()); - } - throw new TimeoutException(timeoutError); - } - log.info( - "Topics {} could not be made ready. Will retry in {} milliseconds. Remaining time in milliseconds: {}", + maybeThrowTimeoutExceptionDuringMakeReady( + deadlineMs, + initDeadlineMs, topicsNotReady, retryBackOffMs, - deadlineMs - currentWallClockMs - ); - Utils.sleep(retryBackOffMs); + currentWallClockMs); } } log.debug("Completed validating internal topics and created {}", newlyCreatedTopics); @@ -637,10 +623,10 @@ protected Map> getTopicPartitionInfo(final Set< return topicPartitionInfo; } - /* Allow manual setting of initialization timeout when using Kafka Streams Init() */ + public void setInitTimeout(final Duration timeoutMs) { this.isInitializing = true; - this.initTimeoutMs = timeoutMs; + this.initTimeout = timeoutMs; } private boolean isInitializing() { return this.isInitializing; @@ -897,6 +883,34 @@ private void maybeThrowTimeoutExceptionDuringSetup(final Set topicStillT } } + private void maybeThrowTimeoutExceptionDuringMakeReady(final long deadlineMs, + final long initDeadlineMs, + final Set topicsNotReady, + final long retryBackoffMs, + final long currentWallClockMs) { + final boolean isInitializationTimeout = this.isInitializing() && currentWallClockMs >= initDeadlineMs; + + if (isInitializationTimeout || currentWallClockMs >= deadlineMs) { + final String timeoutError; + + if (!isInitializationTimeout) { + timeoutError = String.format("Could not create topics within %d milliseconds. " + + "This can happen if the Kafka cluster is temporarily not available.", retryTimeoutMs); + log.error(timeoutError); + } else { + timeoutError = String.format("Could not finish initialization within %d milliseconds.", initTimeout.toMillis()); + } + throw new TimeoutException(timeoutError); + } + log.info( + "Topics {} could not be made ready. Will retry in {} milliseconds. Remaining time in milliseconds: {}", + topicsNotReady, + retryBackOffMs, + deadlineMs - currentWallClockMs + ); + Utils.sleep(retryBackOffMs); + } + private void maybeSleep(final List> resultSetsStillToValidate, final long deadline, final String action) { diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 1516dfb5eda5a..0ebcf30b899e8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -1840,7 +1840,14 @@ public Uuid get(final long timeout, final TimeUnit timeUnit) { assertThat(didAssertThreadTwo.get(), equalTo(true)); assertThat(didAssertGlobalThread.get(), equalTo(true)); } + public void initShouldThrowMisconfiguredExceptionWhenInternalTopicsAreMisconfigured() { + } +// public void initShouldThrowMissingSourceTopicExceptionWhenSourceTopicIsMissing() +// public void initShouldThrowInternalTopicsAlreadySetupExceptionIfAllExist() +// public void initShouldSetupAllInternalTopicsIfNoneExist() +// public void initShouldThrowMissingInternalTopicsExceptionWhenDisabled() +// public void initShouldMakeReadyInternalTopicsWhenAutoSetupEnabled() private Topology getStatefulTopology(final String inputTopic, final String outputTopic, final String globalTopicName, From 7496e514caf85a65d0eb34e404d4d64f128ecaa3 Mon Sep 17 00:00:00 2001 From: kapol Date: Thu, 22 May 2025 15:44:01 -0500 Subject: [PATCH 14/36] Add unit tests for init method --- .../kafka/streams/KafkaStreamsTest.java | 483 +++++++++++------- 1 file changed, 310 insertions(+), 173 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 0ebcf30b899e8..9223a6cc1efb1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -36,9 +36,12 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.KafkaStreams.InitParameters; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.MissingSourceTopicException; +import org.apache.kafka.streams.errors.InternalTopicsAlreadySetupException; +import org.apache.kafka.streams.errors.MisconfiguredInternalTopicException; import org.apache.kafka.streams.errors.StreamsNotStartedException; -import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.errors.UnknownStateStoreException; import org.apache.kafka.streams.internals.StreamsConfigUtils; @@ -50,6 +53,7 @@ import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.GlobalStreamThread; +import org.apache.kafka.streams.processor.internals.InternalTopicManager; import org.apache.kafka.streams.processor.internals.StateDirectory; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.StreamsMetadataState; @@ -116,6 +120,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.doAnswer; @@ -345,7 +350,7 @@ private void prepareThreadState(final StreamThread thread, final AtomicReference }).when(thread).start(); } - private CountDownLatch terminableThreadBlockingLatch = new CountDownLatch(1); + private final CountDownLatch terminableThreadBlockingLatch = new CountDownLatch(1); private void prepareTerminableThread(final StreamThread thread) throws InterruptedException { doAnswer(invocation -> { @@ -426,7 +431,7 @@ public void shouldCloseStartupTasksAfterFirstRebalance() throws Exception { prepareThreadState(streamThreadOne, state1); prepareThreadState(streamThreadTwo, state2); try (final MockedConstruction constructed = mockConstruction(StateDirectory.class, - (mock, context) -> when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) { + (mock, context) -> when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) { try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { assertEquals(1, constructed.constructed().size()); final StateDirectory stateDirectory = constructed.constructed().get(0); @@ -454,19 +459,19 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws Ex streams.start(); waitForCondition( - () -> streamsStateListener.numChanges == 2, - "Streams never started."); + () -> streamsStateListener.numChanges == 2, + "Streams never started."); assertEquals(KafkaStreams.State.RUNNING, streams.state()); waitForCondition( - () -> streamsStateListener.numChanges == 2, - "Streams never started."); + () -> streamsStateListener.numChanges == 2, + "Streams never started."); assertEquals(KafkaStreams.State.RUNNING, streams.state()); for (final StreamThread thread : streams.threads) { threadStateListenerCapture.getValue().onChange( - thread, - StreamThread.State.PARTITIONS_REVOKED, - StreamThread.State.RUNNING); + thread, + StreamThread.State.PARTITIONS_REVOKED, + StreamThread.State.RUNNING); } assertEquals(3, streamsStateListener.numChanges); @@ -474,23 +479,23 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws Ex for (final StreamThread thread : streams.threads) { threadStateListenerCapture.getValue().onChange( - thread, - StreamThread.State.PARTITIONS_ASSIGNED, - StreamThread.State.PARTITIONS_REVOKED); + thread, + StreamThread.State.PARTITIONS_ASSIGNED, + StreamThread.State.PARTITIONS_REVOKED); } assertEquals(3, streamsStateListener.numChanges); assertEquals(KafkaStreams.State.REBALANCING, streams.state()); threadStateListenerCapture.getValue().onChange( - streams.threads.get(NUM_THREADS - 1), - StreamThread.State.PENDING_SHUTDOWN, - StreamThread.State.PARTITIONS_ASSIGNED); + streams.threads.get(NUM_THREADS - 1), + StreamThread.State.PENDING_SHUTDOWN, + StreamThread.State.PARTITIONS_ASSIGNED); threadStateListenerCapture.getValue().onChange( - streams.threads.get(NUM_THREADS - 1), - StreamThread.State.DEAD, - StreamThread.State.PENDING_SHUTDOWN); + streams.threads.get(NUM_THREADS - 1), + StreamThread.State.DEAD, + StreamThread.State.PENDING_SHUTDOWN); assertEquals(3, streamsStateListener.numChanges); assertEquals(KafkaStreams.State.REBALANCING, streams.state()); @@ -498,9 +503,9 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws Ex for (final StreamThread thread : streams.threads) { if (thread != streams.threads.get(NUM_THREADS - 1)) { threadStateListenerCapture.getValue().onChange( - thread, - StreamThread.State.RUNNING, - StreamThread.State.PARTITIONS_ASSIGNED); + thread, + StreamThread.State.RUNNING, + StreamThread.State.PARTITIONS_ASSIGNED); } } @@ -510,8 +515,8 @@ public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws Ex streams.close(); waitForCondition( - () -> streamsStateListener.numChanges == 6, - "Streams never closed."); + () -> streamsStateListener.numChanges == 6, + "Streams never closed."); assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state()); } } @@ -527,12 +532,12 @@ public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception builder.globalTable("anyTopic"); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KafkaStreams.class); - final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) { + final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) { streams.close(); waitForCondition( - () -> streams.state() == KafkaStreams.State.NOT_RUNNING, - "Streams never stopped."); + () -> streams.state() == KafkaStreams.State.NOT_RUNNING, + "Streams never stopped."); assertThat(appender.getMessages(), not(hasItem(containsString("ERROR")))); } @@ -561,29 +566,29 @@ public void testStateThreadClose() throws Exception { try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) { assertEquals(NUM_THREADS, streams.threads.size()); - assertEquals(streams.state(), KafkaStreams.State.CREATED); + assertEquals(KafkaStreams.State.CREATED, streams.state()); streams.start(); waitForCondition( - () -> streams.state() == KafkaStreams.State.RUNNING, - "Streams never started."); + () -> streams.state() == KafkaStreams.State.RUNNING, + "Streams never started."); for (int i = 0; i < NUM_THREADS; i++) { final StreamThread tmpThread = streams.threads.get(i); tmpThread.shutdown(); waitForCondition(() -> tmpThread.state() == StreamThread.State.DEAD, - "Thread never stopped."); + "Thread never stopped."); streams.threads.get(i).join(); } waitForCondition( - () -> streams.metadataForLocalThreads().stream().allMatch(t -> t.threadState().equals("DEAD")), - "Streams never stopped" + () -> streams.metadataForLocalThreads().stream().allMatch(t -> t.threadState().equals("DEAD")), + "Streams never stopped" ); streams.close(); waitForCondition( - () -> streams.state() == KafkaStreams.State.NOT_RUNNING, - "Streams never stopped."); + () -> streams.state() == KafkaStreams.State.NOT_RUNNING, + "Streams never stopped."); assertNull(streams.globalStreamThread); } @@ -601,26 +606,26 @@ public void testStateGlobalThreadClose() throws Exception { builder.globalTable("anyTopic"); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KafkaStreams.class); - final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) { + final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) { streams.start(); waitForCondition( - () -> streams.state() == KafkaStreams.State.RUNNING, - "Streams never started."); + () -> streams.state() == KafkaStreams.State.RUNNING, + "Streams never started."); final GlobalStreamThread globalStreamThread = streams.globalStreamThread; globalStreamThread.shutdown(); waitForCondition( - () -> globalStreamThread.state() == GlobalStreamThread.State.DEAD, - "Thread never stopped."); + () -> globalStreamThread.state() == GlobalStreamThread.State.DEAD, + "Thread never stopped."); // shutting down the global thread from "external" will yield an error in KafkaStreams waitForCondition( - () -> streams.state() == KafkaStreams.State.ERROR, - "Thread never stopped." + () -> streams.state() == KafkaStreams.State.ERROR, + "Thread never stopped." ); streams.close(); - assertEquals(streams.state(), KafkaStreams.State.ERROR, "KafkaStreams should remain in ERROR state after close."); + assertEquals(KafkaStreams.State.ERROR, streams.state(), "KafkaStreams should remain in ERROR state after close."); assertThat(appender.getMessages(), hasItem(containsString("State transition from RUNNING to PENDING_ERROR"))); assertThat(appender.getMessages(), hasItem(containsString("State transition from PENDING_ERROR to ERROR"))); assertThat(appender.getMessages(), hasItem(containsString("Streams client is already in the terminal ERROR state"))); @@ -644,7 +649,7 @@ public void testInitializesAndDestroysMetricsReporters() { streams.start(); final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get(); streams.close(); - assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING); + assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state()); assertEquals(oldCloseCount + initDiff, MockMetricsReporter.CLOSE_COUNT.get()); } } @@ -811,7 +816,7 @@ public void shouldRemoveThread() throws Exception { streams.start(); final int oldSize = streams.threads.size(); waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, - "Kafka Streams client did not reach state RUNNING"); + "Kafka Streams client did not reach state RUNNING"); assertThat(streams.removeStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 1))); assertThat(streams.threads.size(), equalTo(oldSize - 1)); } @@ -875,7 +880,7 @@ public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState( prepareThreadState(streamThreadTwo, state2); try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { streams.start(); - assertThrows(IllegalStateException.class, () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null)); + assertThrows(IllegalStateException.class, () -> streams.setUncaughtExceptionHandler(null)); } } @@ -886,7 +891,7 @@ public void shouldThrowExceptionSettingStreamsUncaughtExceptionHandlerNotInCreat prepareStreamThread(streamThreadTwo, 2); try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { streams.start(); - assertThrows(IllegalStateException.class, () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null)); + assertThrows(IllegalStateException.class, () -> streams.setUncaughtExceptionHandler(null)); } } @@ -896,7 +901,7 @@ public void shouldThrowNullPointerExceptionSettingStreamsUncaughtExceptionHandle prepareStreamThread(streamThreadOne, 1); prepareStreamThread(streamThreadTwo, 2); try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { - assertThrows(NullPointerException.class, () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null)); + assertThrows(NullPointerException.class, () -> streams.setUncaughtExceptionHandler(null)); } } @@ -942,8 +947,8 @@ public void shouldThrowOnCleanupWhileRunning() throws Exception { try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { streams.start(); waitForCondition( - () -> streams.state() == KafkaStreams.State.RUNNING, - "Streams never started."); + () -> streams.state() == KafkaStreams.State.RUNNING, + "Streams never started."); try { streams.cleanUp(); @@ -964,13 +969,13 @@ public void shouldThrowOnCleanupWhilePaused() throws Exception { try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { streams.start(); waitForCondition( - () -> streams.state() == KafkaStreams.State.RUNNING, - "Streams never started."); + () -> streams.state() == KafkaStreams.State.RUNNING, + "Streams never started."); streams.pause(); waitForCondition( - streams::isPaused, - "Streams did not pause."); + streams::isPaused, + "Streams did not pause."); assertThrows(IllegalStateException.class, streams::cleanUp, "Cannot clean up while running."); } @@ -987,8 +992,8 @@ public void shouldThrowOnCleanupWhileShuttingDown() throws Exception { try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, supplier, time)) { streams.start(); waitForCondition( - () -> streams.state() == KafkaStreams.State.RUNNING, - "Streams never started."); + () -> streams.state() == KafkaStreams.State.RUNNING, + "Streams never started."); streams.close(Duration.ZERO); assertThat(streams.state() == State.PENDING_SHUTDOWN, equalTo(true)); @@ -1012,8 +1017,8 @@ public void shouldThrowOnCleanupWhileShuttingDownStreamClosedWithCloseOptionLeav try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, mockClientSupplier, time)) { streams.start(); waitForCondition( - () -> streams.state() == KafkaStreams.State.RUNNING, - "Streams never started."); + () -> streams.state() == KafkaStreams.State.RUNNING, + "Streams never started."); final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions(); closeOptions.timeout(Duration.ZERO); @@ -1037,8 +1042,8 @@ public void shouldThrowOnCleanupWhileShuttingDownStreamClosedWithCloseOptionLeav try (final KafkaStreams streams = new KafkaStreamsWithTerminableThread(getBuilderWithSource().build(), props, supplier, time)) { streams.start(); waitForCondition( - () -> streams.state() == KafkaStreams.State.RUNNING, - "Streams never started."); + () -> streams.state() == KafkaStreams.State.RUNNING, + "Streams never started."); final KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions(); closeOptions.timeout(Duration.ZERO); @@ -1342,9 +1347,9 @@ public void shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() thro } executorsMockedStatic.verify(() -> Executors.newSingleThreadScheduledExecutor(any(ThreadFactory.class)), - times(2)); + times(2)); verify(rocksDBMetricsRecordingTriggerThread).scheduleAtFixedRate(any(RocksDBMetricsRecordingTrigger.class), - eq(0L), eq(1L), eq(TimeUnit.MINUTES)); + eq(0L), eq(1L), eq(TimeUnit.MINUTES)); verify(rocksDBMetricsRecordingTriggerThread).shutdownNow(); } } @@ -1369,7 +1374,7 @@ public void shouldGetClientSupplierFromConfigForConstructor() throws Exception { } @Test - public void shouldGetClientSupplierFromConfigForConstructorWithTime() throws Exception { + public void shouldGetClientSupplierFromConfigForConstructorWithTime() { prepareStreams(); final AtomicReference state1 = prepareStreamThread(streamThreadOne, 1); final AtomicReference state2 = prepareStreamThread(streamThreadTwo, 2); @@ -1436,11 +1441,11 @@ public void shouldCleanupOldStateDirs() { try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class)) { final ScheduledExecutorService cleanupSchedule = mock(ScheduledExecutorService.class); executorsMockedStatic.when(() -> Executors.newSingleThreadScheduledExecutor( - any(ThreadFactory.class) + any(ThreadFactory.class) )).thenReturn(cleanupSchedule); try (MockedConstruction ignored = mockConstruction(StateDirectory.class, - (mock, context) -> when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) { + (mock, context) -> when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) { props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1"); final StreamsBuilder builder = new StreamsBuilder(); builder.table("topic", Materialized.as("store")); @@ -1524,8 +1529,8 @@ public void shouldThrowTopologyExceptionOnEmptyTopology() { fail("Should have thrown TopologyException"); } catch (final TopologyException e) { assertThat( - e.getMessage(), - equalTo("Invalid topology: Topology has no stream threads and no global threads, " + + e.getMessage(), + equalTo("Invalid topology: Topology has no stream threads and no global threads, " + "must subscribe to at least one source topic or global table.")); } } @@ -1548,18 +1553,18 @@ public void shouldTransitToRunningWithGlobalOnlyTopology() throws Exception { try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) { assertThat(streams.threads.size(), equalTo(0)); - assertEquals(streams.state(), KafkaStreams.State.CREATED); + assertEquals(KafkaStreams.State.CREATED, streams.state()); streams.start(); waitForCondition( - () -> streams.state() == KafkaStreams.State.RUNNING, - "Streams never started, state is " + streams.state()); + () -> streams.state() == KafkaStreams.State.RUNNING, + "Streams never started, state is " + streams.state()); streams.close(); waitForCondition( - () -> streams.state() == KafkaStreams.State.NOT_RUNNING, - "Streams never stopped."); + () -> streams.state() == KafkaStreams.State.NOT_RUNNING, + "Streams never stopped."); } } @@ -1571,12 +1576,12 @@ public void shouldThrowOnClientInstanceIdsWithNegativeTimeout() { try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { final IllegalArgumentException error = assertThrows( - IllegalArgumentException.class, - () -> streams.clientInstanceIds(Duration.ofMillis(-1L)) + IllegalArgumentException.class, + () -> streams.clientInstanceIds(Duration.ofMillis(-1L)) ); assertThat( - error.getMessage(), - equalTo("The timeout cannot be negative.") + error.getMessage(), + equalTo("The timeout cannot be negative.") ); } } @@ -1589,12 +1594,12 @@ public void shouldThrowOnClientInstanceIdsWhenNotStarted() { try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { final IllegalStateException error = assertThrows( - IllegalStateException.class, - () -> streams.clientInstanceIds(Duration.ZERO) + IllegalStateException.class, + () -> streams.clientInstanceIds(Duration.ZERO) ); assertThat( - error.getMessage(), - equalTo("KafkaStreams has not been started, you can retry after calling start().") + error.getMessage(), + equalTo("KafkaStreams has not been started, you can retry after calling start().") ); } } @@ -1609,12 +1614,12 @@ public void shouldThrowOnClientInstanceIdsWhenClosed() { streams.close(); final IllegalStateException error = assertThrows( - IllegalStateException.class, - () -> streams.clientInstanceIds(Duration.ZERO) + IllegalStateException.class, + () -> streams.clientInstanceIds(Duration.ZERO) ); assertThat( - error.getMessage(), - equalTo("KafkaStreams has been stopped (NOT_RUNNING).") + error.getMessage(), + equalTo("KafkaStreams has been stopped (NOT_RUNNING).") ); } } @@ -1629,19 +1634,19 @@ public void shouldThrowStreamsExceptionWhenAdminNotInitialized() { streams.start(); final StreamsException error = assertThrows( - StreamsException.class, - () -> streams.clientInstanceIds(Duration.ZERO) + StreamsException.class, + () -> streams.clientInstanceIds(Duration.ZERO) ); assertThat( - error.getMessage(), - equalTo("Could not retrieve admin client instance id.") + error.getMessage(), + equalTo("Could not retrieve admin client instance id.") ); final Throwable cause = error.getCause(); assertThat(cause, instanceOf(UnsupportedOperationException.class)); assertThat( - cause.getMessage(), - equalTo("clientInstanceId not set") + cause.getMessage(), + equalTo("clientInstanceId not set") ); } } @@ -1659,12 +1664,12 @@ public void shouldNotCrashButThrowLaterIfAdminTelemetryDisabled() { final ClientInstanceIds clientInstanceIds = streams.clientInstanceIds(Duration.ZERO); final IllegalStateException error = assertThrows( - IllegalStateException.class, - clientInstanceIds::adminInstanceId + IllegalStateException.class, + clientInstanceIds::adminInstanceId ); assertThat( - error.getMessage(), - equalTo("Telemetry is not enabled on the admin client. Set config `enable.metrics.push` to `true`.") + error.getMessage(), + equalTo("Telemetry is not enabled on the admin client. Set config `enable.metrics.push` to `true`.") ); } } @@ -1682,8 +1687,8 @@ public void shouldThrowTimeExceptionWhenAdminTimesOut() { streams.start(); assertThrows( - TimeoutException.class, - () -> streams.clientInstanceIds(Duration.ZERO) + TimeoutException.class, + () -> streams.clientInstanceIds(Duration.ZERO) ); } } @@ -1700,8 +1705,8 @@ public void shouldReturnAdminInstanceID() { streams.start(); assertThat( - streams.clientInstanceIds(Duration.ZERO).adminInstanceId(), - equalTo(instanceId) + streams.clientInstanceIds(Duration.ZERO).adminInstanceId(), + equalTo(instanceId) ); } } @@ -1719,7 +1724,7 @@ public void shouldReturnProducerAndConsumerInstanceIds() { producerFuture.complete(producerInstanceId); final Uuid adminInstanceId = Uuid.randomUuid(); adminClient.setClientInstanceId(adminInstanceId); - + final Map> expectedClientIds = Map.of("main-consumer", consumerFuture, "some-thread-producer", producerFuture); when(streamThreadOne.clientInstanceIds(any())).thenReturn(expectedClientIds); @@ -1741,14 +1746,14 @@ public void shouldThrowTimeoutExceptionWhenAnyClientFutureDoesNotComplete() { prepareStreamThread(streamThreadTwo, 2); when(streamThreadOne.clientInstanceIds(any())) - .thenReturn(Collections.singletonMap("some-client", new KafkaFutureImpl<>())); + .thenReturn(Collections.singletonMap("some-client", new KafkaFutureImpl<>())); adminClient.setClientInstanceId(Uuid.randomUuid()); try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { streams.start(); final TimeoutException timeoutException = assertThrows( - TimeoutException.class, - () -> streams.clientInstanceIds(Duration.ZERO) + TimeoutException.class, + () -> streams.clientInstanceIds(Duration.ZERO) ); assertThat(timeoutException.getMessage(), equalTo("Could not retrieve consumer/producer instance id for some-client.")); assertThat(timeoutException.getCause(), instanceOf(java.util.concurrent.TimeoutException.class)); @@ -1769,11 +1774,11 @@ public void shouldThrowTimeoutExceptionWhenGlobalConsumerFutureDoesNotComplete() streams.start(); when(globalStreamThreadMockedConstruction.constructed().get(0).globalConsumerInstanceId(any())) - .thenReturn(new KafkaFutureImpl<>()); + .thenReturn(new KafkaFutureImpl<>()); final TimeoutException timeoutException = assertThrows( - TimeoutException.class, - () -> streams.clientInstanceIds(Duration.ZERO) + TimeoutException.class, + () -> streams.clientInstanceIds(Duration.ZERO) ); assertThat(timeoutException.getMessage(), equalTo("Could not retrieve global consumer client instance id.")); assertThat(timeoutException.getCause(), instanceOf(java.util.concurrent.TimeoutException.class)); @@ -1796,25 +1801,25 @@ public void shouldCountDownTimeoutAcrossClient() { final AtomicBoolean didAssertGlobalThread = new AtomicBoolean(false); when(streamThreadOne.clientInstanceIds(any())) - .thenReturn(Collections.singletonMap("any-client-1", new KafkaFutureImpl() { - @Override - public Uuid get(final long timeout, final TimeUnit timeUnit) { - didAssertThreadOne.set(true); - assertThat(timeout, equalTo(expectedTimeout.getAndAdd(-10L))); - mockTime.sleep(10L); - return null; - } - })); + .thenReturn(Collections.singletonMap("any-client-1", new KafkaFutureImpl<>() { + @Override + public Uuid get(final long timeout, final TimeUnit timeUnit) { + didAssertThreadOne.set(true); + assertThat(timeout, equalTo(expectedTimeout.getAndAdd(-10L))); + mockTime.sleep(10L); + return null; + } + })); when(streamThreadTwo.clientInstanceIds(any())) - .thenReturn(Collections.singletonMap("any-client-2", new KafkaFutureImpl() { - @Override - public Uuid get(final long timeout, final TimeUnit timeUnit) { - didAssertThreadTwo.set(true); - assertThat(timeout, equalTo(expectedTimeout.getAndAdd(-5L))); - mockTime.sleep(5L); - return null; - } - })); + .thenReturn(Collections.singletonMap("any-client-2", new KafkaFutureImpl<>() { + @Override + public Uuid get(final long timeout, final TimeUnit timeUnit) { + didAssertThreadTwo.set(true); + assertThat(timeout, equalTo(expectedTimeout.getAndAdd(-5L))); + mockTime.sleep(5L); + return null; + } + })); final StreamsBuilder builder = getBuilderWithSource(); builder.globalTable("anyTopic"); @@ -1823,15 +1828,15 @@ public Uuid get(final long timeout, final TimeUnit timeUnit) { streams.start(); when(globalStreamThreadMockedConstruction.constructed().get(0).globalConsumerInstanceId(any())) - .thenReturn(new KafkaFutureImpl() { - @Override - public Uuid get(final long timeout, final TimeUnit timeUnit) { - didAssertGlobalThread.set(true); - assertThat(timeout, equalTo(expectedTimeout.getAndAdd(-8L))); - mockTime.sleep(8L); - return null; - } - }); + .thenReturn(new KafkaFutureImpl<>() { + @Override + public Uuid get(final long timeout, final TimeUnit timeUnit) { + didAssertGlobalThread.set(true); + assertThat(timeout, equalTo(expectedTimeout.getAndAdd(-8L))); + mockTime.sleep(8L); + return null; + } + }); streams.clientInstanceIds(Duration.ofMillis(60L)); } @@ -1840,14 +1845,146 @@ public Uuid get(final long timeout, final TimeUnit timeUnit) { assertThat(didAssertThreadTwo.get(), equalTo(true)); assertThat(didAssertGlobalThread.get(), equalTo(true)); } + @Test public void initShouldThrowMisconfiguredExceptionWhenInternalTopicsAreMisconfigured() { + prepareStreams(); + props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0); + + final InitParameters initParams = InitParameters.initParameters(); // no auto-setup + + try (final MockedConstruction mocked = mockConstruction(InternalTopicManager.class, + (mock, context) -> { + final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); + + when(result.misconfigurationsForTopics()).thenReturn(Map.of("topicA", List.of("bad config"))); + when(result.missingTopics()).thenReturn(Set.of("topicA")); // triggers validation error + + when(mock.validate(any())).thenReturn(result); + })) { + + final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); + + final StreamsException exception = assertThrows(StreamsException.class, () -> streams.init(initParams)); + + assertTrue(exception.getCause() instanceof MisconfiguredInternalTopicException); + assertTrue(exception.getCause().getMessage().contains("topicA")); + } + } + + @Test + public void initShouldThrowMissingSourceTopicExceptionWhenSourceTopicMissing() { + prepareStreams(); + props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0); + + final InitParameters initParams = InitParameters.initParameters(); // auto-setup is disabled + + try (final MockedConstruction mocked = mockConstruction(InternalTopicManager.class, + (mock, context) -> { + final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); + + when(result.misconfigurationsForTopics()).thenReturn(Collections.emptyMap()); + when(result.missingTopics()).thenReturn(Set.of("source-topic")); // triggers missing topic error + + when(mock.validate(any())).thenReturn(result); + })) { + + final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); + + final StreamsException exception = assertThrows(StreamsException.class, () -> streams.init(initParams)); + + assertTrue(exception.getCause() instanceof MissingSourceTopicException); + assertTrue(exception.getCause().getMessage().contains("source-topic")); + } + } + + @Test + public void initShouldThrowInternalTopicsAlreadySetupExceptionIfAllExist() { + prepareStreams(); + props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0); + + final InitParameters initParams = InitParameters.initParameters(); // auto-setup not enabled + + try (final MockedConstruction mocked = mockConstruction(InternalTopicManager.class, + (mock, context) -> { + final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); + when(result.missingTopics()).thenReturn(Collections.emptySet()); + when(result.misconfigurationsForTopics()).thenReturn(Collections.emptyMap()); + when(mock.validate(any())).thenReturn(result); + })) { + + final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); + + final StreamsException exception = assertThrows(StreamsException.class, () -> streams.init(initParams)); + + assertTrue(exception.getCause() instanceof InternalTopicsAlreadySetupException); + assertTrue(exception.getCause().getMessage().contains("All internal topics have already been setup")); + } } -// public void initShouldThrowMissingSourceTopicExceptionWhenSourceTopicIsMissing() -// public void initShouldThrowInternalTopicsAlreadySetupExceptionIfAllExist() -// public void initShouldSetupAllInternalTopicsIfNoneExist() -// public void initShouldThrowMissingInternalTopicsExceptionWhenDisabled() -// public void initShouldMakeReadyInternalTopicsWhenAutoSetupEnabled() + + @Test + public void initShouldThrowMissingInternalTopicsExceptionWhenDisabled() { + prepareStreams(); + props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0); + + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("source-topic").groupByKey().count(); + final Topology topology = builder.build(); + + final InitParameters initParams = InitParameters.initParameters(); // auto-setup not enabled + + try (MockedConstruction mocked = mockConstruction(InternalTopicManager.class, + (mock, context) -> { + final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); + when(result.missingTopics()).thenReturn(Set.of("some-missing-topic")); + when(result.misconfigurationsForTopics()).thenReturn(Collections.emptyMap()); + when(mock.validate(any())).thenReturn(result); + })) { + + final KafkaStreams streams = new KafkaStreams(topology, props, supplier, time); + + assertThrows(StreamsException.class, () -> { + streams.init(initParams); + }); + + + final InternalTopicManager internalTopicManager = mocked.constructed().get(0); + verify(internalTopicManager, never()).setup(any()); + verify(internalTopicManager, never()).makeReady(any()); + } + } + + @Test + public void initShouldMakeReadyInternalTopicsWhenAutoSetupEnabled() { + prepareStreams(); + props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0); + + final StreamsBuilder builder = new StreamsBuilder(); + builder.stream("source-topic").groupByKey().count(); + final Topology topology = builder.build(); + + final InitParameters initParams = InitParameters.initParameters().enableSetupInternalTopicsIfIncomplete(); + + try (MockedConstruction mocked = mockConstruction(InternalTopicManager.class, + (mock, context) -> { + final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); + + // Simulate only some topics are missing — triggers makeReady + when(result.missingTopics()).thenReturn(Set.of("topicA")); + when(result.misconfigurationsForTopics()).thenReturn(Collections.emptyMap()); + when(mock.validate(any())).thenReturn(result); + })) { + + final KafkaStreams streams = new KafkaStreams(topology, props, supplier, time); + + streams.init(initParams); + + final InternalTopicManager internalTopicManager = mocked.constructed().get(0); + verify(internalTopicManager).makeReady(any()); + verify(internalTopicManager, never()).setup(any()); + } + } + private Topology getStatefulTopology(final String inputTopic, final String outputTopic, final String globalTopicName, @@ -1855,44 +1992,44 @@ private Topology getStatefulTopology(final String inputTopic, final String globalStoreName, final boolean isPersistentStore) { final StoreBuilder> storeBuilder = Stores.keyValueStoreBuilder( - isPersistentStore ? - Stores.persistentKeyValueStore(storeName) - : Stores.inMemoryKeyValueStore(storeName), - Serdes.String(), - Serdes.Long()); + isPersistentStore ? + Stores.persistentKeyValueStore(storeName) + : Stores.inMemoryKeyValueStore(storeName), + Serdes.String(), + Serdes.Long()); final Topology topology = new Topology(); topology.addSource("source", new StringDeserializer(), new StringDeserializer(), inputTopic) - .addProcessor("process", () -> new Processor() { - private ProcessorContext context; + .addProcessor("process", () -> new Processor() { + private ProcessorContext context; - @Override - public void init(final ProcessorContext context) { - this.context = context; - } + @Override + public void init(final ProcessorContext context) { + this.context = context; + } - @Override - public void process(final Record record) { - final KeyValueStore kvStore = context.getStateStore(storeName); - kvStore.put(record.key(), 5L); + @Override + public void process(final Record record) { + final KeyValueStore kvStore = context.getStateStore(storeName); + kvStore.put(record.key(), 5L); - context.forward(record.withValue("5")); - context.commit(); - } - }, "source") - .addStateStore(storeBuilder, "process") - .addSink("sink", outputTopic, new StringSerializer(), new StringSerializer(), "process"); + context.forward(record.withValue("5")); + context.commit(); + } + }, "source") + .addStateStore(storeBuilder, "process") + .addSink("sink", outputTopic, new StringSerializer(), new StringSerializer(), "process"); final StoreBuilder> globalStoreBuilder = Stores.keyValueStoreBuilder( - isPersistentStore ? Stores.persistentKeyValueStore(globalStoreName) : Stores.inMemoryKeyValueStore(globalStoreName), - Serdes.String(), Serdes.String()).withLoggingDisabled(); + isPersistentStore ? Stores.persistentKeyValueStore(globalStoreName) : Stores.inMemoryKeyValueStore(globalStoreName), + Serdes.String(), Serdes.String()).withLoggingDisabled(); topology.addGlobalStore( - globalStoreBuilder, - "global", - new StringDeserializer(), - new StringDeserializer(), - globalTopicName, - globalTopicName + "-processor", - new MockProcessorSupplier<>()); + globalStoreBuilder, + "global", + new StringDeserializer(), + new StringDeserializer(), + globalTopicName, + globalTopicName + "-processor", + new MockProcessorSupplier<>()); return topology; } @@ -1904,11 +2041,11 @@ private StreamsBuilder getBuilderWithSource() { private void startStreamsAndCheckDirExists(final Topology topology, final boolean shouldFilesExist) { try (MockedConstruction stateDirectoryMockedConstruction = mockConstruction(StateDirectory.class, - (mock, context) -> { - when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()); - assertEquals(4, context.arguments().size()); - assertEquals(shouldFilesExist, context.arguments().get(2)); - })) { + (mock, context) -> { + when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()); + assertEquals(4, context.arguments().size()); + assertEquals(shouldFilesExist, context.arguments().get(2)); + })) { try (final KafkaStreams ignored = new KafkaStreams(topology, props, supplier, time)) { // verify that stateDirectory constructor was called @@ -1916,4 +2053,4 @@ private void startStreamsAndCheckDirExists(final Topology topology, final boolea } } } -} +} \ No newline at end of file From 0402ba769b0b3fde7808cc741d692b44e507d067 Mon Sep 17 00:00:00 2001 From: kapol Date: Thu, 22 May 2025 15:57:48 -0500 Subject: [PATCH 15/36] fix typo --- .../src/main/java/org/apache/kafka/streams/StreamsConfig.java | 1 + 1 file changed, 1 insertion(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 8ca3558ce1151..20733563eb1a1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -166,6 +166,7 @@ public class StreamsConfig extends AbstractConfig { private static final long DEFAULT_COMMIT_INTERVAL_MS = 30000L; private static final long EOS_DEFAULT_COMMIT_INTERVAL_MS = 100L; private static final int DEFAULT_TRANSACTION_TIMEOUT = 10000; + @Deprecated @SuppressWarnings("unused") public static final int DUMMY_THREAD_INDEX = 1; From 8cb10b86a253a4b140acf605af9059f4768b135d Mon Sep 17 00:00:00 2001 From: k-apol Date: Thu, 22 May 2025 16:03:28 -0500 Subject: [PATCH 16/36] typo --- .../src/main/java/org/apache/kafka/streams/AutoOffsetReset.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java b/streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java index 6a95bd4171b5f..f3f3a941d20f7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java +++ b/streams/src/main/java/org/apache/kafka/streams/AutoOffsetReset.java @@ -21,7 +21,7 @@ import org.apache.kafka.streams.kstream.KTable; import java.time.Duration; -import java.util.Optional; +import java.util.Optional; /** * Sets the {@code auto.offset.reset} configuration when From 29113714e15e8ef21461c15c6cfb92b5ad34590d Mon Sep 17 00:00:00 2001 From: k-apol Date: Thu, 22 May 2025 16:04:27 -0500 Subject: [PATCH 17/36] typo --- .../streams/processor/internals/ChangelogTopics.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java index d1fd0b245d5ce..b716f31b133db 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java @@ -73,10 +73,10 @@ public void setup() { for (final TaskId task : topicGroupTasks) { final Set changelogTopicPartitions = topicsInfo.stateChangelogTopics - .keySet() - .stream() - .map(topic -> new TopicPartition(topic, task.partition())) - .collect(Collectors.toSet()); + .keySet() + .stream() + .map(topic -> new TopicPartition(topic, task.partition())) + .collect(Collectors.toSet()); changelogPartitionsForStatefulTask.put(task, changelogTopicPartitions); } @@ -135,4 +135,4 @@ public Set statefulTaskIds() { public Map> changelogPartionsForTask() { return Collections.unmodifiableMap(changelogPartitionsForStatefulTask); } -} \ No newline at end of file +} From 013fe7d9afb0e54752de24d9ccdc5efa5dc570ba Mon Sep 17 00:00:00 2001 From: kapol Date: Thu, 12 Jun 2025 06:42:38 -0500 Subject: [PATCH 18/36] Allow errors to bubble up in init() Update javadoc for init() Misc cleanup --- .../apache/kafka/streams/KafkaStreams.java | 93 +++++++++++-------- .../kafka/streams/StreamsConfigTest.java | 2 + 2 files changed, 54 insertions(+), 41 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index e6ca43d8d86ed..f6ae8bc3e4db8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -303,19 +303,30 @@ public boolean isValidTransition(final State newState) { protected volatile State state = State.CREATED; /** - * Initializes broker-side state. + *

+ * Kafka Streams creates internal topics on the broker to make the application fault-tolerant and to repartition data + * as needed when a key-based operation (e.g. join) follows a key-changing operation (e.g. map) in the configured topology. + *

+ * This method initializes this required state, validates the internal topics are configured correctly, + * and specifies which internal topics to set up if some, but not all, are present. * * @throws MissingSourceTopicException if a source topic is missing * @throws MissingInternalTopicsException if some but not all of the internal topics are missing * @throws MisconfiguredInternalTopicException if an internal topics is misconfigured * @throws InternalTopicsAlreadySetupException if all internal topics are already setup + * @throws TimeoutException if initialization exceeds the default timeout of 30s */ public void init() { - this.init(DEFAULT_INIT_TIMEOUT_MS); + init(DEFAULT_INIT_TIMEOUT_MS); } /** - * Initializes broker-side state. + *

+ * Kafka Streams creates internal topics on the broker to make the application fault-tolerant and to repartition data + * as needed when a key-based operation (e.g. join) follows a key-changing operation (e.g. map) in the configured topology. + *

+ * This method initializes this required state, validates the internal topics are configured correctly, + * and specifies which internal topics to set up if some, but not all, are present. * * @throws MissingSourceTopicException if a source topic is missing * @throws MissingInternalTopicsException if some but not all of the internal topics are missing @@ -332,16 +343,19 @@ public void init(final Duration timeout) { } /** - * Initializes broker-side state. - * - * This methods takes parameters that specify which internal topics to setup if some - * but not all of them are absent. + *

+ * Kafka Streams creates internal topics on the broker to make the application fault-tolerant and to repartition data + * as needed when a key-based operation (e.g. join) follows a key-changing operation (e.g. map) in the configured topology. + *

+ * This method initializes this required state, validates the internal topics are configured correctly, + * and specifies which internal topics to set up if some, but not all, are present. * * @throws MissingSourceTopicException if a source topic is missing * @throws MissingInternalTopicsException if some but not all of the internal topics are missing * and the given initialization parameters do not specify to setup them * @throws MisconfiguredInternalTopicException if an internal topics is misconfigured * @throws InternalTopicsAlreadySetupException if all internal topics are already setup + * @throws TimeoutException if initialization exceeds the default timeout of 30s */ public void init(final InitParameters initParameters) { @@ -349,10 +363,12 @@ public void init(final InitParameters initParameters) { } /** - * Initializes broker-side state. - * - * This methods takes parameters that specify which internal topics to setup if some - * but not all of them are absent. + *

+ * Kafka Streams creates internal topics on the broker to make the application fault-tolerant and to repartition data + * as needed when a key-based operation (e.g. join) follows a key-changing operation (e.g. map) in the configured topology. + *

+ * This method initializes this required state, validates the internal topics are configured correctly, + * and specifies which internal topics to set up if some, but not all, are present. * * @throws MissingSourceTopicException if a source topic is missing * @throws MissingInternalTopicsException if some but not all of the internal topics are missing @@ -383,40 +399,35 @@ private void doInit(final InitParameters initParameters) { allSourceTopics.addAll(topicsInfo.sourceTopics); } } - try { - final ValidationResult validationResult = internalTopicManager.validate(allInternalTopics); // can throw timeout - final boolean noInternalTopicsExist = allInternalTopics.keySet() == validationResult.missingTopics(); - final boolean internalTopicsMisconfigured = !validationResult.misconfigurationsForTopics().isEmpty(); - final boolean allInternalTopicsExist = validationResult.missingTopics().isEmpty(); - final boolean missingSourceTopics = !Collections.disjoint(validationResult.missingTopics(), allSourceTopics); + final ValidationResult validationResult = internalTopicManager.validate(allInternalTopics); - if (internalTopicsMisconfigured) { - throw new MisconfiguredInternalTopicException("Misconfigured Internal Topics: " + validationResult.misconfigurationsForTopics()); - } - if (missingSourceTopics) { - allSourceTopics.retainAll(validationResult.missingTopics()); - throw new MissingSourceTopicException("Missing source topics: " + allSourceTopics); - } - if (noInternalTopicsExist) { - internalTopicManager.setup(allInternalTopics); - } else if (allInternalTopicsExist) { - throw new InternalTopicsAlreadySetupException("All internal topics have already been setup"); - } else { - if (initParameters.setupInternalTopicsIfIncompleteEnabled()) { - final Map topicsToCreate = new HashMap<>(); - for (final String missingTopic : validationResult.missingTopics()) { - topicsToCreate.put(missingTopic, allInternalTopics.get(missingTopic)); - } - internalTopicManager.makeReady(topicsToCreate); - } else { - throw new MissingInternalTopicsException("Missing Internal Topics: ", new ArrayList<>(validationResult.missingTopics())); + final boolean noInternalTopicsExist = allInternalTopics.keySet().equals(validationResult.missingTopics()); + final boolean internalTopicsMisconfigured = !validationResult.misconfigurationsForTopics().isEmpty(); + final boolean allInternalTopicsExist = validationResult.missingTopics().isEmpty(); + final boolean missingSourceTopics = !Collections.disjoint(validationResult.missingTopics(), allSourceTopics); + + if (internalTopicsMisconfigured) { + throw new MisconfiguredInternalTopicException("Misconfigured Internal Topics: " + validationResult.misconfigurationsForTopics()); + } + if (missingSourceTopics) { + allSourceTopics.retainAll(validationResult.missingTopics()); + throw new MissingSourceTopicException("Missing source topics: " + allSourceTopics); + } + if (noInternalTopicsExist) { + internalTopicManager.setup(allInternalTopics); + } else if (allInternalTopicsExist) { + throw new InternalTopicsAlreadySetupException("All internal topics have already been setup"); + } else { + if (initParameters.setupInternalTopicsIfIncompleteEnabled()) { + final Map topicsToCreate = new HashMap<>(); + for (final String missingTopic : validationResult.missingTopics()) { + topicsToCreate.put(missingTopic, allInternalTopics.get(missingTopic)); } + internalTopicManager.makeReady(topicsToCreate); + } else { + throw new MissingInternalTopicsException("Missing Internal Topics: ", new ArrayList<>(validationResult.missingTopics())); } - } catch (final TimeoutException timeoutException) { - throw new TimeoutException(timeoutException.getMessage(), timeoutException); - } catch (final StreamsException streamsException) { - throw new StreamsException(streamsException.getMessage(), streamsException); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 005be86ceab39..ad934ff857856 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -481,12 +481,14 @@ public void shouldOverrideStreamsDefaultProducerConfigs() { assertEquals("10000", producerConfigs.get(ProducerConfig.LINGER_MS_CONFIG)); assertEquals("30000", producerConfigs.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)); } + @Test public void shouldParseInternalTopicSetupConfig() { props.put(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG, StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL); final StreamsConfig config = new StreamsConfig(props); assertEquals(StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL, config.getString(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG)); } + @Test public void shouldNotThrowIfTransactionTimeoutSmallerThanCommitIntervalForAtLeastOnce() { props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE); From 62f13e54b5290f7200bdcc20554de8772dec9c76 Mon Sep 17 00:00:00 2001 From: kapol Date: Fri, 13 Jun 2025 05:35:19 -0500 Subject: [PATCH 19/36] cleanup formatting --- .../java/org/apache/kafka/streams/KafkaStreams.java | 10 +++++----- .../org/apache/kafka/streams/KafkaStreamsTest.java | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index f6ae8bc3e4db8..883be6eb6ba9d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -43,12 +43,12 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.InternalTopicsAlreadySetupException; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.errors.InvalidStateStorePartitionException; -import org.apache.kafka.streams.errors.MissingSourceTopicException; import org.apache.kafka.streams.errors.MisconfiguredInternalTopicException; import org.apache.kafka.streams.errors.MissingInternalTopicsException; -import org.apache.kafka.streams.errors.InternalTopicsAlreadySetupException; +import org.apache.kafka.streams.errors.MissingSourceTopicException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsNotStartedException; @@ -65,8 +65,9 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ClientUtils; import org.apache.kafka.streams.processor.internals.GlobalStreamThread; -import org.apache.kafka.streams.processor.internals.InternalTopicManager; import org.apache.kafka.streams.processor.internals.InternalTopicConfig; +import org.apache.kafka.streams.processor.internals.InternalTopicManager; +import org.apache.kafka.streams.processor.internals.InternalTopicManager.ValidationResult; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.StateDirectory; import org.apache.kafka.streams.processor.internals.StreamThread; @@ -76,7 +77,6 @@ import org.apache.kafka.streams.processor.internals.TopologyMetadata; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology; -import org.apache.kafka.streams.processor.internals.InternalTopicManager.ValidationResult; import org.apache.kafka.streams.query.FailureReason; import org.apache.kafka.streams.query.PositionBound; import org.apache.kafka.streams.query.QueryConfig; @@ -118,8 +118,8 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG; import static org.apache.kafka.streams.StreamsConfig.DEFAULT_INIT_TIMEOUT_MS; +import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration; diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 00d9c158d9817..1f8bb5ea261ff 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -35,12 +35,12 @@ import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.KafkaStreams.InitParameters; -import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.errors.MissingSourceTopicException; +import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.errors.InternalTopicsAlreadySetupException; import org.apache.kafka.streams.errors.MisconfiguredInternalTopicException; +import org.apache.kafka.streams.errors.MissingSourceTopicException; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsNotStartedException; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.errors.UnknownStateStoreException; @@ -120,7 +120,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.doAnswer; @@ -130,6 +129,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockConstruction; import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; From 1ba84bb840b6037d5b8264a5d96456efe679075b Mon Sep 17 00:00:00 2001 From: kapol Date: Fri, 13 Jun 2025 07:07:19 -0500 Subject: [PATCH 20/36] refactor makeReady to throw if manual setup enabled and not called by init method --- .../internals/InternalTopicManager.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index d02ed19133da8..8ab8883c7df71 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -42,6 +42,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.MissingInternalTopicsException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.internals.ClientUtils.QuietConsumerConfig; @@ -75,6 +76,7 @@ public class InternalTopicManager { private final Time time; private final Admin adminClient; + private final boolean isManualInternalTopicConfig; private final short replicationFactor; private final long windowChangeLogAdditionalRetention; @@ -90,6 +92,8 @@ public InternalTopicManager(final Time time, final StreamsConfig streamsConfig) { this.time = time; this.adminClient = adminClient; + this.isManualInternalTopicConfig = streamsConfig.getString(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG) + .equals(StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL); final LogContext logContext = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())); log = logContext.logger(getClass()); @@ -475,6 +479,12 @@ public Set makeReady(final Map topics) { while (!topicsNotReady.isEmpty()) { final Set tempUnknownTopics = new HashSet<>(); topicsNotReady = validateTopics(topicsNotReady, topics, tempUnknownTopics); + + if (this.isManualInternalTopicConfig && !this.isInitializing) { + throw new MissingInternalTopicsException("Internal topic configuration set to MANUAL. \n" + + "You must call init() to setup internal topics.", new ArrayList<>(topicsNotReady)); + } + newlyCreatedTopics.addAll(topicsNotReady); if (!topicsNotReady.isEmpty()) { @@ -569,7 +579,7 @@ public Set makeReady(final Map topics) { } } log.debug("Completed validating internal topics and created {}", newlyCreatedTopics); - + this.isInitializing = false; return newlyCreatedTopics; } @@ -628,9 +638,6 @@ public void setInitTimeout(final Duration timeoutMs) { this.isInitializing = true; this.initTimeout = timeoutMs; } - private boolean isInitializing() { - return this.isInitializing; - } /** * Try to get the number of partitions for the given topics; return the number of partitions for topics that already exists. @@ -888,7 +895,7 @@ private void maybeThrowTimeoutExceptionDuringMakeReady(final long deadlineMs, final Set topicsNotReady, final long retryBackoffMs, final long currentWallClockMs) { - final boolean isInitializationTimeout = this.isInitializing() && currentWallClockMs >= initDeadlineMs; + final boolean isInitializationTimeout = this.isInitializing && currentWallClockMs >= initDeadlineMs; if (isInitializationTimeout || currentWallClockMs >= deadlineMs) { final String timeoutError; From f4070d1b05c381985f93839603b1fc0a4db5c07a Mon Sep 17 00:00:00 2001 From: kapol Date: Fri, 13 Jun 2025 07:07:19 -0500 Subject: [PATCH 21/36] refactor makeReady to throw if manual setup enabled and not called by init method. Add descriptive helper to throw on invalid invoke --- .../internals/InternalTopicManager.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index d02ed19133da8..8ab8883c7df71 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -42,6 +42,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.MissingInternalTopicsException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.internals.ClientUtils.QuietConsumerConfig; @@ -75,6 +76,7 @@ public class InternalTopicManager { private final Time time; private final Admin adminClient; + private final boolean isManualInternalTopicConfig; private final short replicationFactor; private final long windowChangeLogAdditionalRetention; @@ -90,6 +92,8 @@ public InternalTopicManager(final Time time, final StreamsConfig streamsConfig) { this.time = time; this.adminClient = adminClient; + this.isManualInternalTopicConfig = streamsConfig.getString(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG) + .equals(StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL); final LogContext logContext = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())); log = logContext.logger(getClass()); @@ -475,6 +479,12 @@ public Set makeReady(final Map topics) { while (!topicsNotReady.isEmpty()) { final Set tempUnknownTopics = new HashSet<>(); topicsNotReady = validateTopics(topicsNotReady, topics, tempUnknownTopics); + + if (this.isManualInternalTopicConfig && !this.isInitializing) { + throw new MissingInternalTopicsException("Internal topic configuration set to MANUAL. \n" + + "You must call init() to setup internal topics.", new ArrayList<>(topicsNotReady)); + } + newlyCreatedTopics.addAll(topicsNotReady); if (!topicsNotReady.isEmpty()) { @@ -569,7 +579,7 @@ public Set makeReady(final Map topics) { } } log.debug("Completed validating internal topics and created {}", newlyCreatedTopics); - + this.isInitializing = false; return newlyCreatedTopics; } @@ -628,9 +638,6 @@ public void setInitTimeout(final Duration timeoutMs) { this.isInitializing = true; this.initTimeout = timeoutMs; } - private boolean isInitializing() { - return this.isInitializing; - } /** * Try to get the number of partitions for the given topics; return the number of partitions for topics that already exists. @@ -888,7 +895,7 @@ private void maybeThrowTimeoutExceptionDuringMakeReady(final long deadlineMs, final Set topicsNotReady, final long retryBackoffMs, final long currentWallClockMs) { - final boolean isInitializationTimeout = this.isInitializing() && currentWallClockMs >= initDeadlineMs; + final boolean isInitializationTimeout = this.isInitializing && currentWallClockMs >= initDeadlineMs; if (isInitializationTimeout || currentWallClockMs >= deadlineMs) { final String timeoutError; From 9d0aedfb74168b113d39be7f589fe4fb25a9d3fc Mon Sep 17 00:00:00 2001 From: kapol Date: Sat, 14 Jun 2025 08:35:10 -0500 Subject: [PATCH 22/36] refactor makeReady to throw if manual setup enabled and not called by init method. Add descriptive helper to throw on invalid invoke --- .../processor/internals/InternalTopicManager.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index 8ab8883c7df71..b48cf53ab6fb2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -480,10 +480,7 @@ public Set makeReady(final Map topics) { final Set tempUnknownTopics = new HashSet<>(); topicsNotReady = validateTopics(topicsNotReady, topics, tempUnknownTopics); - if (this.isManualInternalTopicConfig && !this.isInitializing) { - throw new MissingInternalTopicsException("Internal topic configuration set to MANUAL. \n" + - "You must call init() to setup internal topics.", new ArrayList<>(topicsNotReady)); - } + throwIfManualSetupEnabledAndCalledWithoutInit(topicsNotReady); newlyCreatedTopics.addAll(topicsNotReady); @@ -583,6 +580,13 @@ public Set makeReady(final Map topics) { return newlyCreatedTopics; } + private void throwIfManualSetupEnabledAndCalledWithoutInit(final Set topicsNotReady) { + if (this.isManualInternalTopicConfig && !this.isInitializing) { + throw new MissingInternalTopicsException("Internal topic configuration set to MANUAL. \n" + + "You must call init() to setup internal topics.", new ArrayList<>(topicsNotReady)); + } + } + /** * Try to get the partition information for the given topics; return the partition info for topics that already exists. * From ee9a89fd51efa6a2e55fd246f15b26a2e151cc80 Mon Sep 17 00:00:00 2001 From: kapol Date: Sat, 14 Jun 2025 08:37:39 -0500 Subject: [PATCH 23/36] refactor makeReady to throw if manual setup enabled and not called by init method. Add descriptive helper to throw on invalid invoke Update logical check for isManualInternalTopicConfig to avoid NPE --- .../streams/processor/internals/InternalTopicManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index b48cf53ab6fb2..39bcf72a1bf4b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -92,8 +92,8 @@ public InternalTopicManager(final Time time, final StreamsConfig streamsConfig) { this.time = time; this.adminClient = adminClient; - this.isManualInternalTopicConfig = streamsConfig.getString(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG) - .equals(StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL); + this.isManualInternalTopicConfig = streamsConfig.getString(StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL) + .equals(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG); final LogContext logContext = new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())); log = logContext.logger(getClass()); From 6a9d6f1fc094e8753481d21f000abda51dd37d24 Mon Sep 17 00:00:00 2001 From: kapol Date: Tue, 24 Jun 2025 05:38:20 -0500 Subject: [PATCH 24/36] Add formatting updates - empty lines between blocks, remove uneccessary line break in ChangelogTOpics.java --- .../src/main/java/org/apache/kafka/streams/KafkaStreams.java | 4 ++++ .../kafka/streams/processor/internals/ChangelogTopics.java | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 6485216a5ddf9..436a216770c3c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -387,12 +387,14 @@ public void init(final InitParameters initParameters, final Duration timeout) { private void doInit(final InitParameters initParameters) { final InternalTopicManager internalTopicManager = new InternalTopicManager(time, adminClient, applicationConfigs); + if (initParameters.hasTimeoutEnabled()) { internalTopicManager.setInitTimeout(initParameters.getTimeout()); } final Map allInternalTopics = new HashMap<>(); final Set allSourceTopics = new HashSet<>(); + for (final Map subtopologyMap : topologyMetadata.topologyToSubtopologyTopicsInfoMap().values()) { for (final InternalTopologyBuilder.TopicsInfo topicsInfo : subtopologyMap.values()) { allInternalTopics.putAll(topicsInfo.stateChangelogTopics); @@ -411,10 +413,12 @@ private void doInit(final InitParameters initParameters) { if (internalTopicsMisconfigured) { throw new MisconfiguredInternalTopicException("Misconfigured Internal Topics: " + validationResult.misconfigurationsForTopics()); } + if (missingSourceTopics) { allSourceTopics.retainAll(validationResult.missingTopics()); throw new MissingSourceTopicException("Missing source topics: " + allSourceTopics); } + if (noInternalTopicsExist) { internalTopicManager.setup(allInternalTopics); } else if (allInternalTopicsExist) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java index b716f31b133db..aaf8ba16a51b2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogTopics.java @@ -135,4 +135,4 @@ public Set statefulTaskIds() { public Map> changelogPartionsForTask() { return Collections.unmodifiableMap(changelogPartitionsForStatefulTask); } -} +} \ No newline at end of file From 168e9c9249cdb3812e9a8969fb58e94830010f6c Mon Sep 17 00:00:00 2001 From: kapol Date: Tue, 24 Jun 2025 05:49:43 -0500 Subject: [PATCH 25/36] formatting updates - remove excess indent --- .../kafka/streams/KafkaStreamsTest.java | 66 +++++++++---------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index dcf8f1d365789..c41fa4f8383c4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -413,7 +413,7 @@ public void shouldInitializeTasksForLocalStateOnStart() { prepareStreamThread(streamThreadTwo, 2); try (final MockedConstruction constructed = mockConstruction(StateDirectory.class, - (mock, context) -> when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) { + (mock, context) -> when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()))) { try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { assertEquals(1, constructed.constructed().size()); final StateDirectory stateDirectory = constructed.constructed().get(0); @@ -1854,14 +1854,14 @@ public void initShouldThrowMisconfiguredExceptionWhenInternalTopicsAreMisconfigu final InitParameters initParams = InitParameters.initParameters(); // no auto-setup try (final MockedConstruction mocked = mockConstruction(InternalTopicManager.class, - (mock, context) -> { - final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); + (mock, context) -> { + final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); - when(result.misconfigurationsForTopics()).thenReturn(Map.of("topicA", List.of("bad config"))); - when(result.missingTopics()).thenReturn(Set.of("topicA")); // triggers validation error + when(result.misconfigurationsForTopics()).thenReturn(Map.of("topicA", List.of("bad config"))); + when(result.missingTopics()).thenReturn(Set.of("topicA")); // triggers validation error - when(mock.validate(any())).thenReturn(result); - })) { + when(mock.validate(any())).thenReturn(result); + })) { final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); @@ -1880,14 +1880,14 @@ public void initShouldThrowMissingSourceTopicExceptionWhenSourceTopicMissing() { final InitParameters initParams = InitParameters.initParameters(); // auto-setup is disabled try (final MockedConstruction mocked = mockConstruction(InternalTopicManager.class, - (mock, context) -> { - final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); + (mock, context) -> { + final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); - when(result.misconfigurationsForTopics()).thenReturn(Collections.emptyMap()); - when(result.missingTopics()).thenReturn(Set.of("source-topic")); // triggers missing topic error + when(result.misconfigurationsForTopics()).thenReturn(Collections.emptyMap()); + when(result.missingTopics()).thenReturn(Set.of("source-topic")); // triggers missing topic error - when(mock.validate(any())).thenReturn(result); - })) { + when(mock.validate(any())).thenReturn(result); + })) { final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); @@ -1906,13 +1906,13 @@ public void initShouldThrowInternalTopicsAlreadySetupExceptionIfAllExist() { final InitParameters initParams = InitParameters.initParameters(); // auto-setup not enabled try (final MockedConstruction mocked = mockConstruction(InternalTopicManager.class, - (mock, context) -> { - final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); - when(result.missingTopics()).thenReturn(Collections.emptySet()); - when(result.misconfigurationsForTopics()).thenReturn(Collections.emptyMap()); + (mock, context) -> { + final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); + when(result.missingTopics()).thenReturn(Collections.emptySet()); + when(result.misconfigurationsForTopics()).thenReturn(Collections.emptyMap()); - when(mock.validate(any())).thenReturn(result); - })) { + when(mock.validate(any())).thenReturn(result); + })) { final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); @@ -1935,12 +1935,12 @@ public void initShouldThrowMissingInternalTopicsExceptionWhenDisabled() { final InitParameters initParams = InitParameters.initParameters(); // auto-setup not enabled try (MockedConstruction mocked = mockConstruction(InternalTopicManager.class, - (mock, context) -> { - final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); - when(result.missingTopics()).thenReturn(Set.of("some-missing-topic")); - when(result.misconfigurationsForTopics()).thenReturn(Collections.emptyMap()); - when(mock.validate(any())).thenReturn(result); - })) { + (mock, context) -> { + final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); + when(result.missingTopics()).thenReturn(Set.of("some-missing-topic")); + when(result.misconfigurationsForTopics()).thenReturn(Collections.emptyMap()); + when(mock.validate(any())).thenReturn(result); + })) { final KafkaStreams streams = new KafkaStreams(topology, props, supplier, time); @@ -1967,14 +1967,14 @@ public void initShouldMakeReadyInternalTopicsWhenAutoSetupEnabled() { final InitParameters initParams = InitParameters.initParameters().enableSetupInternalTopicsIfIncomplete(); try (MockedConstruction mocked = mockConstruction(InternalTopicManager.class, - (mock, context) -> { - final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); - - // Simulate only some topics are missing — triggers makeReady - when(result.missingTopics()).thenReturn(Set.of("topicA")); - when(result.misconfigurationsForTopics()).thenReturn(Collections.emptyMap()); - when(mock.validate(any())).thenReturn(result); - })) { + (mock, context) -> { + final InternalTopicManager.ValidationResult result = mock(InternalTopicManager.ValidationResult.class); + + // Simulate only some topics are missing — triggers makeReady + when(result.missingTopics()).thenReturn(Set.of("topicA")); + when(result.misconfigurationsForTopics()).thenReturn(Collections.emptyMap()); + when(mock.validate(any())).thenReturn(result); + })) { final KafkaStreams streams = new KafkaStreams(topology, props, supplier, time); From 26e26b1c014c08afd0ed05c0ee95cdcf12643638 Mon Sep 17 00:00:00 2001 From: kapol Date: Tue, 24 Jun 2025 05:50:05 -0500 Subject: [PATCH 26/36] formatting - missing blank line --- .../src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index c41fa4f8383c4..e569f05b98159 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -1846,6 +1846,7 @@ public Uuid get(final long timeout, final TimeUnit timeUnit) { assertThat(didAssertThreadTwo.get(), equalTo(true)); assertThat(didAssertGlobalThread.get(), equalTo(true)); } + @Test public void initShouldThrowMisconfiguredExceptionWhenInternalTopicsAreMisconfigured() { prepareStreams(); From 25a07e9e293f79702be4bd7a5c4df0a17608526f Mon Sep 17 00:00:00 2001 From: kapol Date: Tue, 24 Jun 2025 05:54:54 -0500 Subject: [PATCH 27/36] Code style update: remove unneccessary "this" references in InternalTopicManager.java --- .../processor/internals/InternalTopicManager.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index 39bcf72a1bf4b..4b4d93122c130 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -106,7 +106,7 @@ public InternalTopicManager(final Time time, consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); retryTimeoutMs = new QuietConsumerConfig(consumerConfig).getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG) / 2L; - this.initTimeout = StreamsConfig.DEFAULT_INIT_TIMEOUT_MS; + initTimeout = StreamsConfig.DEFAULT_INIT_TIMEOUT_MS; log.debug("Configs:" + Utils.NL + "\t{} = {}" + Utils.NL + @@ -471,7 +471,7 @@ public Set makeReady(final Map topics) { final long currentWallClockMs = time.milliseconds(); final long deadlineMs = currentWallClockMs + retryTimeoutMs; - final long initDeadlineMs = currentWallClockMs + this.initTimeout.toMillis(); + final long initDeadlineMs = currentWallClockMs + initTimeout.toMillis(); Set topicsNotReady = new HashSet<>(topics.keySet()); final Set newlyCreatedTopics = new HashSet<>(); @@ -576,12 +576,12 @@ public Set makeReady(final Map topics) { } } log.debug("Completed validating internal topics and created {}", newlyCreatedTopics); - this.isInitializing = false; + isInitializing = false; return newlyCreatedTopics; } private void throwIfManualSetupEnabledAndCalledWithoutInit(final Set topicsNotReady) { - if (this.isManualInternalTopicConfig && !this.isInitializing) { + if (isManualInternalTopicConfig && !isInitializing) { throw new MissingInternalTopicsException("Internal topic configuration set to MANUAL. \n" + "You must call init() to setup internal topics.", new ArrayList<>(topicsNotReady)); } @@ -639,8 +639,8 @@ protected Map> getTopicPartitionInfo(final Set< } public void setInitTimeout(final Duration timeoutMs) { - this.isInitializing = true; - this.initTimeout = timeoutMs; + isInitializing = true; + initTimeout = timeoutMs; } /** From 01debbc77edc00ea5bbe1917058c2b23a8b75c50 Mon Sep 17 00:00:00 2001 From: kapol Date: Tue, 24 Jun 2025 06:00:28 -0500 Subject: [PATCH 28/36] Update logical check to throw once topic creation starts. See: https://github.com/apache/kafka/pull/19913/#discussion_r2146261373 --- .../streams/processor/internals/InternalTopicManager.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index 4b4d93122c130..3ceae58a3a7d3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -480,11 +480,11 @@ public Set makeReady(final Map topics) { final Set tempUnknownTopics = new HashSet<>(); topicsNotReady = validateTopics(topicsNotReady, topics, tempUnknownTopics); - throwIfManualSetupEnabledAndCalledWithoutInit(topicsNotReady); - newlyCreatedTopics.addAll(topicsNotReady); - + if (!topicsNotReady.isEmpty()) { + throwIfManualSetupEnabledAndCalledWithoutInit(topicsNotReady); + final Set newTopics = new HashSet<>(); for (final String topicName : topicsNotReady) { From 72c6d5134d0dbcdf9270e5ac9c83b34265cb58ed Mon Sep 17 00:00:00 2001 From: kapol Date: Wed, 25 Jun 2025 05:59:43 -0500 Subject: [PATCH 29/36] Update tests to match new init structure --- .../kafka/streams/KafkaStreamsTest.java | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index e569f05b98159..8c3f828b14b66 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -40,6 +40,7 @@ import org.apache.kafka.streams.errors.InternalTopicsAlreadySetupException; import org.apache.kafka.streams.errors.MisconfiguredInternalTopicException; import org.apache.kafka.streams.errors.MissingSourceTopicException; +import org.apache.kafka.streams.errors.MissingInternalTopicsException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsNotStartedException; import org.apache.kafka.streams.errors.TopologyException; @@ -1866,10 +1867,12 @@ public void initShouldThrowMisconfiguredExceptionWhenInternalTopicsAreMisconfigu final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); - final StreamsException exception = assertThrows(StreamsException.class, () -> streams.init(initParams)); + final MisconfiguredInternalTopicException exception = assertThrows( + MisconfiguredInternalTopicException.class, + () -> streams.init(initParams) + ); - assertTrue(exception.getCause() instanceof MisconfiguredInternalTopicException); - assertTrue(exception.getCause().getMessage().contains("topicA")); + assertTrue(exception.getMessage().contains("topicA")); } } @@ -1892,10 +1895,11 @@ public void initShouldThrowMissingSourceTopicExceptionWhenSourceTopicMissing() { final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); - final StreamsException exception = assertThrows(StreamsException.class, () -> streams.init(initParams)); + final MissingSourceTopicException exception = assertThrows(MissingSourceTopicException.class, + () -> streams.init(initParams) + ); - assertTrue(exception.getCause() instanceof MissingSourceTopicException); - assertTrue(exception.getCause().getMessage().contains("source-topic")); + assertTrue(exception.getMessage().contains("source-topic")); } } @@ -1917,10 +1921,10 @@ public void initShouldThrowInternalTopicsAlreadySetupExceptionIfAllExist() { final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); - final StreamsException exception = assertThrows(StreamsException.class, () -> streams.init(initParams)); + final StreamsException exception = assertThrows(InternalTopicsAlreadySetupException.class, + () -> streams.init(initParams)); - assertTrue(exception.getCause() instanceof InternalTopicsAlreadySetupException); - assertTrue(exception.getCause().getMessage().contains("All internal topics have already been setup")); + assertTrue(exception.getMessage().contains("All internal topics have already been setup")); } } @@ -1945,8 +1949,8 @@ public void initShouldThrowMissingInternalTopicsExceptionWhenDisabled() { final KafkaStreams streams = new KafkaStreams(topology, props, supplier, time); - assertThrows(StreamsException.class, () -> { - streams.init(initParams); + assertThrows(MissingInternalTopicsException.class, + () -> {streams.init(initParams); }); From 2a90edcda6173deb70c7ddd6276d0dabbfe4bbbe Mon Sep 17 00:00:00 2001 From: kapol Date: Wed, 25 Jun 2025 06:00:28 -0500 Subject: [PATCH 30/36] decouple logical checks - these should be separate questions, not an else-if --- .../src/main/java/org/apache/kafka/streams/KafkaStreams.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 436a216770c3c..0da33dba409ff 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -421,7 +421,9 @@ private void doInit(final InitParameters initParameters) { if (noInternalTopicsExist) { internalTopicManager.setup(allInternalTopics); - } else if (allInternalTopicsExist) { + } + + if (allInternalTopicsExist) { throw new InternalTopicsAlreadySetupException("All internal topics have already been setup"); } else { if (initParameters.setupInternalTopicsIfIncompleteEnabled()) { From b41b361e503de917c8e3e500d9a8db166b04806a Mon Sep 17 00:00:00 2001 From: kapol Date: Wed, 25 Jun 2025 06:01:25 -0500 Subject: [PATCH 31/36] remove unnecessary this ref --- .../kafka/streams/processor/internals/InternalTopicManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index 3ceae58a3a7d3..29d288d2c8f7f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -899,7 +899,7 @@ private void maybeThrowTimeoutExceptionDuringMakeReady(final long deadlineMs, final Set topicsNotReady, final long retryBackoffMs, final long currentWallClockMs) { - final boolean isInitializationTimeout = this.isInitializing && currentWallClockMs >= initDeadlineMs; + final boolean isInitializationTimeout = isInitializing && currentWallClockMs >= initDeadlineMs; if (isInitializationTimeout || currentWallClockMs >= deadlineMs) { final String timeoutError; From 44c888a15c3a989f885cb399968b831e34caedb9 Mon Sep 17 00:00:00 2001 From: kapol Date: Wed, 25 Jun 2025 06:03:36 -0500 Subject: [PATCH 32/36] remove unnecessary this ref --- .../test/java/org/apache/kafka/streams/KafkaStreamsTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 8c3f828b14b66..f76ea89921556 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -1950,8 +1950,8 @@ public void initShouldThrowMissingInternalTopicsExceptionWhenDisabled() { final KafkaStreams streams = new KafkaStreams(topology, props, supplier, time); assertThrows(MissingInternalTopicsException.class, - () -> {streams.init(initParams); - }); + () -> { streams.init(initParams); + }); final InternalTopicManager internalTopicManager = mocked.constructed().get(0); From 8cb778a805158166a2d70dad0276dd1db5c2dc7d Mon Sep 17 00:00:00 2001 From: kapol Date: Wed, 25 Jun 2025 06:26:20 -0500 Subject: [PATCH 33/36] formatting update --- .../test/java/org/apache/kafka/streams/KafkaStreamsTest.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index f76ea89921556..ea1bab84f56ea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -1949,9 +1949,7 @@ public void initShouldThrowMissingInternalTopicsExceptionWhenDisabled() { final KafkaStreams streams = new KafkaStreams(topology, props, supplier, time); - assertThrows(MissingInternalTopicsException.class, - () -> { streams.init(initParams); - }); + assertThrows(MissingInternalTopicsException.class, () -> streams.init(initParams)); final InternalTopicManager internalTopicManager = mocked.constructed().get(0); From 5eca29ab6a4e3448a9230b2e1349511be2b3d062 Mon Sep 17 00:00:00 2001 From: kapol Date: Wed, 25 Jun 2025 06:27:27 -0500 Subject: [PATCH 34/36] refactor away from state-based initialization check, rather provide an overload to do this ad-hoc - see https://github.com/apache/kafka/pull/19913#discussion_r2162812021 --- .../internals/InternalTopicManager.java | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index 29d288d2c8f7f..afeaee217a3d0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -83,7 +83,6 @@ public class InternalTopicManager { private final long retryBackOffMs; private final long retryTimeoutMs; private Duration initTimeout; - private boolean isInitializing = false; private final Map defaultTopicConfigs = new HashMap<>(); @@ -465,6 +464,10 @@ public Map> getTopicPartitionInfo(final Set makeReady(final Map topics) { + return makeReady(topics, false); + } + + public Set makeReady(final Map topics, final boolean isInitializing) { // we will do the validation / topic-creation in a loop, until we have confirmed all topics // have existed with the expected number of partitions, or some create topic returns fatal errors. log.debug("Starting to validate internal topics {} in partition assignor.", topics); @@ -483,7 +486,7 @@ public Set makeReady(final Map topics) { newlyCreatedTopics.addAll(topicsNotReady); if (!topicsNotReady.isEmpty()) { - throwIfManualSetupEnabledAndCalledWithoutInit(topicsNotReady); + throwIfManualSetupEnabledAndCalledWithoutInit(topicsNotReady, isInitializing); final Set newTopics = new HashSet<>(); @@ -572,15 +575,16 @@ public Set makeReady(final Map topics) { initDeadlineMs, topicsNotReady, retryBackOffMs, - currentWallClockMs); + currentWallClockMs, + isInitializing); } } log.debug("Completed validating internal topics and created {}", newlyCreatedTopics); - isInitializing = false; + return newlyCreatedTopics; } - private void throwIfManualSetupEnabledAndCalledWithoutInit(final Set topicsNotReady) { + private void throwIfManualSetupEnabledAndCalledWithoutInit(final Set topicsNotReady, final boolean isInitializing) { if (isManualInternalTopicConfig && !isInitializing) { throw new MissingInternalTopicsException("Internal topic configuration set to MANUAL. \n" + "You must call init() to setup internal topics.", new ArrayList<>(topicsNotReady)); @@ -639,7 +643,6 @@ protected Map> getTopicPartitionInfo(final Set< } public void setInitTimeout(final Duration timeoutMs) { - isInitializing = true; initTimeout = timeoutMs; } @@ -898,7 +901,8 @@ private void maybeThrowTimeoutExceptionDuringMakeReady(final long deadlineMs, final long initDeadlineMs, final Set topicsNotReady, final long retryBackoffMs, - final long currentWallClockMs) { + final long currentWallClockMs, + final boolean isInitializing) { final boolean isInitializationTimeout = isInitializing && currentWallClockMs >= initDeadlineMs; if (isInitializationTimeout || currentWallClockMs >= deadlineMs) { From b11bb29933de7416f7b61f5037da435d1748c5a0 Mon Sep 17 00:00:00 2001 From: kapol Date: Wed, 25 Jun 2025 06:30:12 -0500 Subject: [PATCH 35/36] update makeReady call from init method --- .../src/main/java/org/apache/kafka/streams/KafkaStreams.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 0da33dba409ff..19f07f1552e4b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -431,7 +431,7 @@ private void doInit(final InitParameters initParameters) { for (final String missingTopic : validationResult.missingTopics()) { topicsToCreate.put(missingTopic, allInternalTopics.get(missingTopic)); } - internalTopicManager.makeReady(topicsToCreate); + internalTopicManager.makeReady(topicsToCreate, true); } else { throw new MissingInternalTopicsException("Missing Internal Topics: ", new ArrayList<>(validationResult.missingTopics())); } From 782d36c4b78a10f7ae2710f1e4de4db990fdb734 Mon Sep 17 00:00:00 2001 From: kapol Date: Sat, 28 Jun 2025 06:45:04 -0500 Subject: [PATCH 36/36] formatting update --- .../test/java/org/apache/kafka/streams/KafkaStreamsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index ea1bab84f56ea..7924a9f0e546d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -39,8 +39,8 @@ import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.errors.InternalTopicsAlreadySetupException; import org.apache.kafka.streams.errors.MisconfiguredInternalTopicException; -import org.apache.kafka.streams.errors.MissingSourceTopicException; import org.apache.kafka.streams.errors.MissingInternalTopicsException; +import org.apache.kafka.streams.errors.MissingSourceTopicException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.StreamsNotStartedException; import org.apache.kafka.streams.errors.TopologyException;