Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ require (
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-ipld-cbor v0.0.4 // indirect
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log v1.0.4
github.com/ipfs/go-log/v2 v2.1.1
github.com/ipfs/go-merkledag v0.3.1
github.com/ipfs/go-peertaskqueue v0.2.0
Expand Down
7 changes: 0 additions & 7 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,6 @@ type RequestUpdatedHookActions interface {
UnpauseResponse()
}

// OnIncomingRequestQueuedHook is a hook that runs each time a new incoming request is added to the responder's task queue.
// It receives the peer that sent the request and all data about the request.
type OnIncomingRequestQueuedHook func(p peer.ID, request RequestData)

// OnIncomingRequestHook is a hook that runs each time a new request is received.
// It receives the peer that sent the request and all data about the request.
// It receives an interface for customizing the response to this request
Expand Down Expand Up @@ -316,9 +312,6 @@ type GraphExchange interface {
// UnregisterPersistenceOption unregisters an alternate loader/storer combo
UnregisterPersistenceOption(name string) error

// RegisterIncomingRequestQueuedHook adds a hook that runs when a new incoming request is added to the responder's task queue.
RegisterIncomingRequestQueuedHook(hook OnIncomingRequestQueuedHook) UnregisterHookFunc

// RegisterIncomingRequestHook adds a hook that runs when a request is received
RegisterIncomingRequestHook(hook OnIncomingRequestHook) UnregisterHookFunc

Expand Down
13 changes: 2 additions & 11 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package graphsync
import (
"context"

logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-peertaskqueue"
ipld "github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -44,7 +44,6 @@ type GraphSync struct {
responseAssembler *responseassembler.ResponseAssembler
peerTaskQueue *peertaskqueue.PeerTaskQueue
peerManager *peermanager.PeerMessageManager
incomingRequestQueuedHooks *responderhooks.IncomingRequestQueuedHooks
incomingRequestHooks *responderhooks.IncomingRequestHooks
outgoingBlockHooks *responderhooks.OutgoingBlockHooks
requestUpdatedHooks *responderhooks.RequestUpdatedHooks
Expand Down Expand Up @@ -126,7 +125,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
networkErrorListeners := listeners.NewNetworkErrorListeners()
receiverErrorListeners := listeners.NewReceiverNetworkErrorListeners()
persistenceOptions := persistenceoptions.New()
requestQueuedHooks := responderhooks.NewRequestQueuedHooks()
incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions)
outgoingBlockHooks := responderhooks.NewBlockHooks()
requestUpdatedHooks := responderhooks.NewUpdateHooks()
Expand All @@ -145,7 +143,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners)
responseAssembler := responseassembler.New(ctx, peerManager)
peerTaskQueue := peertaskqueue.New()
responseManager := responsemanager.New(ctx, loader, responseAssembler, peerTaskQueue, requestQueuedHooks, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressRequests)
responseManager := responsemanager.New(ctx, loader, responseAssembler, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressRequests)
graphSync := &GraphSync{
network: network,
loader: loader,
Expand All @@ -156,7 +154,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
responseAssembler: responseAssembler,
peerTaskQueue: peerTaskQueue,
peerManager: peerManager,
incomingRequestQueuedHooks: requestQueuedHooks,
incomingRequestHooks: incomingRequestHooks,
outgoingBlockHooks: outgoingBlockHooks,
requestUpdatedHooks: requestUpdatedHooks,
Expand Down Expand Up @@ -195,12 +192,6 @@ func (gs *GraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingReques
return gs.incomingRequestHooks.Register(hook)
}

// RegisterIncomingRequestHook adds a hook that runs when a new incoming request is added
// to the responder's task queue.
func (gs *GraphSync) RegisterIncomingRequestQueuedHook(hook graphsync.OnIncomingRequestQueuedHook) graphsync.UnregisterHookFunc {
return gs.incomingRequestQueuedHooks.Register(hook)
}

// RegisterIncomingResponseHook adds a hook that runs when a response is received
func (gs *GraphSync) RegisterIncomingResponseHook(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc {
return gs.incomingResponseHooks.Register(hook)
Expand Down
2 changes: 1 addition & 1 deletion messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sync"
"time"

logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/peer"

gsmsg "github.com/ipfs/go-graphsync/message"
Expand Down
2 changes: 1 addition & 1 deletion network/libp2p_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"io"
"time"

logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down
2 changes: 1 addition & 1 deletion requestmanager/asyncloader/responsecache/responsecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"

blocks "github.com/ipfs/go-block-format"
logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"

Expand Down
17 changes: 9 additions & 8 deletions requestmanager/requestmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"

"github.com/hannahhoward/go-pubsub"
"golang.org/x/xerrors"
"sync/atomic"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -66,12 +67,12 @@ type AsyncLoader interface {
// RequestManager tracks outgoing requests and processes incoming reponses
// to them.
type RequestManager struct {
ctx context.Context
cancel func()
messages chan requestManagerMessage
peerHandler PeerHandler
rc *responseCollector
asyncLoader AsyncLoader
ctx context.Context
cancel func()
messages chan requestManagerMessage
peerHandler PeerHandler
rc *responseCollector
asyncLoader AsyncLoader
disconnectNotif *pubsub.PubSub
// dont touch out side of run loop
nextRequestID graphsync.RequestID
Expand Down
34 changes: 0 additions & 34 deletions responsemanager/hooks/requesthook.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,6 @@ type PersistenceOptions interface {
GetLoader(name string) (ipld.Loader, bool)
}

// IncomingRequestQueuedHooks is a set of incoming request queued hooks that can be processed.
type IncomingRequestQueuedHooks struct {
pubSub *pubsub.PubSub
}

type internalRequestQueuedHookEvent struct {
p peer.ID
request graphsync.RequestData
}

func requestQueuedHookDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error {
ie := event.(internalRequestQueuedHookEvent)
hook := subscriberFn.(graphsync.OnIncomingRequestQueuedHook)
hook(ie.p, ie.request)
return nil
}

// Register registers an extension to process new incoming requests.
func (rqh *IncomingRequestQueuedHooks) Register(hook graphsync.OnIncomingRequestQueuedHook) graphsync.UnregisterHookFunc {
return graphsync.UnregisterHookFunc(rqh.pubSub.Subscribe(hook))
}

// NewRequestQueuedHooks returns a new list of incoming request queued hooks.
func NewRequestQueuedHooks() *IncomingRequestQueuedHooks {
return &IncomingRequestQueuedHooks{
pubSub: pubsub.New(requestQueuedHookDispatcher),
}
}

// ProcessRequestQueuedHooks runs request hooks against an incoming queued request.
func (rqh *IncomingRequestQueuedHooks) ProcessRequestQueuedHooks(p peer.ID, request graphsync.RequestData) {
_ = rqh.pubSub.Publish(internalRequestQueuedHookEvent{p, request})
}

// IncomingRequestHooks is a set of incoming request hooks that can be processed
type IncomingRequestHooks struct {
persistenceOptions PersistenceOptions
Expand Down
122 changes: 1 addition & 121 deletions responsemanager/queryexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,10 @@ import (
"strings"
"time"

"github.com/ipfs/go-cid"
ipld "github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/dedupkey"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/notifications"
Expand All @@ -26,12 +22,10 @@ var errCancelledByCommand = errors.New("response cancelled by responder")

// TODO: Move this into a seperate module and fully seperate from the ResponseManager
type queryExecutor struct {
requestHooks RequestHooks
blockHooks BlockHooks
updateHooks UpdateHooks
cancelledListeners CancelledListeners
responseAssembler ResponseAssembler
loader ipld.Loader
queryQueue QueryQueue
messages chan responseManagerMessage
ctx context.Context
Expand Down Expand Up @@ -72,7 +66,7 @@ func (qe *queryExecutor) processQueriesWorker() {
log.Info("Empty task on peer request stack")
continue
}
status, err := qe.executeTask(key, taskData)
status, err := qe.executeQuery(key.p, taskData.request, taskData.loader, taskData.traverser, taskData.signals, taskData.subscriber)
isCancelled := err != nil && isContextErr(err)
if isCancelled {
qe.cancelledListeners.NotifyCancelledListeners(key.p, taskData.request)
Expand All @@ -83,121 +77,7 @@ func (qe *queryExecutor) processQueriesWorker() {
}
}
qe.queryQueue.TasksDone(pid, tasks...)

}

}

func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData) (graphsync.ResponseStatusCode, error) {
var err error
loader := taskData.loader
traverser := taskData.traverser
if loader == nil || traverser == nil {
var isPaused bool
loader, traverser, isPaused, err = qe.prepareQuery(taskData.ctx, key.p, taskData.request, taskData.signals, taskData.subscriber)
if err != nil {
return graphsync.RequestFailedUnknown, err
}
select {
case <-qe.ctx.Done():
return graphsync.RequestFailedUnknown, errors.New("context cancelled")
case qe.messages <- &setResponseDataRequest{key, loader, traverser}:
}
if isPaused {
return graphsync.RequestPaused, hooks.ErrPaused{}
}
}
return qe.executeQuery(key.p, taskData.request, loader, traverser, taskData.signals, taskData.subscriber)
}

func (qe *queryExecutor) prepareQuery(ctx context.Context,
p peer.ID,
request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) {
result := qe.requestHooks.ProcessRequestHooks(p, request)
var transactionError error
var isPaused bool
failNotifee := notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: sub}
err := qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error {
for _, extension := range result.Extensions {
rb.SendExtensionData(extension)
}
if result.Err != nil || !result.IsValidated {
rb.FinishWithError(graphsync.RequestFailedUnknown)
rb.AddNotifee(failNotifee)
transactionError = errors.New("request not valid")
} else if result.IsPaused {
rb.PauseRequest()
isPaused = true
}
return nil
})
if err != nil {
return nil, nil, false, err
}
if transactionError != nil {
return nil, nil, false, transactionError
}
if err := qe.processDedupByKey(request, p, failNotifee); err != nil {
return nil, nil, false, err
}
if err := qe.processDoNoSendCids(request, p, failNotifee); err != nil {
return nil, nil, false, err
}
rootLink := cidlink.Link{Cid: request.Root()}
traverser := ipldutil.TraversalBuilder{
Root: rootLink,
Selector: request.Selector(),
Chooser: result.CustomChooser,
}.Start(ctx)
loader := result.CustomLoader
if loader == nil {
loader = qe.loader
}
return loader, traverser, isPaused, nil
}

func (qe *queryExecutor) processDedupByKey(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error {
dedupData, has := request.Extension(graphsync.ExtensionDeDupByKey)
if !has {
return nil
}
key, err := dedupkey.DecodeDedupKey(dedupData)
if err != nil {
_ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error {
rb.FinishWithError(graphsync.RequestFailedUnknown)
rb.AddNotifee(failNotifee)
return nil
})
return err
}
qe.responseAssembler.DedupKey(p, request.ID(), key)
return nil
}

func (qe *queryExecutor) processDoNoSendCids(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error {
doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs)
if !has {
return nil
}
cidSet, err := cidset.DecodeCidSet(doNotSendCidsData)
if err != nil {
_ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error {
rb.FinishWithError(graphsync.RequestFailedUnknown)
rb.AddNotifee(failNotifee)
return nil
})
return err
}
links := make([]ipld.Link, 0, cidSet.Len())
err = cidSet.ForEach(func(c cid.Cid) error {
links = append(links, cidlink.Link{Cid: c})
return nil
})
if err != nil {
return err
}
qe.responseAssembler.IgnoreBlocks(p, request.ID(), links)
return nil
}

func (qe *queryExecutor) executeQuery(
Expand Down
Loading