
Merge branch 'master' into zimuxia/parallel-ddl
zimulala committed Jul 2, 2018
2 parents ee4ad7c + e868f0e commit 7d35817
Showing 7 changed files with 60 additions and 55 deletions.
45 changes: 11 additions & 34 deletions ddl/ddl.go
@@ -196,21 +196,16 @@ type DDL interface {
RegisterEventCh(chan<- *util.Event)
// SchemaSyncer gets the schema syncer.
SchemaSyncer() SchemaSyncer
// OwnerManager gets the owner manager, and it's used for testing.
// OwnerManager gets the owner manager.
OwnerManager() owner.Manager

// WorkerVars gets the session variables for DDL worker. It's exported for testing.
WorkerVars() *variable.SessionVars
// SetHook sets the hook. It's exported for testing.
SetHook(h Callback)
// GetHook gets the hook. It's exported for testing.
GetHook() Callback

// GetTableMaxRowID gets table max row ID. It's exported for testing.
// GetTableMaxRowID gets table max row ID.
GetTableMaxRowID(startTS uint64, tblInfo *model.TableInfo) (int64, bool, error)

// SetBinlogClient sets the binlog client for DDL worker. It's exported for testing.
SetBinlogClient(interface{})
}

// ddl represents the statements which are used to define the database structure or schema.
// ddl is used to handle the statements that define the structure or schema of the database.
type ddl struct {
m sync.RWMutex
infoHandle *infoschema.Handle
@@ -229,12 +224,11 @@ type ddlCtx struct {
ddlJobDoneCh chan struct{}
ddlEventCh chan<- *util.Event
lease time.Duration // lease is schema lease.
binlogCli interface{} // binlogCli is used for Binlog.

// hook may be modified.
hook Callback
hookMu sync.RWMutex

workerVars *variable.SessionVars // workerVars is used for Binlog.
}

func (dc *ddlCtx) isOwner() bool {
@@ -307,14 +301,13 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
ddlJobDoneCh: make(chan struct{}, 1),
ownerManager: manager,
schemaSyncer: syncer,
workerVars: variable.NewSessionVars(),
binlogCli: binloginfo.GetPumpClient(),
hook: hook,
}
d := &ddl{
infoHandle: infoHandle,
ddlCtx: ddlCtx,
}
d.workerVars.BinlogClient = binloginfo.GetPumpClient()

d.start(ctx, ctxPool)
variable.RegisterStatistics(d)
@@ -508,25 +501,9 @@ func (d *ddl) callHookOnChanged(err error) error {
return errors.Trace(err)
}

// SetHook implements DDL.SetHook interface.
func (d *ddl) SetHook(h Callback) {
d.hookMu.Lock()
defer d.hookMu.Unlock()

d.hook = h
}

// GetHook implements DDL.GetHook interface.
func (d *ddl) GetHook() Callback {
d.hookMu.RLock()
defer d.hookMu.RUnlock()

return d.hook
}

// WorkerVars implements DDL.WorkerVars interface.
func (d *ddl) WorkerVars() *variable.SessionVars {
return d.workerVars
// SetBinlogClient implements DDL.SetBinlogClient interface.
func (d *ddl) SetBinlogClient(binlogCli interface{}) {
d.binlogCli = binlogCli
}

// DDL error codes.
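The net effect of this first file's changes: the test-only methods (SetHook, GetHook, WorkerVars) leave the public DDL interface, and tests reach SetHook through a type assertion against a test-only interface instead. A minimal, self-contained sketch of the pattern, with illustrative names rather than the real TiDB types:

    package ddl

    // Service stands in for the public DDL interface: production methods only.
    type Service interface {
        DoJob(query string) error
    }

    // ServiceForTest stands in for DDLForTest; in the real layout it is
    // declared in a _test.go file, so production callers never see it.
    type ServiceForTest interface {
        SetHook(h func(jobID int64))
    }

    type service struct {
        hook func(jobID int64)
    }

    func (s *service) DoJob(query string) error {
        if s.hook != nil {
            s.hook(1) // fire the test hook around the state change
        }
        return nil
    }

    // SetHook satisfies ServiceForTest but is absent from Service.
    func (s *service) SetHook(h func(jobID int64)) { s.hook = h }

Test code then narrows the public value with a type assertion, exactly as the _test.go hunks below do with d.(ddl.DDLForTest).SetHook(callback):

    var svc Service = &service{}
    svc.(ServiceForTest).SetHook(func(id int64) { /* observe the job */ })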
20 changes: 10 additions & 10 deletions ddl/ddl_db_change_test.go
@@ -174,7 +174,7 @@ func (s *testStateChangeSuite) test(c *C, tableName, alterTableSQL string, testI
}
}
d := s.dom.DDL()
d.SetHook(callback)
d.(ddl.DDLForTest).SetHook(callback)
_, err = s.se.Execute(context.Background(), alterTableSQL)
c.Assert(err, IsNil)
err = testInfo.compileSQL(4)
@@ -186,7 +186,7 @@ func (s *testStateChangeSuite) test(c *C, tableName, alterTableSQL string, testI
c.Assert(err, IsNil)
c.Assert(errors.ErrorStack(checkErr), Equals, "")
callback = &ddl.TestDDLCallback{}
d.SetHook(callback)
d.(ddl.DDLForTest).SetHook(callback)
}

type stateCase struct {
@@ -381,12 +381,12 @@ func (s *testStateChangeSuite) runTestInSchemaState(c *C, state model.SchemaStat
}
}
d := s.dom.DDL()
d.SetHook(callback)
d.(ddl.DDLForTest).SetHook(callback)
_, err = s.se.Execute(context.Background(), alterTableSQL)
c.Assert(err, IsNil)
c.Assert(errors.ErrorStack(checkErr), Equals, "")
callback = &ddl.TestDDLCallback{}
d.SetHook(callback)
d.(ddl.DDLForTest).SetHook(callback)

if expectQuery != nil {
tk := testkit.NewTestKit(c, s.store)
@@ -454,7 +454,7 @@ func (s *testStateChangeSuite) TestShowIndex(c *C) {
}

d := s.dom.DDL()
d.SetHook(callback)
d.(ddl.DDLForTest).SetHook(callback)
alterTableSQL := `alter table t add index c2(c2)`
_, err = s.se.Execute(context.Background(), alterTableSQL)
c.Assert(err, IsNil)
@@ -465,7 +465,7 @@
err = checkResult(result, testkit.Rows("t 0 PRIMARY 1 c1 A 0 <nil> <nil> BTREE ", "t 1 c2 1 c2 A 0 <nil> <nil> YES BTREE "))
c.Assert(err, IsNil)
callback = &ddl.TestDDLCallback{}
d.SetHook(callback)
d.(ddl.DDLForTest).SetHook(callback)
}

func (s *testStateChangeSuite) TestParallelAlterModifyColumn(c *C) {
@@ -541,7 +541,7 @@ func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 strin
times++
}
d := s.dom.DDL()
d.SetHook(callback)
d.(ddl.DDLForTest).SetHook(callback)

wg := sync.WaitGroup{}
var err1 error
@@ -573,7 +573,7 @@ func (s *testStateChangeSuite) testControlParallelExecSQL(c *C, sql1, sql2 strin
f(c, err1, err2)

callback = &ddl.TestDDLCallback{}
d.SetHook(callback)
d.(ddl.DDLForTest).SetHook(callback)
}

func (s *testStateChangeSuite) testParallelExecSQL(c *C, sql string) {
@@ -598,7 +598,7 @@ func (s *testStateChangeSuite) testParallelExecSQL(c *C, sql string) {
}

d := s.dom.DDL()
d.SetHook(callback)
d.(ddl.DDLForTest).SetHook(callback)

wg.Add(2)
go func() {
@@ -614,7 +614,7 @@
c.Assert(err2, IsNil)
c.Assert(err3, IsNil)
callback = &ddl.TestDDLCallback{}
d.SetHook(callback)
d.(ddl.DDLForTest).SetHook(callback)
}

// TestCreateTableIfNotExists executes `create table if not exists xxx` in parallel. No error is expected.
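The parallel-DDL tests in this file all share one shape: install a hook, fire two statements from separate goroutines, join them with a WaitGroup, then let a checker decide which error combination counts as a pass. A generic, runnable sketch of that control flow (plain Go, not the testkit API; the SQL strings are placeholders):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var wg sync.WaitGroup
        var err1, err2 error

        exec := func(sql string, errp *error) {
            defer wg.Done()
            // Stand-in for se.Execute(context.Background(), sql).
            fmt.Println("executing:", sql)
            *errp = nil
        }

        wg.Add(2)
        go exec("alter table t modify column c int", &err1)
        go exec("alter table t modify column c bigint", &err2)
        wg.Wait()

        // The suite's checker (the f(c, err1, err2) call above) decides
        // which statement was expected to win the race.
        fmt.Println("results:", err1, err2)
    }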
6 changes: 3 additions & 3 deletions ddl/ddl_db_test.go
@@ -454,9 +454,7 @@ func (s *testDBSuite) TestCancelAddIndex(c *C) {
checkErr = errors.Trace(err)
}
}
originHook := s.dom.DDL().GetHook()
s.dom.DDL().SetHook(hook)
defer s.dom.DDL().SetHook(originHook)
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
done := make(chan error, 1)
go backgroundExec(s.store, "create unique index c3_index on t1 (c3)", done)

@@ -498,6 +496,8 @@ LOOP:

s.mustExec(c, "drop table t1")
ddl.ReorgWaitTimeout = oldReorgWaitTimeout
callback := &ddl.TestDDLCallback{}
s.dom.DDL().(ddl.DDLForTest).SetHook(callback)
}

func (s *testDBSuite) TestAddAnonymousIndex(c *C) {
13 changes: 13 additions & 0 deletions ddl/ddl_test.go
@@ -33,6 +33,19 @@ import (
"golang.org/x/net/context"
)

// DDLForTest exposes DDL methods that are used only in tests.
type DDLForTest interface {
// SetHook sets the hook.
SetHook(h Callback)
}

// SetHook implements the DDLForTest.SetHook interface.
func (d *ddl) SetHook(h Callback) {
d.hookMu.Lock()
defer d.hookMu.Unlock()

d.hook = h
}
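Nothing except the assertions at the call sites forces *ddl to keep satisfying DDLForTest; a compile-time check next to the interface would make the contract explicit. A one-line suggestion, not part of this commit:

    // Compile-time assertion that *ddl implements DDLForTest; it costs
    // nothing at runtime and fails the build if SetHook is renamed or removed.
    var _ DDLForTest = (*ddl)(nil)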

func TestT(t *testing.T) {
CustomVerboseFlag = true
logLevel := os.Getenv("log_level")
27 changes: 21 additions & 6 deletions ddl/ddl_worker.go
@@ -14,6 +14,7 @@
package ddl

import (
"fmt"
"sync"
"time"

@@ -74,17 +75,30 @@ func newWorker(tp workerType, id int, store kv.Storage, ctxPool *pools.ResourceP
return worker
}

func (w *worker) String() string {
var str string
switch w.tp {
case generalWorker:
str = "general"
case addIdxWorker:
str = "add index"
default:
str = "unknow"
}
return fmt.Sprintf("%d, tp %s", w.id, str)
}

func (w *worker) close() {
close(w.quitCh)
w.delRangeManager.clear()
w.wg.Wait()
log.Infof("[ddl] close DDL worker %v", w.tp)
log.Infof("[ddl] close DDL worker %s", w)
}

// start is used for async online schema changing, it will try to become the owner firstly,
// then wait or pull the job queue to handle a schema change job.
func (w *worker) start(d *ddlCtx) {
log.Infof("[ddl] start DDL worker %v", w.tp)
log.Infof("[ddl] start DDL worker %s", w)
defer w.wg.Done()

w.delRangeManager.start()
@@ -100,7 +114,7 @@ func (w *worker) start(d *ddlCtx) {
r := recover()
if r != nil {
buf := util.GetStack()
log.Errorf("[ddl] worker %v %s", r, buf)
log.Errorf("[ddl] ddl %s, worker %s, %v %s", d.uuid, w, r, buf)
metrics.PanicCounter.WithLabelValues(metrics.LabelDDL).Inc()
}
}()
@@ -110,13 +124,14 @@
case <-ticker.C:
log.Debugf("[ddl] wait %s to check DDL status again", checkTime)
case <-w.ddlJobCh:
log.Debugf("[ddl] worker %s waits %s to check DDL status again", w, checkTime)
case <-w.quitCh:
return
}

err := w.handleDDLJobQueue(d)
if err != nil {
log.Errorf("[ddl] handle ddl job err %v", errors.ErrorStack(err))
log.Errorf("[ddl] worker %s handles DDL job err %v", w, errors.ErrorStack(err))
}
}
}
@@ -336,7 +351,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
}

if job.IsDone() || job.IsRollbackDone() {
binloginfo.SetDDLBinlog(d.workerVars.BinlogClient, txn, job.ID, job.Query)
binloginfo.SetDDLBinlog(d.binlogCli, txn, job.ID, job.Query)
if !job.IsRollbackDone() {
job.State = model.JobStateSynced
}
@@ -363,7 +378,7 @@
if runJobErr != nil || waitDependencyJob {
// wait a while to retry again. If we don't wait here, DDL will retry this job immediately,
// which may act like a deadlock.
log.Infof("[ddl] run DDL job error, sleep a while:%v then retry it.", WaitTimeWhenErrorOccured)
log.Infof("[ddl] worker %s runs DDL job error, sleeps a while:%v then retries it.", w, WaitTimeWhenErrorOccured)
metrics.DDLJobErrCounter.Inc()
time.Sleep(WaitTimeWhenErrorOccured)
}
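The new String method makes *worker satisfy fmt.Stringer, which is what lets every log call in this file pass w straight to a %s verb. A runnable illustration of the mechanism, mirroring (not reproducing) the worker type:

    package main

    import "fmt"

    type workerType int

    const (
        generalWorker workerType = iota
        addIdxWorker
    )

    type worker struct {
        id int
        tp workerType
    }

    // String implements fmt.Stringer, so %s and %v format a *worker with it.
    func (w *worker) String() string {
        str := "unknown"
        switch w.tp {
        case generalWorker:
            str = "general"
        case addIdxWorker:
            str = "add index"
        }
        return fmt.Sprintf("%d, tp %s", w.id, str)
    }

    func main() {
        w := &worker{id: 2, tp: addIdxWorker}
        fmt.Printf("[ddl] start DDL worker %s\n", w) // [ddl] start DDL worker 2, tp add index
    }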
2 changes: 1 addition & 1 deletion ddl/reorg.go
@@ -139,7 +139,7 @@ func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, lease time.Dura
w.reorgCtx.clean()
return errors.Trace(err)
case <-w.quitCh:
log.Info("[ddl] run reorg job ddl quit")
log.Info("[ddl] run reorg job quit")
w.reorgCtx.setNextHandle(0)
w.reorgCtx.setRowCount(0)
// We return errWaitReorgTimeout here too, so that outer loop will break.
2 changes: 1 addition & 1 deletion sessionctx/binloginfo/binloginfo_test.go
@@ -113,7 +113,7 @@ func (s *testBinlogSuite) SetUpSuite(c *C) {
s.ddl = sessionDomain.DDL()

s.client = binlog.NewPumpClient(clientCon)
s.ddl.WorkerVars().BinlogClient = s.client
s.ddl.SetBinlogClient(s.client)
}

func (s *testBinlogSuite) TearDownSuite(c *C) {
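SetBinlogClient replaces the old route through WorkerVars().BinlogClient: the test injects its pump client directly, and the worker later reads d.binlogCli when it calls SetDDLBinlog. A hedged sketch of that injection seam, using interface{} as the commit does so the ddl package needs no binlog import (every name other than SetBinlogClient is illustrative):

    package main

    import "fmt"

    // DDL mirrors the narrowed production interface.
    type DDL interface {
        SetBinlogClient(interface{})
    }

    type ddl struct {
        binlogCli interface{} // consumed later, e.g. when writing the DDL binlog
    }

    func (d *ddl) SetBinlogClient(c interface{}) { d.binlogCli = c }

    type fakePumpClient struct{ addr string }

    func main() {
        var d DDL = &ddl{}
        d.SetBinlogClient(&fakePumpClient{addr: "127.0.0.1:8250"})
        fmt.Printf("injected client: %+v\n", d.(*ddl).binlogCli)
    }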
