Skip to content

Commit

Permalink
Add Goodbye Message When Disconnecting With Peers (#5589)
Browse files Browse the repository at this point in the history
* add goodbye message
* Merge branch 'master' into addGoodbye
* fix interface
* Merge refs/heads/master into addGoodbye
* use wrapper func
* Merge branch 'addGoodbye' of https://github.com/prysmaticlabs/geth-sharding into addGoodbye
* Merge refs/heads/master into addGoodbye
* Merge refs/heads/master into addGoodbye
* Merge refs/heads/master into addGoodbye
* Merge refs/heads/master into addGoodbye
  • Loading branch information
nisdas committed Apr 23, 2020
1 parent 37b68ba commit 2ebd684
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 14 deletions.
21 changes: 12 additions & 9 deletions beacon-chain/p2p/handshake.go
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"io"
"time"

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand All @@ -15,7 +14,8 @@ import (
// AddConnectionHandler adds a callback function which handles the connection with a
// newly added peer. It performs a handshake with that peer by sending a hello request
// and validating the response from the peer.
func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer.ID) error) {
func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer.ID) error,
goodbyeFunc func(ctx context.Context, id peer.ID) error) {
s.host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(net network.Network, conn network.Conn) {
log := log.WithField("peer", conn.RemotePeer().Pretty())
Expand All @@ -28,10 +28,15 @@ func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer
}
s.peers.Add(nil /* ENR */, conn.RemotePeer(), conn.RemoteMultiaddr(), conn.Stat().Direction)
if len(s.peers.Active()) >= int(s.cfg.MaxPeers) {
log.WithField("reason", "at peer limit").Trace("Ignoring connection request")
if err := s.Disconnect(conn.RemotePeer()); err != nil {
log.WithError(err).Error("Unable to disconnect from peer")
}
go func() {
log.WithField("reason", "at peer limit").Trace("Ignoring connection request")
if err := goodbyeFunc(context.Background(), conn.RemotePeer()); err != nil {
log.WithError(err).Error("Unable to send goodbye message to peer")
}
if err := s.Disconnect(conn.RemotePeer()); err != nil {
log.WithError(err).Error("Unable to disconnect from peer")
}
}()
return
}
if s.peers.IsBad(conn.RemotePeer()) {
Expand All @@ -52,9 +57,7 @@ func (s *Service) AddConnectionHandler(reqFunc func(ctx context.Context, id peer
"activePeers": len(s.peers.Active()),
})
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnecting)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := reqFunc(ctx, conn.RemotePeer()); err != nil && err != io.EOF {
if err := reqFunc(context.Background(), conn.RemotePeer()); err != nil && err != io.EOF {
log.WithError(err).Trace("Handshake failed")
if err.Error() == "protocol not supported" {
// This is only to ensure the smooth running of our testnets. This will not be
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/interfaces.go
Expand Up @@ -37,7 +37,7 @@ type SetStreamHandler interface {

// ConnectionHandler configures p2p to handle connections with a peer.
type ConnectionHandler interface {
AddConnectionHandler(f func(ctx context.Context, id peer.ID) error)
AddConnectionHandler(f func(ctx context.Context, id peer.ID) error, g func(context.Context, peer.ID) error)
AddDisconnectionHandler(f func(ctx context.Context, id peer.ID) error)
}

Expand Down
3 changes: 2 additions & 1 deletion beacon-chain/p2p/testing/p2p.go
Expand Up @@ -168,7 +168,8 @@ func (p *TestP2P) PeerID() peer.ID {
}

// AddConnectionHandler handles the connection with a newly connected peer.
func (p *TestP2P) AddConnectionHandler(f func(ctx context.Context, id peer.ID) error) {
func (p *TestP2P) AddConnectionHandler(f func(ctx context.Context, id peer.ID) error,
g func(context.Context, peer.ID) error) {
p.Host.Network().Notify(&network.NotifyBundle{
ConnectedF: func(net network.Network, conn network.Conn) {
// Must be handled in a goroutine as this callback cannot be blocking.
Expand Down
20 changes: 20 additions & 0 deletions beacon-chain/sync/rpc_goodbye.go
Expand Up @@ -6,6 +6,8 @@ import (
"time"

libp2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
)

const (
Expand Down Expand Up @@ -41,6 +43,24 @@ func (r *Service) goodbyeRPCHandler(ctx context.Context, msg interface{}, stream
return r.p2p.Disconnect(stream.Conn().RemotePeer())
}

func (r *Service) sendGoodByeMessage(ctx context.Context, code uint64, id peer.ID) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

stream, err := r.p2p.Send(ctx, &code, p2p.RPCGoodByeTopic, id)
if err != nil {
return err
}
log := log.WithField("Reason", goodbyeMessage(code))
log.WithField("peer", stream.Conn().RemotePeer()).Debug("Sending Goodbye message to peer")
return nil
}

// sends a goodbye message for a generic error
func (r *Service) sendGenericGoodbyeMessage(ctx context.Context, id peer.ID) error {
return r.sendGoodByeMessage(ctx, codeGenericError, id)
}

func goodbyeMessage(num uint64) string {
reason, ok := goodByes[num]
if ok {
Expand Down
49 changes: 49 additions & 0 deletions beacon-chain/sync/rpc_goodbye_test.go
Expand Up @@ -58,3 +58,52 @@ func TestGoodByeRPCHandler_Disconnects_With_Peer(t *testing.T) {
t.Error("Peer is still not disconnected despite sending a goodbye message")
}
}

func TestSendGoodbye_SendsMessage(t *testing.T) {
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p1.Connect(p2)
if len(p1.Host.Network().Peers()) != 1 {
t.Error("Expected peers to be connected")
}

// Set up a head state in the database with data we expect.
d := db.SetupDB(t)
defer db.TeardownDB(t, d)

r := &Service{
db: d,
p2p: p1,
}
failureCode := codeClientShutdown

// Setup streams
pcl := protocol.ID("/eth2/beacon_chain/req/goodbye/1/ssz")
var wg sync.WaitGroup
wg.Add(1)
p2.Host.SetStreamHandler(pcl, func(stream network.Stream) {
defer wg.Done()
out := new(uint64)
if err := r.p2p.Encoding().DecodeWithLength(stream, out); err != nil {
t.Fatal(err)
}
if *out != failureCode {
t.Fatalf("Wanted goodbye code of %d but got %d", failureCode, *out)
}

})

err := r.sendGoodByeMessage(context.Background(), failureCode, p2.Host.ID())
if err != nil {
t.Errorf("Unxpected error: %v", err)
}

if testutil.WaitTimeout(&wg, 1*time.Second) {
t.Fatal("Did not receive stream within 1 sec")
}

conns := p1.Host.Network().ConnsToPeer(p1.Host.ID())
if len(conns) > 0 {
t.Error("Peer is still not disconnected despite sending a goodbye message")
}
}
2 changes: 1 addition & 1 deletion beacon-chain/sync/rpc_ping.go
Expand Up @@ -18,7 +18,7 @@ func (r *Service) pingHandler(ctx context.Context, msg interface{}, stream libp2
log.WithError(err).Error("Failed to close stream")
}
}()
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
_, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
setRPCStreamDeadlines(stream)

Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/rpc_status_test.go
Expand Up @@ -384,7 +384,7 @@ func TestStatusRPCRequest_RequestSent(t *testing.T) {
}
})

p1.AddConnectionHandler(r.sendRPCStatusRequest)
p1.AddConnectionHandler(r.sendRPCStatusRequest, r.sendGenericGoodbyeMessage)
p1.Connect(p2)

if testutil.WaitTimeout(&wg, 1*time.Second) {
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/service.go
Expand Up @@ -135,7 +135,7 @@ func (r *Service) Start() {
panic(err)
}

r.p2p.AddConnectionHandler(r.reValidatePeer)
r.p2p.AddConnectionHandler(r.reValidatePeer, r.sendGenericGoodbyeMessage)
r.p2p.AddDisconnectionHandler(r.removeDisconnectedPeerStatus)
r.p2p.AddPingMethod(r.sendPingRequest)
r.processPendingBlocksQueue()
Expand Down

0 comments on commit 2ebd684

Please sign in to comment.