Skip to content

Commit

Permalink
ddl: update the behavior when 'RunWorker's' is false (#5598) (#5604)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored and winoros committed Jan 10, 2018
1 parent d94c29f commit 75a99b9
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 7 deletions.
13 changes: 9 additions & 4 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,15 @@ func (d *ddl) Stop() error {

func (d *ddl) start(ctx goctx.Context) {
d.quitCh = make(chan struct{})
err := d.ownerManager.CampaignOwner(ctx)
terror.Log(errors.Trace(err))
d.wait.Add(1)
go d.onDDLWorker()

// If RunWorker is true, we need campaign owner and do DDL job.
// Otherwise, we needn't do that.
if RunWorker {
err := d.ownerManager.CampaignOwner(ctx)
terror.Log(errors.Trace(err))
d.wait.Add(1)
go d.onDDLWorker()
}

// For every start, we will send a fake job to let worker
// check owner firstly and try to find whether a job exists and run.
Expand Down
3 changes: 0 additions & 3 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ var RunWorker = true
// then wait or pull the job queue to handle a schema change job.
func (d *ddl) onDDLWorker() {
defer d.wait.Done()
if !RunWorker {
return
}

// We use 4 * lease time to check owner's timeout, so here, we will update owner's status
// every 2 * lease time. If lease is 0, we will use default 1s.
Expand Down
48 changes: 48 additions & 0 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,54 @@ func (s *testDDLSuite) TestCheckOwner(c *C) {
c.Assert(d1.GetLease(), Equals, 2*time.Second)
}

// TestRunWorker tests no job is handled when the value of RunWorker is false.
func (s *testDDLSuite) TestRunWorker(c *C) {
defer testleak.AfterTest(c)()
store := testCreateStore(c, "test_run_worker")
defer store.Close()

RunWorker = false
d := testNewDDL(goctx.Background(), nil, store, nil, nil, testLease)
testCheckOwner(c, d, false)
defer d.Stop()
ctx := testNewContext(d)

dbInfo := testSchemaInfo(c, d, "test")
job := &model.Job{
SchemaID: dbInfo.ID,
Type: model.ActionCreateSchema,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{dbInfo},
}

exitCh := make(chan struct{})
go func(ch chan struct{}) {
err := d.doDDLJob(ctx, job)
c.Assert(err, IsNil)
close(ch)
}(exitCh)
// Make sure the DDL job is in the DDL job queue.
// The reason for doing it twice is to eliminate the operation in the start function.
<-d.ddlJobCh
<-d.ddlJobCh
// Make sure the DDL job doesn't be handled.
kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
job, err := d.getFirstDDLJob(t)
c.Assert(err, IsNil)
c.Assert(job, NotNil)
c.Assert(job.SchemaID, Equals, dbInfo.ID, Commentf("job %s", job))
return nil
})
// Make sure the DDL job can be done and exit that goroutine.
RunWorker = true
d1 := testNewDDL(goctx.Background(), nil, store, nil, nil, testLease)
testCheckOwner(c, d1, true)
defer d1.Stop()
asyncNotify(d1.ddlJobCh)
<-exitCh
}

func (s *testDDLSuite) TestSchemaError(c *C) {
defer testleak.AfterTest(c)()
store := testCreateStore(c, "test_schema_error")
Expand Down

0 comments on commit 75a99b9

Please sign in to comment.