Skip to content

Commit

Permalink
Fix unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
suzaku committed Jul 5, 2019
1 parent 2edd00a commit 6d4221d
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pump/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ var (
// save valuePointer headPointer, for binlog in vlog not after headPointer, we have save it in metadata db
// at start up, we can scan the vlog from headPointer and save the ts -> valuePointer to metadata db
headPointerKey = []byte("!binlog!headPointer")
// If the kv channel blocks for more than this value, turn on the slow chaser
slowChaserThreshold = 3 * time.Second
)

// Storage is the interface to handle binlog storage
Expand Down Expand Up @@ -910,7 +912,7 @@ func (a *Append) writeToValueLog(reqs chan *request) chan *request {
for _, req := range batch {
select {
case done <- req:
case <-time.After(3 * time.Second):
case <-time.After(slowChaserThreshold):
slowChaser.TurnOn(&req.valuePointer)
break SEND
}
Expand Down
10 changes: 10 additions & 0 deletions pump/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ func (as *AppendSuit) TestNewAppend(c *check.C) {
}

func (as *AppendSuit) TestBlockedWriteKVShouldNotStopWritingVlogs(c *check.C) {
origThres := slowChaserThreshold
defer func() {
slowChaserThreshold = origThres
}()
slowChaserThreshold = 10 * time.Millisecond
// Set KVChanCapacity to be extremely small so that we can feed it up
store := newAppendWithOptions(c, DefaultOptions().WithKVChanCapacity(10))
incoming := make(chan *request, 100)
Expand Down Expand Up @@ -120,6 +125,11 @@ func (as *AppendSuit) TestBlockedWriteKVShouldNotStopWritingVlogs(c *check.C) {
}

func (as *AppendSuit) TestVlogsShouldBeInSyncWhenDownStreamRecovers(c *check.C) {
origThres := slowChaserThreshold
defer func() {
slowChaserThreshold = origThres
}()
slowChaserThreshold = 10 * time.Millisecond
opts := DefaultOptions().WithKVChanCapacity(10).WithValueLogFileSize(9000)
store := newAppendWithOptions(c, opts)
incoming := make(chan *request, 100)
Expand Down

0 comments on commit 6d4221d

Please sign in to comment.