From 729dfe7d3ce2eb5a94d022732e86194aaa6f9bf9 Mon Sep 17 00:00:00 2001 From: Nurlan Moldomurov Date: Mon, 12 Nov 2018 19:21:57 +0600 Subject: [PATCH 1/5] PMM-3158 async communication between pmm-agent and pmm-managed --- services/agents/conn.go | 53 +++++++++++++++++---------- services/agents/message_dispatcher.go | 38 +++++++++++++++++++ 2 files changed, 71 insertions(+), 20 deletions(-) create mode 100644 services/agents/message_dispatcher.go diff --git a/services/agents/conn.go b/services/agents/conn.go index 48d11bc1fc..f1cc6ac2da 100644 --- a/services/agents/conn.go +++ b/services/agents/conn.go @@ -18,7 +18,6 @@ package agents import ( - "fmt" "sync/atomic" "github.com/percona/pmm/api/agent" @@ -29,16 +28,21 @@ import ( ) type Conn struct { - stream agent.Agent_ConnectServer - lastID uint32 - l *logrus.Entry + stream agent.Agent_ConnectServer + lastID uint32 + l *logrus.Entry + messageDispatcher *messageDispatcher } func NewConn(uuid string, stream agent.Agent_ConnectServer) *Conn { - return &Conn{ - stream: stream, - l: logrus.WithField("pmm-agent", uuid), + // Create goroutine to dispatch messages + conn := &Conn{ + stream: stream, + l: logrus.WithField("pmm-agent", uuid), + messageDispatcher: newMessageDispatcher(uuid), } + go conn.startMessageDispatcher() + return conn } func (c *Conn) SendAndRecv(toAgent agent.ServerMessagePayload) (*agent.AgentMessage, error) { @@ -52,19 +56,28 @@ func (c *Conn) SendAndRecv(toAgent agent.ServerMessagePayload) (*agent.AgentMess return nil, errors.Wrap(err, "failed to send message to agent") } - // FIXME Instead of reading the next message and dropping it if it is not what we expect, - // we should wait for the right message. - // We should have a single stream receiver in a separate goroutine, - // and internal subscriptions for IDs. - // https://jira.percona.com/browse/PMM-3158 - - agentMessage, err := c.stream.Recv() - if err != nil { - return nil, errors.Wrap(err, "failed to receive message from agent") - } + agentMessage := c.messageDispatcher.WaitForMessage(id) c.l.Debugf("Recv: %s.", agentMessage) - if agentMessage.Id != id { - return nil, status.Errorf(codes.Internal, fmt.Sprintf("expected message from agent with ID %d, got ID %d", id, agentMessage.Id)) - } + return agentMessage, nil } +func (c *Conn) startMessageDispatcher() { + for { + select { + case <-c.stream.Context().Done(): + c.l.Debugln("Close connection: ", c.lastID) + return + default: + agentMessage, err := c.stream.Recv() + if err != nil { + errorStatus, ok := status.FromError(err) + if ok && errorStatus.Code() == codes.Canceled { + c.l.Debugln("Connection closed from other side") + return + } + c.l.Fatal(errors.Wrap(err, "failed to receive message from agent")) + } + c.messageDispatcher.MessageReceived(agentMessage) + } + } +} diff --git a/services/agents/message_dispatcher.go b/services/agents/message_dispatcher.go new file mode 100644 index 0000000000..63203f25e4 --- /dev/null +++ b/services/agents/message_dispatcher.go @@ -0,0 +1,38 @@ +package agents + +import ( + "sync" + + "github.com/sirupsen/logrus" + + "github.com/percona/pmm/api/agent" +) + +type messageDispatcher struct { + sync.RWMutex + l *logrus.Entry + subscribers map[uint32]chan *agent.AgentMessage +} + +func newMessageDispatcher(uuid string) *messageDispatcher { + return &messageDispatcher{ + subscribers: make(map[uint32]chan *agent.AgentMessage), + l: logrus.WithField("message dispatcher", uuid), + } +} + +func (m *messageDispatcher) MessageReceived(message *agent.AgentMessage) { + m.Lock() + defer m.Unlock() + m.subscribers[message.Id] <- message + delete(m.subscribers, message.Id) +} + +func (m *messageDispatcher) WaitForMessage(id uint32) *agent.AgentMessage { + m.Lock() + agentChan := make(chan *agent.AgentMessage) + defer close(agentChan) + m.subscribers[id] = agentChan + m.Unlock() + return <-agentChan +} From b9bacb365c48035f071fc609258de7f26ea4d29d Mon Sep 17 00:00:00 2001 From: Nurlan Moldomurov Date: Tue, 13 Nov 2018 17:40:22 +0600 Subject: [PATCH 2/5] PMM-3158 refactoring --- services/agents/conn.go | 87 ++++++++++++++++++--------- services/agents/message_dispatcher.go | 38 ------------ 2 files changed, 60 insertions(+), 65 deletions(-) delete mode 100644 services/agents/message_dispatcher.go diff --git a/services/agents/conn.go b/services/agents/conn.go index f1cc6ac2da..095091c6b0 100644 --- a/services/agents/conn.go +++ b/services/agents/conn.go @@ -18,30 +18,30 @@ package agents import ( + "sync" "sync/atomic" "github.com/percona/pmm/api/agent" "github.com/pkg/errors" "github.com/sirupsen/logrus" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) type Conn struct { - stream agent.Agent_ConnectServer - lastID uint32 - l *logrus.Entry - messageDispatcher *messageDispatcher + stream agent.Agent_ConnectServer + lastID uint32 + l *logrus.Entry + rw sync.RWMutex + subscribers map[uint32][]chan *agent.AgentMessage } func NewConn(uuid string, stream agent.Agent_ConnectServer) *Conn { - // Create goroutine to dispatch messages conn := &Conn{ - stream: stream, - l: logrus.WithField("pmm-agent", uuid), - messageDispatcher: newMessageDispatcher(uuid), + stream: stream, + l: logrus.WithField("pmm-agent", uuid), + subscribers: make(map[uint32][]chan *agent.AgentMessage), } - go conn.startMessageDispatcher() + // create goroutine to dispatch messages + go conn.startResponseDispatcher() return conn } @@ -56,28 +56,61 @@ func (c *Conn) SendAndRecv(toAgent agent.ServerMessagePayload) (*agent.AgentMess return nil, errors.Wrap(err, "failed to send message to agent") } - agentMessage := c.messageDispatcher.WaitForMessage(id) + agentChan := make(chan *agent.AgentMessage) + defer close(agentChan) + + c.addSubscriber(id, agentChan) + defer c.removeSubscriber(id, agentChan) + + agentMessage := <-agentChan c.l.Debugf("Recv: %s.", agentMessage) return agentMessage, nil } -func (c *Conn) startMessageDispatcher() { - for { - select { - case <-c.stream.Context().Done(): - c.l.Debugln("Close connection: ", c.lastID) + +func (c *Conn) startResponseDispatcher() { + for c.stream.Context().Err() != nil { + agentMessage, err := c.stream.Recv() + if err != nil { + c.l.Warnln("Connection closed", err) return - default: - agentMessage, err := c.stream.Recv() - if err != nil { - errorStatus, ok := status.FromError(err) - if ok && errorStatus.Code() == codes.Canceled { - c.l.Debugln("Connection closed from other side") - return - } - c.l.Fatal(errors.Wrap(err, "failed to receive message from agent")) + } + c.emit(agentMessage) + } +} + +func (c *Conn) emit(message *agent.AgentMessage) { + c.rw.Lock() + defer c.rw.Unlock() + if _, ok := c.subscribers[message.Id]; ok { + for _, subscriber := range c.subscribers[message.Id] { + go func() { + subscriber <- message + }() + } + } else { + c.l.Warnln("Unexpected message: %T %s", message, message) + } +} + +func (c *Conn) removeSubscriber(id uint32, agentChan chan *agent.AgentMessage) { + c.rw.Lock() + defer c.rw.Unlock() + if _, ok := c.subscribers[id]; ok { + for i := range c.subscribers[id] { + if c.subscribers[id][i] == agentChan { + c.subscribers[id] = append(c.subscribers[id][:i], c.subscribers[id][i+1:]...) + break } - c.messageDispatcher.MessageReceived(agentMessage) } } } + +func (c *Conn) addSubscriber(id uint32, agentChan chan *agent.AgentMessage) { + c.rw.Lock() + defer c.rw.Unlock() + if _, ok := c.subscribers[id]; !ok { + c.subscribers[id] = []chan *agent.AgentMessage{} + } + c.subscribers[id] = append(c.subscribers[id], agentChan) +} diff --git a/services/agents/message_dispatcher.go b/services/agents/message_dispatcher.go deleted file mode 100644 index 63203f25e4..0000000000 --- a/services/agents/message_dispatcher.go +++ /dev/null @@ -1,38 +0,0 @@ -package agents - -import ( - "sync" - - "github.com/sirupsen/logrus" - - "github.com/percona/pmm/api/agent" -) - -type messageDispatcher struct { - sync.RWMutex - l *logrus.Entry - subscribers map[uint32]chan *agent.AgentMessage -} - -func newMessageDispatcher(uuid string) *messageDispatcher { - return &messageDispatcher{ - subscribers: make(map[uint32]chan *agent.AgentMessage), - l: logrus.WithField("message dispatcher", uuid), - } -} - -func (m *messageDispatcher) MessageReceived(message *agent.AgentMessage) { - m.Lock() - defer m.Unlock() - m.subscribers[message.Id] <- message - delete(m.subscribers, message.Id) -} - -func (m *messageDispatcher) WaitForMessage(id uint32) *agent.AgentMessage { - m.Lock() - agentChan := make(chan *agent.AgentMessage) - defer close(agentChan) - m.subscribers[id] = agentChan - m.Unlock() - return <-agentChan -} From a8eb935bca1e5c3f3d4e3f41a562ecb71aa54ce3 Mon Sep 17 00:00:00 2001 From: Nurlan Moldomurov Date: Tue, 13 Nov 2018 18:25:33 +0600 Subject: [PATCH 3/5] PMM-3158 fix tests --- services/agents/conn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/agents/conn.go b/services/agents/conn.go index 095091c6b0..9e0bb916a0 100644 --- a/services/agents/conn.go +++ b/services/agents/conn.go @@ -89,7 +89,7 @@ func (c *Conn) emit(message *agent.AgentMessage) { }() } } else { - c.l.Warnln("Unexpected message: %T %s", message, message) + c.l.Warnf("Unexpected message: %T %s", message, message) } } From 3c6e107bc92996ca72dc738584e7924830e49dc2 Mon Sep 17 00:00:00 2001 From: Nurlan Moldomurov Date: Tue, 13 Nov 2018 18:29:41 +0600 Subject: [PATCH 4/5] PMM-3158 fix possible race condition --- services/agents/conn.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/services/agents/conn.go b/services/agents/conn.go index 9e0bb916a0..66bd0eac9e 100644 --- a/services/agents/conn.go +++ b/services/agents/conn.go @@ -83,10 +83,10 @@ func (c *Conn) emit(message *agent.AgentMessage) { c.rw.Lock() defer c.rw.Unlock() if _, ok := c.subscribers[message.Id]; ok { - for _, subscriber := range c.subscribers[message.Id] { - go func() { + for i := range c.subscribers[message.Id] { + go func(subscriber chan *agent.AgentMessage) { subscriber <- message - }() + }(c.subscribers[message.Id][i]) } } else { c.l.Warnf("Unexpected message: %T %s", message, message) From 97b7dc11da3ae9624d6fde7e3b616b0eda87e45e Mon Sep 17 00:00:00 2001 From: Nurlan Moldomurov Date: Wed, 14 Nov 2018 15:07:32 +0600 Subject: [PATCH 5/5] PMM-3158 added request handler --- services/agents/conn.go | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/services/agents/conn.go b/services/agents/conn.go index 66bd0eac9e..01ff9df93a 100644 --- a/services/agents/conn.go +++ b/services/agents/conn.go @@ -32,6 +32,7 @@ type Conn struct { l *logrus.Entry rw sync.RWMutex subscribers map[uint32][]chan *agent.AgentMessage + requestChan chan *agent.AgentMessage } func NewConn(uuid string, stream agent.Agent_ConnectServer) *Conn { @@ -39,6 +40,7 @@ func NewConn(uuid string, stream agent.Agent_ConnectServer) *Conn { stream: stream, l: logrus.WithField("pmm-agent", uuid), subscribers: make(map[uint32][]chan *agent.AgentMessage), + requestChan: make(chan *agent.AgentMessage), } // create goroutine to dispatch messages go conn.startResponseDispatcher() @@ -68,6 +70,12 @@ func (c *Conn) SendAndRecv(toAgent agent.ServerMessagePayload) (*agent.AgentMess return agentMessage, nil } +func (c *Conn) RecvRequestMessage() *agent.AgentMessage { + agentMessage := <-c.requestChan + c.l.Debugf("Recv: %s.", agentMessage) + return agentMessage +} + func (c *Conn) startResponseDispatcher() { for c.stream.Context().Err() != nil { agentMessage, err := c.stream.Recv() @@ -75,13 +83,21 @@ func (c *Conn) startResponseDispatcher() { c.l.Warnln("Connection closed", err) return } - c.emit(agentMessage) + + switch agentMessage.GetPayload().(type) { + case *agent.AgentMessage_Ping, *agent.AgentMessage_State: + go func(agentMessage *agent.AgentMessage) { + c.requestChan <- agentMessage + }(agentMessage) + case *agent.AgentMessage_Auth, *agent.AgentMessage_QanData: + c.emit(agentMessage) + } } } func (c *Conn) emit(message *agent.AgentMessage) { - c.rw.Lock() - defer c.rw.Unlock() + c.rw.RLock() + defer c.rw.RUnlock() if _, ok := c.subscribers[message.Id]; ok { for i := range c.subscribers[message.Id] { go func(subscriber chan *agent.AgentMessage) {