Fix unixfs fetch #364

Merged
merged 4 commits into from Mar 9, 2022
66 changes: 36 additions & 30 deletions docs/architecture.md
@@ -71,8 +71,8 @@ Having outlined all the steps to execute a single roundtrip Graphsync request, t

To do this, GraphSync maintains several independent threads of execution (i.e. goroutines). Specifically:
- On the requestor side:
1. We maintain an independent thread to make and track requests (RequestManager)
2. Each outgoing request has an independent thread performing selector verification
1. We maintain an independent thread to manage requests (RequestManager) and ingest responses.
2. We maintain a fixed number of threads that continuously pull the highest-priority outgoing request from the queue, initiate the graphsync request on the network, and perform selector verification against responses
3. Each outgoing request has an independent thread collecting and buffering final responses before they are returned to the caller. Graphsync returns responses to the caller through a channel. If the caller fails to immediately read the response channel, this should not block other requests from being processed.
- On the responder side:
1. We maintain an independent thread to receive incoming requests and track outgoing responses. As each incoming request is received, it's put into a prioritized queue.
@@ -97,35 +97,38 @@ Interacting with a local blockstore is expressed via an IPLD `LinkSystem`. The b

## Requestor Implementation

The requestor implementation consists of the RequestManager which makes and tracks requests and the AsyncLoader subsystem which manages incoming responses. These systems work in concert to verify responses by performing a local selector traversal that is backed by network response data.
The RequestManager tracks outgoing requests and processes incoming responses. When a request is initiated, the RequestManager begins tracking it and places it into a TaskQueue. When the request becomes the highest-priority task, it is executed by a request executor that runs a selector traversal backed by both our local store and the responses to a Graphsync network request.

When a request is initiated, the RequestManager sends the request across the network, begins tracking it, initiates a query to verify the responses from the network, and initiates a thread to ultimately collect and return verified data to the caller. Most of these processes are straightforward, but verifying the response is not simple. We need to execute an IPLD selector query backed by network data, which will arrive asynchronously and potentially out of order.
When a request begins execution, we load blocks from local storage until we encounter a block we don't have. Then we initiate a Graphsync network request to backfill the blocks we don't yet have. While this request is in progress, we wait for network responses to come in, then feed these blocks into the selector traversal. Blocks that are loaded by the selector traversal are considered "verified" and get saved to local storage. Periodically, we will have to fall back to the local store to load a block we previously stored from the remote, because remotes generally don't send the same block twice. Once the remote finishes its responses, we assume any remaining block loads must come from local storage, or they error.

To accomplish this, we delegate to the AsyncLoader, which is responsible for ingesting responses from the network, and providing them to the local IPLD selector traversal operation as it loads links.
To manage this complicated process, the RequestManager creates a ReconciledLoader for each request.

The AsyncLoader needs to take in blocks and response metadata from the network, put that data in an in-memory cache until either:
### Reconciled Loader

- The local traversal actually requests that data, which validates the block, and therefore tells the async loader it can move to permanent storage.
- The local traversal completes, at which point, any blocks that haven't been verified can be dropped, cause they've been shown not to be the correct responses.
The ReconciledLoader loads blocks by interleaving two data sources: local storage and incoming responses from the network. It also verifies that incoming remote responses match the order of links loaded by a locally executing selector.

The AsyncLoader has to also manage requests for link-loads coming in form the local traversal, which may come in before OR after the relevant blocks are received from the network. Moreover, if a block is already in local storage for whatever reason, there's no need to hold up the local traversal on a network response.
Each ReconciledLoader contains a TraversalRecord and a Verifier. The TraversalRecord is a compact representation of an ordered history of links loaded in a traversal, along with their paths. The Verifier is a class for replaying a traversal history against a series of responses from a remote. It also handles links the local store may have had that the remote does not.
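
To make this concrete, here is a hypothetical Go sketch of a traversal record and a verifier replaying it against remote responses. The type and method names, the string-based links and paths, and the strict in-order check are illustrative assumptions, not the actual go-graphsync types.

```go
package sketch

import "fmt"

// traversalEntry records one link load: where in the DAG it happened and which link was loaded.
type traversalEntry struct {
	path string
	link string
}

// TraversalRecord is a compact, ordered history of link loads (hypothetical sketch).
type TraversalRecord struct {
	entries []traversalEntry
}

func (tr *TraversalRecord) RecordLoad(path, link string) {
	tr.entries = append(tr.entries, traversalEntry{path: path, link: link})
}

// Verifier replays the recorded history against links arriving from a remote,
// checking that the remote's responses arrive in the same order we loaded them locally.
type Verifier struct {
	record *TraversalRecord
	next   int
}

// Done reports whether the whole history has been replayed.
func (v *Verifier) Done() bool { return v.next >= len(v.record.entries) }

// VerifyNext checks one remote response against the next entry in the history.
func (v *Verifier) VerifyNext(remoteLink string) error {
	expected := v.record.entries[v.next]
	if expected.link != remoteLink {
		return fmt.Errorf("remote sent %s at path %q, expected %s", remoteLink, expected.path, expected.link)
	}
	v.next++
	return nil
}
```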

The following process outlines the basic process for loading links asynchronously from the network:
The ReconciledLoader also contains a remote queue of metadata and blocks for a given request.

![Async Loading Process](async-loading.png)
The loading process is as follows:

The main components that make up the AsyncLoader are:
- when we request a link, we wait for new items from the remote
- if we're offline, we return immediately
- if we're online, we wait until we have new items to read
- when we first go online, we initiate a "verification" against previously loaded data -- this comes from the traversal record
- this means that when we first go online, we actually don't have any "new" remote items until the verification finishes. If the verification fails, we pass the failure as an error in response to the load request -- this is a bit odd, but keep in mind the traverser that calls BlockReadOpener will simply put this error on the graphsync error channel and close the request. We didn't want to deal with having the channel inside the ReconciledLoader and getting drawn into the request-closing process.
- if we have new remote items, and we're not loading from a path we know the remote is missing, we attempt to load the link from the first remote item, erroring if the link we're loading and the remote's link do not match
- if we're on a path the remote is missing, we go straight to loading from the local store
- if the remote sent the block, it's directly in the remote item queue and we save and return it
- if the remote said it had the block but it was a duplicate, we load from the local store
- if the remote said it's missing the block, we record the new missing path, then load from the local store
- once we load, we hold the link until the next load before putting it in the traversal record. This enables a "retry" to be run when a local load fails but the node then goes online
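
The following is a minimal, hypothetical Go sketch of this flow, not the actual go-graphsync implementation. The reconciledLoader, remoteItem, and loadLink names, the string-keyed maps, and the non-blocking queue check are illustrative assumptions; the real loader works with IPLD links and paths and blocks on a condition variable while waiting for remote items.

```go
package sketch

import (
	"errors"
	"fmt"
)

// remoteItem is one queued item from the remote for this request: either a
// block, a "duplicate" notice, or a "missing" notice for a given link.
type remoteItem struct {
	link      string
	block     []byte
	duplicate bool
	missing   bool
}

// reconciledLoader is a simplified stand-in for the flow above, not the real type.
type reconciledLoader struct {
	online       bool
	remoteQueue  []remoteItem      // items ingested from the remote
	local        map[string][]byte // local block storage
	missingPaths map[string]bool   // paths the remote reported missing
}

// loadLink is called by the selector traversal at each block boundary.
func (rl *reconciledLoader) loadLink(path, link string) ([]byte, error) {
	// Offline, or on a path the remote is missing: go straight to the local store.
	if !rl.online || rl.missingPaths[path] {
		return rl.loadLocal(link)
	}
	if len(rl.remoteQueue) == 0 {
		// The real loader blocks here waiting for new remote items (see below).
		return nil, errors.New("no remote items available")
	}
	item := rl.remoteQueue[0]
	rl.remoteQueue = rl.remoteQueue[1:]
	// The remote must respond in the same order the traversal loads links.
	if item.link != link {
		return nil, fmt.Errorf("remote sent %s, traversal expected %s", item.link, link)
	}
	switch {
	case item.missing:
		rl.missingPaths[path] = true // remember the missing path, then fall back to local
		return rl.loadLocal(link)
	case item.duplicate:
		return rl.loadLocal(link) // remote already sent this block; use our saved copy
	default:
		rl.local[link] = item.block // block is now "verified": save and return it
		return item.block, nil
	}
}

func (rl *reconciledLoader) loadLocal(link string) ([]byte, error) {
	if blk, ok := rl.local[link]; ok {
		return blk, nil
	}
	return nil, fmt.Errorf("block %s not found locally", link)
}
```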

* The UnverifiedBlockStore -- basically just a temporary cache to hold blocks, and write them to permanent storage as they are verified
* The ResponseCache -- essentially just the UnverifiedBlockStore + some link tracking, so that the BlockStore is properly pruned as requests expire
* The LoadAttemptQueue -- Basically a queue to track attempts to load links. A link load can have one of three results --
- It can load successfully,
- It can load and knowingly fail (i.e responder tells you it is for sure missing a link)
- It can be indeterminate -- you don't yet have the block in the response cache or in local storage, but the responder is still sending responses and hasn't indicated it doesn't have the block.
Ingesting data is handled by the ResponseManager and is very straightforward -- we divide up responses and blocks by request ID and then queue them into the respective queues for each ReconciledLoader. Coordination is handled via mutexes, with sync.Cond for signalling.
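
As a rough sketch of that coordination (assumed names, reusing the remoteItem type from the sketch above; not the actual go-graphsync code), the ingesting thread pushes items and the loading thread waits on a sync.Cond:

```go
package sketch

import "sync"

// responseQueue holds metadata and blocks ingested for one request.
type responseQueue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  []remoteItem
	closed bool
}

func newResponseQueue() *responseQueue {
	q := &responseQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// push is called by the ingesting thread as responses arrive from the network.
func (q *responseQueue) push(item remoteItem) {
	q.mu.Lock()
	q.items = append(q.items, item)
	q.mu.Unlock()
	q.cond.Signal() // wake the loader if it is waiting
}

// close is called once the remote has finished responding.
func (q *responseQueue) close() {
	q.mu.Lock()
	q.closed = true
	q.mu.Unlock()
	q.cond.Broadcast()
}

// next blocks until an item is available, or returns false once the queue is closed and drained.
func (q *responseQueue) next() (remoteItem, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 && !q.closed {
		q.cond.Wait()
	}
	if len(q.items) == 0 {
		return remoteItem{}, false
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, true
}
```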

The load attempt queue essentially attempts the load then if the response is 3, puts the load attempt in a paused queue awaiting more responses (note if the responder eventually finishes without ever sending a response for that link, the request will error at the point the last response is sent)

With the AsyncLoader handling the complicated parts of loading data from the network, the RequestManager simply manages the overall process.
The following diagram demonstrates step by step how request execution works:
![Executing A Request](request-execution.png)

## Responder Implementation

@@ -143,26 +143,23 @@ In addition, an optimized responder implementation accounts for the following co

* *Preserve Bandwidth* - Be efficient with network usage, deduplicate data, and buffer response output so that each new network message contains all response data we have at the time the pipe becomes free.

The responder implementation is managed by the Response Manager. The ResponseManager delegates to PeerTaskQueue to rate limit the number of in progress selector traversals and ensure no one peer is given more priority than others. As data is generated from selector traversals, the ResponseManager uses the ResponseAssembler to aggregate response data for each peer and send compact messages over the network.
The responder implementation is managed by the ResponseManager. The ResponseManager delegates to the TaskQueue and the QueryExecutor to manage selector traversals. The TaskQueue rate-limits the number of in-progress selector traversals and ensures no one peer is given more priority than others. As data is generated from selector traversals, the QueryExecutor uses the ResponseAssembler to aggregate and send messages to the message-sending layer.

The following diagram outlines in greater detail go-graphsync's responder implementation, covering how it's initialized and how it responds to requests:
![Responding To A Request](responder-sequence.png)

Here are some key components in this implementation:

### PeerTaskQueue - Preventing DOS Attacks

Rather than responding to incoming requests immediately, the ResponseManager places each incoming request in a prioritized queue.

The queue here is a generalized implementation of the PeerRequestQueue in Bitswap (called the PeerTaskQueue). The PeerTaskQueue first balances peers so that those with the most current in progress requests are prioritized after those with fewer in progress requests, and then within a peer prioritizes the requests with the highest priority or earliest received.
### TaskQueue - Preventing DOS Attacks

Meanwhile, the ResponseManager starts a fixed number of workers (currently 6), each of which continually pulls the highest priority job off the queue, processes the traversal and sends the response. So at any given time, only a fixed number of selector queries are executing on the node.
Rather than responding to incoming requests immediately, the ResponseManager places each incoming request in a TaskQueue. When the request becomes the highest-priority task, it is executed by the QueryExecutor, which runs a selector traversal and sends responses.

The net here is that no peer can have more than a fixed number of requests in progress at once, and even if a peer sends infinite requests, other peers will still jump ahead of it and get a chance to process their requests.
The TaskQueue ensures that no peer can have more than a fixed number of requests in progress at once, and even if a peer sends infinite requests, other peers will still jump ahead of it and get a chance to process their requests.

### ResponseAssembler -- Deduping blocks and data

Once a request is dequeued, we generate an intercepted loader and provide it to go-ipld-prime to execute a traversal. Each call to the loader will generate a block that we either have or don't. We need to transmit that information across the network. However, that information needs to be encoded in the GraphSync message format, and combined with any other responses we may be sent to the same peer at the same time, ideally without sending blocks more times than necessary.
Once a request is dequeued, we initiate a selector traversal. Each time the traversal reaches a block boundary, it will request to load the block from a loader function we provide. Our loader function will use local storage to determine whether we have this block or not, but we also need to transmit that information across the network. That information needs to be encoded in the GraphSync message format, and combined with any other responses we may be sending to the same peer at the same time, ideally without sending blocks more times than necessary.

These tasks are managed by the ResponseAssembler. The ResponseAssembler creates a LinkTracker for each peer to track what blocks have been sent. Responses are sent by calling Transaction on the ResponseAssembler, which provides a ResponseBuilder interface that can be used to assemble responses. Transaction is named as such because all data added to a response by calling methods on the provided ResponseBuilder is guaranteed to go out in the same network message.
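
The following is a hedged sketch of what that transactional pattern looks like; the interface shapes and method names (Transaction, SendResponse, FinishRequest) are illustrative stand-ins, not the exact go-graphsync signatures. The key property is that everything added inside the callback goes out in the same network message.

```go
package sketch

// ResponseBuilder collects response data for one request (illustrative interface).
type ResponseBuilder interface {
	SendResponse(link string, data []byte) // queue a block (or a have/missing note) for this request
	FinishRequest()                        // mark the request complete
}

// ResponseAssembler aggregates response data per peer into compact messages.
type ResponseAssembler interface {
	// Transaction guarantees that all data added via the builder callback is
	// sent out in the same network message to the peer.
	Transaction(peer string, requestID int, build func(ResponseBuilder) error) error
}

// Example: send the final block and mark the request finished, packed together.
func sendFinalBlock(ra ResponseAssembler, peer string, requestID int, link string, blk []byte) error {
	return ra.Transaction(peer, requestID, func(rb ResponseBuilder) error {
		rb.SendResponse(link, blk)
		rb.FinishRequest()
		return nil
	})
}
```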

@@ -180,6 +180,12 @@ go-graphsync provides a variety of points in the request/response lifecycle wher

Graphsync also provides listeners that enable a caller to be notified when various asynchronous events happen in the request/response lifecycle. Currently graphsync contains an internal pubsub notification system (see [notifications](../notifications)) to escalate low-level asynchronous events back to high-level modules that pass them to external listeners. A future refactor might look for a way to remove this notification system, as it adds additional complexity.

### TaskQueue

In both the requestor and responder implementations, Graphsync utilizes a general prioritized task manager called a TaskQueue. The TaskQueue internally uses the same PeerTaskQueue utilized by Bitswap, which balances peers so that those with the most in-progress requests are prioritized after those with fewer in-progress requests, and then within a peer prioritizes the requests with the highest priority or the earliest received.

The TaskQueue is initialized with a task executor that can execute a single task. Once initialized, the TaskQueue starts a fixed number of workers (default: 6), each of which continually pulls the highest-priority job off the queue and executes it with the task executor before moving on to the next task.
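
A simplified sketch of this worker pattern follows; the types are hypothetical, and the real implementation builds on the PeerTaskQueue with per-peer balancing and priorities, which is omitted here.

```go
package sketch

// Task identifies one unit of work pulled off the queue (illustrative).
type Task struct {
	Peer      string
	RequestID int
}

// TaskExecutor executes a single task, e.g. one selector traversal.
type TaskExecutor interface {
	ExecuteTask(t Task)
}

type taskQueue struct {
	// popNext blocks until the highest-priority task is available;
	// it returns false when the queue shuts down.
	popNext func() (Task, bool)
}

// startWorkers launches a fixed number of workers (go-graphsync defaults to 6),
// so only that many tasks execute at any given time.
func (tq *taskQueue) startWorkers(n int, exec TaskExecutor) {
	for i := 0; i < n; i++ {
		go func() {
			for {
				task, ok := tq.popNext()
				if !ok {
					return
				}
				exec.ExecuteTask(task)
			}
		}()
	}
}
```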

### Actor Pattern In RequestManager And ResponseManager

To manage concurrency in a predictable way, the RequestManager and the ResponseManager are informally implemented using the [Actor model](https://en.wikipedia.org/wiki/Actor_model) employed in distributed systems languages like Erlang.
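
A minimal sketch of this actor-style loop, with hypothetical message and state types (the real managers have many message types and richer state):

```go
package sketch

// message is anything the manager can be asked to do; handle runs on the manager's own goroutine.
type message interface {
	handle(m *manager)
}

// manager owns its state; only the run goroutine ever touches it, so no locks are needed.
type manager struct {
	messages   chan message
	inProgress map[int]bool
}

func (m *manager) run(done <-chan struct{}) {
	for {
		select {
		case msg := <-m.messages:
			msg.handle(m) // all state mutation happens here, on one goroutine
		case <-done:
			return
		}
	}
}

// Callers send messages instead of mutating state directly.
type newRequestMessage struct{ requestID int }

func (nr *newRequestMessage) handle(m *manager) {
	m.inProgress[nr.requestID] = true
}
```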
4 changes: 2 additions & 2 deletions go.mod
@@ -26,9 +26,9 @@ require (
github.com/ipfs/go-merkledag v0.5.1
github.com/ipfs/go-peertaskqueue v0.7.1
github.com/ipfs/go-unixfs v0.3.1
github.com/ipfs/go-unixfsnode v1.2.0
github.com/ipfs/go-unixfsnode v1.2.1-0.20220307162924-9214f706a1fb
github.com/ipld/go-codec-dagpb v1.3.0
github.com/ipld/go-ipld-prime v0.14.5-0.20220204050122-679d74376a0d
github.com/ipld/go-ipld-prime v0.14.5-0.20220307141356-1da040f1c1a5
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-libp2p v0.16.0
12 changes: 8 additions & 4 deletions go.sum
@@ -185,8 +185,8 @@ github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVB
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og=
github.com/frankban/quicktest v1.14.1 h1:7j+0Tuzrdj5XLLmXnI8qipQ31hf5nlUW3LPgT+O9aT8=
github.com/frankban/quicktest v1.14.1/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps=
github.com/frankban/quicktest v1.14.2 h1:SPb1KFFmM+ybpEjPUhCCkZOM5xlovT5UbrMvWnXyBns=
github.com/frankban/quicktest v1.14.2/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
@@ -446,6 +446,10 @@ github.com/ipfs/go-unixfs v0.3.1 h1:LrfED0OGfG98ZEegO4/xiprx2O+yS+krCMQSp7zLVv8=
github.com/ipfs/go-unixfs v0.3.1/go.mod h1:h4qfQYzghiIc8ZNFKiLMFWOTzrWIAtzYQ59W/pCFf1o=
github.com/ipfs/go-unixfsnode v1.2.0 h1:tHHBJftsJyHGa8bS62PpkYNqHy/Sug3c/vxxC8NaGQY=
github.com/ipfs/go-unixfsnode v1.2.0/go.mod h1:mQEgLjxkV/1mohkC4p7taRRBYPBeXu97SA3YaerT2q0=
github.com/ipfs/go-unixfsnode v1.2.1-0.20220303111128-94986a76305a h1:ijRTD8spPp/tdcwojaATPNKhlgM5RH38hRUCFcK5gxs=
github.com/ipfs/go-unixfsnode v1.2.1-0.20220303111128-94986a76305a/go.mod h1:mQEgLjxkV/1mohkC4p7taRRBYPBeXu97SA3YaerT2q0=
github.com/ipfs/go-unixfsnode v1.2.1-0.20220307162924-9214f706a1fb h1:kgxJk26D9/Wj6b3cRA8Jt4VnRw2vx5l2wCcjA8eN4eU=
github.com/ipfs/go-unixfsnode v1.2.1-0.20220307162924-9214f706a1fb/go.mod h1:mQEgLjxkV/1mohkC4p7taRRBYPBeXu97SA3YaerT2q0=
github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E=
github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0=
github.com/ipld/go-car/v2 v2.1.1 h1:saaKz4nC0AdfCGHLYKeXLGn8ivoPC54fyS55uyOLKwA=
@@ -456,8 +460,8 @@ github.com/ipld/go-ipld-prime v0.9.1-0.20210324083106-dc342a9917db/go.mod h1:KvB
github.com/ipld/go-ipld-prime v0.11.0/go.mod h1:+WIAkokurHmZ/KwzDOMUuoeJgaRQktHtEaLglS3ZeV8=
github.com/ipld/go-ipld-prime v0.14.0/go.mod h1:9ASQLwUFLptCov6lIYc70GRB4V7UTyLD0IJtrDJe6ZM=
github.com/ipld/go-ipld-prime v0.14.4/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0=
github.com/ipld/go-ipld-prime v0.14.5-0.20220204050122-679d74376a0d h1:HMvFmQbipEXniV3cRdqnkrsvAlKYMjEPbvvKN3mWsDE=
github.com/ipld/go-ipld-prime v0.14.5-0.20220204050122-679d74376a0d/go.mod h1:f5ls+uUY8Slf1NN6YUOeEyYe3TA/J02Rn7zw1NQTeSk=
github.com/ipld/go-ipld-prime v0.14.5-0.20220307141356-1da040f1c1a5 h1:EGZsASPSyOq+OwyY6box6dVEMvKLut+MxdyENGKNvjo=
github.com/ipld/go-ipld-prime v0.14.5-0.20220307141356-1da040f1c1a5/go.mod h1:axSCuOCBPqrH+gvXr2w9uAOulJqBPhHPT2PjoiiU1qA=
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73/go.mod h1:2PJ0JgxyB08t0b2WKrcuqI3di0V+5n6RS/LTUJhkoxY=
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
Binary file added impl/__debug_bin