Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/leakutil/leak_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,17 @@ var defaultOpts = []goleak.Option{
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
// Some systemd helpers keep a long-lived dbus connection with a background worker goroutine.
// The stack top is usually runtime_pollWait, so match by any-frame.
goleak.IgnoreAnyFunction("github.com/godbus/dbus.(*Conn).inWorker"),
goleak.IgnoreAnyFunction("github.com/godbus/dbus/v5.(*Conn).inWorker"),
// library used by sarama, ref: https://github.com/rcrowley/go-metrics/pull/266
goleak.IgnoreTopFunction("github.com/rcrowley/go-metrics.(*meterArbiter).tick"),
// We close the sarama producer asynchronously, so we have to ignore these funcs.
goleak.IgnoreTopFunction("github.com/Shopify/sarama.(*client).backgroundMetadataUpdater"),
goleak.IgnoreTopFunction("github.com/Shopify/sarama.(*Broker).responseReceiver"),
goleak.IgnoreTopFunction("github.com/IBM/sarama.(*client).backgroundMetadataUpdater"),
goleak.IgnoreTopFunction("github.com/IBM/sarama.(*Broker).responseReceiver"),
Comment on lines +36 to +37
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't add these Sarama goroutines to the global goleak allowlist.

These are the exact goroutines leaked when the admin wrapper forgets to close its underlying Sarama client. Putting them in defaultOpts will make the suite ignore the regression this PR is trying to prevent. Please scope these ignores to the few tests that intentionally tolerate async producer shutdown instead of the shared default list.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/leakutil/leak_helper.go` around lines 36 - 37, Remove the two Sarama
goroutines ("github.com/IBM/sarama.(*client).backgroundMetadataUpdater" and
"github.com/IBM/sarama.(*Broker).responseReceiver") from the shared defaultOpts
allowlist in leak_helper.go and instead add those goleak.IgnoreTopFunction
entries only in the specific tests that intentionally tolerate async
producer/client shutdown; update the tests that currently expect those leaks to
append these IgnoreTopFunction options when calling goleak.VerifyNone (or the
test-specific leak verification helper) so the global defaultOpts no longer
hides the regression caused by not closing the Sarama client.

goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
}

Expand Down
44 changes: 37 additions & 7 deletions pkg/sink/kafka/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,24 @@ import (
type saramaAdminClient struct {
changefeed common.ChangeFeedID

client sarama.Client
admin sarama.ClusterAdmin
// client is the underlying sarama client created for this admin wrapper.
// It must be closed to stop background goroutines (e.g. metadata updater) and release memory.
client saramaClient
admin saramaClusterAdmin
}

type saramaClient interface {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two interfaces look similar to the admin interface.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not recommended to write tests against sarama itself.

Brokers() []*sarama.Broker
Partitions(topic string) ([]int32, error)
Close() error
}

// saramaClusterAdmin is the type of saramaAdminClient.admin: a package-local
// interface naming the cluster-admin methods reachable through that field.
// Being an interface, it can be satisfied by a hand-written fake in tests.
// NOTE(review): the method set presumably mirrors a subset of
// sarama.ClusterAdmin — confirm against the sarama version in use.
type saramaClusterAdmin interface {
// DescribeCluster returns the brokers, the controller ID, and an error.
DescribeCluster() (brokers []*sarama.Broker, controllerID int32, err error)
// DescribeConfig returns the config entries for the given resource.
DescribeConfig(resource sarama.ConfigResource) ([]sarama.ConfigEntry, error)
// DescribeTopics returns metadata for the named topics.
DescribeTopics(topics []string) (metadata []*sarama.TopicMetadata, err error)
// CreateTopic creates topic with the given detail.
// NOTE(review): validateOnly presumably requests a dry run — confirm in sarama docs.
CreateTopic(topic string, detail *sarama.TopicDetail, validateOnly bool) error
// Close is invoked by saramaAdminClient.Close, which logs a warning if it
// returns an error.
Close() error
}

func (a *saramaAdminClient) GetAllBrokers() []Broker {
Expand Down Expand Up @@ -172,10 +188,24 @@ func (a *saramaAdminClient) Heartbeat() {
}

func (a *saramaAdminClient) Close() {
if err := a.admin.Close(); err != nil {
log.Warn("close admin client meet error",
zap.String("keyspace", a.changefeed.Keyspace()),
zap.String("changefeed", a.changefeed.Name()),
zap.Error(err))
// For admins created via sarama.NewClusterAdminFromClient, admin.Close() takes care
// of closing the underlying client as well. Fall back to closing the client directly
// only when admin is unexpectedly nil.
if a.admin != nil {
if err := a.admin.Close(); err != nil {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call should already close the admin's underlying client.

log.Warn("close admin client meet error",
zap.String("keyspace", a.changefeed.Keyspace()),
zap.String("changefeed", a.changefeed.Name()),
zap.Error(err))
}
return
}
if a.client != nil {
if err := a.client.Close(); err != nil {
log.Warn("close kafka client meet error",
zap.String("keyspace", a.changefeed.Keyspace()),
zap.String("changefeed", a.changefeed.Name()),
zap.Error(err))
}
}
}
91 changes: 91 additions & 0 deletions pkg/sink/kafka/admin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"testing"

"github.com/IBM/sarama"
"github.com/pingcap/ticdc/pkg/common"
"github.com/stretchr/testify/require"
)

type testSaramaClient struct {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should use gomock to generate a mocked admin client for testing purposes.

closed bool
}

// Brokers is a stub: the fake client tracks no brokers and always reports nil.
func (tc *testSaramaClient) Brokers() []*sarama.Broker {
	return nil
}

// Partitions is a stub: the topic argument is ignored and neither partitions
// nor an error are ever reported.
func (tc *testSaramaClient) Partitions(_ string) ([]int32, error) {
	return nil, nil
}

// Close records that the client was closed so tests can assert on it.
// It never fails.
func (tc *testSaramaClient) Close() error {
	tc.closed = true
	return nil
}

// testSaramaClusterAdmin is a hand-written fake cluster admin for tests.
// Every query method is a no-op; only Close has observable effects.
type testSaramaClusterAdmin struct {
	// closed is set once Close has been called.
	closed bool
	// closeClient, when non-nil, is closed as part of Close, imitating an
	// admin whose Close also shuts down its underlying client.
	closeClient *testSaramaClient
}

// DescribeCluster is a stub returning no brokers, controller ID 0, and no error.
func (fa *testSaramaClusterAdmin) DescribeCluster() ([]*sarama.Broker, int32, error) {
	return nil, 0, nil
}

// DescribeConfig is a stub returning no entries and no error.
func (fa *testSaramaClusterAdmin) DescribeConfig(_ sarama.ConfigResource) ([]sarama.ConfigEntry, error) {
	return nil, nil
}

// DescribeTopics is a stub returning no metadata and no error.
func (fa *testSaramaClusterAdmin) DescribeTopics(_ []string) ([]*sarama.TopicMetadata, error) {
	return nil, nil
}

// CreateTopic is a stub that always succeeds without doing anything.
func (fa *testSaramaClusterAdmin) CreateTopic(_ string, _ *sarama.TopicDetail, _ bool) error {
	return nil
}

// Close marks the admin as closed and, if a client was attached, closes it
// too and returns that client's Close result.
func (fa *testSaramaClusterAdmin) Close() error {
	fa.closed = true
	if fa.closeClient == nil {
		return nil
	}
	return fa.closeClient.Close()
}

// TestSaramaAdminClientCloseDelegatesClientCleanupToAdmin checks that closing
// the wrapper closes the admin, and that the client ends up closed through
// the admin's own Close.
func TestSaramaAdminClientCloseDelegatesClientCleanupToAdmin(t *testing.T) {
	fakeClient := new(testSaramaClient)
	// The fake admin closes fakeClient from inside its own Close.
	fakeAdmin := &testSaramaClusterAdmin{closeClient: fakeClient}

	wrapper := &saramaAdminClient{
		changefeed: common.NewChangeFeedIDWithName("test", "default"),
		client:     fakeClient,
		admin:      fakeAdmin,
	}
	wrapper.Close()

	require.True(t, fakeAdmin.closed)
	require.True(t, fakeClient.closed)
}

// TestSaramaAdminClientCloseFallsBackToClientWhenAdminIsNil checks that, with
// no admin set, Close does not panic and closes the client directly.
func TestSaramaAdminClientCloseFallsBackToClientWhenAdminIsNil(t *testing.T) {
	fakeClient := new(testSaramaClient)
	// The admin field is deliberately left at its nil zero value.
	wrapper := &saramaAdminClient{
		changefeed: common.NewChangeFeedIDWithName("test", "default"),
		client:     fakeClient,
	}

	require.NotPanics(t, wrapper.Close)
	require.True(t, fakeClient.closed)
}
5 changes: 5 additions & 0 deletions pkg/sink/kafka/sarama_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ func newAdminClient(changefeedID common.ChangeFeedID, endpoints []string, config
zap.Any("duration", duration), zap.Stringer("changefeedID", changefeedID))
}
if err != nil {
// `sarama.NewClusterAdminFromClient` does not take ownership of the client,
// so we need to close it on failures to avoid leaking background goroutines.
_ = client.Close()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This is a critical fix. Explicitly closing the client here prevents resource leaks (background goroutines and cached metadata/metrics) if sarama.NewClusterAdminFromClient fails, as this function does not take ownership of the client. This directly addresses the problem described in the PR.

return nil, errors.Trace(err)
Comment on lines 90 to 94
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Don't swallow the client-close failure on this cleanup path.

If admin creation fails and client.Close() also fails, we lose the only signal that the leak-prevention cleanup did not complete. Please log closeErr here while still returning the original admin-construction error.

Suggested change
 	if err != nil {
 		// `sarama.NewClusterAdminFromClient` does not take ownership of the client,
 		// so we need to close it on failures to avoid leaking background goroutines.
-		_ = client.Close()
+		if closeErr := client.Close(); closeErr != nil {
+			log.Warn("close kafka client after admin creation failed",
+				zap.Stringer("changefeedID", changefeedID),
+				zap.Error(closeErr))
+		}
 		return nil, errors.Trace(err)
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if err != nil {
// `sarama.NewClusterAdminFromClient` does not take ownership of the client,
// so we need to close it on failures to avoid leaking background goroutines.
_ = client.Close()
return nil, errors.Trace(err)
if err != nil {
// `sarama.NewClusterAdminFromClient` does not take ownership of the client,
// so we need to close it on failures to avoid leaking background goroutines.
if closeErr := client.Close(); closeErr != nil {
log.Warn("close kafka client after admin creation failed",
zap.Stringer("changefeedID", changefeedID),
zap.Error(closeErr))
}
return nil, errors.Trace(err)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sink/kafka/sarama_factory.go` around lines 90 - 94, The cleanup path
after sarama.NewClusterAdminFromClient failure currently calls client.Close()
and discards any error; capture the result of client.Close() (e.g., closeErr)
and log that closeErr (using the package logger) while still returning the
original admin-construction error (err); update the block in sarama_factory.go
that handles the NewClusterAdminFromClient failure so it logs the client close
failure for visibility but preserves returning errors.Trace(err).

}
return &saramaAdminClient{
Expand Down Expand Up @@ -122,6 +125,7 @@ func (f *saramaFactory) SyncProducer(ctx context.Context) (SyncProducer, error)

p, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
_ = client.Close()
return nil, errors.WrapError(errors.ErrKafkaNewProducer, err)
}
Comment on lines 126 to 130
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Log the client close error for consistency.

