diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index fca063e0ccfb..9b26f919a462 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -197,6 +197,7 @@ func (tp *bcrTestPeer) TrySend(chID byte, msgBytes []byte) bool { return true } +func (tp *bcrTestPeer) FlushStop() {} func (tp *bcrTestPeer) Send(chID byte, msgBytes []byte) bool { return tp.TrySend(chID, msgBytes) } func (tp *bcrTestPeer) NodeInfo() p2p.NodeInfo { return p2p.DefaultNodeInfo{} } func (tp *bcrTestPeer) Status() p2p.ConnectionStatus { return p2p.ConnectionStatus{} } diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 80fc53ddb59f..a2cb81736d82 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -85,6 +85,7 @@ type MConnection struct { config MConnConfig quit chan struct{} + done chan struct{} flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled. pingTimer *cmn.RepeatTimer // send pings periodically @@ -191,6 +192,7 @@ func (c *MConnection) OnStart() error { return err } c.quit = make(chan struct{}) + c.done = make(chan struct{}) c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle) c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval) c.pongTimeoutCh = make(chan bool, 1) @@ -200,15 +202,56 @@ func (c *MConnection) OnStart() error { return nil } -// OnStop implements BaseService -func (c *MConnection) OnStop() { +// FlushStop replicates the logic of OnStop, +// but flushes all pending msgs before closing the connection. +func (c *MConnection) FlushStop() { c.BaseService.OnStop() c.flushTimer.Stop() c.pingTimer.Stop() c.chStatsTimer.Stop() if c.quit != nil { close(c.quit) + // wait until the sendRoutine exits + // so we dont race on calling sendSomePacketMsgs + <-c.done + } + + // Send and flush all pending msgs. + // By now, IsRunning == false, + // so any concurrent attempts to send will fail. + // Since sendRoutine has exited, we can call this + // safely + eof := c.sendSomePacketMsgs() + for !eof { + eof = c.sendSomePacketMsgs() + } + c.flush() + + // Now we can close the connection + c.conn.Close() // nolint: errcheck + + // We can't close pong safely here because + // recvRoutine may write to it after we've stopped. + // Though it doesn't need to get closed at all, + // we close it @ recvRoutine. + + // c.Stop() +} + +// OnStop implements BaseService +func (c *MConnection) OnStop() { + select { + case <-c.quit: + // already quit via FlushStop + return + default: } + + c.BaseService.OnStop() + c.flushTimer.Stop() + c.pingTimer.Stop() + c.chStatsTimer.Stop() + close(c.quit) c.conn.Close() // nolint: errcheck // We can't close pong safely here because @@ -366,6 +409,7 @@ FOR_LOOP: c.sendMonitor.Update(int(_n)) c.flush() case <-c.quit: + close(c.done) break FOR_LOOP case <-c.send: // Send some PacketMsgs diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 59fe0d1df2ad..e951f00ad3f6 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -36,6 +36,56 @@ func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msg return c } +func TestMConnectionSendAndStop(t *testing.T) { + server, client := NetPipe() + defer server.Close() // nolint: errcheck + defer client.Close() // nolint: errcheck + + clientConn := createTestMConnection(client) + err := clientConn.Start() + require.Nil(t, err) + defer clientConn.Stop() + + msg := []byte("abc") + assert.True(t, clientConn.Send(0x01, msg)) + + aminoMsgLength := 14 + + // start the reader in a new routine, so we can flush + errCh := make(chan error) + go func() { + msgB := make([]byte, aminoMsgLength) + _, err := server.Read(msgB) + if err != nil { + t.Fatal(err) + } + errCh <- err + }() + + // stop the conn - it should flush all conns + clientConn.FlushStop() + + timer := time.NewTimer(3 * time.Second) + select { + case <-errCh: + case <-timer.C: + t.Error("timed out waiting for msgs to be read") + } +} + +func TestNetPipe(t *testing.T) { + server, client := NetPipe() + clientConn := createTestMConnection(client) + err := clientConn.Start() + require.Nil(t, err) + go func() { + m := make([]byte, 4) + server.Read(m) + }() + clientConn.Send(0x1, []byte("ABCD")) + +} + func TestMConnectionSend(t *testing.T) { server, client := NetPipe() defer server.Close() // nolint: errcheck diff --git a/p2p/peer.go b/p2p/peer.go index e98c16d26179..f248ced2c1c4 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -17,6 +17,7 @@ const metricsTickerDuration = 10 * time.Second // Peer is an interface representing a peer connected on a reactor. type Peer interface { cmn.Service + FlushStop() ID() ID // peer's cryptographic ID RemoteIP() net.IP // remote IP of the connection @@ -184,6 +185,13 @@ func (p *peer) OnStart() error { return nil } +// OnStop implements BaseService. +func (p *peer) FlushStop() { + p.metricsTicker.Stop() + p.BaseService.OnStop() + p.mconn.FlushStop() // stop everything and close the conn +} + // OnStop implements BaseService. func (p *peer) OnStop() { p.metricsTicker.Stop() diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index daa9b2c821ac..04b877b0d703 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -18,6 +18,7 @@ type mockPeer struct { id ID } +func (mp *mockPeer) FlushStop() { mp.Stop() } func (mp *mockPeer) TrySend(chID byte, msgBytes []byte) bool { return true } func (mp *mockPeer) Send(chID byte, msgBytes []byte) bool { return true } func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} } diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 46a12c48813b..54dc308a45c5 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -221,6 +221,7 @@ func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) { // 2) limit the output size if r.config.SeedMode { r.SendAddrs(src, r.book.GetSelectionWithBias(biasToSelectNewPeers)) + src.FlushStop() r.Switch.StopPeerGracefully(src) } else { r.SendAddrs(src, r.book.GetSelection()) diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 9d3f49bba538..7302bd90af4a 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -40,7 +40,7 @@ func TestPEXReactorBasic(t *testing.T) { assert.NotEmpty(t, r.GetChannels()) } -func TestPEXReactorAddRemovePeer(t *testing.T) { +func TestPEXReactorAddRemoveneer(t *testing.T) { r, book := createReactor(&PEXReactorConfig{}) defer teardownReactor(book) @@ -387,6 +387,7 @@ func newMockPeer() mockPeer { return mp } +func (mp mockPeer) FlushStop() { mp.Stop() } func (mp mockPeer) ID() p2p.ID { return mp.addr.ID } func (mp mockPeer) IsOutbound() bool { return mp.outbound } func (mp mockPeer) IsPersistent() bool { return mp.persistent }