KAFKA-10357: Add explicit internal topic initialization for Kafka Streams #19913

Open
wants to merge 38 commits into
base: trunk

Conversation

@k-apol k-apol commented Jun 6, 2025

This PR implements the init() method described in KIP-698, which lets users
validate or prepare internal Kafka Streams topics before starting the
application. The changes include:

- Introduce a public init() method on KafkaStreams that validates
  internal topics and optionally sets up missing ones.
- Add a new configuration option, internal.topics.setup, whose values
  'automatic' and 'manual' control the default internal topic management.
- Update InternalTopicManager to expose validate() and to allow for a
  separate initialization timeout.

@github-actions github-actions bot added triage PRs from the community streams labels Jun 6, 2025
@mjsax mjsax added ci-approved and removed triage PRs from the community labels Jun 11, 2025
Member

mjsax commented Jun 11, 2025

Seems there are some style errors: Execution failed for task ':streams:spotlessJavaCheck'.

@mjsax mjsax added the kip Requires or implements a KIP label Jun 11, 2025
@mjsax mjsax changed the title KAFKA-10357 accidental repartition handling KAFKA-10357: Add explicit internal topic initialization for Kafka Streams Jun 11, 2025
Member

@mjsax mjsax left a comment

Made a high-level initial pass.

It would be good to either add a new integration test, or re-use an existing one to test this feature end-to-end.

@@ -173,6 +173,7 @@ public class StreamsConfig extends AbstractConfig {

public static final long MAX_TASK_IDLE_MS_DISABLED = -1;

public static final Duration DEFAULT_INIT_TIMEOUT_MS = Duration.ofMillis(30000);
Member

I don't think we should add this here, as it would "leak" into public API. Not even sure if we need a variable for it at all?

Author

I am marking this down along with the other concerns surrounding the timeout

return setupInternalTopicsIfIncomplete;
}

public final void enableTimeout() {
Member

This method was not defined on the KIP -- so we should not add it -- or we need to update the KIP.

Looking into KIP-1092 and KIP-1153 which introduce CloseOptions which also takes a timeout, it might make sense to update the KIP to include the timeout in InitParameters and remove the Duration timeout parameter and corresponding init() overloads? @cadonna WDYT?
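
To illustrate the suggestion, here is a minimal sketch of a parameter object that carries the timeout itself, so that a single init(InitParameters) overload would be enough. This is only an assumption about what an updated KIP could look like: the class name InitParametersSketch and the withTimeout() method are hypothetical, and only the setupInternalTopicsIfIncomplete getter mirrors code visible in this PR.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

// Hypothetical shape only; the real InitParameters is defined by KIP-698.
class InitParametersSketch {
    private boolean setupInternalTopicsIfIncomplete = false;
    private Duration timeout = null; // null means "no explicit timeout given"

    static InitParametersSketch initParameters() {
        return new InitParametersSketch();
    }

    // Fluent setter: opt in to creating missing internal topics.
    InitParametersSketch enableSetupInternalTopicsIfIncomplete() {
        this.setupInternalTopicsIfIncomplete = true;
        return this;
    }

    // Hypothetical: the timeout travels with the parameters instead of
    // being a separate Duration argument on extra init() overloads.
    InitParametersSketch withTimeout(final Duration timeout) {
        this.timeout = Objects.requireNonNull(timeout, "timeout");
        return this;
    }

    boolean setupInternalTopicsIfIncomplete() {
        return setupInternalTopicsIfIncomplete;
    }

    Optional<Duration> timeout() {
        return Optional.ofNullable(timeout);
    }

    public static void main(final String[] args) {
        final InitParametersSketch params = InitParametersSketch.initParameters()
            .enableSetupInternalTopicsIfIncomplete()
            .withTimeout(Duration.ofSeconds(30));
        System.out.println(params.setupInternalTopicsIfIncomplete() + " " + params.timeout());
    }
}
```

With this shape, a separate enableTimeout() method and the init(Duration) overloads would no longer be needed; whether that trade-off is right is a question for the KIP discussion.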

Author

Makes sense, I will update this and the other timeout-related code as the conversation evolves. There may be a better way to approach handling a timeout here.

}
log.info(
"Topics {} could not be made ready. Will retry in {} milliseconds. Remaining time in milliseconds: {}",
maybeThrowTimeoutExceptionDuringMakeReady(
Member

Why did you extract the code into this new helper? Seems it's only called once, so unclear to me what the benefit is compared to just inlined code?

Author

When I tried inlining it, the code style check for cyclomatic complexity failed. In order to make use of a timeout passed to init(final Duration timeout), I had to add some more logical branches to check which kind of timeout had happened during makeReady().

Member

Oh. Our old friend. I see. Makes sense.

Member

Thinking about the structure of the code, I would propose to pull the if (!topicsNotReady.isEmpty()) check into the new helper method, and rename the method to verifyAllTopicsCreated() or something like this?

Author

I like this idea, let me take a crack at this. Based on your other comments about the complexity of this method, I want to make sure I do not overlook similar opportunities for abstracting a bit here.

@@ -569,7 +579,7 @@ public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
}
}
log.debug("Completed validating internal topics and created {}", newlyCreatedTopics);

this.isInitializing = false;
return newlyCreatedTopics;
Author

Made this change to avoid triggering a setup with subsequent calls

@@ -90,6 +92,8 @@ public InternalTopicManager(final Time time,
final StreamsConfig streamsConfig) {
this.time = time;
this.adminClient = adminClient;
this.isManualInternalTopicConfig = streamsConfig.getString(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG)
.equals(StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL);

Author

There may be an opportunity to use the InitParameters class here, but this approach seemed a bit cleaner to me.

Member

I would prefer to "flip" the check:

this.isManualInternalTopicConfig = StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL.equals(
                                       streamsConfig.getString(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG)
                                   );

The advantage is that we know INTERNAL_TOPIC_SETUP_MANUAL is never null, so there is no risk of an NPE. On the other hand, if streamsConfig.getString(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG) were to return null (which we still guard against, but that guard is weaker), we could not call equals() on the null reference and would crash.

As a rule of thumb: I always prefer to use <constant>.equals(<variable>) instead of <variable>.equals(<constant>) to guard against NPE.

Author

So the way you wrote this does functionally the same thing, but I'm introducing some risk that can be avoided by just changing the order?

I think I understand, the constant is always going to be a reference to a string value of some kind, even if we change the string later. But it's possible that if someone isn't using this feature, or it's not defaulted etc. that a crash is caused without good reason?

Member

So the way you wrote this does functionally the same thing, but I'm introducing some risk that can be avoided by just changing the order?

Correct.

But it's possible that if someone isn't using this feature, or it's not defaulted etc. that a crash is caused without good reason?

Yes. For this particular case, it does not matter too much, but in general, there is always a chance that <variable> == null, so it's just best practice to not use <variable>.equals(...) w/o a null check for <variable> (by flipping to <constant>.equals(...) we don't need the null check and are safe).
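
To make the rule of thumb concrete, here is a small self-contained sketch. The class, the constant, and the plain Map used as a stand-in for a config lookup are made up for illustration and are not the PR's code; the only point is how the two call orders behave when the looked-up value is null.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a config lookup; names are illustrative, not Kafka's.
class NullSafeEqualsSketch {
    static final String SETUP_MANUAL = "manual";

    // <constant>.equals(<variable>): the receiver is never null,
    // so a null value simply yields false.
    static boolean isManualSafe(final Map<String, String> config) {
        return SETUP_MANUAL.equals(config.get("internal.topics.setup"));
    }

    // <variable>.equals(<constant>): throws NullPointerException
    // when the key is missing and get() returns null.
    static boolean isManualUnsafe(final Map<String, String> config) {
        return config.get("internal.topics.setup").equals(SETUP_MANUAL);
    }

    public static void main(final String[] args) {
        final Map<String, String> empty = new HashMap<>();
        System.out.println(isManualSafe(empty));
        try {
            isManualUnsafe(empty);
        } catch (final NullPointerException e) {
            System.out.println("NPE from <variable>.equals(<constant>)");
        }
    }
}
```

Both methods return the same result whenever the value is present; they only differ in the null case, which is exactly the risk the flipped order removes.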

Member

mjsax commented Jun 13, 2025

One more side comment: if we want to reduce the size of this PR, we could also do a separate PR that only adds the new exception classes. It would just add some "dead code" until we use them, so it's zero risk.


private final short replicationFactor;
private final long windowChangeLogAdditionalRetention;
private final long retryBackOffMs;
private final long retryTimeoutMs;
private Duration initTimeout;
private boolean isInitializing = false;
Member

Not sure if I understand the semantics of this variable and why we need it. Can you explain?

Author

The problem I am trying to solve with the initTimeout variable (and most of the related code) is that the InternalTopicManager, where most of the work happens, has its own timeout exception that it can throw. I wanted a way to differentiate between the existing case and the case where the user passes a timeout into one of the overloads.

Since the bulk of the work done by the init() method actually happens in the InternalTopicManager, it made sense to me, if the user calls one of the timeout overloads of init(), to pass that timeout into the InternalTopicManager. makeReady() has its own paths where a timeout exception can be thrown (the processing takes longer than the configured MAX_POLL_INTERVAL_MS_CONFIG), so I took this approach to let the method throw a timeout exception in both cases. There may be a better way to do this; I also thought of doing something similar to JavaScript's Promise.race().

The current approach adds a lot of other surrounding code, and does not accurately represent the timeout a user would pass anyway. It's not accounting for any time that the validation takes before makeReady might be called. How would you approach it @mjsax ?
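
One possible way to make a user-supplied timeout cover validation and makeReady() together is to convert it into a single absolute deadline up front and let each phase ask how much of the budget is left. The sketch below is an assumption for illustration, not code from this PR: DeadlineSketch, InitTimeoutException, and remainingMsOrThrow() are hypothetical names, and the clock is injected so the behaviour can be checked without sleeping.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

// Hypothetical helper: turns a timeout into one absolute deadline that
// several phases (validate, makeReady, ...) can share.
class DeadlineSketch {
    // Hypothetical unchecked exception standing in for a real timeout exception.
    static class InitTimeoutException extends RuntimeException {
        InitTimeoutException(final String message) {
            super(message);
        }
    }

    private final LongSupplier clockMs;
    private final long deadlineMs;

    DeadlineSketch(final LongSupplier clockMs, final Duration timeout) {
        this.clockMs = clockMs;
        this.deadlineMs = clockMs.getAsLong() + timeout.toMillis();
    }

    // Returns the time left on the shared budget; throws once it is used up.
    long remainingMsOrThrow(final String phase) {
        final long remaining = deadlineMs - clockMs.getAsLong();
        if (remaining <= 0) {
            throw new InitTimeoutException("init() timed out during " + phase);
        }
        return remaining;
    }

    public static void main(final String[] args) {
        final DeadlineSketch deadline =
            new DeadlineSketch(System::currentTimeMillis, Duration.ofSeconds(30));
        System.out.println("budget for validate: " + deadline.remainingMsOrThrow("validate"));
        System.out.println("budget for makeReady: " + deadline.remainingMsOrThrow("makeReady"));
    }
}
```

Because the deadline is fixed when init() starts, time spent validating is automatically subtracted from what makeReady() may use, and the phase name in the exception message tells the two timeout cases apart.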

Author

@k-apol k-apol Jun 14, 2025

The problem I am trying to solve with the isInitializing flag is that, as I understand from the KIP, we want to prevent makeReady from being invoked by anything besides init when configured to do so. My idea was to set this flag to true when the init method is used to invoke makeReady, and have makeReady turn it off at the end of its work. This way, we can stop topics from being created if makeReady is called explicitly instead of via init, when applicable. Currently, it's tied to the timeout setting, which also looks like it's going to be refactored somehow. There are other ways to stop topics from being created conditionally; this was the simplest approach I could think of.

This is where I currently try to decide whether or not topic creation is valid, just before we actually start creating them on line 488

Member

Ah, I see. Might be simpler to just add a boolean parameter to makeReady and pass in true from init() and false from the other calls to it?

In the end, we don't need to track this as "state" (ie, member variable on InternalTopicManager).
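
A rough sketch of that idea, using a heavily simplified and hypothetical stand-in for InternalTopicManager: the class name, the calledFromInit parameter name, and the use of IllegalStateException in place of MissingInternalTopicsException are all assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, heavily simplified stand-in for InternalTopicManager.
class TopicManagerSketch {
    private final boolean manualSetup;
    private final Set<String> existingTopics = new HashSet<>();

    TopicManagerSketch(final boolean manualSetup) {
        this.manualSetup = manualSetup;
    }

    // calledFromInit replaces the isInitializing member: the caller states its
    // intent per call, so nothing has to be reset when the method returns.
    Set<String> makeReady(final Set<String> requiredTopics, final boolean calledFromInit) {
        final Set<String> missing = new HashSet<>(requiredTopics);
        missing.removeAll(existingTopics);
        if (!missing.isEmpty() && manualSetup && !calledFromInit) {
            // The PR throws MissingInternalTopicsException at this point.
            throw new IllegalStateException(
                "Internal topic setup is manual; call init() first. Missing: " + missing);
        }
        existingTopics.addAll(missing); // stands in for the real topic creation
        return missing;
    }

    public static void main(final String[] args) {
        final TopicManagerSketch manager = new TopicManagerSketch(true);
        final Set<String> required = new HashSet<>();
        required.add("app-repartition");
        System.out.println(manager.makeReady(required, true));  // init() path creates the topic
        System.out.println(manager.makeReady(required, false)); // later calls find nothing missing
    }
}
```

Passing the flag as an argument keeps makeReady() free of mutable state, so a call that exits early with an exception cannot leave the manager stuck in an "initializing" mode.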

throw new MissingInternalTopicsException("Internal topic configuration set to MANUAL. \n" +
"You must call init() to setup internal topics.", new ArrayList<>(topicsNotReady));
}

newlyCreatedTopics.addAll(topicsNotReady);

if (!topicsNotReady.isEmpty()) {
Member

To reduce the complexity of this class, we should also extract this block into a sub-method called createTopics() or similar (maybe in a separate refactoring PR, to keep this PR focused)?

Author

I think it's a good idea, we are making this method work really hard. He could use some pals to help him out

Author

@mjsax I have a small PR open to add just the exceptions, happy to open another small one to refactor this method and make it easier to work with as part of this PR. Let me know what you think, can always do this after as well. Trying to avoid as much refactoring as possible in this PR, despite that there is a lot needed and still a bit more to come.

Author

k-apol commented Jun 14, 2025

One more side comment: if we want to reduce the size of this PR, we could also do a separate PR that only adds the new exception classes. It would just add some "dead code" until we use them, so it's zero risk.

I don't mind doing this - if you think it's a good idea, it would be less painful to do it now than later. I feel that the current state of this PR is fairly volatile. You would have better judgment than I would on what constitutes a large PR for Kafka

Update 6/18: @mjsax I've just opened a PR that does this. Maybe I can follow it with the refactoring changes we've outlined in the InternalTopicManager? Many of the changes there are tied to the new init method, but I could still 'break out' parts of the code to make it easier to integrate with init.

k-apol added 4 commits June 14, 2025 08:33
… init method.

Add descriptive helper to throw on invalid invoke
… init method.

Add descriptive helper to throw on invalid invoke
… init method.

Add descriptive helper to throw on invalid invoke

Update logical check for isManualInternalTopicConfig to avoid NPE
…on-Handling' into KAFKA-10357-Accidental-Repartition-Handling

# Conflicts:
#	streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
Member

mjsax commented Jun 24, 2025

Seems there are still a few smaller comments to address.

Just let me know when you need another round of review.

Labels
ci-approved kip Requires or implements a KIP streams

2 participants