Skip to content

Commit

Permalink
Introduce a valLogScanner interface to ease testing
Browse files Browse the repository at this point in the history
  • Loading branch information
suzaku committed Jul 8, 2019
1 parent 6d4221d commit 819991a
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 7 deletions.
8 changes: 6 additions & 2 deletions pump/storage/chaser.go
Expand Up @@ -29,17 +29,21 @@ var (
recoveryCoolDown = time.Minute
)

type valLogScanner interface {
scanRequests(valuePointer, func(*request) error) error
}

type slowChaser struct {
on int32
vlog *valueLog
vlog valLogScanner
lastUnreadPtr *valuePointer
recoveryTimeout time.Duration
lastRecoverAttempt time.Time
output chan *request
WriteLock sync.Mutex
}

func newSlowChaser(vlog *valueLog, recoveryTimeout time.Duration, output chan *request) *slowChaser {
func newSlowChaser(vlog valLogScanner, recoveryTimeout time.Duration, output chan *request) *slowChaser {
return &slowChaser{
recoveryTimeout: recoveryTimeout,
vlog: vlog,
Expand Down
181 changes: 176 additions & 5 deletions pump/storage/chaser_test.go
Expand Up @@ -18,8 +18,17 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
)

type scannerMock struct {
scanImpl func(valuePointer, func(*request) error) error
}

func (sm scannerMock) scanRequests(start valuePointer, f func(*request) error) error {
return sm.scanImpl(start, f)
}

type waitUntilTurnedOnSuite struct{}

var _ = Suite(&waitUntilTurnedOnSuite{})
Expand Down Expand Up @@ -68,18 +77,180 @@ type runSuite struct{}

var _ = Suite(&runSuite{})

func (rs *runSuite) TestCanBeStoppedWhenWaiting(c *C) {
sc := slowChaser{}
ctx, cancel := context.WithCancel(context.Background())
func (rs *runSuite) managedRun(ctx context.Context, c *C, sc *slowChaser) {
stopped := make(chan struct{})
go func() {
sc.Run(ctx)
close(stopped)
}()
cancel()
select {
case <-stopped:
case <-time.After(500 * time.Millisecond):
c.Fatal("Takes too long to stop slow chaser")
c.Fatal("Run doesn't return in time after canceled.")
}
}

func (rs *runSuite) TestCanBeStoppedWhenWaiting(c *C) {
sc := slowChaser{}
ctx, cancel := context.WithCancel(context.Background())
go cancel()
rs.managedRun(ctx, c, &sc)
}

func (rs *runSuite) TestShouldRetryIfFailedToCatchUp(c *C) {
mock := struct{ scannerMock }{}
var offsetRecords []valuePointer
finished := make(chan struct{})
mock.scanImpl = func(start valuePointer, f func(*request) error) error {
offsetRecords = append(offsetRecords, start)
for i := 0; i < 3; i++ {
next := request{
valuePointer: valuePointer{
Fid: start.Fid,
Offset: start.Offset + int64(i*10),
},
}
f(&next)
}
if len(offsetRecords) == 3 {
close(finished)
}
return errors.New("fake")
}
sc := newSlowChaser(mock, time.Second, make(chan *request, 100))
sc.TurnOn(&valuePointer{Fid: 42, Offset: 1000})

ctx, cancel := context.WithCancel(context.Background())
go func() {
<-finished
cancel()

c.Assert(len(offsetRecords), GreaterEqual, 3)
c.Assert(offsetRecords[0], DeepEquals, valuePointer{Fid: 42, Offset: 1000})
c.Assert(offsetRecords[1], DeepEquals, valuePointer{Fid: 42, Offset: 1020})
c.Assert(offsetRecords[2], DeepEquals, valuePointer{Fid: 42, Offset: 1040})
}()

rs.managedRun(ctx, c, sc)
}

func (rs *runSuite) TestShouldNotTryRecoveryForSlowCatchUp(c *C) {
mock := struct{ scannerMock }{}
callCount := 0
finished := make(chan struct{})
mock.scanImpl = func(start valuePointer, f func(*request) error) error {
callCount++
if callCount == 3 {
close(finished)
}
time.Sleep(time.Millisecond)
return nil
}

sc := newSlowChaser(mock, time.Second, make(chan *request, 100))
sc.TurnOn(&valuePointer{Fid: 42, Offset: 1000})

// Make sure lastRecoverAttempt is not set
c.Assert(sc.lastRecoverAttempt, Equals, time.Time{})

origThres := slowCatchUpThreshold
defer func() {
slowCatchUpThreshold = origThres
}()
// Set threshold to be 0.5ms which is smaller than 1ms
slowCatchUpThreshold = 500 * time.Microsecond

ctx, cancel := context.WithCancel(context.Background())
go func() {
<-finished
cancel()
c.Assert(sc.lastRecoverAttempt, Equals, time.Time{})
c.Assert(sc.IsOn(), IsTrue)
}()

rs.managedRun(ctx, c, sc)
}

func (rs *runSuite) TestShouldNotRecoverFrequently(c *C) {
mock := struct{ scannerMock }{}
callCount := 0
finished := make(chan struct{})
mock.scanImpl = func(start valuePointer, f func(*request) error) error {
callCount++
if callCount == 3 {
close(finished)
}
return nil
}

sc := newSlowChaser(mock, time.Second, make(chan *request, 100))
sc.TurnOn(&valuePointer{Fid: 42, Offset: 1000})

// Set it up like we have just tried recovering and failed
sc.lastRecoverAttempt = time.Now()
lastRecoverAttempt := sc.lastRecoverAttempt

ctx, cancel := context.WithCancel(context.Background())
go func() {
<-finished
cancel()
c.Assert(sc.lastRecoverAttempt, Equals, lastRecoverAttempt)
c.Assert(sc.IsOn(), IsTrue)
}()

rs.managedRun(ctx, c, sc)
}

func (rs *runSuite) TestShouldRecoverAndTurnOff(c *C) {
mock := struct{ scannerMock }{}
callCount := 0
finished := make(chan struct{})
mock.scanImpl = func(start valuePointer, f func(*request) error) error {
callCount++
// Should be called 2 times: catch up and recover
if callCount == 2 {
close(finished)
}
return nil
}

sc := newSlowChaser(mock, time.Second, make(chan *request, 100))
sc.TurnOn(&valuePointer{Fid: 42, Offset: 1000})

ctx, cancel := context.WithCancel(context.Background())
go func() {
<-finished
cancel()
}()
rs.managedRun(ctx, c, sc)
c.Assert(sc.IsOn(), IsFalse)
}

func (rs *runSuite) TestContinueCatchingUpIfRecoveryFailed(c *C) {
mock := struct{ scannerMock }{}
callCount := 0
finished := make(chan struct{})
mock.scanImpl = func(start valuePointer, f func(*request) error) error {
callCount++
// Arrange for at least 3 calls: catch up, failed-recovery, catch up
if callCount == 3 {
close(finished)
}
if callCount == 2 {
return errors.New("Failed recovery")
}
return nil
}

sc := newSlowChaser(mock, time.Second, make(chan *request, 100))
sc.TurnOn(&valuePointer{Fid: 42, Offset: 1000})

ctx, cancel := context.WithCancel(context.Background())
go func() {
<-finished
cancel()
}()

rs.managedRun(ctx, c, sc)
c.Assert(sc.IsOn(), IsTrue)
}

0 comments on commit 819991a

Please sign in to comment.