ddl,domain: deal with discrete handles and close DDL when creating failed #5155

Merged
merged 2 commits on Nov 20, 2017
7 changes: 6 additions & 1 deletion ddl/ddl_db_test.go
@@ -458,7 +458,7 @@ func (s *testDBSuite) testAlterLock(c *C) {
func (s *testDBSuite) TestAddIndex(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
s.tk.MustExec("create table test_add_index (c1 int, c2 int, c3 int, primary key(c1))")
s.tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))")

done := make(chan error, 1)
start := -10
@@ -483,6 +483,11 @@ func (s *testDBSuite) TestAddIndex(c *C) {
otherKeys = append(otherKeys, n)
}
}
// Encounter the value of math.MaxInt64 in the middle of the handle range.
v := math.MaxInt64 - defaultBatchSize/2
sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", v, v, v)
s.mustExec(c, sql)
otherKeys = append(otherKeys, v)

sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)

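The new rows in this test sit just below math.MaxInt64, so the backfill worker has to cover the very end of the int64 handle range (hence the switch from int to bigint for the primary key). A minimal standalone sketch, using only the standard library, of why the old end-of-range arithmetic breaks there and how the clamp added to ddl/index.go (next file) behaves; the main function is purely illustrative:

```go
package main

import (
	"fmt"
	"math"
)

// getEndHandle mirrors the helper added in ddl/index.go: when baseHandle is
// within one batch of math.MaxInt64, naive addition would wrap around, so the
// range end is pinned to math.MaxInt64 instead.
func getEndHandle(baseHandle, batch int64) int64 {
	if baseHandle >= math.MaxInt64-batch {
		return math.MaxInt64
	}
	return baseHandle + batch
}

func main() {
	batch := int64(128)
	base := int64(math.MaxInt64) - batch/2 // a handle in the last half batch, like the row the test inserts

	fmt.Println(base + batch)              // wraps to a large negative value, so the next range would go backwards
	fmt.Println(getEndHandle(base, batch)) // clamped to math.MaxInt64
}
```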
53 changes: 44 additions & 9 deletions ddl/index.go
@@ -473,7 +473,9 @@ func (w *worker) getIndexRecord(t table.Table, colMap map[int64]*types.FieldType
}

const (
minTaskHandledCnt = 32 // minTaskHandledCnt is the minimum number of handles per batch.
defaultTaskHandleCnt = 128
maxTaskHandleCnt = 1 << 20 // maxTaskHandleCnt is the maximum number of handles per batch.
defaultWorkers = 16
)

@@ -500,13 +502,15 @@ type worker struct {
idxRecords []*indexRecord // It's used to reduce the number of new slice.
taskRange handleInfo // Every task's handle range.
taskRet *taskResult
batchSize int
rowMap map[int64]types.Datum // It's the index column values map. It is used to reduce the number of making map.
}

func newWorker(ctx context.Context, id, batch, colsLen, indexColsLen int) *worker {
return &worker{
id: id,
ctx: ctx,
batchSize: batch,
idxRecords: make([]*indexRecord, 0, batch),
defaultVals: make([]types.Datum, colsLen),
rowMap: make(map[int64]types.Datum, indexColsLen),
@@ -524,6 +528,13 @@ type handleInfo struct {
endHandle int64
}

func getEndHandle(baseHandle, batch int64) int64 {
if baseHandle >= math.MaxInt64-batch {
return math.MaxInt64
}
return baseHandle + batch
}

// addTableIndex adds index into table.
// TODO: Move this to doc or wiki.
// How to add index in reorganization state?
@@ -550,25 +561,27 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
colMap[col.ID] = &col.FieldType
}
workerCnt := defaultWorkers
taskBatch := int64(defaultTaskHandleCnt)
addedCount := job.GetRowCount()
baseHandle := reorgInfo.Handle
baseHandle, logStartHandle := reorgInfo.Handle, reorgInfo.Handle

workers := make([]*worker, workerCnt)
for i := 0; i < workerCnt; i++ {
ctx := d.newContext()
workers[i] = newWorker(ctx, i, int(taskBatch), len(cols), len(colMap))
workers[i] = newWorker(ctx, i, defaultTaskHandleCnt, len(cols), len(colMap))
// Make sure every worker has its own index buffer.
workers[i].index = tables.NewIndexWithBuffer(t.Meta(), indexInfo)
}
for {
startTime := time.Now()
wg := sync.WaitGroup{}
currentBatchSize := int64(workers[0].batchSize)
for i := 0; i < workerCnt; i++ {
wg.Add(1)
workers[i].setTaskNewRange(baseHandle+int64(i)*taskBatch, baseHandle+int64(i+1)*taskBatch)
endHandle := getEndHandle(baseHandle, currentBatchSize)
workers[i].setTaskNewRange(baseHandle, endHandle)
// TODO: Consider one worker to one goroutine.
go workers[i].doBackfillIndexTask(t, colMap, &wg)
baseHandle = endHandle
}
wg.Wait()

@@ -583,36 +596,58 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
err1 := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
return errors.Trace(reorgInfo.UpdateHandle(txn, nextHandle))
})
log.Warnf("[ddl] total added index for %d rows, this task [%d,%d) add index for %d failed %v, take time %v, update handle err %v",
addedCount, baseHandle, nextHandle, taskAddedCount, err, sub, err1)
log.Warnf("[ddl] total added index for %d rows, this task [%d,%d) add index for %d failed %v, batch %d, take time %v, update handle err %v",
addedCount, logStartHandle, nextHandle, taskAddedCount, err, currentBatchSize, sub, err1)
return errors.Trace(err)
}
d.reorgCtx.setRowCountAndHandle(addedCount, nextHandle)
batchHandleDataHistogram.WithLabelValues(batchAddIdx).Observe(sub)
log.Infof("[ddl] total added index for %d rows, this task [%d,%d) added index for %d rows, take time %v",
addedCount, baseHandle, nextHandle, taskAddedCount, sub)
log.Infof("[ddl] total added index for %d rows, this task [%d,%d) added index for %d rows, batch %d, take time %v",
addedCount, logStartHandle, nextHandle, taskAddedCount, currentBatchSize, sub)

if isEnd {
return nil
}
baseHandle = nextHandle
baseHandle, logStartHandle = nextHandle, nextHandle
}
}

func getCountAndHandle(workers []*worker) (int64, int64, bool, error) {
taskAddedCount, nextHandle := int64(0), workers[0].taskRange.startHandle
var err error
var isEnd bool
starvingWorkers := 0
largerDefaultWorkers := 0
for _, worker := range workers {
ret := worker.taskRet
if ret.err != nil {
err = ret.err
break
}
taskAddedCount += int64(ret.count)
if ret.count < minTaskHandledCnt {
starvingWorkers++
} else if ret.count > defaultTaskHandleCnt {
largerDefaultWorkers++
}
nextHandle = ret.outOfRangeHandle
isEnd = ret.isAllDone
}

// Adjust the worker's batch size.
halfWorkers := len(workers) / 2
if starvingWorkers >= halfWorkers && workers[0].batchSize < maxTaskHandleCnt {
// If the index data is discrete, we need to increase the batch size to speed up.
for _, worker := range workers {
worker.batchSize *= 2
}
} else if largerDefaultWorkers >= halfWorkers && workers[0].batchSize > defaultTaskHandleCnt {
// If the batch size exceeds the limit after we increase it,
// we need to decrease the batch size to reduce write conflict.
for _, worker := range workers {
worker.batchSize /= 2
}
}
return taskAddedCount, nextHandle, isEnd, errors.Trace(err)
}

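When handles are discrete, a fixed range of 128 handles can match only a handful of rows, so each round scans many keys but backfills almost nothing. The new getCountAndHandle therefore grows the batch (up to maxTaskHandleCnt) when at least half of the workers are starving, and shrinks it (down to defaultTaskHandleCnt) when at least half handled more than the default. A small self-contained restatement of that rule, with made-up worker counts purely for illustration:

```go
package main

import "fmt"

const (
	minTaskHandledCnt    = 32
	defaultTaskHandleCnt = 128
	maxTaskHandleCnt     = 1 << 20
)

// adjustBatchSize restates the adjustment in getCountAndHandle: counts holds
// the number of rows each worker actually backfilled in the last round.
func adjustBatchSize(batch int, counts []int) int {
	starving, larger := 0, 0
	for _, c := range counts {
		if c < minTaskHandledCnt {
			starving++
		} else if c > defaultTaskHandleCnt {
			larger++
		}
	}
	half := len(counts) / 2
	if starving >= half && batch < maxTaskHandleCnt {
		return batch * 2 // sparse handles: widen every worker's range
	}
	if larger >= half && batch > defaultTaskHandleCnt {
		return batch / 2 // dense handles again: shrink back toward the default
	}
	return batch
}

func main() {
	fmt.Println(adjustBatchSize(128, []int{3, 0, 7, 2}))        // 256: most workers found almost nothing
	fmt.Println(adjustBatchSize(512, []int{400, 380, 20, 500})) // 256: most workers are over the default again
}
```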
22 changes: 16 additions & 6 deletions domain/domain.go
@@ -409,10 +409,10 @@ type EtcdBackend interface {
}

// NewDomain creates a new domain. Should not create multiple domains for the same store.
func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, factory pools.Factory, sysFactory func(*Domain) (pools.Resource, error)) (d *Domain, err error) {
func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, factory pools.Factory, sysFactory func(*Domain) (pools.Resource, error)) (*Domain, error) {
capacity := 200 // capacity of the sysSessionPool size
idleTimeout := 3 * time.Minute // sessions in the sysSessionPool will be recycled after idleTimeout
d = &Domain{
d := &Domain{
store: store,
SchemaValidator: NewSchemaValidator(ddlLease),
exit: make(chan struct{}),
@@ -422,8 +422,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio

if ebd, ok := store.(EtcdBackend); ok {
if addrs := ebd.EtcdAddrs(); addrs != nil {
var cli *clientv3.Client
cli, err = clientv3.New(clientv3.Config{
cli, err := clientv3.New(clientv3.Config{
Endpoints: addrs,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
@@ -453,11 +452,22 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
}
sysCtxPool := pools.NewResourcePool(sysFac, 2, 2, idleTimeout)
d.ddl = ddl.NewDDL(ctx, d.etcdClient, d.store, d.infoHandle, callback, ddlLease, sysCtxPool)
var err error
defer func() {
// Clean up domain when initializing syncer failed or reloading failed.
// If we don't clean it, there are some dirty data when retrying this function.
if err != nil {
d.Close()
log.Errorf("[ddl] new domain failed %v", errors.ErrorStack(errors.Trace(err)))
}
}()

if err = d.ddl.SchemaSyncer().Init(ctx); err != nil {
err = d.ddl.SchemaSyncer().Init(ctx)
if err != nil {
return nil, errors.Trace(err)
}
if err = d.Reload(); err != nil {
err = d.Reload()
if err != nil {
return nil, errors.Trace(err)
}

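NewDomain now cleans up after itself: once the DDL object exists, a single err variable is watched by a deferred closure, and d.Close() runs on any failing return so a retry does not see half-initialized state. A minimal sketch of that defer-on-error pattern; the resource type and constructor below are hypothetical stand-ins for Domain and NewDomain:

```go
package main

import (
	"errors"
	"fmt"
)

// resource is a hypothetical stand-in for Domain.
type resource struct{ closed bool }

func (r *resource) Close() { r.closed = true }

func newResource(fail bool) (*resource, error) {
	r := &resource{}

	var err error
	defer func() {
		// If any later initialization step failed, release what was already
		// built; otherwise a retry of the constructor would leak it.
		if err != nil {
			r.Close()
		}
	}()

	// Stand-in for the SchemaSyncer().Init / Reload steps in the real constructor.
	if fail {
		err = errors.New("init step failed")
		return nil, err
	}
	return r, nil
}

func main() {
	_, err := newResource(true)
	fmt.Println(err) // "init step failed", and the deferred cleanup has already run
}
```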
4 changes: 2 additions & 2 deletions table/tables/index_test.go
@@ -33,6 +33,7 @@ type testIndexSuite struct {
}

func (s *testIndexSuite) SetUpSuite(c *C) {
testleak.BeforeTest()
store, err := tikv.NewMockTikvStore()
c.Assert(err, IsNil)
s.s = store
@@ -41,10 +42,10 @@ func (s *testIndexSuite) SetUpSuite(c *C) {
func (s *testIndexSuite) TearDownSuite(c *C) {
err := s.s.Close()
c.Assert(err, IsNil)
testleak.AfterTest(c)()
}

func (s *testIndexSuite) TestIndex(c *C) {
defer testleak.AfterTest(c)()
tblInfo := &model.TableInfo{
ID: 1,
Indices: []*model.IndexInfo{
@@ -183,7 +184,6 @@ }
}

func (s *testIndexSuite) TestCombineIndexSeek(c *C) {
defer testleak.AfterTest(c)()
tblInfo := &model.TableInfo{
ID: 1,
Indices: []*model.IndexInfo{
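The leak check for this suite moves from a per-test defer into the suite hooks, so it is armed once in SetUpSuite and evaluated once in TearDownSuite for all tests in between. A sketch of the resulting shape, assuming the check and testleak import paths used by the repository at the time:

```go
package tables_test

import (
	"testing"

	. "github.com/pingcap/check"
	"github.com/pingcap/tidb/util/testleak"
)

func TestT(t *testing.T) { TestingT(t) }

// leakCheckedSuite shows the suite-level pattern the diff switches to:
// goroutine-leak tracking starts once for the whole suite and is verified
// once after every test has run.
type leakCheckedSuite struct{}

var _ = Suite(&leakCheckedSuite{})

func (s *leakCheckedSuite) SetUpSuite(c *C) {
	testleak.BeforeTest()
}

func (s *leakCheckedSuite) TearDownSuite(c *C) {
	testleak.AfterTest(c)()
}
```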