From be841567ae16dd891e3c83c0585a16e598246346 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Mon, 11 May 2020 14:44:13 +0530 Subject: [PATCH] restart peer handler --- p2p/protocol/identify/id.go | 17 ++++++++--------- p2p/protocol/identify/peer_loop.go | 17 ++++++++++------- p2p/protocol/identify/peer_loop_test.go | 3 +++ 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 6a46a5b795..822ee83dde 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -197,6 +197,7 @@ func (ids *IDService) loop() { mes := &pb.Identify{} ids.populateMessage(mes, rp, addReq.localConnAddr, addReq.remoteConnAddr) ph = newPeerHandler(rp, ids, mes) + ph.start() phs[rp] = ph addReq.resp <- ph } @@ -226,19 +227,17 @@ func (ids *IDService) loop() { case rp := <-phClosedCh: ph := phs[rp] - delete(phs, rp) // If we are connected to the peer, it means that we got a connection from the peer // before we could finish removing it's handler on the previous disconnection. - // If we delete the handler and dont replace it, we wont be able to push updates to it - // till we see a new connection. So, create and register a new handler for it with the state - // initialised to the last message we sent to that peer. + // If we delete the handler, we wont be able to push updates to it + // till we see a new connection. So, we should restart the handler. + // The fact that we got the handler on this channel means that it's context and handler + // have completed because we write the handler to this chanel only after it closed. if ids.Host.Network().Connectedness(rp) == network.Connected { - ph.msgMu.RLock() - mes := ph.idMsgSnapshot - ph.msgMu.RUnlock() - ph = nil - phs[rp] = newPeerHandler(rp, ids, mes) + ph.start() + } else { + delete(phs, rp) } case e, more := <-sub.Out(): diff --git a/p2p/protocol/identify/peer_loop.go b/p2p/protocol/identify/peer_loop.go index 2973e9df95..7fc4021d4b 100644 --- a/p2p/protocol/identify/peer_loop.go +++ b/p2p/protocol/identify/peer_loop.go @@ -38,13 +38,9 @@ type peerHandler struct { } func newPeerHandler(pid peer.ID, ids *IDService, initState *pb.Identify) *peerHandler { - ctx, cancel := context.WithCancel(context.Background()) - ph := &peerHandler{ - ids: ids, - ctx: ctx, - cancel: cancel, - pid: pid, + ids: ids, + pid: pid, idMsgSnapshot: initState, @@ -56,9 +52,16 @@ func newPeerHandler(pid peer.ID, ids *IDService, initState *pb.Identify) *peerHa ph.evalTestCh = make(chan func()) } + return ph +} + +func (ph *peerHandler) start() { + ctx, cancel := context.WithCancel(context.Background()) + ph.ctx = ctx + ph.cancel = cancel + ph.wg.Add(1) go ph.loop() - return ph } func (ph *peerHandler) close() error { diff --git a/p2p/protocol/identify/peer_loop_test.go b/p2p/protocol/identify/peer_loop_test.go index dba724fc7d..6eb3bd6056 100644 --- a/p2p/protocol/identify/peer_loop_test.go +++ b/p2p/protocol/identify/peer_loop_test.go @@ -32,6 +32,8 @@ func TestMakeApplyDelta(t *testing.T) { defer h1.Close() ids1 := NewIDService(h1) ph := newPeerHandler(h1.ID(), ids1, &pb.Identify{}) + ph.start() + defer ph.close() m1 := ph.mkDelta() require.NotNil(t, m1) @@ -88,6 +90,7 @@ func TestHandlerClose(t *testing.T) { defer h1.Close() ids1 := NewIDService(h1) ph := newPeerHandler(h1.ID(), ids1, nil) + ph.start() require.NoError(t, ph.close()) }