Skip to content

Commit

Permalink
In GetRemoteSignedCertificate, if the certificate hasn't been issued …
Browse files Browse the repository at this point in the history
…in 5 seconds it errors

with a context deadline exceeded error and does not retry. Update it so that, if the node has not
been updated within 5 seconds, it attempts to get the node status again after an exponential
backoff.

Signed-off-by: cyli <ying.li@docker.com>
  • Loading branch information
cyli committed Mar 28, 2017
1 parent 970b45a commit 9564d69
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 12 deletions.
34 changes: 26 additions & 8 deletions ca/certificates.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ func (rca *RootCA) RequestAndSaveNewCertificates(ctx context.Context, kw KeyWrit
// the local connection will not be returned by the connection
// broker anymore.
config.ForceRemote = true

}
if err != nil {
return nil, err
Expand Down Expand Up @@ -810,17 +809,31 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50

// Exponential backoff with Max of 30 seconds to wait for a new retry
for {
timeout := 5 * time.Second
if config.NodeCertificateStatusRequestTimeout > 0 {
timeout = config.NodeCertificateStatusRequestTimeout
}
// Send the Request and retrieve the certificate
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
stateCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
statusResponse, err := caClient.NodeCertificateStatus(ctx, statusRequest)
if err != nil {
statusResponse, err := caClient.NodeCertificateStatus(stateCtx, statusRequest)
switch {
case err != nil && grpc.Code(err) != codes.DeadlineExceeded:
conn.Close(false)
return nil, err
}

case err != nil: // this was a deadline exceeded error - we need to figure out which context
select { // the entire `GetRemoteSignedCertificate` call context was cancelled - return the error
case <-ctx.Done():
conn.Close(false)
return nil, err
default:
// the RPC call to `NodeCertificateStatus` was cancelled - retry the next loop through after an
// exponential backoff
}

// If the certificate was issued, return
if statusResponse.Status.State == api.IssuanceStateIssued {
case statusResponse.Status.State == api.IssuanceStateIssued:
if statusResponse.Certificate == nil {
conn.Close(false)
return nil, errors.New("no certificate in CertificateStatus response")
Expand All @@ -838,9 +851,14 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, rootCAPool *x50
}

// If we're still pending, the issuance failed, or the state is unknown
// let's continue trying.
// let's continue trying after an exponential backoff
expBackoff.Failure(nil, nil)
time.Sleep(expBackoff.Proceed(nil))
select {
case <-ctx.Done():
conn.Close(false)
return nil, err
case <-time.After(expBackoff.Proceed(nil)):
}
}
}

Expand Down
151 changes: 147 additions & 4 deletions ca/certificates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,30 @@ import (
"encoding/pem"
"fmt"
"io/ioutil"
"net"
"os"
"testing"
"time"

"google.golang.org/grpc"

cfcsr "github.com/cloudflare/cfssl/csr"
"github.com/cloudflare/cfssl/helpers"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/ca"
"github.com/docker/swarmkit/ca/testutils"
"github.com/docker/swarmkit/connectionbroker"
"github.com/docker/swarmkit/identity"
"github.com/docker/swarmkit/manager/state"
raftutils "github.com/docker/swarmkit/manager/state/raft/testutils"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/remotes"
"github.com/opencontainers/go-digest"
"github.com/phayes/permbits"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
)

func init() {
Expand Down Expand Up @@ -479,11 +486,106 @@ func TestGetRemoteSignedCertificateNodeInfo(t *testing.T) {
assert.NotNil(t, cert)
}

// nonSigningCAServer is a NodeCA gRPC server implementation that doesn't
// actually sign anything - something else will have to update the memory
// store to have a valid value for a node.
type nonSigningCAServer struct {
// tc is the test CA whose MemoryStore and ManagerToken back the RPC handlers.
tc *testutils.TestCA
// server is the gRPC server this fake CA is registered on.
server *grpc.Server
// addr is the address of the listener the gRPC server is serving on.
addr string
}

// newNonSigningCAServer creates a nonSigningCAServer backed by the given test
// CA, registers it as the NodeCA service on a gRPC server that uses the server
// TLS credentials of a node config created for the manager role, and starts
// serving on a loopback TCP port chosen by the OS. Any setup error fails the
// test immediately.
func newNonSigningCAServer(t *testing.T, tc *testutils.TestCA) *nonSigningCAServer {
	managerConfig, err := tc.NewNodeConfig(ca.ManagerRole)
	require.NoError(t, err)
	srv := grpc.NewServer(grpc.Creds(managerConfig.ServerTLSCreds))

	// Port 0 asks the OS for any free port; the real address is read back
	// from the listener below.
	listener, err := net.Listen("tcp", "127.0.0.1:0")
	require.NoError(t, err)

	fake := &nonSigningCAServer{
		server: srv,
		addr:   listener.Addr().String(),
		tc:     tc,
	}
	api.RegisterNodeCAServer(srv, fake)

	// Serve in the background so the constructor can return the running fake.
	go srv.Serve(listener)
	return fake
}

// stop shuts down the underlying gRPC server. The *testing.T argument is
// accepted but not used by the current implementation.
func (n *nonSigningCAServer) stop(t *testing.T) {
n.server.Stop()
}

// getConnBroker returns a connection broker whose only known remote is the
// address this fake CA server is listening on.
func (n *nonSigningCAServer) getConnBroker() *connectionbroker.Broker {
	peer := api.Peer{Addr: n.addr}
	knownRemotes := remotes.NewRemotes(peer)
	return connectionbroker.New(knownRemotes)
}

// NodeCertificateStatus only reports what is already in the memory store: it
// re-reads the requested node every 500 milliseconds and returns once that
// node's certificate is in the issued state. It never signs anything itself.
// If ctx is done before that happens, it returns ctx.Err().
func (n *nonSigningCAServer) NodeCertificateStatus(ctx context.Context, request *api.NodeCertificateStatusRequest) (*api.NodeCertificateStatusResponse, error) {
	const pollInterval = 500 * time.Millisecond

	for {
		// Wait out one poll interval, giving up as soon as the caller's
		// context is cancelled or times out.
		select {
		case <-time.After(pollInterval):
		case <-ctx.Done():
			return nil, ctx.Err()
		}

		var found *api.Node
		n.tc.MemoryStore.View(func(tx store.ReadTx) {
			found = store.GetNode(tx, request.NodeID)
		})

		// Missing node or a certificate that is not issued yet: poll again.
		if found == nil || found.Certificate.Status.State != api.IssuanceStateIssued {
			continue
		}

		return &api.NodeCertificateStatusResponse{
			Status:      &found.Certificate.Status,
			Certificate: &found.Certificate,
		}, nil
	}
}

// IssueNodeCertificate records a new node for the supplied CSR in the memory
// store with its certificate left in the pending state, and returns the newly
// generated node ID. The node gets the manager role when the request token
// equals the test CA's manager token, and the worker role otherwise. No
// certificate is signed here.
func (n *nonSigningCAServer) IssueNodeCertificate(ctx context.Context, request *api.IssueNodeCertificateRequest) (*api.IssueNodeCertificateResponse, error) {
	newID := identity.NewID()

	// Default to worker; only the manager token upgrades the role.
	assignedRole := api.NodeRoleWorker
	if request.Token == n.tc.ManagerToken {
		assignedRole = api.NodeRoleManager
	}

	// The certificate starts out pending; something else has to flip it to
	// issued in the store.
	pendingNode := &api.Node{
		ID:   newID,
		Role: assignedRole,
		Certificate: api.Certificate{
			CSR:  request.CSR,
			CN:   newID,
			Role: assignedRole,
			Status: api.IssuanceStatus{
				State: api.IssuanceStatePending,
			},
		},
		Spec: api.NodeSpec{
			DesiredRole:  assignedRole,
			Membership:   api.NodeMembershipAccepted,
			Availability: request.Availability,
		},
	}

	// Create a new node
	createNode := func(tx store.Tx) error {
		return store.CreateNode(tx, pendingNode)
	}
	if err := n.tc.MemoryStore.Update(createNode); err != nil {
		return nil, err
	}

	return &api.IssueNodeCertificateResponse{
		NodeID:         newID,
		NodeMembership: api.NodeMembershipAccepted,
	}, nil
}

func TestGetRemoteSignedCertificateWithPending(t *testing.T) {
t.Parallel()

tc := testutils.NewTestCA(t)
defer tc.Stop()
require.NoError(t, tc.CAServer.Stop())

// Create a new CSR to be signed
csr, _, err := ca.GenerateNewCSR()
Expand All @@ -492,29 +594,70 @@ func TestGetRemoteSignedCertificateWithPending(t *testing.T) {
updates, cancel := state.Watch(tc.MemoryStore.WatchQueue(), api.EventCreateNode{})
defer cancel()

fakeCAServer := newNonSigningCAServer(t, tc)

completed := make(chan error)
defer close(completed)
go func() {
_, err := ca.GetRemoteSignedCertificate(context.Background(), csr, tc.RootCA.Pool,
ca.CertificateRequestConfig{
Token: tc.WorkerToken,
ConnBroker: tc.ConnBroker,
ConnBroker: fakeCAServer.getConnBroker(),
// ensure the RPC call to get state is cancelled after 1 second
NodeCertificateStatusRequestTimeout: 500 * time.Second,
})
completed <- err
}()

event := <-updates
node := event.(api.EventCreateNode).Node.Copy()
var node *api.Node
// wait for a new node to show up
for node == nil {
select {
case event := <-updates: // we want to skip the first node, which is the test CA
n := event.(api.EventCreateNode).Node.Copy()
if n.Certificate.Status.State == api.IssuanceStatePending {
node = n
}
}
}
// wait for three seconds and ensure that GetRemoteSignedCertificate has not returned with an error yet
select {
case <-completed:
assert.FailNow(t, "GetRemoteSignedCertificate should wait at least 3 seconds")
case <-time.After(3 * time.Second):
// good, it's still polling so we can proceed with the test
}

// Directly update the status of the store
err = tc.MemoryStore.Update(func(tx store.Tx) error {
node.Certificate.Status.State = api.IssuanceStateIssued

return store.UpdateNode(tx, node)
})
assert.NoError(t, err)

// Make sure GetRemoteSignedCertificate didn't return an error
assert.NoError(t, <-completed)

// make sure if we cancel the GetRemoteSignedCertificate, it cancels immediately
go func() {
ctx, _ := context.WithTimeout(context.Background(), 500*time.Millisecond)
_, err := ca.GetRemoteSignedCertificate(ctx, csr, tc.RootCA.Pool,
ca.CertificateRequestConfig{
Token: tc.WorkerToken,
ConnBroker: fakeCAServer.getConnBroker(),
// ensure the RPC call to get state is cancelled after 1 second
NodeCertificateStatusRequestTimeout: time.Second,
})
completed <- err
}()

// GetRemoteSignedCertificate should return with a deadline exceeded error once its context expires
select {
case err = <-completed:
assert.Equal(t, grpc.Code(err), codes.DeadlineExceeded)
case <-time.After(570 * time.Second):
assert.FailNow(t, "GetRemoteSignedCertificate should have been canceled after 500 milliseconds")
}
}

func TestNewRootCA(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions ca/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ type CertificateRequestConfig struct {
// where the local node is running a manager, but is in the process of
// being demoted.
ForceRemote bool
// NodeCertificateStatusRequestTimeout determines how long to wait for a node
// status RPC result. If not provided (zero value), will default to 5 seconds.
NodeCertificateStatusRequestTimeout time.Duration
}

// CreateSecurityConfig creates a new key and cert for this node, either locally
Expand Down

0 comments on commit 9564d69

Please sign in to comment.