Skip to content

Commit

Permalink
fix(cleanup): handle Too Many Requests (429) (#6455)
Browse files Browse the repository at this point in the history
Signed-off-by: Aleksei Igrychev <aleksei.igrychev@palark.com>
  • Loading branch information
alexey-igrychev authored Dec 2, 2024
1 parent a449834 commit 458dbcc
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 55 deletions.
4 changes: 2 additions & 2 deletions pkg/docker_registry/common_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func newHttpTransport(skipTlsVerify bool) http.RoundTripper {
// Wrap the transport with retry logic.
t = transport.NewRetry(t)

// Wrap the transport with retry-after logic.
t = transport2.NewRetryAfter(t)
// Wrap the transport with rate limit logic.
t = transport2.NewRateLimit(t)

return t
}
87 changes: 87 additions & 0 deletions pkg/docker_registry/transport/rate_limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package transport

import (
"errors"
"net/http"
"strconv"
"time"

"github.com/cenkalti/backoff/v4"

"github.com/werf/logboek"
parallelConstant "github.com/werf/werf/v2/pkg/util/parallel/constant"
)

type RateLimit struct {
underlying http.RoundTripper
}

func NewRateLimit(underlying http.RoundTripper) http.RoundTripper {
return &RateLimit{underlying: underlying}
}

type rateLimitError struct {
error
}

func (t *RateLimit) RoundTrip(req *http.Request) (*http.Response, error) {
var resp *http.Response
operation := func() error {
var err error
resp, err = t.underlying.RoundTrip(req) //nolint:bodyclose
if err != nil {
return backoff.Permanent(err)
}

if resp.StatusCode != http.StatusTooManyRequests {
return nil
}

// Ensure response body is closed if retrying.
defer resp.Body.Close()

return rateLimitError{error: errors.New(resp.Status)}
}

notify := func(err error, duration time.Duration) {
ctx := req.Context()
workerId := ctx.Value(parallelConstant.CtxBackgroundTaskIDKey)
if workerId != nil {
logboek.Context(ctx).Warn().LogF(
"WARNING: %s. Retrying in %v... (worker %d)\nThe --parallel ($WERF_PARALLEL) and --parallel-tasks-limit ($WERF_PARALLEL_TASKS_LIMIT) options can be used to regulate parallel tasks.\n",
err,
duration,
workerId.(int),
)
logboek.Context(ctx).Warn().LogOptionalLn()
} else {
logboek.Context(ctx).Warn().LogF(
"WARNING: %s. Retrying in %v...\n",
err,
duration,
)
}
}

initialInterval := backoff.DefaultInitialInterval
{
if err := operation(); !errors.As(err, &rateLimitError{}) {
return resp, err
}

if retryAfterHeader := resp.Header.Get("Retry-After"); retryAfterHeader != "" {
if seconds, err := strconv.Atoi(retryAfterHeader); err == nil {
initialInterval = time.Duration(seconds) * time.Second
}
}
}

eb := backoff.NewExponentialBackOff()
eb.InitialInterval = initialInterval
eb.MaxElapsedTime = 5 * time.Minute // Maximum time for all retries.
if err := backoff.RetryNotify(operation, eb, notify); err != nil {
return nil, err
}

return resp, nil
}
53 changes: 0 additions & 53 deletions pkg/docker_registry/transport/retry_after.go

This file was deleted.

0 comments on commit 458dbcc

Please sign in to comment.