Skip to content
Permalink
Browse files

Allow to synchronise GTIDs 'OnPosSynced' (#378)

  • Loading branch information...
bejelith authored and siddontang committed May 13, 2019
1 parent 621cc12 commit 8804d83ea8328534e3c47c0f1bf5a34d8a455a60
Showing with 20 additions and 14 deletions.
  1. +3 −1 .travis.yml
  2. +1 −2 canal/canal.go
  3. +7 −3 canal/canal_test.go
  4. +1 −1 canal/dump.go
  5. +7 −6 canal/handler.go
  6. +1 −1 canal/sync.go
@@ -19,8 +19,10 @@ before_install:
- "sudo service mysql stop || true"
- "echo '[mysqld]' | sudo tee /etc/mysql/conf.d/replication.cnf"
- "echo 'server-id=1' | sudo tee -a /etc/mysql/conf.d/replication.cnf"
- "echo 'log-bin=mysql' | sudo tee -a /etc/mysql/conf.d/replication.cnf"
- "echo 'log-bin=mysql' | sudo tee -a /etc/mysql/conf.d/replication.cnf"
- "echo 'binlog-format = row' | sudo tee -a /etc/mysql/conf.d/replication.cnf"
- "echo 'gtid-mode = ON' | sudo tee -a /etc/mysql/conf.d/replication.cnf"
- "echo 'enforce_gtid_consistency = ON' | sudo tee -a /etc/mysql/conf.d/replication.cnf"

# Start mysql (avoid errors to have logs)
- "sudo service mysql start || true"
@@ -234,7 +234,6 @@ func (c *Canal) run() error {

func (c *Canal) Close() {
log.Infof("closing canal")

c.m.Lock()
defer c.m.Unlock()

@@ -245,7 +244,7 @@ func (c *Canal) Close() {
c.connLock.Unlock()
c.syncer.Close()

c.eventHandler.OnPosSynced(c.master.Position(), true)
c.eventHandler.OnPosSynced(c.master.Position(), c.master.GTIDSet(), true)
}

func (c *Canal) WaitDumpDone() <-chan struct{} {
@@ -62,10 +62,11 @@ func (s *canalTestSuite) SetUpSuite(c *C) {
s.execute(c, "INSERT INTO test.canal_test (content, name) VALUES (?, ?), (?, ?), (?, ?)", "1", "a", `\0\ndsfasdf`, "b", "", "c")

s.execute(c, "SET GLOBAL binlog_format = 'ROW'")

s.c.SetEventHandler(&testEventHandler{c: c})
go func() {
err = s.c.Run()
set, _ := mysql.ParseGTIDSet("mysql", "")
err = s.c.StartFromGTID(set)
c.Assert(err, IsNil)
}()
}
@@ -89,7 +90,6 @@ func (s *canalTestSuite) execute(c *C, query string, args ...interface{}) *mysql

type testEventHandler struct {
DummyEventHandler

c *C
}

@@ -102,6 +102,10 @@ func (h *testEventHandler) String() string {
return "testEventHandler"
}

func (h *testEventHandler) OnPosSynced(p mysql.Position, set mysql.GTIDSet, f bool) error {
return nil
}

func (s *canalTestSuite) TestCanal(c *C) {
<-s.c.WaitDumpDone()

@@ -158,7 +158,7 @@ func (c *Canal) dump() error {

pos := mysql.Position{Name: h.name, Pos: uint32(h.pos)}
c.master.Update(pos)
if err := c.eventHandler.OnPosSynced(pos, true); err != nil {
if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), true); err != nil {
return errors.Trace(err)
}
var startPos fmt.Stringer = pos
@@ -16,7 +16,7 @@ type EventHandler interface {
OnXID(nextPos mysql.Position) error
OnGTID(gtid mysql.GTIDSet) error
// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
OnPosSynced(pos mysql.Position, force bool) error
OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error
String() string
}

@@ -28,11 +28,12 @@ func (h *DummyEventHandler) OnTableChanged(schema string, table string) error {
func (h *DummyEventHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error {
return nil
}
func (h *DummyEventHandler) OnRow(*RowsEvent) error { return nil }
func (h *DummyEventHandler) OnXID(mysql.Position) error { return nil }
func (h *DummyEventHandler) OnGTID(mysql.GTIDSet) error { return nil }
func (h *DummyEventHandler) OnPosSynced(mysql.Position, bool) error { return nil }
func (h *DummyEventHandler) String() string { return "DummyEventHandler" }
func (h *DummyEventHandler) OnRow(*RowsEvent) error { return nil }
func (h *DummyEventHandler) OnXID(mysql.Position) error { return nil }
func (h *DummyEventHandler) OnGTID(mysql.GTIDSet) error { return nil }
func (h *DummyEventHandler) OnPosSynced(mysql.Position, mysql.GTIDSet, bool) error { return nil }

func (h *DummyEventHandler) String() string { return "DummyEventHandler" }

// `SetEventHandler` registers the sync handler, you must register your
// own handler before starting Canal.
@@ -171,7 +171,7 @@ func (c *Canal) runSyncBinlog() error {
if savePos {
c.master.Update(pos)
c.master.UpdateTimestamp(ev.Header.Timestamp)
if err := c.eventHandler.OnPosSynced(pos, force); err != nil {
if err := c.eventHandler.OnPosSynced(pos, c.master.GTIDSet(), force); err != nil {
return errors.Trace(err)
}
}

0 comments on commit 8804d83

Please sign in to comment.
You can’t perform that action at this time.