Skip to content

Commit

Permalink
Merge the v0.6.x commit history (#190)
Browse files Browse the repository at this point in the history
* feat: fire network error when network disconnects during request (#164)

* release: 0.6.1

* Better logging for Graphsync traversal (#167)

* log gs traversal

* Apply suggestions from code review

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* add debug logs

* add debug logs

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* release: v0.6.2 (#168)

* Fix/log blockstore reads (#169)

* log gs traversal

* Apply suggestions from code review

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* add debug logs

* fixed logging

* Apply suggestions from code review

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* fixed error

Co-authored-by: dirkmc <dirkmdev@gmail.com>

* release: v0.6.3 (#170)

* request queued hook

* test request queued hook

* release: v0.6.4

* Resolve 175 race condition, no change to hook timing (#178)

* feat(requestmanager): add request timing (#181)

* Add cancel request and wait function (#185)

* fix(responsemanager): fix error codes (#182)

enforce much tighter consistency with graphsync spec in usage of error codes

* refactor: replace particular request not found errors with public error (#188)

* release: 0.6.8

Co-authored-by: dirkmc <dirkmdev@gmail.com>
Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
  • Loading branch information
3 people committed Aug 5, 2021
1 parent aa3b445 commit d60a91f
Show file tree
Hide file tree
Showing 25 changed files with 775 additions and 267 deletions.
107 changes: 107 additions & 0 deletions CHANGELOG.md
Expand Up @@ -20,6 +20,113 @@ some significant breaking changes.
| Hannah Howard | 2 | +3316/-3015 | 25 |
| Steven Allen | 1 | +95/-227 | 5 |

# go-graphsync 0.6.8

### Changelog

- github.com/ipfs/go-graphsync:
- refactor: replace particular request not found errors with public error (#188) ([ipfs/go-graphsync#188](https://github.com/ipfs/go-graphsync/pull/188))
- fix(responsemanager): fix error codes (#182) ([ipfs/go-graphsync#182](https://github.com/ipfs/go-graphsync/pull/182))

### Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| Hannah Howard | 1 | +100/-51 | 5 |
| dirkmc | 1 | +10/-3 | 2 |

# go-graphsync 0.6.7

### Changelog

- github.com/ipfs/go-graphsync:
- Add cancel request and wait function (#185) ([ipfs/go-graphsync#185](https://github.com/ipfs/go-graphsync/pull/185))

### Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| Hannah Howard | 1 | +154/-32 | 9 |
# go-graphsync 0.6.6

### Changelog

- github.com/ipfs/go-graphsync:
- feat(requestmanager): add request timing (#181) ([ipfs/go-graphsync#181](https://github.com/ipfs/go-graphsync/pull/181))

### Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| Hannah Howard | 1 | +9/-1 | 1 |

# go-graphsync 0.6.5

### Changelog

- github.com/ipfs/go-graphsync:
- Resolve 175 race condition, no change to hook timing (#178) ([ipfs/go-graphsync#178](https://github.com/ipfs/go-graphsync/pull/178))

### Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| Hannah Howard | 1 | +199/-171 | 10 |

# go-graphsync 0.6.4

### Changelog

- github.com/ipfs/go-graphsync:
- feat/request-queued-hook (#172) ([ipfs/go-graphsync#172](https://github.com/ipfs/go-graphsync/pull/172))

### Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| aarshkshah1992 | 3 | +87/-3 | 7 |
| dirkmc | 1 | +11/-0 | 1 |

# go-graphsync 0.6.3

### Changelog

- github.com/ipfs/go-graphsync:
- Fix/log blockstore reads (#169) ([ipfs/go-graphsync#169](https://github.com/ipfs/go-graphsync/pull/169))

### Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| Aarsh Shah | 2 | +40/-177 | 6 |

# go-graphsync 0.6.2

### Changelog

- github.com/ipfs/go-graphsync:
- Better logging for Graphsync traversal (#167) ([ipfs/go-graphsync#167](https://github.com/ipfs/go-graphsync/pull/167))

### Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| Aarsh Shah | 1 | +18/-2 | 2 |

# go-graphsync 0.6.1

### Changelog

- github.com/ipfs/go-graphsync:
- feat: fire network error when network disconnects during request (#164) ([ipfs/go-graphsync#164](https://github.com/ipfs/go-graphsync/pull/164))

### Contributors

| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| dirkmc | 1 | +86/-8 | 4 |


# go-graphsync 0.6.0

Major code refactor for simplicity, ease of understanding
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -24,7 +24,7 @@ require (
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-ipld-cbor v0.0.5 // indirect
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log v1.0.4
github.com/ipfs/go-log/v2 v2.1.1
github.com/ipfs/go-merkledag v0.3.2
github.com/ipfs/go-peertaskqueue v0.2.0
github.com/ipfs/go-unixfs v0.2.4
Expand Down
26 changes: 22 additions & 4 deletions graphsync.go
Expand Up @@ -44,11 +44,12 @@ const (
ExtensionDeDupByKey = ExtensionName("graphsync/dedup-by-key")
)

// RequestContextCancelledErr is an error message received on the error channel when the request context given by the user is cancelled/times out
type RequestContextCancelledErr struct{}
// RequestClientCancelledErr is an error message received on the error channel when the request is cancelled on by the client code,
// either by closing the passed request context or calling CancelRequest
type RequestClientCancelledErr struct{}

func (e RequestContextCancelledErr) Error() string {
return "request context cancelled"
func (e RequestClientCancelledErr) Error() string {
return "request cancelled by client"
}

// RequestFailedBusyErr is an error message received on the error channel when the peer is busy
Expand Down Expand Up @@ -86,6 +87,13 @@ func (e RequestCancelledErr) Error() string {
return "request failed - responder cancelled"
}

// RequestNotFoundErr indicates that a request with a particular request ID was not found
type RequestNotFoundErr struct{}

func (e RequestNotFoundErr) Error() string {
return "request not found"
}

var (
// ErrExtensionAlreadyRegistered means a user extension can be registered only once
ErrExtensionAlreadyRegistered = errors.New("extension already registered")
Expand Down Expand Up @@ -197,6 +205,10 @@ type RequestUpdatedHookActions interface {
UnpauseResponse()
}

// OnIncomingRequestQueuedHook is a hook that runs each time a new incoming request is added to the responder's task queue.
// It receives the peer that sent the request and all data about the request.
type OnIncomingRequestQueuedHook func(p peer.ID, request RequestData)

// OnIncomingRequestHook is a hook that runs each time a new request is received.
// It receives the peer that sent the request and all data about the request.
// It receives an interface for customizing the response to this request
Expand Down Expand Up @@ -262,6 +274,9 @@ type GraphExchange interface {
// UnregisterPersistenceOption unregisters an alternate loader/storer combo
UnregisterPersistenceOption(name string) error

// RegisterIncomingRequestQueuedHook adds a hook that runs when a new incoming request is added to the responder's task queue.
RegisterIncomingRequestQueuedHook(hook OnIncomingRequestQueuedHook) UnregisterHookFunc

// RegisterIncomingRequestHook adds a hook that runs when a request is received
RegisterIncomingRequestHook(hook OnIncomingRequestHook) UnregisterHookFunc

Expand Down Expand Up @@ -312,4 +327,7 @@ type GraphExchange interface {

// CancelResponse cancels an in progress response
CancelResponse(peer.ID, RequestID) error

// CancelRequest cancels an in progress request
CancelRequest(context.Context, RequestID) error
}
18 changes: 16 additions & 2 deletions impl/graphsync.go
Expand Up @@ -3,7 +3,7 @@ package graphsync
import (
"context"

logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-peertaskqueue"
ipld "github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -43,6 +43,7 @@ type GraphSync struct {
responseAssembler *responseassembler.ResponseAssembler
peerTaskQueue *peertaskqueue.PeerTaskQueue
peerManager *peermanager.PeerMessageManager
incomingRequestQueuedHooks *responderhooks.IncomingRequestQueuedHooks
incomingRequestHooks *responderhooks.IncomingRequestHooks
outgoingBlockHooks *responderhooks.OutgoingBlockHooks
requestUpdatedHooks *responderhooks.RequestUpdatedHooks
Expand Down Expand Up @@ -124,6 +125,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
networkErrorListeners := listeners.NewNetworkErrorListeners()
receiverErrorListeners := listeners.NewReceiverNetworkErrorListeners()
persistenceOptions := persistenceoptions.New()
requestQueuedHooks := responderhooks.NewRequestQueuedHooks()
incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions)
outgoingBlockHooks := responderhooks.NewBlockHooks()
requestUpdatedHooks := responderhooks.NewUpdateHooks()
Expand All @@ -142,7 +144,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
requestManager := requestmanager.New(ctx, asyncLoader, linkSystem, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks, networkErrorListeners)
responseAssembler := responseassembler.New(ctx, peerManager)
peerTaskQueue := peertaskqueue.New()
responseManager := responsemanager.New(ctx, linkSystem, responseAssembler, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressRequests)
responseManager := responsemanager.New(ctx, linkSystem, responseAssembler, peerTaskQueue, requestQueuedHooks, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners, gsConfig.maxInProgressRequests)
graphSync := &GraphSync{
network: network,
linkSystem: linkSystem,
Expand All @@ -152,6 +154,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
responseAssembler: responseAssembler,
peerTaskQueue: peerTaskQueue,
peerManager: peerManager,
incomingRequestQueuedHooks: requestQueuedHooks,
incomingRequestHooks: incomingRequestHooks,
outgoingBlockHooks: outgoingBlockHooks,
requestUpdatedHooks: requestUpdatedHooks,
Expand Down Expand Up @@ -190,6 +193,12 @@ func (gs *GraphSync) RegisterIncomingRequestHook(hook graphsync.OnIncomingReques
return gs.incomingRequestHooks.Register(hook)
}

// RegisterIncomingRequestQueuedHook adds a hook that runs when a new incoming request is added
// to the responder's task queue.
func (gs *GraphSync) RegisterIncomingRequestQueuedHook(hook graphsync.OnIncomingRequestQueuedHook) graphsync.UnregisterHookFunc {
return gs.incomingRequestQueuedHooks.Register(hook)
}

// RegisterIncomingResponseHook adds a hook that runs when a response is received
func (gs *GraphSync) RegisterIncomingResponseHook(hook graphsync.OnIncomingResponseHook) graphsync.UnregisterHookFunc {
return gs.incomingResponseHooks.Register(hook)
Expand Down Expand Up @@ -285,6 +294,11 @@ func (gs *GraphSync) CancelResponse(p peer.ID, requestID graphsync.RequestID) er
return gs.responseManager.CancelResponse(p, requestID)
}

// CancelRequest cancels an in progress request
func (gs *GraphSync) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error {
return gs.requestManager.CancelRequest(ctx, requestID)
}

type graphSyncReceiver GraphSync

func (gsr *graphSyncReceiver) graphSync() *GraphSync {
Expand Down
4 changes: 2 additions & 2 deletions impl/graphsync_test.go
Expand Up @@ -616,7 +616,7 @@ func TestNetworkDisconnect(t *testing.T) {

testutil.AssertReceive(ctx, t, networkError, &err, "should receive network error")
testutil.AssertReceive(ctx, t, errChan, &err, "should receive an error")
require.EqualError(t, err, graphsync.RequestContextCancelledErr{}.Error())
require.EqualError(t, err, graphsync.RequestClientCancelledErr{}.Error())
testutil.AssertReceive(ctx, t, receiverError, &err, "should receive an error on receiver side")
}

Expand Down Expand Up @@ -652,7 +652,7 @@ func TestConnectFail(t *testing.T) {
var err error
testutil.AssertReceive(ctx, t, reqNetworkError, &err, "should receive network error")
testutil.AssertReceive(ctx, t, errChan, &err, "should receive an error")
require.EqualError(t, err, graphsync.RequestContextCancelledErr{}.Error())
require.EqualError(t, err, graphsync.RequestClientCancelledErr{}.Error())
}

func TestGraphsyncRoundTripAlternatePersistenceAndNodes(t *testing.T) {
Expand Down
19 changes: 19 additions & 0 deletions ipldutil/traverser.go
Expand Up @@ -11,6 +11,12 @@ import (
"github.com/ipld/go-ipld-prime/traversal/selector"
)

/* TODO: This traverser creates an extra go-routine and is quite complicated, in order to give calling code control of
a selector traversal. If it were implemented inside of go-ipld-primes traversal library, with access to private functions,
it could be done without an extra go-routine, avoiding the possibility of races and simplifying implementation. This has
been documented here: https://github.com/ipld/go-ipld-prime/issues/213 -- and when this issue is implemented, this traverser
can go away */

var defaultLinkSystem = cidlink.DefaultLinkSystem()

var defaultVisitor traversal.AdvVisitFn = func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil }
Expand Down Expand Up @@ -45,6 +51,8 @@ type Traverser interface {
Error(err error)
// Shutdown cancels the traversal
Shutdown(ctx context.Context)
// NBlocksTraversed returns the number of blocks successfully traversed
NBlocksTraversed() int
}

type state struct {
Expand All @@ -64,6 +72,7 @@ type nextResponse struct {
func (tb TraversalBuilder) Start(parentCtx context.Context) Traverser {
ctx, cancel := context.WithCancel(parentCtx)
t := &traverser{
blocksCount: 0,
parentCtx: parentCtx,
ctx: ctx,
cancel: cancel,
Expand Down Expand Up @@ -100,6 +109,7 @@ func (tb TraversalBuilder) Start(parentCtx context.Context) Traverser {
// traverser is a class to perform a selector traversal that stops every time a new block is loaded
// and waits for manual input (in the form of advance or error)
type traverser struct {
blocksCount int
parentCtx context.Context
ctx context.Context
cancel func()
Expand All @@ -118,6 +128,10 @@ type traverser struct {
stopped chan struct{}
}

func (t *traverser) NBlocksTraversed() int {
return t.blocksCount
}

func (t *traverser) loader(lnkCtx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) {
select {
case <-t.ctx.Done():
Expand Down Expand Up @@ -159,6 +173,7 @@ func (t *traverser) writeDone(err error) {
func (t *traverser) start() {
select {
case <-t.ctx.Done():
close(t.stopped)
return
case t.awaitRequest <- struct{}{}:
}
Expand Down Expand Up @@ -218,16 +233,20 @@ func (t *traverser) Advance(reader io.Reader) error {
if isComplete {
return errors.New("cannot advance when done")
}

select {
case <-t.ctx.Done():
return ContextCancelError{}
case t.awaitRequest <- struct{}{}:
}

select {
case <-t.ctx.Done():
return ContextCancelError{}
case t.responses <- nextResponse{reader, nil}:
}

t.blocksCount++
return nil
}

Expand Down
17 changes: 17 additions & 0 deletions ipldutil/traverser_test.go
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"testing"
"time"

blocks "github.com/ipfs/go-block-format"
ipld "github.com/ipld/go-ipld-prime"
Expand All @@ -21,6 +22,22 @@ import (
func TestTraverser(t *testing.T) {
ctx := context.Background()

t.Run("started with shutdown context, then shutdown", func(t *testing.T) {
cancelledCtx, cancel := context.WithCancel(ctx)
cancel()
testdata := testutil.NewTestIPLDTree()
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
sel := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()
traverser := TraversalBuilder{
Root: testdata.RootNodeLnk,
Selector: sel,
}.Start(cancelledCtx)
timeoutCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
traverser.Shutdown(timeoutCtx)
require.NoError(t, timeoutCtx.Err())
})

t.Run("traverses correctly, simple struct", func(t *testing.T) {
testdata := testutil.NewTestIPLDTree()
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
Expand Down
3 changes: 2 additions & 1 deletion message/message.go
Expand Up @@ -33,7 +33,8 @@ func IsTerminalFailureCode(status graphsync.ResponseStatusCode) bool {
status == graphsync.RequestFailedContentNotFound ||
status == graphsync.RequestFailedLegal ||
status == graphsync.RequestFailedUnknown ||
status == graphsync.RequestCancelled
status == graphsync.RequestCancelled ||
status == graphsync.RequestRejected
}

// IsTerminalResponseCode returns true if the response code signals
Expand Down
2 changes: 1 addition & 1 deletion messagequeue/messagequeue.go
Expand Up @@ -7,7 +7,7 @@ import (
"sync"
"time"

logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/peer"

gsmsg "github.com/ipfs/go-graphsync/message"
Expand Down

0 comments on commit d60a91f

Please sign in to comment.