Skip to content

KAFKA-10357: Add explicit internal topic initialization for Kafka Streams #19913

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 39 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
0a3f8e9
test commit
k-apol Apr 13, 2025
1768707
Add new internal topic setup config & related test
k-apol Apr 13, 2025
12f67af
Update new internal topic doc to be private member
k-apol Apr 13, 2025
160025d
remove redundant lines in test case
k-apol Apr 13, 2025
8dedaf9
Add boilerplate stub for init method and Javadoc comments
k-apol Apr 16, 2025
71d32cb
Add MissingInternalTopicsException per KIP
k-apol Apr 18, 2025
e081b05
Add InternalTopicAlreadySetup exception
k-apol Apr 18, 2025
e8aeaeb
Add MisconfiguredInternalTopicException
k-apol Apr 18, 2025
3757672
Add boilerplate class for Initparameters
k-apol Apr 18, 2025
00c847b
Add boilerplate init methods in KafkaStreams to support manual initia…
k-apol Apr 18, 2025
44b3860
new setup, test commit to verify connection to IDE
k-apol May 1, 2025
a888e30
Add init method to Kafka Streams
k-apol May 20, 2025
f139ab1
Style fixes
k-apol May 21, 2025
7496e51
Add unit tests for init method
k-apol May 22, 2025
0402ba7
fix typo
k-apol May 22, 2025
8cb10b8
typo
k-apol May 22, 2025
2911371
typo
k-apol May 22, 2025
ec63b89
Merge branch 'trunk' into KAFKA-10357-Accidental-Repartition-Handling
k-apol Jun 6, 2025
013fe7d
Allow errors to bubble up in init()
k-apol Jun 12, 2025
62f13e5
cleanup formatting
k-apol Jun 13, 2025
1ba84bb
refactor makeReady to throw if manual setup enabled and not called by…
k-apol Jun 13, 2025
f4070d1
refactor makeReady to throw if manual setup enabled and not called by…
k-apol Jun 13, 2025
9d0aedf
refactor makeReady to throw if manual setup enabled and not called by…
k-apol Jun 14, 2025
ee9a89f
refactor makeReady to throw if manual setup enabled and not called by…
k-apol Jun 14, 2025
ffbcfa7
Merge remote-tracking branch 'origin/KAFKA-10357-Accidental-Repartiti…
k-apol Jun 14, 2025
fed9c39
Merge branch 'apache:trunk' into KAFKA-10357-Accidental-Repartition-H…
k-apol Jun 24, 2025
6a9d6f1
Add formatting updates - empty lines between blocks, remove uneccessa…
k-apol Jun 24, 2025
168e9c9
formatting updates - remove excess indent
k-apol Jun 24, 2025
26e26b1
formatting - missing blank line
k-apol Jun 24, 2025
25a07e9
Code style update: remove unnecessary "this" references in InternalT…
k-apol Jun 24, 2025
01debbc
Update logical check to throw once topic creation starts. See: https:…
k-apol Jun 24, 2025
72c6d51
Update tests to match new init structure
k-apol Jun 25, 2025
2a90edc
decouple logical checks - these should be separate questions, not an …
k-apol Jun 25, 2025
b41b361
remove unnecessary this ref
k-apol Jun 25, 2025
44c888a
remove unnecessary this ref
k-apol Jun 25, 2025
8cb778a
formatting update
k-apol Jun 25, 2025
5eca29a
refactor away from state-based initialization check, rather provide a…
k-apol Jun 25, 2025
b11bb29
update makeReady call from init method
k-apol Jun 25, 2025
782d36c
formatting update
k-apol Jun 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.MisconfiguredInternalTopicException;
import org.apache.kafka.streams.errors.MissingInternalTopicsException;
import org.apache.kafka.streams.errors.InternalTopicsAlreadySetupException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsNotStartedException;
Expand All @@ -61,6 +65,9 @@
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
Expand All @@ -69,6 +76,7 @@
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.apache.kafka.streams.processor.internals.InternalTopicManager.ValidationResult;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.QueryConfig;
Expand Down Expand Up @@ -111,6 +119,7 @@
import java.util.stream.Collectors;

