From b727e680ab9ebed7af6d6adce830ee79b64236a4 Mon Sep 17 00:00:00 2001 From: cyli Date: Mon, 27 Mar 2017 23:42:24 -0700 Subject: [PATCH] In GetRemoteSignedCertificate, if the certificate hasn't been issued in 5 seconds it errors with a context deadline exceeded and does not retry. Update it so that if the node has not been updated within 5 seconds, attempt to get the node status again after an exponential backoff. If NodeCertificateStatus errors with some other error (not context deadline exceeded), GetRemoteSignedCertificate will try again with a different connection. Signed-off-by: cyli --- ca/certificates.go | 39 +++-- ca/certificates_test.go | 322 +++++++++++++++++++++++++++++++++++++++- ca/config.go | 3 + 3 files changed, 345 insertions(+), 19 deletions(-) diff --git a/ca/certificates.go b/ca/certificates.go index 61e8d57d31..ec7794bf35 100644 --- a/ca/certificates.go +++ b/ca/certificates.go @@ -244,7 +244,6 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit // the local connection will not be returned by the connection // broker anymore. 
config.ForceRemote = true - } if err != nil { return nil, err @@ -773,7 +772,6 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50 if rootCAPool == nil { return nil, errors.New("valid root CA pool required") } - creds := config.Credentials if creds == nil { @@ -810,17 +808,29 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50 // Exponential backoff with Max of 30 seconds to wait for a new retry for { + timeout := 5 * time.Second + if config.NodeCertificateStatusRequestTimeout > 0 { + timeout = config.NodeCertificateStatusRequestTimeout + } // Send the Request and retrieve the certificate - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + stateCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - statusResponse, err := caClient.NodeCertificateStatus(ctx, statusRequest) - if err != nil { + statusResponse, err := caClient.NodeCertificateStatus(stateCtx, statusRequest) + switch { + case err != nil && grpc.Code(err) != codes.DeadlineExceeded: conn.Close(false) - return nil, err - } + // Because IssueNodeCertificate succeeded, if this call failed likely it is due to an issue with this + // particular connection, so we need to get another. We should try a remote connection - the local node + // may be a manager that was demoted, so the local connection (which is preferred) may not work. 
+ config.ForceRemote = true + conn, err = getGRPCConnection(creds, config.ConnBroker, config.ForceRemote) + if err != nil { + return nil, err + } + caClient = api.NewNodeCAClient(conn.ClientConn) - // If the certificate was issued, return - if statusResponse.Status.State == api.IssuanceStateIssued { + // If there was no deadline exceeded error, and the certificate was issued, return + case err == nil && statusResponse.Status.State == api.IssuanceStateIssued: if statusResponse.Certificate == nil { conn.Close(false) return nil, errors.New("no certificate in CertificateStatus response") @@ -837,10 +847,15 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50 } } - // If we're still pending, the issuance failed, or the state is unknown - // let's continue trying. + // If NodeCertificateStatus timed out, we're still pending, the issuance failed, or + // the state is unknown let's continue trying after an exponential backoff expBackoff.Failure(nil, nil) - time.Sleep(expBackoff.Proceed(nil)) + select { + case <-ctx.Done(): + conn.Close(true) + return nil, err + case <-time.After(expBackoff.Proceed(nil)): + } } } diff --git a/ca/certificates_test.go b/ca/certificates_test.go index 83b653052d..a137490096 100644 --- a/ca/certificates_test.go +++ b/ca/certificates_test.go @@ -10,18 +10,27 @@ import ( "encoding/pem" "fmt" "io/ioutil" + "net" "os" + "sync" + "sync/atomic" "testing" "time" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + cfcsr "github.com/cloudflare/cfssl/csr" "github.com/cloudflare/cfssl/helpers" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/ca" "github.com/docker/swarmkit/ca/testutils" + "github.com/docker/swarmkit/connectionbroker" + "github.com/docker/swarmkit/identity" "github.com/docker/swarmkit/manager/state" raftutils "github.com/docker/swarmkit/manager/state/raft/testutils" "github.com/docker/swarmkit/manager/state/store" + "github.com/docker/swarmkit/remotes" 
"github.com/opencontainers/go-digest" "github.com/phayes/permbits" "github.com/stretchr/testify/assert" @@ -513,42 +522,341 @@ func TestGetRemoteSignedCertificateNodeInfo(t *testing.T) { assert.NotNil(t, cert) } +// A CA Server implementation that doesn't actually sign anything - something else +// will have to update the memory store to have a valid value for a node +type nonSigningCAServer struct { + tc *testutils.TestCA + server *grpc.Server + addr string + nodeStatusCalled int64 +} + +func newNonSigningCAServer(t *testing.T, tc *testutils.TestCA) *nonSigningCAServer { + secConfig, err := tc.NewNodeConfig(ca.ManagerRole) + require.NoError(t, err) + serverOpts := []grpc.ServerOption{grpc.Creds(secConfig.ServerTLSCreds)} + grpcServer := grpc.NewServer(serverOpts...) + l, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + n := &nonSigningCAServer{ + tc: tc, + addr: l.Addr().String(), + server: grpcServer, + } + + api.RegisterNodeCAServer(grpcServer, n) + go grpcServer.Serve(l) + return n +} + +func (n *nonSigningCAServer) stop(t *testing.T) { + n.server.Stop() +} + +func (n *nonSigningCAServer) getConnBroker() *connectionbroker.Broker { + return connectionbroker.New(remotes.NewRemotes(api.Peer{Addr: n.addr})) +} + +// only returns the status in the store +func (n *nonSigningCAServer) NodeCertificateStatus(ctx context.Context, request *api.NodeCertificateStatusRequest) (*api.NodeCertificateStatusResponse, error) { + atomic.AddInt64(&n.nodeStatusCalled, 1) + for { + var node *api.Node + n.tc.MemoryStore.View(func(tx store.ReadTx) { + node = store.GetNode(tx, request.NodeID) + }) + if node != nil && node.Certificate.Status.State == api.IssuanceStateIssued { + return &api.NodeCertificateStatusResponse{ + Status: &node.Certificate.Status, + Certificate: &node.Certificate, + }, nil + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(500 * time.Millisecond): + } + } +} + +func (n *nonSigningCAServer) IssueNodeCertificate(ctx 
context.Context, request *api.IssueNodeCertificateRequest) (*api.IssueNodeCertificateResponse, error) { + nodeID := identity.NewID() + role := api.NodeRoleWorker + if n.tc.ManagerToken == request.Token { + role = api.NodeRoleManager + } + + // Create a new node + err := n.tc.MemoryStore.Update(func(tx store.Tx) error { + node := &api.Node{ + Role: role, + ID: nodeID, + Certificate: api.Certificate{ + CSR: request.CSR, + CN: nodeID, + Role: role, + Status: api.IssuanceStatus{ + State: api.IssuanceStatePending, + }, + }, + Spec: api.NodeSpec{ + DesiredRole: role, + Membership: api.NodeMembershipAccepted, + Availability: request.Availability, + }, + } + + return store.CreateNode(tx, node) + }) + if err != nil { + return nil, err + } + return &api.IssueNodeCertificateResponse{ + NodeID: nodeID, + NodeMembership: api.NodeMembershipAccepted, + }, nil +} + func TestGetRemoteSignedCertificateWithPending(t *testing.T) { t.Parallel() + if testutils.External { + // we don't actually need an external signing server, since we're faking a CA server which + // doesn't really sign + return + } tc := testutils.NewTestCA(t) defer tc.Stop() + require.NoError(t, tc.CAServer.Stop()) // Create a new CSR to be signed csr, _, err := ca.GenerateNewCSR() - assert.NoError(t, err) + require.NoError(t, err) updates, cancel := state.Watch(tc.MemoryStore.WatchQueue(), api.EventCreateNode{}) defer cancel() + fakeCAServer := newNonSigningCAServer(t, tc) + completed := make(chan error) + defer close(completed) go func() { _, err := ca.GetRemoteSignedCertificate(context.Background(), csr, tc.RootCA.Pool, ca.CertificateRequestConfig{ Token: tc.WorkerToken, - ConnBroker: tc.ConnBroker, + ConnBroker: fakeCAServer.getConnBroker(), + // ensure the RPC call to get state is cancelled after 500 milliseconds + NodeCertificateStatusRequestTimeout: 500 * time.Millisecond, }) completed <- err }() - event := <-updates - node := event.(api.EventCreateNode).Node.Copy() + var node *api.Node + // 
wait for a new node to show up + for node == nil { + select { + case event := <-updates: // we want to skip the first node, which is the test CA + n := event.(api.EventCreateNode).Node.Copy() + if n.Certificate.Status.State == api.IssuanceStatePending { + node = n + } + } + } + + // wait for the calls to NodeCertificateStatus to begin on the first signing server before we start timing + require.NoError(t, raftutils.PollFuncWithTimeout(nil, func() error { + if atomic.LoadInt64(&fakeCAServer.nodeStatusCalled) == 0 { + return fmt.Errorf("waiting for NodeCertificateStatus to be called") + } + return nil + }, time.Second*2)) + + // wait for 2.5 seconds and ensure that GetRemoteSignedCertificate has not returned with an error yet - + // the first attempt to get the certificate status should have timed out after 500 milliseconds, but + // it should have tried to poll again. Add a few seconds for fudge time to make sure it's actually + // still polling. + select { + case <-completed: + require.FailNow(t, "GetRemoteSignedCertificate should wait at least 500 milliseconds") + case <-time.After(2500 * time.Millisecond): + // good, it's still polling so we can proceed with the test + } + require.True(t, atomic.LoadInt64(&fakeCAServer.nodeStatusCalled) > 1, "expected NodeCertificateStatus to have been polled more than once") // Directly update the status of the store err = tc.MemoryStore.Update(func(tx store.Tx) error { node.Certificate.Status.State = api.IssuanceStateIssued - return store.UpdateNode(tx, node) }) - assert.NoError(t, err) + require.NoError(t, err) // Make sure GetRemoteSignedCertificate didn't return an error - assert.NoError(t, <-completed) + require.NoError(t, <-completed) + + // make sure if we time out the GetRemoteSignedCertificate call, it cancels immediately and doesn't keep + // polling the status + go func() { + ctx, _ := context.WithTimeout(context.Background(), 1*time.Second) + _, err := ca.GetRemoteSignedCertificate(ctx, csr, tc.RootCA.Pool, + 
ca.CertificateRequestConfig{ + Token: tc.WorkerToken, + ConnBroker: fakeCAServer.getConnBroker(), + }) + completed <- err + }() + + // wait for 3 seconds and ensure that GetRemoteSignedCertificate has returned with a context DeadlineExceeded + // error - it should have returned after 1 second, but add some more for fudge time. + select { + case err = <-completed: + require.Equal(t, grpc.Code(err), codes.DeadlineExceeded) + case <-time.After(3 * time.Second): + require.FailNow(t, "GetRemoteSignedCertificate should have been canceled after 1 second, and it has been 3") + } +} + +// fake remotes interface that just selects the remotes in order +type fakeRemotes struct { + mu sync.Mutex + peers []api.Peer +} + +func (f *fakeRemotes) Weights() map[api.Peer]int { + panic("this is not called") +} + +func (f *fakeRemotes) Select(...string) (api.Peer, error) { + f.mu.Lock() + defer f.mu.Unlock() + if len(f.peers) > 0 { + return f.peers[0], nil + } + return api.Peer{}, fmt.Errorf("no more peers") +} + +func (f *fakeRemotes) Observe(peer api.Peer, weight int) { + panic("this is not called") +} + +// just removes a peer if the weight is negative +func (f *fakeRemotes) ObserveIfExists(peer api.Peer, weight int) { + f.mu.Lock() + defer f.mu.Unlock() + if weight < 0 { + var newPeers []api.Peer + for _, p := range f.peers { + if p != peer { + newPeers = append(newPeers, p) + } + } + f.peers = newPeers + } +} + +func (f *fakeRemotes) Remove(addrs ...api.Peer) { + panic("this is not called") +} + +var _ remotes.Remotes = &fakeRemotes{} + +// On connection errors, so long as they happen after IssueNodeCertificate is successful, GetRemoteSignedCertificate +// tries to open a new connection and continue polling for NodeCertificateStatus. If there are no more connections, +// then fail. 
+func TestGetRemoteSignedCertificateConnectionErrors(t *testing.T) { + t.Parallel() + if testutils.External { + // we don't actually need an external signing server, since we're faking a CA server which + // doesn't really sign + return + } + + tc := testutils.NewTestCA(t) + defer tc.Stop() + require.NoError(t, tc.CAServer.Stop()) + + // Create a new CSR to be signed + csr, _, err := ca.GenerateNewCSR() + require.NoError(t, err) + + // create 2 CA servers referencing the same memory store, so we can have multiple connections + fakeSigningServers := []*nonSigningCAServer{newNonSigningCAServer(t, tc), newNonSigningCAServer(t, tc)} + defer fakeSigningServers[0].stop(t) + defer fakeSigningServers[1].stop(t) + multiBroker := connectionbroker.New(&fakeRemotes{ + peers: []api.Peer{ + {Addr: fakeSigningServers[0].addr}, + {Addr: fakeSigningServers[1].addr}, + }, + }) + + completed, done := make(chan error), make(chan struct{}) + defer close(completed) + defer close(done) + go func() { + _, err := ca.GetRemoteSignedCertificate(context.Background(), csr, tc.RootCA.Pool, + ca.CertificateRequestConfig{ + Token: tc.WorkerToken, + ConnBroker: multiBroker, + }) + select { + case <-done: + case completed <- err: + } + }() + + // wait for the calls to NodeCertificateStatus to begin on the first signing server + require.NoError(t, raftutils.PollFuncWithTimeout(nil, func() error { + if atomic.LoadInt64(&fakeSigningServers[0].nodeStatusCalled) == 0 { + return fmt.Errorf("waiting for NodeCertificateStatus to be called") + } + return nil + }, time.Second*2)) + + // stop 1 server, because it will have been the remote GetRemoteSignedCertificate first connected to, and ensure + // that GetRemoteSignedCertificate is still going + fakeSigningServers[0].stop(t) + select { + case <-completed: + require.FailNow(t, "GetRemoteSignedCertificate should still be going after 2.5 seconds") + case <-time.After(2500 * time.Millisecond): + // good, it's still polling so we can proceed 
with the test + } + + // wait for the calls to NodeCertificateStatus to begin on the second signing server + require.NoError(t, raftutils.PollFuncWithTimeout(nil, func() error { + if atomic.LoadInt64(&fakeSigningServers[1].nodeStatusCalled) == 0 { + return fmt.Errorf("waiting for NodeCertificateStatus to be called") + } + return nil + }, time.Second*2)) + + // kill the last server - this should cause GetRemoteSignedCertificate to fail because there are no more peers + fakeSigningServers[1].stop(t) + // wait for 5 seconds and ensure that GetRemoteSignedCertificate has returned with an error. + select { + case err = <-completed: + require.Contains(t, err.Error(), "no more peers") + case <-time.After(5 * time.Second): + require.FailNow(t, "GetRemoteSignedCertificate should errored after 5 seconds") + } + + // calling GetRemoteSignedCertificate with a connection that doesn't work with IssueNodeCertificate will fail + // immediately without retrying with a new connection + fakeSigningServers[1] = newNonSigningCAServer(t, tc) + defer fakeSigningServers[1].stop(t) + multiBroker = connectionbroker.New(&fakeRemotes{ + peers: []api.Peer{ + {Addr: fakeSigningServers[0].addr}, + {Addr: fakeSigningServers[1].addr}, + }, + }) + _, err = ca.GetRemoteSignedCertificate(context.Background(), csr, tc.RootCA.Pool, + ca.CertificateRequestConfig{ + Token: tc.WorkerToken, + ConnBroker: multiBroker, + }) + require.Error(t, err) } func TestNewRootCA(t *testing.T) { diff --git a/ca/config.go b/ca/config.go index 12713dbf05..6f469636b7 100644 --- a/ca/config.go +++ b/ca/config.go @@ -350,6 +350,9 @@ type CertificateRequestConfig struct { // where the local node is running a manager, but is in the process of // being demoted. ForceRemote bool + // NodeCertificateStatusRequestTimeout determines how long to wait for a node + // status RPC result. If not provided (zero value), will default to 5 seconds. 
+ NodeCertificateStatusRequestTimeout time.Duration } // CreateSecurityConfig creates a new key and cert for this node, either locally