KAFKA-10357: Add explicit internal topic initialization for Kafka Streams #19913
base: trunk
Add timeout handler method to InternalTopicManager to handle both init timeouts and retry timeouts
Cleanup ChangelogTopics
It seems there are some style errors:
Made a high-level initial pass.
It would be good to either add a new integration test, or re-use an existing one to test this feature end-to-end.
@@ -173,6 +173,7 @@ public class StreamsConfig extends AbstractConfig {

    public static final long MAX_TASK_IDLE_MS_DISABLED = -1;

    public static final Duration DEFAULT_INIT_TIMEOUT_MS = Duration.ofMillis(30000);
I don't think we should add this here, as it would "leak" into public API. Not even sure if we need a variable for it at all?
I am marking this down along with the other concerns surrounding the timeout
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
        return setupInternalTopicsIfIncomplete;
    }

    public final void enableTimeout() {
This method is not defined in the KIP, so we should either not add it or update the KIP.
Looking into KIP-1092 and KIP-1153, which introduce CloseOptions (which also takes a timeout), it might make sense to update the KIP to include the timeout in InitParameters and remove the Duration timeout parameter and the corresponding init() overloads? @cadonna WDYT?
Makes sense. I will update this and the other timeout-related code as the conversation evolves. There may be a better way to approach handling a timeout here.
}
log.info(
    "Topics {} could not be made ready. Will retry in {} milliseconds. Remaining time in milliseconds: {}",
maybeThrowTimeoutExceptionDuringMakeReady(
Why did you extract the code into this new helper? Seems it's only called once, so unclear to me what the benefit is compared to just inlined code?
When I tried adding it inline, the code style rule for cyclomatic complexity broke. In order to make use of a timeout passed to init(final Duration timeout), I had to add some more logical branches to check which kind of timeout had happened during makeReady().
Oh. Our old friend. I see. Makes sense.
Thinking about the structure of the code, I would propose to pull the if (!topicsNotReady.isEmpty()) check into the new helper method, and rename the method to verifyAllTopicsCreated() or something like this?
I like this idea, let me take a crack at this. Based on your other comments about the complexity of this method, I want to make sure I do not overlook similar opportunities for abstracting a bit here.
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
Update javadoc for init()
Misc cleanup
@@ -569,7 +579,7 @@ public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
            }
        }
        log.debug("Completed validating internal topics and created {}", newlyCreatedTopics);

        this.isInitializing = false;
        return newlyCreatedTopics;
Made this change to avoid triggering a setup with subsequent calls
@@ -90,6 +92,8 @@ public InternalTopicManager(final Time time,
                                final StreamsConfig streamsConfig) {
        this.time = time;
        this.adminClient = adminClient;
        this.isManualInternalTopicConfig = streamsConfig.getString(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG)
            .equals(StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL);
There may be an opportunity to use the InitParameters class here, but this approach seemed a bit cleaner to me.
I would prefer to "flip" the check:
this.isManualInternalTopicConfig = StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL.equals(
    streamsConfig.getString(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG)
);
The advantage is that we know INTERNAL_TOPIC_SETUP_MANUAL is never null, so there is no risk of an NPE. On the other hand, if streamsConfig.getString(StreamsConfig.INTERNAL_TOPIC_SETUP_CONFIG) were to return null (which we still guard against, but this guard is weaker), we could not call equals() on the null reference and would crash.
As a rule of thumb: I always prefer to use <constant>.equals(<variable>) instead of <variable>.equals(<constant>) to guard against NPE.
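For illustration, here is a minimal, self-contained sketch of the difference between the two orderings. The class name, the SETUP_MANUAL constant, and its "manual" value are hypothetical stand-ins for this example, not the actual StreamsConfig definitions.

```java
public class NullSafeEquals {
    // Hypothetical stand-in for StreamsConfig.INTERNAL_TOPIC_SETUP_MANUAL (value assumed).
    static final String SETUP_MANUAL = "manual";

    // Constant-first comparison: returns false for a null value instead of throwing.
    public static boolean isManual(final String configuredValue) {
        return SETUP_MANUAL.equals(configuredValue);
    }

    // Variable-first comparison: throws NullPointerException when the value is null.
    public static boolean isManualUnsafe(final String configuredValue) {
        return configuredValue.equals(SETUP_MANUAL);
    }

    public static void main(final String[] args) {
        System.out.println(isManual("manual")); // prints "true"
        System.out.println(isManual(null));     // prints "false"
        try {
            isManualUnsafe(null);
        } catch (final NullPointerException e) {
            System.out.println("NPE from variable-first comparison");
        }
    }
}
```

Both methods return the same result for every non-null input; they only differ when the looked-up value is null.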
So the way you wrote this does functionally the same thing, but I'm introducing some risk that can be avoided by just changing the order?
I think I understand, the constant is always going to be a reference to a string value of some kind, even if we change the string later. But it's possible that if someone isn't using this feature, or it's not defaulted etc. that a crash is caused without good reason?
So the way you wrote this does functionally the same thing, but I'm introducing some risk that can be avoided by just changing the order?
Correct.
But it's possible that if someone isn't using this feature, or it's not defaulted etc. that a crash is caused without good reason?
Yes. For this particular case, it does not matter too much, but in general, there is always a chance that <variable> == null, so it's just best practice not to use <variable>.equals(...) w/o a null check for <variable> (by flipping to <constant>.equals(...) we don't need the null check and are safe).
One more side comment: if we want to reduce the size of this PR, we could also do a separate PR that only adds the new exception classes. It would just add some "dead code" until we use them, so it's zero risk.
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
    private final short replicationFactor;
    private final long windowChangeLogAdditionalRetention;
    private final long retryBackOffMs;
    private final long retryTimeoutMs;
    private Duration initTimeout;
    private boolean isInitializing = false;
Not sure if I understand the semantics of this variable and why we need it. Can you explain?
The problem I am trying to solve with the initTimeout variable (and most of the related code) is that the InternalTopicManager, where most of the work happens, has its own timeout exception that it can throw. I wanted a way to differentiate between the existing case and the case where the user passes a timeout into one of the overloads.
Since the bulk of the work done by the init() method actually happens in the InternalTopicManager, it made sense to me to pass the timeout into the InternalTopicManager when the user calls one of the timeout overloads of init().
makeReady() has its own paths where a timeout exception can be thrown (the processing takes longer than the configured MAX_POLL_INTERVAL_MS_CONFIG), so I took this approach to let the method throw a timeout exception in both cases. There may be a better way to do this; I also thought of doing something similar to JavaScript's Promise.race.
The current approach adds a lot of surrounding code and does not accurately represent the timeout a user would pass anyway: it does not account for any time that the validation takes before makeReady might be called. How would you approach it, @mjsax?
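To make the distinction between the two timeouts concrete, here is a minimal sketch of one way to decide which of them fired. It is not the code in this PR: the class, the TimeoutKind enum, and the expired() helper are hypothetical names, and the sketch assumes a null initTimeout means that no user timeout was passed to init().

```java
import java.time.Duration;

public class TimeoutKindSketch {
    // Hypothetical classification of which timeout (if any) has been reached.
    public enum TimeoutKind { NONE, INIT, RETRY }

    // Reports which timeout expired after elapsedMs; if both expired,
    // the one with the shorter limit is reported because it fired first.
    public static TimeoutKind expired(final long elapsedMs,
                                      final Duration initTimeout,
                                      final long retryTimeoutMs) {
        final boolean initExpired = initTimeout != null && elapsedMs >= initTimeout.toMillis();
        final boolean retryExpired = elapsedMs >= retryTimeoutMs;
        if (initExpired && retryExpired) {
            return initTimeout.toMillis() <= retryTimeoutMs ? TimeoutKind.INIT : TimeoutKind.RETRY;
        }
        if (initExpired) {
            return TimeoutKind.INIT;
        }
        return retryExpired ? TimeoutKind.RETRY : TimeoutKind.NONE;
    }

    public static void main(final String[] args) {
        System.out.println(expired(100, Duration.ofMillis(50), 200)); // prints "INIT"
        System.out.println(expired(250, null, 200));                  // prints "RETRY"
        System.out.println(expired(10, Duration.ofMillis(50), 200));  // prints "NONE"
    }
}
```

A caller could then throw a different exception or message per kind, which keeps the branching in one small helper rather than inside makeReady().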
The problem I am trying to solve with the isInitializing flag is that, as I understand from the KIP, we want to prevent makeReady from being invoked by anything besides init when configured to do so. My idea was to set this flag to true when the init method is used to invoke makeReady, and have makeReady turn it off at the end of its work. This way, we can stop topics from being created if makeReady is called explicitly, instead of via init, when applicable. Currently, it's tied to the timeout setting, which also looks like it's going to be refactored somehow. There are other ways to stop topics from being created conditionally, and this was the simplest approach I could think of.
This is where I currently try to decide whether or not topic creation is valid, just before we actually start creating them on line 488.
Ah, I see. Might be simpler to just add a boolean parameter to makeReady and pass in true from init() and false from the other calls to it?
In the end, we don't need to track this as "state" (i.e., a member variable on InternalTopicManager).
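A minimal sketch of that suggestion, under stated assumptions: the class below is a simplified, hypothetical stand-in for InternalTopicManager, the calledFromInit parameter name and the nested exception type are invented for illustration, and an in-memory set replaces the real admin client calls.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MakeReadySketch {
    // Hypothetical stand-in for MissingInternalTopicsException.
    public static class MissingTopicsException extends RuntimeException {
        public final List<String> topics;

        public MissingTopicsException(final String message, final List<String> topics) {
            super(message);
            this.topics = topics;
        }
    }

    private final boolean manualSetup;
    // In-memory stand-in for the topics that exist on the brokers.
    private final Set<String> existingTopics = new HashSet<>();

    public MakeReadySketch(final boolean manualSetup) {
        this.manualSetup = manualSetup;
    }

    // The caller states whether it is init(); no member flag has to be set and reset.
    public Set<String> makeReady(final Set<String> requiredTopics, final boolean calledFromInit) {
        final Set<String> topicsNotReady = new HashSet<>(requiredTopics);
        topicsNotReady.removeAll(existingTopics);
        if (!topicsNotReady.isEmpty() && manualSetup && !calledFromInit) {
            throw new MissingTopicsException(
                "Internal topic configuration set to MANUAL. You must call init() to setup internal topics.",
                new ArrayList<>(topicsNotReady));
        }
        existingTopics.addAll(topicsNotReady); // stands in for creating the topics
        return topicsNotReady;
    }

    public static void main(final String[] args) {
        final MakeReadySketch manager = new MakeReadySketch(true);
        try {
            manager.makeReady(Set.of("app-changelog"), false);
        } catch (final MissingTopicsException e) {
            System.out.println("Rejected outside init(): " + e.topics);
        }
        System.out.println("Created via init(): " + manager.makeReady(Set.of("app-changelog"), true));
    }
}
```

Because the flag travels with the call, a makeReady() invocation that fails partway cannot leave stale "initializing" state behind for later calls.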
    throw new MissingInternalTopicsException("Internal topic configuration set to MANUAL. \n" +
        "You must call init() to setup internal topics.", new ArrayList<>(topicsNotReady));
}

newlyCreatedTopics.addAll(topicsNotReady);

if (!topicsNotReady.isEmpty()) {
To reduce the complexity of this class, we should also extract this block into a sub-method called createTopics() or similar (maybe in a separate refactoring PR, to avoid mixing unrelated changes into this one)?
I think it's a good idea, we are making this method work really hard. He could use some pals to help him out
@mjsax I have a small PR open to add just the exceptions, and I am happy to open another small one to refactor this method and make it easier to work with as part of this PR. Let me know what you think; I can always do this afterwards as well. I am trying to avoid as much refactoring as possible in this PR, even though a lot is needed and there is still a bit more to come.
I don't mind doing this. If you think it's a good idea, it would be less painful to do it now than later. I feel that the current state of this PR is fairly volatile. You would have better judgment than I would on what constitutes a large PR for Kafka.
Update 6/18: @mjsax I've just opened a PR that does this. Maybe I can follow it with the refactoring changes we've outlined in the InternalTopicManager? Many of the changes there are tied to the new init method, but I could still 'break out' parts of the code to make it easier to integrate with init.
… init method. Add descriptive helper to throw on invalid invoke
… init method. Add descriptive helper to throw on invalid invoke
… init method. Add descriptive helper to throw on invalid invoke Update logical check for isManualInternalTopicConfig to avoid NPE
…on-Handling' into KAFKA-10357-Accidental-Repartition-Handling # Conflicts: # streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
It seems there are still a few smaller comments to address. Just let me know when you need another round of review.
…ry line break in ChangelogTopics.java
…n overload to do this ad-hoc - see apache#19913 (comment)
This PR implements the init() method described in KIP-698 to allow users
to validate or prepare internal Kafka Streams topics before starting the
application. The changes include:
- Introduced a public init() method on KafkaStreams, which performs
  internal topic validation and optionally sets up missing topics.
- Added support for a new configuration option, internal.topics.setup,
  with 'automatic' and 'manual' as options to control default internal
  topic management.
- Updated InternalTopicManager to expose validate and to allow for a
  separate initialization timeout.