kafka: release sarama clients on cleanup and init failures#4437
Conversation
(cherry picked from commit 6afd7d8)
📝 Walkthrough

Refactors Kafka sink cleanup to use small sarama interfaces, ensures clients are closed on failure paths, makes Close() nil-safe and ordering-explicit for admin/client/producer, adds unit tests for Close behavior, and expands the goleak ignore list for long-lived Sarama and systemd/dbus routines.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant CF as Changefeed (owner)
    participant Wrapper as saramaAdminClient / saramaSyncProducer
    participant Admin as saramaClusterAdmin
    participant Client as saramaClient / saramaSyncClient
    participant Producer as saramaSyncProducerClient
    rect rgba(200,230,255,0.5)
        CF->>Wrapper: Close()
    end
    alt admin exists
        Wrapper->>Admin: Close()
        Admin-->>Wrapper: success
        Wrapper-->>CF: return (done)
    else admin close fails or admin nil
        Wrapper->>Admin: Close() (if non-nil)
        Admin-->>Wrapper: error / nil
        Wrapper->>Client: Close()
        Client-->>Wrapper: success/error
        alt producer present (sync producer case)
            Wrapper->>Producer: Close()
            Producer-->>Wrapper: success/error
        end
        Wrapper-->>CF: return
    end
```
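The Close() flow in the diagram can be sketched in Go. This is a minimal illustration with hypothetical simplified types (`closer`, `adminClient`, `fake` are not TiCDC's real names), showing the ordering and nil-safety the walkthrough describes:

```go
package main

import "fmt"

// closer matches the single-method shape of the small sarama wrapper
// interfaces described above (names here are illustrative).
type closer interface{ Close() error }

type fake struct {
	name  string
	order *[]string
}

func (f *fake) Close() error {
	*f.order = append(*f.order, f.name)
	return nil
}

type adminClient struct {
	admin  closer // may be nil on partially initialized values
	client closer // may be nil on partially initialized values
}

// Close follows the diagram's ordering: admin first (it may still send
// RPCs), then the underlying client to stop background goroutines.
// Nil fields are tolerated so Close is safe on failure paths.
func (a *adminClient) Close() {
	if a.admin != nil {
		_ = a.admin.Close()
	}
	if a.client != nil {
		_ = a.client.Close()
	}
}

func main() {
	var order []string
	a := &adminClient{
		admin:  &fake{"admin", &order},
		client: &fake{"client", &order},
	}
	a.Close()
	fmt.Println(order) // prints [admin client]

	(&adminClient{}).Close() // nil-safe: no panic
}
```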
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (warning)
Actionable comments posted: 2
🧹 Nitpick comments (1)
pkg/sink/kafka/admin_test.go (1)
24-64: Assert the shutdown order, not just that both flags flipped.

The production fix depends on closing `admin` before `client`. This test still passes if a future refactor swaps those calls, so it won't catch the most important regression. Consider recording call order in the fakes and asserting that `admin` closes before `client`. As per coding guidelines, `**/*_test.go`: "favor deterministic tests and use `testify/require`".

Also applies to: 66-85
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/leakutil/leak_helper.go`:
- Around line 36-37: Remove the two Sarama goroutines
("github.com/IBM/sarama.(*client).backgroundMetadataUpdater" and
"github.com/IBM/sarama.(*Broker).responseReceiver") from the shared defaultOpts
allowlist in leak_helper.go and instead add those goleak.IgnoreTopFunction
entries only in the specific tests that intentionally tolerate async
producer/client shutdown; update the tests that currently expect those leaks to
append these IgnoreTopFunction options when calling goleak.VerifyNone (or the
test-specific leak verification helper) so the global defaultOpts no longer
hides the regression caused by not closing the Sarama client.
In `@pkg/sink/kafka/sarama_factory.go`:
- Around line 90-94: The cleanup path after sarama.NewClusterAdminFromClient
failure currently calls client.Close() and discards any error; capture the
result of client.Close() (e.g., closeErr) and log that closeErr (using the
package logger) while still returning the original admin-construction error
(err); update the block in sarama_factory.go that handles the
NewClusterAdminFromClient failure so it logs the client close failure for
visibility but preserves returning errors.Trace(err).
---
Nitpick comments:
In `@pkg/sink/kafka/admin_test.go`:
- Around line 24-64: The test currently only checks that testSaramaClient.closed
and testSaramaClusterAdmin.closed became true; update the fakes
(testSaramaClient.Close and testSaramaClusterAdmin.Close) to append a
timestamped or sequential marker to a shared slice (e.g., callOrder) when Close
is called, then in the test assert using testify/require that the callOrder
shows admin's Close occurred before client's Close (i.e., "admin" appears
earlier than "client"); ensure you protect the shared slice with a mutex if
needed and use require.Equal/require.True to make the order assertion
deterministic.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: cd98f55d-1708-4dd6-bb25-978b89f59e75
📒 Files selected for processing (4)
- pkg/leakutil/leak_helper.go
- pkg/sink/kafka/admin.go
- pkg/sink/kafka/admin_test.go
- pkg/sink/kafka/sarama_factory.go
```go
goleak.IgnoreTopFunction("github.com/IBM/sarama.(*client).backgroundMetadataUpdater"),
goleak.IgnoreTopFunction("github.com/IBM/sarama.(*Broker).responseReceiver"),
```
Don't add these Sarama goroutines to the global goleak allowlist.
These are the exact goroutines leaked when the admin wrapper forgets to close its underlying Sarama client. Putting them in defaultOpts will make the suite ignore the regression this PR is trying to prevent. Please scope these ignores to the few tests that intentionally tolerate async producer shutdown instead of the shared default list.
```go
if err != nil {
	// `sarama.NewClusterAdminFromClient` does not take ownership of the client,
	// so we need to close it on failures to avoid leaking background goroutines.
	_ = client.Close()
	return nil, errors.Trace(err)
```
Don't swallow the client-close failure on this cleanup path.
If admin creation fails and client.Close() also fails, we lose the only signal that the leak-prevention cleanup did not complete. Please log closeErr here while still returning the original admin-construction error.
Suggested change

```diff
 if err != nil {
 	// `sarama.NewClusterAdminFromClient` does not take ownership of the client,
 	// so we need to close it on failures to avoid leaking background goroutines.
-	_ = client.Close()
+	if closeErr := client.Close(); closeErr != nil {
+		log.Warn("close kafka client after admin creation failed",
+			zap.Stringer("changefeedID", changefeedID),
+			zap.Error(closeErr))
+	}
 	return nil, errors.Trace(err)
 }
```
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request addresses a critical resource leakage issue within the Kafka admin client, particularly when the sink retries against missing topics. By ensuring that both the admin wrapper and its underlying Sarama client are closed, it prevents background goroutines and broker connections from accumulating.

Highlights
Code Review
This pull request effectively addresses a critical resource leak in the Kafka admin client by ensuring that underlying Sarama client resources are properly released, even when the sarama.NewClusterAdminFromClient call fails. The introduction of interfaces for saramaClient and saramaClusterAdmin significantly improves the testability and maintainability of the saramaAdminClient. Additionally, the Close() method has been made more robust by handling nil fields and ensuring a correct shutdown sequence. The comprehensive unit tests added in admin_test.go validate these improvements, covering both successful closure and nil-tolerance scenarios. The updates to the goleak ignore list also help in reducing false positives during leak detection, contributing to more stable tests.
```go
if err != nil {
	// `sarama.NewClusterAdminFromClient` does not take ownership of the client,
	// so we need to close it on failures to avoid leaking background goroutines.
	_ = client.Close()
```
```go
	admin saramaClusterAdmin
}

type saramaClient interface {
```
These two interfaces look similar to the admin interface.
It's not advisable to test against sarama directly.
```go
	"github.com/stretchr/testify/require"
)

type testSaramaClient struct {
```
You should use gomock to generate a mocked admin client for testing purposes.
```go
// Close admin first (may send RPCs), then close the underlying client to stop
// sarama background goroutines and release related caches/metrics.
if a.admin != nil {
	if err := a.admin.Close(); err != nil {
```
This call should already close the admin's underlying client.
Actionable comments posted: 2
♻️ Duplicate comments (1)
pkg/sink/kafka/sarama_factory.go (1)
90-94: ⚠️ Potential issue | 🟡 Minor

Log the client close error for observability.

The cleanup path correctly closes the client to prevent goroutine leaks, but discarding the close error (`_ = client.Close()`) hides potential cleanup failures. As flagged in a previous review, logging this error improves observability without changing the returned error.

Suggested change

```diff
 if err != nil {
 	// `sarama.NewClusterAdminFromClient` does not take ownership of the client,
 	// so we need to close it on failures to avoid leaking background goroutines.
-	_ = client.Close()
+	if closeErr := client.Close(); closeErr != nil {
+		log.Warn("close kafka client after admin creation failed",
+			zap.Stringer("changefeedID", changefeedID),
+			zap.Error(closeErr))
+	}
 	return nil, errors.Trace(err)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/sink/kafka/sarama_factory.go` around lines 90 - 94, The cleanup path swallows client.Close() errors, reducing observability; modify the error handling after sarama.NewClusterAdminFromClient fails to capture the close error (e.g., errClose := client.Close()) and log it (using the package's logger or standard log) without altering the returned error from the function, so we still return errors.Trace(err) while recording any failure from client.Close().
🧹 Nitpick comments (1)
pkg/sink/kafka/sarama_sync_producer_test.go (1)
57-87: Consider adding tests for nil fields and double-close idempotency.

The current tests cover the happy path and client-error scenario well. For completeness, consider adding:

- a test with a `nil` client to verify nil-safety
- a test calling `Close()` twice to verify idempotent behavior via the `closed` atomic flag

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/sink/kafka/sarama_sync_producer_test.go` around lines 57 - 87, Add two unit tests for saramaSyncProducer.Close: one named like TestSaramaSyncProducerCloseWithNilClient that constructs a saramaSyncProducer with producer set (and client == nil) and asserts Close() does not panic and only closes the producer (producer.closeCalls == 1, client nil safe); and another named like TestSaramaSyncProducerDoubleCloseIdempotent that calls Close() twice on a saramaSyncProducer with both client and producer set and asserts closeCalls for both client and producer increment only once (idempotent via closed atomic flag) and no panic on second call; reference saramaSyncProducer.Close, testSaramaSyncProducer, and testSyncProducerClient to locate code.
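The double-close idempotency suggested above can be sketched with a `sync/atomic` guard. These are hypothetical simplified types (`syncProducer`, `countingCloser`), not the actual TiCDC code:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type closer interface{ Close() error }

// countingCloser counts Close calls so idempotency can be verified.
type countingCloser struct{ calls int32 }

func (c *countingCloser) Close() error {
	atomic.AddInt32(&c.calls, 1)
	return nil
}

// syncProducer mirrors the shape discussed above: an atomic closed flag
// makes Close safe to call twice, and nil fields are tolerated.
type syncProducer struct {
	closed   atomic.Bool
	client   closer
	producer closer
}

func (p *syncProducer) Close() {
	if !p.closed.CompareAndSwap(false, true) {
		return // already closed: no-op on the second call
	}
	if p.producer != nil {
		_ = p.producer.Close()
	}
	if p.client != nil {
		_ = p.client.Close()
	}
}

func main() {
	c, pr := &countingCloser{}, &countingCloser{}
	p := &syncProducer{client: c, producer: pr}
	p.Close()
	p.Close() // second call is a no-op
	fmt.Println(c.calls, pr.calls) // prints 1 1

	(&syncProducer{producer: &countingCloser{}}).Close() // nil client: no panic
}
```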
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 250cb26c-6326-4989-9fa1-b2156d4d4c83
📒 Files selected for processing (5)
- pkg/sink/kafka/admin.go
- pkg/sink/kafka/admin_test.go
- pkg/sink/kafka/sarama_factory.go
- pkg/sink/kafka/sarama_sync_producer.go
- pkg/sink/kafka/sarama_sync_producer_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/sink/kafka/admin_test.go
```go
p, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
	_ = client.Close()
	return nil, errors.WrapError(errors.ErrKafkaNewProducer, err)
}
```
Log the client close error for consistency.
Same concern as the admin creation path: silently discarding the close error loses visibility into potential cleanup failures.
Suggested change

```diff
 p, err := sarama.NewSyncProducerFromClient(client)
 if err != nil {
-	_ = client.Close()
+	if closeErr := client.Close(); closeErr != nil {
+		log.Warn("close kafka client after sync producer creation failed",
+			zap.Stringer("changefeedID", f.changefeedID),
+			zap.Error(closeErr))
+	}
 	return nil, errors.WrapError(errors.ErrKafkaNewProducer, err)
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/sink/kafka/sarama_factory.go` around lines 126 - 130, When
NewSyncProducerFromClient fails we currently discard client.Close() errors;
change the error path in the sarama.NewSyncProducerFromClient handling to
capture the result of client.Close(), and log that close error (with context)
before returning the wrapped ErrKafkaNewProducer using errors.WrapError. Locate
the block around sarama.NewSyncProducerFromClient and replace the silent call to
client.Close() with capturing its error (e.g., errClose := client.Close()) and
emit a log entry including errClose and context (consistent with how the admin
creation path logs close failures) prior to returning the wrapped error.
```go
p, err := sarama.NewAsyncProducerFromClient(client)
if err != nil {
	_ = client.Close()
	return nil, errors.WrapError(errors.ErrKafkaNewProducer, err)
}
```
Log the client close error for consistency.
Same pattern as other creation paths: log the close error rather than discarding it.
Suggested change

```diff
 p, err := sarama.NewAsyncProducerFromClient(client)
 if err != nil {
-	_ = client.Close()
+	if closeErr := client.Close(); closeErr != nil {
+		log.Warn("close kafka client after async producer creation failed",
+			zap.Stringer("changefeedID", f.changefeedID),
+			zap.Error(closeErr))
+	}
 	return nil, errors.WrapError(errors.ErrKafkaNewProducer, err)
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/sink/kafka/sarama_factory.go` around lines 154 - 158, When
NewAsyncProducerFromClient fails in the sarama.NewAsyncProducerFromClient block,
the code currently discards the error returned by client.Close(); instead
capture and log that close error for consistency: call client.Close(), check its
returned error and emit a log entry (or attach it to the returned error) before
returning errors.WrapError(errors.ErrKafkaNewProducer, err); update the failing
block around sarama.NewAsyncProducerFromClient and client.Close to include this
logged close error so the close failure isn’t silently ignored.
```go
	"go.uber.org/zap"
)

type saramaSyncClient interface {
```
Redundant interface.
|
[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: 3AceShowHand, lidezhu

The full list of commands accepted by this bot can be found here. The pull request process is described here.

Details: Needs approval from an approver in each of these files.

Approvers can indicate their approval by writing `/approve` in a comment.
[LGTM Timeline notifier] Timeline:
|
/test all
|
/retest
|
In response to a cherrypick label: new pull request created to branch
What problem does this PR solve?
Issue Number: close #4064
When a Kafka sink repeatedly enters warning/retry (for example, when the target topic is missing), TiCDC can leave Sarama clients alive across retries.
For producers created from an existing client, Sarama wraps the provided client with a `nopCloserClient`. That means closing the producer alone does not release the original client owned by TiCDC. If the original client is not closed, background goroutines, broker connections, and related caches/metrics can accumulate.

What is changed and how it works?
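To see why the wrapped client must be closed explicitly, here is a stdlib-only sketch of the no-op-close wrapping described in the problem statement. The types (`client`, `nopCloser`, `producer`) are hypothetical; sarama's real wrapper is internal:

```go
package main

import "fmt"

type client struct{ closed bool }

func (c *client) Close() error { c.closed = true; return nil }

// nopCloser mimics how sarama wraps a caller-provided client when building
// a producer from it: Close on the wrapper deliberately does nothing, so
// ownership of the original client stays with the caller.
type nopCloser struct{ *client }

func (nopCloser) Close() error { return nil }

type producer struct{ c interface{ Close() error } }

func (p *producer) Close() error { return p.c.Close() }

func main() {
	c := &client{}
	p := &producer{c: nopCloser{c}}

	_ = p.Close()
	fmt.Println("client closed after producer.Close:", c.closed) // prints false

	// The owner must close the original client explicitly:
	_ = c.Close()
	fmt.Println("client closed after explicit Close:", c.closed) // prints true
}
```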
This PR tightens Sarama client cleanup in Kafka sink paths by:
- closing the client when `sarama.NewClusterAdminFromClient` fails
- closing the client in `saramaSyncProducer.Close()`
- closing the client when `sarama.NewSyncProducerFromClient` fails
- closing the client when `sarama.NewAsyncProducerFromClient` fails

Check List
Tests
Unit test:

```shell
go test ./pkg/sink/kafka -run 'TestSarama(AdminClientCloseDelegatesClientCleanupToAdmin|AdminClientCloseFallsBackToClientWhenAdminIsNil|SyncProducerCloseClosesClientAndProducer|SyncProducerCloseStillClosesProducerWhenClientCloseFails)' -count=1
```

Questions
Will it cause performance regression or break compatibility?
No. The change only makes Kafka Sarama client cleanup more complete on close and on partially initialized failure paths.
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note