kafka(ticdc): close sarama clients on init failures (#12573)#12592
kafka(ticdc): close sarama clients on init failures (#12573)#12592ti-chi-bot wants to merge 1 commit intopingcap:release-7.5from
Conversation
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
|
This cherry pick PR is for a release branch and has not yet been approved by triage owners. To merge this cherry pick:
DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. |
|
@wlwilliamx This PR has conflicts, I have hold it. |
|
@ti-chi-bot: ## If you want to know how to resolve it, please read the guide in TiDB Dev Guide. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
There was a problem hiding this comment.
Code Review
This pull request ensures Sarama clients are properly closed during initialization failures and standard cleanup, while also adding constructor seams and unit tests for heartbeat and failure scenarios. However, the code contains multiple unresolved git merge conflict markers across several files that will cause compilation errors. Additionally, the use of global variables for constructor seams introduces race conditions in parallel tests, which should be addressed to ensure deterministic test results.
| <<<<<<< HEAD | ||
| // Close shuts down the admin client. | ||
| ======= | ||
| // HeartbeatBroker sends a heartbeat to all brokers to keep the kafka connection alive. | ||
| HeartbeatBrokers() | ||
|
|
||
| // Close shuts down the admin client and releases any owned underlying client connections. | ||
| >>>>>>> 9fbde6ebeb (kafka(ticdc): close sarama clients on init failures (#12573)) |
There was a problem hiding this comment.
The file contains unresolved git merge conflict markers. This will cause compilation errors and must be resolved before merging.
// HeartbeatBrokers sends a heartbeat to all brokers to keep the kafka connection alive.
HeartbeatBrokers()
// Close shuts down the admin client and releases any owned underlying client connections.
Close()| <<<<<<< HEAD | ||
|
|
||
| p, err := sarama.NewSyncProducerFromClient(client) | ||
| ======= | ||
| p, err := newSaramaSyncProducerFromClientImpl(client) | ||
| >>>>>>> 9fbde6ebeb (kafka(ticdc): close sarama clients on init failures (#12573)) |
| <<<<<<< HEAD | ||
| ======= | ||
| stdErrors "errors" | ||
| "sync" | ||
| >>>>>>> 9fbde6ebeb (kafka(ticdc): close sarama clients on init failures (#12573)) |
| <<<<<<< HEAD | ||
| ======= |
| require.Equal(t, 1, client.closeCalls) | ||
| require.True(t, client.closed) | ||
| } | ||
| >>>>>>> 9fbde6ebeb (kafka(ticdc): close sarama clients on init failures (#12573)) |
| newSaramaClientImpl = sarama.NewClient | ||
| newSaramaClusterAdminFromClientImpl = sarama.NewClusterAdminFromClient | ||
| newSaramaSyncProducerFromClientImpl = sarama.NewSyncProducerFromClient | ||
| newSaramaAsyncProducerFromClientImpl = sarama.NewAsyncProducerFromClient |
There was a problem hiding this comment.
Using global variables for constructor seams introduces a race condition in tests. Since multiple tests in this package (e.g., TestSyncProducer, TestAsyncProducer) are marked with t.Parallel(), they may run concurrently with tests that modify these global variables (like TestSaramaFactoryAdminClientClosesClientOnAdminInitFailure), leading to non-deterministic failures. Consider passing these creators as dependencies to the factory or using a thread-safe mocking approach.
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: wlwilliamx The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
|
/retest |
1 similar comment
|
/retest |
|
@ti-chi-bot: The following tests failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
This is an automated cherry-pick of #12573
What problem does this PR solve?
Issue Number: close #12572
Kafka admin / producer initialization paths in
pkg/sink/kafkacan return while still leaving the raw Sarama client alive. Repeated retry / rebuild loops may accumulate background metadata updaters, broker connections, and related resources.The normal wrapper close paths also do not always release the owned client:
saramaAdminClient.Closeonly closes the admin handlesaramaSyncProducer.Closeonly closes the producerWhat is changed and how it works?
sarama.Clientwhen admin creation from client failssarama.Clientwhen sync producer creation from client failssarama.Clientwhen async producer creation from client failssaramaAdminClient.ClosesaramaSyncProducer.CloseCheck List
Tests
Unit test:
go test ./pkg/sink/kafka -count=1Questions
Will it cause performance regression or break compatibility?
No. The change only tightens resource cleanup on Kafka init / close paths and does not change normal successful send semantics.
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note