import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_INIT_TIMEOUT_MS;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
Expand Down Expand Up @@ -293,6 +302,168 @@ public boolean isValidTransition(final State newState) {
private final Object stateLock = new Object();
protected volatile State state = State.CREATED;

/**
 * Initializes broker-side state using the default timeout.
 *
 * <p>Equivalent to calling {@link #init(Duration)} with {@code DEFAULT_INIT_TIMEOUT_MS}.
 *
 * @throws MissingSourceTopicException if a source topic is missing
 * @throws MissingInternalTopicsException if some but not all of the internal topics are missing
 * @throws MisconfiguredInternalTopicException if an internal topic is misconfigured
 * @throws InternalTopicsAlreadySetupException if all internal topics are already set up
 */
public void init() {
    init(DEFAULT_INIT_TIMEOUT_MS);
}

/**
 * Initializes broker-side state, bounding the operation by the given timeout.
 *
 * <p>Uses default {@link InitParameters} (partially missing internal topics are not created)
 * with the timeout enabled and set to {@code timeout}.
 *
 * @param timeout the timeout handed to the internal topic manager for initialization
 *
 * @throws MissingSourceTopicException if a source topic is missing
 * @throws MissingInternalTopicsException if some but not all of the internal topics are missing
 * @throws MisconfiguredInternalTopicException if an internal topic is misconfigured
 * @throws InternalTopicsAlreadySetupException if all internal topics are already set up
 * @throws TimeoutException if initialization exceeds the given timeout
 */
public void init(final Duration timeout) {
    final InitParameters initParameters = InitParameters.initParameters();
    // The timeout is only applied by doInit() when it is explicitly enabled.
    initParameters.enableTimeout();
    initParameters.setTimeout(timeout);

    // Pass the configured instance; a fresh InitParameters would drop the timeout.
    doInit(initParameters);
}

/**
 * Initializes broker-side state according to the given parameters.
 *
 * <p>The parameters specify whether the missing internal topics should be set up when some,
 * but not all, of them are absent.
 *
 * @param initParameters the initialization parameters, passed on unchanged
 *
 * @throws MissingSourceTopicException if a source topic is missing
 * @throws MissingInternalTopicsException if some but not all of the internal topics are missing
 *                                        and the given parameters do not request setting them up
 * @throws MisconfiguredInternalTopicException if an internal topic is misconfigured
 * @throws InternalTopicsAlreadySetupException if all internal topics are already set up
 */
public void init(final InitParameters initParameters) {
    doInit(initParameters);
}

/**
 * Initializes broker-side state according to the given parameters, bounded by a timeout.
 *
 * <p>The parameters specify whether the missing internal topics should be set up when some,
 * but not all, of them are absent. Note that the passed {@code initParameters} instance is
 * modified: its timeout is enabled and set to {@code timeout}.
 *
 * @param initParameters the initialization parameters; mutated to carry the timeout
 * @param timeout        the timeout to apply to initialization
 *
 * @throws MissingSourceTopicException if a source topic is missing
 * @throws MissingInternalTopicsException if some but not all of the internal topics are missing
 *                                        and the given parameters do not request setting them up
 * @throws MisconfiguredInternalTopicException if an internal topic is misconfigured
 * @throws InternalTopicsAlreadySetupException if all internal topics are already set up
 * @throws TimeoutException if initialization exceeds the given timeout
 */
public void init(final InitParameters initParameters, final Duration timeout) {
    initParameters.enableTimeout();
    initParameters.setTimeout(timeout);
    doInit(initParameters);
}

/**
 * Validates the internal topics of all subtopologies and creates missing ones where permitted.
 *
 * <p>Collects the changelog and repartition topics (and the source topics) of every subtopology,
 * validates the internal topics through an {@link InternalTopicManager}, and then:
 * <ul>
 *   <li>throws if any internal topic is reported as misconfigured;</li>
 *   <li>throws if any topic reported missing is also a source topic;</li>
 *   <li>sets up all internal topics if every one of them is missing;</li>
 *   <li>throws if none of them is missing;</li>
 *   <li>otherwise creates only the missing ones if the parameters allow it, or throws.</li>
 * </ul>
 *
 * <p>Exceptions raised here or by the {@link InternalTopicManager} propagate unchanged, so
 * callers see the specific exception type rather than a generic wrapper.
 *
 * @param initParameters controls the optional timeout and whether partially missing
 *                       internal topics may be created
 */
private void doInit(final InitParameters initParameters) {
    final InternalTopicManager internalTopicManager = new InternalTopicManager(time, adminClient, applicationConfigs);
    if (initParameters.hasTimeoutEnabled()) {
        internalTopicManager.setInitTimeout(initParameters.getTimeout());
    }

    final Map<String, InternalTopicConfig> allInternalTopics = new HashMap<>();
    final Set<String> allSourceTopics = new HashSet<>();
    for (final Map<TopologyMetadata.Subtopology, InternalTopologyBuilder.TopicsInfo> subtopologyMap : topologyMetadata.topologyToSubtopologyTopicsInfoMap().values()) {
        for (final InternalTopologyBuilder.TopicsInfo topicsInfo : subtopologyMap.values()) {
            allInternalTopics.putAll(topicsInfo.stateChangelogTopics);
            allInternalTopics.putAll(topicsInfo.repartitionSourceTopics);
            allSourceTopics.addAll(topicsInfo.sourceTopics);
        }
    }

    // No catch-and-rewrap here: wrapping would turn every specific StreamsException subclass
    // thrown below into a plain StreamsException and hide the documented exception types.
    final ValidationResult validationResult = internalTopicManager.validate(allInternalTopics);

    if (!validationResult.misconfigurationsForTopics().isEmpty()) {
        throw new MisconfiguredInternalTopicException("Misconfigured Internal Topics: " + validationResult.misconfigurationsForTopics());
    }

    if (!Collections.disjoint(validationResult.missingTopics(), allSourceTopics)) {
        // Narrow the set down to the source topics that are actually missing for the message.
        allSourceTopics.retainAll(validationResult.missingTopics());
        throw new MissingSourceTopicException("Missing source topics: " + allSourceTopics);
    }

    final boolean allInternalTopicsExist = validationResult.missingTopics().isEmpty();
    // Compare by content (equals), not by reference. The extra guard keeps a topology without
    // any internal topics on the "already set up" path, as before.
    final boolean noInternalTopicsExist =
        !allInternalTopicsExist && allInternalTopics.keySet().equals(validationResult.missingTopics());

    if (noInternalTopicsExist) {
        internalTopicManager.setup(allInternalTopics);
    } else if (allInternalTopicsExist) {
        throw new InternalTopicsAlreadySetupException("All internal topics have already been setup");
    } else if (initParameters.setupInternalTopicsIfIncompleteEnabled()) {
        final Map<String, InternalTopicConfig> topicsToCreate = new HashMap<>();
        for (final String missingTopic : validationResult.missingTopics()) {
            topicsToCreate.put(missingTopic, allInternalTopics.get(missingTopic));
        }
        internalTopicManager.makeReady(topicsToCreate);
    } else {
        // Name the missing topics in the message instead of ending on a dangling colon.
        throw new MissingInternalTopicsException(
            "Missing Internal Topics: " + validationResult.missingTopics(),
            new ArrayList<>(validationResult.missingTopics()));
    }
}

public static class InitParameters {
private boolean timeoutEnabled;
private Duration timeout;
private final boolean setupInternalTopicsIfIncomplete;

// Private constructor: instances are obtained through initParameters() and the
// enable/disable methods of this class.
private InitParameters(final boolean setupInternalTopicsIfIncomplete) {
this.setupInternalTopicsIfIncomplete = setupInternalTopicsIfIncomplete;
}

/**
 * Creates parameters with the default behavior: missing internal topics are not created
 * when only some of them are missing.
 *
 * @return a new instance with setup of incomplete internal topics disabled
 */
public static InitParameters initParameters() {
return new InitParameters(false);
}

/**
 * Returns a copy of these parameters that requests creation of the missing internal topics
 * when some, but not all, of them are absent.
 *
 * <p>The timeout settings of this instance are carried over to the returned copy; this
 * instance itself is left unchanged.
 *
 * @return a new instance with setup of incomplete internal topics enabled
 */
public InitParameters enableSetupInternalTopicsIfIncomplete() {
    final InitParameters enabled = new InitParameters(true);
    // Copy the timeout state so a previously configured timeout is not silently lost.
    enabled.timeoutEnabled = timeoutEnabled;
    enabled.timeout = timeout;
    return enabled;
}

/**
 * Returns a copy of these parameters that does not create missing internal topics when
 * some, but not all, of them are absent.
 *
 * <p>The timeout settings of this instance are carried over to the returned copy; this
 * instance itself is left unchanged.
 *
 * @return a new instance with setup of incomplete internal topics disabled
 */
public InitParameters disableSetupInternalTopicsIfIncomplete() {
    final InitParameters disabled = new InitParameters(false);
    // Copy the timeout state so a previously configured timeout is not silently lost.
    disabled.timeoutEnabled = timeoutEnabled;
    disabled.timeout = timeout;
    return disabled;
}

public boolean setupInternalTopicsIfIncompleteEnabled() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While the KIP does specify this getter, I am wondering why? We usually don't use getters on the main config-classes, but use sub-classes. Might be good to re-read the KIP discussion (and if need be, update the KIP to "fix" this)

return setupInternalTopicsIfIncomplete;
}

public final void enableTimeout() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method was not defined on the KIP -- so we should not add it -- or we need to update the KIP.

Looking into KIP-1092 and KIP-1153 which introduce CloseOptions which also takes a timeout, it might make sense to update the KIP to include the timeout in InitParameters and remove the Duration timeout parameter and corresponding init() overloads? @cadonna WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I will update this and the other timeout-related code as the conversation evolves. There may be a better way to approach handling a timeout here.

this.timeoutEnabled = true;
}