Same concern as the admin creation path: silently discarding the close error loses visibility into potential cleanup failures.

Suggested change
 	p, err := sarama.NewSyncProducerFromClient(client)
 	if err != nil {
-		_ = client.Close()
+		if closeErr := client.Close(); closeErr != nil {
+			log.Warn("close kafka client after sync producer creation failed",
+				zap.Stringer("changefeedID", f.changefeedID),
+				zap.Error(closeErr))
+		}
 		return nil, errors.WrapError(errors.ErrKafkaNewProducer, err)
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
p, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
_ = client.Close()
return nil, errors.WrapError(errors.ErrKafkaNewProducer, err)
}
p, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
if closeErr := client.Close(); closeErr != nil {
log.Warn("close kafka client after sync producer creation failed",
zap.Stringer("changefeedID", f.changefeedID),
zap.Error(closeErr))
}
return nil, errors.WrapError(errors.ErrKafkaNewProducer, err)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sink/kafka/sarama_factory.go` around lines 126 - 130, When
NewSyncProducerFromClient fails we currently discard client.Close() errors;
change the error path in the sarama.NewSyncProducerFromClient handling to
capture the result of client.Close(), and log that close error (with context)
before returning the wrapped ErrKafkaNewProducer using errors.WrapError. Locate
the block around sarama.NewSyncProducerFromClient and replace the silent call to
client.Close() with capturing its error (e.g., errClose := client.Close()) and
emit a log entry including errClose and context (consistent with how the admin
creation path logs close failures) prior to returning the wrapped error.


Expand Down Expand Up @@ -149,6 +153,7 @@ func (f *saramaFactory) AsyncProducer(ctx context.Context) (AsyncProducer, error

p, err := sarama.NewAsyncProducerFromClient(client)
if err != nil {
_ = client.Close()
return nil, errors.WrapError(errors.ErrKafkaNewProducer, err)
}
Comment on lines 154 to 158
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Log the client close error for consistency.

Same pattern as other creation paths: log the close error rather than discarding it.

Suggested change
 	p, err := sarama.NewAsyncProducerFromClient(client)
 	if err != nil {
-		_ = client.Close()
+		if closeErr := client.Close(); closeErr != nil {
+			log.Warn("close kafka client after async producer creation failed",
+				zap.Stringer("changefeedID", f.changefeedID),
+				zap.Error(closeErr))
+		}
 		return nil, errors.WrapError(errors.ErrKafkaNewProducer, err)
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
p, err := sarama.NewAsyncProducerFromClient(client)
if err != nil {
_ = client.Close()
return nil, errors.WrapError(errors.ErrKafkaNewProducer, err)
}
p, err := sarama.NewAsyncProducerFromClient(client)
if err != nil {
if closeErr := client.Close(); closeErr != nil {
log.Warn("close kafka client after async producer creation failed",
zap.Stringer("changefeedID", f.changefeedID),
zap.Error(closeErr))
}
return nil, errors.WrapError(errors.ErrKafkaNewProducer, err)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sink/kafka/sarama_factory.go` around lines 154 - 158, When
NewAsyncProducerFromClient fails in the sarama.NewAsyncProducerFromClient block,
the code currently discards the error returned by client.Close(); instead
capture and log that close error for consistency: call client.Close(), check its
returned error and emit a log entry (or attach it to the returned error) before
returning errors.WrapError(errors.ErrKafkaNewProducer, err); update the failing
block around sarama.NewAsyncProducerFromClient and client.Close to include this
logged close error so the close failure isn’t silently ignored.

