Skip to content

Commit

Permalink
sink/mysql: adjust producer/consumer exit sequence to avoid goroutine…
Browse files Browse the repository at this point in the history
… leak (#1929) (#1946)
  • Loading branch information
ti-chi-bot committed Jun 22, 2021
1 parent 13f80b1 commit 9236cf2
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 6 deletions.
6 changes: 0 additions & 6 deletions cdc/sink/manager_test.go
Expand Up @@ -24,10 +24,8 @@ import (
"github.com/pingcap/errors"

"github.com/pingcap/check"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/util/testleak"
"go.uber.org/zap"
)

type managerSuite struct{}
Expand All @@ -48,9 +46,6 @@ func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTab
func (c *checkSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
c.rowsMu.Lock()
defer c.rowsMu.Unlock()
for _, row := range rows {
log.Info("rows in check sink", zap.Reflect("row", row))
}
c.rows = append(c.rows, rows...)
return nil
}
Expand All @@ -62,7 +57,6 @@ func (c *checkSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
func (c *checkSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
c.rowsMu.Lock()
defer c.rowsMu.Unlock()
log.Info("flush in check sink", zap.Uint64("resolved", resolvedTs))
var newRows []*model.RowChangedEvent
for _, row := range c.rows {
c.Assert(row.CommitTs, check.Greater, c.lastResolvedTs)
Expand Down
27 changes: 27 additions & 0 deletions cdc/sink/mysql.go
Expand Up @@ -134,6 +134,11 @@ func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64
}

func (s *mysqlSink) flushRowChangedEvents(ctx context.Context, receiver *notify.Receiver) {
defer func() {
for _, worker := range s.workers {
worker.closedCh <- struct{}{}
}
}()
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -640,6 +645,7 @@ func (s *mysqlSink) createSinkWorkers(ctx context.Context) error {
log.Info("mysql sink receives redundant error", zap.Error(err))
}
}
worker.cleanup()
}()
}
return nil
Expand Down Expand Up @@ -712,6 +718,7 @@ type mysqlSinkWorker struct {
metricBucketSize prometheus.Counter
receiver *notify.Receiver
checkpointTs uint64
closedCh chan struct{}
}

func newMySQLSinkWorker(
Expand All @@ -728,6 +735,7 @@ func newMySQLSinkWorker(
metricBucketSize: metricBucketSize,
execDMLs: execDMLs,
receiver: receiver,
closedCh: make(chan struct{}, 1),
}
}

Expand Down Expand Up @@ -833,6 +841,25 @@ func (w *mysqlSinkWorker) run(ctx context.Context) (err error) {
}
}

// cleanup waits for notification from closedCh and consumes all txns from txnCh.
// The exit sequence is
// 1. producer(sink.flushRowChangedEvents goroutine) of txnCh exits
// 2. goroutine in 1 sends notification to closedCh of each sink worker
// 3. each sink worker receives the notification from closedCh and mark FinishWg as Done
func (w *mysqlSinkWorker) cleanup() {
<-w.closedCh
for {
select {
case txn := <-w.txnCh:
if txn.FinishWg != nil {
txn.FinishWg.Done()
}
default:
return
}
}
}

func (s *mysqlSink) Close() error {
s.execWaitNotifier.Close()
s.resolvedNotifier.Close()
Expand Down
73 changes: 73 additions & 0 deletions cdc/sink/mysql_test.go
Expand Up @@ -272,6 +272,8 @@ func (s MySQLSinkSuite) TestMySQLSinkWorkerExitWithError(c *check.C) {
for _, txn := range txns1 {
w.appendTxn(cctx, txn)
}

// simulate notify sink worker to flush existing txns
var wg sync.WaitGroup
w.appendFinishTxn(&wg)
time.Sleep(time.Millisecond * 100)
Expand All @@ -280,7 +282,78 @@ func (s MySQLSinkSuite) TestMySQLSinkWorkerExitWithError(c *check.C) {
w.appendTxn(cctx, txn)
}
notifier.Notify()

// simulate sink shutdown and send closed singal to sink worker
w.closedCh <- struct{}{}
w.cleanup()

// the flush notification wait group should be done
wg.Wait()

cancel()
c.Assert(errg.Wait(), check.Equals, errExecFailed)
}

func (s MySQLSinkSuite) TestMySQLSinkWorkerExitCleanup(c *check.C) {
defer testleak.AfterTest(c)()
txns1 := []*model.SingleTableTxn{
{
CommitTs: 1,
Rows: []*model.RowChangedEvent{{CommitTs: 1}},
},
{
CommitTs: 2,
Rows: []*model.RowChangedEvent{{CommitTs: 2}},
},
}
txns2 := []*model.SingleTableTxn{
{
CommitTs: 5,
Rows: []*model.RowChangedEvent{{CommitTs: 5}},
},
}

maxTxnRow := 1
ctx := context.Background()

errExecFailed := errors.New("sink worker exec failed")
notifier := new(notify.Notifier)
cctx, cancel := context.WithCancel(ctx)
receiver, err := notifier.NewReceiver(-1)
c.Assert(err, check.IsNil)
w := newMySQLSinkWorker(maxTxnRow, 1, /*bucket*/
bucketSizeCounter.WithLabelValues("capture", "changefeed", "1"),
receiver,
func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error {
return errExecFailed
})
errg, cctx := errgroup.WithContext(cctx)
errg.Go(func() error {
err := w.run(cctx)
return err
})
for _, txn := range txns1 {
w.appendTxn(cctx, txn)
}

// sleep to let txns flushed by tick
time.Sleep(time.Millisecond * 100)

// simulate more txns are sent to txnCh after the sink worker run has exited
for _, txn := range txns2 {
w.appendTxn(cctx, txn)
}
var wg sync.WaitGroup
w.appendFinishTxn(&wg)
notifier.Notify()

// simulate sink shutdown and send closed singal to sink worker
w.closedCh <- struct{}{}
w.cleanup()

// the flush notification wait group should be done
wg.Wait()

cancel()
c.Assert(errg.Wait(), check.Equals, errExecFailed)
}
Expand Down

0 comments on commit 9236cf2

Please sign in to comment.