Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 23 additions & 22 deletions ratex/ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@ import (
type RateLimitParams struct {
RateLimiter *rate.Limiter // can be nil to create a new rate limiter

// RetryAttempt represents the current retry attempt, starting at 1. This will increment for each retry
RetryAttempt int
MinDuration time.Duration
MaxDuration time.Duration
// Attempt represents the current attempt, starting at 1. This will increment for each (re)try
Attempt int

MinDuration time.Duration
MaxDuration time.Duration
}

func RateLimit(ctx context.Context, params RateLimitParams) (*rate.Limiter, error) {
ctx, span := telemetry.StartSpan(ctx, "rate-limiter-wait",
trace.WithAttributes(
attribute.Int("retry_attempt", params.RetryAttempt),
attribute.Int("attempt", params.Attempt),
attribute.Int64("min_duration_ms", params.MinDuration.Milliseconds()),
attribute.Int64("max_duration_ms", params.MaxDuration.Milliseconds()),
))
Expand All @@ -52,7 +53,7 @@ func RateLimit(ctx context.Context, params RateLimitParams) (*rate.Limiter, erro

// generateRateLimiter initializes a new rate limiter or sets a new limit on it.
func generateRateLimiter(ctx context.Context, params RateLimitParams) (*rate.Limiter, error) {
rateLimitDuration, err := generateRateLimitDuration(params.RetryAttempt, params.MinDuration, params.MaxDuration)
rateLimitDuration, err := generateRateLimitDuration(params.Attempt, params.MinDuration, params.MaxDuration)
if err != nil {
return nil, fmt.Errorf("generating rate limit duration: %w", err)
}
Expand Down Expand Up @@ -103,7 +104,7 @@ func generateRateLimitDuration(multiplier int, minDuration, maxDuration time.Dur

type RetryParams struct {
ShouldRetry func(err error) bool
MaxRetries int
MaxAttempts int
MinDuration time.Duration
MaxDuration time.Duration
}
Expand All @@ -116,8 +117,8 @@ func ExecRetryable[R any](ctx context.Context, closure func(ctx context.Context)
)

// Validate params
if params.MaxRetries <= 0 {
params.MaxRetries = 1 // Default to at least one try
if params.MaxAttempts <= 0 {
params.MaxAttempts = 1 // Default to at least one try
}
if params.MinDuration <= 0 {
params.MinDuration = 100 * time.Millisecond // Default min backoff
Expand All @@ -126,20 +127,20 @@ func ExecRetryable[R any](ctx context.Context, closure func(ctx context.Context)
params.MaxDuration = params.MinDuration * 10 // Default max to 10x min
}

retryFunc := func(ctx context.Context, retryAttempt int) (R, error) {
tryFunc := func(ctx context.Context, attemptNum int) (R, error) {
tryCtx, span := telemetry.StartSpan(ctx, "try",
trace.WithAttributes(
attribute.Int("retry_attempt", retryAttempt),
attribute.Int("max_tries", params.MaxRetries),
attribute.Int("attempt", attemptNum),
attribute.Int("max_attempts", params.MaxAttempts),
),
)
defer span.End()
return closure(tryCtx)
}

for i := 0; i < params.MaxRetries; i++ {
retryAttempt := i + 1
retVal, err = retryFunc(ctx, retryAttempt)
for i := range params.MaxAttempts {
attempt := i + 1
retVal, err = tryFunc(ctx, attempt)

// no error means success - break out
if err == nil {
Expand All @@ -152,20 +153,20 @@ func ExecRetryable[R any](ctx context.Context, closure func(ctx context.Context)
}

// record event if we'll be attempting retries
err = fmt.Errorf("try %d of %d: %w", retryAttempt, params.MaxRetries, err)
err = fmt.Errorf("try %d of %d: %w", attempt, params.MaxAttempts, err)
telemetry.AddEvent(ctx, err.Error())

if retryAttempt != params.MaxRetries {
if attempt != params.MaxAttempts {
// If error and we haven't hit max tries,
// generate rate limiter to delay retries.
// This will jitter a wait time before the next iteration.
//
// We abort on rate limit errors (e.g., ctx cancel) instead of continuing
rlParams := RateLimitParams{
RateLimiter: rateLimiter,
RetryAttempt: retryAttempt,
MinDuration: params.MinDuration,
MaxDuration: params.MaxDuration,
RateLimiter: rateLimiter,
Attempt: attempt,
MinDuration: params.MinDuration,
MaxDuration: params.MaxDuration,
}
rateLimiter, err = RateLimit(ctx, rlParams)
if err != nil {
Expand All @@ -176,7 +177,7 @@ func ExecRetryable[R any](ctx context.Context, closure func(ctx context.Context)
}

if err != nil {
return retVal, fmt.Errorf("hit max tries %d: %w", params.MaxRetries, err)
return retVal, fmt.Errorf("hit max tries %d: %w", params.MaxAttempts, err)
}
return retVal, nil
}
44 changes: 22 additions & 22 deletions ratex/ratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestExecRetryable(t *testing.T) {
}
params := RetryParams{
ShouldRetry: func(err error) bool { return true },
MaxRetries: 3,
MaxAttempts: 3,
MinDuration: 10 * time.Millisecond,
MaxDuration: 50 * time.Millisecond,
}
Expand All @@ -40,7 +40,7 @@ func TestExecRetryable(t *testing.T) {
}
params := RetryParams{
ShouldRetry: func(err error) bool { return true },
MaxRetries: 3,
MaxAttempts: 3,
MinDuration: 10 * time.Millisecond,
MaxDuration: 50 * time.Millisecond,
}
Expand All @@ -63,7 +63,7 @@ func TestExecRetryable(t *testing.T) {
}
params := RetryParams{
ShouldRetry: func(err error) bool { return false },
MaxRetries: 3,
MaxAttempts: 3,
MinDuration: 10 * time.Millisecond,
MaxDuration: 50 * time.Millisecond,
}
Expand All @@ -73,15 +73,15 @@ func TestExecRetryable(t *testing.T) {
require.Equal(t, "non-retryable error", err.Error())
})

t.Run("Retryable failures exceeding MaxRetries", func(t *testing.T) {
t.Run("Retryable failures exceeding MaxAttempts", func(t *testing.T) {
attempts := 0
closure := func(ctx context.Context) (string, error) {
attempts++
return "", errors.New("retryable error")
}
params := RetryParams{
ShouldRetry: func(err error) bool { return true },
MaxRetries: 3,
MaxAttempts: 3,
MinDuration: 10 * time.Millisecond,
MaxDuration: 50 * time.Millisecond,
}
Expand All @@ -106,7 +106,7 @@ func TestExecRetryable(t *testing.T) {
}
params := RetryParams{
ShouldRetry: func(err error) bool { return true },
MaxRetries: 3,
MaxAttempts: 3,
MinDuration: 10 * time.Millisecond,
MaxDuration: 50 * time.Millisecond,
}
Expand All @@ -124,7 +124,7 @@ func TestExecRetryable(t *testing.T) {
}
params := RetryParams{
ShouldRetry: func(err error) bool { return true },
MaxRetries: 3,
MaxAttempts: 3,
MinDuration: 100 * time.Millisecond,
MaxDuration: 200 * time.Millisecond,
}
Expand All @@ -143,7 +143,7 @@ func TestExecRetryable(t *testing.T) {
}
params := RetryParams{
ShouldRetry: func(err error) bool { return true },
MaxRetries: 0, // Should default to 1
MaxAttempts: 0, // Should default to 1
MinDuration: 0, // Should default to 100ms
MaxDuration: 0, // Should default to 100ms * 10 = 1s
}
Expand All @@ -159,7 +159,7 @@ func TestExecRetryable(t *testing.T) {
}
params := RetryParams{
ShouldRetry: func(err error) bool { return true },
MaxRetries: 3,
MaxAttempts: 3,
MinDuration: 100 * time.Millisecond,
MaxDuration: 50 * time.Millisecond, // Will be set to 100ms * 10 = 1s
}
Expand All @@ -174,7 +174,7 @@ func TestExecRetryable(t *testing.T) {
}
params := RetryParams{
ShouldRetry: func(err error) bool { return true },
MaxRetries: 3,
MaxAttempts: 3,
MinDuration: 10 * time.Millisecond,
MaxDuration: 50 * time.Millisecond,
}
Expand Down Expand Up @@ -234,10 +234,10 @@ func TestRateLimit(t *testing.T) {

t.Run("New limiter", func(t *testing.T) {
params := RateLimitParams{
RateLimiter: nil,
RetryAttempt: 1,
MinDuration: 10 * time.Millisecond,
MaxDuration: 20 * time.Millisecond,
RateLimiter: nil,
Attempt: 1,
MinDuration: 10 * time.Millisecond,
MaxDuration: 20 * time.Millisecond,
}
limiter, err := RateLimit(ctx, params)
require.NoError(t, err)
Expand All @@ -247,10 +247,10 @@ func TestRateLimit(t *testing.T) {
t.Run("Existing limiter update", func(t *testing.T) {
existing := rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
params := RateLimitParams{
RateLimiter: existing,
RetryAttempt: 2,
MinDuration: 10 * time.Millisecond,
MaxDuration: 20 * time.Millisecond,
RateLimiter: existing,
Attempt: 2,
MinDuration: 10 * time.Millisecond,
MaxDuration: 20 * time.Millisecond,
}
limiter, err := RateLimit(ctx, params)
require.NoError(t, err)
Expand All @@ -260,10 +260,10 @@ func TestRateLimit(t *testing.T) {
t.Run("Context cancel during wait", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
params := RateLimitParams{
RateLimiter: nil,
RetryAttempt: 1,
MinDuration: 100 * time.Millisecond,
MaxDuration: 200 * time.Millisecond,
RateLimiter: nil,
Attempt: 1,
MinDuration: 100 * time.Millisecond,
MaxDuration: 200 * time.Millisecond,
}
go func() {
time.Sleep(50 * time.Millisecond)
Expand Down
4 changes: 2 additions & 2 deletions sql/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func TestDBRetryableOperations(t *testing.T) {

retryParams := ratex.RetryParams{
ShouldRetry: func(err error) bool { return true },
MaxRetries: 3,
MaxAttempts: 3,
MinDuration: 10 * time.Millisecond,
MaxDuration: 50 * time.Millisecond,
}
Expand Down Expand Up @@ -412,7 +412,7 @@ func TestDBRetryableOperations(t *testing.T) {
require.Error(t, err)
})

t.Run("ExecContextRetryable - Retryable failures exceeding MaxRetries", func(t *testing.T) {
t.Run("ExecContextRetryable - Retryable failures exceeding MaxAttempts", func(t *testing.T) {
closure := func(ctx context.Context) (gosql.Result, error) {
return nil, errors.New("retryable error")
}
Expand Down
Loading