From 814953b6011d0e1a8ead651d00b7e9a3741b908f Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Mon, 13 May 2024 19:46:43 -0700 Subject: [PATCH] Delete unused retry implementations (#5909) ## What changed? - `ConcurrentRetrier`: deleted, not used anywhere - `TwoPhaseRetryPolicy`: deleted, not used anywhere - `SystemClock`: replaced with `clock.TimeSource` ## Why? Less code is good. ## How did you test it? Compiler. ## Potential risks ## Documentation ## Is hotfix candidate? --- common/backoff/retry.go | 64 +++------------------------- common/backoff/retry_test.go | 50 +--------------------- common/backoff/retrypolicy.go | 46 ++++---------------- common/backoff/retrypolicy_test.go | 40 +++++++---------- service/history/queues/queue_base.go | 2 +- service/history/queues/reader.go | 2 +- service/matching/task_reader.go | 3 +- service/worker/pernamespaceworker.go | 3 +- 8 files changed, 36 insertions(+), 174 deletions(-) diff --git a/common/backoff/retry.go b/common/backoff/retry.go index 27e28bf4490..42d3b78c98f 100644 --- a/common/backoff/retry.go +++ b/common/backoff/retry.go @@ -26,10 +26,10 @@ package backoff import ( "context" - "sync" "time" "go.temporal.io/api/serviceerror" + "go.temporal.io/server/common/clock" ) const ( @@ -54,63 +54,8 @@ type ( // IsRetryable handler can be used to exclude certain errors during retry IsRetryable func(error) bool - - // ConcurrentRetrier is used for client-side throttling. It determines whether to - // throttle outgoing traffic in case downstream backend server rejects - // requests due to out-of-quota or server busy errors. - ConcurrentRetrier struct { - sync.Mutex - retrier Retrier // Backoff retrier - failureCount int64 // Number of consecutive failures seen - } ) -// Throttle Sleep if there were failures since the last success call. -func (c *ConcurrentRetrier) Throttle() { - c.throttleInternal() -} - -func (c *ConcurrentRetrier) throttleInternal() time.Duration { - next := done - - // Check if we have failure count. - failureCount := c.failureCount - if failureCount > 0 { - defer c.Unlock() - c.Lock() - if c.failureCount > 0 { - next = c.retrier.NextBackOff() - } - } - - if next != done { - time.Sleep(next) - } - - return next -} - -// Succeeded marks client request succeeded. -func (c *ConcurrentRetrier) Succeeded() { - defer c.Unlock() - c.Lock() - c.failureCount = 0 - c.retrier.Reset() -} - -// Failed marks client request failed because backend is busy. -func (c *ConcurrentRetrier) Failed() { - defer c.Unlock() - c.Lock() - c.failureCount++ -} - -// NewConcurrentRetrier returns an instance of concurrent backoff retrier. -func NewConcurrentRetrier(retryPolicy RetryPolicy) *ConcurrentRetrier { - retrier := NewRetrier(retryPolicy, SystemClock) - return &ConcurrentRetrier{retrier: retrier} -} - // ThrottleRetry is a resource aware version of Retry. // Resource exhausted error will be retried using a different throttle retry policy, instead of the specified one. func ThrottleRetry(operation Operation, policy RetryPolicy, isRetryable IsRetryable) error { @@ -137,8 +82,9 @@ func ThrottleRetryContext( deadline, hasDeadline := ctx.Deadline() - r := NewRetrier(policy, SystemClock) - t := NewRetrier(throttleRetryPolicy, SystemClock) + timeSrc := clock.NewRealTimeSource() + r := NewRetrier(policy, timeSrc) + t := NewRetrier(throttleRetryPolicy, timeSrc) for ctx.Err() == nil { if err = operation(ctx); err == nil { return nil @@ -156,7 +102,7 @@ func ThrottleRetryContext( next = max(next, t.NextBackOff()) } - if hasDeadline && SystemClock.Now().Add(next).After(deadline) { + if hasDeadline && timeSrc.Now().Add(next).After(deadline) { break } diff --git a/common/backoff/retry_test.go b/common/backoff/retry_test.go index 75ad9ed6986..9ea3b7f0cbe 100644 --- a/common/backoff/retry_test.go +++ b/common/backoff/retry_test.go @@ -26,7 +26,6 @@ package backoff import ( "context" - "fmt" "testing" "time" @@ -144,51 +143,6 @@ func (s *RetrySuite) TestIsRetryableFailure() { s.Equal(1, i) } -func (s *RetrySuite) TestConcurrentRetrier() { - policy := NewExponentialRetryPolicy(1 * time.Millisecond). - WithMaximumInterval(10 * time.Millisecond). - WithMaximumAttempts(4) - - // Basic checks - retrier := NewConcurrentRetrier(policy) - retrier.Failed() - s.Equal(int64(1), retrier.failureCount) - retrier.Succeeded() - s.Equal(int64(0), retrier.failureCount) - sleepDuration := retrier.throttleInternal() - s.Equal(done, sleepDuration) - - // Multiple count check. - retrier.Failed() - retrier.Failed() - s.Equal(int64(2), retrier.failureCount) - // Verify valid sleep times. - ch := make(chan time.Duration, 3) - go func() { - for i := 0; i < 3; i++ { - ch <- retrier.throttleInternal() - } - }() - for i := 0; i < 3; i++ { - val := <-ch - fmt.Printf("Duration: %d\n", val) - s.True(val > 0) - } - retrier.Succeeded() - s.Equal(int64(0), retrier.failureCount) - // Verify we don't have any sleep times. - go func() { - for i := 0; i < 3; i++ { - ch <- retrier.throttleInternal() - } - }() - for i := 0; i < 3; i++ { - val := <-ch - fmt.Printf("Duration: %d\n", val) - s.Equal(done, val) - } -} - func (s *RetrySuite) TestRetryContextCancel() { ctx, cancel := context.WithCancel(context.Background()) cancel() @@ -247,7 +201,7 @@ func (s *RetrySuite) TestThrottleRetryContext() { return &someError{} } - start := SystemClock.Now() + start := time.Now() err := ThrottleRetryContext(context.Background(), op, policy, retryEverything) s.Equal(&someError{}, err) s.GreaterOrEqual( @@ -257,7 +211,7 @@ func (s *RetrySuite) TestThrottleRetryContext() { ) // test if context timeout is respected - start = SystemClock.Now() + start = time.Now() ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) err = ThrottleRetryContext(ctx, func(_ context.Context) error { return &serviceerror.ResourceExhausted{} }, policy, retryEverything) s.Equal(&serviceerror.ResourceExhausted{}, err) diff --git a/common/backoff/retrypolicy.go b/common/backoff/retrypolicy.go index 277ac56c57a..01d76813a98 100644 --- a/common/backoff/retrypolicy.go +++ b/common/backoff/retrypolicy.go @@ -28,6 +28,8 @@ import ( "math" "math/rand" "time" + + "go.temporal.io/server/common/clock" ) const ( @@ -61,11 +63,6 @@ type ( Reset() } - // Clock used by ExponentialRetryPolicy implementation to get the current time. Mainly used for unit testing - Clock interface { - Now() time.Time - } - // ExponentialRetryPolicy provides the implementation for retry policy using a coefficient to compute the next delay. // Formula used to compute the next delay is: // min(initialInterval * pow(backoffCoefficient, currentAttempt), maximumInterval) @@ -77,29 +74,16 @@ type ( maximumAttempts int } - // TwoPhaseRetryPolicy implements a policy that first use one policy to get next delay, - // and once expired use the second policy for the following retry. - // It can achieve fast retries in first phase then slowly retires in second phase. - TwoPhaseRetryPolicy struct { - firstPolicy RetryPolicy - secondPolicy RetryPolicy - } - disabledRetryPolicyImpl struct{} - systemClock struct{} - retrierImpl struct { policy RetryPolicy - clock Clock + timeSource clock.TimeSource currentAttempt int startTime time.Time } ) -// SystemClock implements Clock interface that uses time.Now().UTC(). -var SystemClock = systemClock{} - // NewExponentialRetryPolicy returns an instance of ExponentialRetryPolicy using the provided initialInterval func NewExponentialRetryPolicy(initialInterval time.Duration) *ExponentialRetryPolicy { p := &ExponentialRetryPolicy{ @@ -114,11 +98,11 @@ func NewExponentialRetryPolicy(initialInterval time.Duration) *ExponentialRetryP } // NewRetrier is used for creating a new instance of Retrier -func NewRetrier(policy RetryPolicy, clock Clock) Retrier { +func NewRetrier(policy RetryPolicy, timeSource clock.TimeSource) Retrier { return &retrierImpl{ policy: policy, - clock: clock, - startTime: clock.Now(), + timeSource: timeSource, + startTime: timeSource.Now(), currentAttempt: 1, } } @@ -203,27 +187,13 @@ func (p *ExponentialRetryPolicy) ComputeNextDelay(elapsedTime time.Duration, num return time.Duration(nextInterval) } -// ComputeNextDelay returns the next delay interval. -func (tp *TwoPhaseRetryPolicy) ComputeNextDelay(elapsedTime time.Duration, numAttempts int) time.Duration { - nextInterval := tp.firstPolicy.ComputeNextDelay(elapsedTime, numAttempts) - if nextInterval == done { - nextInterval = tp.secondPolicy.ComputeNextDelay(elapsedTime, numAttempts-defaultFirstPhaseMaximumAttempts) - } - return nextInterval -} - func (r *disabledRetryPolicyImpl) ComputeNextDelay(_ time.Duration, _ int) time.Duration { return done } -// Now returns the current time using the system clock -func (t systemClock) Now() time.Time { - return time.Now().UTC() -} - // Reset will set the Retrier into initial state func (r *retrierImpl) Reset() { - r.startTime = r.clock.Now() + r.startTime = r.timeSource.Now() r.currentAttempt = 1 } @@ -237,5 +207,5 @@ func (r *retrierImpl) NextBackOff() time.Duration { } func (r *retrierImpl) getElapsedTime() time.Duration { - return r.clock.Now().Sub(r.startTime) + return r.timeSource.Now().Sub(r.startTime) } diff --git a/common/backoff/retrypolicy_test.go b/common/backoff/retrypolicy_test.go index ddd96fdea83..76082c57aed 100644 --- a/common/backoff/retrypolicy_test.go +++ b/common/backoff/retrypolicy_test.go @@ -32,6 +32,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.temporal.io/server/common/clock" ) type ( @@ -39,10 +40,6 @@ type ( *require.Assertions // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, not merely log an error suite.Suite } - - TestClock struct { - currentTime time.Time - } ) // ExampleExponentialRetryPolicy_WithMaximumInterval demonstrates example delays with a backoff coefficient of 2 and a @@ -163,8 +160,8 @@ func (s *RetryPolicySuite) TestExpirationInterval() { policy := createPolicy(2 * time.Second). WithExpirationInterval(5 * time.Minute) - r, clock := createRetrier(policy) - clock.moveClock(6 * time.Minute) + r, ts := createRetrier(policy) + ts.Advance(6 * time.Minute) next := r.NextBackOff() s.Equal(done, next) @@ -174,13 +171,13 @@ func (s *RetryPolicySuite) TestExpirationOverflow() { policy := createPolicy(2 * time.Second). WithExpirationInterval(5 * time.Second) - r, clock := createRetrier(policy) + r, ts := createRetrier(policy) next := r.NextBackOff() min, max := getNextBackoffRange(2 * time.Second) s.True(next >= min, "NextBackoff too low") s.True(next < max, "NextBackoff too high") - clock.moveClock(2 * time.Second) + ts.Advance(2 * time.Second) next = r.NextBackOff() min, max = getNextBackoffRange(3 * time.Second) @@ -193,7 +190,7 @@ func (s *RetryPolicySuite) TestDefaultPublishRetryPolicy() { WithExpirationInterval(time.Minute). WithMaximumInterval(10 * time.Second) - r, clock := createRetrier(policy) + r, ts := createRetrier(policy) expectedResult := []time.Duration{ 50 * time.Millisecond, 100 * time.Millisecond, @@ -219,7 +216,7 @@ func (s *RetryPolicySuite) TestDefaultPublishRetryPolicy() { min, max := getNextBackoffRange(expected) s.True(next >= min, "NextBackoff too low: actual: %v, min: %v", next, min) s.True(next < max, "NextBackoff too high: actual: %v, max: %v", next, max) - clock.moveClock(expected) + ts.Advance(expected) } } } @@ -229,33 +226,25 @@ func (s *RetryPolicySuite) TestNoMaxAttempts() { WithExpirationInterval(time.Minute). WithMaximumInterval(10 * time.Second) - r, clock := createRetrier(policy) + r, ts := createRetrier(policy) for i := 0; i < 100; i++ { next := r.NextBackOff() s.True(next > 0 || next == done, "Unexpected value for next retry duration: %v", next) - clock.moveClock(next) + ts.Advance(next) } } func (s *RetryPolicySuite) TestUnbounded() { policy := createPolicy(50 * time.Millisecond) - r, clock := createRetrier(policy) + r, ts := createRetrier(policy) for i := 0; i < 100; i++ { next := r.NextBackOff() s.True(next > 0 || next == done, "Unexpected value for next retry duration: %v", next) - clock.moveClock(next) + ts.Advance(next) } } -func (c *TestClock) Now() time.Time { - return c.currentTime -} - -func (c *TestClock) moveClock(duration time.Duration) { - c.currentTime = c.currentTime.Add(duration) -} - func createPolicy(initialInterval time.Duration) *ExponentialRetryPolicy { policy := NewExponentialRetryPolicy(initialInterval). WithBackoffCoefficient(2). @@ -266,9 +255,10 @@ func createPolicy(initialInterval time.Duration) *ExponentialRetryPolicy { return policy } -func createRetrier(policy RetryPolicy) (Retrier, *TestClock) { - clock := &TestClock{currentTime: time.Time{}} - return NewRetrier(policy, clock), clock +func createRetrier(policy RetryPolicy) (Retrier, *clock.EventTimeSource) { + ts := clock.NewEventTimeSource() + ts.Update(time.Time{}) + return NewRetrier(policy, ts), ts } func getNextBackoffRange(duration time.Duration) (time.Duration, time.Duration) { diff --git a/service/history/queues/queue_base.go b/service/history/queues/queue_base.go index 456fa23a952..92616af42b1 100644 --- a/service/history/queues/queue_base.go +++ b/service/history/queues/queue_base.go @@ -234,7 +234,7 @@ func newQueueBase( // pollTimer and checkpointTimer are initialized on Start() checkpointRetrier: backoff.NewRetrier( createCheckpointRetryPolicy(), - backoff.SystemClock, + clock.NewRealTimeSource(), ), alertCh: monitor.AlertCh(), diff --git a/service/history/queues/reader.go b/service/history/queues/reader.go index 01e91276a3f..cf4044ad525 100644 --- a/service/history/queues/reader.go +++ b/service/history/queues/reader.go @@ -156,7 +156,7 @@ func NewReader( retrier: backoff.NewRetrier( common.CreateReadTaskRetryPolicy(), - backoff.SystemClock, + clock.NewRealTimeSource(), ), rateLimitContext: rateLimitContext, diff --git a/service/matching/task_reader.go b/service/matching/task_reader.go index 5fbd4cbdb2a..7ae31666452 100644 --- a/service/matching/task_reader.go +++ b/service/matching/task_reader.go @@ -35,6 +35,7 @@ import ( persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" + "go.temporal.io/server/common/clock" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" @@ -70,7 +71,7 @@ func newTaskReader(backlogMgr *backlogManagerImpl) *taskReader { taskBuffer: make(chan *persistencespb.AllocatedTaskInfo, backlogMgr.config.GetTasksBatchSize()-1), retrier: backoff.NewRetrier( common.CreateReadTaskRetryPolicy(), - backoff.SystemClock, + clock.NewRealTimeSource(), ), } } diff --git a/service/worker/pernamespaceworker.go b/service/worker/pernamespaceworker.go index a4b2f36e634..a807bb43a7c 100644 --- a/service/worker/pernamespaceworker.go +++ b/service/worker/pernamespaceworker.go @@ -39,6 +39,7 @@ import ( "go.temporal.io/api/serviceerror" sdkclient "go.temporal.io/sdk/client" sdkworker "go.temporal.io/sdk/worker" + "go.temporal.io/server/common/clock" "go.uber.org/fx" "golang.org/x/exp/maps" @@ -260,7 +261,7 @@ func (wm *perNamespaceWorkerManager) getWorkerByNamespace(ns *namespace.Namespac worker := &perNamespaceWorker{ wm: wm, logger: log.With(wm.logger, tag.WorkflowNamespace(ns.Name().String())), - retrier: backoff.NewRetrier(backoff.NewExponentialRetryPolicy(wm.initialRetry), backoff.SystemClock), + retrier: backoff.NewRetrier(backoff.NewExponentialRetryPolicy(wm.initialRetry), clock.NewRealTimeSource()), ns: ns, }