Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Fix order of session broadcast wants #291

Merged
merged 2 commits into from
Mar 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func New(ctx context.Context,
periodicSearchDelay delay.D,
self peer.ID) *Session {
s := &Session{
sw: newSessionWants(),
sw: newSessionWants(broadcastLiveWantsLimit),
tickDelayReqs: make(chan time.Duration),
ctx: ctx,
wm: wm,
Expand Down Expand Up @@ -433,7 +433,7 @@ func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
}

// No peers discovered yet, broadcast some want-haves
ks := s.sw.GetNextWants(broadcastLiveWantsLimit)
ks := s.sw.GetNextWants()
if len(ks) > 0 {
log.Infof("Ses%d: No peers - broadcasting %d want HAVE requests\n", s.id, len(ks))
s.wm.BroadcastWantHaves(ctx, s.id, ks)
Expand Down
11 changes: 9 additions & 2 deletions internal/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,19 @@ func TestSessionFindMorePeers(t *testing.T) {
t.Fatal("Did not make second want request ")
}

// Verify a broadcast was made
// The session should keep broadcasting periodically until it receives a response
select {
case receivedWantReq := <-fwm.wantReqs:
if len(receivedWantReq.cids) < broadcastLiveWantsLimit {
if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
t.Fatal("did not rebroadcast whole live list")
}
// Make sure the first block is not included because it has already
// been received
for _, c := range receivedWantReq.cids {
if c.Equals(cids[0]) {
t.Fatal("should not braodcast block that was already received")
}
}
case <-ctx.Done():
t.Fatal("Never rebroadcast want list")
}
Expand Down
71 changes: 55 additions & 16 deletions internal/session/sessionwants.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,29 @@ import (
cid "github.com/ipfs/go-cid"
)

// liveWantsOrder and liveWants will get out of sync as blocks are received.
// This constant is the maximum amount to allow them to be out of sync before
// cleaning up the ordering array.
const liveWantsOrderGCLimit = 32

// sessionWants keeps track of which cids are waiting to be sent out, and which
// peers are "live" - ie, we've sent a request but haven't received a block yet
type sessionWants struct {
toFetch *cidQueue
// The wants that have not yet been sent out
toFetch *cidQueue
// Wants that have been sent but have not received a response
liveWants map[cid.Cid]time.Time
// The order in which wants were requested
liveWantsOrder []cid.Cid
// The maximum number of want-haves to send in a broadcast
broadcastLimit int
}

func newSessionWants() sessionWants {
func newSessionWants(broadcastLimit int) sessionWants {
return sessionWants{
toFetch: newCidQueue(),
liveWants: make(map[cid.Cid]time.Time),
toFetch: newCidQueue(),
liveWants: make(map[cid.Cid]time.Time),
broadcastLimit: broadcastLimit,
}
}

Expand All @@ -33,19 +45,23 @@ func (sw *sessionWants) BlocksRequested(newWants []cid.Cid) {
}
}

// GetNextWants moves as many CIDs from the fetch queue to the live wants
// list as possible (given the limit). Returns the newly live wants.
func (sw *sessionWants) GetNextWants(limit int) []cid.Cid {
// GetNextWants is called when the session has not yet discovered peers with
// the blocks that it wants. It moves as many CIDs from the fetch queue to
// the live wants queue as possible (given the broadcast limit).
// Returns the newly live wants.
func (sw *sessionWants) GetNextWants() []cid.Cid {
now := time.Now()

// Move CIDs from fetch queue to the live wants queue (up to the limit)
// Move CIDs from fetch queue to the live wants queue (up to the broadcast
// limit)
currentLiveCount := len(sw.liveWants)
toAdd := limit - currentLiveCount
toAdd := sw.broadcastLimit - currentLiveCount

var live []cid.Cid
for ; toAdd > 0 && sw.toFetch.Len() > 0; toAdd-- {
c := sw.toFetch.Pop()
live = append(live, c)
sw.liveWantsOrder = append(sw.liveWantsOrder, c)
sw.liveWants[c] = now
}

Expand All @@ -58,6 +74,7 @@ func (sw *sessionWants) WantsSent(ks []cid.Cid) {
for _, c := range ks {
if _, ok := sw.liveWants[c]; !ok && sw.toFetch.Has(c) {
sw.toFetch.Remove(c)
sw.liveWantsOrder = append(sw.liveWantsOrder, c)
sw.liveWants[c] = now
}
}
Expand All @@ -73,11 +90,13 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
return wanted, totalLatency
}

// Filter for blocks that were actually wanted (as opposed to duplicates)
now := time.Now()
for _, c := range ks {
if sw.isWanted(c) {
wanted = append(wanted, c)

// Measure latency
sentAt, ok := sw.liveWants[c]
if ok && !sentAt.IsZero() {
totalLatency += now.Sub(sentAt)
Expand All @@ -89,21 +108,39 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
}
}

// If the live wants ordering array is a long way out of sync with the
// live wants map, clean up the ordering array
if len(sw.liveWantsOrder)-len(sw.liveWants) > liveWantsOrderGCLimit {
cleaned := sw.liveWantsOrder[:0]
for _, c := range sw.liveWantsOrder {
if _, ok := sw.liveWants[c]; ok {
cleaned = append(cleaned, c)
}
}
sw.liveWantsOrder = cleaned
}
Stebalien marked this conversation as resolved.
Show resolved Hide resolved

return wanted, totalLatency
}

// PrepareBroadcast saves the current time for each live want and returns the
// live want CIDs.
// live want CIDs up to the broadcast limit.
func (sw *sessionWants) PrepareBroadcast() []cid.Cid {
// TODO: Change this to return wants in order so that the session will
// send out Find Providers request for the first want
// (Note that maps return keys in random order)
now := time.Now()
live := make([]cid.Cid, 0, len(sw.liveWants))
for c := range sw.liveWants {
live = append(live, c)
sw.liveWants[c] = now
for _, c := range sw.liveWantsOrder {
if _, ok := sw.liveWants[c]; ok {
// No response was received for the want, so reset the sent time
// to now as we're about to broadcast
sw.liveWants[c] = now

live = append(live, c)
if len(live) == sw.broadcastLimit {
break
}
}
}

return live
}

Expand All @@ -120,9 +157,11 @@ func (sw *sessionWants) LiveWants() []cid.Cid {
for c := range sw.liveWants {
live = append(live, c)
}

return live
}

// RandomLiveWant returns a randomly selected live want
func (sw *sessionWants) RandomLiveWant() cid.Cid {
if len(sw.liveWants) == 0 {
return cid.Cid{}
Expand Down
87 changes: 83 additions & 4 deletions internal/session/sessionwants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func TestEmptySessionWants(t *testing.T) {
sw := newSessionWants()
sw := newSessionWants(broadcastLiveWantsLimit)

// Expect these functions to return nothing on a new sessionWants
lws := sw.PrepareBroadcast()
Expand All @@ -29,7 +29,7 @@ func TestEmptySessionWants(t *testing.T) {
}

func TestSessionWants(t *testing.T) {
sw := newSessionWants()
sw := newSessionWants(5)
cids := testutil.GenerateCids(10)
others := testutil.GenerateCids(1)

Expand All @@ -42,7 +42,7 @@ func TestSessionWants(t *testing.T) {
// The first 5 cids should go move into the live queue
// toFetch Live
// 98765 43210
nextw := sw.GetNextWants(5)
nextw := sw.GetNextWants()
if len(nextw) != 5 {
t.Fatal("expected 5 next wants")
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestSessionWants(t *testing.T) {
// Should move 2 wants from toFetch queue to live wants
// toFetch Live
// 987__ 65432
nextw = sw.GetNextWants(5)
nextw = sw.GetNextWants()
if len(nextw) != 2 {
t.Fatal("expected 2 next wants")
}
Expand Down Expand Up @@ -108,3 +108,82 @@ func TestSessionWants(t *testing.T) {
t.Fatal("expected 4 live wants")
}
}

func TestPrepareBroadcast(t *testing.T) {
sw := newSessionWants(3)
cids := testutil.GenerateCids(10)

// Add 6 new wants
// toFetch Live
// 543210
sw.BlocksRequested(cids[:6])

// Get next wants with a limit of 3
// The first 3 cids should go move into the live queue
// toFetch Live
// 543 210
sw.GetNextWants()

// Broadcast should contain wants in order
for i := 0; i < 10; i++ {
ws := sw.PrepareBroadcast()
if len(ws) != 3 {
t.Fatal("should broadcast all live wants")
}
for idx, c := range ws {
if !c.Equals(cids[idx]) {
t.Fatal("broadcast should always return wants in order")
}
}
}

// One block received
// Remove a cid from the live queue
sw.BlocksReceived(cids[:1])
// toFetch Live
// 543 21_

// Add 4 new wants
// toFetch Live
// 9876543 21
sw.BlocksRequested(cids[6:])

// 2 Wants sent
// toFetch Live
// 98765 4321
sw.WantsSent(cids[3:5])

// Broadcast should contain wants in order
cids = cids[1:]
for i := 0; i < 10; i++ {
ws := sw.PrepareBroadcast()
if len(ws) != 3 {
t.Fatal("should broadcast live wants up to limit", len(ws), len(cids))
}
for idx, c := range ws {
if !c.Equals(cids[idx]) {
t.Fatal("broadcast should always return wants in order")
}
}
}
}

// Test that even after GC broadcast returns correct wants
func TestPrepareBroadcastAfterGC(t *testing.T) {
sw := newSessionWants(5)
cids := testutil.GenerateCids(liveWantsOrderGCLimit * 2)

sw.BlocksRequested(cids)

// Trigger a sessionWants internal GC of the live wants
sw.BlocksReceived(cids[:liveWantsOrderGCLimit+1])
cids = cids[:liveWantsOrderGCLimit+1]

// Broadcast should contain wants in order
ws := sw.PrepareBroadcast()
for i, c := range ws {
if !c.Equals(cids[i]) {
t.Fatal("broadcast should always return wants in order")
}
}
}