Skip to content
This repository was archived by the owner on Jun 21, 2022. It is now read-only.
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 83 additions & 21 deletions services/agents/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,33 @@
package agents

import (
"fmt"
"sync"
"sync/atomic"

"github.com/percona/pmm/api/agent"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type Conn struct {
stream agent.Agent_ConnectServer
lastID uint32
l *logrus.Entry
stream agent.Agent_ConnectServer
lastID uint32
l *logrus.Entry
rw sync.RWMutex
subscribers map[uint32][]chan *agent.AgentMessage
requestChan chan *agent.AgentMessage
}

func NewConn(uuid string, stream agent.Agent_ConnectServer) *Conn {
return &Conn{
stream: stream,
l: logrus.WithField("pmm-agent", uuid),
conn := &Conn{
stream: stream,
l: logrus.WithField("pmm-agent", uuid),
subscribers: make(map[uint32][]chan *agent.AgentMessage),
requestChan: make(chan *agent.AgentMessage),
}
// create goroutine to dispatch messages
go conn.startResponseDispatcher()
return conn
}

func (c *Conn) SendAndRecv(toAgent agent.ServerMessagePayload) (*agent.AgentMessage, error) {
Expand All @@ -52,19 +58,75 @@ func (c *Conn) SendAndRecv(toAgent agent.ServerMessagePayload) (*agent.AgentMess
return nil, errors.Wrap(err, "failed to send message to agent")
}

// FIXME Instead of reading the next message and dropping it if it is not what we expect,
// we should wait for the right message.
// We should have a single stream receiver in a separate goroutine,
// and internal subscriptions for IDs.
// https://jira.percona.com/browse/PMM-3158
agentChan := make(chan *agent.AgentMessage)
defer close(agentChan)

agentMessage, err := c.stream.Recv()
if err != nil {
return nil, errors.Wrap(err, "failed to receive message from agent")
}
c.addSubscriber(id, agentChan)
defer c.removeSubscriber(id, agentChan)

agentMessage := <-agentChan
c.l.Debugf("Recv: %s.", agentMessage)
if agentMessage.Id != id {
return nil, status.Errorf(codes.Internal, fmt.Sprintf("expected message from agent with ID %d, got ID %d", id, agentMessage.Id))
}

return agentMessage, nil
}

func (c *Conn) RecvRequestMessage() *agent.AgentMessage {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Conn.RecvRequestMessage should have comment or be unexported

agentMessage := <-c.requestChan
c.l.Debugf("Recv: %s.", agentMessage)
return agentMessage
}

func (c *Conn) startResponseDispatcher() {
for c.stream.Context().Err() != nil {
agentMessage, err := c.stream.Recv()
if err != nil {
c.l.Warnln("Connection closed", err)
return
}

switch agentMessage.GetPayload().(type) {
case *agent.AgentMessage_Ping, *agent.AgentMessage_State:
go func(agentMessage *agent.AgentMessage) {
c.requestChan <- agentMessage
}(agentMessage)
case *agent.AgentMessage_Auth, *agent.AgentMessage_QanData:
c.emit(agentMessage)
}
}
}

func (c *Conn) emit(message *agent.AgentMessage) {
c.rw.RLock()
defer c.rw.RUnlock()
if _, ok := c.subscribers[message.Id]; ok {
for i := range c.subscribers[message.Id] {
go func(subscriber chan *agent.AgentMessage) {
subscriber <- message

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

loop variable subscriber captured by func literal

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not a "possible" race condition – it is a very real one: https://golang.org/doc/faq#closures_and_goroutines
Please add a unit test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I already fixed it

}(c.subscribers[message.Id][i])
}
} else {
c.l.Warnf("Unexpected message: %T %s", message, message)
}
}

func (c *Conn) removeSubscriber(id uint32, agentChan chan *agent.AgentMessage) {
c.rw.Lock()
defer c.rw.Unlock()
if _, ok := c.subscribers[id]; ok {
for i := range c.subscribers[id] {
if c.subscribers[id][i] == agentChan {
c.subscribers[id] = append(c.subscribers[id][:i], c.subscribers[id][i+1:]...)
break
}
}
}
}

func (c *Conn) addSubscriber(id uint32, agentChan chan *agent.AgentMessage) {
c.rw.Lock()
defer c.rw.Unlock()
if _, ok := c.subscribers[id]; !ok {
c.subscribers[id] = []chan *agent.AgentMessage{}
}
c.subscribers[id] = append(c.subscribers[id], agentChan)
}