From a4f1ac3a93a23d703c2d715f3335493de6ed66c9 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 29 Sep 2021 15:42:59 -0700 Subject: [PATCH] feat(responsemanager): process do not send first blocks define extension and add processing to response manager --- donotsendfirstblocks/donotsendfirstblocks.go | 28 +++++++++++++++ graphsync.go | 4 +++ responsemanager/client.go | 1 + responsemanager/querypreparer.go | 22 ++++++++++++ responsemanager/responsemanager_test.go | 37 ++++++++++++++++++++ 5 files changed, 92 insertions(+) create mode 100644 donotsendfirstblocks/donotsendfirstblocks.go diff --git a/donotsendfirstblocks/donotsendfirstblocks.go b/donotsendfirstblocks/donotsendfirstblocks.go new file mode 100644 index 00000000..a6ee8a42 --- /dev/null +++ b/donotsendfirstblocks/donotsendfirstblocks.go @@ -0,0 +1,28 @@ +package donotsendfirstblocks + +import ( + basicnode "github.com/ipld/go-ipld-prime/node/basic" + + "github.com/ipfs/go-graphsync/ipldutil" +) + +// EncodeDoNotSendFirstBlocks returns encoded cbor data for the given number +// of blocks to skip +func EncodeDoNotSendFirstBlocks(skipBlockCount int64) ([]byte, error) { + nb := basicnode.Prototype.Int.NewBuilder() + err := nb.AssignInt(skipBlockCount) + if err != nil { + return nil, err + } + nd := nb.Build() + return ipldutil.EncodeNode(nd) +} + +// DecodeDoNotSendFirstBlocks returns the number of blocks to skip +func DecodeDoNotSendFirstBlocks(data []byte) (int64, error) { + nd, err := ipldutil.DecodeNode(data) + if err != nil { + return 0, err + } + return nd.AsInt() +} diff --git a/graphsync.go b/graphsync.go index 599d98bc..17f2dc07 100644 --- a/graphsync.go +++ b/graphsync.go @@ -45,6 +45,10 @@ const ( // https://github.com/ipld/specs/blob/master/block-layer/graphsync/known_extensions.md ExtensionDoNotSendCIDs = ExtensionName("graphsync/do-not-send-cids") + // ExtensionsDoNotSendFirstBlocks tells the responding peer not to wait till the given + // number of blocks have been traversed before it begins to send blocks over the wire + ExtensionsDoNotSendFirstBlocks = ExtensionName("graphsync/do-not-send-first-blocks") + // ExtensionDeDupByKey tells the responding peer to only deduplicate block sending // for requests that have the same key. The data for the extension is a string key ExtensionDeDupByKey = ExtensionName("graphsync/dedup-by-key") diff --git a/responsemanager/client.go b/responsemanager/client.go index d0a20dba..6f2cd63c 100644 --- a/responsemanager/client.go +++ b/responsemanager/client.go @@ -126,6 +126,7 @@ type NetworkErrorListeners interface { type ResponseAssembler interface { DedupKey(p peer.ID, requestID graphsync.RequestID, key string) IgnoreBlocks(p peer.ID, requestID graphsync.RequestID, links []ipld.Link) + SkipFirstBlocks(p peer.ID, requestID graphsync.RequestID, skipCount int64) Transaction(p peer.ID, requestID graphsync.RequestID, transaction responseassembler.Transaction) error } diff --git a/responsemanager/querypreparer.go b/responsemanager/querypreparer.go index 811e285f..be12c43b 100644 --- a/responsemanager/querypreparer.go +++ b/responsemanager/querypreparer.go @@ -12,6 +12,7 @@ import ( "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/cidset" "github.com/ipfs/go-graphsync/dedupkey" + "github.com/ipfs/go-graphsync/donotsendfirstblocks" "github.com/ipfs/go-graphsync/ipldutil" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/notifications" @@ -62,6 +63,9 @@ func (qe *queryPreparer) prepareQuery(ctx context.Context, if err := qe.processDoNoSendCids(request, p, failNotifee); err != nil { return nil, nil, false, err } + if err := qe.processDoNotSendFirstBlocks(request, p, failNotifee); err != nil { + return nil, nil, false, err + } rootLink := cidlink.Link{Cid: request.Root()} linkSystem := result.CustomLinkSystem if linkSystem.StorageReadOpener == nil { @@ -120,3 +124,21 @@ func (qe *queryPreparer) processDoNoSendCids(request gsmsg.GraphSyncRequest, p p qe.responseAssembler.IgnoreBlocks(p, request.ID(), links) return nil } + +func (qe *queryPreparer) processDoNotSendFirstBlocks(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error { + doNotSendFirstBlocksData, has := request.Extension(graphsync.ExtensionsDoNotSendFirstBlocks) + if !has { + return nil + } + skipCount, err := donotsendfirstblocks.DecodeDoNotSendFirstBlocks(doNotSendFirstBlocksData) + if err != nil { + _ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error { + rb.FinishWithError(graphsync.RequestFailedUnknown) + rb.AddNotifee(failNotifee) + return nil + }) + return err + } + qe.responseAssembler.SkipFirstBlocks(p, request.ID(), skipCount) + return nil +} diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index 0d0c04f9..ef5267f1 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -20,6 +20,7 @@ import ( "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/cidset" "github.com/ipfs/go-graphsync/dedupkey" + "github.com/ipfs/go-graphsync/donotsendfirstblocks" "github.com/ipfs/go-graphsync/listeners" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/messagequeue" @@ -348,6 +349,29 @@ func TestValidationAndExtensions(t *testing.T) { td.assertCompleteRequestWith(graphsync.RequestCompletedFull) td.assertIgnoredCids(set) }) + + t.Run("do-not-send-first-blocks extension", func(t *testing.T) { + td := newTestData(t) + defer td.cancel() + responseManager := td.newResponseManager() + responseManager.Startup() + td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { + hookActions.ValidateRequest() + }) + data, err := donotsendfirstblocks.EncodeDoNotSendFirstBlocks(4) + require.NoError(t, err) + requests := []gsmsg.GraphSyncRequest{ + gsmsg.NewRequest(td.requestID, td.blockChain.TipLink.(cidlink.Link).Cid, td.blockChain.Selector(), graphsync.Priority(0), + graphsync.ExtensionData{ + Name: graphsync.ExtensionsDoNotSendFirstBlocks, + Data: data, + }), + } + responseManager.ProcessRequests(td.ctx, td.p, requests) + td.assertCompleteRequestWith(graphsync.RequestCompletedFull) + td.assertSkippedFirstBlocks(4) + }) + t.Run("dedup-by-key extension", func(t *testing.T) { td := newTestData(t) defer td.cancel() @@ -797,6 +821,7 @@ type fakeResponseAssembler struct { pausedRequests chan pausedRequest clearedRequests chan clearedRequest ignoredLinks chan []ipld.Link + skippedFirstBlocks chan int64 notifeePublisher *testutil.MockPublisher dedupKeys chan string missingBlock bool @@ -812,6 +837,9 @@ func (fra *fakeResponseAssembler) IgnoreBlocks(p peer.ID, requestID graphsync.Re fra.ignoredLinks <- links } +func (fra *fakeResponseAssembler) SkipFirstBlocks(p peer.ID, requestID graphsync.RequestID, skipCount int64) { + fra.skippedFirstBlocks <- skipCount +} func (fra *fakeResponseAssembler) DedupKey(p peer.ID, requestID graphsync.RequestID, key string) { fra.dedupKeys <- key } @@ -947,6 +975,7 @@ type testData struct { pausedRequests chan pausedRequest clearedRequests chan clearedRequest ignoredLinks chan []ipld.Link + skippedFirstBlocks chan int64 dedupKeys chan string responseAssembler *fakeResponseAssembler queryQueue *fakeQueryQueue @@ -995,6 +1024,7 @@ func newTestData(t *testing.T) testData { td.pausedRequests = make(chan pausedRequest, 1) td.clearedRequests = make(chan clearedRequest, 1) td.ignoredLinks = make(chan []ipld.Link, 1) + td.skippedFirstBlocks = make(chan int64, 1) td.dedupKeys = make(chan string, 1) td.blockSends = make(chan graphsync.BlockData, td.blockChainLength*2) td.completedResponseStatuses = make(chan graphsync.ResponseStatusCode, 1) @@ -1007,6 +1037,7 @@ func newTestData(t *testing.T) testData { pausedRequests: td.pausedRequests, clearedRequests: td.clearedRequests, ignoredLinks: td.ignoredLinks, + skippedFirstBlocks: td.skippedFirstBlocks, dedupKeys: td.dedupKeys, notifeePublisher: td.notifeePublisher, } @@ -1170,6 +1201,12 @@ func (td *testData) assertIgnoredCids(set *cid.Set) { } } +func (td *testData) assertSkippedFirstBlocks(expectedSkipCount int64) { + var skippedFirstBlocks int64 + testutil.AssertReceive(td.ctx, td.t, td.skippedFirstBlocks, &skippedFirstBlocks, "should skip blocks") + require.Equal(td.t, expectedSkipCount, skippedFirstBlocks) +} + func (td *testData) notifyStatusMessagesSent() { td.notifeePublisher.PublishMatchingEvents(func(data notifications.TopicData) bool { _, isSn := data.(graphsync.ResponseStatusCode)