From 957951706c7ac116c07ca381dfd17e7ffc0aeb6a Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 21 Jun 2021 11:21:16 -0700 Subject: [PATCH 1/9] WIP --- responsemanager/queryexecutor.go | 2 ++ responsemanager/responsemanager.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/responsemanager/queryexecutor.go b/responsemanager/queryexecutor.go index 9e7d6054..d08c677c 100644 --- a/responsemanager/queryexecutor.go +++ b/responsemanager/queryexecutor.go @@ -104,6 +104,7 @@ func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData) case qe.messages <- &setResponseDataRequest{key, loader, traverser}: } if isPaused { + log.Debug("Request is paused on initial setup: %d", key.requestID) return graphsync.RequestPaused, hooks.ErrPaused{} } } @@ -113,6 +114,7 @@ func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData) func (qe *queryExecutor) prepareQuery(ctx context.Context, p peer.ID, request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) { + log.Debug("Processing request hooks for request: %d", request.ID) result := qe.requestHooks.ProcessRequestHooks(p, request) var transactionError error var isPaused bool diff --git a/responsemanager/responsemanager.go b/responsemanager/responsemanager.go index b212da20..3be4cb12 100644 --- a/responsemanager/responsemanager.go +++ b/responsemanager/responsemanager.go @@ -362,6 +362,7 @@ func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSync } func (rm *ResponseManager) unpauseRequest(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error { + log.Debug("Unpause request called: %d", requestID) key := responseKey{p, requestID} inProgressResponse, ok := rm.inProgressResponses[key] if !ok { @@ -484,6 +485,7 @@ func (ftr *finishTaskRequest) handle(rm *ResponseManager) { return } if _, ok := ftr.err.(hooks.ErrPaused); ok { + log.Debug("Request finished on pause: %d", ftr.key.requestID) response.isPaused = true return } From 5fe8b46ac9f800b7042ba973f8515dea6feffcf7 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 21 Jun 2021 12:11:20 -0700 Subject: [PATCH 2/9] WIP --- go.mod | 1 - impl/graphsync.go | 2 +- messagequeue/messagequeue.go | 2 +- network/libp2p_impl.go | 2 +- .../asyncloader/responsecache/responsecache.go | 2 +- requestmanager/requestmanager.go | 17 +++++++++-------- responsemanager/queryexecutor.go | 4 ++-- .../responseassembler/responseBuilder.go | 2 +- responsemanager/responsemanager.go | 6 +++--- 9 files changed, 19 insertions(+), 19 deletions(-) diff --git a/go.mod b/go.mod index da7302a1..78810232 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,6 @@ require ( github.com/ipfs/go-ipfs-util v0.0.2 github.com/ipfs/go-ipld-cbor v0.0.4 // indirect github.com/ipfs/go-ipld-format v0.2.0 - github.com/ipfs/go-log v1.0.4 github.com/ipfs/go-log/v2 v2.1.1 github.com/ipfs/go-merkledag v0.3.1 github.com/ipfs/go-peertaskqueue v0.2.0 diff --git a/impl/graphsync.go b/impl/graphsync.go index 3fc5c4c6..76d6b7dc 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -3,7 +3,7 @@ package graphsync import ( "context" - logging "github.com/ipfs/go-log" + logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-peertaskqueue" ipld "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p-core/peer" diff --git a/messagequeue/messagequeue.go b/messagequeue/messagequeue.go index 745974b9..9f5a0750 100644 --- a/messagequeue/messagequeue.go +++ b/messagequeue/messagequeue.go @@ -7,7 +7,7 @@ import ( "sync" "time" - logging "github.com/ipfs/go-log" + logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/peer" gsmsg "github.com/ipfs/go-graphsync/message" diff --git a/network/libp2p_impl.go b/network/libp2p_impl.go index 727ab9f0..9a29bea9 100644 --- a/network/libp2p_impl.go +++ b/network/libp2p_impl.go @@ -6,7 +6,7 @@ import ( "io" "time" - logging "github.com/ipfs/go-log" + logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" diff --git a/requestmanager/asyncloader/responsecache/responsecache.go b/requestmanager/asyncloader/responsecache/responsecache.go index a2288267..1eb9d9fe 100644 --- a/requestmanager/asyncloader/responsecache/responsecache.go +++ b/requestmanager/asyncloader/responsecache/responsecache.go @@ -5,7 +5,7 @@ import ( "sync" blocks "github.com/ipfs/go-block-format" - logging "github.com/ipfs/go-log" + logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" diff --git a/requestmanager/requestmanager.go b/requestmanager/requestmanager.go index 02497c7d..08615168 100644 --- a/requestmanager/requestmanager.go +++ b/requestmanager/requestmanager.go @@ -4,13 +4,14 @@ import ( "context" "errors" "fmt" + "sync/atomic" + "github.com/hannahhoward/go-pubsub" "golang.org/x/xerrors" - "sync/atomic" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log" + logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/libp2p/go-libp2p-core/peer" @@ -66,12 +67,12 @@ type AsyncLoader interface { // RequestManager tracks outgoing requests and processes incoming reponses // to them. type RequestManager struct { - ctx context.Context - cancel func() - messages chan requestManagerMessage - peerHandler PeerHandler - rc *responseCollector - asyncLoader AsyncLoader + ctx context.Context + cancel func() + messages chan requestManagerMessage + peerHandler PeerHandler + rc *responseCollector + asyncLoader AsyncLoader disconnectNotif *pubsub.PubSub // dont touch out side of run loop nextRequestID graphsync.RequestID diff --git a/responsemanager/queryexecutor.go b/responsemanager/queryexecutor.go index d08c677c..58774716 100644 --- a/responsemanager/queryexecutor.go +++ b/responsemanager/queryexecutor.go @@ -104,7 +104,7 @@ func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData) case qe.messages <- &setResponseDataRequest{key, loader, traverser}: } if isPaused { - log.Debug("Request is paused on initial setup: %d", key.requestID) + log.Infof("Request is paused on initial setup: %d", key.requestID) return graphsync.RequestPaused, hooks.ErrPaused{} } } @@ -114,7 +114,7 @@ func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData) func (qe *queryExecutor) prepareQuery(ctx context.Context, p peer.ID, request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) { - log.Debug("Processing request hooks for request: %d", request.ID) + log.Infof("Processing request hooks for request: %d", request.ID) result := qe.requestHooks.ProcessRequestHooks(p, request) var transactionError error var isPaused bool diff --git a/responsemanager/responseassembler/responseBuilder.go b/responsemanager/responseassembler/responseBuilder.go index a18c83e6..7ccee6cf 100644 --- a/responsemanager/responseassembler/responseBuilder.go +++ b/responsemanager/responseassembler/responseBuilder.go @@ -2,7 +2,7 @@ package responseassembler import ( blocks "github.com/ipfs/go-block-format" - logging "github.com/ipfs/go-log" + logging "github.com/ipfs/go-log/v2" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" diff --git a/responsemanager/responsemanager.go b/responsemanager/responsemanager.go index 3be4cb12..c2987df0 100644 --- a/responsemanager/responsemanager.go +++ b/responsemanager/responsemanager.go @@ -6,7 +6,7 @@ import ( "math" "time" - logging "github.com/ipfs/go-log" + logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-peertaskqueue/peertask" ipld "github.com/ipld/go-ipld-prime" "github.com/libp2p/go-libp2p-core/peer" @@ -362,7 +362,7 @@ func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSync } func (rm *ResponseManager) unpauseRequest(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error { - log.Debug("Unpause request called: %d", requestID) + log.Infof("Unpause request called: %d", requestID) key := responseKey{p, requestID} inProgressResponse, ok := rm.inProgressResponses[key] if !ok { @@ -485,7 +485,7 @@ func (ftr *finishTaskRequest) handle(rm *ResponseManager) { return } if _, ok := ftr.err.(hooks.ErrPaused); ok { - log.Debug("Request finished on pause: %d", ftr.key.requestID) + log.Infof("Request finished on pause: %d", ftr.key.requestID) response.isPaused = true return } From 2e300c58528a34b348db1ee01a6464dd24ac52b3 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 21 Jun 2021 12:21:57 -0700 Subject: [PATCH 3/9] WIP --- responsemanager/queryexecutor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/responsemanager/queryexecutor.go b/responsemanager/queryexecutor.go index 58774716..16627b2a 100644 --- a/responsemanager/queryexecutor.go +++ b/responsemanager/queryexecutor.go @@ -114,7 +114,7 @@ func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData) func (qe *queryExecutor) prepareQuery(ctx context.Context, p peer.ID, request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) { - log.Infof("Processing request hooks for request: %d", request.ID) + log.Infof("Processing request hooks for request: %d", request.ID()) result := qe.requestHooks.ProcessRequestHooks(p, request) var transactionError error var isPaused bool From e14f19016f812897e7e72ccac1b97d49c2e216b6 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 21 Jun 2021 16:31:51 -0700 Subject: [PATCH 4/9] WIP --- responsemanager/queryexecutor.go | 122 +----------------------- responsemanager/querypreparer.go | 115 ++++++++++++++++++++++ responsemanager/responsemanager.go | 24 +++-- responsemanager/responsemanager_test.go | 21 ++++ 4 files changed, 155 insertions(+), 127 deletions(-) create mode 100644 responsemanager/querypreparer.go diff --git a/responsemanager/queryexecutor.go b/responsemanager/queryexecutor.go index 16627b2a..affb9058 100644 --- a/responsemanager/queryexecutor.go +++ b/responsemanager/queryexecutor.go @@ -6,14 +6,10 @@ import ( "strings" "time" - "github.com/ipfs/go-cid" ipld "github.com/ipld/go-ipld-prime" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/libp2p/go-libp2p-core/peer" "github.com/ipfs/go-graphsync" - "github.com/ipfs/go-graphsync/cidset" - "github.com/ipfs/go-graphsync/dedupkey" "github.com/ipfs/go-graphsync/ipldutil" gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/notifications" @@ -72,7 +68,7 @@ func (qe *queryExecutor) processQueriesWorker() { log.Info("Empty task on peer request stack") continue } - status, err := qe.executeTask(key, taskData) + status, err := qe.executeQuery(key.p, taskData.request, taskData.loader, taskData.traverser, taskData.signals, taskData.subscriber) isCancelled := err != nil && isContextErr(err) if isCancelled { qe.cancelledListeners.NotifyCancelledListeners(key.p, taskData.request) @@ -83,123 +79,7 @@ func (qe *queryExecutor) processQueriesWorker() { } } qe.queryQueue.TasksDone(pid, tasks...) - - } - -} - -func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData) (graphsync.ResponseStatusCode, error) { - var err error - loader := taskData.loader - traverser := taskData.traverser - if loader == nil || traverser == nil { - var isPaused bool - loader, traverser, isPaused, err = qe.prepareQuery(taskData.ctx, key.p, taskData.request, taskData.signals, taskData.subscriber) - if err != nil { - return graphsync.RequestFailedUnknown, err - } - select { - case <-qe.ctx.Done(): - return graphsync.RequestFailedUnknown, errors.New("context cancelled") - case qe.messages <- &setResponseDataRequest{key, loader, traverser}: - } - if isPaused { - log.Infof("Request is paused on initial setup: %d", key.requestID) - return graphsync.RequestPaused, hooks.ErrPaused{} - } - } - return qe.executeQuery(key.p, taskData.request, loader, traverser, taskData.signals, taskData.subscriber) -} - -func (qe *queryExecutor) prepareQuery(ctx context.Context, - p peer.ID, - request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) { - log.Infof("Processing request hooks for request: %d", request.ID()) - result := qe.requestHooks.ProcessRequestHooks(p, request) - var transactionError error - var isPaused bool - failNotifee := notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: sub} - err := qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error { - for _, extension := range result.Extensions { - rb.SendExtensionData(extension) - } - if result.Err != nil || !result.IsValidated { - rb.FinishWithError(graphsync.RequestFailedUnknown) - rb.AddNotifee(failNotifee) - transactionError = errors.New("request not valid") - } else if result.IsPaused { - rb.PauseRequest() - isPaused = true - } - return nil - }) - if err != nil { - return nil, nil, false, err - } - if transactionError != nil { - return nil, nil, false, transactionError - } - if err := qe.processDedupByKey(request, p, failNotifee); err != nil { - return nil, nil, false, err - } - if err := qe.processDoNoSendCids(request, p, failNotifee); err != nil { - return nil, nil, false, err - } - rootLink := cidlink.Link{Cid: request.Root()} - traverser := ipldutil.TraversalBuilder{ - Root: rootLink, - Selector: request.Selector(), - Chooser: result.CustomChooser, - }.Start(ctx) - loader := result.CustomLoader - if loader == nil { - loader = qe.loader - } - return loader, traverser, isPaused, nil -} - -func (qe *queryExecutor) processDedupByKey(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error { - dedupData, has := request.Extension(graphsync.ExtensionDeDupByKey) - if !has { - return nil - } - key, err := dedupkey.DecodeDedupKey(dedupData) - if err != nil { - _ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error { - rb.FinishWithError(graphsync.RequestFailedUnknown) - rb.AddNotifee(failNotifee) - return nil - }) - return err - } - qe.responseAssembler.DedupKey(p, request.ID(), key) - return nil -} - -func (qe *queryExecutor) processDoNoSendCids(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error { - doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs) - if !has { - return nil - } - cidSet, err := cidset.DecodeCidSet(doNotSendCidsData) - if err != nil { - _ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error { - rb.FinishWithError(graphsync.RequestFailedUnknown) - rb.AddNotifee(failNotifee) - return nil - }) - return err - } - links := make([]ipld.Link, 0, cidSet.Len()) - err = cidSet.ForEach(func(c cid.Cid) error { - links = append(links, cidlink.Link{Cid: c}) - return nil - }) - if err != nil { - return err } - qe.responseAssembler.IgnoreBlocks(p, request.ID(), links) - return nil } func (qe *queryExecutor) executeQuery( diff --git a/responsemanager/querypreparer.go b/responsemanager/querypreparer.go new file mode 100644 index 00000000..35e886a7 --- /dev/null +++ b/responsemanager/querypreparer.go @@ -0,0 +1,115 @@ +package responsemanager + +import ( + "context" + "errors" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-graphsync" + "github.com/ipfs/go-graphsync/cidset" + "github.com/ipfs/go-graphsync/dedupkey" + "github.com/ipfs/go-graphsync/ipldutil" + gsmsg "github.com/ipfs/go-graphsync/message" + "github.com/ipfs/go-graphsync/notifications" + "github.com/ipfs/go-graphsync/responsemanager/responseassembler" + ipld "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/libp2p/go-libp2p-core/peer" +) + +type queryPreparer struct { + requestHooks RequestHooks + responseAssembler ResponseAssembler + loader ipld.Loader +} + +func (qe *queryPreparer) prepareQuery(ctx context.Context, + p peer.ID, + request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) { + log.Infof("Processing request hooks for request: %d", request.ID()) + result := qe.requestHooks.ProcessRequestHooks(p, request) + var transactionError error + var isPaused bool + failNotifee := notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: sub} + err := qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error { + for _, extension := range result.Extensions { + rb.SendExtensionData(extension) + } + if result.Err != nil || !result.IsValidated { + rb.FinishWithError(graphsync.RequestFailedUnknown) + rb.AddNotifee(failNotifee) + transactionError = errors.New("request not valid") + } else if result.IsPaused { + rb.PauseRequest() + isPaused = true + } + return nil + }) + if err != nil { + return nil, nil, false, err + } + if transactionError != nil { + return nil, nil, false, transactionError + } + if err := qe.processDedupByKey(request, p, failNotifee); err != nil { + return nil, nil, false, err + } + if err := qe.processDoNoSendCids(request, p, failNotifee); err != nil { + return nil, nil, false, err + } + rootLink := cidlink.Link{Cid: request.Root()} + traverser := ipldutil.TraversalBuilder{ + Root: rootLink, + Selector: request.Selector(), + Chooser: result.CustomChooser, + }.Start(ctx) + loader := result.CustomLoader + if loader == nil { + loader = qe.loader + } + return loader, traverser, isPaused, nil +} + +func (qe *queryPreparer) processDedupByKey(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error { + dedupData, has := request.Extension(graphsync.ExtensionDeDupByKey) + if !has { + return nil + } + key, err := dedupkey.DecodeDedupKey(dedupData) + if err != nil { + _ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error { + rb.FinishWithError(graphsync.RequestFailedUnknown) + rb.AddNotifee(failNotifee) + return nil + }) + return err + } + qe.responseAssembler.DedupKey(p, request.ID(), key) + return nil +} + +func (qe *queryPreparer) processDoNoSendCids(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error { + doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs) + if !has { + return nil + } + cidSet, err := cidset.DecodeCidSet(doNotSendCidsData) + if err != nil { + _ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error { + rb.FinishWithError(graphsync.RequestFailedUnknown) + rb.AddNotifee(failNotifee) + return nil + }) + return err + } + links := make([]ipld.Link, 0, cidSet.Len()) + err = cidSet.ForEach(func(c cid.Cid) error { + links = append(links, cidlink.Link{Cid: c}) + return nil + }) + if err != nil { + return err + } + qe.responseAssembler.IgnoreBlocks(p, request.ID(), links) + return nil +} diff --git a/responsemanager/responsemanager.go b/responsemanager/responsemanager.go index c2987df0..183847e8 100644 --- a/responsemanager/responsemanager.go +++ b/responsemanager/responsemanager.go @@ -442,21 +442,33 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) { completedListeners: rm.completedListeners, networkErrorListeners: rm.networkErrorListeners, }) + signals := signals{ + pauseSignal: make(chan struct{}, 1), + updateSignal: make(chan struct{}, 1), + errSignal: make(chan error, 1), + } + loader, traverser, isPaused, err := (&queryPreparer{rm.qe.requestHooks, rm.responseAssembler, rm.qe.loader}).prepareQuery(ctx, key.p, request, signals, sub) + if err != nil { + cancelFn() + return + } rm.inProgressResponses[key] = &inProgressResponseStatus{ ctx: ctx, cancelFn: cancelFn, subscriber: sub, request: request, - signals: signals{ - pauseSignal: make(chan struct{}, 1), - updateSignal: make(chan struct{}, 1), - errSignal: make(chan error, 1), - }, + signals: signals, + isPaused: isPaused, + loader: loader, + traverser: traverser, } // TODO: Use a better work estimation metric. + if isPaused { + return + } + rm.queryQueue.PushTasks(prm.p, peertask.Task{Topic: key, Priority: int(request.Priority()), Work: 1}) - rm.requestQueuedHooks.ProcessRequestQueuedHooks(prm.p, request) select { case rm.workSignal <- struct{}{}: diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index aba13f71..a4d0a51d 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -426,6 +426,27 @@ func TestValidationAndExtensions(t *testing.T) { td.assertCompleteRequestWithSuccess() }) + t.Run("if started paused, unpausing always works", func(t *testing.T) { + td := newTestData(t) + defer td.cancel() + responseManager := td.newResponseManager() + responseManager.Startup() + advance := make(chan struct{}) + td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { + hookActions.ValidateRequest() + hookActions.PauseResponse() + close(advance) + }) + go func() { + <-advance + err := responseManager.UnpauseResponse(td.p, td.requestID) + require.NoError(t, err) + }() + responseManager.ProcessRequests(td.ctx, td.p, td.requests) + td.assertPausedRequest() + td.verifyNResponses(td.blockChainLength) + td.assertCompleteRequestWithSuccess() + }) }) t.Run("test update hook processing", func(t *testing.T) { From bf0262a1416d6195322feac7b93893319c13d1e5 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 21 Jun 2021 16:38:36 -0700 Subject: [PATCH 5/9] WIP --- responsemanager/responsemanager_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index a4d0a51d..bf75ca29 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -115,6 +115,7 @@ func TestEarlyCancellation(t *testing.T) { defer td.cancel() td.queryQueue.popWait.Add(1) responseManager := td.newResponseManager() + td.requestHooks.Register(selectorvalidator.SelectorValidator(100)) responseManager.Startup() responseManager.ProcessRequests(td.ctx, td.p, td.requests) From 73c9300af494969aa481bb6f7f212e59194cfe12 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 21 Jun 2021 17:54:36 -0700 Subject: [PATCH 6/9] WIP --- responsemanager/responsemanager.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/responsemanager/responsemanager.go b/responsemanager/responsemanager.go index 183847e8..bccc23d3 100644 --- a/responsemanager/responsemanager.go +++ b/responsemanager/responsemanager.go @@ -362,7 +362,6 @@ func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSync } func (rm *ResponseManager) unpauseRequest(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error { - log.Infof("Unpause request called: %d", requestID) key := responseKey{p, requestID} inProgressResponse, ok := rm.inProgressResponses[key] if !ok { @@ -432,6 +431,7 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) { rm.processUpdate(key, request) continue } + rm.requestQueuedHooks.ProcessRequestQueuedHooks(prm.p, request) ctx, cancelFn := context.WithCancel(rm.ctx) sub := notifications.NewTopicDataSubscriber(&subscriber{ p: key.p, @@ -497,7 +497,6 @@ func (ftr *finishTaskRequest) handle(rm *ResponseManager) { return } if _, ok := ftr.err.(hooks.ErrPaused); ok { - log.Infof("Request finished on pause: %d", ftr.key.requestID) response.isPaused = true return } From 4a0a8c10589eb0fccb7f2b4d3299611d86521e0d Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 21 Jun 2021 18:51:05 -0700 Subject: [PATCH 7/9] WIP --- responsemanager/responsemanager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/responsemanager/responsemanager.go b/responsemanager/responsemanager.go index bccc23d3..2ff8aeee 100644 --- a/responsemanager/responsemanager.go +++ b/responsemanager/responsemanager.go @@ -450,7 +450,7 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) { loader, traverser, isPaused, err := (&queryPreparer{rm.qe.requestHooks, rm.responseAssembler, rm.qe.loader}).prepareQuery(ctx, key.p, request, signals, sub) if err != nil { cancelFn() - return + continue } rm.inProgressResponses[key] = &inProgressResponseStatus{ @@ -465,7 +465,7 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) { } // TODO: Use a better work estimation metric. if isPaused { - return + continue } rm.queryQueue.PushTasks(prm.p, peertask.Task{Topic: key, Priority: int(request.Priority()), Work: 1}) From 5341e6add3e74b9728551b353a44abf21b88e484 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Fri, 2 Jul 2021 14:17:49 -0700 Subject: [PATCH 8/9] refactor(responsemanager): move deps move unused query executor dependencies to response manager --- responsemanager/queryexecutor.go | 2 -- responsemanager/responsemanager.go | 8 +++++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/responsemanager/queryexecutor.go b/responsemanager/queryexecutor.go index affb9058..cc8bb38f 100644 --- a/responsemanager/queryexecutor.go +++ b/responsemanager/queryexecutor.go @@ -22,12 +22,10 @@ var errCancelledByCommand = errors.New("response cancelled by responder") // TODO: Move this into a seperate module and fully seperate from the ResponseManager type queryExecutor struct { - requestHooks RequestHooks blockHooks BlockHooks updateHooks UpdateHooks cancelledListeners CancelledListeners responseAssembler ResponseAssembler - loader ipld.Loader queryQueue QueryQueue messages chan responseManagerMessage ctx context.Context diff --git a/responsemanager/responsemanager.go b/responsemanager/responsemanager.go index 2ff8aeee..754dcec5 100644 --- a/responsemanager/responsemanager.go +++ b/responsemanager/responsemanager.go @@ -126,6 +126,8 @@ type ResponseManager struct { cancelFn context.CancelFunc responseAssembler ResponseAssembler queryQueue QueryQueue + requestHooks RequestHooks + loader ipld.Loader requestQueuedHooks RequestQueuedHooks updateHooks UpdateHooks cancelledListeners CancelledListeners @@ -158,12 +160,10 @@ func New(ctx context.Context, messages := make(chan responseManagerMessage, 16) workSignal := make(chan struct{}, 1) qe := &queryExecutor{ - requestHooks: requestHooks, blockHooks: blockHooks, updateHooks: updateHooks, cancelledListeners: cancelledListeners, responseAssembler: responseAssembler, - loader: loader, queryQueue: queryQueue, messages: messages, ctx: ctx, @@ -173,6 +173,8 @@ func New(ctx context.Context, return &ResponseManager{ ctx: ctx, cancelFn: cancelFn, + requestHooks: requestHooks, + loader: loader, responseAssembler: responseAssembler, queryQueue: queryQueue, requestQueuedHooks: requestQueuedHooks, @@ -447,7 +449,7 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) { updateSignal: make(chan struct{}, 1), errSignal: make(chan error, 1), } - loader, traverser, isPaused, err := (&queryPreparer{rm.qe.requestHooks, rm.responseAssembler, rm.qe.loader}).prepareQuery(ctx, key.p, request, signals, sub) + loader, traverser, isPaused, err := (&queryPreparer{rm.requestHooks, rm.responseAssembler, rm.loader}).prepareQuery(ctx, key.p, request, signals, sub) if err != nil { cancelFn() continue From 234bf651a8a72ef369a595498844641d3d566f94 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Fri, 2 Jul 2021 14:51:08 -0700 Subject: [PATCH 9/9] Revert "Merge pull request #172 from ipfs/feat/request-queued-hook" This reverts commit 85d8cf043e089f5a070b16fc5bda09cd19b370db, reversing changes made to 88edb5462e17629cd828be6be04b9422c9a258f3. --- graphsync.go | 7 ----- impl/graphsync.go | 11 +------- responsemanager/hooks/requesthook.go | 34 ------------------------- responsemanager/responsemanager.go | 10 -------- responsemanager/responsemanager_test.go | 24 ++--------------- 5 files changed, 3 insertions(+), 83 deletions(-) diff --git a/graphsync.go b/graphsync.go index e3076145..96a9ecd1 100644 --- a/graphsync.go +++ b/graphsync.go @@ -247,10 +247,6 @@ type RequestUpdatedHookActions interface { UnpauseResponse() } -// OnIncomingRequestQueuedHook is a hook that runs each time a new incoming request is added to the responder's task queue. -// It receives the peer that sent the request and all data about the request. -type OnIncomingRequestQueuedHook func(p peer.ID, request RequestData) - // OnIncomingRequestHook is a hook that runs each time a new request is received. // It receives the peer that sent the request and all data about the request. // It receives an interface for customizing the response to this request @@ -316,9 +312,6 @@ type GraphExchange interface { // UnregisterPersistenceOption unregisters an alternate loader/storer combo UnregisterPersistenceOption(name string) error - // RegisterIncomingRequestQueuedHook adds a hook that runs when a new incoming request is added to the responder's task queue. - RegisterIncomingRequestQueuedHook(hook OnIncomingRequestQueuedHook) UnregisterHookFunc - // RegisterIncomingRequestHook adds a hook that runs when a request is received RegisterIncomingRequestHook(hook OnIncomingRequestHook) UnregisterHookFunc diff --git a/impl/graphsync.go b/impl/graphsync.go index 76d6b7dc..08ad0e23 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -44,7 +44,6 @@ type GraphSync struct { responseAssembler *responseassembler.ResponseAssembler peerTaskQueue *peertaskqueue.PeerTaskQueue peerManager *peermanager.PeerMessageManager - incomingRequestQueuedHooks *responderhooks.IncomingRequestQueuedHooks incomingRequestHooks *responderhooks.IncomingRequestHooks outgoingBlockHooks *responderhooks.OutgoingBlockHooks requestUpdatedHooks *responderhooks.RequestUpdatedHooks @@ -126,7 +125,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, networkErrorListeners := listeners.NewNetworkErrorListeners() receiverErrorListeners := listeners.NewReceiverNetworkErrorListeners() persistenceOptions := persistenceoptions.New() - requestQueuedHooks := responderhooks.NewRequestQueuedHooks() incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions) outgoingBlockHooks := responderhooks.NewBlockHooks() requestUpdatedHooks := responderhooks.NewUpdateHooks() @@ -145,7 +143,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners) responseAssembler := responseassembler.New(ctx, peerManager) peerTaskQueue := peertaskqueue.New() - responseManager := responsemanager.New(ctx, loader, responseAssembler, peerTaskQueue, requestQueuedHooks, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressRequests) + responseManager := responsemanager.New(ctx, loader, responseAssembler, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressRequests) graphSync := &GraphSync{ network: network, loader: loader, @@ -156,7 +154,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, responseAssembler: responseAssembler, peerTaskQueue: peerTaskQueue, peerManager: peerManager, - incomingRequestQueuedHooks: requestQueuedHooks, incomingRequestHooks: incomingRequestHooks, outgoingBlockHooks: outgoingBlockHooks, requestUpdatedHooks: requestUpdatedHooks, @@ -195,12 +192,6 @@ func (gs *GraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingReques return gs.incomingRequestHooks.Register(hook) } -// RegisterIncomingRequestHook adds a hook that runs when a new incoming request is added -// to the responder's task queue. -func (gs *GraphSync) RegisterIncomingRequestQueuedHook(hook graphsync.OnIncomingRequestQueuedHook) graphsync.UnregisterHookFunc { - return gs.incomingRequestQueuedHooks.Register(hook) -} - // RegisterIncomingResponseHook adds a hook that runs when a response is received func (gs *GraphSync) RegisterIncomingResponseHook(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc { return gs.incomingResponseHooks.Register(hook) diff --git a/responsemanager/hooks/requesthook.go b/responsemanager/hooks/requesthook.go index 418d7fe4..e6505719 100644 --- a/responsemanager/hooks/requesthook.go +++ b/responsemanager/hooks/requesthook.go @@ -16,40 +16,6 @@ type PersistenceOptions interface { GetLoader(name string) (ipld.Loader, bool) } -// IncomingRequestQueuedHooks is a set of incoming request queued hooks that can be processed. -type IncomingRequestQueuedHooks struct { - pubSub *pubsub.PubSub -} - -type internalRequestQueuedHookEvent struct { - p peer.ID - request graphsync.RequestData -} - -func requestQueuedHookDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error { - ie := event.(internalRequestQueuedHookEvent) - hook := subscriberFn.(graphsync.OnIncomingRequestQueuedHook) - hook(ie.p, ie.request) - return nil -} - -// Register registers an extension to process new incoming requests. -func (rqh *IncomingRequestQueuedHooks) Register(hook graphsync.OnIncomingRequestQueuedHook) graphsync.UnregisterHookFunc { - return graphsync.UnregisterHookFunc(rqh.pubSub.Subscribe(hook)) -} - -// NewRequestQueuedHooks returns a new list of incoming request queued hooks. -func NewRequestQueuedHooks() *IncomingRequestQueuedHooks { - return &IncomingRequestQueuedHooks{ - pubSub: pubsub.New(requestQueuedHookDispatcher), - } -} - -// ProcessRequestQueuedHooks runs request hooks against an incoming queued request. -func (rqh *IncomingRequestQueuedHooks) ProcessRequestQueuedHooks(p peer.ID, request graphsync.RequestData) { - _ = rqh.pubSub.Publish(internalRequestQueuedHookEvent{p, request}) -} - // IncomingRequestHooks is a set of incoming request hooks that can be processed type IncomingRequestHooks struct { persistenceOptions PersistenceOptions diff --git a/responsemanager/responsemanager.go b/responsemanager/responsemanager.go index 754dcec5..5fcab943 100644 --- a/responsemanager/responsemanager.go +++ b/responsemanager/responsemanager.go @@ -73,11 +73,6 @@ type RequestHooks interface { ProcessRequestHooks(p peer.ID, request graphsync.RequestData) hooks.RequestResult } -// RequestQueuedHooks is an interface for processing request queued hooks -type RequestQueuedHooks interface { - ProcessRequestQueuedHooks(p peer.ID, request graphsync.RequestData) -} - // BlockHooks is an interface for processing block hooks type BlockHooks interface { ProcessBlockHooks(p peer.ID, request graphsync.RequestData, blockData graphsync.BlockData) hooks.BlockResult @@ -128,7 +123,6 @@ type ResponseManager struct { queryQueue QueryQueue requestHooks RequestHooks loader ipld.Loader - requestQueuedHooks RequestQueuedHooks updateHooks UpdateHooks cancelledListeners CancelledListeners completedListeners CompletedListeners @@ -146,7 +140,6 @@ func New(ctx context.Context, loader ipld.Loader, responseAssembler ResponseAssembler, queryQueue QueryQueue, - requestQueuedHooks RequestQueuedHooks, requestHooks RequestHooks, blockHooks BlockHooks, updateHooks UpdateHooks, @@ -177,7 +170,6 @@ func New(ctx context.Context, loader: loader, responseAssembler: responseAssembler, queryQueue: queryQueue, - requestQueuedHooks: requestQueuedHooks, updateHooks: updateHooks, cancelledListeners: cancelledListeners, completedListeners: completedListeners, @@ -433,7 +425,6 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) { rm.processUpdate(key, request) continue } - rm.requestQueuedHooks.ProcessRequestQueuedHooks(prm.p, request) ctx, cancelFn := context.WithCancel(rm.ctx) sub := notifications.NewTopicDataSubscriber(&subscriber{ p: key.p, @@ -471,7 +462,6 @@ func (prm *processRequestMessage) handle(rm *ResponseManager) { } rm.queryQueue.PushTasks(prm.p, peertask.Task{Topic: key, Priority: int(request.Priority()), Work: 1}) - select { case rm.workSignal <- struct{}{}: default: diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index bf75ca29..f2134737 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -37,19 +37,6 @@ func TestIncomingQuery(t *testing.T) { blks := td.blockChain.AllBlocks() responseManager := td.newResponseManager() - - type queuedHook struct { - p peer.ID - request graphsync.RequestData - } - - qhc := make(chan *queuedHook, 1) - td.requestQueuedHooks.Register(func(p peer.ID, request graphsync.RequestData) { - qhc <- &queuedHook{ - p: p, - request: request, - } - }) td.requestHooks.Register(selectorvalidator.SelectorValidator(100)) responseManager.Startup() @@ -58,11 +45,6 @@ func TestIncomingQuery(t *testing.T) { for i := 0; i < len(blks); i++ { td.assertSendBlock() } - - // ensure request queued hook fires. - out := <-qhc - require.Equal(t, td.p, out.p) - require.Equal(t, out.request.ID(), td.requestID) } func TestCancellationQueryInProgress(t *testing.T) { @@ -908,7 +890,6 @@ type testData struct { updateRequests []gsmsg.GraphSyncRequest p peer.ID peristenceOptions *persistenceoptions.PersistenceOptions - requestQueuedHooks *hooks.IncomingRequestQueuedHooks requestHooks *hooks.IncomingRequestHooks blockHooks *hooks.OutgoingBlockHooks updateHooks *hooks.RequestUpdatedHooks @@ -982,7 +963,6 @@ func newTestData(t *testing.T) testData { } td.p = testutil.GeneratePeers(1)[0] td.peristenceOptions = persistenceoptions.New() - td.requestQueuedHooks = hooks.NewRequestQueuedHooks() td.requestHooks = hooks.NewRequestHooks(td.peristenceOptions) td.blockHooks = hooks.NewBlockHooks() td.updateHooks = hooks.NewUpdateHooks() @@ -1012,13 +992,13 @@ func newTestData(t *testing.T) testData { } func (td *testData) newResponseManager() *ResponseManager { - return New(td.ctx, td.loader, td.responseAssembler, td.queryQueue, td.requestQueuedHooks, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners, td.blockSentListeners, td.networkErrorListeners, 6) + return New(td.ctx, td.loader, td.responseAssembler, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners, td.blockSentListeners, td.networkErrorListeners, 6) } func (td *testData) alternateLoaderResponseManager() *ResponseManager { obs := make(map[ipld.Link][]byte) oloader, _ := testutil.NewTestStore(obs) - return New(td.ctx, oloader, td.responseAssembler, td.queryQueue, td.requestQueuedHooks, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners, td.blockSentListeners, td.networkErrorListeners, 6) + return New(td.ctx, oloader, td.responseAssembler, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners, td.blockSentListeners, td.networkErrorListeners, 6) } func (td *testData) assertPausedRequest() {