Skip to content

Commit

Permalink
restart peer handler
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 committed May 11, 2020
1 parent f0a0251 commit be84156
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 16 deletions.
17 changes: 8 additions & 9 deletions p2p/protocol/identify/id.go
Expand Up @@ -197,6 +197,7 @@ func (ids *IDService) loop() {
mes := &pb.Identify{}
ids.populateMessage(mes, rp, addReq.localConnAddr, addReq.remoteConnAddr)
ph = newPeerHandler(rp, ids, mes)
ph.start()
phs[rp] = ph
addReq.resp <- ph
}
Expand Down Expand Up @@ -226,19 +227,17 @@ func (ids *IDService) loop() {

case rp := <-phClosedCh:
ph := phs[rp]
delete(phs, rp)

// If we are connected to the peer, it means that we got a connection from the peer
// before we could finish removing it's handler on the previous disconnection.
// If we delete the handler and dont replace it, we wont be able to push updates to it
// till we see a new connection. So, create and register a new handler for it with the state
// initialised to the last message we sent to that peer.
// If we delete the handler, we wont be able to push updates to it
// till we see a new connection. So, we should restart the handler.
// The fact that we got the handler on this channel means that it's context and handler
// have completed because we write the handler to this chanel only after it closed.
if ids.Host.Network().Connectedness(rp) == network.Connected {
ph.msgMu.RLock()
mes := ph.idMsgSnapshot
ph.msgMu.RUnlock()
ph = nil
phs[rp] = newPeerHandler(rp, ids, mes)
ph.start()
} else {
delete(phs, rp)
}

case e, more := <-sub.Out():
Expand Down
17 changes: 10 additions & 7 deletions p2p/protocol/identify/peer_loop.go
Expand Up @@ -38,13 +38,9 @@ type peerHandler struct {
}

func newPeerHandler(pid peer.ID, ids *IDService, initState *pb.Identify) *peerHandler {
ctx, cancel := context.WithCancel(context.Background())

ph := &peerHandler{
ids: ids,
ctx: ctx,
cancel: cancel,
pid: pid,
ids: ids,
pid: pid,

idMsgSnapshot: initState,

Expand All @@ -56,9 +52,16 @@ func newPeerHandler(pid peer.ID, ids *IDService, initState *pb.Identify) *peerHa
ph.evalTestCh = make(chan func())
}

return ph
}

func (ph *peerHandler) start() {
ctx, cancel := context.WithCancel(context.Background())
ph.ctx = ctx
ph.cancel = cancel

ph.wg.Add(1)
go ph.loop()
return ph
}

func (ph *peerHandler) close() error {
Expand Down
3 changes: 3 additions & 0 deletions p2p/protocol/identify/peer_loop_test.go
Expand Up @@ -32,6 +32,8 @@ func TestMakeApplyDelta(t *testing.T) {
defer h1.Close()
ids1 := NewIDService(h1)
ph := newPeerHandler(h1.ID(), ids1, &pb.Identify{})
ph.start()
defer ph.close()

m1 := ph.mkDelta()
require.NotNil(t, m1)
Expand Down Expand Up @@ -88,6 +90,7 @@ func TestHandlerClose(t *testing.T) {
defer h1.Close()
ids1 := NewIDService(h1)
ph := newPeerHandler(h1.ID(), ids1, nil)
ph.start()

require.NoError(t, ph.close())
}
Expand Down

0 comments on commit be84156

Please sign in to comment.