/**
 * Reports whether a timeout has been enabled on these parameters.
 *
 * @return {@code true} once {@code enableTimeout()} has been called, {@code false} otherwise
 */
public final boolean hasTimeoutEnabled() {
return timeoutEnabled;
}

/**
 * Stores the timeout on these parameters. This does not by itself enable the timeout;
 * the enabled flag is tracked separately.
 *
 * @param timeout the timeout to store
 */
public final void setTimeout(final Duration timeout) {
this.timeout = timeout;
}

/**
 * Returns the timeout stored on these parameters.
 *
 * @return the stored timeout, or {@code null} if none has been set
 */
public final Duration getTimeout() {
    return timeout;
}

}

private boolean waitOnStates(final long waitMs, final State... targetStates) {
final Set<State> targetStateSet = Set.of(targetStates);
final long begin = time.milliseconds();
Expand Down
15 changes: 15 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ public class StreamsConfig extends AbstractConfig {

public static final long MAX_TASK_IDLE_MS_DISABLED = -1;

// Default timeout (30000 ms) passed by KafkaStreams#init() when no timeout is given.
// Note: despite the _MS suffix this is a Duration, not a millisecond count.
public static final Duration DEFAULT_INIT_TIMEOUT_MS = Duration.ofMillis(30000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should add this here, as it would "leak" into public API. Not even sure if we need a variable for it at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am marking this down along with the other concerns surrounding the timeout

// We impose these limitations because client tags are encoded into the subscription info,
// which is part of the group metadata message that is persisted into the internal topic.
public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE = 5;
Expand Down Expand Up @@ -885,6 +886,14 @@ public class StreamsConfig extends AbstractConfig {
ProducerConfig.TRANSACTIONAL_ID_CONFIG
};

/** {@code internal.topics.setup} */
public static final String INTERNAL_TOPIC_SETUP_CONFIG = "internal.topics.setup";
/** Value for {@code internal.topics.setup}: {@code automatic} (used as the default in the config definition). */
public static final String INTERNAL_TOPIC_SETUP_AUTOMATIC = "automatic";
/** Value for {@code internal.topics.setup}: {@code manual}. */
public static final String INTERNAL_TOPIC_SETUP_MANUAL = "manual";
// Documentation string registered with the config definition for internal.topics.setup.
private static final String INTERNAL_TOPIC_SETUP_DOC =
"Configures how internal topics (e.g., repartition or changelog topics) should be created. " +
"Set to 'automatic' to allow internal topics to be created during a rebalance (default). " +
"Set to 'manual' to disable automatic creation. Users must call KafkaStreams#init() instead.";

static {
CONFIG = new ConfigDef()

Expand Down Expand Up @@ -1012,6 +1021,12 @@ public class StreamsConfig extends AbstractConfig {
LogAndFailProcessingExceptionHandler.class.getName(),
Importance.MEDIUM,
PROCESSING_EXCEPTION_HANDLER_CLASS_DOC)
.define(INTERNAL_TOPIC_SETUP_CONFIG,
ConfigDef.Type.STRING,
INTERNAL_TOPIC_SETUP_AUTOMATIC,
ConfigDef.ValidString.in(INTERNAL_TOPIC_SETUP_AUTOMATIC, INTERNAL_TOPIC_SETUP_MANUAL),
Importance.MEDIUM,
INTERNAL_TOPIC_SETUP_DOC)
.define(PROCESSING_GUARANTEE_CONFIG,
Type.STRING,
AT_LEAST_ONCE,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.errors;

/**
 * Indicates that internal topics were requested to be initialized although none of them
 * is missing, i.e. all internal topics have already been set up.
 */
public class InternalTopicsAlreadySetupException extends StreamsException {

private static final long serialVersionUID = 1L;

/**
 * Constructs a new InternalTopicsAlreadySetupException.
 *
 * @param message the detail message
 */
public InternalTopicsAlreadySetupException(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.errors;

/**
 * Indicates that validation of the internal topics reported at least one internal topic
 * as misconfigured.
 */
public class MisconfiguredInternalTopicException extends StreamsException {

private static final long serialVersionUID = 1L;

/**
 * Constructs a new MisconfiguredInternalTopicException.
 *
 * @param message the detail message
 */
public MisconfiguredInternalTopicException(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.errors;

import java.util.List;

/**
 * Indicates that some, but not all, internal topics are missing and that they were not
 * requested to be created. The names of the missing topics are available via {@link #topics()}.
 */
public class MissingInternalTopicsException extends StreamsException {

    private static final long serialVersionUID = 1L;

    // Immutable snapshot of the missing topic names; never null.
    private final List<String> topics;

    /**
     * Constructs a new MissingInternalTopicsException.
     *
     * @param message the detail message
     * @param topics  the list of missing internal topic names; copied defensively, and a
     *                {@code null} list is treated as empty
     */
    public MissingInternalTopicsException(final String message, final List<String> topics) {
        super(message);
        // Copy so that later changes to the caller's list cannot alter this exception.
        this.topics = topics == null ? List.of() : List.copyOf(topics);
    }

    /**
     * Returns the missing internal topics that caused the exception.
     *
     * @return an unmodifiable list of topic names
     */
    public List<String> topics() {
        return topics;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,4 @@ public Set<TaskId> statefulTaskIds() {
public Map<TaskId, Set<TopicPartition>> changelogPartionsForTask() {
return Collections.unmodifiableMap(changelogPartitionsForStatefulTask);
}
}
}
Loading
Loading