Skip to content

Commit

Permalink
client: retry version negotiation after error
Browse files Browse the repository at this point in the history
When a client is configured for API version negotiation, it would try
exactly once to ping the server, then unconditionally configure the
client API version and set the negotiated flag. Any failure to ping the
server would result in the client "negotiating" the fallback API version
v1.24, even if the request never reached the daemon. Transient network
issues or simply calling a client method with an already-canceled
context would be sufficient to cause this to happen.

Modify the client's negotiation process to retry negotiation on each
subsequent request until it receives a ping response from the daemon.
Only responses with HTTP status codes in the range [1, 500] are
considered to be ping responses from the daemon as HTTP status codes
>= 501 (e.g. 502 Gateway Timeout, 503 Service Unavailable) could be
returned by intermediate hops.

Signed-off-by: Cory Snider <csnider@mirantis.com>
  • Loading branch information
corhere committed Jul 4, 2022
1 parent 632e667 commit 2d4806a
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 11 deletions.
21 changes: 15 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,25 +227,34 @@ func (cli *Client) ClientVersion() string {
// added (1.24).
func (cli *Client) NegotiateAPIVersion(ctx context.Context) {
if !cli.manualOverride {
_ = cli.negotiateAPIVersion(ctx, true /* force renegotiation */)
_, _ = cli.negotiateAPIVersion(ctx, true /* force renegotiation */)
}
}

func (cli *Client) negotiateAPIVersion(ctx context.Context, force bool) string {
func (cli *Client) negotiateAPIVersion(ctx context.Context, force bool) (string, error) {
var state versionNegotiation
select {
case <-ctx.Done():
return ""
return "", ctx.Err()
case state = <-cli.version:
}

if !cli.negotiateVersion || (state.negotiated && !force) {
cli.version <- state
return state.version
return state.version, nil
}

ping, _ := cli.Ping(ctx)
return cli.negotiateAPIVersionPing(state, ping)
ping, ok, err := cli.ping(ctx)
if !ok {
// The daemon could not be reached.
// Let negotiation be retried on the next request.
cli.version <- state
return state.version, err
}
// The daemon was successfully pinged, although the response may have
// contained an error body. Regardless, we have all the information
// needed to successfully complete negotiation.
return cli.negotiateAPIVersionPing(state, ping), nil
}

// NegotiateAPIVersionPing downgrades the client's API version to match the
Expand Down
22 changes: 19 additions & 3 deletions client/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ import (
// a HEAD request on the endpoint, but falls back to GET if HEAD is not supported
// by the daemon.
func (cli *Client) Ping(ctx context.Context) (types.Ping, error) {
ping, _, err := cli.ping(ctx)
return ping, err
}

// ping pings the server and returns a boolean 'ok' value alongside the ping
// response data and error to signal whether the ping response data is valid.
// This is necessary to distinguish a ping with valid data in an HTTP 500
// response from a ping which never reached the daemon as err != nil would be
// returned in both cases.
func (cli *Client) ping(ctx context.Context) (types.Ping, bool, error) {
// Ping requests are used during API version negotiation, so we want to
// hit the non-versioned /_ping endpoint, not /v1.xx/_ping
unversioned := versionedClient{cli: cli, version: ""}
Expand All @@ -22,7 +32,7 @@ func (cli *Client) Ping(ctx context.Context) (types.Ping, error) {
switch serverResp.statusCode {
case http.StatusOK, http.StatusInternalServerError:
// Server handled the request, so parse the response
return parsePingResponse(serverResp.header), err
return parsePingResponse(serverResp.header), true, err
}
// We only want to fall back to GET if the daemon is reachable but does
// not support HEAD /_ping requests. The client converts status codes
Expand All @@ -35,12 +45,18 @@ func (cli *Client) Ping(ctx context.Context) (types.Ping, error) {
// such cases any returned error must be an error returned by the
// daemon.
if err != nil && serverResp.statusCode <= 0 {
return types.Ping{}, err
return types.Ping{}, false, err
}

serverResp, err = unversioned.get(ctx, "/_ping", nil, nil)
ensureReaderClosed(serverResp)
return parsePingResponse(serverResp.header), err
// HTTP 500 (usually) comes from the daemon but middleboxes could
// respond with HTTP 502, 503 or 504. Status codes from middleboxes are
// a signal that the daemon was not reached.
if serverResp.statusCode <= 0 || serverResp.statusCode > http.StatusInternalServerError {
return types.Ping{}, false, err
}
return parsePingResponse(serverResp.header), true, err
}

func parsePingResponse(header http.Header) types.Ping {
Expand Down
4 changes: 2 additions & 2 deletions client/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type versionedClient struct {
// request to ensure that the same version is consistently used, even if the
// client's configured version is concurrently modified.
func (cli *Client) versioned(ctx context.Context) (versionedClient, error) {
ver := cli.negotiateAPIVersion(ctx, false)
return versionedClient{cli: cli, version: ver}, ctx.Err()
ver, err := cli.negotiateAPIVersion(ctx, false)
return versionedClient{cli: cli, version: ver}, err
}

// head sends an HTTP HEAD request to the docker API.
Expand Down
175 changes: 175 additions & 0 deletions client/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client // import "github.com/docker/docker/client"
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -188,3 +189,177 @@ func TestConcurrentRequests(t *testing.T) {
"GET /v1.30/info": 3,
})
}

func TestRetryNegotiation(t *testing.T) {
type testcase struct {
name string
handlePing func() (*http.Response, error)
}

status := func(code int) testcase {
return testcase{
name: fmt.Sprintf("StatusCode=%d", code),
handlePing: func() (*http.Response, error) {
return &http.Response{
StatusCode: code,
Body: io.NopCloser(strings.NewReader(http.StatusText(code))),
}, nil
},
}
}

for _, tt := range []testcase{
status(http.StatusBadGateway), // HTTP 502
status(http.StatusServiceUnavailable), // HTTP 503
status(http.StatusGatewayTimeout), // HTTP 504
{
name: "RequestError",
handlePing: func() (*http.Response, error) {
return nil, fmt.Errorf("fake request error")
},
},
} {
tt := tt
t.Run(tt.name, func(t *testing.T) {
var handler func(*http.Request) (*http.Response, error)
client, err := NewClientWithOpts(
WithAPIVersionNegotiation(),
WithHTTPClient(newMockClient(func(r *http.Request) (*http.Response, error) {
t.Logf("Mock HTTP client: %s %s", r.Method, r.URL)
return handler(r)
})),
)
assert.NilError(t, err)

handler = func(r *http.Request) (*http.Response, error) {
if r.URL.Path != "/_ping" {
t.Errorf("unexpected request to %s %s", r.Method, r.URL)
return nil, fmt.Errorf("unexpected request")
}
return tt.handlePing()
}
info, err := client.Info(context.Background())
assert.Check(t, is.DeepEqual(types.Info{}, info))
assert.Check(t, err != nil)

// This time allow negotiation to succeed but respond to
// the request for daemon info with an error.
handler = func(r *http.Request) (*http.Response, error) {
switch r.URL.Path {
case "/_ping":
header := make(http.Header)
header.Set("API-Version", "1.30")
return &http.Response{
StatusCode: http.StatusInternalServerError,
Header: header,
Body: io.NopCloser(strings.NewReader("pong")),
}, nil
case "/v1.30/info":
return &http.Response{
StatusCode: http.StatusInternalServerError,
Body: io.NopCloser(strings.NewReader("don't feel like it today")),
}, nil
}
t.Errorf("unexpected request to %s %s", r.Method, r.URL)
return nil, fmt.Errorf("unexpected request")
}
info, err = client.Info(context.Background())
assert.Check(t, is.DeepEqual(types.Info{}, info))
assert.Check(t, is.ErrorContains(err, "don't feel like it today"))

// Get info again, successfully this time. No version
// negotiation should take place.
expectedInfo := types.Info{Name: "fake-info"}
infoJSON, err := json.Marshal(&expectedInfo)
assert.NilError(t, err)
handler = func(r *http.Request) (*http.Response, error) {
if r.URL.Path == "/v1.30/info" {
header := make(http.Header)
header.Set("Content-Type", "application/json")
return &http.Response{
StatusCode: http.StatusOK,
Header: header,
Body: io.NopCloser(bytes.NewReader(infoJSON)),
}, nil
}
t.Errorf("unexpected request to %s %s", r.Method, r.URL)
return nil, fmt.Errorf("unexpected request")
}
info, err = client.Info(context.Background())
assert.Check(t, err)
assert.Check(t, is.DeepEqual(info, expectedInfo))
})
}

t.Run("ContextCanceled", func(t *testing.T) {
var handler func(*http.Request) (*http.Response, error)
client, err := NewClientWithOpts(
WithAPIVersionNegotiation(),
WithHTTPClient(newMockClient(func(r *http.Request) (*http.Response, error) {
t.Logf("Mock HTTP client: %s %s", r.Method, r.URL)
return handler(r)
})),
)
assert.NilError(t, err)

// Cancel the context while the ping request is in-flight.
ctx, cancel := context.WithCancel(context.Background())
handler = func(r *http.Request) (*http.Response, error) {
if r.URL.Path != "/_ping" {
t.Errorf("unexpected request to %s %s", r.Method, r.URL)
return nil, fmt.Errorf("unexpected request")
}
cancel()
return nil, ctx.Err()
}
info, err := client.Info(ctx)
assert.Check(t, is.DeepEqual(types.Info{}, info))
assert.Check(t, is.ErrorIs(err, context.Canceled))

// This time allow negotiation to succeed but cancel the context
// while the info request is in-flight.
ctx, cancel = context.WithCancel(context.Background())
handler = func(r *http.Request) (*http.Response, error) {
switch r.URL.Path {
case "/_ping":
header := make(http.Header)
header.Set("API-Version", "1.30")
return &http.Response{
StatusCode: http.StatusInternalServerError,
Header: header,
Body: io.NopCloser(strings.NewReader("pong")),
}, nil
case "/v1.30/info":
cancel()
return nil, ctx.Err()
}
t.Errorf("unexpected request to %s %s", r.Method, r.URL)
return nil, fmt.Errorf("unexpected request")
}
info, err = client.Info(ctx)
assert.Check(t, is.DeepEqual(types.Info{}, info))
assert.Check(t, is.ErrorIs(err, context.Canceled))

// Get info without any context cancelation shenanigans.
// No version negotiation should take place.
expectedInfo := types.Info{Name: "fake-info"}
infoJSON, err := json.Marshal(&expectedInfo)
assert.NilError(t, err)
handler = func(r *http.Request) (*http.Response, error) {
if r.URL.Path == "/v1.30/info" {
header := make(http.Header)
header.Set("Content-Type", "application/json")
return &http.Response{
StatusCode: http.StatusOK,
Header: header,
Body: io.NopCloser(bytes.NewReader(infoJSON)),
}, nil
}
t.Errorf("unexpected request to %s %s", r.Method, r.URL)
return nil, fmt.Errorf("unexpected request")
}
info, err = client.Info(context.Background())
assert.Check(t, err)
assert.Check(t, is.DeepEqual(info, expectedInfo))
})
}

0 comments on commit 2d4806a

Please sign in to comment.