feat: bodies stage #190
Conversation
Force-pushed from 6817d5b to 83c5c0a
Codecov Report
@@ Coverage Diff @@
## main #190 +/- ##
==========================================
+ Coverage 69.80% 70.10% +0.29%
==========================================
Files 200 209 +9
Lines 16538 17375 +837
==========================================
+ Hits 11544 12180 +636
- Misses 4994 5195 +201
for num in 0..body.tx_amount {
    let tx_id = body.base_tx_id + num;
    if transaction_cursor.seek_exact(tx_id)?.is_some() {
        transaction_cursor.delete_current()?;
    }
}
you can use unwind_table_by_num here
I can't, since tx is already immutably borrowed by block_body_cursor
fn stream_bodies<'a, 'b, I>(&'a self, headers: I) -> BodiesStream<'a>
where
    I: IntoIterator<Item = H256>,
    <I as IntoIterator>::IntoIter: Send + 'b,
    'b: 'a,
{
The lifetime won't be necessary if you clone and move the client. Whether that's useful depends on how this is called, so perhaps borrowing is sufficient here.
I tried cloning and moving the client but it didn't work :S
Edit: See the error below, maybe I am missing something
error: captured variable cannot escape `FnMut` closure body
--> crates/net/bodies-downloaders/src/concurrent.rs:48:21
|
44 | let client = Arc::clone(&self.client);
| ------ variable defined here
45 | Box::pin(
46 | stream::iter(headers.into_iter().map(move |header_hash| {
| - inferred to be a `FnMut` closure
47 | {
48 | client
| ^-----
| |
| _____________________variable captured here
| |
49 | | .get_block_body(header_hash)
50 | | .map_ok(move |body| (header_hash, body))
51 | | .map_err(|err| match err {
... |
55 | | err => DownloadError::Client { source: err },
56 | | })
| |__________________________^ returns a reference to a captured variable which escapes the closure body
|
= note: `FnMut` closures only have access to their captured variables while they are executing...
= note: ...therefore, they cannot allow references to captured variables to escape
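For reference, the usual way out of this class of error is to clone the Arc *inside* the closure, so that each produced work item owns its own handle and nothing borrowed from the FnMut closure escapes it. A minimal synchronous sketch with placeholder types (u64 hashes and a toy Client standing in for the real BodiesClient; not the actual reth code):

```rust
use std::sync::Arc;

// Placeholder client standing in for the real BodiesClient.
struct Client;

impl Client {
    fn get_block_body(&self, hash: u64) -> (u64, String) {
        (hash, format!("body-{}", hash))
    }
}

/// Builds deferred "requests" from an FnMut closure. Cloning the Arc
/// inside the closure lets each work item own its handle, analogous to
/// cloning the client and then moving the clone into an `async move`
/// block, which avoids returning a borrow of the closure's capture.
fn build_requests(
    client: Arc<Client>,
    hashes: Vec<u64>,
) -> Vec<impl FnOnce() -> (u64, String)> {
    hashes
        .into_iter()
        .map(move |hash| {
            // Per-item clone: the returned work item owns this handle.
            let client = Arc::clone(&client);
            move || client.get_block_body(hash)
        })
        .collect()
}

fn main() {
    let requests = build_requests(Arc::new(Client), vec![1, 2, 3]);
    let bodies: Vec<_> = requests.into_iter().map(|req| req()).collect();
    assert_eq!(bodies[0], (1, "body-1".to_string()));
    println!("{:?}", bodies);
}
```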
        })
    }
}))
.buffered(self.batch_size),
this is not the batch size for GetBodies; instead, this is the number of concurrently active requests. There should be different settings for both: batch_size is the size of the header chunks created via Vec::chunks(batch_size), and for buffered we need a separate value that limits the number of requests we send.
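A small sketch of the distinction being suggested (names like batch_size and concurrency are assumptions for illustration, not actual reth config): batch_size shapes each GetBodies request via chunking, while a separate concurrency value would be the argument passed to buffered to cap in-flight requests.

```rust
/// Sketch: split header hashes into per-request batches of `batch_size`.
/// A separate `concurrency` value (not acting here) would be passed to
/// `.buffered(concurrency)` to limit the number of in-flight requests.
fn into_batches(hashes: &[u64], batch_size: usize) -> Vec<Vec<u64>> {
    hashes.chunks(batch_size).map(|chunk| chunk.to_vec()).collect()
}

fn main() {
    let hashes: Vec<u64> = (0..10).collect();
    let batches = into_batches(&hashes, 4);
    // 10 hashes with batch_size 4 -> batches of 4, 4, and 2.
    assert_eq!(batches.len(), 3);
    assert_eq!(batches[2], vec![8, 9]);
    println!("{:?}", batches);
}
```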
It's unclear to me what the actual difference is. My assumption is that if we request multiple bodies in one GetBodies call they are downloaded concurrently, but there is no order guarantee. The reason I only request one per call is that the order needs to match the request order, otherwise we may skip bodies in the database.
there's no guarantee, only blocks that were found are returned. So if the BodiesStream needs to return them in order, it probably needs to keep track of the next header to return and buffer the rest
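The buffering suggested here can be sketched with a tiny reorder buffer (a hypothetical type for illustration, not the reth implementation): emit items strictly in index order, holding back any that arrive early until the next expected one shows up.

```rust
use std::collections::BTreeMap;

/// Minimal sketch of in-order emission: track the next expected index
/// and buffer anything that arrives ahead of it.
struct ReorderBuffer<T> {
    next: u64,
    pending: BTreeMap<u64, T>,
}

impl<T> ReorderBuffer<T> {
    fn new() -> Self {
        Self { next: 0, pending: BTreeMap::new() }
    }

    /// Record an arrival; returns every item that is now ready, in order.
    fn push(&mut self, index: u64, item: T) -> Vec<T> {
        self.pending.insert(index, item);
        let mut ready = Vec::new();
        while let Some(item) = self.pending.remove(&self.next) {
            ready.push(item);
            self.next += 1;
        }
        ready
    }
}

fn main() {
    let mut buf = ReorderBuffer::new();
    // Block 2 finishes first: nothing can be emitted yet.
    assert!(buf.push(2, "body-2").is_empty());
    // Block 0 arrives: only block 0 is ready.
    assert_eq!(buf.push(0, "body-0"), vec!["body-0"]);
    // Block 1 arrives: it unblocks the buffered block 2 as well.
    assert_eq!(buf.push(1, "body-1"), vec!["body-1", "body-2"]);
    println!("ok");
}
```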
That's what this is doing, just without requesting multiple bodies per call. Is there any tangible benefit to requesting multiple per call?
this we should test, then we can tune
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait BodiesClient: Send + Sync + Debug {
    /// Fetches the block body for the requested block.
    async fn get_block_body(&self, hash: H256) -> Result<BlockBody, BodiesClientError>;
While it is possible to send a request for a single block, the GetBlockBodies request takes a Vec<H256> to fetch multiple bodies with one request.
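A hedged sketch of what a batched variant of the trait could look like, using placeholder types (u64 hashes and String bodies instead of H256/BlockBody, and a toy in-memory client) purely for illustration:

```rust
use std::collections::HashMap;

/// Hypothetical batched variant of the client trait, mirroring how the
/// GetBlockBodies message carries a `Vec<H256>`.
trait BatchedBodiesClient {
    /// Fetch multiple bodies with one request.
    fn get_block_bodies(&self, hashes: Vec<u64>) -> Vec<(u64, String)>;
}

/// Toy in-memory client for illustration only.
struct MockClient {
    store: HashMap<u64, String>,
}

impl BatchedBodiesClient for MockClient {
    fn get_block_bodies(&self, hashes: Vec<u64>) -> Vec<(u64, String)> {
        // Like the real protocol, only bodies that were found are returned.
        hashes
            .into_iter()
            .filter_map(|h| self.store.get(&h).map(|body| (h, body.clone())))
            .collect()
    }
}

fn main() {
    let client = MockClient {
        store: HashMap::from([(1, "a".to_string()), (3, "c".to_string())]),
    };
    let bodies = client.get_block_bodies(vec![1, 2, 3]);
    // Hash 2 is unknown, so only two bodies come back.
    assert_eq!(bodies, vec![(1, "a".to_string()), (3, "c".to_string())]);
    println!("ok");
}
```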
Is there an order guarantee? If not, see my above comment
///
/// It is *not* guaranteed that all the requested bodies are fetched: the downloader may close
/// the stream before the entire range has been fetched, for any reason.
fn stream_bodies<'a, 'b, I>(&'a self, headers: I) -> BodiesStream<'a>
This is okay for now, but I think we need to roll a custom impl of a buffered Stream that handles retries
Any particular reason to roll our own? Adding retries can be done in this loop without issue, I think; we just need a generalized retry abstraction for futures
https://docs.rs/futures-retry/latest/futures_retry/ — have used this successfully in the past
- https://github.com/interledger-rs/interledger-rs/blob/e15ae42d5ce80e45ace8c7bebd5875372b89dcfb/crates/interledger-settlement/src/core/settlement_client.rs#L37-L40
- https://github.com/interledger-rs/interledger-rs/blob/e15ae42d5ce80e45ace8c7bebd5875372b89dcfb/crates/interledger-settlement/src/core/settlement_client.rs#L157-L196
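The core of a generalized retry abstraction is quite small. A hedged synchronous sketch (the async version over futures, e.g. via futures-retry, follows the same shape; the names here are made up for illustration):

```rust
/// Sketch of a generalized retry helper: run a fallible operation at
/// least once and up to `max_attempts` times, returning the first
/// success or the last error.
fn retry<T, E>(max_attempts: u32, mut op: impl FnMut() -> Result<T, E>) -> Result<T, E> {
    let mut result = op();
    for _ in 1..max_attempts {
        if result.is_ok() {
            break;
        }
        result = op();
    }
    result
}

fn main() {
    // Fails twice, then succeeds on the third attempt.
    let mut calls = 0;
    let result = retry(5, || {
        calls += 1;
        if calls < 3 { Err("flaky") } else { Ok(calls) }
    });
    assert_eq!(result, Ok(3));
    assert_eq!(calls, 3);
    println!("ok");
}
```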
Force-pushed from ac0b98c to ebcb176
Force-pushed from 143cc63 to 32728a9
Pulling it out of draft so we can get some of the other tweaks I have stacked in this PR merged. Will continue to iterate on it (adding tests for the downloader, addressing some of the todos I left in the code, etc.) if it gets merged
lgtm
Merging and building on top
Almost complete bodies stage, a few hanging todos + tests left. Also includes some minor touchups to documentation when I stumbled upon it.
The stage uses a Downloader like the headers stage, but this downloader is implemented using a stream. An iterator of block hashes for the blocks we want to download is handed over to the downloader, and the downloader will fetch the blocks. The blocks are fetched somewhat concurrently but emitted by the stream in order: if we are fetching 10 blocks (1-10) and block 3 finishes downloading first, then block 2, and then block 1, the order of values emitted by the stream will still be 1, 2, 3. This simplifies some of the stage logic.

There are a few types we should probably consider consolidating wrt block bodies; I've marked them with todos for now.
Edit: I've also moved the proofs module from the consensus crate to the primitives crate, since the proofs are used by multiple modules. Some of the verification functions in the consensus crate that only operate on blocks/transactions/headers should probably also just live directly on those types in the primitives module, but I'll leave that for later.