Skip to content

Commit

Permalink
p2p/conn: FlushStop. Use in pex. Closes #2092
Browse files Browse the repository at this point in the history
In seed mode, we call StopPeer immediately after Send.
Since flushing msgs to the peer happens in the background,
the peer connection is often closed before the messages are
actually sent out. This allows all msgs to first be written and flushed out
on the conn before it is closed.
  • Loading branch information
ebuchman committed Nov 11, 2018
1 parent 6e9aee5 commit 60253ac
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 3 deletions.
1 change: 1 addition & 0 deletions blockchain/reactor_test.go
Expand Up @@ -197,6 +197,7 @@ func (tp *bcrTestPeer) TrySend(chID byte, msgBytes []byte) bool {
return true
}

func (tp *bcrTestPeer) FlushStop() {}
func (tp *bcrTestPeer) Send(chID byte, msgBytes []byte) bool { return tp.TrySend(chID, msgBytes) }
func (tp *bcrTestPeer) NodeInfo() p2p.NodeInfo { return p2p.DefaultNodeInfo{} }
func (tp *bcrTestPeer) Status() p2p.ConnectionStatus { return p2p.ConnectionStatus{} }
Expand Down
48 changes: 46 additions & 2 deletions p2p/conn/connection.go
Expand Up @@ -85,6 +85,7 @@ type MConnection struct {
config MConnConfig

quit chan struct{}
done chan struct{}
flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
pingTimer *cmn.RepeatTimer // send pings periodically

Expand Down Expand Up @@ -191,6 +192,7 @@ func (c *MConnection) OnStart() error {
return err
}
c.quit = make(chan struct{})
c.done = make(chan struct{})
c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle)
c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval)
c.pongTimeoutCh = make(chan bool, 1)
Expand All @@ -200,15 +202,56 @@ func (c *MConnection) OnStart() error {
return nil
}

// OnStop implements BaseService
func (c *MConnection) OnStop() {
// FlushStop replicates the logic of OnStop,
// but flushes all pending msgs before closing the connection.
func (c *MConnection) FlushStop() {
c.BaseService.OnStop()
c.flushTimer.Stop()
c.pingTimer.Stop()
c.chStatsTimer.Stop()
if c.quit != nil {
close(c.quit)
// wait until the sendRoutine exits
// so we dont race on calling sendSomePacketMsgs
<-c.done
}

// Send and flush all pending msgs.
// By now, IsRunning == false,
// so any concurrent attempts to send will fail.
// Since sendRoutine has exited, we can call this
// safely
eof := c.sendSomePacketMsgs()
for !eof {
eof = c.sendSomePacketMsgs()
}
c.flush()

// Now we can close the connection
c.conn.Close() // nolint: errcheck

// We can't close pong safely here because
// recvRoutine may write to it after we've stopped.
// Though it doesn't need to get closed at all,
// we close it @ recvRoutine.

// c.Stop()
}

// OnStop implements BaseService
func (c *MConnection) OnStop() {
select {
case <-c.quit:
// already quit via FlushStop
return
default:
}

c.BaseService.OnStop()
c.flushTimer.Stop()
c.pingTimer.Stop()
c.chStatsTimer.Stop()
close(c.quit)
c.conn.Close() // nolint: errcheck

// We can't close pong safely here because
Expand Down Expand Up @@ -366,6 +409,7 @@ FOR_LOOP:
c.sendMonitor.Update(int(_n))
c.flush()
case <-c.quit:
close(c.done)
break FOR_LOOP
case <-c.send:
// Send some PacketMsgs
Expand Down
50 changes: 50 additions & 0 deletions p2p/conn/connection_test.go
Expand Up @@ -36,6 +36,56 @@ func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msg
return c
}

func TestMConnectionSendAndStop(t *testing.T) {
server, client := NetPipe()
defer server.Close() // nolint: errcheck
defer client.Close() // nolint: errcheck

clientConn := createTestMConnection(client)
err := clientConn.Start()
require.Nil(t, err)
defer clientConn.Stop()

msg := []byte("abc")
assert.True(t, clientConn.Send(0x01, msg))

aminoMsgLength := 14

// start the reader in a new routine, so we can flush
errCh := make(chan error)
go func() {
msgB := make([]byte, aminoMsgLength)
_, err := server.Read(msgB)
if err != nil {
t.Fatal(err)
}
errCh <- err
}()

// stop the conn - it should flush all conns
clientConn.FlushStop()

timer := time.NewTimer(3 * time.Second)
select {
case <-errCh:
case <-timer.C:
t.Error("timed out waiting for msgs to be read")
}
}

func TestNetPipe(t *testing.T) {
server, client := NetPipe()
clientConn := createTestMConnection(client)
err := clientConn.Start()
require.Nil(t, err)
go func() {
m := make([]byte, 4)
server.Read(m)
}()
clientConn.Send(0x1, []byte("ABCD"))

}

func TestMConnectionSend(t *testing.T) {
server, client := NetPipe()
defer server.Close() // nolint: errcheck
Expand Down
8 changes: 8 additions & 0 deletions p2p/peer.go
Expand Up @@ -17,6 +17,7 @@ const metricsTickerDuration = 10 * time.Second
// Peer is an interface representing a peer connected on a reactor.
type Peer interface {
cmn.Service
FlushStop()

ID() ID // peer's cryptographic ID
RemoteIP() net.IP // remote IP of the connection
Expand Down Expand Up @@ -184,6 +185,13 @@ func (p *peer) OnStart() error {
return nil
}

// OnStop implements BaseService.
func (p *peer) FlushStop() {
p.metricsTicker.Stop()
p.BaseService.OnStop()
p.mconn.FlushStop() // stop everything and close the conn
}

// OnStop implements BaseService.
func (p *peer) OnStop() {
p.metricsTicker.Stop()
Expand Down
1 change: 1 addition & 0 deletions p2p/peer_set_test.go
Expand Up @@ -18,6 +18,7 @@ type mockPeer struct {
id ID
}

func (mp *mockPeer) FlushStop() { mp.Stop() }
func (mp *mockPeer) TrySend(chID byte, msgBytes []byte) bool { return true }
func (mp *mockPeer) Send(chID byte, msgBytes []byte) bool { return true }
func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} }
Expand Down
1 change: 1 addition & 0 deletions p2p/pex/pex_reactor.go
Expand Up @@ -221,6 +221,7 @@ func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) {
// 2) limit the output size
if r.config.SeedMode {
r.SendAddrs(src, r.book.GetSelectionWithBias(biasToSelectNewPeers))
src.FlushStop()
r.Switch.StopPeerGracefully(src)
} else {
r.SendAddrs(src, r.book.GetSelection())
Expand Down
3 changes: 2 additions & 1 deletion p2p/pex/pex_reactor_test.go
Expand Up @@ -40,7 +40,7 @@ func TestPEXReactorBasic(t *testing.T) {
assert.NotEmpty(t, r.GetChannels())
}

func TestPEXReactorAddRemovePeer(t *testing.T) {
func TestPEXReactorAddRemoveneer(t *testing.T) {
r, book := createReactor(&PEXReactorConfig{})
defer teardownReactor(book)

Expand Down Expand Up @@ -387,6 +387,7 @@ func newMockPeer() mockPeer {
return mp
}

func (mp mockPeer) FlushStop() { mp.Stop() }
func (mp mockPeer) ID() p2p.ID { return mp.addr.ID }
func (mp mockPeer) IsOutbound() bool { return mp.outbound }
func (mp mockPeer) IsPersistent() bool { return mp.persistent }
Expand Down

0 comments on commit 60253ac

Please sign in to comment.