Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

wait for sessionWantSender to shutdown before completing session shutdown #317

Merged
merged 3 commits into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func New(ctx context.Context,
periodicSearchDelay: periodicSearchDelay,
self: self,
}
s.sws = newSessionWantSender(ctx, id, pm, sprm, bpm, s.onWantsSent, s.onPeersExhausted)
s.sws = newSessionWantSender(id, pm, sprm, bpm, s.onWantsSent, s.onPeersExhausted)

go s.run(ctx)

Expand Down Expand Up @@ -387,6 +387,9 @@ func (s *Session) handleShutdown() {
s.idleTick.Stop()
// Shut down the session peer manager
s.sprm.Shutdown()
// Shut down the sessionWantSender (blocks until sessionWantSender stops
// sending)
s.sws.Shutdown()
// Remove the session from the want manager
s.wm.RemoveSession(s.ctx, s.id)
}
Expand Down
32 changes: 24 additions & 8 deletions internal/session/sessionwantsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,13 @@ type onPeersExhaustedFn func([]cid.Cid)
// consults the peer response tracker (records which peers sent us blocks).
//
type sessionWantSender struct {
// When the context is cancelled, sessionWantSender shuts down
// The context is used when sending wants
ctx context.Context
// Called to shutdown the sessionWantSender
shutdown func()
// The sessionWantSender uses the closed channel to signal when it's
// finished shutting down
closed chan struct{}
// The session ID
sessionID uint64
// A channel that collects incoming changes (events)
Expand All @@ -97,11 +102,14 @@ type sessionWantSender struct {
onPeersExhausted onPeersExhaustedFn
}

func newSessionWantSender(ctx context.Context, sid uint64, pm PeerManager, spm SessionPeerManager,
func newSessionWantSender(sid uint64, pm PeerManager, spm SessionPeerManager,
bpm *bsbpm.BlockPresenceManager, onSend onSendFn, onPeersExhausted onPeersExhaustedFn) sessionWantSender {

ctx, cancel := context.WithCancel(context.Background())
sws := sessionWantSender{
ctx: ctx,
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
shutdown: cancel,
closed: make(chan struct{}),
sessionID: sid,
changes: make(chan change, changesBufferSize),
wants: make(map[cid.Cid]*wantInfo),
Expand Down Expand Up @@ -158,12 +166,25 @@ func (sws *sessionWantSender) Run() {
case ch := <-sws.changes:
sws.onChange([]change{ch})
case <-sws.ctx.Done():
sws.shutdown()
// Unregister the session with the PeerManager
sws.pm.UnregisterSession(sws.sessionID)

// Close the 'closed' channel to signal to Shutdown() that the run
// loop has exited
close(sws.closed)
return
}
}
}

// Shutdown the sessionWantSender
func (sws *sessionWantSender) Shutdown() {
// Signal to the run loop to stop processing
sws.shutdown()
// Wait for run loop to complete
<-sws.closed
}

// addChange adds a new change to the queue
func (sws *sessionWantSender) addChange(c change) {
select {
Expand All @@ -172,11 +193,6 @@ func (sws *sessionWantSender) addChange(c change) {
}
}

// shutdown unregisters the session with the PeerManager
func (sws *sessionWantSender) shutdown() {
sws.pm.UnregisterSession(sws.sessionID)
}

// collectChanges collects all the changes that have occurred since the last
// invocation of onChange
func (sws *sessionWantSender) collectChanges(changes []change) []change {
Expand Down
22 changes: 11 additions & 11 deletions internal/session/sessionwantsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestSendWants(t *testing.T) {
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)

go spm.Run()

Expand Down Expand Up @@ -176,7 +176,7 @@ func TestSendsWantBlockToOnePeerOnly(t *testing.T) {
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)

go spm.Run()

Expand Down Expand Up @@ -234,7 +234,7 @@ func TestReceiveBlock(t *testing.T) {
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)

go spm.Run()

Expand Down Expand Up @@ -294,7 +294,7 @@ func TestPeerUnavailable(t *testing.T) {
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)

go spm.Run()

Expand Down Expand Up @@ -360,7 +360,7 @@ func TestPeersExhausted(t *testing.T) {
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}

ep := exhaustedPeers{}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)

go spm.Run()

Expand Down Expand Up @@ -436,7 +436,7 @@ func TestPeersExhaustedLastWaitingPeerUnavailable(t *testing.T) {
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}

ep := exhaustedPeers{}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)

go spm.Run()

Expand Down Expand Up @@ -484,7 +484,7 @@ func TestPeersExhaustedAllPeersUnavailable(t *testing.T) {
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}

ep := exhaustedPeers{}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)

go spm.Run()

Expand Down Expand Up @@ -522,7 +522,7 @@ func TestConsecutiveDontHaveLimit(t *testing.T) {
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)

go spm.Run()

Expand Down Expand Up @@ -578,7 +578,7 @@ func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) {
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)

go spm.Run()

Expand Down Expand Up @@ -633,7 +633,7 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)

go spm.Run()

Expand Down Expand Up @@ -717,7 +717,7 @@ func TestConsecutiveDontHaveDontRemoveIfHasWantedBlock(t *testing.T) {
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)

go spm.Run()

Expand Down