diff --git a/pump/storage/chaser.go b/pump/storage/chaser.go index d867081d7..7074ed05d 100644 --- a/pump/storage/chaser.go +++ b/pump/storage/chaser.go @@ -29,9 +29,13 @@ var ( recoveryCoolDown = time.Minute ) +type valLogScanner interface { + scanRequests(valuePointer, func(*request) error) error +} + type slowChaser struct { on int32 - vlog *valueLog + vlog valLogScanner lastUnreadPtr *valuePointer recoveryTimeout time.Duration lastRecoverAttempt time.Time @@ -39,7 +43,7 @@ type slowChaser struct { WriteLock sync.Mutex } -func newSlowChaser(vlog *valueLog, recoveryTimeout time.Duration, output chan *request) *slowChaser { +func newSlowChaser(vlog valLogScanner, recoveryTimeout time.Duration, output chan *request) *slowChaser { return &slowChaser{ recoveryTimeout: recoveryTimeout, vlog: vlog, diff --git a/pump/storage/chaser_test.go b/pump/storage/chaser_test.go index d1c2e9c1d..3e0333c4f 100644 --- a/pump/storage/chaser_test.go +++ b/pump/storage/chaser_test.go @@ -18,8 +18,17 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/errors" ) +type scannerMock struct { + scanImpl func(valuePointer, func(*request) error) error +} + +func (sm scannerMock) scanRequests(start valuePointer, f func(*request) error) error { + return sm.scanImpl(start, f) +} + type waitUntilTurnedOnSuite struct{} var _ = Suite(&waitUntilTurnedOnSuite{}) @@ -68,18 +77,180 @@ type runSuite struct{} var _ = Suite(&runSuite{}) -func (rs *runSuite) TestCanBeStoppedWhenWaiting(c *C) { - sc := slowChaser{} - ctx, cancel := context.WithCancel(context.Background()) +func (rs *runSuite) managedRun(ctx context.Context, c *C, sc *slowChaser) { stopped := make(chan struct{}) go func() { sc.Run(ctx) close(stopped) }() - cancel() select { case <-stopped: case <-time.After(500 * time.Millisecond): - c.Fatal("Takes too long to stop slow chaser") + c.Fatal("Run doesn't return in time after canceled.") + } +} + +func (rs *runSuite) TestCanBeStoppedWhenWaiting(c *C) { + sc := slowChaser{} + ctx, cancel := context.WithCancel(context.Background()) + go cancel() + rs.managedRun(ctx, c, &sc) +} + +func (rs *runSuite) TestShouldRetryIfFailedToCatchUp(c *C) { + mock := struct{ scannerMock }{} + var offsetRecords []valuePointer + finished := make(chan struct{}) + mock.scanImpl = func(start valuePointer, f func(*request) error) error { + offsetRecords = append(offsetRecords, start) + for i := 0; i < 3; i++ { + next := request{ + valuePointer: valuePointer{ + Fid: start.Fid, + Offset: start.Offset + int64(i*10), + }, + } + f(&next) + } + if len(offsetRecords) == 3 { + close(finished) + } + return errors.New("fake") } + sc := newSlowChaser(mock, time.Second, make(chan *request, 100)) + sc.TurnOn(&valuePointer{Fid: 42, Offset: 1000}) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-finished + cancel() + + c.Assert(len(offsetRecords), GreaterEqual, 3) + c.Assert(offsetRecords[0], DeepEquals, valuePointer{Fid: 42, Offset: 1000}) + c.Assert(offsetRecords[1], DeepEquals, valuePointer{Fid: 42, Offset: 1020}) + c.Assert(offsetRecords[2], DeepEquals, valuePointer{Fid: 42, Offset: 1040}) + }() + + rs.managedRun(ctx, c, sc) +} + +func (rs *runSuite) TestShouldNotTryRecoveryForSlowCatchUp(c *C) { + mock := struct{ scannerMock }{} + callCount := 0 + finished := make(chan struct{}) + mock.scanImpl = func(start valuePointer, f func(*request) error) error { + callCount++ + if callCount == 3 { + close(finished) + } + time.Sleep(time.Millisecond) + return nil + } + + sc := newSlowChaser(mock, time.Second, make(chan *request, 100)) + sc.TurnOn(&valuePointer{Fid: 42, Offset: 1000}) + + // Make sure lastRecoverAttempt is not set + c.Assert(sc.lastRecoverAttempt, Equals, time.Time{}) + + origThres := slowCatchUpThreshold + defer func() { + slowCatchUpThreshold = origThres + }() + // Set threshold to be 0.5ms which is smaller than 1ms + slowCatchUpThreshold = 500 * time.Microsecond + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-finished + cancel() + c.Assert(sc.lastRecoverAttempt, Equals, time.Time{}) + c.Assert(sc.IsOn(), IsTrue) + }() + + rs.managedRun(ctx, c, sc) +} + +func (rs *runSuite) TestShouldNotRecoverFrequently(c *C) { + mock := struct{ scannerMock }{} + callCount := 0 + finished := make(chan struct{}) + mock.scanImpl = func(start valuePointer, f func(*request) error) error { + callCount++ + if callCount == 3 { + close(finished) + } + return nil + } + + sc := newSlowChaser(mock, time.Second, make(chan *request, 100)) + sc.TurnOn(&valuePointer{Fid: 42, Offset: 1000}) + + // Set it up like we have just tried recovering and failed + sc.lastRecoverAttempt = time.Now() + lastRecoverAttempt := sc.lastRecoverAttempt + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-finished + cancel() + c.Assert(sc.lastRecoverAttempt, Equals, lastRecoverAttempt) + c.Assert(sc.IsOn(), IsTrue) + }() + + rs.managedRun(ctx, c, sc) +} + +func (rs *runSuite) TestShouldRecoverAndTurnOff(c *C) { + mock := struct{ scannerMock }{} + callCount := 0 + finished := make(chan struct{}) + mock.scanImpl = func(start valuePointer, f func(*request) error) error { + callCount++ + // Should be called 2 times: catch up and recover + if callCount == 2 { + close(finished) + } + return nil + } + + sc := newSlowChaser(mock, time.Second, make(chan *request, 100)) + sc.TurnOn(&valuePointer{Fid: 42, Offset: 1000}) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-finished + cancel() + }() + rs.managedRun(ctx, c, sc) + c.Assert(sc.IsOn(), IsFalse) +} + +func (rs *runSuite) TestContinueCatchingUpIfRecoveryFailed(c *C) { + mock := struct{ scannerMock }{} + callCount := 0 + finished := make(chan struct{}) + mock.scanImpl = func(start valuePointer, f func(*request) error) error { + callCount++ + // Arrange for at least 3 calls: catch up, failed-recovery, catch up + if callCount == 3 { + close(finished) + } + if callCount == 2 { + return errors.New("Failed recovery") + } + return nil + } + + sc := newSlowChaser(mock, time.Second, make(chan *request, 100)) + sc.TurnOn(&valuePointer{Fid: 42, Offset: 1000}) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + <-finished + cancel() + }() + + rs.managedRun(ctx, c, sc) + c.Assert(sc.IsOn(), IsTrue) }