From 46b0f29d3a7e706146d269a635eb8620676deabb Mon Sep 17 00:00:00 2001 From: cyli Date: Mon, 3 Apr 2017 13:15:39 -0700 Subject: [PATCH] Update agent to send the node TLS information along with the node description, and to restart the session if the information has changed. Signed-off-by: cyli --- agent/agent.go | 134 +++++++++++-------- agent/agent_test.go | 273 +++++++++++++++++++++++++++++++-------- agent/config.go | 20 ++- agent/testutils/fakes.go | 219 +++++++++++++++++++++++++++++++ codecov.yml | 2 + integration/exec.go | 94 -------------- integration/node.go | 3 +- 7 files changed, 545 insertions(+), 200 deletions(-) create mode 100644 agent/testutils/fakes.go delete mode 100644 integration/exec.go diff --git a/agent/agent.go b/agent/agent.go index 3721f2916a..295366b2e8 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1,6 +1,7 @@ package agent import ( + "bytes" "fmt" "math/rand" "reflect" @@ -44,6 +45,8 @@ type Agent struct { stopOnce sync.Once // only allow stop to be called once closed chan struct{} // only closed in run err error // read only after closed is closed + + nodeUpdatePeriod time.Duration } // New returns a new agent, ready for task dispatch. @@ -53,14 +56,15 @@ func New(config *Config) (*Agent, error) { } a := &Agent{ - config: config, - sessionq: make(chan sessionOperation), - started: make(chan struct{}), - leaving: make(chan struct{}), - left: make(chan struct{}), - stopped: make(chan struct{}), - closed: make(chan struct{}), - ready: make(chan struct{}), + config: config, + sessionq: make(chan sessionOperation), + started: make(chan struct{}), + leaving: make(chan struct{}), + left: make(chan struct{}), + stopped: make(chan struct{}), + closed: make(chan struct{}), + ready: make(chan struct{}), + nodeUpdatePeriod: nodeUpdatePeriod, } a.worker = newWorker(config.DB, config.Executor, a) @@ -182,13 +186,15 @@ func (a *Agent) run(ctx context.Context) { log.G(ctx).Debug("(*Agent).run") defer log.G(ctx).Debug("(*Agent).run exited") + nodeTLSInfo := a.config.NodeTLSInfo + // get the node description - nodeDescription, err := a.nodeDescriptionWithHostname(ctx) + nodeDescription, err := a.nodeDescriptionWithHostname(ctx, nodeTLSInfo) if err != nil { log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Error("agent: node description unavailable") } // nodeUpdateTicker is used to periodically check for updates to node description - nodeUpdateTicker := time.NewTicker(nodeUpdatePeriod) + nodeUpdateTicker := time.NewTicker(a.nodeUpdatePeriod) defer nodeUpdateTicker.Stop() var ( @@ -214,6 +220,35 @@ func (a *Agent) run(ctx context.Context) { a.worker.Listen(ctx, reporter) + updateNode := func() { + // skip updating if the registration isn't finished + if registered != nil { + return + } + // get the current node description + newNodeDescription, err := a.nodeDescriptionWithHostname(ctx, nodeTLSInfo) + if err != nil { + log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Error("agent: updated node description unavailable") + } + + // if newNodeDescription is nil, it will cause a panic when + // trying to create a session. Typically this can happen + // if the engine goes down + if newNodeDescription == nil { + return + } + + // if the node description has changed, update it to the new one + // and close the session. The old session will be stopped and a + // new one will be created with the updated description + if !reflect.DeepEqual(nodeDescription, newNodeDescription) { + nodeDescription = newNodeDescription + // close the session + log.G(ctx).Info("agent: found node update") + session.sendError(nil) + } + } + for { select { case operation := <-sessionq: @@ -247,7 +282,7 @@ func (a *Agent) run(ctx context.Context) { } } case msg := <-session.messages: - if err := a.handleSessionMessage(ctx, msg); err != nil { + if err := a.handleSessionMessage(ctx, msg, nodeTLSInfo); err != nil { log.G(ctx).WithError(err).Error("session message handler failed") } case sub := <-session.subscriptions: @@ -305,33 +340,17 @@ func (a *Agent) run(ctx context.Context) { } session = newSession(ctx, a, delay, session.sessionID, nodeDescription) registered = session.registered - case <-nodeUpdateTicker.C: - // skip this case if the registration isn't finished - if registered != nil { - continue - } - // get the current node description - newNodeDescription, err := a.nodeDescriptionWithHostname(ctx) - if err != nil { - log.G(ctx).WithError(err).WithField("agent", a.config.Executor).Error("agent: updated node description unavailable") - } - - // if newNodeDescription is nil, it will cause a panic when - // trying to create a session. Typically this can happen - // if the engine goes down - if newNodeDescription == nil { - continue - } - - // if the node description has changed, update it to the new one - // and close the session. The old session will be stopped and a - // new one will be created with the updated description - if !reflect.DeepEqual(nodeDescription, newNodeDescription) { - nodeDescription = newNodeDescription - // close the session - log.G(ctx).Info("agent: found node update") - session.sendError(nil) + case ev := <-a.config.NotifyTLSChange: + // the TLS info has changed, so force a check to see if we need to restart the session + if tlsInfo, ok := ev.(*api.NodeTLSInfo); ok { + nodeTLSInfo = tlsInfo + updateNode() + nodeUpdateTicker.Stop() + nodeUpdateTicker = time.NewTicker(a.nodeUpdatePeriod) } + case <-nodeUpdateTicker.C: + // periodically check to see whether the node information has changed, and if so, restart the session + updateNode() case <-a.stopped: // TODO(stevvooe): Wait on shutdown and cleanup. May need to pump // this loop a few times. @@ -347,7 +366,7 @@ func (a *Agent) run(ctx context.Context) { } } -func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMessage) error { +func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMessage, nti *api.NodeTLSInfo) error { seen := map[api.Peer]struct{}{} for _, manager := range message.Managers { if manager.Peer.Addr == "" { @@ -358,18 +377,28 @@ func (a *Agent) handleSessionMessage(ctx context.Context, message *api.SessionMe seen[*manager.Peer] = struct{}{} } - if message.Node != nil { - if a.node == nil || !nodesEqual(a.node, message.Node) { - if a.config.NotifyNodeChange != nil { - a.config.NotifyNodeChange <- message.Node.Copy() - } - a.node = message.Node.Copy() - if err := a.config.Executor.Configure(ctx, a.node); err != nil { - log.G(ctx).WithError(err).Error("node configure failed") - } + var changes *NodeChanges + if message.Node != nil && (a.node == nil || !nodesEqual(a.node, message.Node)) { + if a.config.NotifyNodeChange != nil { + changes = &NodeChanges{Node: message.Node.Copy()} + } + a.node = message.Node.Copy() + if err := a.config.Executor.Configure(ctx, a.node); err != nil { + log.G(ctx).WithError(err).Error("node configure failed") + } + } + if len(message.RootCA) > 0 && !bytes.Equal(message.RootCA, nti.TrustRoot) { + if changes == nil { + changes = &NodeChanges{RootCert: message.RootCA} + } else { + changes.RootCert = message.RootCA } } + if changes != nil { + a.config.NotifyNodeChange <- changes + } + // prune managers not in list. for peer := range a.config.ConnBroker.Remotes().Weights() { if _, ok := seen[peer]; !ok { @@ -517,12 +546,15 @@ func (a *Agent) Publisher(ctx context.Context, subscriptionID string) (exec.LogP } // nodeDescriptionWithHostname retrieves node description, and overrides hostname if available -func (a *Agent) nodeDescriptionWithHostname(ctx context.Context) (*api.NodeDescription, error) { +func (a *Agent) nodeDescriptionWithHostname(ctx context.Context, tlsInfo *api.NodeTLSInfo) (*api.NodeDescription, error) { desc, err := a.config.Executor.Describe(ctx) - // Override hostname - if a.config.Hostname != "" && desc != nil { - desc.Hostname = a.config.Hostname + // Override hostname and TLS info + if desc != nil { + if a.config.Hostname != "" && desc != nil { + desc.Hostname = a.config.Hostname + } + desc.TLSInfo = tlsInfo } return desc, err } diff --git a/agent/agent_test.go b/agent/agent_test.go index 2e01a0b3c9..9ef53cbfbd 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -1,40 +1,23 @@ package agent import ( + "errors" "testing" "time" - "github.com/docker/swarmkit/agent/exec" + events "github.com/docker/go-events" + agentutils "github.com/docker/swarmkit/agent/testutils" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/ca" "github.com/docker/swarmkit/ca/testutils" "github.com/docker/swarmkit/connectionbroker" + raftutils "github.com/docker/swarmkit/manager/state/raft/testutils" "github.com/docker/swarmkit/remotes" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/net/context" ) -// NoopExecutor is a dummy executor that implements enough to get the agent started. -type NoopExecutor struct { -} - -func (e *NoopExecutor) Describe(ctx context.Context) (*api.NodeDescription, error) { - return &api.NodeDescription{}, nil -} - -func (e *NoopExecutor) Configure(ctx context.Context, node *api.Node) error { - return nil -} - -func (e *NoopExecutor) Controller(t *api.Task) (exec.Controller, error) { - return nil, exec.ErrRuntimeUnsupported -} - -func (e *NoopExecutor) SetNetworkBootstrapKeys([]*api.EncryptionKey) error { - return nil -} - func TestAgent(t *testing.T) { // TODO(stevvooe): The current agent is fairly monolithic, making it hard // to test without implementing or mocking an entire master. We'd like to @@ -73,10 +56,11 @@ func TestAgentStartStop(t *testing.T) { defer cleanup() agent, err := New(&Config{ - Executor: &NoopExecutor{}, + Executor: &agentutils.TestExecutor{}, ConnBroker: connectionbroker.New(remotes), Credentials: agentSecurityConfig.ClientTLSCreds, DB: db, + NodeTLSInfo: &api.NodeTLSInfo{}, }) require.NoError(t, err) assert.NotNil(t, agent) @@ -93,69 +77,252 @@ func TestAgentStartStop(t *testing.T) { assert.NoError(t, agent.Stop(ctx)) } -func TestHandleSessionMessage(t *testing.T) { - // TODO(rajdeepd): The current agent is fairly monolithic, hence we - // have to test message handling this way. Needs to be refactored - ctx, _ := context.WithTimeout(context.Background(), 5000*time.Millisecond) - agent, cleanup := agentTestEnv(t) +func TestHandleSessionMessageNetworkManagerChanges(t *testing.T) { + nodeChangeCh := make(chan *NodeChanges, 1) + defer close(nodeChangeCh) + tester := agentTestEnv(t, nodeChangeCh, nil) + defer tester.cleanup() - defer cleanup() + currSession, closedSessions := tester.dispatcher.GetSessions() + require.NotNil(t, currSession) + require.NotNil(t, currSession.Description) + require.Empty(t, closedSessions) var messages = []*api.SessionMessage{ - {SessionID: "sm1", Node: &api.Node{}, + { Managers: []*api.WeightedPeer{ {&api.Peer{NodeID: "node1", Addr: "10.0.0.1"}, 1.0}}, - NetworkBootstrapKeys: []*api.EncryptionKey{{}}}, - {SessionID: "sm1", Node: &api.Node{}, + NetworkBootstrapKeys: []*api.EncryptionKey{{}}, + }, + { Managers: []*api.WeightedPeer{ {&api.Peer{NodeID: "node1", Addr: ""}, 1.0}}, - NetworkBootstrapKeys: []*api.EncryptionKey{{}}}, - {SessionID: "sm1", Node: &api.Node{}, + NetworkBootstrapKeys: []*api.EncryptionKey{{}}, + }, + { Managers: []*api.WeightedPeer{ {&api.Peer{NodeID: "node1", Addr: "10.0.0.1"}, 1.0}}, - NetworkBootstrapKeys: nil}, - {SessionID: "sm1", Node: &api.Node{}, + NetworkBootstrapKeys: nil, + }, + { Managers: []*api.WeightedPeer{ {&api.Peer{NodeID: "", Addr: "10.0.0.1"}, 1.0}}, - NetworkBootstrapKeys: []*api.EncryptionKey{{}}}, - {SessionID: "sm1", Node: &api.Node{}, + NetworkBootstrapKeys: []*api.EncryptionKey{{}}, + }, + { Managers: []*api.WeightedPeer{ {&api.Peer{NodeID: "node1", Addr: "10.0.0.1"}, 0.0}}, - NetworkBootstrapKeys: []*api.EncryptionKey{{}}}, + NetworkBootstrapKeys: []*api.EncryptionKey{{}}, + }, } for _, m := range messages { - err := agent.handleSessionMessage(ctx, m) - if err != nil { - t.Fatal("err should be nil") + m.SessionID = currSession.SessionID + tester.dispatcher.SessionMessageChannel() <- m + select { + case nodeChange := <-nodeChangeCh: + require.FailNow(t, "there should be no node changes with these messages: %v", nodeChange) + case <-time.After(100 * time.Millisecond): } } + + currSession, closedSessions = tester.dispatcher.GetSessions() + require.NotEmpty(t, currSession) + require.Empty(t, closedSessions) +} + +func TestHandleSessionMessageNodeChanges(t *testing.T) { + nodeChangeCh := make(chan *NodeChanges, 1) + defer close(nodeChangeCh) + tester := agentTestEnv(t, nodeChangeCh, nil) + defer tester.cleanup() + + currSession, closedSessions := tester.dispatcher.GetSessions() + require.NotNil(t, currSession) + require.NotNil(t, currSession.Description) + require.Empty(t, closedSessions) + + var testcases = []struct { + msg *api.SessionMessage + change *NodeChanges + errorMsg string + }{ + { + msg: &api.SessionMessage{ + Node: &api.Node{}, + }, + change: &NodeChanges{Node: &api.Node{}}, + errorMsg: "the node changed, but no notification of node change", + }, + { + msg: &api.SessionMessage{ + RootCA: []byte("new root CA"), + }, + change: &NodeChanges{RootCert: []byte("new root CA")}, + errorMsg: "the root cert changed, but no notification of node change", + }, + { + msg: &api.SessionMessage{ + Node: &api.Node{ID: "something"}, + RootCA: []byte("new root CA"), + }, + change: &NodeChanges{ + Node: &api.Node{ID: "something"}, + RootCert: []byte("new root CA"), + }, + errorMsg: "the root cert and node both changed, but no notification of node change", + }, + { + msg: &api.SessionMessage{ + Node: &api.Node{ID: "something"}, + RootCA: tester.testCA.RootCA.Certs, + }, + errorMsg: "while a node and root cert were provided, nothing has changed so no node changed", + }, + } + + for _, tc := range testcases { + tc.msg.SessionID = currSession.SessionID + tester.dispatcher.SessionMessageChannel() <- tc.msg + if tc.change != nil { + select { + case nodeChange := <-nodeChangeCh: + require.Equal(t, tc.change, nodeChange, tc.errorMsg) + case <-time.After(100 * time.Millisecond): + require.FailNow(t, tc.errorMsg) + } + } else { + select { + case nodeChange := <-nodeChangeCh: + require.FailNow(t, "%s: but got change: %v", tc.errorMsg, nodeChange) + case <-time.After(100 * time.Millisecond): + } + } + } + + currSession, closedSessions = tester.dispatcher.GetSessions() + require.NotEmpty(t, currSession) + require.Empty(t, closedSessions) } -func agentTestEnv(t *testing.T) (*Agent, func()) { +// when the node description changes, the session is restarted and propagated up to the dispatcher +func TestSessionRestartedOnNodeDescriptionChange(t *testing.T) { + tlsCh := make(chan events.Event, 1) + defer close(tlsCh) + tester := agentTestEnv(t, nil, tlsCh) + defer tester.cleanup() + + currSession, closedSessions := tester.dispatcher.GetSessions() + require.NotNil(t, currSession) + require.NotNil(t, currSession.Description) + require.Empty(t, closedSessions) + + tester.executor.UpdateNodeDescription(&api.NodeDescription{ + Hostname: "testAgent", + }) + var gotSession *api.SessionRequest + require.NoError(t, raftutils.PollFuncWithTimeout(nil, func() error { + gotSession, closedSessions = tester.dispatcher.GetSessions() + if gotSession == nil || len(closedSessions) != 1 { + return errors.New("session has not been restarted yet") + } + return nil + }, 1*time.Second)) + require.NotEqual(t, currSession, gotSession) + require.NotNil(t, gotSession.Description) + require.Equal(t, "testAgent", gotSession.Description.Hostname) + currSession = gotSession + + newTLSInfo := &api.NodeTLSInfo{ + TrustRoot: testutils.ECDSA256SHA256Cert, + CertIssuerPublicKey: []byte("public key"), + CertIssuerSubject: []byte("subject"), + } + tlsCh <- newTLSInfo + require.NoError(t, raftutils.PollFuncWithTimeout(nil, func() error { + gotSession, closedSessions = tester.dispatcher.GetSessions() + if gotSession == nil || len(closedSessions) != 2 { + return errors.New("session has not been restarted yet") + } + return nil + }, 1*time.Second)) + require.NotEqual(t, currSession, gotSession) + require.NotNil(t, gotSession.Description) + require.Equal(t, "testAgent", gotSession.Description.Hostname) + require.Equal(t, newTLSInfo, gotSession.Description.TLSInfo) +} + +type agentTester struct { + agent *Agent + dispatcher *agentutils.MockDispatcher + executor *agentutils.TestExecutor + cleanup func() + testCA *testutils.TestCA +} + +func agentTestEnv(t *testing.T, nodeChangeCh chan *NodeChanges, tlsChangeCh chan events.Event) *agentTester { var cleanup []func() tc := testutils.NewTestCA(t) - cleanup = append(cleanup, func() { tc.Stop() }) + cleanup = append(cleanup, tc.Stop) agentSecurityConfig, err := tc.NewNodeConfig(ca.WorkerRole) require.NoError(t, err) + managerSecurityConfig, err := tc.NewNodeConfig(ca.ManagerRole) + require.NoError(t, err) - addr := "localhost:4949" - remotes := remotes.NewRemotes(api.Peer{Addr: addr}) + mockDispatcher, mockDispatcherStop := agentutils.NewMockDispatcher(t, managerSecurityConfig) + cleanup = append(cleanup, mockDispatcherStop) + + remotes := remotes.NewRemotes(api.Peer{Addr: mockDispatcher.Addr}) db, cleanupStorage := storageTestEnv(t) cleanup = append(cleanup, func() { cleanupStorage() }) + executor := &agentutils.TestExecutor{} + agent, err := New(&Config{ - Executor: &NoopExecutor{}, - ConnBroker: connectionbroker.New(remotes), - Credentials: agentSecurityConfig.ClientTLSCreds, - DB: db, + Executor: executor, + ConnBroker: connectionbroker.New(remotes), + Credentials: agentSecurityConfig.ClientTLSCreds, + DB: db, + NotifyNodeChange: nodeChangeCh, + NotifyTLSChange: tlsChangeCh, + NodeTLSInfo: &api.NodeTLSInfo{ + TrustRoot: tc.RootCA.Certs, + CertIssuerPublicKey: agentSecurityConfig.IssuerInfo().PublicKey, + CertIssuerSubject: agentSecurityConfig.IssuerInfo().Subject, + }, }) require.NoError(t, err) - return agent, func() { - for i := len(cleanup) - 1; i >= 0; i-- { - cleanup[i]() - } + agent.nodeUpdatePeriod = 200 * time.Millisecond + + go agent.Start(context.Background()) + cleanup = append(cleanup, func() { + agent.Stop(context.Background()) + }) + + getErr := make(chan error) + go func() { + getErr <- agent.Err(context.Background()) + }() + select { + case err := <-getErr: + require.FailNow(t, "starting agent errored with: %v", err) + case <-agent.Ready(): + case <-time.After(5 * time.Second): + require.FailNow(t, "agent not ready within 5 seconds") + } + + return &agentTester{ + agent: agent, + dispatcher: mockDispatcher, + executor: executor, + testCA: tc, + cleanup: func() { + // go in reverse order + for i := len(cleanup) - 1; i >= 0; i-- { + cleanup[i]() + } + }, } } diff --git a/agent/config.go b/agent/config.go index de9359842e..70c94ecb65 100644 --- a/agent/config.go +++ b/agent/config.go @@ -2,6 +2,7 @@ package agent import ( "github.com/boltdb/bolt" + "github.com/docker/go-events" "github.com/docker/swarmkit/agent/exec" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/connectionbroker" @@ -9,6 +10,13 @@ import ( "google.golang.org/grpc/credentials" ) +// NodeChanges encapsulates changes that should be made to the node as per session messages +// from the dispatcher +type NodeChanges struct { + Node *api.Node + RootCert []byte +} + // Config provides values for an Agent. type Config struct { // Hostname the name of host for agent instance. @@ -25,10 +33,16 @@ type Config struct { DB *bolt.DB // NotifyNodeChange channel receives new node changes from session messages. - NotifyNodeChange chan<- *api.Node + NotifyNodeChange chan<- *NodeChanges + + // NotifyTLSChange channel sends new TLS information changes, which can cause a session to restart + NotifyTLSChange <-chan events.Event // Credentials is credentials for grpc connection to manager. Credentials credentials.TransportCredentials + + // NodeTLSInfo contains the starting node TLS info to bootstrap into the agent + NodeTLSInfo *api.NodeTLSInfo } func (c *Config) validate() error { @@ -44,5 +58,9 @@ func (c *Config) validate() error { return errors.New("agent: database required") } + if c.NodeTLSInfo == nil { + return errors.New("agent: Node TLS info is required") + } + return nil } diff --git a/agent/testutils/fakes.go b/agent/testutils/fakes.go new file mode 100644 index 0000000000..31625bbac8 --- /dev/null +++ b/agent/testutils/fakes.go @@ -0,0 +1,219 @@ +package testutils + +import ( + "net" + "sync" + "testing" + "time" + + "google.golang.org/grpc" + + "github.com/docker/swarmkit/agent/exec" + "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/ca" + "github.com/stretchr/testify/require" + "golang.org/x/net/context" +) + +// TestExecutor is executor for integration tests +type TestExecutor struct { + mu sync.Mutex + desc *api.NodeDescription +} + +// Describe just returns empty NodeDescription. +func (e *TestExecutor) Describe(ctx context.Context) (*api.NodeDescription, error) { + e.mu.Lock() + defer e.mu.Unlock() + if e.desc == nil { + return &api.NodeDescription{}, nil + } + return e.desc.Copy(), nil +} + +// Configure does nothing. +func (e *TestExecutor) Configure(ctx context.Context, node *api.Node) error { + return nil +} + +// SetNetworkBootstrapKeys does nothing. +func (e *TestExecutor) SetNetworkBootstrapKeys([]*api.EncryptionKey) error { + return nil +} + +// Controller returns TestController. +func (e *TestExecutor) Controller(t *api.Task) (exec.Controller, error) { + return &TestController{ + ch: make(chan struct{}), + }, nil +} + +// UpdateNodeDescription sets the node description on the test executor +func (e *TestExecutor) UpdateNodeDescription(newDesc *api.NodeDescription) { + e.mu.Lock() + defer e.mu.Unlock() + e.desc = newDesc +} + +// TestController is dummy channel based controller for tests. +type TestController struct { + ch chan struct{} + closeOnce sync.Once +} + +// Update does nothing. +func (t *TestController) Update(ctx context.Context, task *api.Task) error { + return nil +} + +// Prepare does nothing. +func (t *TestController) Prepare(ctx context.Context) error { + return nil +} + +// Start does nothing. +func (t *TestController) Start(ctx context.Context) error { + return nil +} + +// Wait waits on internal channel. +func (t *TestController) Wait(ctx context.Context) error { + select { + case <-t.ch: + case <-ctx.Done(): + } + return nil +} + +// Shutdown closes internal channel +func (t *TestController) Shutdown(ctx context.Context) error { + t.closeOnce.Do(func() { + close(t.ch) + }) + return nil +} + +// Terminate closes internal channel if it wasn't closed before. +func (t *TestController) Terminate(ctx context.Context) error { + t.closeOnce.Do(func() { + close(t.ch) + }) + return nil +} + +// Remove does nothing. +func (t *TestController) Remove(ctx context.Context) error { + return nil +} + +// Close does nothing. +func (t *TestController) Close() error { + t.closeOnce.Do(func() { + close(t.ch) + }) + return nil +} + +// MockDispatcher is a fake dispatcher that one agent at a time can connect to +type MockDispatcher struct { + mu sync.Mutex + sessionCh chan *api.SessionMessage + openSession *api.SessionRequest + closedSessions []*api.SessionRequest + + Addr string +} + +// UpdateTaskStatus is not implemented +func (m *MockDispatcher) UpdateTaskStatus(context.Context, *api.UpdateTaskStatusRequest) (*api.UpdateTaskStatusResponse, error) { + panic("not implemented") +} + +// Tasks keeps an open stream until canceled +func (m *MockDispatcher) Tasks(_ *api.TasksRequest, stream api.Dispatcher_TasksServer) error { + select { + case <-stream.Context().Done(): + } + return nil +} + +// Assignments keeps an open stream until canceled +func (m *MockDispatcher) Assignments(_ *api.AssignmentsRequest, stream api.Dispatcher_AssignmentsServer) error { + select { + case <-stream.Context().Done(): + } + return nil +} + +// Heartbeat always successfully heartbeats +func (m *MockDispatcher) Heartbeat(context.Context, *api.HeartbeatRequest) (*api.HeartbeatResponse, error) { + return &api.HeartbeatResponse{Period: time.Second * 5}, nil +} + +// Session allows a session to be established, and sends the node info +func (m *MockDispatcher) Session(r *api.SessionRequest, stream api.Dispatcher_SessionServer) error { + m.mu.Lock() + m.openSession = r + m.mu.Unlock() + defer func() { + m.mu.Lock() + defer m.mu.Unlock() + m.closedSessions = append(m.closedSessions, m.openSession) + m.openSession = nil + }() + + // send the initial message first + if err := stream.Send(&api.SessionMessage{ + SessionID: r.SessionID, + Managers: []*api.WeightedPeer{ + { + Peer: &api.Peer{Addr: m.Addr}, + }, + }, + }); err != nil { + return err + } + + ctx := stream.Context() + for { + select { + case msg := <-m.sessionCh: + msg.SessionID = r.SessionID + if err := stream.Send(msg); err != nil { + return err + } + case <-ctx.Done(): + return nil + } + } +} + +// GetSessions return all the established and closed sessions +func (m *MockDispatcher) GetSessions() (*api.SessionRequest, []*api.SessionRequest) { + m.mu.Lock() + defer m.mu.Unlock() + return m.openSession, m.closedSessions +} + +// SessionMessageChannel returns a writable channel to inject session messages +func (m *MockDispatcher) SessionMessageChannel() chan<- *api.SessionMessage { + return m.sessionCh +} + +// NewMockDispatcher starts and returns a mock dispatcher instance that can be connected to +func NewMockDispatcher(t *testing.T, secConfig *ca.SecurityConfig) (*MockDispatcher, func()) { + l, err := net.Listen("tcp", "127.0.0.1:0") + addr := l.Addr().String() + require.NoError(t, err) + + serverOpts := []grpc.ServerOption{grpc.Creds(secConfig.ServerTLSCreds)} + s := grpc.NewServer(serverOpts...) + + m := &MockDispatcher{ + Addr: addr, + sessionCh: make(chan *api.SessionMessage, 1), + } + api.RegisterDispatcherServer(s, m) + go s.Serve(l) + return m, s.Stop +} diff --git a/codecov.yml b/codecov.yml index b03ce3d879..aa409fa9e0 100644 --- a/codecov.yml +++ b/codecov.yml @@ -8,3 +8,5 @@ coverage: enabled: yes target: 0 changes: false +ignore: + -**/testutils diff --git a/integration/exec.go b/integration/exec.go deleted file mode 100644 index 43d1e0de10..0000000000 --- a/integration/exec.go +++ /dev/null @@ -1,94 +0,0 @@ -package integration - -import ( - "sync" - - "github.com/docker/swarmkit/agent/exec" - "github.com/docker/swarmkit/api" - "golang.org/x/net/context" -) - -// TestExecutor is executor for integration tests -type TestExecutor struct { -} - -// Describe just returns empty NodeDescription. -func (e *TestExecutor) Describe(ctx context.Context) (*api.NodeDescription, error) { - return &api.NodeDescription{}, nil -} - -// Configure does nothing. -func (e *TestExecutor) Configure(ctx context.Context, node *api.Node) error { - return nil -} - -// SetNetworkBootstrapKeys does nothing. -func (e *TestExecutor) SetNetworkBootstrapKeys([]*api.EncryptionKey) error { - return nil -} - -// Controller returns TestController. -func (e *TestExecutor) Controller(t *api.Task) (exec.Controller, error) { - return &TestController{ - ch: make(chan struct{}), - }, nil -} - -// TestController is dummy channel based controller for tests. -type TestController struct { - ch chan struct{} - closeOnce sync.Once -} - -// Update does nothing. -func (t *TestController) Update(ctx context.Context, task *api.Task) error { - return nil -} - -// Prepare does nothing. -func (t *TestController) Prepare(ctx context.Context) error { - return nil -} - -// Start does nothing. -func (t *TestController) Start(ctx context.Context) error { - return nil -} - -// Wait waits on internal channel. -func (t *TestController) Wait(ctx context.Context) error { - select { - case <-t.ch: - case <-ctx.Done(): - } - return nil -} - -// Shutdown closes internal channel -func (t *TestController) Shutdown(ctx context.Context) error { - t.closeOnce.Do(func() { - close(t.ch) - }) - return nil -} - -// Terminate closes internal channel if it wasn't closed before. -func (t *TestController) Terminate(ctx context.Context) error { - t.closeOnce.Do(func() { - close(t.ch) - }) - return nil -} - -// Remove does nothing. -func (t *TestController) Remove(ctx context.Context) error { - return nil -} - -// Close does nothing. -func (t *TestController) Close() error { - t.closeOnce.Do(func() { - close(t.ch) - }) - return nil -} diff --git a/integration/node.go b/integration/node.go index 64dad8409a..d751bf01f9 100644 --- a/integration/node.go +++ b/integration/node.go @@ -9,6 +9,7 @@ import ( "google.golang.org/grpc" + agentutils "github.com/docker/swarmkit/agent/testutils" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/ca" "github.com/docker/swarmkit/identity" @@ -40,7 +41,7 @@ func newTestNode(joinAddr, joinToken string, lateBind bool, rootCA *ca.RootCA) ( ListenControlAPI: cAddr, JoinAddr: joinAddr, StateDir: tmpDir, - Executor: &TestExecutor{}, + Executor: &agentutils.TestExecutor{}, JoinToken: joinToken, } if !lateBind {