Skip to content

Commit

Permalink
feat(responsemanager): process do not send first blocks
Browse files Browse the repository at this point in the history
define extension and add processing to response manager
  • Loading branch information
hannahhoward committed Sep 29, 2021
1 parent b7be601 commit a4f1ac3
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 0 deletions.
28 changes: 28 additions & 0 deletions donotsendfirstblocks/donotsendfirstblocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package donotsendfirstblocks

import (
basicnode "github.com/ipld/go-ipld-prime/node/basic"

"github.com/ipfs/go-graphsync/ipldutil"
)

// EncodeDoNotSendFirstBlocks returns encoded cbor data for the given number
// of blocks to skip
func EncodeDoNotSendFirstBlocks(skipBlockCount int64) ([]byte, error) {
nb := basicnode.Prototype.Int.NewBuilder()
err := nb.AssignInt(skipBlockCount)
if err != nil {
return nil, err
}
nd := nb.Build()
return ipldutil.EncodeNode(nd)
}

// DecodeDoNotSendFirstBlocks returns the number of blocks to skip
func DecodeDoNotSendFirstBlocks(data []byte) (int64, error) {
nd, err := ipldutil.DecodeNode(data)
if err != nil {
return 0, err
}
return nd.AsInt()
}
4 changes: 4 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ const (
// https://github.com/ipld/specs/blob/master/block-layer/graphsync/known_extensions.md
ExtensionDoNotSendCIDs = ExtensionName("graphsync/do-not-send-cids")

// ExtensionsDoNotSendFirstBlocks tells the responding peer not to wait till the given
// number of blocks have been traversed before it begins to send blocks over the wire
ExtensionsDoNotSendFirstBlocks = ExtensionName("graphsync/do-not-send-first-blocks")

// ExtensionDeDupByKey tells the responding peer to only deduplicate block sending
// for requests that have the same key. The data for the extension is a string key
ExtensionDeDupByKey = ExtensionName("graphsync/dedup-by-key")
Expand Down
1 change: 1 addition & 0 deletions responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type NetworkErrorListeners interface {
type ResponseAssembler interface {
DedupKey(p peer.ID, requestID graphsync.RequestID, key string)
IgnoreBlocks(p peer.ID, requestID graphsync.RequestID, links []ipld.Link)
SkipFirstBlocks(p peer.ID, requestID graphsync.RequestID, skipCount int64)
Transaction(p peer.ID, requestID graphsync.RequestID, transaction responseassembler.Transaction) error
}

Expand Down
22 changes: 22 additions & 0 deletions responsemanager/querypreparer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/dedupkey"
"github.com/ipfs/go-graphsync/donotsendfirstblocks"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/notifications"
Expand Down Expand Up @@ -62,6 +63,9 @@ func (qe *queryPreparer) prepareQuery(ctx context.Context,
if err := qe.processDoNoSendCids(request, p, failNotifee); err != nil {
return nil, nil, false, err
}
if err := qe.processDoNotSendFirstBlocks(request, p, failNotifee); err != nil {
return nil, nil, false, err
}
rootLink := cidlink.Link{Cid: request.Root()}
linkSystem := result.CustomLinkSystem
if linkSystem.StorageReadOpener == nil {
Expand Down Expand Up @@ -120,3 +124,21 @@ func (qe *queryPreparer) processDoNoSendCids(request gsmsg.GraphSyncRequest, p p
qe.responseAssembler.IgnoreBlocks(p, request.ID(), links)
return nil
}

func (qe *queryPreparer) processDoNotSendFirstBlocks(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error {
doNotSendFirstBlocksData, has := request.Extension(graphsync.ExtensionsDoNotSendFirstBlocks)
if !has {
return nil
}
skipCount, err := donotsendfirstblocks.DecodeDoNotSendFirstBlocks(doNotSendFirstBlocksData)
if err != nil {
_ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error {
rb.FinishWithError(graphsync.RequestFailedUnknown)
rb.AddNotifee(failNotifee)
return nil
})
return err
}
qe.responseAssembler.SkipFirstBlocks(p, request.ID(), skipCount)
return nil
}
37 changes: 37 additions & 0 deletions responsemanager/responsemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/dedupkey"
"github.com/ipfs/go-graphsync/donotsendfirstblocks"
"github.com/ipfs/go-graphsync/listeners"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/messagequeue"
Expand Down Expand Up @@ -348,6 +349,29 @@ func TestValidationAndExtensions(t *testing.T) {
td.assertCompleteRequestWith(graphsync.RequestCompletedFull)
td.assertIgnoredCids(set)
})

t.Run("do-not-send-first-blocks extension", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
responseManager := td.newResponseManager()
responseManager.Startup()
td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
})
data, err := donotsendfirstblocks.EncodeDoNotSendFirstBlocks(4)
require.NoError(t, err)
requests := []gsmsg.GraphSyncRequest{
gsmsg.NewRequest(td.requestID, td.blockChain.TipLink.(cidlink.Link).Cid, td.blockChain.Selector(), graphsync.Priority(0),
graphsync.ExtensionData{
Name: graphsync.ExtensionsDoNotSendFirstBlocks,
Data: data,
}),
}
responseManager.ProcessRequests(td.ctx, td.p, requests)
td.assertCompleteRequestWith(graphsync.RequestCompletedFull)
td.assertSkippedFirstBlocks(4)
})

