Skip to content

Commit

Permalink
feat(responseassembler): add response skipping
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward committed Sep 29, 2021
1 parent ce3951d commit b7be601
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 13 deletions.
36 changes: 25 additions & 11 deletions responsemanager/responseassembler/peerlinktracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,21 @@ import (
)

type peerLinkTracker struct {
linkTrackerLk sync.RWMutex
linkTracker *linktracker.LinkTracker
altTrackers map[string]*linktracker.LinkTracker
dedupKeys map[graphsync.RequestID]string
linkTrackerLk sync.RWMutex
linkTracker *linktracker.LinkTracker
altTrackers map[string]*linktracker.LinkTracker
dedupKeys map[graphsync.RequestID]string
blockSentCount map[graphsync.RequestID]int64
skipFirstBlocks map[graphsync.RequestID]int64
}

func newTracker() *peerLinkTracker {
return &peerLinkTracker{
linkTracker: linktracker.New(),
dedupKeys: make(map[graphsync.RequestID]string),
altTrackers: make(map[string]*linktracker.LinkTracker),
linkTracker: linktracker.New(),
dedupKeys: make(map[graphsync.RequestID]string),
altTrackers: make(map[string]*linktracker.LinkTracker),
blockSentCount: make(map[graphsync.RequestID]int64),
skipFirstBlocks: make(map[graphsync.RequestID]int64),
}
}

Expand Down Expand Up @@ -54,6 +58,12 @@ func (prs *peerLinkTracker) IgnoreBlocks(requestID graphsync.RequestID, links []
prs.linkTrackerLk.Unlock()
}

func (prs *peerLinkTracker) SkipFirstBlocks(requestID graphsync.RequestID, blocksToSkip int64) {
prs.linkTrackerLk.Lock()
prs.skipFirstBlocks[requestID] = blocksToSkip
prs.linkTrackerLk.Unlock()
}

// FinishTracking clears link tracking data for the request.
func (prs *peerLinkTracker) FinishTracking(requestID graphsync.RequestID) bool {
prs.linkTrackerLk.Lock()
Expand All @@ -74,16 +84,20 @@ func (prs *peerLinkTracker) FinishTracking(requestID graphsync.RequestID) bool {
delete(prs.altTrackers, key)
}
}
delete(prs.blockSentCount, requestID)
delete(prs.skipFirstBlocks, requestID)
return allBlocks
}

