From 60253acd15b6564a538dd4fa678ac0826792eb85 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sat, 10 Nov 2018 20:15:58 -0500 Subject: [PATCH] p2p/conn: FlushStop. Use in pex. Closes #2092 In seed mode, we call StopPeer immediately after Send. Since flushing msgs to the peer happens in the background, the peer connection is often closed before the messages are actually sent out. This allows all msgs to first be written and flushed out on the conn before it is closed. --- blockchain/reactor_test.go | 1 + p2p/conn/connection.go | 48 +++++++++++++++++++++++++++++++++-- p2p/conn/connection_test.go | 50 +++++++++++++++++++++++++++++++++++++ p2p/peer.go | 8 ++++++ p2p/peer_set_test.go | 1 + p2p/pex/pex_reactor.go | 1 + p2p/pex/pex_reactor_test.go | 3 ++- 7 files changed, 109 insertions(+), 3 deletions(-) diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index fca063e0ccfb..9b26f919a462 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -197,6 +197,7 @@ func (tp *bcrTestPeer) TrySend(chID byte, msgBytes []byte) bool { return true } +func (tp *bcrTestPeer) FlushStop() {} func (tp *bcrTestPeer) Send(chID byte, msgBytes []byte) bool { return tp.TrySend(chID, msgBytes) } func (tp *bcrTestPeer) NodeInfo() p2p.NodeInfo { return p2p.DefaultNodeInfo{} } func (tp *bcrTestPeer) Status() p2p.ConnectionStatus { return p2p.ConnectionStatus{} } diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 80fc53ddb59f..a2cb81736d82 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -85,6 +85,7 @@ type MConnection struct { config MConnConfig quit chan struct{} + done chan struct{} flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled. pingTimer *cmn.RepeatTimer // send pings periodically @@ -191,6 +192,7 @@ func (c *MConnection) OnStart() error { return err } c.quit = make(chan struct{}) + c.done = make(chan struct{}) c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle) c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval) c.pongTimeoutCh = make(chan bool, 1) @@ -200,15 +202,56 @@ func (c *MConnection) OnStart() error { return nil } -// OnStop implements BaseService -func (c *MConnection) OnStop() { +// FlushStop replicates the logic of OnStop, +// but flushes all pending msgs before closing the connection. +func (c *MConnection) FlushStop() { c.BaseService.OnStop() c.flushTimer.Stop() c.pingTimer.Stop() c.chStatsTimer.Stop() if c.quit != nil { close(c.quit) + // wait until the sendRoutine exits + // so we dont race on calling sendSomePacketMsgs + <-c.done + } + + // Send and flush all pending msgs. + // By now, IsRunning == false, + // so any concurrent attempts to send will fail. + // Since sendRoutine has exited, we can call this + // safely + eof := c.sendSomePacketMsgs() + for !eof { + eof = c.sendSomePacketMsgs() + } + c.flush() + + // Now we can close the connection + c.conn.Close() // nolint: errcheck + + // We can't close pong safely here because + // recvRoutine may write to it after we've stopped. + // Though it doesn't need to get closed at all, + // we close it @ recvRoutine. + + // c.Stop() +} + +// OnStop implements BaseService +func (c *MConnection) OnStop() { + select { + case <-c.quit: + // already quit via FlushStop + return + default: } + + c.BaseService.OnStop() + c.flushTimer.Stop() + c.pingTimer.Stop() + c.chStatsTimer.Stop() + close(c.quit) c.conn.Close() // nolint: errcheck // We can't close pong safely here because @@ -366,6 +409,7 @@ FOR_LOOP: c.sendMonitor.Update(int(_n)) c.flush() case <-c.quit: + close(c.done) break FOR_LOOP case <-c.send: // Send some PacketMsgs diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 59fe0d1df2ad..e951f00ad3f6 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -36,6 +36,56 @@ func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msg return c } +func TestMConnectionSendAndStop(t *testing.T) { + server, client := NetPipe() + defer server.Close() // nolint: errcheck + defer client.Close() // nolint: errcheck + + clientConn := createTestMConnection(client) + err := clientConn.Start() + require.Nil(t, err) + defer clientConn.Stop() + + msg := []byte("abc") + assert.True(t, clientConn.Send(0x01, msg)) + + aminoMsgLength := 14 + + // start the reader in a new routine, so we can flush + errCh := make(chan error) + go func() { + msgB := make([]byte, aminoMsgLength) + _, err := server.Read(msgB) + if err != nil { + t.Fatal(err) + } + errCh <- err + }() + + // stop the conn - it should flush all conns + clientConn.FlushStop() + + timer := time.NewTimer(3 * time.Second) + select { + case <-errCh: + case <-timer.C: + t.Error("timed out waiting for msgs to be read") + } +} + +func TestNetPipe(t *testing.T) { + server, client := NetPipe() + clientConn := createTestMConnection(client) + err := clientConn.Start() + require.Nil(t, err) + go func() { + m := make([]byte, 4) + server.Read(m) + }() + clientConn.Send(0x1, []byte("ABCD")) + +} + func TestMConnectionSend(t *testing.T) { server, client := NetPipe() defer server.Close() // nolint: errcheck diff --git a/p2p/peer.go b/p2p/peer.go index e98c16d26179..f248ced2c1c4 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -17,6 +17,7 @@ const metricsTickerDuration = 10 * time.Second // Peer is an interface representing a peer connected on a reactor. type Peer interface { cmn.Service + FlushStop() ID() ID // peer's cryptographic ID RemoteIP() net.IP // remote IP of the connection @@ -184,6 +185,13 @@ func (p *peer) OnStart() error { return nil } +// OnStop implements BaseService. +func (p *peer) FlushStop() { + p.metricsTicker.Stop() + p.BaseService.OnStop() + p.mconn.FlushStop() // stop everything and close the conn +} + // OnStop implements BaseService. func (p *peer) OnStop() { p.metricsTicker.Stop() diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index daa9b2c821ac..04b877b0d703 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -18,6 +18,7 @@ type mockPeer struct { id ID } +func (mp *mockPeer) FlushStop() { mp.Stop() } func (mp *mockPeer) TrySend(chID byte, msgBytes []byte) bool { return true } func (mp *mockPeer) Send(chID byte, msgBytes []byte) bool { return true } func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} } diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 46a12c48813b..54dc308a45c5 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -221,6 +221,7 @@ func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) { // 2) limit the output size if r.config.SeedMode { r.SendAddrs(src, r.book.GetSelectionWithBias(biasToSelectNewPeers)) + src.FlushStop() r.Switch.StopPeerGracefully(src) } else { r.SendAddrs(src, r.book.GetSelection()) diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 9d3f49bba538..7302bd90af4a 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -40,7 +40,7 @@ func TestPEXReactorBasic(t *testing.T) { assert.NotEmpty(t, r.GetChannels()) } -func TestPEXReactorAddRemovePeer(t *testing.T) { +func TestPEXReactorAddRemoveneer(t *testing.T) { r, book := createReactor(&PEXReactorConfig{}) defer teardownReactor(book) @@ -387,6 +387,7 @@ func newMockPeer() mockPeer { return mp } +func (mp mockPeer) FlushStop() { mp.Stop() } func (mp mockPeer) ID() p2p.ID { return mp.addr.ID } func (mp mockPeer) IsOutbound() bool { return mp.outbound } func (mp mockPeer) IsPersistent() bool { return mp.persistent }