t.Run("dedup-by-key extension", func(t *testing.T) {
td := newTestData(t)
defer td.cancel()
Expand Down Expand Up @@ -797,6 +821,7 @@ type fakeResponseAssembler struct {
pausedRequests chan pausedRequest
clearedRequests chan clearedRequest
ignoredLinks chan []ipld.Link
skippedFirstBlocks chan int64
notifeePublisher *testutil.MockPublisher
dedupKeys chan string
missingBlock bool
Expand All @@ -812,6 +837,9 @@ func (fra *fakeResponseAssembler) IgnoreBlocks(p peer.ID, requestID graphsync.Re
fra.ignoredLinks <- links
}

func (fra *fakeResponseAssembler) SkipFirstBlocks(p peer.ID, requestID graphsync.RequestID, skipCount int64) {
fra.skippedFirstBlocks <- skipCount
}
func (fra *fakeResponseAssembler) DedupKey(p peer.ID, requestID graphsync.RequestID, key string) {
fra.dedupKeys <- key
}
Expand Down Expand Up @@ -947,6 +975,7 @@ type testData struct {
pausedRequests chan pausedRequest
clearedRequests chan clearedRequest
ignoredLinks chan []ipld.Link
skippedFirstBlocks chan int64
dedupKeys chan string
responseAssembler *fakeResponseAssembler
queryQueue *fakeQueryQueue
Expand Down Expand Up @@ -995,6 +1024,7 @@ func newTestData(t *testing.T) testData {
td.pausedRequests = make(chan pausedRequest, 1)
td.clearedRequests = make(chan clearedRequest, 1)
td.ignoredLinks = make(chan []ipld.Link, 1)
td.skippedFirstBlocks = make(chan int64, 1)
td.dedupKeys = make(chan string, 1)
td.blockSends = make(chan graphsync.BlockData, td.blockChainLength*2)
td.completedResponseStatuses = make(chan graphsync.ResponseStatusCode, 1)
Expand All @@ -1007,6 +1037,7 @@ func newTestData(t *testing.T) testData {
pausedRequests: td.pausedRequests,
clearedRequests: td.clearedRequests,
ignoredLinks: td.ignoredLinks,
skippedFirstBlocks: td.skippedFirstBlocks,
dedupKeys: td.dedupKeys,
notifeePublisher: td.notifeePublisher,
}
Expand Down Expand Up @@ -1170,6 +1201,12 @@ func (td *testData) assertIgnoredCids(set *cid.Set) {
}
}

func (td *testData) assertSkippedFirstBlocks(expectedSkipCount int64) {
var skippedFirstBlocks int64
testutil.AssertReceive(td.ctx, td.t, td.skippedFirstBlocks, &skippedFirstBlocks, "should skip blocks")
require.Equal(td.t, expectedSkipCount, skippedFirstBlocks)
}

func (td *testData) notifyStatusMessagesSent() {
td.notifeePublisher.PublishMatchingEvents(func(data notifications.TopicData) bool {
_, isSn := data.(graphsync.ResponseStatusCode)
Expand Down

0 comments on commit a4f1ac3

Please sign in to comment.