Skip to content

Commit

Permalink
manager: Fix hanging Stop method
Browse files Browse the repository at this point in the history
If raftNode.JoinAndStart failed, Stop will block forever because it
waits for the manager to start up.

To fix this, close the "started" channel even if Run exits early due to
an error. Fix the way the collector is initialized so its Stop method
won't hang either.

Add a test that makes sure the node shuts down cleanly after a failed
manager initialization.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
  • Loading branch information
aaronlehmann authored and aluzzardi committed May 31, 2017
1 parent 1a3e510 commit f7a002c
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 31 deletions.
10 changes: 8 additions & 2 deletions manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ func New(config *Config) (*Manager, error) {

m := &Manager{
config: *config,
collector: metrics.NewCollector(raftNode.MemoryStore()),
caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig, config.RootCAPaths),
dispatcher: dispatcher.New(raftNode, dispatcher.DefaultConfig()),
logbroker: logbroker.New(raftNode.MemoryStore()),
Expand Down Expand Up @@ -502,12 +501,16 @@ func (m *Manager) Run(parent context.Context) error {
healthServer.SetServingStatus("Raft", api.HealthCheckResponse_SERVING)

if err := m.raftNode.JoinAndStart(ctx); err != nil {
// Don't block future calls to Stop.
close(m.started)
return errors.Wrap(err, "can't initialize raft node")
}

localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_SERVING)

// Start metrics collection.

m.collector = metrics.NewCollector(m.raftNode.MemoryStore())
go func(collector *metrics.Collector) {
if err := collector.Run(ctx); err != nil {
log.G(ctx).WithError(err).Error("collector failed with an error")
Expand Down Expand Up @@ -590,7 +593,10 @@ func (m *Manager) Stop(ctx context.Context, clearData bool) {

m.raftNode.Cancel()

m.collector.Stop()
if m.collector != nil {
m.collector.Stop()
}

m.dispatcher.Stop()
m.logbroker.Stop()
m.caserver.Stop()
Expand Down
2 changes: 1 addition & 1 deletion manager/state/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func (n *Node) JoinAndStart(ctx context.Context) (err error) {
if err != nil {
n.stopMu.Lock()
// to shutdown transport
close(n.stopped)
n.cancelFunc()
n.stopMu.Unlock()
n.done()
} else {
Expand Down
74 changes: 46 additions & 28 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -252,6 +251,7 @@ func TestLoadSecurityConfigDownloadAllCerts(t *testing.T) {
func TestManagerIgnoresDispatcherRootCAUpdate(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "manager-root-ca-update")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

// don't bother with a listening socket
cAddr := filepath.Join(tmpDir, "control.sock")
Expand All @@ -264,13 +264,7 @@ func TestManagerIgnoresDispatcherRootCAUpdate(t *testing.T) {
node, err := New(cfg)
require.NoError(t, err)

var nodeErr error
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
nodeErr = node.Start(context.Background())
wg.Done()
}()
require.NoError(t, node.Start(context.Background()))

select {
case <-node.Ready():
Expand All @@ -297,13 +291,12 @@ func TestManagerIgnoresDispatcherRootCAUpdate(t *testing.T) {
require.Equal(t, currentCACerts, caCerts)

require.NoError(t, node.Stop(context.Background()))
wg.Wait()
require.NoError(t, nodeErr)
}

func TestAgentRespectsDispatcherRootCAUpdate(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "manager-root-ca-update")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

// bootstrap worker TLS certificates
paths := ca.NewConfigPaths(filepath.Join(tmpDir, certDirectory))
Expand All @@ -329,13 +322,7 @@ func TestAgentRespectsDispatcherRootCAUpdate(t *testing.T) {
node, err := New(cfg)
require.NoError(t, err)

var nodeErr error
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
nodeErr = node.Start(context.Background())
wg.Done()
}()
require.NoError(t, node.Start(context.Background()))

select {
case <-node.Ready():
Expand Down Expand Up @@ -371,13 +358,12 @@ func TestAgentRespectsDispatcherRootCAUpdate(t *testing.T) {
}, time.Second))

require.NoError(t, node.Stop(context.Background()))
wg.Wait()
require.NoError(t, nodeErr)
}

func TestCertRenewals(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "no-top-level-role")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

paths := ca.NewConfigPaths(filepath.Join(tmpDir, "certificates"))

Expand All @@ -391,13 +377,7 @@ func TestCertRenewals(t *testing.T) {
node, err := New(cfg)
require.NoError(t, err)

var nodeErr error
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
nodeErr = node.Start(context.Background())
wg.Done()
}()
require.NoError(t, node.Start(context.Background()))

select {
case <-node.Ready():
Expand Down Expand Up @@ -449,6 +429,44 @@ func TestCertRenewals(t *testing.T) {
}, 5*time.Second))

require.NoError(t, node.Stop(context.Background()))
wg.Wait()
require.NoError(t, nodeErr)
}

// TestManagerFailedStartup verifies that a node shuts down cleanly — rather
// than hanging in Stop — when manager initialization fails. A JoinAddr is set
// without any way to determine the node's own advertise address, which makes
// raftNode.JoinAndStart fail inside Manager.Run; the node must then surface
// the error via node.err and close node.closed instead of blocking forever.
func TestManagerFailedStartup(t *testing.T) {
	tmpDir, err := ioutil.TempDir("", "manager-failed-startup")
	require.NoError(t, err)
	defer os.RemoveAll(tmpDir)

	paths := ca.NewConfigPaths(filepath.Join(tmpDir, certDirectory))

	// Bootstrap a root CA on disk so node startup proceeds far enough to
	// attempt raft initialization (where the deliberate failure happens).
	rootCA, err := ca.CreateRootCA(ca.DefaultRootCN)
	require.NoError(t, err)
	require.NoError(t, ca.SaveRootCA(rootCA, paths.RootCA))

	// Issue a manager certificate so the node starts in the manager role.
	krw := ca.NewKeyReadWriter(paths.Node, nil, nil)
	_, _, err = rootCA.IssueAndSaveNewCertificates(krw, identity.NewID(), ca.ManagerRole, identity.NewID())
	require.NoError(t, err)

	// don't bother with a listening socket
	cAddr := filepath.Join(tmpDir, "control.sock")
	cfg := &Config{
		ListenControlAPI: cAddr,
		StateDir:         tmpDir,
		Executor:         &agentutils.TestExecutor{},
		// A join address with no resolvable own address forces
		// JoinAndStart to fail during Manager.Run.
		JoinAddr: "127.0.0.1",
	}

	node, err := New(cfg)
	require.NoError(t, err)

	require.NoError(t, node.Start(context.Background()))

	select {
	case <-node.Ready():
		require.FailNow(t, "node should not become ready")
	case <-time.After(5 * time.Second):
		require.FailNow(t, "node neither became ready nor encountered an error")
	case <-node.closed:
		// The node shut down on its own with the expected startup error.
		require.EqualError(t, node.err, "manager stopped: can't initialize raft node: attempted to join raft cluster without knowing own address")
	}
}

0 comments on commit f7a002c

Please sign in to comment.