Skip to content

Commit

Permalink
Merge pull request #1335 from halseth/peer-mutex-relaxation
Browse files Browse the repository at this point in the history
Peer mutex relaxation
  • Loading branch information
Roasbeef committed Jun 6, 2018
2 parents 782fd9d + ac1ab6f commit 2b2b83f
Showing 1 changed file with 33 additions and 57 deletions.
90 changes: 33 additions & 57 deletions server.go
Expand Up @@ -1148,33 +1148,19 @@ func (s *server) establishPersistentConnections() error {
}

// BroadcastMessage sends a request to the server to broadcast a set of
// messages to all peers other than the one specified by the `skip` parameter.
// messages to all peers other than the one specified by the `skips` parameter.
//
// NOTE: This function is safe for concurrent access.
func (s *server) BroadcastMessage(skip map[routing.Vertex]struct{},
func (s *server) BroadcastMessage(skips map[routing.Vertex]struct{},
msgs ...lnwire.Message) error {

s.mu.RLock()
defer s.mu.RUnlock()

return s.broadcastMessages(skip, msgs)
}

// broadcastMessages is an internal method that delivers messages to all active
// peers except the one specified by `skip`.
//
// NOTE: This method MUST be called while the server's mutex is locked.
func (s *server) broadcastMessages(
skips map[routing.Vertex]struct{},
msgs []lnwire.Message) error {

srvrLog.Debugf("Broadcasting %v messages", len(msgs))

// Iterate over all known peers, dispatching a go routine to enqueue
// all messages to each of peers. We synchronize access to peersByPub
// throughout this process to ensure we deliver messages to exact set
// of peers present at the time of invocation.
var wg sync.WaitGroup
// Filter out peers found in the skips map. We synchronize access to
// peersByPub throughout this process to ensure we deliver messages to
// exact set of peers present at the time of invocation.
s.mu.RLock()
peers := make([]*peer, 0, len(s.peersByPub))
for _, sPeer := range s.peersByPub {
if skips != nil {
if _, ok := skips[sPeer.pubKeyBytes]; ok {
Expand All @@ -1184,6 +1170,14 @@ func (s *server) broadcastMessages(
}
}

peers = append(peers, sPeer)
}
s.mu.RUnlock()

// Iterate over all known peers, dispatching a go routine to enqueue
// all messages to each of peers.
var wg sync.WaitGroup
for _, sPeer := range peers {
// Dispatch a go routine to enqueue all messages to this peer.
wg.Add(1)
s.wg.Add(1)
Expand All @@ -1205,16 +1199,29 @@ func (s *server) broadcastMessages(
func (s *server) SendToPeer(target *btcec.PublicKey,
msgs ...lnwire.Message) error {

// Queue the incoming messages in the peer's outgoing message buffer.
// We acquire the shared lock here to ensure the peer map doesn't change
// from underneath us.
// Compute the target peer's identifier.
targetPubBytes := target.SerializeCompressed()

srvrLog.Tracef("Attempting to send msgs %v to: %x",
len(msgs), targetPubBytes)

// Lookup intended target in peersByPub, returning an error to the
// caller if the peer is unknown. Access to peersByPub is synchronized
// here to ensure we consider the exact set of peers present at the
// time of invocation.
s.mu.RLock()
targetPeer, errChans, err := s.sendToPeer(target, msgs)
targetPeer, err := s.findPeerByPubStr(string(targetPubBytes))
s.mu.RUnlock()
if err != nil {
if err == ErrPeerNotConnected {
srvrLog.Errorf("unable to send message to %x, "+
"peer is not connected", targetPubBytes)
return err
}

// Send messages to the peer and get the error channels that will be
// signaled by the peer's write handler.
errChans := s.sendPeerMessages(targetPeer, msgs, nil)

// With the server's shared lock released, we now handle all of the
// errors being returned from the target peer's write handler.
for _, errChan := range errChans {
Expand Down Expand Up @@ -1259,37 +1266,6 @@ func (s *server) NotifyWhenOnline(peer *btcec.PublicKey,
s.peerConnectedListeners[pubStr], connectedChan)
}

// sendToPeer is an internal method that queues the given messages in the
// outgoing buffer of the specified `target` peer. Upon success, this method
// returns the peer instance and a slice of error chans that will contain
// responses from the write handler.
func (s *server) sendToPeer(target *btcec.PublicKey,
msgs []lnwire.Message) (*peer, []chan error, error) {

// Compute the target peer's identifier.
targetPubBytes := target.SerializeCompressed()

srvrLog.Tracef("Attempting to send msgs %v to: %x",
len(msgs), targetPubBytes)

// Lookup intended target in peersByPub, returning an error to the
// caller if the peer is unknown. Access to peersByPub is synchronized
// here to ensure we consider the exact set of peers present at the
// time of invocation.
targetPeer, err := s.findPeerByPubStr(string(targetPubBytes))
if err == ErrPeerNotConnected {
srvrLog.Errorf("unable to send message to %x, "+
"peer is not connected", targetPubBytes)
return nil, nil, err
}

// Send messages to the peer and return the error channels that will be
// signaled by the peer's write handler.
errChans := s.sendPeerMessages(targetPeer, msgs, nil)

return targetPeer, errChans, nil
}

// sendPeerMessages enqueues a list of messages into the outgoingQueue of the
// `targetPeer`. This method supports additional broadcast-level
// synchronization by using the additional `wg` to coordinate a particular
Expand Down

0 comments on commit 2b2b83f

Please sign in to comment.