From 42891739f28a99a5e38390ef477e8b420556dac9 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Wed, 7 Jul 2021 16:31:29 +0800
Subject: [PATCH] sink/mysql: adjust producer/consumer exit sequence to avoid
 goroutine leak (#1929) (#1945)

---
 cdc/sink/manager_test.go |  6 ----
 cdc/sink/mysql.go        | 27 +++++++++++++++
 cdc/sink/mysql_test.go   | 73 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 100 insertions(+), 6 deletions(-)

diff --git a/cdc/sink/manager_test.go b/cdc/sink/manager_test.go
index 1bd12c34d98..9e2de47ce17 100644
--- a/cdc/sink/manager_test.go
+++ b/cdc/sink/manager_test.go
@@ -24,10 +24,8 @@ import (
 	"github.com/pingcap/errors"
 
 	"github.com/pingcap/check"
-	"github.com/pingcap/log"
 	"github.com/pingcap/ticdc/cdc/model"
 	"github.com/pingcap/ticdc/pkg/util/testleak"
-	"go.uber.org/zap"
 )
 
 type managerSuite struct{}
@@ -48,9 +46,6 @@ func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTab
 func (c *checkSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
 	c.rowsMu.Lock()
 	defer c.rowsMu.Unlock()
-	for _, row := range rows {
-		log.Info("rows in check sink", zap.Reflect("row", row))
-	}
 	c.rows = append(c.rows, rows...)
 	return nil
 }
@@ -62,7 +57,6 @@ func (c *checkSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
 func (c *checkSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
 	c.rowsMu.Lock()
 	defer c.rowsMu.Unlock()
-	log.Info("flush in check sink", zap.Uint64("resolved", resolvedTs))
 	var newRows []*model.RowChangedEvent
 	for _, row := range c.rows {
 		c.Assert(row.CommitTs, check.Greater, c.lastResolvedTs)
diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go
index c6ed3c9ba91..2e1d131dde3 100644
--- a/cdc/sink/mysql.go
+++ b/cdc/sink/mysql.go
@@ -134,6 +134,11 @@ func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64
 }
 
 func (s *mysqlSink) flushRowChangedEvents(ctx context.Context, receiver *notify.Receiver) {
+	defer func() {
+		for _, worker := range s.workers {
+			worker.closedCh <- struct{}{}
+		}
+	}()
 	for {
 		select {
 		case <-ctx.Done():
@@ -640,6 +645,7 @@ func (s *mysqlSink) createSinkWorkers(ctx context.Context) error {
 					log.Info("mysql sink receives redundant error", zap.Error(err))
 				}
 			}
+			worker.cleanup()
 		}()
 	}
 	return nil
@@ -712,6 +718,7 @@ type mysqlSinkWorker struct {
 	metricBucketSize prometheus.Counter
 	receiver         *notify.Receiver
 	checkpointTs     uint64
+	closedCh         chan struct{}
 }
 
 func newMySQLSinkWorker(
@@ -728,6 +735,7 @@ func newMySQLSinkWorker(
 		metricBucketSize: metricBucketSize,
 		execDMLs:         execDMLs,
 		receiver:         receiver,
+		closedCh:         make(chan struct{}, 1),
 	}
 }
 
@@ -833,6 +841,25 @@ func (w *mysqlSinkWorker) run(ctx context.Context) (err error) {
 	}
 }
 
+// cleanup waits for a notification from closedCh and consumes all remaining txns from txnCh.
+// The exit sequence is:
+// 1. the producer of txnCh (the sink.flushRowChangedEvents goroutine) exits
+// 2. the goroutine in step 1 sends a notification to each sink worker's closedCh
+// 3. each sink worker receives the notification from closedCh, drains txnCh, and marks the FinishWg of any remaining txn as Done
+func (w *mysqlSinkWorker) cleanup() {
+	<-w.closedCh
+	for {
+		select {
+		case txn := <-w.txnCh:
+			if txn.FinishWg != nil {
+				txn.FinishWg.Done()
+			}
+		default:
+			return
+		}
+	}
+}
+
 func (s *mysqlSink) Close() error {
 	s.execWaitNotifier.Close()
 	s.resolvedNotifier.Close()
diff --git a/cdc/sink/mysql_test.go b/cdc/sink/mysql_test.go
index 37f58b0e69c..6f4dfc0f261 100644
--- a/cdc/sink/mysql_test.go
+++ b/cdc/sink/mysql_test.go
@@ -272,6 +272,8 @@ func (s MySQLSinkSuite) TestMySQLSinkWorkerExitWithError(c *check.C) {
 	for _, txn := range txns1 {
 		w.appendTxn(cctx, txn)
 	}
+
+	// simulate notifying the sink worker to flush the existing txns
 	var wg sync.WaitGroup
 	w.appendFinishTxn(&wg)
 	time.Sleep(time.Millisecond * 100)
@@ -280,7 +282,78 @@ func (s MySQLSinkSuite) TestMySQLSinkWorkerExitWithError(c *check.C) {
 		w.appendTxn(cctx, txn)
 	}
 	notifier.Notify()
+
+	// simulate sink shutdown and send the closed signal to the sink worker
+	w.closedCh <- struct{}{}
+	w.cleanup()
+
+	// the flush notification wait group should be done
 	wg.Wait()
+
 	cancel()
 	c.Assert(errg.Wait(), check.Equals, errExecFailed)
 }
+
+func (s MySQLSinkSuite) TestMySQLSinkWorkerExitCleanup(c *check.C) {
+	defer testleak.AfterTest(c)()
+	txns1 := []*model.SingleTableTxn{
+		{
+			CommitTs: 1,
+			Rows:     []*model.RowChangedEvent{{CommitTs: 1}},
+		},
+		{
+			CommitTs: 2,
+			Rows:     []*model.RowChangedEvent{{CommitTs: 2}},
+		},
+	}
+	txns2 := []*model.SingleTableTxn{
+		{
+			CommitTs: 5,
+			Rows:     []*model.RowChangedEvent{{CommitTs: 5}},
+		},
+	}
+
+	maxTxnRow := 1
+	ctx := context.Background()
+
+	errExecFailed := errors.New("sink worker exec failed")
+	notifier := new(notify.Notifier)
+	cctx, cancel := context.WithCancel(ctx)
+	receiver, err := notifier.NewReceiver(-1)
+	c.Assert(err, check.IsNil)
+	w := newMySQLSinkWorker(maxTxnRow, 1, /*bucket*/
+		bucketSizeCounter.WithLabelValues("capture", "changefeed", "1"),
+		receiver,
+		func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error {
+			return errExecFailed
+		})
+	errg, cctx := errgroup.WithContext(cctx)
+	errg.Go(func() error {
+		err := w.run(cctx)
+		return err
+	})
+	for _, txn := range txns1 {
+		w.appendTxn(cctx, txn)
+	}
+
+	// sleep to let the txns be flushed by the tick
+	time.Sleep(time.Millisecond * 100)
+
+	// simulate more txns being sent to txnCh after the sink worker's run has exited
+	for _, txn := range txns2 {
+		w.appendTxn(cctx, txn)
+	}
+	var wg sync.WaitGroup
+	w.appendFinishTxn(&wg)
+	notifier.Notify()
+
+	// simulate sink shutdown and send the closed signal to the sink worker
+	w.closedCh <- struct{}{}
+	w.cleanup()
+
+	// the flush notification wait group should be done
+	wg.Wait()
+
+	cancel()
+	c.Assert(errg.Wait(), check.Equals, errExecFailed)
+}
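
The shutdown ordering this patch enforces generalizes beyond the MySQL sink. Below is a minimal, self-contained sketch (not TiCDC code: txn, worker, and the buffer sizes are simplified stand-ins for model.SingleTableTxn and mysqlSinkWorker) of why the producer must signal closedCh only after it has stopped sending, after which a buffered closedCh plus a select/default drain is enough to release every pending FinishWg waiter:

package main

import (
	"fmt"
	"sync"
)

// txn is a simplified stand-in for model.SingleTableTxn; only the
// finishWg field matters for the shutdown sequence.
type txn struct {
	finishWg *sync.WaitGroup
}

// worker is a simplified stand-in for mysqlSinkWorker.
type worker struct {
	txnCh    chan *txn
	closedCh chan struct{} // buffered(1): the producer's send never blocks
}

// cleanup mirrors mysqlSinkWorker.cleanup: wait until the producer has
// exited (step 2 of the exit sequence), then drain txnCh so that no
// flush waiter is left blocked forever (step 3).
func (w *worker) cleanup() {
	<-w.closedCh
	for {
		select {
		case t := <-w.txnCh:
			if t.finishWg != nil {
				t.finishWg.Done()
			}
		default:
			return
		}
	}
}

func main() {
	w := &worker{
		txnCh:    make(chan *txn, 16),
		closedCh: make(chan struct{}, 1),
	}

	// A pending flush notification that would leak a waiting goroutine
	// if txnCh were never drained.
	var wg sync.WaitGroup
	wg.Add(1)
	w.txnCh <- &txn{finishWg: &wg}

	// Steps 1 and 2: the producer stops sending, then signals the worker,
	// as the deferred loop in flushRowChangedEvents does for every worker.
	w.closedCh <- struct{}{}

	// Step 3: the worker drains the leftovers; wg.Wait() now returns.
	w.cleanup()
	wg.Wait()
	fmt.Println("all pending FinishWg waiters released")
}

Note that the select/default drain is only safe because step 1 of the exit sequence guarantees no further sends to txnCh can race with it; with a still-live producer, cleanup could return while a txn is in flight and its waiter would leak.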