Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Add the mock owner-manager and schema-syncer #3359

Merged
merged 17 commits into from Jun 4, 2017
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 16 additions & 16 deletions ddl/ddl.go
Expand Up @@ -128,8 +128,8 @@ type DDL interface {
Stop() error
// RegisterEventCh registers event channel for ddl.
RegisterEventCh(chan<- *Event)
// SchemaVersionSyncer gets the schema version syncer.
SchemaVersionSyncer() *schemaVersionSyncer
// SchemaSyncer gets the schema syncer.
SchemaSyncer() SchemaSyncer
}

// Event is an event that a ddl operation happened.
Expand Down Expand Up @@ -163,8 +163,8 @@ type ddl struct {
hook Callback
hookMu sync.RWMutex
store kv.Storage
// worker is used for electing the owner.
worker *worker
// worker is used for DDL access the etcd.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/access/to access

worker EtcdWorker
// lease is schema seconds.
lease time.Duration
uuid string
Expand Down Expand Up @@ -228,15 +228,14 @@ func newDDL(ctx goctx.Context, etcdCli *clientv3.Client, store kv.Storage,

id := uuid.NewV4().String()
ctx, cancelFunc := goctx.WithCancel(ctx)
worker := &worker{
schemaVersionSyncer: &schemaVersionSyncer{
etcdCli: etcdCli,
selfSchemaVerPath: fmt.Sprintf("%s/%s", ddlAllSchemaVersions, id),
},
ddlID: id,
cancel: cancelFunc,
var worker EtcdWorker
// If etcdCli is nil, it's the local store, so use the mockEtcdWorker.
// It's always used for testing.
if etcdCli == nil {
worker = NewMockEtcdWorker(etcdCli, id, cancelFunc)
} else {
worker = NewEtcdWorker(etcdCli, id, cancelFunc)
}

d := &ddl{
infoHandle: infoHandle,
hook: hook,
Expand Down Expand Up @@ -301,7 +300,7 @@ func (d *ddl) Stop() error {
func (d *ddl) start(ctx goctx.Context) {
d.quitCh = make(chan struct{})
if ChangeOwnerInNewWay {
d.campaignOwners(ctx)
d.worker.CampaignOwners(ctx, &d.wait)
}

d.wait.Add(2)
Expand All @@ -320,7 +319,7 @@ func (d *ddl) close() {
}

close(d.quitCh)
d.worker.cancel()
d.worker.Cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer NewEtcdWorker with d.quitCh, so when close(d.quitCh), the etcd worker will get notification too.


d.wait.Wait()
log.Infof("close DDL:%s", d.uuid)
Expand Down Expand Up @@ -379,8 +378,9 @@ func (d *ddl) genGlobalID() (int64, error) {
return globalID, errors.Trace(err)
}

func (d *ddl) SchemaVersionSyncer() *schemaVersionSyncer {
return d.worker.schemaVersionSyncer
// SchemaSyncer implements DDL.SchemaSyncer interface.
func (d *ddl) SchemaSyncer() SchemaSyncer {
return d.worker
}

func (d *ddl) doDDLJob(ctx context.Context, job *model.Job) error {
Expand Down
8 changes: 4 additions & 4 deletions ddl/ddl_worker.go
Expand Up @@ -103,13 +103,13 @@ func (d *ddl) getCheckOwnerTimeout(flag JobType) int64 {

func (d *ddl) isOwner(flag JobType) error {
if flag == ddlJobFlag {
if d.worker.isOwner() {
if d.worker.IsOwner() {
return nil
}
log.Infof("[ddl] not %s job owner, self id %s", flag, d.uuid)
return errNotOwner
}
if d.worker.isBgOwner() {
if d.worker.IsBgOwner() {
return nil
}
log.Infof("[ddl] not %s job owner, self id %s", flag, d.uuid)
Expand Down Expand Up @@ -455,12 +455,12 @@ func (d *ddl) waitSchemaChanged(waitTime time.Duration, latestSchemaVersion int6
// TODO: Make ctx exits when the d is close.
ctx, cancelFunc := goctx.WithTimeout(goctx.Background(), waitTime)
defer cancelFunc()
err := d.worker.updateGlobalVersion(ctx, latestSchemaVersion)
err := d.worker.UpdateGlobalVersion(ctx, latestSchemaVersion)
if err != nil {
log.Infof("[ddl] update latest schema version %d failed %v", latestSchemaVersion, err)
}

err = d.worker.checkAllVersions(ctx, latestSchemaVersion)
err = d.worker.CheckAllVersions(ctx, latestSchemaVersion)
if err != nil {
log.Infof("[ddl] wait latest schema version %d to deadline %v", latestSchemaVersion, err)
}
Expand Down
24 changes: 3 additions & 21 deletions ddl/ddl_worker_test.go
Expand Up @@ -61,27 +61,9 @@ func (s *testDDLSuite) TestCheckOwner(c *C) {
testCheckOwner(c, d1, true, ddlJobFlag)
testCheckOwner(c, d1, true, bgJobFlag)

d2 := newDDL(goctx.Background(), nil, store, nil, nil, testLease)
defer d2.Stop()

// Change the DDL owner.
d1.Stop()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove those code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the mock etcd worker is the local store, this worker must be the owner.

// Make sure owner is changed.
time.Sleep(6 * testLease)
testCheckOwner(c, d2, true, ddlJobFlag)
testCheckOwner(c, d2, true, bgJobFlag)

// Change the DDL owner.
d2.SetLease(goctx.Background(), 1*time.Second)
err := d2.Stop()
c.Assert(err, IsNil)
d1.start(goctx.Background())
testCheckOwner(c, d1, true, ddlJobFlag)
testCheckOwner(c, d1, true, bgJobFlag)

d2.SetLease(goctx.Background(), 1*time.Second)
d2.SetLease(goctx.Background(), 2*time.Second)
c.Assert(d2.GetLease(), Equals, 2*time.Second)
d1.SetLease(goctx.Background(), 1*time.Second)
d1.SetLease(goctx.Background(), 2*time.Second)
c.Assert(d1.GetLease(), Equals, 2*time.Second)
}

func (s *testDDLSuite) TestSchemaError(c *C) {
Expand Down
111 changes: 79 additions & 32 deletions ddl/owner.go → ddl/etcd_worker.go
Expand Up @@ -14,18 +14,43 @@
package ddl

import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/juju/errors"
"github.com/ngaut/log"
goctx "golang.org/x/net/context"
)

// ChangeOwnerInNewWay is used for testing.
var ChangeOwnerInNewWay = false
// EtcdWorker is an interface for DDL access the etcd.
type EtcdWorker interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about rename it to DDLSyncer.
DDL workers use DDLSyncer to synchronize ddl information. Using etcd to synchronize ddl information is an implementation of DDLSyncer. So put etcd in the name of the interface is not proper.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also used to elect ddl leader, so Syncer is not accuracy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it's also used to campaign the owners. So DDLSyncer may not be good.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it is an interface, it doesn't have to be etcd, etcd it's the implementation.

I think separate it into two interfaces is better.
One is SchemaSyncer, the other one is OwnerManager.

// ID returns the ID of DDL.
ID() string
// IsOwner returns whether the worker is the DDL owner.
IsOwner() bool
// SetOwner sets whether the worker is the DDL owner.
SetOwner(isOwner bool)
// IsOwner returns whether the worker is the background owner.
IsBgOwner() bool
// SetOwner sets whether the worker is the background owner.
SetBgOwner(isOwner bool)
// CampaignOwners campaigns the DDL owner and the background owner.
CampaignOwners(ctx goctx.Context, wg *sync.WaitGroup) error

SchemaSyncer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to the beginning of the interface.


// Cancel cancels this etcd worker campaign.
Cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need this, really...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cancel and close is not the same thing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, so I cancel the campaign operation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, can this function be called multiple times?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This question we discussed.

}

// ChangeOwnerInNewWay is used for controlling the way of changing owner.
// TODO: Remove it.
var ChangeOwnerInNewWay = true

const (
ddlOwnerKey = "/tidb/ddl/owner"
Expand All @@ -38,28 +63,54 @@ type worker struct {
*schemaVersionSyncer
ddlOwner int32
bgOwner int32
ddlID string
ddlID string // id is the ID of DDL.
etcdSession *concurrency.Session
cancel goctx.CancelFunc
}

func (w *worker) isOwner() bool {
// NewEtcdWorker creates a new EtcdWorker.
func NewEtcdWorker(etcdCli *clientv3.Client, id string, cancel goctx.CancelFunc) EtcdWorker {
return &worker{
schemaVersionSyncer: &schemaVersionSyncer{
etcdCli: etcdCli,
selfSchemaVerPath: fmt.Sprintf("%s/%s", ddlAllSchemaVersions, id),
},
ddlID: id,
cancel: cancel,
}
}

// ID implements EtcdWorker.ID interface.
func (w *worker) ID() string {
return w.ddlID
}

// IsOwner implements EtcdWorker.IsOwner interface.
func (w *worker) IsOwner() bool {
return atomic.LoadInt32(&w.ddlOwner) == 1
}

func (w *worker) setOwner(isOwner bool) {
// SetOwner implements EtcdWorker.SetOwner interface.
func (w *worker) SetOwner(isOwner bool) {
if isOwner {
atomic.StoreInt32(&w.ddlOwner, 1)
} else {
atomic.StoreInt32(&w.ddlOwner, 0)
}
}

func (w *worker) isBgOwner() bool {
// Cancel implements EtcdWorker.Cancel interface.
func (w *worker) Cancel() {
w.cancel()
}

// IsBgOwner implements EtcdWorker.IsBgOwner interface.
func (w *worker) IsBgOwner() bool {
return atomic.LoadInt32(&w.bgOwner) == 1
}

func (w *worker) setBgOwner(isOwner bool) {
// SetBgOwner implements EtcdWorker.SetBgOwner interface.
func (w *worker) SetBgOwner(isOwner bool) {
if isOwner {
atomic.StoreInt32(&w.bgOwner, 1)
} else {
Expand All @@ -81,38 +132,38 @@ func (w *worker) newSession(ctx goctx.Context, retryCnt int) error {
return errors.Trace(err)
}

func (d *ddl) campaignOwners(ctx goctx.Context) error {
err := d.worker.newSession(ctx, newSessionDefaultRetryCnt)
// CampaignOwners implements EtcdWorker.CampaignOwners interface.
func (w *worker) CampaignOwners(ctx goctx.Context, wg *sync.WaitGroup) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why wg not a member of worker, but pass as argument instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It uses the DDL's wg.

err := w.newSession(ctx, newSessionDefaultRetryCnt)
if err != nil {
return errors.Trace(err)
}

d.wait.Add(2)
wg.Add(2)
ddlCtx, _ := goctx.WithCancel(ctx)
go d.campaignLoop(ddlCtx, ddlOwnerKey)
go w.campaignLoop(ddlCtx, ddlOwnerKey, wg)

bgCtx, _ := goctx.WithCancel(ctx)
go d.campaignLoop(bgCtx, bgOwnerKey)
go w.campaignLoop(bgCtx, bgOwnerKey, wg)
return nil
}

func (d *ddl) campaignLoop(ctx goctx.Context, key string) {
defer d.wait.Done()
worker := d.worker
func (w *worker) campaignLoop(ctx goctx.Context, key string, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-worker.etcdSession.Done():
case <-w.etcdSession.Done():
// TODO: Create session again?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we fix this TODO before rc3

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shenli
I added it in this PR.

log.Warnf("etcd session is done.")
log.Warnf("[ddl] etcd session is done.")
case <-ctx.Done():
return
default:
}

elec := concurrency.NewElection(worker.etcdSession, key)
err := elec.Campaign(ctx, worker.ddlID)
elec := concurrency.NewElection(w.etcdSession, key)
err := elec.Campaign(ctx, w.ddlID)
if err != nil {
log.Infof("[ddl] worker %s failed to campaign, err %v", worker.ddlID, err)
log.Infof("[ddl] worker %s failed to campaign, err %v", w.ddlID, err)
continue
}

Expand All @@ -124,28 +175,24 @@ func (d *ddl) campaignLoop(ctx goctx.Context, key string) {
continue
}
leader := string(resp.Kvs[0].Value)
log.Infof("[ddl] %s worker is %s, owner is %v", key, worker.ddlID, leader)
if leader == worker.ddlID {
worker.setOwnerVal(key, true)
log.Info("[ddl] %s worker is %s, owner is %v", key, w.ddlID, leader)
if leader == w.ddlID {
w.setOwnerVal(key, true)
} else {
log.Warnf("[ddl] worker %s isn't the owner", worker.ddlID)
log.Warnf("[ddl] worker %s isn't the owner", w.ddlID)
continue
}

// TODO: Use content instead of quitCh.
worker.watchOwner(ctx, string(resp.Kvs[0].Key))
worker.setOwnerVal(key, false)
d.hookMu.Lock()
d.hook.OnWatched(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove this hook?

d.hookMu.Unlock()
w.watchOwner(ctx, string(resp.Kvs[0].Key))
w.setOwnerVal(key, false)
}
}

func (w *worker) setOwnerVal(key string, val bool) {
if key == ddlOwnerKey {
w.setOwner(val)
w.SetOwner(val)
} else {
w.setBgOwner(val)
w.SetBgOwner(val)
}
}

Expand Down
6 changes: 3 additions & 3 deletions ddl/owner_test.go → ddl/etcd_worker_test.go
Expand Up @@ -13,9 +13,9 @@

package ddl

/*
// TODO: Remove the package of integration.
// The tests are passed here. But import `integration` will introduce a lot of dependencies.
/*
import (
"testing"
"time"
Expand All @@ -34,11 +34,11 @@ func checkOwners(d *ddl, val bool) (isOwner bool, isBgOwner bool) {
// make sure that campaigning owners is completed.
for i := 0; i < 600; i++ {
time.Sleep(5 * time.Millisecond)
isOwner = d.worker.isOwner()
isOwner = d.worker.IsOwner()
if isOwner != val {
continue
}
isBgOwner = d.worker.isBgOwner()
isBgOwner = d.worker.IsBgOwner()
if isBgOwner != val {
continue
}
Expand Down