From a5c7024e90622808243427bef2ac1d5d88872e7c Mon Sep 17 00:00:00 2001
From: crazycs
Date: Tue, 13 Nov 2018 16:43:06 +0800
Subject: [PATCH 01/16] ddl: dynamically adjust add index worker number (init)

---
 ddl/index.go | 45 ++++++++++++++++++++++++++-------------------
 1 file changed, 26 insertions(+), 19 deletions(-)

diff --git a/ddl/index.go b/ddl/index.go
index a8fa89e39e36e..9d036369c096c 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -1053,18 +1053,41 @@ func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*addIndexWorker
 
 // buildIndexForReorgInfo build backfilling tasks from [reorgInfo.StartHandle, reorgInfo.EndHandle),
 // and send these tasks to add index workers, till we finish adding the indices.
-func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, workers []*addIndexWorker, job *model.Job, reorgInfo *reorgInfo) error {
+func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, indexInfo *model.IndexInfo, job *model.Job, reorgInfo *reorgInfo) error {
 	totalAddedCount := job.GetRowCount()
 
 	startHandle, endHandle := reorgInfo.StartHandle, reorgInfo.EndHandle
+	sessCtx := newContext(reorgInfo.d.store)
+	decodeColMap, err := makeupDecodeColMap(sessCtx, t, indexInfo)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	// variable.ddlReorgWorkerCounter can be modified by system variable "tidb_ddl_reorg_worker_cnt".
+	workerCnt := variable.GetDDLReorgWorkerCounter()
+	idxWorkers := make([]*addIndexWorker, 0, workerCnt)
+	defer func() {
+		closeAddIndexWorkers(idxWorkers)
+	}()
+
 	for {
+		// Dynamically adjust the add index worker number.
+		workerCnt = variable.GetDDLReorgWorkerCounter()
+		for i := len(idxWorkers); i < int(workerCnt); i++ {
+			sessCtx := newContext(reorgInfo.d.store)
+			idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap)
+			idxWorker.priority = job.Priority
+			idxWorkers = append(idxWorkers, idxWorker)
+			go idxWorkers[i].run(reorgInfo.d)
+		}
+
 		kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle)
 		if err != nil {
 			return errors.Trace(err)
 		}
 
 		log.Infof("[ddl-reorg] start to reorg index of %v region ranges, handle range:[%v, %v).", len(kvRanges), startHandle, endHandle)
-		remains, err := w.sendRangeTaskToWorkers(t, workers, reorgInfo, &totalAddedCount, kvRanges)
+		remains, err := w.sendRangeTaskToWorkers(t, idxWorkers, reorgInfo, &totalAddedCount, kvRanges)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -1097,23 +1120,7 @@ func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, workers []*addInd
 func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.IndexInfo, reorgInfo *reorgInfo) error {
 	job := reorgInfo.Job
 	log.Infof("[ddl-reorg] addTableIndex, job:%s, reorgInfo:%#v", job, reorgInfo)
-	sessCtx := newContext(reorgInfo.d.store)
-	decodeColMap, err := makeupDecodeColMap(sessCtx, t, indexInfo)
-	if err != nil {
-		return errors.Trace(err)
-	}
-
-	// variable.ddlReorgWorkerCounter can be modified by system variable "tidb_ddl_reorg_worker_cnt".
-	workerCnt := variable.GetDDLReorgWorkerCounter()
-	idxWorkers := make([]*addIndexWorker, workerCnt)
-	for i := 0; i < int(workerCnt); i++ {
-		sessCtx := newContext(reorgInfo.d.store)
-		idxWorkers[i] = newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap)
-		idxWorkers[i].priority = job.Priority
-		go idxWorkers[i].run(reorgInfo.d)
-	}
-	defer closeAddIndexWorkers(idxWorkers)
-	err = w.buildIndexForReorgInfo(t, idxWorkers, job, reorgInfo)
+	err := w.buildIndexForReorgInfo(t, indexInfo, job, reorgInfo)
 	return errors.Trace(err)
 }
 

From 00ad984f32020369330c115af5a9a4778a83359a Mon Sep 17 00:00:00 2001
From: crazycs
Date: Wed, 14 Nov 2018 15:06:23 +0800
Subject: [PATCH 02/16] shrink worker num

---
 ddl/index.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/ddl/index.go b/ddl/index.go
index 9d036369c096c..af36523ad5d25 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -1080,6 +1080,11 @@ func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, indexInfo *model.
 			idxWorkers = append(idxWorkers, idxWorker)
 			go idxWorkers[i].run(reorgInfo.d)
 		}
+		if len(idxWorkers) > int(workerCnt) {
+			workers := idxWorkers[workerCnt:]
+			idxWorkers = idxWorkers[:workerCnt]
+			closeAddIndexWorkers(workers)
+		}
 
 		kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle)
 		if err != nil {

From f936c59083d41579dce7dbf4a16927fa5d1d40f1 Mon Sep 17 00:00:00 2001
From: crazycs
Date: Wed, 14 Nov 2018 17:10:35 +0800
Subject: [PATCH 03/16] add index

---
 ddl/db_test.go | 31 +++++++++++++++++++++++++++++--
 1 file changed, 29 insertions(+), 2 deletions(-)

diff --git a/ddl/db_test.go b/ddl/db_test.go
index cd2d7f0fae410..e572dd3d64b78 100644
--- a/ddl/db_test.go
+++ b/ddl/db_test.go
@@ -588,16 +588,28 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri
 	s.mustExec(c, sql)
 	otherKeys = append(otherKeys, v)
 
+	is := s.dom.InfoSchema()
+	schemaName := model.NewCIStr(s.schemaName)
+	tableName := model.NewCIStr("test_add_index")
+	tbl, err := is.TableByName(schemaName, tableName)
+	c.Assert(err, IsNil)
+
+	// Split the table into multiple regions.
+	s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, 100)
+
 	sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)
 
 	deletedKeys := make(map[int]struct{})
 
 	ticker := time.NewTicker(s.lease / 2)
 	defer ticker.Stop()
+	ticker2 := time.NewTicker(10 * time.Millisecond)
+	defer ticker2.Stop()
+	startReorganization := false
+
 LOOP:
 	for {
 		select {
-		case err := <-done:
+		case err = <-done:
 			if err == nil {
 				break LOOP
 			}
@@ -620,6 +632,21 @@ LOOP:
 				s.mustExec(c, sql)
 			}
 			num += step
+		case <-ticker2.C:
+			if !startReorganization {
+				is = s.dom.InfoSchema()
+				tbl, err = is.TableByName(schemaName, tableName)
+				c.Assert(err, IsNil)
+				for _, idx := range tbl.Meta().Indices {
+					if idx.Name.L == "c3_index" && idx.State == model.StateWriteReorganization {
+						startReorganization = true
+					}
+				}
+			}
+			if startReorganization {
+				workerCnt := rand.Intn(8) + 8
+				s.mustExec(c, fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", workerCnt))
+			}
 		}
 	}
 
@@ -665,7 +692,7 @@ LOOP:
 	t := s.testGetTable(c, "test_add_index")
 	handles := make(map[int64]struct{})
 	startKey := t.RecordKey(math.MinInt64)
-	err := t.IterRecords(ctx, startKey, t.Cols(),
+	err = t.IterRecords(ctx, startKey, t.Cols(),
 		func(h int64, data []types.Datum, cols []*table.Column) (bool, error) {
 			handles[h] = struct{}{}
 			return true, nil

From c7fd5ee573f3ab4168ad75aac6be68c3446e5630 Mon Sep 17 00:00:00 2001
From: crazycs
Date: Wed, 14 Nov 2018 21:31:14 +0800
Subject: [PATCH 04/16] refine test

---
 ddl/db_test.go | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/ddl/db_test.go b/ddl/db_test.go
index e572dd3d64b78..1604c702ecb60 100644
--- a/ddl/db_test.go
+++ b/ddl/db_test.go
@@ -558,6 +558,7 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri
 	s.tk.MustExec("use " + s.schemaName)
 	s.tk.MustExec("drop table if exists test_add_index")
 	s.tk.MustExec(createTableSQL)
+	defer s.tk.MustExec("drop table test_add_index")
 
 	done := make(chan error, 1)
 	start := -10
@@ -731,8 +732,6 @@ LOOP:
 		delete(handles, h)
 	}
 	c.Assert(handles, HasLen, 0)
-
-	s.tk.MustExec("drop table test_add_index")
 }

From b1cbd7c92bbea07cdc95c4da3f70ac4d39e73aef Mon Sep 17 00:00:00 2001
From: crazycs
Date: Wed, 14 Nov 2018 23:19:15 +0800
Subject: [PATCH 05/16] refine test

---
 ddl/db_test.go | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/ddl/db_test.go b/ddl/db_test.go
index 1604c702ecb60..b349d42a21d03 100644
--- a/ddl/db_test.go
+++ b/ddl/db_test.go
@@ -558,7 +558,6 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri
 	s.tk.MustExec("use " + s.schemaName)
 	s.tk.MustExec("drop table if exists test_add_index")
 	s.tk.MustExec(createTableSQL)
-	defer s.tk.MustExec("drop table test_add_index")
 
 	done := make(chan error, 1)
 	start := -10
@@ -595,8 +594,11 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri
 	tbl, err := is.TableByName(schemaName, tableName)
 	c.Assert(err, IsNil)
 
-	// Split the table into multiple regions.
-	s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, 100)
+	if !testPartition {
+		// Split the table into multiple regions.
+		s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, 100)
+	}
+
 	sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)
 
 	deletedKeys := make(map[int]struct{})
@@ -732,6 +734,7 @@ LOOP:
 		delete(handles, h)
 	}
 	c.Assert(handles, HasLen, 0)
+	s.tk.MustExec("drop table test_add_index")
 }
 
 func (s *testDBSuite) TestDropIndex(c *C) {

From 4545ebc67fbbd46a259fec00350294390f5986bd Mon Sep 17 00:00:00 2001
From: crazycs
Date: Mon, 19 Nov 2018 17:00:34 +0800
Subject: [PATCH 06/16] shrink worker num if regions is less than worker num

---
 ddl/index.go | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/ddl/index.go b/ddl/index.go
index af36523ad5d25..a049ec5283d30 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -1071,8 +1071,17 @@ func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, indexInfo *model.
 	}()
 
 	for {
+		kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle)
+		if err != nil {
+			return errors.Trace(err)
+		}
+
 		// Dynamically adjust the add index worker number.
 		workerCnt = variable.GetDDLReorgWorkerCounter()
+		// If there are fewer ranges than the worker count, one worker per range is enough.
+		if len(kvRanges) < int(workerCnt) {
+			workerCnt = int32(len(kvRanges))
+		}
 		for i := len(idxWorkers); i < int(workerCnt); i++ {
 			sessCtx := newContext(reorgInfo.d.store)
 			idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap)
@@ -1086,11 +1095,6 @@ func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, indexInfo *model.
 			closeAddIndexWorkers(workers)
 		}
 
-		kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startHandle, endHandle)
-		if err != nil {
-			return errors.Trace(err)
-		}
-
 		log.Infof("[ddl-reorg] start to reorg index of %v region ranges, handle range:[%v, %v).", len(kvRanges), startHandle, endHandle)
 		remains, err := w.sendRangeTaskToWorkers(t, idxWorkers, reorgInfo, &totalAddedCount, kvRanges)
 		if err != nil {

From 3033df19834372699639fd94a2f9ba41a316656c Mon Sep 17 00:00:00 2001
From: crazycs
Date: Tue, 20 Nov 2018 19:27:53 +0800
Subject: [PATCH 07/16] add comment

---
 ddl/index.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/ddl/index.go b/ddl/index.go
index a049ec5283d30..0f5e45b892201 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -1082,6 +1082,7 @@ func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, indexInfo *model.
 		if len(kvRanges) < int(workerCnt) {
 			workerCnt = int32(len(kvRanges))
 		}
+		// Enlarge the worker size.
 		for i := len(idxWorkers); i < int(workerCnt); i++ {
 			sessCtx := newContext(reorgInfo.d.store)
 			idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap)
@@ -1089,6 +1090,7 @@ func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, indexInfo *model.
 			idxWorkers = append(idxWorkers, idxWorker)
 			go idxWorkers[i].run(reorgInfo.d)
 		}
+		// Shrink the worker size.
 		if len(idxWorkers) > int(workerCnt) {
 			workers := idxWorkers[workerCnt:]
 			idxWorkers = idxWorkers[:workerCnt]

From fb85c9b08687f2b7f772acd9a178a183f574adf8 Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Wed, 5 Dec 2018 19:48:03 +0800
Subject: [PATCH 08/16] add test to check that changing add index worker num
 takes effect
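
The test drives the new OnIndexWorkerReorgBefore hook. As a sketch of the
invariant it checks (the helper below is hypothetical, not part of this
patch): on every reorg iteration, the number of running add index workers
should equal the tidb_ddl_reorg_worker_cnt setting, capped by the number
of kv ranges in the current batch.

	// expectedWorkerNum mirrors the check performed in the test hook.
	// setNum is the current tidb_ddl_reorg_worker_cnt value, rangesNum is
	// the number of kv ranges split from the table in this batch.
	func expectedWorkerNum(setNum, rangesNum int) int {
		if rangesNum < setNum {
			// Fewer ranges than workers: one worker per range is enough.
			return rangesNum
		}
		return setNum
	}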
---
 ddl/callback.go      |  7 +++++++
 ddl/callback_test.go | 20 ++++++++++++++-----
 ddl/db_test.go       | 46 ++++++++++++++++++++++++++++++++++++++++----
 ddl/index.go         |  4 ++++
 4 files changed, 68 insertions(+), 9 deletions(-)

diff --git a/ddl/callback.go b/ddl/callback.go
index af2b4856e8c1d..e1461ebab085d 100644
--- a/ddl/callback.go
+++ b/ddl/callback.go
@@ -45,6 +45,8 @@ type Callback interface {
 	OnJobUpdated(job *model.Job)
 	// OnWatched is called after watching owner is completed.
 	OnWatched(ctx context.Context)
+	// OnIndexWorkerReorgBefore is called before the add index workers run reorganization.
+	OnIndexWorkerReorgBefore(workerNum, rangesNum int)
 }
 
 // BaseCallback implements Callback.OnChanged interface.
@@ -70,3 +72,8 @@ func (c *BaseCallback) OnJobUpdated(job *model.Job) {
 func (c *BaseCallback) OnWatched(ctx context.Context) {
 	// Nothing to do.
 }
+
+// OnIndexWorkerReorgBefore implements Callback.OnIndexWorkerReorgBefore interface.
+func (c *BaseCallback) OnIndexWorkerReorgBefore(workerNum, rangesNum int) {
+	// Nothing to do.
+}

diff --git a/ddl/callback_test.go b/ddl/callback_test.go
index 6066dfb535a86..ee7d5d21381db 100644
--- a/ddl/callback_test.go
+++ b/ddl/callback_test.go
@@ -40,11 +40,12 @@ func (ti *TestInterceptor) OnGetInfoSchema(ctx sessionctx.Context, is infoschema
 type TestDDLCallback struct {
 	*BaseCallback
 
-	onJobRunBefore         func(*model.Job)
-	OnJobRunBeforeExported func(*model.Job)
-	onJobUpdated           func(*model.Job)
-	OnJobUpdatedExported   func(*model.Job)
-	onWatched              func(ctx context.Context)
+	onJobRunBefore                   func(*model.Job)
+	OnJobRunBeforeExported           func(*model.Job)
+	onJobUpdated                     func(*model.Job)
+	OnJobUpdatedExported             func(*model.Job)
+	onWatched                        func(ctx context.Context)
+	OnIndexWorkerReorgBeforeExported func(workerNum, rangesNum int)
 }
 
 func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
@@ -84,6 +85,15 @@ func (tc *TestDDLCallback) OnWatched(ctx context.Context) {
 	tc.BaseCallback.OnWatched(ctx)
 }
 
+func (tc *TestDDLCallback) OnIndexWorkerReorgBefore(workerNum, rangesNum int) {
+	if tc.OnIndexWorkerReorgBeforeExported != nil {
+		tc.OnIndexWorkerReorgBeforeExported(workerNum, rangesNum)
+		return
+	}
+
+	tc.BaseCallback.OnIndexWorkerReorgBefore(workerNum, rangesNum)
+}
+
 func (s *testDDLSuite) TestCallback(c *C) {
 	cb := &BaseCallback{}
 	c.Assert(cb.OnChanged(nil), IsNil)

diff --git a/ddl/db_test.go b/ddl/db_test.go
index 9af6d1e43f299..67262188d7c09 100644
--- a/ddl/db_test.go
+++ b/ddl/db_test.go
@@ -39,6 +39,7 @@ import (
 	"github.com/pingcap/tidb/meta/autoid"
 	"github.com/pingcap/tidb/session"
 	"github.com/pingcap/tidb/sessionctx"
+	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/store/mockstore"
 	"github.com/pingcap/tidb/store/mockstore/mocktikv"
 	"github.com/pingcap/tidb/table"
@@ -657,11 +658,45 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri
 	tbl, err := is.TableByName(schemaName, tableName)
 	c.Assert(err, IsNil)
 
+	splitCount := 100
 	if !testPartition {
 		// Split the table into multiple regions.
-		s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, 100)
+		s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, splitCount)
 	}
 
+	// Set the hook to check the add index worker count.
+	hook := &ddl.TestDDLCallback{}
+	oringDDLAddIndexWorkerCnt := variable.GetDDLReorgWorkerCounter()
+	lastSetWorkerCnt := int(oringDDLAddIndexWorkerCnt)
+	defer variable.SetDDLReorgWorkerCounter(oringDDLAddIndexWorkerCnt)
+	// firstCheck is used to check that splitting the table range takes effect.
+	firstCheck := !testPartition
+	var checkErr error
+	changeWorkerNumEnable := false
+	hook.OnIndexWorkerReorgBeforeExported = func(workerNum, rangesNum int) {
+		if checkErr != nil {
+			return
+		}
+		// Check that splitting the table range succeeded.
+		if firstCheck {
+			firstCheck = false
+			if rangesNum != splitCount {
+				checkErr = errors.Errorf("first check rangeNum, expect: %v, but got: %v", 100, rangesNum)
+			}
+		}
+		setNum := int(variable.GetDDLReorgWorkerCounter())
+		if rangesNum < setNum {
+			if workerNum != rangesNum {
+				checkErr = errors.Errorf("rangeNum is %v, expect workerNum is: %v, but got: %v", rangesNum, rangesNum, workerNum)
+			}
+		} else if workerNum != setNum {
+			checkErr = errors.Errorf("rangeNum is %v, expect workerNum is: %v, but got: %v", rangesNum, setNum, workerNum)
+		}
+		changeWorkerNumEnable = true
+	}
+	s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
+	defer s.dom.DDL().(ddl.DDLForTest).SetHook(&ddl.TestDDLCallback{})
+
 	sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)
 
 	deletedKeys := make(map[int]struct{})
@@ -709,12 +744,15 @@ LOOP:
 					}
 				}
 			}
-			if startReorganization {
-				workerCnt := rand.Intn(8) + 8
-				s.mustExec(c, fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", workerCnt))
+			if startReorganization && changeWorkerNumEnable {
+				lastSetWorkerCnt = rand.Intn(8) + 8
+				s.mustExec(c, fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", lastSetWorkerCnt))
+				c.Assert(checkErr, IsNil)
+				changeWorkerNumEnable = false
 			}
 		}
 	}
+	c.Assert(checkErr, IsNil)
 
 	// get existing keys
 	keys := make([]int, 0, num)

diff --git a/ddl/index.go b/ddl/index.go
index 8d3cb6fe99f4e..fd1d08425690e 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -1081,6 +1081,10 @@ func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, indexInfo *model.
 			closeAddIndexWorkers(workers)
 		}
 
+		reorgInfo.d.mu.Lock()
+		reorgInfo.d.mu.hook.OnIndexWorkerReorgBefore(len(idxWorkers), len(kvRanges))
+		reorgInfo.d.mu.Unlock()
+
 		log.Infof("[ddl-reorg] start to reorg index of %v region ranges, handle range:[%v, %v).", len(kvRanges), startHandle, endHandle)
 		remains, err := w.sendRangeTaskToWorkers(t, idxWorkers, reorgInfo, &totalAddedCount, kvRanges)
 		if err != nil {
 			return errors.Trace(err)
 		}

From a2bdc3edf7d3631136440a2bfa47f10bf52135f3 Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Wed, 5 Dec 2018 19:50:58 +0800
Subject: [PATCH 09/16] refine test

---
 ddl/db_test.go | 13 +------------
 1 file changed, 1 insertion(+), 12 deletions(-)

diff --git a/ddl/db_test.go b/ddl/db_test.go
index 67262188d7c09..1a4186eabaec2 100644
--- a/ddl/db_test.go
+++ b/ddl/db_test.go
@@ -705,7 +705,6 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri
 	defer ticker.Stop()
 	ticker2 := time.NewTicker(10 * time.Millisecond)
 	defer ticker2.Stop()
-	startReorganization := false
 
 LOOP:
 	for {
@@ -734,17 +733,7 @@ LOOP:
 			}
 			num += step
 		case <-ticker2.C:
-			if !startReorganization {
-				is = s.dom.InfoSchema()
-				tbl, err = is.TableByName(schemaName, tableName)
-				c.Assert(err, IsNil)
-				for _, idx := range tbl.Meta().Indices {
-					if idx.Name.L == "c3_index" && idx.State == model.StateWriteReorganization {
-						startReorganization = true
-					}
-				}
-			}
-			if startReorganization && changeWorkerNumEnable {
+			if changeWorkerNumEnable {
 				lastSetWorkerCnt = rand.Intn(8) + 8
 				s.mustExec(c, fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", lastSetWorkerCnt))
 				c.Assert(checkErr, IsNil)

From 73cb88bbff28124787e8c83caea91c60e571fc53 Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Fri, 7 Dec 2018 21:03:17 +0800
Subject: [PATCH 10/16] add log

---
 ddl/index.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ddl/index.go b/ddl/index.go
index fd1d08425690e..996ec323494d1 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -1085,7 +1085,7 @@ func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, indexInfo *model.
 		reorgInfo.d.mu.hook.OnIndexWorkerReorgBefore(len(idxWorkers), len(kvRanges))
 		reorgInfo.d.mu.Unlock()
 
-		log.Infof("[ddl-reorg] start to reorg index of %v region ranges, handle range:[%v, %v).", len(kvRanges), startHandle, endHandle)
+		log.Infof("[ddl-reorg] start %d workers to reorg index of %v region ranges, handle range:[%v, %v).", len(idxWorkers), len(kvRanges), startHandle, endHandle)
 		remains, err := w.sendRangeTaskToWorkers(t, idxWorkers, reorgInfo, &totalAddedCount, kvRanges)
 		if err != nil {
 			return errors.Trace(err)

From c594dda487162110ef8b50f22e0a6db0b26482c4 Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Mon, 17 Dec 2018 20:31:03 +0800
Subject: [PATCH 11/16] address comment

---
 ddl/db_test.go |  3 ++-
 ddl/index.go   | 41 +++++++++++++++++------------------------
 2 files changed, 19 insertions(+), 25 deletions(-)

diff --git a/ddl/db_test.go b/ddl/db_test.go
index 1fecbd517186b..eeb5cdb5ba1c1 100644
--- a/ddl/db_test.go
+++ b/ddl/db_test.go
@@ -791,8 +791,9 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri
 		}
 		changeWorkerNumEnable = true
 	}
+	originHook := s.dom.DDL().(ddl.DDLForTest).GetHook()
+	defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook)
 	s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
-	defer s.dom.DDL().(ddl.DDLForTest).SetHook(&ddl.TestDDLCallback{})
 
 	sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)

diff --git a/ddl/index.go b/ddl/index.go
index a0fe68f855dd5..04a11cc7f4170 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -1046,9 +1046,23 @@ func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*addIndexWorker
 	return nil, nil
 }
 
-// buildIndexForReorgInfo build backfilling tasks from [reorgInfo.StartHandle, reorgInfo.EndHandle),
-// and send these tasks to add index workers, till we finish adding the indices.
-func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, indexInfo *model.IndexInfo, job *model.Job, reorgInfo *reorgInfo) error {
+// addPhysicalTableIndex handles the add index reorganization state for a non-partitioned table or a partition.
+// For a partitioned table, it should be handled partition by partition.
+//
+// How to add index in reorganization state?
+// Concurrently process the defaultTaskHandleCnt tasks. Each task deals with a handle range of the index record.
+// The handle range is split from PD regions now. Each worker deals with one region table key range at a time.
+// Each handle range is estimated; concurrent processing is performed only after the handle range has been acquired.
+// The operation flow is as follows:
+// 1. Open a number of defaultWorkers goroutines.
+// 2. Split table key range from PD regions.
+// 3. Send tasks to the running workers through the workers' task channels. Each task deals with a region key range.
+// 4. Wait until all these running tasks are finished, then continue to step 3, until all tasks are done.
+// The above operations are completed in a transaction.
+// Finally, update the concurrent processing of the total number of rows, and store the completed handle value.
+func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.IndexInfo, reorgInfo *reorgInfo) error {
+	job := reorgInfo.Job
+	log.Infof("[ddl-reorg] addTableIndex, job:%s, reorgInfo:%#v", job, reorgInfo)
 	totalAddedCount := job.GetRowCount()
 
 	startHandle, endHandle := reorgInfo.StartHandle, reorgInfo.EndHandle
@@ -1113,27 +1127,6 @@ func (w *worker) buildIndexForReorgInfo(t table.PhysicalTable, indexInfo *model.
 	return nil
 }
 
-// addPhysicalTableIndex handles the add index reorganization state for a non-partitioned table or a partition.
-// For a partitioned table, it should be handled partition by partition.
-//
-// How to add index in reorganization state?
-// Concurrently process the defaultTaskHandleCnt tasks. Each task deals with a handle range of the index record.
-// The handle range is split from PD regions now. Each worker deals with one region table key range at a time.
-// Each handle range is estimated; concurrent processing is performed only after the handle range has been acquired.
-// The operation flow is as follows:
-// 1. Open a number of defaultWorkers goroutines.
-// 2. Split table key range from PD regions.
-// 3. Send tasks to the running workers through the workers' task channels. Each task deals with a region key range.
-// 4. Wait until all these running tasks are finished, then continue to step 3, until all tasks are done.
-// The above operations are completed in a transaction.
-// Finally, update the concurrent processing of the total number of rows, and store the completed handle value.
-func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.IndexInfo, reorgInfo *reorgInfo) error {
-	job := reorgInfo.Job
-	log.Infof("[ddl-reorg] addTableIndex, job:%s, reorgInfo:%#v", job, reorgInfo)
-	err := w.buildIndexForReorgInfo(t, indexInfo, job, reorgInfo)
-	return errors.Trace(err)
-}
-
 // addTableIndex handles the add index reorganization state for a table.
 func (w *worker) addTableIndex(t table.Table, idx *model.IndexInfo, reorgInfo *reorgInfo) error {
 	var err error

From 216c58f3699eb766371232006202eab208fc40a8 Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Mon, 17 Dec 2018 20:45:27 +0800
Subject: [PATCH 12/16] fix test

---
 ddl/db_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ddl/db_test.go b/ddl/db_test.go
index 8fbdef4936b81..82de272ce6c69 100644
--- a/ddl/db_test.go
+++ b/ddl/db_test.go
@@ -640,7 +640,7 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri
 		}
 		changeWorkerNumEnable = true
 	}
-	originHook := s.dom.DDL().(ddl.DDLForTest).GetHook()
+	originHook := s.dom.DDL().GetHook()
 	defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook)
 	s.dom.DDL().(ddl.DDLForTest).SetHook(hook)

From 77f49bd38b666980c80c79e1e1418fe36ec149e1 Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Fri, 21 Dec 2018 11:05:55 +0800
Subject: [PATCH 13/16] use gofail test

---
 ddl/db_test.go | 190 +++++++++++++++++++++++++++++++++++++++++++------
 ddl/index.go   |   9 ++-
 2 files changed, 175 insertions(+), 24 deletions(-)

diff --git a/ddl/db_test.go b/ddl/db_test.go
index 1ce4131c20e12..7e70b386a5c99 100644
--- a/ddl/db_test.go
+++ b/ddl/db_test.go
@@ -27,6 +27,7 @@ import (
 	. "github.com/pingcap/check"
"github.com/pingcap/check" "github.com/pingcap/errors" + gofail "github.com/pingcap/gofail/runtime" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" tmysql "github.com/pingcap/parser/mysql" @@ -609,6 +610,162 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri s.mustExec(c, sql) otherKeys = append(otherKeys, v) + sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done) + + deletedKeys := make(map[int]struct{}) + + ticker := time.NewTicker(s.lease / 2) + defer ticker.Stop() +LOOP: + for { + select { + case err := <-done: + if err == nil { + break LOOP + } + c.Assert(err, IsNil, Commentf("err:%v", errors.ErrorStack(err))) + case <-ticker.C: + // When the server performance is particularly poor, + // the adding index operation can not be completed. + // So here is a limit to the number of rows inserted. + if num > defaultBatchSize*10 { + break + } + step := 10 + // delete some rows, and add some data + for i := num; i < num+step; i++ { + n := rand.Intn(num) + deletedKeys[n] = struct{}{} + sql := fmt.Sprintf("delete from test_add_index where c1 = %d", n) + s.mustExec(c, sql) + sql = fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i) + s.mustExec(c, sql) + } + num += step + } + } + + // get exists keys + keys := make([]int, 0, num) + for i := start; i < num; i++ { + if _, ok := deletedKeys[i]; ok { + continue + } + keys = append(keys, i) + } + keys = append(keys, otherKeys...) + + // test index key + expectedRows := make([][]interface{}, 0, len(keys)) + for _, key := range keys { + expectedRows = append(expectedRows, []interface{}{key}) + } + rows := s.mustQuery(c, fmt.Sprintf("select c1 from test_add_index where c3 >= %d order by c1", start)) + matchRows(c, rows, expectedRows) + + if testPartition { + s.tk.MustExec("admin check table test_add_index") + return + } + + // test index range + for i := 0; i < 100; i++ { + index := rand.Intn(len(keys) - 3) + rows := s.mustQuery(c, "select c1 from test_add_index where c3 >= ? limit 3", keys[index]) + matchRows(c, rows, [][]interface{}{{keys[index]}, {keys[index+1]}, {keys[index+2]}}) + } + + // TODO: Support explain in future. + // rows := s.mustQuery(c, "explain select c1 from test_add_index where c3 >= 100") + + // ay := dumpRows(c, rows) + // c.Assert(strings.Contains(fmt.Sprintf("%v", ay), "c3_index"), IsTrue) + + // get all row handles + ctx := s.s.(sessionctx.Context) + c.Assert(ctx.NewTxn(context.Background()), IsNil) + t := s.testGetTable(c, "test_add_index") + handles := make(map[int64]struct{}) + startKey := t.RecordKey(math.MinInt64) + err := t.IterRecords(ctx, startKey, t.Cols(), + func(h int64, data []types.Datum, cols []*table.Column) (bool, error) { + handles[h] = struct{}{} + return true, nil + }) + c.Assert(err, IsNil) + + // check in index + var nidx table.Index + for _, tidx := range t.Indices() { + if tidx.Meta().Name.L == "c3_index" { + nidx = tidx + break + } + } + // Make sure there is index with name c3_index. 
+	c.Assert(nidx, NotNil)
+	c.Assert(nidx.Meta().ID, Greater, int64(0))
+	txn, err := ctx.Txn(true)
+	c.Assert(err, IsNil)
+	txn.Rollback()
+
+	c.Assert(ctx.NewTxn(context.Background()), IsNil)
+	defer txn.Rollback()
+
+	it, err := nidx.SeekFirst(txn)
+	c.Assert(err, IsNil)
+	defer it.Close()
+
+	for {
+		_, h, err := it.Next()
+		if terror.ErrorEqual(err, io.EOF) {
+			break
+		}
+
+		c.Assert(err, IsNil)
+		_, ok := handles[h]
+		c.Assert(ok, IsTrue)
+		delete(handles, h)
+	}
+	c.Assert(handles, HasLen, 0)
+	s.tk.MustExec("drop table test_add_index")
+}
+
+func (s *testDBSuite) TestAddIndexWorkerNum(c *C) {
+	s.tk = testkit.NewTestKit(c, s.store)
+	s.tk.MustExec("use " + s.schemaName)
+	s.tk.MustExec("drop table if exists test_add_index")
+	s.tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))")
 
 	done := make(chan error, 1)
 	start := -10
 	num := defaultBatchSize
 	// first add some rows
 	for i := start; i < num; i++ {
 		sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i)
 		s.mustExec(c, sql)
 	}
 	// Add some discrete rows.
 	maxBatch := 20
 	batchCnt := 100
 	otherKeys := make([]int, 0, batchCnt*maxBatch)
 	// Make sure there are no duplicate keys.
 	base := defaultBatchSize * 20
 	for i := 1; i < batchCnt; i++ {
 		n := base + i*defaultBatchSize + i
 		for j := 0; j < rand.Intn(maxBatch); j++ {
 			n += j
 			sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", n, n, n)
 			s.mustExec(c, sql)
 			otherKeys = append(otherKeys, n)
 		}
 	}
 	// Encounter the value of math.MaxInt64 in middle of
 	v := math.MaxInt64 - defaultBatchSize/2
 	sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", v, v, v)
 	s.mustExec(c, sql)
 	otherKeys = append(otherKeys, v)
 
 	is := s.dom.InfoSchema()
 	schemaName := model.NewCIStr(s.schemaName)
 	tableName := model.NewCIStr("test_add_index")
 	tbl, err := is.TableByName(schemaName, tableName)
 	c.Assert(err, IsNil)
 
 	splitCount := 100
-	if !testPartition {
-		// Split the table into multiple regions.
-		s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, splitCount)
-	}
+	// Split the table into multiple regions.
+	s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, splitCount)
 
 	// Set the hook to check the add index worker count.
+	gofail.Enable("github.com/pingcap/tidb/ddl/checkIndexWorker", `return(true)`)
+	defer gofail.Disable("github.com/pingcap/tidb/ddl/checkIndexWorker")
+
 	hook := &ddl.TestDDLCallback{}
 	oringDDLAddIndexWorkerCnt := variable.GetDDLReorgWorkerCounter()
 	lastSetWorkerCnt := int(oringDDLAddIndexWorkerCnt)
 	defer variable.SetDDLReorgWorkerCounter(oringDDLAddIndexWorkerCnt)
 	// firstCheck is used to check that splitting the table range takes effect.
- firstCheck := !testPartition + firstCheck := true var checkErr error - changeWorkerNumEnable := false + var changeWorkerNumEnable = int32(0) + hook.OnIndexWorkerReorgBeforeExported = func(workerNum, rangesNum int) { if checkErr != nil { return @@ -649,7 +808,7 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri } else if workerNum != setNum { checkErr = errors.Errorf("rangeNum is %v, expect workerNum is: %v, but got: %v", rangesNum, setNum, workerNum) } - changeWorkerNumEnable = true + atomic.StoreInt32(&changeWorkerNumEnable, 1) } originHook := s.dom.DDL().GetHook() defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook) @@ -691,15 +850,16 @@ LOOP: } num += step case <-ticker2.C: - if changeWorkerNumEnable { + if atomic.LoadInt32(&changeWorkerNumEnable) == 1 { lastSetWorkerCnt = rand.Intn(8) + 8 s.mustExec(c, fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", lastSetWorkerCnt)) c.Assert(checkErr, IsNil) - changeWorkerNumEnable = false + atomic.StoreInt32(&changeWorkerNumEnable, 0) } } } c.Assert(checkErr, IsNil) + c.Assert(firstCheck, IsFalse) // get exists keys keys := make([]int, 0, num) @@ -719,11 +879,6 @@ LOOP: rows := s.mustQuery(c, fmt.Sprintf("select c1 from test_add_index where c3 >= %d order by c1", start)) matchRows(c, rows, expectedRows) - if testPartition { - s.tk.MustExec("admin check table test_add_index") - return - } - // test index range for i := 0; i < 100; i++ { index := rand.Intn(len(keys) - 3) @@ -731,13 +886,6 @@ LOOP: matchRows(c, rows, [][]interface{}{{keys[index]}, {keys[index+1]}, {keys[index+2]}}) } - // TODO: Support explain in future. - // rows := s.mustQuery(c, "explain select c1 from test_add_index where c3 >= 100") - - // ay := dumpRows(c, rows) - // c.Assert(strings.Contains(fmt.Sprintf("%v", ay), "c3_index"), IsTrue) - - // get all row handles ctx := s.s.(sessionctx.Context) c.Assert(ctx.NewTxn(context.Background()), IsNil) t := s.testGetTable(c, "test_add_index") diff --git a/ddl/index.go b/ddl/index.go index 544c47d5d7d12..271f3d3aab54e 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1106,9 +1106,12 @@ func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.I closeAddIndexWorkers(workers) } - reorgInfo.d.mu.Lock() - reorgInfo.d.mu.hook.OnIndexWorkerReorgBefore(len(idxWorkers), len(kvRanges)) - reorgInfo.d.mu.Unlock() + // gofail: var checkIndexWorker bool + //if checkIndexWorker { + // reorgInfo.d.mu.Lock() + // reorgInfo.d.mu.hook.OnIndexWorkerReorgBefore(len(idxWorkers), len(kvRanges)) + // reorgInfo.d.mu.Unlock() + //} log.Infof("[ddl-reorg] start %d workers to reorg index of %v region ranges, handle range:[%v, %v).", len(idxWorkers), len(kvRanges), startHandle, endHandle) remains, err := w.sendRangeTaskToWorkers(t, idxWorkers, reorgInfo, &totalAddedCount, kvRanges) From d60f0650c63dec2dca96a659b3e10132e8c0d85f Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Fri, 21 Dec 2018 19:25:02 +0800 Subject: [PATCH 14/16] refine gofail test and remove hook --- ddl/callback.go | 7 -- ddl/callback_test.go | 20 +--- ddl/db_test.go | 206 ----------------------------------- ddl/failtest/fail_db_test.go | 116 ++++++++++++++++++++ ddl/index.go | 27 ++++- 5 files changed, 142 insertions(+), 234 deletions(-) diff --git a/ddl/callback.go b/ddl/callback.go index e1461ebab085d..af2b4856e8c1d 100644 --- a/ddl/callback.go +++ b/ddl/callback.go @@ -45,8 +45,6 @@ type Callback interface { OnJobUpdated(job *model.Job) // OnWatched is called after watching owner is completed. 
 	OnWatched(ctx context.Context)
-	// OnIndexWorkerReorgBefore is called before the add index workers run reorganization.
-	OnIndexWorkerReorgBefore(workerNum, rangesNum int)
 }
 
 // BaseCallback implements Callback.OnChanged interface.
@@ -72,8 +70,3 @@ func (c *BaseCallback) OnJobUpdated(job *model.Job) {
 func (c *BaseCallback) OnWatched(ctx context.Context) {
 	// Nothing to do.
 }
-
-// OnIndexWorkerReorgBefore implements Callback.OnIndexWorkerReorgBefore interface.
-func (c *BaseCallback) OnIndexWorkerReorgBefore(workerNum, rangesNum int) {
-	// Nothing to do.
-}

diff --git a/ddl/callback_test.go b/ddl/callback_test.go
index ee7d5d21381db..6066dfb535a86 100644
--- a/ddl/callback_test.go
+++ b/ddl/callback_test.go
@@ -40,12 +40,11 @@ func (ti *TestInterceptor) OnGetInfoSchema(ctx sessionctx.Context, is infoschema
 type TestDDLCallback struct {
 	*BaseCallback
 
-	onJobRunBefore                   func(*model.Job)
-	OnJobRunBeforeExported           func(*model.Job)
-	onJobUpdated                     func(*model.Job)
-	OnJobUpdatedExported             func(*model.Job)
-	onWatched                        func(ctx context.Context)
-	OnIndexWorkerReorgBeforeExported func(workerNum, rangesNum int)
+	onJobRunBefore         func(*model.Job)
+	OnJobRunBeforeExported func(*model.Job)
+	onJobUpdated           func(*model.Job)
+	OnJobUpdatedExported   func(*model.Job)
+	onWatched              func(ctx context.Context)
 }
 
 func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
@@ -85,15 +84,6 @@ func (tc *TestDDLCallback) OnWatched(ctx context.Context) {
 	tc.BaseCallback.OnWatched(ctx)
 }
 
-func (tc *TestDDLCallback) OnIndexWorkerReorgBefore(workerNum, rangesNum int) {
-	if tc.OnIndexWorkerReorgBeforeExported != nil {
-		tc.OnIndexWorkerReorgBeforeExported(workerNum, rangesNum)
-		return
-	}
-
-	tc.BaseCallback.OnIndexWorkerReorgBefore(workerNum, rangesNum)
-}
-
 func (s *testDDLSuite) TestCallback(c *C) {
 	cb := &BaseCallback{}
 	c.Assert(cb.OnChanged(nil), IsNil)

diff --git a/ddl/db_test.go b/ddl/db_test.go
index 7e70b386a5c99..261651b7dff49 100644
--- a/ddl/db_test.go
+++ b/ddl/db_test.go
@@ -27,7 +27,6 @@ import (
 	. "github.com/pingcap/check"
 	"github.com/pingcap/errors"
-	gofail "github.com/pingcap/gofail/runtime"
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/parser/mysql"
 	tmysql "github.com/pingcap/parser/mysql"
@@ -39,7 +38,6 @@ import (
 	"github.com/pingcap/tidb/meta/autoid"
 	"github.com/pingcap/tidb/session"
 	"github.com/pingcap/tidb/sessionctx"
-	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/store/mockstore"
 	"github.com/pingcap/tidb/store/mockstore/mocktikv"
 	"github.com/pingcap/tidb/table"
@@ -731,210 +729,6 @@ LOOP:
 	s.tk.MustExec("drop table test_add_index")
 }
 
-func (s *testDBSuite) TestAddIndexWorkerNum(c *C) {
-	s.tk = testkit.NewTestKit(c, s.store)
-	s.tk.MustExec("use " + s.schemaName)
-	s.tk.MustExec("drop table if exists test_add_index")
-	s.tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))")
-
-	done := make(chan error, 1)
-	start := -10
-	num := defaultBatchSize
-	// first add some rows
-	for i := start; i < num; i++ {
-		sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i)
-		s.mustExec(c, sql)
-	}
-	// Add some discrete rows.
-	maxBatch := 20
-	batchCnt := 100
-	otherKeys := make([]int, 0, batchCnt*maxBatch)
-	// Make sure there are no duplicate keys.
-	base := defaultBatchSize * 20
-	for i := 1; i < batchCnt; i++ {
-		n := base + i*defaultBatchSize + i
-		for j := 0; j < rand.Intn(maxBatch); j++ {
-			n += j
-			sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", n, n, n)
-			s.mustExec(c, sql)
-			otherKeys = append(otherKeys, n)
-		}
-	}
-	// Encounter the value of math.MaxInt64 in middle of
-	v := math.MaxInt64 - defaultBatchSize/2
-	sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", v, v, v)
-	s.mustExec(c, sql)
-	otherKeys = append(otherKeys, v)
-
-	is := s.dom.InfoSchema()
-	schemaName := model.NewCIStr(s.schemaName)
-	tableName := model.NewCIStr("test_add_index")
-	tbl, err := is.TableByName(schemaName, tableName)
-	c.Assert(err, IsNil)
-
-	splitCount := 100
-	// Split the table into multiple regions.
-	s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, splitCount)
-
-	// Set the hook to check the add index worker count.
-	gofail.Enable("github.com/pingcap/tidb/ddl/checkIndexWorker", `return(true)`)
-	defer gofail.Disable("github.com/pingcap/tidb/ddl/checkIndexWorker")
-
-	hook := &ddl.TestDDLCallback{}
-	oringDDLAddIndexWorkerCnt := variable.GetDDLReorgWorkerCounter()
-	lastSetWorkerCnt := int(oringDDLAddIndexWorkerCnt)
-	defer variable.SetDDLReorgWorkerCounter(oringDDLAddIndexWorkerCnt)
-	// firstCheck is used to check that splitting the table range takes effect.
-	firstCheck := true
-	var checkErr error
-	var changeWorkerNumEnable = int32(0)
-
-	hook.OnIndexWorkerReorgBeforeExported = func(workerNum, rangesNum int) {
-		if checkErr != nil {
-			return
-		}
-		// Check that splitting the table range succeeded.
-		if firstCheck {
-			firstCheck = false
-			if rangesNum != splitCount {
-				checkErr = errors.Errorf("first check rangeNum, expect: %v, but got: %v", 100, rangesNum)
-			}
-		}
-		setNum := int(variable.GetDDLReorgWorkerCounter())
-		if rangesNum < setNum {
-			if workerNum != rangesNum {
-				checkErr = errors.Errorf("rangeNum is %v, expect workerNum is: %v, but got: %v", rangesNum, rangesNum, workerNum)
-			}
-		} else if workerNum != setNum {
-			checkErr = errors.Errorf("rangeNum is %v, expect workerNum is: %v, but got: %v", rangesNum, setNum, workerNum)
-		}
-		atomic.StoreInt32(&changeWorkerNumEnable, 1)
-	}
-	originHook := s.dom.DDL().GetHook()
-	defer s.dom.DDL().(ddl.DDLForTest).SetHook(originHook)
-	s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
-
-	sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)
-
-	deletedKeys := make(map[int]struct{})
-
-	ticker := time.NewTicker(s.lease / 2)
-	defer ticker.Stop()
-	ticker2 := time.NewTicker(10 * time.Millisecond)
-	defer ticker2.Stop()
-
-LOOP:
-	for {
-		select {
-		case err = <-done:
-			if err == nil {
-				break LOOP
-			}
-			c.Assert(err, IsNil, Commentf("err:%v", errors.ErrorStack(err)))
-		case <-ticker.C:
-			// When the server performance is particularly poor,
-			// the adding index operation can not be completed.
-			// So here is a limit to the number of rows inserted.
-			if num > defaultBatchSize*10 {
-				break
-			}
-			step := 10
-			// delete some rows, and add some data
-			for i := num; i < num+step; i++ {
-				n := rand.Intn(num)
-				deletedKeys[n] = struct{}{}
-				sql := fmt.Sprintf("delete from test_add_index where c1 = %d", n)
-				s.mustExec(c, sql)
-				sql = fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i)
-				s.mustExec(c, sql)
-			}
-			num += step
-		case <-ticker2.C:
-			if atomic.LoadInt32(&changeWorkerNumEnable) == 1 {
-				lastSetWorkerCnt = rand.Intn(8) + 8
-				s.mustExec(c, fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", lastSetWorkerCnt))
-				c.Assert(checkErr, IsNil)
-				atomic.StoreInt32(&changeWorkerNumEnable, 0)
-			}
-		}
-	}
-	c.Assert(checkErr, IsNil)
-	c.Assert(firstCheck, IsFalse)
-
-	// get existing keys
-	keys := make([]int, 0, num)
-	for i := start; i < num; i++ {
-		if _, ok := deletedKeys[i]; ok {
-			continue
-		}
-		keys = append(keys, i)
-	}
-	keys = append(keys, otherKeys...)
-
-	// test index key
-	expectedRows := make([][]interface{}, 0, len(keys))
-	for _, key := range keys {
-		expectedRows = append(expectedRows, []interface{}{key})
-	}
-	rows := s.mustQuery(c, fmt.Sprintf("select c1 from test_add_index where c3 >= %d order by c1", start))
-	matchRows(c, rows, expectedRows)
-
-	// test index range
-	for i := 0; i < 100; i++ {
-		index := rand.Intn(len(keys) - 3)
-		rows := s.mustQuery(c, "select c1 from test_add_index where c3 >= ? limit 3", keys[index])
-		matchRows(c, rows, [][]interface{}{{keys[index]}, {keys[index+1]}, {keys[index+2]}})
-	}
-
-	ctx := s.s.(sessionctx.Context)
-	c.Assert(ctx.NewTxn(context.Background()), IsNil)
-	t := s.testGetTable(c, "test_add_index")
-	handles := make(map[int64]struct{})
-	startKey := t.RecordKey(math.MinInt64)
-	err = t.IterRecords(ctx, startKey, t.Cols(),
-		func(h int64, data []types.Datum, cols []*table.Column) (bool, error) {
-			handles[h] = struct{}{}
-			return true, nil
-		})
-	c.Assert(err, IsNil)
-
-	// check in index
-	var nidx table.Index
-	for _, tidx := range t.Indices() {
-		if tidx.Meta().Name.L == "c3_index" {
-			nidx = tidx
-			break
-		}
-	}
-	// Make sure there is an index with name c3_index.
-	c.Assert(nidx, NotNil)
-	c.Assert(nidx.Meta().ID, Greater, int64(0))
-	txn, err := ctx.Txn(true)
-	c.Assert(err, IsNil)
-	txn.Rollback()
-
-	c.Assert(ctx.NewTxn(context.Background()), IsNil)
-	defer txn.Rollback()
-
-	it, err := nidx.SeekFirst(txn)
-	c.Assert(err, IsNil)
-	defer it.Close()
-
-	for {
-		_, h, err := it.Next()
-		if terror.ErrorEqual(err, io.EOF) {
-			break
-		}
-
-		c.Assert(err, IsNil)
-		_, ok := handles[h]
-		c.Assert(ok, IsTrue)
-		delete(handles, h)
-	}
-	c.Assert(handles, HasLen, 0)
-	s.tk.MustExec("drop table test_add_index")
-}
-
 func (s *testDBSuite) TestDropIndex(c *C) {
 	s.tk = testkit.NewTestKit(c, s.store)
 	s.tk.MustExec("use " + s.schemaName)

diff --git a/ddl/failtest/fail_db_test.go b/ddl/failtest/fail_db_test.go
index bcb2d21d914e6..ff28212948261 100644
--- a/ddl/failtest/fail_db_test.go
+++ b/ddl/failtest/fail_db_test.go
@@ -16,8 +16,12 @@ package ddl_test
 import (
 	"context"
 	"fmt"
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/sessionctx/variable"
+	"math"
 	"math/rand"
 	"os"
+	"sync/atomic"
 	"testing"
 	"time"
@@ -311,3 +315,115 @@ func (s *testFailDBSuite) TestGenGlobalIDFail(c *C) {
 	tk.MustExec("admin check table t1")
 	tk.MustExec("admin check table t2")
 }
+
+func (s *testFailDBSuite) TestAddIndexWorkerNum(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	tk.MustExec("create database if not exists test_db")
+	tk.MustExec("use test_db")
+	tk.MustExec("drop table if exists test_add_index")
+	tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))")
+
+	done := make(chan error, 1)
+	start := -10
+	defaultBatchSize := 2048
+	num := defaultBatchSize
+	// first add some rows
+	for i := start; i < num; i++ {
+		sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i)
+		tk.MustExec(sql)
+	}
+	// Add some discrete rows.
+	maxBatch := 20
+	batchCnt := 100
+	otherKeys := make([]int, 0, batchCnt*maxBatch)
+	// Make sure there are no duplicate keys.
+	base := defaultBatchSize * 20
+	for i := 1; i < batchCnt; i++ {
+		n := base + i*defaultBatchSize + i
+		for j := 0; j < rand.Intn(maxBatch); j++ {
+			n += j
+			sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", n, n, n)
+			tk.MustExec(sql)
+			otherKeys = append(otherKeys, n)
+		}
+	}
+	// Encounter the value of math.MaxInt64 in middle of
+	v := math.MaxInt64 - defaultBatchSize/2
+	sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", v, v, v)
+	tk.MustExec(sql)
+	otherKeys = append(otherKeys, v)
+
+	is := s.dom.InfoSchema()
+	schemaName := model.NewCIStr("test_db")
+	tableName := model.NewCIStr("test_add_index")
+	tbl, err := is.TableByName(schemaName, tableName)
+	c.Assert(err, IsNil)
+
+	splitCount := 100
+	// Split the table into multiple regions.
+	s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, splitCount)
+
+	originDDLAddIndexWorkerCnt := variable.GetDDLReorgWorkerCounter()
+	lastSetWorkerCnt := originDDLAddIndexWorkerCnt
+	atomic.StoreInt32(&ddl.TestCheckWorkerNumber, lastSetWorkerCnt)
+	ddl.TestCheckWorkerNumber = lastSetWorkerCnt
+	defer variable.SetDDLReorgWorkerCounter(originDDLAddIndexWorkerCnt)
+
+	gofail.Enable("github.com/pingcap/tidb/ddl/checkIndexWorkerNum", `return(true)`)
+	defer gofail.Disable("github.com/pingcap/tidb/ddl/checkIndexWorkerNum")
+
+	sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)
+
+	checkNum := 0
+
+LOOP:
+	for {
+		select {
+		case err = <-done:
+			if err == nil {
+				break LOOP
+			}
+			c.Assert(err, IsNil, Commentf("err:%v", errors.ErrorStack(err)))
+		case <-ddl.TestCheckWorkerNumCh:
+			lastSetWorkerCnt = int32(rand.Intn(8) + 8)
+			tk.MustExec(fmt.Sprintf("set @@tidb_ddl_reorg_worker_cnt=%d", lastSetWorkerCnt))
+			atomic.StoreInt32(&ddl.TestCheckWorkerNumber, lastSetWorkerCnt)
+			checkNum++
+		}
+	}
+	c.Assert(checkNum, Greater, 5)
+	tk.MustExec("admin check table test_add_index")
+	tk.MustExec("drop table test_add_index")
+}
+
+func sessionExecInGoroutine(c *C, s kv.Storage, sql string, done chan error) {
+	execMultiSQLInGoroutine(c, s, "test_db", []string{sql}, done)
+}
+
+func execMultiSQLInGoroutine(c *C, s kv.Storage, dbName string, multiSQL []string, done chan error) {
+	go func() {
+		se, err := session.CreateSession4Test(s)
+		if err != nil {
+			done <- errors.Trace(err)
+			return
+		}
+		defer se.Close()
+		_, err = se.Execute(context.Background(), "use "+dbName)
+		if err != nil {
+			done <- errors.Trace(err)
+			return
+		}
+		for _, sql := range multiSQL {
+			rs, err := se.Execute(context.Background(), sql)
+			if err != nil {
+				done <- errors.Trace(err)
+				return
+			}
+			if rs != nil {
+				done <- errors.Errorf("RecordSet should be empty.")
+				return
+			}
+			done <- nil
+		}
+	}()
+}

diff --git a/ddl/index.go b/ddl/index.go
index 271f3d3aab54e..d87c7107c09ca 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -823,7 +823,7 @@ func (w *addIndexWorker) handleBackfillTask(d *ddlCtx, task *reorgIndexTask) *ad
 	if task.endIncluded {
 		rightParenthesis = "]"
 	}
-	log.Infof("[ddl-reorg] worker(%v), finish region %v ranges [%v,%v%s, addedCount:%v, scanCount:%v, nextHandle:%v, elapsed time(s):%v",
+	log.Infof("[ddl-reorg] worker(%v), finish table %v ranges [%v,%v%s, addedCount:%v, scanCount:%v, nextHandle:%v, elapsed time(s):%v",
 		w.id, task.physicalTableID, task.startHandle, task.endHandle, rightParenthesis, result.addedCount, result.scanCount, result.nextHandle, time.Since(startTime).Seconds())
 
 	return result
@@ -1046,6 +1046,13 @@ func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*addIndexWorker
 	return nil, nil
 }
 
+var (
+	// TestCheckWorkerNumCh is used in tests to adjust the add index worker count.
+	TestCheckWorkerNumCh = make(chan struct{}, 0)
+	// TestCheckWorkerNumber is used in tests to adjust the add index worker count.
+	TestCheckWorkerNumber = int32(16)
+)
+
 // addPhysicalTableIndex handles the add index reorganization state for a non-partitioned table or a partition.
 // For a partitioned table, it should be handled partition by partition.
 //
@@ -1106,11 +1113,19 @@ func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.I
 			closeAddIndexWorkers(workers)
 		}
 
-		// gofail: var checkIndexWorker bool
-		//if checkIndexWorker {
-		//	reorgInfo.d.mu.Lock()
-		//	reorgInfo.d.mu.hook.OnIndexWorkerReorgBefore(len(idxWorkers), len(kvRanges))
-		//	reorgInfo.d.mu.Unlock()
-		//}
+		// gofail: var checkIndexWorkerNum bool
+		//if checkIndexWorkerNum {
+		//	num := int(atomic.LoadInt32(&TestCheckWorkerNumber))
+		//	if num != 0 {
+		//		if num > len(kvRanges) {
+		//			if len(idxWorkers) != len(kvRanges) {
+		//				return errors.Errorf("check index worker num error, len kv ranges is: %v, check index worker num is: %v, actual index num is: %v", len(kvRanges), num, len(idxWorkers))
+		//			}
+		//		} else if num != len(idxWorkers) {
+		//			return errors.Errorf("check index worker num error, len kv ranges is: %v, check index worker num is: %v, actual index num is: %v", len(kvRanges), num, len(idxWorkers))
+		//		}
+		//		TestCheckWorkerNumCh <- struct{}{}
+		//	}
+		//}
 
 		log.Infof("[ddl-reorg] start %d workers to reorg index of %v region ranges, handle range:[%v, %v).", len(idxWorkers), len(kvRanges), startHandle, endHandle)
 		remains, err := w.sendRangeTaskToWorkers(t, idxWorkers, reorgInfo, &totalAddedCount, kvRanges)

From 50470ad935b4ac49fe1f791aeb600d9759b0424f Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Fri, 21 Dec 2018 19:47:53 +0800
Subject: [PATCH 15/16] refine test

---
 ddl/db_partition_test.go     |  3 +-
 ddl/db_test.go               | 42 +++--------------
 ddl/failtest/fail_db_test.go | 64 +++---------------------
 util/testutil/testutil.go    | 37 +++++++++++++++
 4 files changed, 49 insertions(+), 97 deletions(-)

diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go
index 78629e4dbc073..606aa6fa67063 100644
--- a/ddl/db_partition_test.go
+++ b/ddl/db_partition_test.go
@@ -38,6 +38,7 @@ import (
 	"github.com/pingcap/tidb/util/admin"
 	"github.com/pingcap/tidb/util/mock"
 	"github.com/pingcap/tidb/util/testkit"
+	"github.com/pingcap/tidb/util/testutil"
 )
 
 func (s *testIntegrationSuite) TestCreateTableWithPartition(c *C) {
@@ -952,7 +953,7 @@ func (s *testIntegrationSuite) TestPartitionDropIndex(c *C) {
 	}
 	c.Assert(idx1, NotNil)
 
-	sessionExecInGoroutine(c, s.store, "drop index idx1 on partition_drop_idx;", done)
+	testutil.SessionExecInGoroutine(c, s.store, "drop index idx1 on partition_drop_idx;", done)
 	ticker := time.NewTicker(s.lease / 2)
 	defer ticker.Stop()
 LOOP:

diff --git a/ddl/db_test.go b/ddl/db_test.go
index 261651b7dff49..6a63a181ff67e 100644
--- a/ddl/db_test.go
+++ b/ddl/db_test.go
@@ -608,7 +608,7 @@ func (s *testDBSuite) testAddIndex(c *C, testPartition bool, createTableSQL stri
 	s.mustExec(c, sql)
 	otherKeys = append(otherKeys, v)
 
-	sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)
+	testutil.SessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)
 
 	deletedKeys := make(map[int]struct{})
 
@@ -753,7 +753,7 @@ func (s *testDBSuite) TestDropIndex(c *C) {
 	}
 	c.Assert(c3idx, NotNil)
 
-	sessionExecInGoroutine(c, s.store, "drop index c3_index on test_drop_index", done)
+	testutil.SessionExecInGoroutine(c, s.store, "drop index c3_index on test_drop_index", done)
 	ticker := time.NewTicker(s.lease / 2)
 	defer ticker.Stop()
@@ -915,38 +915,6 @@ func sessionExec(c *C, s kv.Storage, sql string) {
 	se.Close()
 }
 
-func sessionExecInGoroutine(c *C, s kv.Storage, sql string, done chan error) {
-	execMultiSQLInGoroutine(c, s, "test_db", []string{sql}, done)
-}
-
-func execMultiSQLInGoroutine(c *C, s kv.Storage, dbName string, multiSQL []string, done chan error) {
-	go func() {
-		se, err := session.CreateSession4Test(s)
-		if err != nil {
-			done <- errors.Trace(err)
-			return
-		}
-		defer se.Close()
-		_, err = se.Execute(context.Background(), "use "+dbName)
-		if err != nil {
-			done <- errors.Trace(err)
-			return
-		}
-		for _, sql := range multiSQL {
-			rs, err := se.Execute(context.Background(), sql)
-			if err != nil {
-				done <- errors.Trace(err)
-				return
-			}
-			if rs != nil {
-				done <- errors.Errorf("RecordSet should be empty.")
-				return
-			}
-			done <- nil
-		}
-	}()
-}
-
 func (s *testDBSuite) testAddColumn(c *C) {
 	done := make(chan error, 1)
@@ -956,7 +924,7 @@ func (s *testDBSuite) testAddColumn(c *C) {
 		s.mustExec(c, "insert into t2 values (?, ?, ?)", i, i, i)
 	}
 
-	sessionExecInGoroutine(c, s.store, "alter table t2 add column c4 int default -1", done)
+	testutil.SessionExecInGoroutine(c, s.store, "alter table t2 add column c4 int default -1", done)
 
 	ticker := time.NewTicker(s.lease / 2)
 	defer ticker.Stop()
@@ -1091,7 +1059,7 @@ func (s *testDBSuite) testDropColumn(c *C) {
 	}
 
 	// get c4 column id
-	sessionExecInGoroutine(c, s.store, "alter table t2 drop column c4", done)
+	testutil.SessionExecInGoroutine(c, s.store, "alter table t2 drop column c4", done)
 
 	ticker := time.NewTicker(s.lease / 2)
 	defer ticker.Stop()
@@ -1516,7 +1484,7 @@ func (s *testDBSuite) TestAddNotNullColumn(c *C) {
 	s.tk.MustExec("create table tnn (c1 int primary key auto_increment, c2 int)")
 	s.tk.MustExec("insert tnn (c2) values (0)" + strings.Repeat(",(0)", 99))
 	done := make(chan error, 1)
-	sessionExecInGoroutine(c, s.store, "alter table tnn add column c3 int not null default 3", done)
+	testutil.SessionExecInGoroutine(c, s.store, "alter table tnn add column c3 int not null default 3", done)
 	updateCnt := 0
 out:
 	for {

diff --git a/ddl/failtest/fail_db_test.go b/ddl/failtest/fail_db_test.go
index ff28212948261..552f00d22fa0d 100644
--- a/ddl/failtest/fail_db_test.go
+++ b/ddl/failtest/fail_db_test.go
@@ -16,9 +16,6 @@ package ddl_test
 import (
 	"context"
 	"fmt"
-	"github.com/pingcap/errors"
-	"github.com/pingcap/tidb/sessionctx/variable"
-	"math"
 	"math/rand"
 	"os"
 	"sync/atomic"
@@ -26,6 +23,7 @@ import (
 	"time"
 
 	. "github.com/pingcap/check"
+	"github.com/pingcap/errors"
 	gofail "github.com/pingcap/gofail/runtime"
 	"github.com/pingcap/parser"
 	"github.com/pingcap/parser/model"
@@ -33,11 +31,13 @@ import (
 	"github.com/pingcap/tidb/domain"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/session"
+	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/store/mockstore"
 	"github.com/pingcap/tidb/store/mockstore/mocktikv"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/testkit"
 	"github.com/pingcap/tidb/util/testleak"
+	"github.com/pingcap/tidb/util/testutil"
 )
 
 func TestT(t *testing.T) {
@@ -325,33 +325,12 @@ func (s *testFailDBSuite) TestAddIndexWorkerNum(c *C) {
 	done := make(chan error, 1)
 	start := -10
-	defaultBatchSize := 2048
-	num := defaultBatchSize
+	num := 4096
 	// first add some rows
 	for i := start; i < num; i++ {
 		sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", i, i, i)
 		tk.MustExec(sql)
 	}
-	// Add some discrete rows.
-	maxBatch := 20
-	batchCnt := 100
-	otherKeys := make([]int, 0, batchCnt*maxBatch)
-	// Make sure there are no duplicate keys.
-	base := defaultBatchSize * 20
-	for i := 1; i < batchCnt; i++ {
-		n := base + i*defaultBatchSize + i
-		for j := 0; j < rand.Intn(maxBatch); j++ {
-			n += j
-			sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", n, n, n)
-			tk.MustExec(sql)
-			otherKeys = append(otherKeys, n)
-		}
-	}
-	// Encounter the value of math.MaxInt64 in middle of
-	v := math.MaxInt64 - defaultBatchSize/2
-	sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", v, v, v)
-	tk.MustExec(sql)
-	otherKeys = append(otherKeys, v)
 
 	is := s.dom.InfoSchema()
 	schemaName := model.NewCIStr("test_db")
 	tableName := model.NewCIStr("test_add_index")
 	tbl, err := is.TableByName(schemaName, tableName)
 	c.Assert(err, IsNil)
 
 	splitCount := 100
 	// Split the table into multiple regions.
 	s.cluster.SplitTable(s.mvccStore, tbl.Meta().ID, splitCount)
 
 	originDDLAddIndexWorkerCnt := variable.GetDDLReorgWorkerCounter()
 	lastSetWorkerCnt := originDDLAddIndexWorkerCnt
 	atomic.StoreInt32(&ddl.TestCheckWorkerNumber, lastSetWorkerCnt)
-	ddl.TestCheckWorkerNumber = lastSetWorkerCnt
 	defer variable.SetDDLReorgWorkerCounter(originDDLAddIndexWorkerCnt)
 
 	gofail.Enable("github.com/pingcap/tidb/ddl/checkIndexWorkerNum", `return(true)`)
 	defer gofail.Disable("github.com/pingcap/tidb/ddl/checkIndexWorkerNum")
 
-	sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)
-
+	testutil.SessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)
 	checkNum := 0
 
 LOOP:
@@ -395,35 +373,3 @@ LOOP:
 	c.Assert(checkNum, Greater, 5)
 	tk.MustExec("admin check table test_add_index")
 	tk.MustExec("drop table test_add_index")
 }
-
-func sessionExecInGoroutine(c *C, s kv.Storage, sql string, done chan error) {
-	execMultiSQLInGoroutine(c, s, "test_db", []string{sql}, done)
-}
-
-func execMultiSQLInGoroutine(c *C, s kv.Storage, dbName string, multiSQL []string, done chan error) {
-	go func() {
-		se, err := session.CreateSession4Test(s)
-		if err != nil {
-			done <- errors.Trace(err)
-			return
-		}
-		defer se.Close()
-		_, err = se.Execute(context.Background(), "use "+dbName)
-		if err != nil {
-			done <- errors.Trace(err)
-			return
-		}
-		for _, sql := range multiSQL {
-			rs, err := se.Execute(context.Background(), sql)
-			if err != nil {
-				done <- errors.Trace(err)
-				return
-			}
-			if rs != nil {
-				done <- errors.Errorf("RecordSet should be empty.")
-				return
-			}
-			done <- nil
-		}
-	}()
-}

diff --git a/util/testutil/testutil.go b/util/testutil/testutil.go
index 003756b0fd405..657a6ab3028f5 100644
--- a/util/testutil/testutil.go
+++ b/util/testutil/testutil.go
@@ -14,10 +14,14 @@
 package testutil
 
 import (
+	"context"
 	"fmt"
 	"strings"
 
 	"github.com/pingcap/check"
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/session"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/types"
 )
@@ -108,3 +112,36 @@ func RowsWithSep(sep string, args ...string) [][]interface{} {
 	}
 	return rows
 }
+
+// SessionExecInGoroutine is exported for testing.
+func SessionExecInGoroutine(c *check.C, s kv.Storage, sql string, done chan error) {
+	execMultiSQLInGoroutine(c, s, "test_db", []string{sql}, done)
+}
+
+func execMultiSQLInGoroutine(c *check.C, s kv.Storage, dbName string, multiSQL []string, done chan error) {
+	go func() {
+		se, err := session.CreateSession4Test(s)
+		if err != nil {
+			done <- errors.Trace(err)
+			return
+		}
+		defer se.Close()
+		_, err = se.Execute(context.Background(), "use "+dbName)
+		if err != nil {
+			done <- errors.Trace(err)
+			return
+		}
+		for _, sql := range multiSQL {
+			rs, err := se.Execute(context.Background(), sql)
+			if err != nil {
+				done <- errors.Trace(err)
+				return
+			}
+			if rs != nil {
+				done <- errors.Errorf("RecordSet should be empty.")
+				return
+			}
+			done <- nil
+		}
+	}()
+}

From 8b2ab5261801d871bb23c3bc327b9fd5cc6ccf6d Mon Sep 17 00:00:00 2001
From: crazycs520
Date: Sat, 22 Dec 2018 16:25:48 +0800
Subject: [PATCH 16/16] add comment

---
 ddl/index.go             |  2 +-
 ddl/testutil/testutil.go | 13 +++++++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)
diff --git a/ddl/index.go b/ddl/index.go
index d87c7107c09ca..2dafb9b66ef2a 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -1114,7 +1114,7 @@ func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.I
 	}
 
 	// gofail: var checkIndexWorkerNum bool
-	//if checkIndexWorkerNum {
+	// if checkIndexWorkerNum {
 	//	num := int(atomic.LoadInt32(&TestCheckWorkerNumber))
 	//	if num != 0 {
 	//		if num > len(kvRanges) {

diff --git a/ddl/testutil/testutil.go b/ddl/testutil/testutil.go
index 1d9e224bc6854..c690b05af3108 100644
--- a/ddl/testutil/testutil.go
+++ b/ddl/testutil/testutil.go
@@ -1,3 +1,16 @@
+// Copyright 2018 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package testutil
 
 import (
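
The pattern the series converges on, reduced to a standalone Go sketch
(illustrative names and types, not TiDB's actual ones): on every batch,
re-read the configured worker count, cap it by the number of kv ranges,
start any missing workers, and close any extra ones.

	package main

	import (
		"fmt"
		"sync/atomic"
	)

	// configuredWorkerCnt stands in for tidb_ddl_reorg_worker_cnt.
	var configuredWorkerCnt int32 = 16

	type worker struct {
		id    int
		tasks chan int
	}

	func newWorker(id int) *worker {
		w := &worker{id: id, tasks: make(chan int)}
		go func() {
			for range w.tasks { // drain tasks until the channel is closed
			}
		}()
		return w
	}

	// adjustPool grows or shrinks the pool to min(configured count, rangesNum).
	func adjustPool(pool []*worker, rangesNum int) []*worker {
		cnt := int(atomic.LoadInt32(&configuredWorkerCnt))
		if rangesNum < cnt { // never start more workers than ranges
			cnt = rangesNum
		}
		for i := len(pool); i < cnt; i++ { // enlarge
			pool = append(pool, newWorker(i))
		}
		if len(pool) > cnt { // shrink
			for _, w := range pool[cnt:] {
				close(w.tasks)
			}
			pool = pool[:cnt]
		}
		return pool
	}

	func main() {
		var pool []*worker
		pool = adjustPool(pool, 100) // 100 ranges, 16 configured workers
		fmt.Println(len(pool))       // 16
		atomic.StoreInt32(&configuredWorkerCnt, 8)
		pool = adjustPool(pool, 100)
		fmt.Println(len(pool)) // 8
	}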