return &saramaAsyncProducer{
Expand Down
44 changes: 33 additions & 11 deletions pkg/sink/kafka/sarama_sync_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,21 @@ import (
"go.uber.org/zap"
)

type saramaSyncClient interface {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant interface.

Brokers() []*sarama.Broker
Close() error
}

// saramaSyncProducerClient is the type of saramaSyncProducer.producer: a
// package-local interface naming the producer methods reachable through that
// field. Being an interface, it can be satisfied by a hand-written fake in tests.
// NOTE(review): the method set presumably mirrors a subset of
// sarama.SyncProducer — confirm against the sarama version in use.
type saramaSyncProducerClient interface {
// SendMessage sends one message and returns its partition, offset, and an error.
SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
// SendMessages sends a batch of messages and returns an error.
SendMessages(msgs []*sarama.ProducerMessage) error
// Close is invoked by saramaSyncProducer.Close, which logs any error it returns.
Close() error
}

type saramaSyncProducer struct {
id commonType.ChangeFeedID
client sarama.Client
producer sarama.SyncProducer
client saramaSyncClient
producer saramaSyncProducerClient
closed *atomic.Bool
}

Expand Down Expand Up @@ -110,15 +121,26 @@ func (p *saramaSyncProducer) Close() {

p.closed.Store(true)
start := time.Now()
// this also close the client.
err := p.producer.Close()
if err != nil {
log.Error("Close Kafka DDL producer with error",
zap.String("keyspace", p.id.Keyspace()),
zap.String("changefeed", p.id.Name()),
zap.Duration("duration", time.Since(start)),
zap.Error(err))
return
// sarama.NewSyncProducerFromClient wraps the provided client with a nopCloserClient,
// so producer.Close() alone won't release the underlying client resources.
if p.client != nil {
if err := p.client.Close(); err != nil {
log.Warn("Close Kafka DDL producer client with error",
zap.String("keyspace", p.id.Keyspace()),
zap.String("changefeed", p.id.Name()),
zap.Duration("duration", time.Since(start)),
zap.Error(err))
}
}
if p.producer != nil {
if err := p.producer.Close(); err != nil {
log.Error("Close Kafka DDL producer with error",
zap.String("keyspace", p.id.Keyspace()),
zap.String("changefeed", p.id.Name()),
zap.Duration("duration", time.Since(start)),
zap.Error(err))
return
}
}
log.Info("Kafka DDL producer closed",
zap.String("keyspace", p.id.Keyspace()),
Expand Down
87 changes: 87 additions & 0 deletions pkg/sink/kafka/sarama_sync_producer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"errors"
"testing"

"github.com/IBM/sarama"
"github.com/pingcap/ticdc/pkg/common"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

// testSyncProducerClient is a fake client for sync-producer tests. It counts
// Close calls and can be configured to fail on Close.
type testSyncProducerClient struct {
	// closeCalls is the number of times Close has been invoked.
	closeCalls int
	// closeErr is returned verbatim from every Close call (nil means success).
	closeErr error
}

// Brokers is a stub: the fake tracks no brokers and always reports nil.
func (fc *testSyncProducerClient) Brokers() []*sarama.Broker {
	return nil
}

// Close bumps the call counter and returns the configured error.
func (fc *testSyncProducerClient) Close() error {
	fc.closeCalls++
	return fc.closeErr
}

// testSaramaSyncProducer is a fake sync producer for tests. Sends are no-ops;
// only the number of Close calls is tracked.
type testSaramaSyncProducer struct {
	// closeCalls is the number of times Close has been invoked.
	closeCalls int
}

// SendMessage is a stub reporting partition 0, offset 0, and no error.
func (fp *testSaramaSyncProducer) SendMessage(_ *sarama.ProducerMessage) (int32, int64, error) {
	return 0, 0, nil
}

// SendMessages is a stub that always succeeds.
func (fp *testSaramaSyncProducer) SendMessages(_ []*sarama.ProducerMessage) error {
	return nil
}

// Close bumps the call counter and always succeeds.
func (fp *testSaramaSyncProducer) Close() error {
	fp.closeCalls++
	return nil
}

// TestSaramaSyncProducerCloseClosesClientAndProducer checks that a single
// Close on the wrapper closes both the client and the producer exactly once.
func TestSaramaSyncProducerCloseClosesClientAndProducer(t *testing.T) {
	var (
		fakeClient   = new(testSyncProducerClient)
		fakeProducer = new(testSaramaSyncProducer)
	)
	syncProducer := &saramaSyncProducer{
		id:       common.NewChangeFeedIDWithName("test", "default"),
		client:   fakeClient,
		producer: fakeProducer,
		closed:   atomic.NewBool(false),
	}

	syncProducer.Close()

	require.Equal(t, 1, fakeClient.closeCalls)
	require.Equal(t, 1, fakeProducer.closeCalls)
}

// TestSaramaSyncProducerCloseStillClosesProducerWhenClientCloseFails checks
// that an error from the client's Close does not stop the producer from being
// closed: each is still closed exactly once.
func TestSaramaSyncProducerCloseStillClosesProducerWhenClientCloseFails(t *testing.T) {
	var (
		// failingClient reports an error from every Close call.
		failingClient = &testSyncProducerClient{closeErr: errors.New("boom")}
		fakeProducer  = new(testSaramaSyncProducer)
	)
	syncProducer := &saramaSyncProducer{
		id:       common.NewChangeFeedIDWithName("test", "default"),
		client:   failingClient,
		producer: fakeProducer,
		closed:   atomic.NewBool(false),
	}

	syncProducer.Close()

	require.Equal(t, 1, failingClient.closeCalls)
	require.Equal(t, 1, fakeProducer.closeCalls)
}
Loading