// RecordLinkTraversal records whether a link is found for a request.
func (prs *peerLinkTracker) RecordLinkTraversal(requestID graphsync.RequestID,
link ipld.Link, hasBlock bool) (isUnique bool) {
link ipld.Link, hasBlock bool) bool {
prs.linkTrackerLk.Lock()
defer prs.linkTrackerLk.Unlock()
prs.blockSentCount[requestID]++
notSkipped := prs.skipFirstBlocks[requestID] < prs.blockSentCount[requestID]
linkTracker := prs.getLinkTracker(requestID)
isUnique = linkTracker.BlockRefCount(link) == 0
isUnique := linkTracker.BlockRefCount(link) == 0
linkTracker.RecordLinkTraversal(requestID, link, hasBlock)
prs.linkTrackerLk.Unlock()
return
return hasBlock && notSkipped && isUnique
}
4 changes: 2 additions & 2 deletions responsemanager/responseassembler/responseBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func (rb *responseBuilder) AddNotifee(notifee notifications.Notifee) {
func (rb *responseBuilder) setupBlockOperation(
link ipld.Link, data []byte) blockOperation {
hasBlock := data != nil
isUnique := rb.linkTracker.RecordLinkTraversal(rb.requestID, link, hasBlock)
send := rb.linkTracker.RecordLinkTraversal(rb.requestID, link, hasBlock)
return blockOperation{
data, hasBlock && isUnique, link, rb.requestID,
data, send, link, rb.requestID,
}
}

Expand Down
5 changes: 5 additions & 0 deletions responsemanager/responseassembler/responseassembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func (ra *ResponseAssembler) IgnoreBlocks(p peer.ID, requestID graphsync.Request
ra.GetProcess(p).(*peerLinkTracker).IgnoreBlocks(requestID, links)
}

// SkipFirstBlocks tells the assembler for the given request to not send the first N blocks
func (ra *ResponseAssembler) SkipFirstBlocks(p peer.ID, requestID graphsync.RequestID, skipFirstBlocks int64) {
ra.GetProcess(p).(*peerLinkTracker).SkipFirstBlocks(requestID, skipFirstBlocks)
}

// Transaction builds a response, and queues it for sending in the next outgoing message
func (ra *ResponseAssembler) Transaction(p peer.ID, requestID graphsync.RequestID, transaction Transaction) error {
rb := &responseBuilder{
Expand Down
78 changes: 78 additions & 0 deletions responsemanager/responseassembler/responseassembler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,84 @@ func TestResponseAssemblerIgnoreBlocks(t *testing.T) {

}

func TestResponseAssemblerSkipFirstBlocks(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
p := testutil.GeneratePeers(1)[0]
requestID1 := graphsync.RequestID(rand.Int31())
requestID2 := graphsync.RequestID(rand.Int31())
blks := testutil.GenerateBlocksOfSize(5, 100)
links := make([]ipld.Link, 0, len(blks))
for _, block := range blks {
links = append(links, cidlink.Link{Cid: block.Cid()})
}
fph := newFakePeerHandler(ctx, t)
responseAssembler := New(ctx, fph)

responseAssembler.SkipFirstBlocks(p, requestID1, 3)

var bd1, bd2, bd3, bd4, bd5 graphsync.BlockData
err := responseAssembler.Transaction(p, requestID1, func(b ResponseBuilder) error {
bd1 = b.SendResponse(links[0], blks[0].RawData())
return nil
})
require.NoError(t, err)

assertSentNotOnWire(t, bd1, blks[0])
fph.RefuteBlocks()
fph.AssertResponses(expectedResponses{requestID1: graphsync.PartialResponse})

err = responseAssembler.Transaction(p, requestID2, func(b ResponseBuilder) error {
bd1 = b.SendResponse(links[0], blks[0].RawData())
return nil
})
require.NoError(t, err)
fph.AssertResponses(expectedResponses{
requestID2: graphsync.PartialResponse,
})

err = responseAssembler.Transaction(p, requestID1, func(b ResponseBuilder) error {
bd2 = b.SendResponse(links[1], blks[1].RawData())
bd3 = b.SendResponse(links[2], blks[2].RawData())
return nil
})
require.NoError(t, err)

assertSentNotOnWire(t, bd1, blks[0])
assertSentNotOnWire(t, bd2, blks[1])
assertSentNotOnWire(t, bd3, blks[2])

fph.RefuteBlocks()
fph.AssertResponses(expectedResponses{
requestID1: graphsync.PartialResponse,
})
err = responseAssembler.Transaction(p, requestID1, func(b ResponseBuilder) error {
bd4 = b.SendResponse(links[3], blks[3].RawData())
bd5 = b.SendResponse(links[4], blks[4].RawData())
b.FinishRequest()
return nil
})
require.NoError(t, err)

assertSentOnWire(t, bd4, blks[3])
assertSentOnWire(t, bd5, blks[4])

fph.AssertBlocks(blks[3], blks[4])
fph.AssertResponses(expectedResponses{requestID1: graphsync.RequestCompletedFull})

err = responseAssembler.Transaction(p, requestID2, func(b ResponseBuilder) error {
b.SendResponse(links[3], blks[3].RawData())
b.FinishRequest()
return nil
})
require.NoError(t, err)

fph.AssertBlocks(blks[3])
fph.AssertResponses(expectedResponses{requestID2: graphsync.RequestCompletedFull})

}

func TestResponseAssemblerDupKeys(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
Expand Down

0 comments on commit b7be601

Please sign in to comment.