Skip to content

Commit

Permalink
pump/: Fix block at wait for MaxCommitTS when offline (#701)
Browse files Browse the repository at this point in the history
* pump/: Fix block at wait for MaxCommitTS when offline

when offline:
1, ApplyAction change statue to be closing, reject write but accept fake
binlog
2, commitStatus() -> waitSafeToOffline(), we will write a fake binlog
and wait when storage.MaxCommitTS() >= commit ts of the fake binlog
...

after 1, may having some unwritten c-binlog(we will reject the write
request from tidb), and in sorter, block at s.cond.Wait case because no
more any item push.
  • Loading branch information
july2993 committed Aug 7, 2019
1 parent be03106 commit 499b6c1
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 11 deletions.
26 changes: 15 additions & 11 deletions pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (s *Server) writeBinlog(ctx context.Context, in *binlog.WriteBinlogReq, isF
goto errHandle
}

if !isFakeBinlog {
if !isFakeBinlog && blog.Tp == binlog.BinlogType_Prewrite {
state := s.node.NodeStatus().State
if state != node.Online {
err = errors.Errorf("no online: %v", state)
Expand Down Expand Up @@ -758,20 +758,23 @@ func (s *Server) waitSafeToOffline(ctx context.Context) error {
maxCommitTS := s.storage.MaxCommitTS()
if maxCommitTS < fakeBinlog.CommitTs {
log.Info("max commit TS in storage: %d, fake binlog commit ts: %d", maxCommitTS, fakeBinlog.CommitTs)
select {
case <-time.After(time.Second):
continue
case <-ctx.Done():
return errors.Trace(ctx.Err())
}
} else if !s.storage.AllMatched() {
log.Info("wait all P-binlog to be matched")
} else {
break
}
select {
case <-time.After(time.Second):
continue
case <-ctx.Done():
return errors.Trace(ctx.Err())
}

break
}

log.Debug("start to check offline safe for drainers")

// check drainer has consume fake binlog we just write
maxCommitTS := s.storage.MaxCommitTS()
for {
select {
case <-time.After(time.Second):
Expand All @@ -787,12 +790,13 @@ func (s *Server) waitSafeToOffline(ctx context.Context) error {
continue
}

if drainer.MaxCommitTS < fakeBinlog.CommitTs {
log.Infof("wait for drainer: %v maxCommitTS: %d, pump maxCommitTS: %d", drainer.NodeID, drainer.MaxCommitTS, fakeBinlog.CommitTs)
if drainer.MaxCommitTS < maxCommitTS {
log.Infof("wait for drainer: %v maxCommitTS: %d, pump maxCommitTS: %d", drainer.NodeID, drainer.MaxCommitTS, maxCommitTS)
needByDrainer = true
break
}
}

if !needByDrainer {
return nil
}
Expand Down
20 changes: 20 additions & 0 deletions pump/storage/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ func (s *sorter) setResolver(resolver func(startTS int64) bool) {
s.resolver = resolver
}

func (s *sorter) allMatched() bool {
s.lock.Lock()
defer s.lock.Unlock()

return len(s.waitStartTS) == 0
}

func (s *sorter) pushTSItem(item sortItem) {
if s.isClosed() {
// i think we can just panic
Expand All @@ -136,6 +143,18 @@ func (s *sorter) pushTSItem(item sortItem) {
func (s *sorter) run() {
defer s.wg.Done()

go func() {
// Avoid if no any more pushTSItem call so block at s.cond.Wait() in run() waiting the matching c-binlog
tick := time.NewTicker(1 * time.Second)
defer tick.Stop()
for range tick.C {
s.cond.Signal()
if s.isClosed() {
return
}
}
}()

var maxTSItem sortItem
for {
s.cond.L.Lock()
Expand Down Expand Up @@ -163,6 +182,7 @@ func (s *sorter) run() {
// we may get the C binlog soon at start up time
if time.Since(getTime) > time.Second {
if s.resolver != nil && s.resolver(item.start) {
delete(s.waitStartTS, item.start)
break
}
}
Expand Down
3 changes: 3 additions & 0 deletions pump/storage/sorter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func testSorter(c *check.C, items []sortItem, expectMaxCommitTS []int64) {
// we should never push item with commit ts less than lastGetSortItemTS, or something go wrong
if item.tp == pb.BinlogType_Commit {
c.Assert(item.commit, check.Greater, atomic.LoadInt64(&lastGetSortItemTS))
} else if item.tp == pb.BinlogType_Prewrite {
c.Assert(sorter.allMatched(), check.IsFalse)
}

if item.commit > maxTS {
Expand All @@ -51,6 +53,7 @@ func testSorter(c *check.C, items []sortItem, expectMaxCommitTS []int64) {
}

c.Assert(maxTS, check.Equals, maxCommitTS[len(maxCommitTS)-1])
c.Assert(sorter.allMatched(), check.IsTrue)
}

func (s *SorterSuite) TestSorter(c *check.C) {
Expand Down
8 changes: 8 additions & 0 deletions pump/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type Storage interface {

GetGCTS() int64

// AllMatched return if all the P-binlog have the matching C-binlog
AllMatched() bool

MaxCommitTS() int64

// GetBinlog return the binlog of ts
Expand Down Expand Up @@ -1215,3 +1218,8 @@ func openMetadataDB(kvDir string, cf *KVConfig) (*leveldb.DB, error) {

return leveldb.OpenFile(kvDir, &opt)
}

// AllMatched implement Storage.AllMatched
func (a *Append) AllMatched() bool {
return a.sorter.allMatched()
}

0 comments on commit 499b6c1

Please sign in to comment.