dispatchermanager: close on sink exit to avoid OOM #4087
base: master
Conversation
If sink.Run exits with an error, close the DispatcherManager to stop dispatchers and release sink resources (e.g. sarama) instead of allowing unbounded buffering.
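To make the intended behavior concrete, here is a minimal, self-contained Go sketch of the general pattern described above, using toy types rather than the actual TiCDC code; toySink, toyManager, and the close method are illustrative assumptions, not the PR's real identifiers. When the sink goroutine returns a non-cancellation error, the manager shuts down instead of letting dispatchers keep buffering.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// toySink stands in for the real sink; Run blocks until it fails or ctx ends.
type toySink struct{}

func (s *toySink) Run(ctx context.Context) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-time.After(100 * time.Millisecond):
        return errors.New("kafka: topic missing") // simulated downstream failure
    }
}

// toyManager stands in for the dispatcher manager.
type toyManager struct {
    sink   *toySink
    cancel context.CancelFunc
    done   chan struct{}
}

func (m *toyManager) close() {
    m.cancel()    // stop dispatchers
    close(m.done) // release resources and unblock waiters
}

// runSinkWithExitHandling mirrors the pattern described in the PR: report the
// error first, then close the manager proactively.
func (m *toyManager) runSinkWithExitHandling(ctx context.Context) {
    err := m.sink.Run(ctx)
    if errors.Is(err, context.Canceled) {
        return // normal shutdown driven by the manager itself
    }
    if err != nil {
        fmt.Println("report to maintainer:", err) // stand-in for reportErrorToMaintainer
    } else {
        fmt.Println("sink exited without error (unexpected)")
    }
    m.close() // proactive shutdown instead of unbounded buffering
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    m := &toyManager{sink: &toySink{}, cancel: cancel, done: make(chan struct{})}
    go m.runSinkWithExitHandling(ctx)
    <-m.done
    fmt.Println("manager closed")
}

Running this toy prints the simulated report and then "manager closed" once the manager has shut down.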
Summary of Changes
Hello @wlwilliamx, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly enhances the stability of TiCDC's Kafka sink by introducing robust error handling. Previously, an unexpectedly failing Kafka sink could lead to severe resource exhaustion, including out-of-memory errors and CPU spikes, as the system continued to buffer events. The changes ensure that upon detecting a non-recoverable sink error, the DispatcherManager gracefully shuts down, preventing further event buffering and immediately reporting the issue, thus safeguarding the system's overall health and performance.
📝 Walkthrough
Added centralized exit handling for sinks in the dispatcher manager, introducing new methods to gracefully handle sink errors, report them to the maintainer via heartbeat, and trigger proactive shutdown when the sink exits unexpectedly or encounters non-cancellation errors.
Sequence Diagram
sequenceDiagram
participant DM as DispatcherManager
participant Sink as Sink
participant HQ as Heartbeat<br/>Queue
participant Maint as Maintainer
DM->>Sink: runSinkWithExitHandling()
activate Sink
Sink->>Sink: Run()
Sink-->>DM: returns error
deactivate Sink
alt Context Cancelled
DM->>DM: return (normal shutdown)
else Non-Nil Error
DM->>HQ: reportErrorToMaintainer()
HQ->>HQ: enqueue error message<br/>(time, node, code, msg)
HQ->>Maint: notify
DM->>DM: close manager
else No Error (Unexpected)
DM->>DM: log unexpected exit
DM->>DM: close manager
end
Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~12 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (warning)
Code Review
This pull request effectively addresses a potential OOM issue by ensuring the DispatcherManager closes proactively when a sink exits unexpectedly. The introduction of runSinkWithExitHandling and reportErrorToMaintainer is a solid approach to make the error handling more robust and prevent the loss of critical error information during shutdown. The new unit tests also provide good coverage for the added functionality. I have a few suggestions to further refine the implementation by removing a redundant error handling call, using a standardized timestamp format for better machine readability, and ensuring all unexpected sink exits are consistently reported as errors.
} else {
    log.Error("sink exited without error", zap.Stringer("changefeedID", e.changefeedID))
}
When the sink exits unexpectedly without an error, the code logs an error but doesn't report it to the maintainer. This is inconsistent with the case where the sink exits with an error. An unexpected exit, even without an error, is a failure condition for the changefeed that should be reported for better observability. I suggest creating and reporting an error in this case as well.
} else {
    err := errors.ErrUnexpected.GenWithStack("sink exited without error")
    log.Error("sink exited unexpectedly without returning an error",
        zap.Stringer("changefeedID", e.changefeedID),
        zap.Error(err))
    e.reportErrorToMaintainer(err)
}

// manager proactively.
func (e *DispatcherManager) runSinkWithExitHandling(ctx context.Context) {
    err := e.sink.Run(ctx)
    e.handleError(ctx, err)
The call to e.handleError(ctx, err) is redundant given that e.reportErrorToMaintainer(err) is called below to handle the same error. The comment for reportErrorToMaintainer correctly notes that it's a pre-emptive report to avoid losing the error due to a race with context cancellation, which is a risk with the handleError -> collectErrors path. Removing this line will simplify the logic and prevent potential double-reporting of the error.
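For reference, with the redundant call removed the function might look roughly like the sketch below. This is only an illustration inferred from the diff context and the sequence diagram; the cancellation check and the e.close() helper name are assumptions, not the PR's actual code.

func (e *DispatcherManager) runSinkWithExitHandling(ctx context.Context) {
    // Sketch only: the real code may classify cancellation differently.
    err := e.sink.Run(ctx)
    if errors.Is(err, context.Canceled) {
        return // shutdown initiated by the manager itself
    }
    if err != nil {
        // Report before closing so the error is not lost to context cancellation.
        e.reportErrorToMaintainer(err)
    } else {
        log.Error("sink exited without error", zap.Stringer("changefeedID", e.changefeedID))
    }
    e.close() // hypothetical name for the manager's proactive shutdown entry point
}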
var message heartbeatpb.HeartBeatRequest
message.ChangefeedID = e.changefeedID.ToPB()
message.Err = &heartbeatpb.RunningError{
    Time: time.Now().String(),
Using time.Now().String() for timestamps is not ideal as the format is not standardized and can be difficult for other systems to parse. It's better to use a standard format like time.RFC3339Nano. Using UTC is also a best practice to avoid timezone-related issues.
Time: time.Now().UTC().Format(time.RFC3339Nano),
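As a standalone illustration (not part of the PR), a timestamp produced this way also round-trips cleanly through time.Parse, which the default time.Time.String() output does not guarantee:

package main

import (
    "fmt"
    "time"
)

func main() {
    // Format the current time in UTC with a standardized, machine-parsable layout.
    ts := time.Now().UTC().Format(time.RFC3339Nano)
    fmt.Println(ts) // e.g. 2026-02-03T04:05:06.789012345Z

    // The same layout parses the string back without loss.
    parsed, err := time.Parse(time.RFC3339Nano, ts)
    if err != nil {
        panic(err)
    }
    fmt.Println(parsed)
}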
What problem does this PR solve?
Issue Number: close #4064
What is changed and how it works?
When the Kafka sink exits unexpectedly (for example, when a downstream topic is missing), DispatcherManager previously kept running dispatchers and could continue buffering events into the sink path. This can lead to unbounded memory growth, GC CPU spikes, and eventually OOM.

This PR closes DispatcherManager proactively when sink.Run(ctx) returns a non-cancel error, which stops all dispatchers and releases sink resources (including sarama) instead of allowing further buffering. The sink error is reported to the maintainer before closing so that it is not lost to context cancellation.

Check List
Tests
Questions
Will it cause performance regression or break compatibility?
No. It improves stability by preventing runaway buffering and GC thrash after sink failures, and only changes error handling behavior when the sink goroutine exits unexpectedly.
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note
Summary by CodeRabbit
Bug Fixes
Tests