Skip to content
This repository has been archived by the owner on May 4, 2021. It is now read-only.

Commit

Permalink
Make registry client retry on specific http errors (#347)
Browse files Browse the repository at this point in the history
* Make registry client retry on specific errors

* rebase

* rebase and address comment

* i rebased wrong
  • Loading branch information
evelynl94 committed Sep 28, 2020
1 parent 3ca97f6 commit 239e1f2
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 53 deletions.
41 changes: 25 additions & 16 deletions lib/registry/client.go
Expand Up @@ -377,13 +377,22 @@ func (c DockerRegistryClient) pushLayerWithBackoff(layerDigest image.Digest, isC
b := c.config.backoff()
for {
err := c.pushLayerHelper(layerDigest, isConfig)
// TODO: break on non-retryable errors.
if err != nil {
multiError.Add(err)
d := b.NextBackOff()
if d == backoff.Stop {
break
}
if err == nil {
break
}
multiError.Add(err)
d := b.NextBackOff()
if d == backoff.Stop {
break
}
// Retry when registry returns network error, retryable error or
// unexpected code 500. Since building an image could be rather
// expensive, we allow the client to be more forgiving on
// temporarily unexpected condition on registry side.
if httputil.IsNetworkError(err) ||
httputil.IsRetryable(err) ||
httputil.IsStatus(err, http.StatusInternalServerError) {
log.Infof("* Failed to push layer: %s, retrying...", err)
time.Sleep(d)
continue
}
Expand All @@ -394,7 +403,7 @@ func (c DockerRegistryClient) pushLayerWithBackoff(layerDigest image.Digest, isC

func (c DockerRegistryClient) pushLayerHelper(layerDigest image.Digest, isConfig bool) error {
if found, err := c.layerExists(layerDigest); err != nil {
return fmt.Errorf("check layer exists: %s/%s (%s): %s", c.registry, c.repository, layerDigest, err)
return fmt.Errorf("check layer exists: %s/%s (%s): %w", c.registry, c.repository, layerDigest, err)
} else if found {
if isConfig {
log.Infof("* Skipped pushing existing image config %s:%s", c.repository, layerDigest)
Expand All @@ -419,7 +428,7 @@ func (c DockerRegistryClient) pushLayerHelper(layerDigest image.Digest, isConfig
httputil.SendAcceptedCodes(http.StatusAccepted),
httputil.SendHeaders(map[string]string{"Host": c.registry}))
if err != nil {
return fmt.Errorf("send start push layer request %s: %s", URL, err)
return fmt.Errorf("send start push layer request %s: %w", URL, err)
}
defer resp.Body.Close()
URL = resp.Header.Get("Location")
Expand All @@ -434,7 +443,7 @@ func (c DockerRegistryClient) pushLayerHelper(layerDigest image.Digest, isConfig
}
URL, err = c.pushLayerContent(layerDigest, URL)
if err != nil {
return fmt.Errorf("push layer content %s: %s", layerDigest, err)
return fmt.Errorf("push layer content %s: %w", layerDigest, err)
}

parsed, err := url.Parse(URL)
Expand All @@ -445,7 +454,7 @@ func (c DockerRegistryClient) pushLayerHelper(layerDigest image.Digest, isConfig
q.Add("digest", string(layerDigest))
parsed.RawQuery = q.Encode()
if err := c.commitLayer(parsed.String()); err != nil {
return fmt.Errorf("commit layer push %s: %s", layerDigest, err)
return fmt.Errorf("commit layer push %s: %w", layerDigest, err)
}
if isConfig {
log.Infof("* Finished pushing image config %s", layerDigest)
Expand All @@ -472,7 +481,7 @@ func (c DockerRegistryClient) manifestExists(tag string) (bool, error) {
c.config.sendRetry(),
httputil.SendAcceptedCodes(http.StatusOK, http.StatusNotFound, http.StatusBadRequest))
if err != nil {
return false, fmt.Errorf("check manifest exists: %s", err)
return false, fmt.Errorf("check manifest exists: %w", err)
}
defer resp.Body.Close()

Expand All @@ -499,7 +508,7 @@ func (c DockerRegistryClient) layerExists(digest image.Digest) (bool, error) {
c.config.sendRetry(),
httputil.SendAcceptedCodes(http.StatusOK, http.StatusNotFound))
if err != nil {
return false, fmt.Errorf("check manifest exists: %s", err)
return false, fmt.Errorf("check manifest exists: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
Expand Down Expand Up @@ -529,7 +538,7 @@ func (c DockerRegistryClient) pushLayerContent(digest image.Digest, location str
for start < size {
location, err = c.pushOneLayerChunk(location, start, endInclusive, r)
if err != nil {
return location, fmt.Errorf("push layer chunk: %s", err)
return location, fmt.Errorf("push layer chunk: %w", err)
}
start, endInclusive = endInclusive+1, utils.Min(start+pushChunk-1, size-1)
}
Expand Down Expand Up @@ -564,7 +573,7 @@ func (c DockerRegistryClient) pushOneLayerChunk(location string, start, endInclu
httputil.SendHeaders(headers),
httputil.SendBody(ratelimit.Reader(r, readerOptions)))
if err != nil {
return "", fmt.Errorf("send push chunk request: %s", err)
return "", fmt.Errorf("send push chunk request: %w", err)
}
defer resp.Body.Close()

Expand Down Expand Up @@ -597,7 +606,7 @@ func (c DockerRegistryClient) commitLayer(location string) error {
httputil.SendAcceptedCodes(http.StatusCreated, http.StatusNoContent),
httputil.SendHeaders(headers))
if err != nil {
return fmt.Errorf("commit: %s", err)
return fmt.Errorf("commit: %w", err)
}
defer resp.Body.Close()
return nil
Expand Down
30 changes: 29 additions & 1 deletion lib/registry/client_test.go
Expand Up @@ -15,7 +15,10 @@
package registry

import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"path"
"testing"

Expand Down Expand Up @@ -158,8 +161,33 @@ func TestPushLayerRetry(t *testing.T) {
ctx, cleanup := context.BuildContextFixtureWithSampleImage()
defer cleanup()

digest := image.Digest("sha256:" + testutil.SampleLayerTarDigest)
image := image.MustParseName(fmt.Sprintf("localhost:5055/%s:%s", testutil.SampleImageRepoName, testutil.SampleImageTag))
url := uploadRequest{image}.getCommitURL(digest)
// Override the last commit response so it always fails.
responseOverride := responseOverride{
Method: "PUT",
Target: simpleRequest{url},
Response: &http.Response{
StatusCode: http.StatusServiceUnavailable,
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
Header: make(http.Header),
},
}
p, err := PushClientFixture(ctx, responseOverride)
require.NoError(err)
p.config.Retries = 1
commitError := fmt.Sprintf("commit layer push %s: commit: PUT "+url+" 503", digest)
require.EqualError(p.PushLayer(digest), commitError+"; "+commitError)
}

func TestPushLayerNoRetry(t *testing.T) {
require := require.New(t)
ctx, cleanup := context.BuildContextFixtureWithSampleImage()
defer cleanup()

p, err := PushClientFixture(ctx)
require.NoError(err)
p.config.Retries = 1
require.EqualError(p.PushLayer(image.NewEmptyDigest()), "push layer content : get layer file stat: file does not exist; push layer content : get layer file stat: file does not exist")
require.EqualError(p.PushLayer(image.NewEmptyDigest()), "push layer content : get layer file stat: file does not exist")
}
128 changes: 98 additions & 30 deletions lib/registry/push_fixture.go
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"strings"

"github.com/uber/makisu/lib/context"
"github.com/uber/makisu/lib/docker/image"
Expand All @@ -13,47 +14,101 @@ import (

// PushClientFixture returns a new registry client fixture that can handle
// image push requests.
func PushClientFixture(ctx *context.BuildContext) (*DockerRegistryClient, error) {
func PushClientFixture(ctx *context.BuildContext, overrides ...responseOverride) (*DockerRegistryClient, error) {
image := image.MustParseName(fmt.Sprintf("localhost:5055/%s:%s", testutil.SampleImageRepoName, testutil.SampleImageTag))
cli := &http.Client{
Transport: pushTransportFixture{image},
Transport: newPushTransportFixture(image, overrides...),
}
c := NewWithClient(ctx.ImageStore, image.GetRegistry(), image.GetRepository(), cli)
c.config.Security.TLS.Client.Disabled = true
return c, nil
}

type pushTransportFixture struct {
type requestTarget interface {
getURL() string
}

type simpleRequest struct {
url string
}

func (r simpleRequest) getURL() string {
return r.url
}

type manifestRequest struct {
image image.Name
}

func (t pushTransportFixture) RoundTrip(r *http.Request) (*http.Response, error) {
repoURL := fmt.Sprintf("http://%s/v2/%s", t.image.GetRegistry(), t.image.GetRepository())
manifestURL := fmt.Sprintf("%s/manifests/%s", repoURL, t.image.GetTag())
imageConfigURL := repoURL + "/blobs/sha256:" + testutil.SampleImageConfigDigest
layerTarURL := repoURL + "/blobs/sha256:" + testutil.SampleLayerTarDigest
startUploadURL := repoURL + "/blobs/uploads/"
chunkUploadURL := repoURL + "/blobs/uploads/upload123"
commitUploadURL := repoURL + "/blobs/uploads/commit123"
imageConfigCommitUploadURL := commitUploadURL +
"?digest=sha256%3A" + testutil.SampleImageConfigDigest
layerTarCommitUploadURL := commitUploadURL +
"?digest=sha256%3A" + testutil.SampleLayerTarDigest
url := r.URL.String()
func (r manifestRequest) getURL() string {
return repoURL(r.image) + "/manifests/" + r.image.GetTag()
}

type layerRequest struct {
image image.Name
digest image.Digest
}

func (r layerRequest) getURL() string {
return repoURL(r.image) + "/blobs/sha256:" + r.digest.Hex()
}

type uploadRequest struct {
image image.Name
}

func (r uploadRequest) getURL() string {
return repoURL(r.image) + "/blobs/uploads/"
}

func (r uploadRequest) getResumeLoc() string {
return repoURL(r.image) + "/blobs/uploads/upload123"
}

func (r uploadRequest) getCommitLoc() string {
return repoURL(r.image) + "/blobs/uploads/commit123"
}

func (r uploadRequest) getCommitURL(digest image.Digest) string {
return repoURL(r.image) + "/blobs/uploads/commit123?digest=sha256%3A" + digest.Hex()
}

func repoURL(image image.Name) string {
return fmt.Sprintf("http://%s/v2/%s", image.GetRegistry(), image.GetRepository())
}

type responseOverride struct {
Method string
Target requestTarget
Response *http.Response
}

type pushTransportFixture struct {
image image.Name
responses map[string]*http.Response
locations map[string]string
}

func newPushTransportFixture(i image.Name, overrides ...responseOverride) *pushTransportFixture {
imageConfigDigest := image.Digest("sha256:" + testutil.SampleImageConfigDigest)
layerTarDigest := image.Digest("sha256:" + testutil.SampleLayerTarDigest)
manifestURL := manifestRequest{i}.getURL()
imageConfigURL := layerRequest{i, imageConfigDigest}.getURL()
layerTarURL := layerRequest{i, layerTarDigest}.getURL()
upload := uploadRequest{i}
resps := map[string]*http.Response{
"HEAD" + manifestURL: {
StatusCode: http.StatusOK,
StatusCode: http.StatusNotFound,
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
Header: make(http.Header),
},
"HEAD" + imageConfigURL: {
StatusCode: http.StatusOK,
StatusCode: http.StatusNotFound,
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
Header: make(http.Header),
},
"HEAD" + layerTarURL: {
StatusCode: http.StatusOK,
StatusCode: http.StatusNotFound,
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
Header: make(http.Header),
},
Expand All @@ -62,42 +117,55 @@ func (t pushTransportFixture) RoundTrip(r *http.Request) (*http.Response, error)
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
Header: make(http.Header),
},
"POST" + startUploadURL: {
"POST" + upload.getURL(): {
StatusCode: http.StatusAccepted,
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
Header: make(http.Header),
},
"PATCH" + chunkUploadURL: {
"PATCH" + upload.getResumeLoc(): {
StatusCode: http.StatusAccepted,
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
Header: make(http.Header),
},
"PUT" + imageConfigCommitUploadURL: {
"PUT" + upload.getCommitURL(testutil.SampleImageConfigDigest): {
StatusCode: http.StatusCreated,
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
Header: make(http.Header),
},
"PUT" + layerTarCommitUploadURL: {
"PUT" + upload.getCommitURL(testutil.SampleLayerTarDigest): {
StatusCode: http.StatusCreated,
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
Header: make(http.Header),
},
}
locations := map[string]string{
"POST" + startUploadURL: chunkUploadURL,
"PATCH" + chunkUploadURL: commitUploadURL,
locs := map[string]string{
"POST" + upload.getURL(): upload.getResumeLoc(),
"PATCH" + upload.getResumeLoc(): upload.getCommitLoc(),
}
for _, o := range overrides {
resps[strings.ToUpper(o.Method)+o.Target.getURL()] = o.Response
}
return &pushTransportFixture{
image: i,
responses: resps,
locations: locs,
}
}

resp, found := resps[r.Method+url]
func (t *pushTransportFixture) RoundTrip(r *http.Request) (*http.Response, error) {
url := r.URL.String()
resp, found := t.responses[r.Method+url]
if !found {
return &http.Response{
resp = &http.Response{
StatusCode: http.StatusNotFound,
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
Header: make(http.Header),
}, nil
}
}
if location, found := locations[r.Method+url]; found {
if location, found := t.locations[r.Method+url]; found {
resp.Header.Add("Location", location)
}
resp.Request = r
resp.Request.URL = r.URL
return resp, nil
}
19 changes: 13 additions & 6 deletions lib/utils/httputil/httputil.go
Expand Up @@ -17,6 +17,7 @@ package httputil
import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -70,8 +71,11 @@ func (e StatusError) Error() string {

// IsStatus returns true if err is a StatusError of the given status.
func IsStatus(err error, status int) bool {
statusErr, ok := err.(StatusError)
return ok && statusErr.Status == status
var e StatusError
if errors.As(err, &e) {
return e.Status == status
}
return false
}

// IsCreated returns true if err is a "created", 201
Expand Down Expand Up @@ -107,8 +111,11 @@ func isRetryable(code int) bool {
// IsRetryable returns true if the statis code indicates that the request is
// retryable.
func IsRetryable(err error) bool {
statusErr, ok := err.(StatusError)
return ok && isRetryable(statusErr.Status)
var e StatusError
if errors.As(err, &e) {
return isRetryable(e.Status)
}
return false
}

// NetworkError occurs on any Send error which occurred while trying to send
Expand All @@ -123,8 +130,8 @@ func (e NetworkError) Error() string {

// IsNetworkError returns true if err is a NetworkError.
func IsNetworkError(err error) bool {
_, ok := err.(NetworkError)
return ok
var e NetworkError
return errors.As(err, &e)
}

type sendOptions struct {
Expand Down

0 comments on commit 239e1f2

Please sign in to comment.