From fb7f65a1023024ced3ff0fc8500f80c6b8f6fba4 Mon Sep 17 00:00:00 2001 From: wlwilliamx Date: Thu, 12 Feb 2026 13:58:19 +0800 Subject: [PATCH 1/2] kafka: release sarama admin resources on close (cherry picked from commit 6afd7d8b9cc58d7b00a27988601200052daffefb) --- pkg/leakutil/leak_helper.go | 6 ++ pkg/sink/kafka/admin.go | 42 ++++++++++-- pkg/sink/kafka/admin_test.go | 110 +++++++++++++++++++++++++++++++ pkg/sink/kafka/sarama_factory.go | 3 + 4 files changed, 154 insertions(+), 7 deletions(-) create mode 100644 pkg/sink/kafka/admin_test.go diff --git a/pkg/leakutil/leak_helper.go b/pkg/leakutil/leak_helper.go index 68ef89a301..5e70e16786 100644 --- a/pkg/leakutil/leak_helper.go +++ b/pkg/leakutil/leak_helper.go @@ -24,11 +24,17 @@ var defaultOpts = []goleak.Option{ goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"), goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"), + // Some systemd helpers keep a long-lived dbus connection with a background worker goroutine. + // The stack top is usually runtime_pollWait, so match by any-frame. + goleak.IgnoreAnyFunction("github.com/godbus/dbus.(*Conn).inWorker"), + goleak.IgnoreAnyFunction("github.com/godbus/dbus/v5.(*Conn).inWorker"), // library used by sarama, ref: https://github.com/rcrowley/go-metrics/pull/266 goleak.IgnoreTopFunction("github.com/rcrowley/go-metrics.(*meterArbiter).tick"), // Because we close the sarama producer asynchronously, so we have to ignore these funcs. goleak.IgnoreTopFunction("github.com/Shopify/sarama.(*client).backgroundMetadataUpdater"), goleak.IgnoreTopFunction("github.com/Shopify/sarama.(*Broker).responseReceiver"), + goleak.IgnoreTopFunction("github.com/IBM/sarama.(*client).backgroundMetadataUpdater"), + goleak.IgnoreTopFunction("github.com/IBM/sarama.(*Broker).responseReceiver"), goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"), } diff --git a/pkg/sink/kafka/admin.go b/pkg/sink/kafka/admin.go index 2101951072..b96a0975e8 100644 --- a/pkg/sink/kafka/admin.go +++ b/pkg/sink/kafka/admin.go @@ -27,8 +27,24 @@ import ( type saramaAdminClient struct { changefeed common.ChangeFeedID - client sarama.Client - admin sarama.ClusterAdmin + // client is the underlying sarama client created for this admin wrapper. + // It must be closed to stop background goroutines (e.g. metadata updater) and release memory. + client saramaClient + admin saramaClusterAdmin +} + +type saramaClient interface { + Brokers() []*sarama.Broker + Partitions(topic string) ([]int32, error) + Close() error +} + +type saramaClusterAdmin interface { + DescribeCluster() (brokers []*sarama.Broker, controllerID int32, err error) + DescribeConfig(resource sarama.ConfigResource) ([]sarama.ConfigEntry, error) + DescribeTopics(topics []string) (metadata []*sarama.TopicMetadata, err error) + CreateTopic(topic string, detail *sarama.TopicDetail, validateOnly bool) error + Close() error } func (a *saramaAdminClient) GetAllBrokers() []Broker { @@ -172,10 +188,22 @@ func (a *saramaAdminClient) Heartbeat() { } func (a *saramaAdminClient) Close() { - if err := a.admin.Close(); err != nil { - log.Warn("close admin client meet error", - zap.String("keyspace", a.changefeed.Keyspace()), - zap.String("changefeed", a.changefeed.Name()), - zap.Error(err)) + // Close admin first (may send RPCs), then close the underlying client to stop + // sarama background goroutines and release related caches/metrics. + if a.admin != nil { + if err := a.admin.Close(); err != nil { + log.Warn("close admin client meet error", + zap.String("keyspace", a.changefeed.Keyspace()), + zap.String("changefeed", a.changefeed.Name()), + zap.Error(err)) + } + } + if a.client != nil { + if err := a.client.Close(); err != nil { + log.Warn("close kafka client meet error", + zap.String("keyspace", a.changefeed.Keyspace()), + zap.String("changefeed", a.changefeed.Name()), + zap.Error(err)) + } } } diff --git a/pkg/sink/kafka/admin_test.go b/pkg/sink/kafka/admin_test.go new file mode 100644 index 0000000000..aef8184137 --- /dev/null +++ b/pkg/sink/kafka/admin_test.go @@ -0,0 +1,110 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "testing" + + "github.com/IBM/sarama" + "github.com/pingcap/ticdc/pkg/common" + "github.com/stretchr/testify/require" +) + +type testSaramaClient struct { + closed bool +} + +func (c *testSaramaClient) Brokers() []*sarama.Broker { + return nil +} + +func (c *testSaramaClient) Partitions(string) ([]int32, error) { + return nil, nil +} + +func (c *testSaramaClient) Close() error { + c.closed = true + return nil +} + +type testSaramaClusterAdmin struct { + closed bool +} + +func (a *testSaramaClusterAdmin) DescribeCluster() ([]*sarama.Broker, int32, error) { + return nil, 0, nil +} + +func (a *testSaramaClusterAdmin) DescribeConfig(sarama.ConfigResource) ([]sarama.ConfigEntry, error) { + return nil, nil +} + +func (a *testSaramaClusterAdmin) DescribeTopics([]string) ([]*sarama.TopicMetadata, error) { + return nil, nil +} + +func (a *testSaramaClusterAdmin) CreateTopic(string, *sarama.TopicDetail, bool) error { + return nil +} + +func (a *testSaramaClusterAdmin) Close() error { + a.closed = true + return nil +} + +func TestSaramaAdminClientCloseClosesAdminAndClient(t *testing.T) { + // Scenario: Closing the admin wrapper must close both the sarama admin and the + // underlying sarama client, otherwise sarama background goroutines (metadata updater) + // and their in-memory caches/metrics can be leaked across changefeed restarts. + // + // Steps: + // 1. Create a wrapper with a fake admin and fake client. + // 2. Call Close(). + // 3. Verify both Close calls are executed. + client := &testSaramaClient{} + admin := &testSaramaClusterAdmin{} + a := &saramaAdminClient{ + changefeed: common.NewChangeFeedIDWithName("test", "default"), + client: client, + admin: admin, + } + a.Close() + require.True(t, admin.closed) + require.True(t, client.closed) +} + +func TestSaramaAdminClientCloseToleratesNilFields(t *testing.T) { + // Scenario: Close should be safe even if admin/client has already been cleared. + // + // Steps: + // 1. Call Close() on wrappers with nil admin or nil client. + // 2. Ensure Close does not panic and still closes the non-nil field. + client := &testSaramaClient{} + a := &saramaAdminClient{ + changefeed: common.NewChangeFeedIDWithName("test", "default"), + client: client, + admin: nil, + } + require.NotPanics(t, func() { a.Close() }) + require.True(t, client.closed) + + admin := &testSaramaClusterAdmin{} + b := &saramaAdminClient{ + changefeed: common.NewChangeFeedIDWithName("test", "default"), + client: nil, + admin: admin, + } + require.NotPanics(t, func() { b.Close() }) + require.True(t, admin.closed) +} diff --git a/pkg/sink/kafka/sarama_factory.go b/pkg/sink/kafka/sarama_factory.go index 7e21a1777e..6d6c03aadc 100644 --- a/pkg/sink/kafka/sarama_factory.go +++ b/pkg/sink/kafka/sarama_factory.go @@ -88,6 +88,9 @@ func newAdminClient(changefeedID common.ChangeFeedID, endpoints []string, config zap.Any("duration", duration), zap.Stringer("changefeedID", changefeedID)) } if err != nil { + // `sarama.NewClusterAdminFromClient` does not take ownership of the client, + // so we need to close it on failures to avoid leaking background goroutines. + _ = client.Close() return nil, errors.Trace(err) } return &saramaAdminClient{ From dd8e9f1b1fe5654268e8b7f9b2246202544d965f Mon Sep 17 00:00:00 2001 From: wlwilliamx Date: Tue, 17 Mar 2026 19:28:23 +0800 Subject: [PATCH 2/2] kafka: release sarama producer clients on cleanup --- pkg/sink/kafka/admin.go | 6 +- pkg/sink/kafka/admin_test.go | 35 ++------- pkg/sink/kafka/sarama_factory.go | 2 + pkg/sink/kafka/sarama_sync_producer.go | 44 ++++++++--- pkg/sink/kafka/sarama_sync_producer_test.go | 87 +++++++++++++++++++++ 5 files changed, 134 insertions(+), 40 deletions(-) create mode 100644 pkg/sink/kafka/sarama_sync_producer_test.go diff --git a/pkg/sink/kafka/admin.go b/pkg/sink/kafka/admin.go index b96a0975e8..41298460d8 100644 --- a/pkg/sink/kafka/admin.go +++ b/pkg/sink/kafka/admin.go @@ -188,8 +188,9 @@ func (a *saramaAdminClient) Heartbeat() { } func (a *saramaAdminClient) Close() { - // Close admin first (may send RPCs), then close the underlying client to stop - // sarama background goroutines and release related caches/metrics. + // For admins created via sarama.NewClusterAdminFromClient, admin.Close() takes care + // of closing the underlying client as well. Fall back to closing the client directly + // only when admin is unexpectedly nil. if a.admin != nil { if err := a.admin.Close(); err != nil { log.Warn("close admin client meet error", @@ -197,6 +198,7 @@ func (a *saramaAdminClient) Close() { zap.String("changefeed", a.changefeed.Name()), zap.Error(err)) } + return } if a.client != nil { if err := a.client.Close(); err != nil { diff --git a/pkg/sink/kafka/admin_test.go b/pkg/sink/kafka/admin_test.go index aef8184137..ecab2e122d 100644 --- a/pkg/sink/kafka/admin_test.go +++ b/pkg/sink/kafka/admin_test.go @@ -39,7 +39,8 @@ func (c *testSaramaClient) Close() error { } type testSaramaClusterAdmin struct { - closed bool + closed bool + closeClient *testSaramaClient } func (a *testSaramaClusterAdmin) DescribeCluster() ([]*sarama.Broker, int32, error) { @@ -60,20 +61,15 @@ func (a *testSaramaClusterAdmin) CreateTopic(string, *sarama.TopicDetail, bool) func (a *testSaramaClusterAdmin) Close() error { a.closed = true + if a.closeClient != nil { + return a.closeClient.Close() + } return nil } -func TestSaramaAdminClientCloseClosesAdminAndClient(t *testing.T) { - // Scenario: Closing the admin wrapper must close both the sarama admin and the - // underlying sarama client, otherwise sarama background goroutines (metadata updater) - // and their in-memory caches/metrics can be leaked across changefeed restarts. - // - // Steps: - // 1. Create a wrapper with a fake admin and fake client. - // 2. Call Close(). - // 3. Verify both Close calls are executed. +func TestSaramaAdminClientCloseDelegatesClientCleanupToAdmin(t *testing.T) { client := &testSaramaClient{} - admin := &testSaramaClusterAdmin{} + admin := &testSaramaClusterAdmin{closeClient: client} a := &saramaAdminClient{ changefeed: common.NewChangeFeedIDWithName("test", "default"), client: client, @@ -84,27 +80,12 @@ func TestSaramaAdminClientCloseClosesAdminAndClient(t *testing.T) { require.True(t, client.closed) } -func TestSaramaAdminClientCloseToleratesNilFields(t *testing.T) { - // Scenario: Close should be safe even if admin/client has already been cleared. - // - // Steps: - // 1. Call Close() on wrappers with nil admin or nil client. - // 2. Ensure Close does not panic and still closes the non-nil field. +func TestSaramaAdminClientCloseFallsBackToClientWhenAdminIsNil(t *testing.T) { client := &testSaramaClient{} a := &saramaAdminClient{ changefeed: common.NewChangeFeedIDWithName("test", "default"), client: client, - admin: nil, } require.NotPanics(t, func() { a.Close() }) require.True(t, client.closed) - - admin := &testSaramaClusterAdmin{} - b := &saramaAdminClient{ - changefeed: common.NewChangeFeedIDWithName("test", "default"), - client: nil, - admin: admin, - } - require.NotPanics(t, func() { b.Close() }) - require.True(t, admin.closed) } diff --git a/pkg/sink/kafka/sarama_factory.go b/pkg/sink/kafka/sarama_factory.go index 6d6c03aadc..0ff45ebf38 100644 --- a/pkg/sink/kafka/sarama_factory.go +++ b/pkg/sink/kafka/sarama_factory.go @@ -125,6 +125,7 @@ func (f *saramaFactory) SyncProducer(ctx context.Context) (SyncProducer, error) p, err := sarama.NewSyncProducerFromClient(client) if err != nil { + _ = client.Close() return nil, errors.WrapError(errors.ErrKafkaNewProducer, err) } @@ -152,6 +153,7 @@ func (f *saramaFactory) AsyncProducer(ctx context.Context) (AsyncProducer, error p, err := sarama.NewAsyncProducerFromClient(client) if err != nil { + _ = client.Close() return nil, errors.WrapError(errors.ErrKafkaNewProducer, err) } return &saramaAsyncProducer{ diff --git a/pkg/sink/kafka/sarama_sync_producer.go b/pkg/sink/kafka/sarama_sync_producer.go index 1161602883..4410e0aff9 100644 --- a/pkg/sink/kafka/sarama_sync_producer.go +++ b/pkg/sink/kafka/sarama_sync_producer.go @@ -26,10 +26,21 @@ import ( "go.uber.org/zap" ) +type saramaSyncClient interface { + Brokers() []*sarama.Broker + Close() error +} + +type saramaSyncProducerClient interface { + SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) + SendMessages(msgs []*sarama.ProducerMessage) error + Close() error +} + type saramaSyncProducer struct { id commonType.ChangeFeedID - client sarama.Client - producer sarama.SyncProducer + client saramaSyncClient + producer saramaSyncProducerClient closed *atomic.Bool } @@ -110,15 +121,26 @@ func (p *saramaSyncProducer) Close() { p.closed.Store(true) start := time.Now() - // this also close the client. - err := p.producer.Close() - if err != nil { - log.Error("Close Kafka DDL producer with error", - zap.String("keyspace", p.id.Keyspace()), - zap.String("changefeed", p.id.Name()), - zap.Duration("duration", time.Since(start)), - zap.Error(err)) - return + // sarama.NewSyncProducerFromClient wraps the provided client with a nopCloserClient, + // so producer.Close() alone won't release the underlying client resources. + if p.client != nil { + if err := p.client.Close(); err != nil { + log.Warn("Close Kafka DDL producer client with error", + zap.String("keyspace", p.id.Keyspace()), + zap.String("changefeed", p.id.Name()), + zap.Duration("duration", time.Since(start)), + zap.Error(err)) + } + } + if p.producer != nil { + if err := p.producer.Close(); err != nil { + log.Error("Close Kafka DDL producer with error", + zap.String("keyspace", p.id.Keyspace()), + zap.String("changefeed", p.id.Name()), + zap.Duration("duration", time.Since(start)), + zap.Error(err)) + return + } } log.Info("Kafka DDL producer closed", zap.String("keyspace", p.id.Keyspace()), diff --git a/pkg/sink/kafka/sarama_sync_producer_test.go b/pkg/sink/kafka/sarama_sync_producer_test.go new file mode 100644 index 0000000000..281af9c634 --- /dev/null +++ b/pkg/sink/kafka/sarama_sync_producer_test.go @@ -0,0 +1,87 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "errors" + "testing" + + "github.com/IBM/sarama" + "github.com/pingcap/ticdc/pkg/common" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" +) + +type testSyncProducerClient struct { + closeCalls int + closeErr error +} + +func (c *testSyncProducerClient) Brokers() []*sarama.Broker { + return nil +} + +func (c *testSyncProducerClient) Close() error { + c.closeCalls++ + return c.closeErr +} + +type testSaramaSyncProducer struct { + closeCalls int +} + +func (p *testSaramaSyncProducer) SendMessage(*sarama.ProducerMessage) (int32, int64, error) { + return 0, 0, nil +} + +func (p *testSaramaSyncProducer) SendMessages([]*sarama.ProducerMessage) error { + return nil +} + +func (p *testSaramaSyncProducer) Close() error { + p.closeCalls++ + return nil +} + +func TestSaramaSyncProducerCloseClosesClientAndProducer(t *testing.T) { + client := &testSyncProducerClient{} + producer := &testSaramaSyncProducer{} + p := &saramaSyncProducer{ + id: common.NewChangeFeedIDWithName("test", "default"), + client: client, + producer: producer, + closed: atomic.NewBool(false), + } + + p.Close() + + require.Equal(t, 1, client.closeCalls) + require.Equal(t, 1, producer.closeCalls) +} + +func TestSaramaSyncProducerCloseStillClosesProducerWhenClientCloseFails(t *testing.T) { + client := &testSyncProducerClient{closeErr: errors.New("boom")} + producer := &testSaramaSyncProducer{} + p := &saramaSyncProducer{ + id: common.NewChangeFeedIDWithName("test", "default"), + client: client, + producer: producer, + closed: atomic.NewBool(false), + } + + p.Close() + + require.Equal(t, 1, client.closeCalls) + require.Equal(t, 1, producer.closeCalls) +}