Skip to content

Commit

Permalink
fix: ignore DeadlineExceeded error correctly on bootstrap
Browse files Browse the repository at this point in the history
The problem was that gRPC method `status.Code(err)` doesn't unwrap
errors, while Talos client returns errors wrapped with
`multierror.Error` and `fmt.Errrorf`, so `status.Code` doesn't return
error code correctly.

Fix that by introducing our own client method which correctly goes over
the chain of wrapped errors.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
(cherry picked from commit 10c2875)
  • Loading branch information
smira committed Jul 7, 2021
1 parent 17edc88 commit 5c640cd
Show file tree
Hide file tree
Showing 15 changed files with 122 additions and 32 deletions.
3 changes: 1 addition & 2 deletions cmd/talosctl/cmd/talos/diskusage.go
Expand Up @@ -14,7 +14,6 @@ import (
humanize "github.com/dustin/go-humanize"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
"github.com/talos-systems/talos/pkg/machinery/client"
Expand Down Expand Up @@ -70,7 +69,7 @@ var duCmd = &cobra.Command{
for {
info, err := stream.Recv()
if err != nil {
if err == io.EOF || status.Code(err) == codes.Canceled {
if err == io.EOF || client.StatusCode(err) == codes.Canceled {
return w.Flush()
}

Expand Down
3 changes: 1 addition & 2 deletions cmd/talosctl/cmd/talos/dmesg.go
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/talos-systems/talos/pkg/machinery/client"
)
Expand All @@ -37,7 +36,7 @@ var dmesgCmd = &cobra.Command{
for {
resp, err := stream.Recv()
if err != nil {
if err == io.EOF || status.Code(err) == codes.Canceled {
if err == io.EOF || client.StatusCode(err) == codes.Canceled {
break
}

Expand Down
3 changes: 1 addition & 2 deletions cmd/talosctl/cmd/talos/get.go
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/talos-systems/talos/cmd/talosctl/cmd/talos/output"
"github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers"
Expand Down Expand Up @@ -62,7 +61,7 @@ var getCmd = &cobra.Command{
for {
msg, err := watchClient.Recv()
if err != nil {
if err == io.EOF || status.Code(err) == codes.Canceled {
if err == io.EOF || client.StatusCode(err) == codes.Canceled {
return nil
}

Expand Down
9 changes: 4 additions & 5 deletions cmd/talosctl/cmd/talos/health.go
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers"
"github.com/talos-systems/talos/pkg/cluster"
Expand Down Expand Up @@ -130,7 +129,7 @@ func healthOnServer(ctx context.Context, c *client.Client) error {
controlPlaneNodes = append(controlPlaneNodes, healthCmdFlags.clusterState.InitNode)
}

client, err := c.ClusterHealthCheck(ctx, healthCmdFlags.clusterWaitTimeout, &clusterapi.ClusterInfo{
healthCheckClient, err := c.ClusterHealthCheck(ctx, healthCmdFlags.clusterWaitTimeout, &clusterapi.ClusterInfo{
ControlPlaneNodes: controlPlaneNodes,
WorkerNodes: healthCmdFlags.clusterState.WorkerNodes,
ForceEndpoint: healthCmdFlags.forceEndpoint,
Expand All @@ -139,14 +138,14 @@ func healthOnServer(ctx context.Context, c *client.Client) error {
return err
}

if err := client.CloseSend(); err != nil {
if err := healthCheckClient.CloseSend(); err != nil {
return err
}

for {
msg, err := client.Recv()
msg, err := healthCheckClient.Recv()
if err != nil {
if err == io.EOF || status.Code(err) == codes.Canceled {
if err == io.EOF || client.StatusCode(err) == codes.Canceled {
return nil
}

Expand Down
3 changes: 1 addition & 2 deletions cmd/talosctl/cmd/talos/inspect.go
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/emicklei/dot"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers"
"github.com/talos-systems/talos/pkg/cli"
Expand Down Expand Up @@ -89,7 +88,7 @@ to render the graph:
for {
resp, err := listClient.Recv()
if err != nil {
if err == io.EOF || status.Code(err) == codes.Canceled {
if err == io.EOF || client.StatusCode(err) == codes.Canceled {
break
}

Expand Down
5 changes: 2 additions & 3 deletions cmd/talosctl/cmd/talos/list.go
Expand Up @@ -16,7 +16,6 @@ import (
humanize "github.com/dustin/go-humanize"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
"github.com/talos-systems/talos/pkg/machinery/client"
Expand Down Expand Up @@ -87,7 +86,7 @@ var lsCmd = &cobra.Command{
for {
info, err := stream.Recv()
if err != nil {
if err == io.EOF || status.Code(err) == codes.Canceled {
if err == io.EOF || client.StatusCode(err) == codes.Canceled {
if multipleNodes {
return w.Flush()
}
Expand Down Expand Up @@ -132,7 +131,7 @@ var lsCmd = &cobra.Command{
for {
info, err := stream.Recv()
if err != nil {
if err == io.EOF || status.Code(err) == codes.Canceled {
if err == io.EOF || client.StatusCode(err) == codes.Canceled {
return w.Flush()
}

Expand Down
3 changes: 1 addition & 2 deletions cmd/talosctl/cmd/talos/logs.go
Expand Up @@ -15,7 +15,6 @@ import (
criconstants "github.com/containerd/cri/pkg/constants"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/talos-systems/talos/pkg/cli"
"github.com/talos-systems/talos/pkg/machinery/api/common"
Expand Down Expand Up @@ -158,7 +157,7 @@ func (slicer *lineSlicer) run(stream machine.MachineService_LogsClient) {
for {
data, err := stream.Recv()
if err != nil {
if err == io.EOF || status.Code(err) == codes.Canceled {
if err == io.EOF || client.StatusCode(err) == codes.Canceled {
return
}
slicer.errCh <- err
Expand Down
3 changes: 1 addition & 2 deletions cmd/talosctl/pkg/talos/helpers/resources.go
Expand Up @@ -11,7 +11,6 @@ import (
"os"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/talos-systems/talos/pkg/machinery/client"
)
Expand Down Expand Up @@ -51,7 +50,7 @@ func ForEachResource(ctx context.Context, c *client.Client, callback func(ctx co
for {
msg, err := listClient.Recv()
if err != nil {
if err == io.EOF || status.Code(err) == codes.Canceled {
if err == io.EOF || client.StatusCode(err) == codes.Canceled {
return nil
}

Expand Down
3 changes: 1 addition & 2 deletions internal/integration/api/diskusage.go
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/talos-systems/talos/internal/integration/base"
machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
Expand Down Expand Up @@ -115,7 +114,7 @@ func (suite *DiskUsageSuite) TestDiskUsageRequests() {
responseCount++

if err != nil {
if err == io.EOF || status.Code(err) == codes.Canceled {
if err == io.EOF || client.StatusCode(err) == codes.Canceled {
break
}

Expand Down
3 changes: 1 addition & 2 deletions internal/integration/api/etcd.go
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/talos-systems/talos/internal/integration/base"
machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
Expand Down Expand Up @@ -126,7 +125,7 @@ func (suite *EtcdSuite) TestEtcdLeaveCluster() {

info, err = stream.Recv()
if err != nil {
if err == io.EOF || status.Code(err) == codes.Canceled {
if err == io.EOF || client.StatusCode(err) == codes.Canceled {
break
}
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/cluster/bootstrap.go
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/talos-systems/go-retry/retry"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
"github.com/talos-systems/talos/pkg/machinery/client"
Expand Down Expand Up @@ -50,7 +49,7 @@ func (s *APIBootstrapper) Bootstrap(ctx context.Context, out io.Writer) error {

fmt.Fprintln(out, "waiting for API")

err = retry.Constant(5*time.Minute, retry.WithUnits(500*time.Millisecond)).Retry(func() error {
err = retry.Constant(5*time.Minute, retry.WithUnits(500*time.Millisecond)).RetryWithContext(nodeCtx, func(nodeCtx context.Context) error {
retryCtx, cancel := context.WithTimeout(nodeCtx, 500*time.Millisecond)
defer cancel()

Expand All @@ -67,16 +66,20 @@ func (s *APIBootstrapper) Bootstrap(ctx context.Context, out io.Writer) error {

fmt.Fprintln(out, "bootstrapping cluster")

return retry.Constant(backoff.DefaultConfig.MaxDelay, retry.WithUnits(100*time.Millisecond)).Retry(func() error {
return retry.Constant(backoff.DefaultConfig.MaxDelay, retry.WithUnits(100*time.Millisecond)).RetryWithContext(nodeCtx, func(nodeCtx context.Context) error {
retryCtx, cancel := context.WithTimeout(nodeCtx, 500*time.Millisecond)
defer cancel()

if err = cli.Bootstrap(retryCtx, &machineapi.BootstrapRequest{}); err != nil {
switch {
// deadline exceeded in case it's verbatim context error
case errors.Is(err, context.DeadlineExceeded):
return retry.ExpectedError(err)
case status.Code(err) == codes.FailedPrecondition || status.Code(err) == codes.DeadlineExceeded:
// FailedPrecondition when time is not in sync yet on the server
// DeadlineExceeded when the call fails in the gRPC stack either on the server or client side
case client.StatusCode(err) == codes.FailedPrecondition || client.StatusCode(err) == codes.DeadlineExceeded:
return retry.ExpectedError(err)
// connection refused, including proxied connection refused via the endpoint to the node
case strings.Contains(err.Error(), "connection refused"):
return retry.ExpectedError(err)
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/machinery/client/client.go
Expand Up @@ -24,7 +24,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"

clusterapi "github.com/talos-systems/talos/pkg/machinery/api/cluster"
Expand Down Expand Up @@ -962,7 +961,7 @@ func ReadStream(stream MachineStream) (io.ReadCloser, <-chan error, error) {
for {
data, err := stream.Recv()
if err != nil {
if err == io.EOF || status.Code(err) == codes.Canceled {
if err == io.EOF || StatusCode(err) == codes.Canceled {
return
}
//nolint:errcheck
Expand Down
3 changes: 1 addition & 2 deletions pkg/machinery/client/events.go
Expand Up @@ -13,7 +13,6 @@ import (
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
Expand Down Expand Up @@ -97,7 +96,7 @@ func (c *Client) EventsWatch(ctx context.Context, watchFunc func(<-chan Event),
for {
event, err := stream.Recv()
if err != nil {
if err == io.EOF || status.Code(err) == codes.Canceled {
if err == io.EOF || StatusCode(err) == codes.Canceled {
return nil
}

Expand Down
36 changes: 36 additions & 0 deletions pkg/machinery/client/status.go
@@ -0,0 +1,36 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package client

import (
"errors"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// StatusCode returns the Code of the error if it is a Status error, codes.OK if err
// is nil, or codes.Unknown otherwise correctly unwrapping wrapped errors.
//
// StatusCode is mostly equivalent to grpc `status.Code` method, but it correctly unwraps wrapped errors
// including `multierror.Error` used when parsing multi-node responses.
func StatusCode(err error) codes.Code {
type grpcStatus interface {
GRPCStatus() *status.Status
}

// Don't use FromError to avoid allocation of OK status.
if err == nil {
return codes.OK
}

var se grpcStatus

if errors.As(err, &se) {
return se.GRPCStatus().Code()
}

return codes.Unknown
}
63 changes: 63 additions & 0 deletions pkg/machinery/client/status_test.go
@@ -0,0 +1,63 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package client_test

import (
"errors"
"fmt"
"testing"

"github.com/hashicorp/go-multierror"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/talos-systems/talos/pkg/machinery/client"
)

func TestStatusCode(t *testing.T) {
for _, tt := range []struct {
name string
err error
code codes.Code
}{
{
name: "nil",
err: nil,
code: codes.OK,
},
{
name: "not status",
err: errors.New("some error"),
code: codes.Unknown,
},
{
name: "status",
err: status.Error(codes.AlreadyExists, "file already exists"),
code: codes.AlreadyExists,
},
{
name: "status wrapped",
err: multierror.Append(nil, status.Error(codes.AlreadyExists, "file already exists")).ErrorOrNil(),
code: codes.AlreadyExists,
},
{
name: "multiple wrapped",
err: multierror.Append(nil, status.Error(codes.FailedPrecondition, "can't be zero"), status.Error(codes.AlreadyExists, "file already exists")).ErrorOrNil(),
code: codes.FailedPrecondition,
},
{
name: "double wrapped",
err: multierror.Append(nil, fmt.Errorf("127.0.0.1: %w", status.Error(codes.AlreadyExists, "file already exists"))).ErrorOrNil(),
code: codes.AlreadyExists,
},
} {
tt := tt

t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, client.StatusCode(tt.err), tt.code)
})
}
}

0 comments on commit 5c640cd

Please sign in to comment.