loader: stop retry execute sqls when closed (#691)
* update RetryOnError

* add isClosed function

* fix check

* use context

* add context for execute

* replace some context.Background() with loader's ctx

* fmt

* fix test

* add todo

* fix test

* minor update

* remove ctx when execute sql
WangXiangUSTC authored and suzaku committed Aug 1, 2019
1 parent eaf6146 commit d5f9ccf
Showing 4 changed files with 33 additions and 19 deletions.
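The core of the change is swapping util.RetryOnError for a context-aware util.RetryContext, so retries observe the loader's context and stop once it is cancelled (i.e. once the loader is closed). The helper's body is not part of this diff; the sketch below is only an assumption of its behavior inferred from the call sites (retry up to retryCount times, wait a scaled backoff between attempts, and bail out when ctx is done), not the actual pkg/util code.

package util

import (
	"context"
	"time"
)

// retryContext is a hypothetical stand-in for util.RetryContext as used in
// this commit: run fn until it succeeds, up to retryCount attempts, sleeping
// backoff*factor between attempts, and return early once ctx is cancelled
// (e.g. when the loader's Close cancels its stored context).
func retryContext(ctx context.Context, retryCount int, backoff time.Duration, factor int, fn func(context.Context) error) error {
	var err error
	for i := 0; i < retryCount; i++ {
		if err = fn(ctx); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff * time.Duration(factor)):
		}
	}
	return err
}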
22 changes: 11 additions & 11 deletions pkg/loader/executor.go
@@ -55,9 +55,9 @@ func (e *executor) withQueryHistogramVec(queryHistogramVec *prometheus.Histogram
 	return e
 }
 
-func (e *executor) execTableBatchRetry(dmls []*DML, retryNum int, backoff time.Duration) error {
-	err := util.RetryOnError(retryNum, backoff, "execTableBatchRetry", func() error {
-		return e.execTableBatch(dmls)
+func (e *executor) execTableBatchRetry(ctx context.Context, dmls []*DML, retryNum int, backoff time.Duration) error {
+	err := util.RetryContext(ctx, retryNum, backoff, 1, func(context.Context) error {
+		return e.execTableBatch(ctx, dmls)
 	})
 	return errors.Trace(err)
 }
@@ -186,7 +186,7 @@ func (e *executor) bulkReplace(inserts []*DML) error {
 // use replace to handle the update unique index case(see https://github.com/pingcap/tidb-binlog/pull/437/files)
 // or we can simply check if it update unique index column or not, and for update change to (delete + insert)
 // the final result should has no duplicate entry or the origin dmls is wrong.
-func (e *executor) execTableBatch(dmls []*DML) error {
+func (e *executor) execTableBatch(ctx context.Context, dmls []*DML) error {
 	if len(dmls) == 0 {
 		return nil
 	}
@@ -199,19 +199,19 @@ func (e *executor) execTableBatch(dmls []*DML) error {
 	log.Debug("merge dmls", zap.Reflect("dmls", dmls), zap.Reflect("merged", types))
 
 	if allDeletes, ok := types[DeleteDMLType]; ok {
-		if err := e.splitExecDML(allDeletes, e.bulkDelete); err != nil {
+		if err := e.splitExecDML(ctx, allDeletes, e.bulkDelete); err != nil {
 			return errors.Trace(err)
 		}
 	}
 
 	if allInserts, ok := types[InsertDMLType]; ok {
-		if err := e.splitExecDML(allInserts, e.bulkReplace); err != nil {
+		if err := e.splitExecDML(ctx, allInserts, e.bulkReplace); err != nil {
 			return errors.Trace(err)
 		}
 	}
 
 	if allUpdates, ok := types[UpdateDMLType]; ok {
-		if err := e.splitExecDML(allUpdates, e.bulkReplace); err != nil {
+		if err := e.splitExecDML(ctx, allUpdates, e.bulkReplace); err != nil {
 			return errors.Trace(err)
 		}
 	}
@@ -220,8 +220,8 @@ func (e *executor) execTableBatch(dmls []*DML) error {
 }
 
 // splitExecDML split dmls to size of e.batchSize and call exec concurrently
-func (e *executor) splitExecDML(dmls []*DML, exec func(dmls []*DML) error) error {
-	errg, _ := errgroup.WithContext(context.Background())
+func (e *executor) splitExecDML(ctx context.Context, dmls []*DML, exec func(dmls []*DML) error) error {
+	errg, _ := errgroup.WithContext(ctx)
 
 	for _, split := range splitDMLs(dmls, e.batchSize) {
 		split := split
@@ -237,9 +237,9 @@ func (e *executor) splitExecDML(dmls []*DML, exec func(dmls []*DML) error) error
 	return errors.Trace(errg.Wait())
 }
 
-func (e *executor) singleExecRetry(allDMLs []*DML, safeMode bool, retryNum int, backoff time.Duration) error {
+func (e *executor) singleExecRetry(ctx context.Context, allDMLs []*DML, safeMode bool, retryNum int, backoff time.Duration) error {
 	for _, dmls := range splitDMLs(allDMLs, e.batchSize) {
-		err := util.RetryOnError(retryNum, backoff, "singleExec", func() error {
+		err := util.RetryContext(ctx, retryNum, backoff, 1, func(context.Context) error {
 			return e.singleExec(dmls, safeMode)
 		})
 		if err != nil {
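In splitExecDML the errgroup is now derived from the caller's context rather than context.Background(). Note that the derived context is still discarded (errg, _ :=) and the per-batch exec callbacks take no context, so cancellation mainly reaches the workers through the RetryContext calls that receive the loader's context directly. For reference, a minimal, self-contained illustration of the errgroup.WithContext pattern (names here are illustrative, not from the repository):

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	parent, cancel := context.WithCancel(context.Background())

	// Deriving the group from parent means cancelling parent (as the
	// loader's Close now does) also cancels the group's context.
	errg, ctx := errgroup.WithContext(parent)

	errg.Go(func() error {
		<-ctx.Done() // unblocks on a sibling failure or on cancel()
		return ctx.Err()
	})

	cancel()
	fmt.Println(errg.Wait()) // prints "context canceled"
}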
3 changes: 2 additions & 1 deletion pkg/loader/executor_test.go
@@ -14,6 +14,7 @@
 package loader
 
 import (
+	"context"
 	"database/sql"
 	"fmt"
 	"regexp"
@@ -63,7 +64,7 @@ func (s *executorSuite) TestSplitExecDML(c *C) {
 
 	var counter int32
 
-	err = e.splitExecDML(dmls, func(group []*DML) error {
+	err = e.splitExecDML(context.Background(), dmls, func(group []*DML) error {
 		atomic.AddInt32(&counter, 1)
 		if len(group) < 2 {
 			return errors.New("fake")
21 changes: 16 additions & 5 deletions pkg/loader/load.go
@@ -86,6 +86,11 @@ type loaderImpl struct {
 	// value can be tidb or mysql
 	saveAppliedTS           bool
 	lastUpdateAppliedTSTime time.Time
+
+	// TODO: remove this ctx, context shouldn't stored in struct
+	// https://github.com/pingcap/tidb-binlog/pull/691#issuecomment-515387824
+	ctx    context.Context
+	cancel context.CancelFunc
 }
 
 // MetricsGroup contains metrics of Loader
@@ -147,6 +152,8 @@ func NewLoader(db *gosql.DB, opt ...Option) (Loader, error) {
 		o(&opts)
 	}
 
+	ctx, cancel := context.WithCancel(context.Background())
+
 	s := &loaderImpl{
 		db:          db,
 		workerCount: opts.workerCount,
@@ -156,6 +163,9 @@ func NewLoader(db *gosql.DB, opt ...Option) (Loader, error) {
 		successTxn:    make(chan *Txn, 1024),
 		merge:         true,
 		saveAppliedTS: opts.saveAppliedTS,
+
+		ctx:    ctx,
+		cancel: cancel,
 	}
 
 	db.SetMaxOpenConns(opts.workerCount)
@@ -222,6 +232,7 @@ func (s *loaderImpl) Successes() <-chan *Txn {
 // Run will quit when all data is drained
 func (s *loaderImpl) Close() {
 	close(s.input)
+	s.cancel()
 }
 
 var utilGetTableInfo = getTableInfo
@@ -296,7 +307,7 @@ func isCreateDatabaseDDL(sql string) bool {
 func (s *loaderImpl) execDDL(ddl *DDL) error {
 	log.Debug("exec ddl", zap.Reflect("ddl", ddl))
 
-	err := util.RetryOnError(maxDDLRetryCount, execDDLRetryWait, "execDDL", func() error {
+	err := util.RetryContext(s.ctx, maxDDLRetryCount, execDDLRetryWait, 1, func(context.Context) error {
 		tx, err := s.db.Begin()
 		if err != nil {
 			return err
@@ -327,7 +338,7 @@ func (s *loaderImpl) execDDL(ddl *DDL) error {
 }
 
 func (s *loaderImpl) execByHash(executor *executor, byHash [][]*DML) error {
-	errg, _ := errgroup.WithContext(context.Background())
+	errg, _ := errgroup.WithContext(s.ctx)
 
 	for _, dmls := range byHash {
 		if len(dmls) == 0 {
@@ -337,7 +348,7 @@ func (s *loaderImpl) execByHash(executor *executor, byHash [][]*DML) error {
 		dmls := dmls
 
 		errg.Go(func() error {
-			err := executor.singleExecRetry(dmls, s.GetSafeMode(), maxDMLRetryCount, time.Second)
+			err := executor.singleExecRetry(s.ctx, dmls, s.GetSafeMode(), maxDMLRetryCount, time.Second)
 			return err
 		})
 	}
@@ -396,13 +407,13 @@ func (s *loaderImpl) execDMLs(dmls []*DML) error {
 	batchTables, singleDMLs := s.groupDMLs(dmls)
 
 	executor := s.getExecutor()
-	errg, _ := errgroup.WithContext(context.Background())
+	errg, _ := errgroup.WithContext(s.ctx)
 
 	for _, dmls := range batchTables {
 		// https://golang.org/doc/faq#closures_and_goroutines
 		dmls := dmls
 		errg.Go(func() error {
-			err := executor.execTableBatchRetry(dmls, maxDMLRetryCount, time.Second)
+			err := executor.execTableBatchRetry(s.ctx, dmls, maxDMLRetryCount, time.Second)
 			return err
 		})
 	}
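load.go now creates a cancellable context in NewLoader, stores it on loaderImpl (flagged by the TODO as a temporary compromise), and cancels it in Close right after closing the input channel: queued work can still drain, but any retry sleeping in its backoff returns instead of retrying against a loader that is shutting down. A rough sketch of that shutdown pattern, with illustrative names rather than the real loaderImpl fields:

package main

import "context"

// worker mirrors the shape the commit gives loaderImpl: an input channel for
// queued work plus a cancellable context shared with all retry loops.
type worker struct {
	input  chan string
	ctx    context.Context
	cancel context.CancelFunc
}

func newWorker() *worker {
	ctx, cancel := context.WithCancel(context.Background())
	return &worker{input: make(chan string, 16), ctx: ctx, cancel: cancel}
}

// Close stops accepting new work and tells in-flight retries to give up.
func (w *worker) Close() {
	close(w.input) // the run loop drains whatever is already queued
	w.cancel()     // retries blocked on backoff see ctx.Done() and return
}

func main() {
	w := newWorker()
	defer w.Close()
	_ = w.ctx // handed to retry helpers such as the retryContext sketch above
}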
6 changes: 4 additions & 2 deletions pkg/loader/load_test.go
@@ -14,6 +14,7 @@
 package loader
 
 import (
+	"context"
 	"database/sql"
 	"reflect"
 	"time"
@@ -58,6 +59,7 @@ func (cs *LoadSuite) TestGetExecutor(c *check.C) {
 		metrics: &MetricsGroup{
 			QueryHistogramVec: &prometheus.HistogramVec{},
 		},
+		ctx: context.Background(),
 	}
 	var e *executor = loader.getExecutor()
 	c.Assert(e.db, check.DeepEquals, loader.db)
@@ -268,7 +270,7 @@ func (s *execDDLSuite) TestShouldExecInTransaction(c *check.C) {
 	mock.ExpectExec("CREATE TABLE").WillReturnResult(sqlmock.NewResult(0, 0))
 	mock.ExpectCommit()
 
-	loader := &loaderImpl{db: db}
+	loader := &loaderImpl{db: db, ctx: context.Background()}
 
 	ddl := DDL{SQL: "CREATE TABLE"}
 	err = loader.execDDL(&ddl)
@@ -284,7 +286,7 @@ func (s *execDDLSuite) TestShouldUseDatabase(c *check.C) {
 	mock.ExpectExec("CREATE TABLE").WillReturnResult(sqlmock.NewResult(0, 0))
 	mock.ExpectCommit()
 
-	loader := &loaderImpl{db: db}
+	loader := &loaderImpl{db: db, ctx: context.Background()}
 
 	ddl := DDL{SQL: "CREATE TABLE", Database: "test_db"}
 	err = loader.execDDL(&ddl)
