Skip to content

Commit

Permalink
Delete unused retry implementations (#5909)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->

- `ConcurrentRetrier`: deleted, not used anywhere
- `TwoPhaseRetryPolicy`: deleted, not used anywhere
- `SystemClock`: replaced with `clock.TimeSource`

## Why?
<!-- Tell your future self why have you made these changes -->

Less code is good.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->

Compiler.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
stephanos committed May 14, 2024
1 parent ee3e492 commit 814953b
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 174 deletions.
64 changes: 5 additions & 59 deletions common/backoff/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ package backoff

import (
"context"
"sync"
"time"

"go.temporal.io/api/serviceerror"
"go.temporal.io/server/common/clock"
)

const (
Expand All @@ -54,63 +54,8 @@ type (

// IsRetryable handler can be used to exclude certain errors during retry
IsRetryable func(error) bool

// ConcurrentRetrier is used for client-side throttling. It determines whether to
// throttle outgoing traffic in case downstream backend server rejects
// requests due to out-of-quota or server busy errors.
ConcurrentRetrier struct {
sync.Mutex
retrier Retrier // Backoff retrier
failureCount int64 // Number of consecutive failures seen
}
)

// Throttle Sleep if there were failures since the last success call.
func (c *ConcurrentRetrier) Throttle() {
c.throttleInternal()
}

func (c *ConcurrentRetrier) throttleInternal() time.Duration {
next := done

// Check if we have failure count.
failureCount := c.failureCount
if failureCount > 0 {
defer c.Unlock()
c.Lock()
if c.failureCount > 0 {
next = c.retrier.NextBackOff()
}
}

if next != done {
time.Sleep(next)
}

return next
}

// Succeeded marks client request succeeded.
func (c *ConcurrentRetrier) Succeeded() {
defer c.Unlock()
c.Lock()
c.failureCount = 0
c.retrier.Reset()
}

// Failed marks client request failed because backend is busy.
func (c *ConcurrentRetrier) Failed() {
defer c.Unlock()
c.Lock()
c.failureCount++
}

// NewConcurrentRetrier returns an instance of concurrent backoff retrier.
func NewConcurrentRetrier(retryPolicy RetryPolicy) *ConcurrentRetrier {
retrier := NewRetrier(retryPolicy, SystemClock)
return &ConcurrentRetrier{retrier: retrier}
}

// ThrottleRetry is a resource aware version of Retry.
// Resource exhausted error will be retried using a different throttle retry policy, instead of the specified one.
func ThrottleRetry(operation Operation, policy RetryPolicy, isRetryable IsRetryable) error {
Expand All @@ -137,8 +82,9 @@ func ThrottleRetryContext(

deadline, hasDeadline := ctx.Deadline()

r := NewRetrier(policy, SystemClock)
t := NewRetrier(throttleRetryPolicy, SystemClock)
timeSrc := clock.NewRealTimeSource()
r := NewRetrier(policy, timeSrc)
t := NewRetrier(throttleRetryPolicy, timeSrc)
for ctx.Err() == nil {
if err = operation(ctx); err == nil {
return nil
Expand All @@ -156,7 +102,7 @@ func ThrottleRetryContext(
next = max(next, t.NextBackOff())
}

if hasDeadline && SystemClock.Now().Add(next).After(deadline) {
if hasDeadline && timeSrc.Now().Add(next).After(deadline) {
break
}

Expand Down
50 changes: 2 additions & 48 deletions common/backoff/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ package backoff

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -144,51 +143,6 @@ func (s *RetrySuite) TestIsRetryableFailure() {
s.Equal(1, i)
}

func (s *RetrySuite) TestConcurrentRetrier() {
policy := NewExponentialRetryPolicy(1 * time.Millisecond).
WithMaximumInterval(10 * time.Millisecond).
WithMaximumAttempts(4)

// Basic checks
retrier := NewConcurrentRetrier(policy)
retrier.Failed()
s.Equal(int64(1), retrier.failureCount)
retrier.Succeeded()
s.Equal(int64(0), retrier.failureCount)
sleepDuration := retrier.throttleInternal()
s.Equal(done, sleepDuration)

// Multiple count check.
retrier.Failed()
retrier.Failed()
s.Equal(int64(2), retrier.failureCount)
// Verify valid sleep times.
ch := make(chan time.Duration, 3)
go func() {
for i := 0; i < 3; i++ {
ch <- retrier.throttleInternal()
}
}()
for i := 0; i < 3; i++ {
val := <-ch
fmt.Printf("Duration: %d\n", val)
s.True(val > 0)
}
retrier.Succeeded()
s.Equal(int64(0), retrier.failureCount)
// Verify we don't have any sleep times.
go func() {
for i := 0; i < 3; i++ {
ch <- retrier.throttleInternal()
}
}()
for i := 0; i < 3; i++ {
val := <-ch
fmt.Printf("Duration: %d\n", val)
s.Equal(done, val)
}
}

func (s *RetrySuite) TestRetryContextCancel() {
ctx, cancel := context.WithCancel(context.Background())
cancel()
Expand Down Expand Up @@ -247,7 +201,7 @@ func (s *RetrySuite) TestThrottleRetryContext() {
return &someError{}
}

start := SystemClock.Now()
start := time.Now()
err := ThrottleRetryContext(context.Background(), op, policy, retryEverything)
s.Equal(&someError{}, err)
s.GreaterOrEqual(
Expand All @@ -257,7 +211,7 @@ func (s *RetrySuite) TestThrottleRetryContext() {
)

// test if context timeout is respected
start = SystemClock.Now()
start = time.Now()
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
err = ThrottleRetryContext(ctx, func(_ context.Context) error { return &serviceerror.ResourceExhausted{} }, policy, retryEverything)
s.Equal(&serviceerror.ResourceExhausted{}, err)
Expand Down
46 changes: 8 additions & 38 deletions common/backoff/retrypolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"math"
"math/rand"
"time"

"go.temporal.io/server/common/clock"
)

const (
Expand Down Expand Up @@ -61,11 +63,6 @@ type (
Reset()
}

// Clock used by ExponentialRetryPolicy implementation to get the current time. Mainly used for unit testing
Clock interface {
Now() time.Time
}

// ExponentialRetryPolicy provides the implementation for retry policy using a coefficient to compute the next delay.
// Formula used to compute the next delay is:
// min(initialInterval * pow(backoffCoefficient, currentAttempt), maximumInterval)
Expand All @@ -77,29 +74,16 @@ type (
maximumAttempts int
}

// TwoPhaseRetryPolicy implements a policy that first use one policy to get next delay,
// and once expired use the second policy for the following retry.
// It can achieve fast retries in first phase then slowly retires in second phase.
TwoPhaseRetryPolicy struct {
firstPolicy RetryPolicy
secondPolicy RetryPolicy
}

disabledRetryPolicyImpl struct{}

systemClock struct{}

retrierImpl struct {
policy RetryPolicy
clock Clock
timeSource clock.TimeSource
currentAttempt int
startTime time.Time
}
)

// SystemClock implements Clock interface that uses time.Now().UTC().
var SystemClock = systemClock{}

// NewExponentialRetryPolicy returns an instance of ExponentialRetryPolicy using the provided initialInterval
func NewExponentialRetryPolicy(initialInterval time.Duration) *ExponentialRetryPolicy {
p := &ExponentialRetryPolicy{
Expand All @@ -114,11 +98,11 @@ func NewExponentialRetryPolicy(initialInterval time.Duration) *ExponentialRetryP
}

// NewRetrier is used for creating a new instance of Retrier
func NewRetrier(policy RetryPolicy, clock Clock) Retrier {
func NewRetrier(policy RetryPolicy, timeSource clock.TimeSource) Retrier {
return &retrierImpl{
policy: policy,
clock: clock,
startTime: clock.Now(),
timeSource: timeSource,
startTime: timeSource.Now(),
currentAttempt: 1,
}
}
Expand Down Expand Up @@ -203,27 +187,13 @@ func (p *ExponentialRetryPolicy) ComputeNextDelay(elapsedTime time.Duration, num
return time.Duration(nextInterval)
}

// ComputeNextDelay returns the next delay interval.
func (tp *TwoPhaseRetryPolicy) ComputeNextDelay(elapsedTime time.Duration, numAttempts int) time.Duration {
nextInterval := tp.firstPolicy.ComputeNextDelay(elapsedTime, numAttempts)
if nextInterval == done {
nextInterval = tp.secondPolicy.ComputeNextDelay(elapsedTime, numAttempts-defaultFirstPhaseMaximumAttempts)
}
return nextInterval
}

func (r *disabledRetryPolicyImpl) ComputeNextDelay(_ time.Duration, _ int) time.Duration {
return done
}

// Now returns the current time using the system clock
func (t systemClock) Now() time.Time {
return time.Now().UTC()
}

// Reset will set the Retrier into initial state
func (r *retrierImpl) Reset() {
r.startTime = r.clock.Now()
r.startTime = r.timeSource.Now()
r.currentAttempt = 1
}

Expand All @@ -237,5 +207,5 @@ func (r *retrierImpl) NextBackOff() time.Duration {
}

func (r *retrierImpl) getElapsedTime() time.Duration {
return r.clock.Now().Sub(r.startTime)
return r.timeSource.Now().Sub(r.startTime)
}
40 changes: 15 additions & 25 deletions common/backoff/retrypolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,14 @@ import (

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.temporal.io/server/common/clock"
)

type (
RetryPolicySuite struct {
*require.Assertions // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, not merely log an error
suite.Suite
}

TestClock struct {
currentTime time.Time
}
)

// ExampleExponentialRetryPolicy_WithMaximumInterval demonstrates example delays with a backoff coefficient of 2 and a
Expand Down Expand Up @@ -163,8 +160,8 @@ func (s *RetryPolicySuite) TestExpirationInterval() {
policy := createPolicy(2 * time.Second).
WithExpirationInterval(5 * time.Minute)

r, clock := createRetrier(policy)
clock.moveClock(6 * time.Minute)
r, ts := createRetrier(policy)
ts.Advance(6 * time.Minute)
next := r.NextBackOff()

s.Equal(done, next)
Expand All @@ -174,13 +171,13 @@ func (s *RetryPolicySuite) TestExpirationOverflow() {
policy := createPolicy(2 * time.Second).
WithExpirationInterval(5 * time.Second)

r, clock := createRetrier(policy)
r, ts := createRetrier(policy)
next := r.NextBackOff()
min, max := getNextBackoffRange(2 * time.Second)
s.True(next >= min, "NextBackoff too low")
s.True(next < max, "NextBackoff too high")

clock.moveClock(2 * time.Second)
ts.Advance(2 * time.Second)

next = r.NextBackOff()
min, max = getNextBackoffRange(3 * time.Second)
Expand All @@ -193,7 +190,7 @@ func (s *RetryPolicySuite) TestDefaultPublishRetryPolicy() {
WithExpirationInterval(time.Minute).
WithMaximumInterval(10 * time.Second)

r, clock := createRetrier(policy)
r, ts := createRetrier(policy)
expectedResult := []time.Duration{
50 * time.Millisecond,
100 * time.Millisecond,
Expand All @@ -219,7 +216,7 @@ func (s *RetryPolicySuite) TestDefaultPublishRetryPolicy() {
min, max := getNextBackoffRange(expected)
s.True(next >= min, "NextBackoff too low: actual: %v, min: %v", next, min)
s.True(next < max, "NextBackoff too high: actual: %v, max: %v", next, max)
clock.moveClock(expected)
ts.Advance(expected)
}
}
}
Expand All @@ -229,33 +226,25 @@ func (s *RetryPolicySuite) TestNoMaxAttempts() {
WithExpirationInterval(time.Minute).
WithMaximumInterval(10 * time.Second)

r, clock := createRetrier(policy)
r, ts := createRetrier(policy)
for i := 0; i < 100; i++ {
next := r.NextBackOff()
s.True(next > 0 || next == done, "Unexpected value for next retry duration: %v", next)
clock.moveClock(next)
ts.Advance(next)
}
}

func (s *RetryPolicySuite) TestUnbounded() {
policy := createPolicy(50 * time.Millisecond)

r, clock := createRetrier(policy)
r, ts := createRetrier(policy)
for i := 0; i < 100; i++ {
next := r.NextBackOff()
s.True(next > 0 || next == done, "Unexpected value for next retry duration: %v", next)
clock.moveClock(next)
ts.Advance(next)
}
}

func (c *TestClock) Now() time.Time {
return c.currentTime
}

func (c *TestClock) moveClock(duration time.Duration) {
c.currentTime = c.currentTime.Add(duration)
}

func createPolicy(initialInterval time.Duration) *ExponentialRetryPolicy {
policy := NewExponentialRetryPolicy(initialInterval).
WithBackoffCoefficient(2).
Expand All @@ -266,9 +255,10 @@ func createPolicy(initialInterval time.Duration) *ExponentialRetryPolicy {
return policy
}

func createRetrier(policy RetryPolicy) (Retrier, *TestClock) {
clock := &TestClock{currentTime: time.Time{}}
return NewRetrier(policy, clock), clock
func createRetrier(policy RetryPolicy) (Retrier, *clock.EventTimeSource) {
ts := clock.NewEventTimeSource()
ts.Update(time.Time{})
return NewRetrier(policy, ts), ts
}

func getNextBackoffRange(duration time.Duration) (time.Duration, time.Duration) {
Expand Down

0 comments on commit 814953b

Please sign in to comment.