Skip to content

Commit

Permalink
test: more body stage tests
Browse files Browse the repository at this point in the history
  • Loading branch information
onbjerg committed Nov 14, 2022
1 parent 74fd799 commit 32728a9
Show file tree
Hide file tree
Showing 10 changed files with 471 additions and 85 deletions.
6 changes: 3 additions & 3 deletions crates/interfaces/src/p2p/bodies/downloader.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::client::BodiesClient;
use crate::p2p::bodies::error::DownloadError;
use reth_eth_wire::BlockBody;
use reth_primitives::H256;
use reth_primitives::{BlockNumber, H256};
use std::{pin::Pin, time::Duration};
use tokio_stream::Stream;

Expand Down Expand Up @@ -34,11 +34,11 @@ pub trait BodyDownloader: Sync + Send {
/// the stream before the entire range has been fetched for any reason
fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> BodiesStream<'a>
where
I: IntoIterator<Item = H256>,
I: IntoIterator<Item = &'b (BlockNumber, H256)>,
<I as IntoIterator>::IntoIter: Send + 'b,
'b: 'a;
}

/// A stream of block bodies.
pub type BodiesStream<'a> =
Pin<Box<dyn Stream<Item = Result<(H256, BlockBody), DownloadError>> + Send + 'a>>;
Pin<Box<dyn Stream<Item = Result<(BlockNumber, H256, BlockBody), DownloadError>> + Send + 'a>>;
7 changes: 4 additions & 3 deletions crates/interfaces/src/p2p/bodies/error.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::p2p::error::RequestError;
use reth_primitives::H256;
use thiserror::Error;

/// Body client errors.
#[derive(Error, Debug)]
#[derive(Error, Debug, Clone)]
pub enum BodiesClientError {
/// Timed out while waiting for a response.
#[error("Timed out while getting bodies for block {header_hash}.")]
Expand All @@ -12,11 +13,11 @@ pub enum BodiesClientError {
},
/// The client encountered an internal error.
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync>),
Internal(#[from] RequestError),
}

/// Body downloader errors.
#[derive(Error, Debug)]
#[derive(Error, Debug, Clone)]
pub enum DownloadError {
/// Timed out while waiting for a response.
#[error("Timed out while getting bodies for block {header_hash}.")]
Expand Down
2 changes: 1 addition & 1 deletion crates/interfaces/src/p2p/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tokio::sync::{mpsc, oneshot};
pub type RequestResult<T> = Result<T, RequestError>;

/// Error variants that can happen when sending requests to a session.
#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, Clone)]
#[allow(missing_docs)]
pub enum RequestError {
#[error("Closed channel to the peer.")]
Expand Down
2 changes: 1 addition & 1 deletion crates/interfaces/src/test_utils/generators.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use rand::{thread_rng, Rng};
use reth_primitives::{
proofs, Address, BlockLocked, Bytes, ChainId, Header, SealedHeader, Signature, Transaction,
proofs, Address, BlockLocked, Bytes, Header, SealedHeader, Signature, Transaction,
TransactionKind, TransactionSigned, H256, U256,
};

Expand Down
10 changes: 5 additions & 5 deletions crates/net/bodies-downloaders/src/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use reth_interfaces::p2p::bodies::{
downloader::{BodiesStream, BodyDownloader},
error::{BodiesClientError, DownloadError},
};
use reth_primitives::H256;
use reth_primitives::{BlockNumber, H256};
use std::{sync::Arc, time::Duration};

/// Downloads bodies in batches.
Expand Down Expand Up @@ -37,17 +37,17 @@ impl<C: BodiesClient> BodyDownloader for ConcurrentDownloader<C> {

fn bodies_stream<'a, 'b, I>(&'a self, headers: I) -> BodiesStream<'a>
where
I: IntoIterator<Item = H256>,
I: IntoIterator<Item = &'b (BlockNumber, H256)>,
<I as IntoIterator>::IntoIter: Send + 'b,
'b: 'a,
{
// TODO: Retry
Box::pin(
stream::iter(headers.into_iter().map(|header_hash| {
stream::iter(headers.into_iter().map(|(block_number, header_hash)| {
{
self.client
.get_block_body(header_hash)
.map_ok(move |body| (header_hash, body))
.get_block_body(*header_hash)
.map_ok(move |body| (*block_number, *header_hash, body))
.map_err(|err| match err {
BodiesClientError::Timeout { header_hash } => {
DownloadError::Timeout { header_hash }
Expand Down
30 changes: 28 additions & 2 deletions crates/primitives/src/header.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::{BlockHash, BlockNumber, Bloom, H160, H256, U256};
use crate::{
proofs::{EMPTY_LIST_HASH, EMPTY_ROOT},
BlockHash, BlockNumber, Bloom, H160, H256, U256,
};
use bytes::{BufMut, BytesMut};
use ethers_core::{types::H64, utils::keccak256};
use reth_codecs::main_codec;
Expand All @@ -7,7 +10,7 @@ use std::ops::Deref;

/// Block header
#[main_codec]
#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Header {
/// The Keccak 256-bit hash of the parent
/// block’s header, in its entirety; formally Hp.
Expand Down Expand Up @@ -64,6 +67,29 @@ pub struct Header {
pub base_fee_per_gas: Option<u64>,
}

impl Default for Header {
fn default() -> Self {
Header {
parent_hash: Default::default(),
ommers_hash: EMPTY_LIST_HASH,
beneficiary: Default::default(),
state_root: EMPTY_ROOT,
transactions_root: EMPTY_ROOT,
receipts_root: EMPTY_ROOT,
logs_bloom: Default::default(),
difficulty: Default::default(),
number: 0,
gas_limit: 0,
gas_used: 0,
timestamp: 0,
extra_data: Default::default(),
mix_hash: Default::default(),
nonce: 0,
base_fee_per_gas: None,
}
}
}

impl Header {
/// Heavy function that will calculate hash of data and will *not* save the change to metadata.
/// Use [`Header::seal`], [`SealedHeader`] and unlock if you need hash to be persistent.
Expand Down
9 changes: 9 additions & 0 deletions crates/primitives/src/proofs.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
use crate::{keccak256, Bytes, Header, Log, Receipt, TransactionSigned, H256};
use ethers_core::utils::rlp::RlpStream;
use hash_db::Hasher;
use hex_literal::hex;
use plain_hasher::PlainHasher;
use reth_rlp::Encodable;
use triehash::sec_trie_root;

/// Keccak-256 hash of the RLP of an empty list, KEC("\xc0").
pub const EMPTY_LIST_HASH: H256 =
H256(hex!("1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347"));

/// Root hash of an empty trie.
pub const EMPTY_ROOT: H256 =
H256(hex!("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"));

/// A [Hasher] that calculates a keccak256 hash of the given data.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
struct KeccakHasher;
Expand Down
6 changes: 6 additions & 0 deletions crates/stages/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ pub enum DatabaseIntegrityError {
/// The block hash key
hash: H256,
},
/// A block body is missing.
#[error("Block body not found for block #{number}")]
BlockBody {
/// The block number key
number: BlockNumber,
},
}

/// A pipeline execution error.
Expand Down

0 comments on commit 32728a9

Please sign in to comment.