Skip to content

Commit

Permalink
Merge pull request #235 from rusq/i234
Browse files Browse the repository at this point in the history
introduce retry on tcp errors
  • Loading branch information
rusq committed Sep 23, 2023
2 parents be0e57f + 6528ffb commit 885f24e
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 25 deletions.
2 changes: 1 addition & 1 deletion channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (sd *Session) getChannels(ctx context.Context, chanTypes []string, cb func(
nextcur string
)
reqStart := time.Now()
if err := withRetry(ctx, limiter, sd.options.Tier3Retries, func() error {
if err := network.WithRetry(ctx, limiter, sd.options.Tier3Retries, func() error {
var err error
trace.WithRegion(ctx, "GetConversationsContext", func() {
chans, nextcur, err = sd.client.GetConversationsContext(ctx, params)
Expand Down
34 changes: 27 additions & 7 deletions internal/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"net"
"net/http"
"runtime/trace"
"sync"
Expand All @@ -29,7 +30,8 @@ var (
lg logger.Interface = logger.Default
// waitFn returns the amount of time to wait before retrying depending on
// the current attempt. This variable exists to reduce the test time.
waitFn = cubicWait
waitFn = cubicWait
netWaitFn = expWait

mu sync.RWMutex
)
Expand All @@ -38,7 +40,7 @@ var (
// function wasn't able to complete without errors.
var ErrRetryFailed = errors.New("callback was unable to complete without errors within the allowed number of retries")

// withRetry will run the callback function fn. If the function returns
// WithRetry will run the callback function fn. If the function returns
// slack.RateLimitedError, it will delay, and then call it again up to
// maxAttempts times. It will return an error if it runs out of attempts.
func WithRetry(ctx context.Context, lim *rate.Limiter, maxAttempts int, fn func() error) error {
Expand All @@ -48,7 +50,7 @@ func WithRetry(ctx context.Context, lim *rate.Limiter, maxAttempts int, fn func(
}
for attempt := 0; attempt < maxAttempts; attempt++ {
var err error
trace.WithRegion(ctx, "withRetry.wait", func() {
trace.WithRegion(ctx, "WithRetry.wait", func() {
err = lim.Wait(ctx)
})
if err != nil {
Expand All @@ -61,23 +63,33 @@ func WithRetry(ctx context.Context, lim *rate.Limiter, maxAttempts int, fn func(
break
}

tracelogf(ctx, "error", "slackRetry: %s after %d attempts", cbErr, attempt+1)
tracelogf(ctx, "error", "WithRetry: %[1]s (%[1]T) after %[2]d attempts", cbErr, attempt+1)
var (
rle *slack.RateLimitedError
sce slack.StatusCodeError
ne *net.OpError // read tcp error: see #234
)
if errors.As(cbErr, &rle) {
switch {
case errors.As(cbErr, &rle):
tracelogf(ctx, "info", "got rate limited, sleeping %s", rle.RetryAfter)
time.Sleep(rle.RetryAfter)
continue
} else if errors.As(cbErr, &sce) {
case errors.As(cbErr, &sce):
if isRecoverable(sce.Code) {
// possibly transient error
delay := waitFn(attempt)
tracelogf(ctx, "info", "got server error %d, sleeping %s", sce.Code, delay)
time.Sleep(delay)
continue
}
case errors.As(cbErr, &ne):
if ne.Op == "read" || ne.Op == "write" {
// possibly transient error
delay := netWaitFn(attempt)
tracelogf(ctx, "info", "got network error %s, sleeping %s", ne.Op, delay)
time.Sleep(delay)
continue
}
}

return fmt.Errorf("callback error: %w", cbErr)
Expand All @@ -90,7 +102,7 @@ func WithRetry(ctx context.Context, lim *rate.Limiter, maxAttempts int, fn func(

// isRecoverable returns true if the status code is a recoverable error.
func isRecoverable(statusCode int) bool {
return (statusCode >= http.StatusInternalServerError && statusCode <= 599) || statusCode == 408
return (statusCode >= http.StatusInternalServerError && statusCode <= 599 && statusCode != 501) || statusCode == 408
}

// cubicWait is the wait time function. Time is calculated as (x+2)^3 seconds,
Expand All @@ -105,6 +117,14 @@ func cubicWait(attempt int) time.Duration {
return delay
}

func expWait(attempt int) time.Duration {
delay := time.Duration(2<<uint(attempt)) * time.Second
if delay > maxAllowedWaitTime {
return maxAllowedWaitTime
}
return delay
}

func tracelogf(ctx context.Context, category string, fmt string, a ...any) {
mu.RLock()
defer mu.RUnlock()
Expand Down
43 changes: 37 additions & 6 deletions internal/network/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"net"
"net/http"
"net/http/httptest"
"reflect"
Expand All @@ -23,6 +24,14 @@ func calcRunDuration(rateLimit float64, attempts int) time.Duration {
return time.Duration(attempts) * time.Duration(float64(time.Second)/rateLimit)
}

func calcExpRunDuration(attempts int) time.Duration {
var sec time.Duration
for i := 0; i < attempts; i++ {
sec += expWait(i)
}
return sec
}

// retryFn will return slack.RateLimitedError for numAttempts time and err after.
func retryFn(numAttempts int, retryAfter time.Duration, err error) func() error {
i := 0
Expand All @@ -35,6 +44,18 @@ func retryFn(numAttempts int, retryAfter time.Duration, err error) func() error
}
}

// errSeqFn will return err for forTimes time and thenErr after.
func errSeqFn(err error, forTimes int, thenErr error) func() error {
i := 0
return func() error {
if i < forTimes {
i++
return err
}
return thenErr
}
}

func dAbs(d time.Duration) time.Duration {
if d < 0 {
return -d
Expand Down Expand Up @@ -110,7 +131,7 @@ func Test_withRetry(t *testing.T) {
false,
calcRunDuration(10.0, 4),
},
{"slackRetry should honour the value in the rate limit error",
{"should honour the value in the rate limit error",
args{
context.Background(),
rate.NewLimiter(1000, 1),
Expand All @@ -130,6 +151,17 @@ func Test_withRetry(t *testing.T) {
true,
calcRunDuration(10.0, 4),
},
{
"network error (#234)",
args{
context.Background(),
rate.NewLimiter(10.0, 1),
3,
errSeqFn(&net.OpError{Op: "read"}, 2, nil),
},
false,
calcExpRunDuration(2),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -138,10 +170,10 @@ func Test_withRetry(t *testing.T) {
t.Errorf("withRetry() error = %v, wantErr %v", err, tt.wantErr)
}
runTime := time.Since(start)
runTimeError := dAbs(runTime - tt.mustCompleteIn)
t.Logf("runtime = %s, mustCompleteIn = %s, error = ABS(%[1]s - %[2]s) = %[3]s", runTime, tt.mustCompleteIn, runTimeError)
if runTimeError > maxRunDurationError {
t.Errorf("runtime error %s is not within allowed threshold: %s", runTimeError, maxRunDurationError)
ξ := dAbs(runTime - tt.mustCompleteIn)
t.Logf("runtime = %s, mustCompleteIn = %s, ξ = ABS(%[1]s - %[2]s) = %[3]s", runTime, tt.mustCompleteIn, ξ)
if ξ > maxRunDurationError {
t.Errorf("runtime error %s is not within allowed threshold: %s", ξ, maxRunDurationError)
}
})
}
Expand Down Expand Up @@ -198,7 +230,6 @@ func Test500ErrorHandling(t *testing.T) {

const (
testRetryCount = 1
waitThreshold = 100 * time.Millisecond
)

// Create a test server that returns a 404 error.
Expand Down
4 changes: 2 additions & 2 deletions messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (sd *Session) dumpChannel(ctx context.Context, channelID string, oldest, la
resp *slack.GetConversationHistoryResponse
)
reqStart := time.Now()
if err := withRetry(ctx, convLimiter, sd.options.Tier3Retries, func() error {
if err := network.WithRetry(ctx, convLimiter, sd.options.Tier3Retries, func() error {
var err error
trace.WithRegion(ctx, "GetConversationHistoryContext", func() {
resp, err = sd.client.GetConversationHistoryContext(ctx, &slack.GetConversationHistoryParameters{
Expand Down Expand Up @@ -168,7 +168,7 @@ func (sd *Session) dumpChannel(ctx context.Context, channelID string, oldest, la
func (sd *Session) getChannelName(ctx context.Context, l *rate.Limiter, channelID string) (string, error) {
// get channel name
var ci *slack.Channel
if err := withRetry(ctx, l, sd.options.Tier3Retries, func() error {
if err := network.WithRetry(ctx, l, sd.options.Tier3Retries, func() error {
var err error
ci, err = sd.client.GetConversationInfoContext(ctx, &slack.GetConversationInfoInput{ChannelID: channelID})
return err
Expand Down
7 changes: 0 additions & 7 deletions slackdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,6 @@ func (sd *Session) limiter(t network.Tier) *rate.Limiter {
return network.NewLimiter(t, sd.options.Tier3Burst, int(sd.options.Tier3Boost))
}

// withRetry will run the callback function fn. If the function returns
// slack.RateLimitedError, it will delay, and then call it again up to
// maxAttempts times. It will return an error if it runs out of attempts.
func withRetry(ctx context.Context, l *rate.Limiter, maxAttempts int, fn func() error) error {
return network.WithRetry(ctx, l, maxAttempts, fn)
}

func checkCacheFile(filename string, maxAge time.Duration) error {
if filename == "" {
return errors.New("no cache filename")
Expand Down
2 changes: 1 addition & 1 deletion thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (sd *Session) dumpThread(
nextCursor string
)
reqStart := time.Now()
if err := withRetry(ctx, l, sd.options.Tier3Retries, func() error {
if err := network.WithRetry(ctx, l, sd.options.Tier3Retries, func() error {
var err error
trace.WithRegion(ctx, "GetConversationRepliesContext", func() {
msgs, hasmore, nextCursor, err = sd.client.GetConversationRepliesContext(
Expand Down
2 changes: 1 addition & 1 deletion users.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (sd *Session) fetchUsers(ctx context.Context) (types.Users, error) {
var (
users []slack.User
)
if err := withRetry(ctx, network.NewLimiter(network.Tier2, sd.options.Tier2Burst, int(sd.options.Tier2Boost)), sd.options.Tier2Retries, func() error {
if err := network.WithRetry(ctx, network.NewLimiter(network.Tier2, sd.options.Tier2Burst, int(sd.options.Tier2Boost)), sd.options.Tier2Retries, func() error {
var err error
users, err = sd.client.GetUsersContext(ctx)
return err
Expand Down

0 comments on commit 885f24e

Please sign in to comment.