Skip to content

Commit

Permalink
test: more body stage tests
Browse files Browse the repository at this point in the history
  • Loading branch information
onbjerg committed Nov 14, 2022
1 parent 0e0e9cd commit e38e193
Showing 1 changed file with 33 additions and 8 deletions.
41 changes: 33 additions & 8 deletions crates/stages/src/stages/bodies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,20 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let tx = db.get_mut();
let starting_block = input.stage_progress.unwrap_or_default() + 1;

let previous_stage_progress =
input.previous_stage.as_ref().map(|(_, block)| *block).unwrap_or_default();
if previous_stage_progress == 0 {
warn!("The body stage seems to be running first, no work can be completed.");
}

let target = previous_stage_progress.min(starting_block + self.batch_size);
// The block we ended at last sync, and the one we are starting on now
let previous_block = input.stage_progress.unwrap_or_default();
let starting_block = previous_block + 1;

// Short circuit in case we already reached the target block
if target <= starting_block {
let target = previous_stage_progress.min(starting_block + self.batch_size);
if target <= previous_block {
return Ok(ExecOutput { stage_progress: target, reached_tip: true, done: true })
}

Expand All @@ -103,11 +106,10 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
// Cursor used to look up headers for block pre-validation
let mut header_cursor = tx.cursor::<tables::Headers>()?;

let mut highest_block = starting_block;

// NOTE(onbjerg): The stream needs to live here otherwise it will just create a new iterator
// on every iteration of the while loop -_-
let mut bodies_stream = self.downloader.bodies_stream(bodies_to_download.iter());
let mut highest_block = previous_block;
while let Some((block_number, header_hash, body)) =
bodies_stream.try_next().await.map_err(|err| StageError::Internal(err.into()))?
{
Expand Down Expand Up @@ -234,7 +236,9 @@ mod tests {
use assert_matches::assert_matches;
use reth_eth_wire::BlockBody;
use reth_interfaces::{
consensus, p2p::bodies::error::DownloadError, test_utils::generators::random_block_range,
consensus,
p2p::bodies::error::DownloadError,
test_utils::generators::{random_block, random_block_range},
};
use reth_primitives::{BlockNumber, H256};
use std::collections::HashMap;
Expand Down Expand Up @@ -554,8 +558,29 @@ mod tests {
/// TODO: We should probably just exit as "OK", commit the blocks we downloaded successfully and
/// try again?
#[tokio::test]
#[ignore]
async fn downloader_timeout() {}
async fn downloader_timeout() {
// Generate a header
let header = random_block(1, Some(GENESIS_HASH)).header;
let runner = BodyTestRunner::new(|| {
TestBodyDownloader::new(HashMap::from([(
header.hash(),
Err(DownloadError::Timeout { header_hash: header.hash() }),
)]))
});

// Insert required state
runner.insert_genesis().expect("Could not insert genesis block");
runner.insert_header(&header).expect("Could not insert header");

// Run the stage
let rx = runner.execute(ExecInput {
previous_stage: Some((StageId("Headers"), 1)),
stage_progress: None,
});

// Check that the error bubbles up
assert_matches!(rx.await.unwrap(), Err(StageError::Internal(_)));
}

mod test_utils {
use crate::{
Expand Down

0 comments on commit e38e193

Please sign in to comment.