Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Fix: don't ignore received blocks for pending wants #174

Merged
merged 12 commits into from
Aug 23, 2019
11 changes: 8 additions & 3 deletions session/sessionwants.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package session

import (
"math"
"math/rand"
"sync"
"time"
Expand All @@ -19,6 +20,8 @@ type sessionWants struct {
// measures latency. It returns the CIDs of blocks that were actually wanted
// (as opposed to duplicates) and the total latency for all incoming blocks.
func (sw *sessionWants) BlocksReceived(cids []cid.Cid) ([]cid.Cid, time.Duration) {
now := time.Now()

sw.Lock()
defer sw.Unlock()

Expand All @@ -31,7 +34,7 @@ func (sw *sessionWants) BlocksReceived(cids []cid.Cid) ([]cid.Cid, time.Duration
// If the block CID was in the live wants queue, remove it
tval, ok := sw.liveWants[c]
if ok {
totalLatency += time.Since(tval)
totalLatency += now.Sub(tval)
delete(sw.liveWants, c)
} else {
// Otherwise remove it from the toFetch queue, if it was there
Expand Down Expand Up @@ -131,13 +134,15 @@ func (sw *sessionWants) LiveWants() []cid.Cid {

// RandomLiveWant returns a randomly selected live want
func (sw *sessionWants) RandomLiveWant() cid.Cid {
r := rand.Float64()

sw.RLock()
defer sw.RUnlock()

if len(sw.liveWants) == 0 {
return cid.Cid{}
}
i := rand.Intn(len(sw.liveWants))
i := math.Floor(r * float64(len(sw.liveWants)))
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
// picking a random live want
for k := range sw.liveWants {
if i == 0 {
Expand Down Expand Up @@ -171,7 +176,7 @@ func (sw *sessionWants) FilterInteresting(ks []cid.Cid) []cid.Cid {
sw.RLock()
defer sw.RUnlock()

interested := make([]cid.Cid, 0, len(ks))
var interested []cid.Cid
for _, k := range ks {
if sw.unlockedIsWanted(k) || sw.pastWants.Has(k) {
interested = append(interested, k)
Expand Down
8 changes: 1 addition & 7 deletions sessionmanager/sessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,9 @@ func (sm *SessionManager) ReceiveFrom(from peer.ID, ks []cid.Cid) {
sm.sessLk.Lock()
defer sm.sessLk.Unlock()

var wg sync.WaitGroup
for _, s := range sm.sessions {
wg.Add(1)
go func() {
defer wg.Done()
s.session.ReceiveFrom(from, ks)
}()
s.session.ReceiveFrom(from, ks)
}
wg.Wait()
}

// IsWanted indicates whether any of the sessions are waiting to receive
Expand Down
31 changes: 0 additions & 31 deletions sessionmanager/sessionmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,37 +145,6 @@ func TestAddingSessions(t *testing.T) {
}
}

func TestReceivingBlocksWhenNotInterested(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
notif := notifications.New()
defer notif.Shutdown()
sm := New(ctx, sessionFactory, peerManagerFactory, requestSplitterFactory, notif)

p := peer.ID(123)
blks := testutil.GenerateBlocksOfSize(3, 1024)
var cids []cid.Cid
for _, b := range blks {
cids = append(cids, b.Cid())
}

nextWanted = []cid.Cid{cids[0], cids[1]}
firstSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
nextWanted = []cid.Cid{cids[0]}
secondSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)
nextWanted = []cid.Cid{}
thirdSession := sm.NewSession(ctx, time.Second, delay.Fixed(time.Minute)).(*fakeSession)

sm.ReceiveFrom(p, []cid.Cid{blks[0].Cid(), blks[1].Cid()})

if !cmpSessionCids(firstSession, []cid.Cid{cids[0], cids[1]}) ||
!cmpSessionCids(secondSession, []cid.Cid{cids[0]}) ||
!cmpSessionCids(thirdSession, []cid.Cid{}) {
t.Fatal("did not receive correct blocks for sessions")
}
}

func TestIsWanted(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
Expand Down