Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ members = [
"crates/net/rpc",
"crates/net/rpc-api",
"crates/net/rpc-types",
"crates/net/headers-downloaders",
"crates/primitives",
"crates/stages",
"crates/transaction-pool",
Expand Down
5 changes: 5 additions & 0 deletions crates/interfaces/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@ serde = { version = "1.0.*", default-features = false }
postcard = { version = "1.0.2", features = ["alloc"] }
heapless = "0.7.16"
parity-scale-codec = { version = "3.2.1", features = ["bytes"] }
futures = "0.3.25"
tokio-stream = "0.1.11"
rand = "0.8.5"

[dev-dependencies]
test-fuzz = "3.0.4"
tokio = { version = "1.21.2", features = ["full"] }
tokio-stream = { version = "0.1.11", features = ["sync"] }

[features]
bench = []
test-utils = ["tokio-stream/sync"]
7 changes: 3 additions & 4 deletions crates/interfaces/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ use tokio::sync::watch::Receiver;
/// Consensus is a protocol that chooses canonical chain.
/// We are checking validity of block header here.
#[async_trait]
pub trait Consensus {
#[auto_impl::auto_impl(&, Arc)]
pub trait Consensus: Send + Sync {
/// Get a receiver for the fork choice state
fn fork_choice_state(&self) -> Receiver<ForkchoiceState>;

/// Validate if header is correct and follows consensus specification
fn validate_header(&self, _header: &Header) -> Result<(), Error> {
Ok(())
}
fn validate_header(&self, header: &Header, parent: &Header) -> Result<(), Error>;
}

/// Consensus errors (TODO)
Expand Down
7 changes: 7 additions & 0 deletions crates/interfaces/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,14 @@ pub mod db;
/// Traits that provide chain access.
pub mod provider;

/// P2P traits.
pub mod p2p;

/// Possible errors when interacting with the chain.
mod error;

pub use error::{Error, Result};

#[cfg(any(test, feature = "test-utils"))]
/// Common test helpers for mocking out Consensus, Downloaders and Header Clients.
pub mod test_utils;
57 changes: 57 additions & 0 deletions crates/interfaces/src/p2p/headers/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use crate::p2p::MessageStream;

use reth_primitives::{rpc::BlockId, Header, H256, H512};

use async_trait::async_trait;
use std::{collections::HashSet, fmt::Debug};

/// Each peer returns a list of headers and the request id corresponding
/// to these headers. This allows clients to make multiple requests in parallel
/// and multiplex the responses accordingly.
pub type HeadersStream = MessageStream<HeadersResponse>;

/// The item contained in each [`MessageStream`] when used to fetch [`Header`]s via
/// [`HeadersClient`].
#[derive(Clone, Debug)]
pub struct HeadersResponse {
/// The request id associated with this response.
pub id: u64,
/// The headers the peer replied with.
pub headers: Vec<Header>,
}

impl From<(u64, Vec<Header>)> for HeadersResponse {
fn from((id, headers): (u64, Vec<Header>)) -> Self {
HeadersResponse { id, headers }
}
}

/// The header request struct to be sent to connected peers, which
/// will proceed to ask them to stream the requested headers to us.
#[derive(Clone, Debug)]
pub struct HeadersRequest {
/// The starting block
pub start: BlockId,
/// The response max size
pub limit: u64,
/// Flag indicating whether the blocks should
/// arrive in reverse
pub reverse: bool,
}

/// The block headers downloader client
#[async_trait]
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait HeadersClient: Send + Sync + Debug {
/// Update the node's Status message.
///
/// The updated Status message will be used during any new eth/65 handshakes.
async fn update_status(&self, height: u64, hash: H256, td: H256);

/// Sends the header request to the p2p network.
// TODO: What does this return?
async fn send_header_request(&self, id: u64, request: HeadersRequest) -> HashSet<H512>;

/// Stream the header response messages
async fn stream_headers(&self) -> HeadersStream;
}
131 changes: 131 additions & 0 deletions crates/interfaces/src/p2p/headers/downloader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use super::client::{HeadersClient, HeadersRequest, HeadersStream};
use crate::consensus::Consensus;

use async_trait::async_trait;
use reth_primitives::{
rpc::{BlockId, BlockNumber},
Header, HeaderLocked, H256,
};
use reth_rpc_types::engine::ForkchoiceState;
use std::{fmt::Debug, time::Duration};
use thiserror::Error;
use tokio_stream::StreamExt;

/// The downloader error type
#[derive(Error, Debug, Clone)]
pub enum DownloadError {
/// Header validation failed
#[error("Failed to validate header {hash}. Details: {details}.")]
HeaderValidation {
/// Hash of header failing validation
hash: H256,
/// The details of validation failure
details: String,
},
/// No headers reponse received
#[error("Failed to get headers for request {request_id}.")]
NoHeaderResponse {
/// The last request ID
request_id: u64,
},
/// Timed out while waiting for request id response.
#[error("Timed out while getting headers for request {request_id}.")]
Timeout {
/// The request id that timed out
request_id: u64,
},
/// Error when checking that the current [`Header`] has the parent's hash as the parent_hash
/// field, and that they have sequential block numbers.
#[error("Headers did not match, current number: {header_number} / current hash: {header_hash}, parent number: {parent_number} / parent_hash: {parent_hash}")]
MismatchedHeaders {
/// The header number being evaluated
header_number: BlockNumber,
/// The header hash being evaluated
header_hash: H256,
/// The parent number being evaluated
parent_number: BlockNumber,
/// The parent hash being evaluated
parent_hash: H256,
},
}

impl DownloadError {
/// Returns bool indicating whether this error is retryable or fatal, in the cases
/// where the peer responds with no headers, or times out.
pub fn is_retryable(&self) -> bool {
matches!(self, DownloadError::NoHeaderResponse { .. } | DownloadError::Timeout { .. })
}
}

/// The header downloading strategy
#[async_trait]
pub trait Downloader: Sync + Send {
/// The Consensus used to verify block validity when
/// downloading
type Consensus: Consensus;

/// The Client used to download the headers
type Client: HeadersClient;

/// The request timeout duration
fn timeout(&self) -> Duration;

/// The consensus engine
fn consensus(&self) -> &Self::Consensus;

/// The headers client
fn client(&self) -> &Self::Client;

/// Download the headers
async fn download(
&self,
head: &HeaderLocked,
forkchoice: &ForkchoiceState,
) -> Result<Vec<HeaderLocked>, DownloadError>;

/// Perform a header request and returns the headers.
// TODO: Isn't this effectively blocking per request per downloader?
// Might be fine, given we can spawn multiple downloaders?
// TODO: Rethink this function, I don't really like the `stream: &mut HeadersStream`
// in the signature. Why can we not call `self.client.stream_headers()`? Gives lifetime error.
async fn download_headers(
&self,
stream: &mut HeadersStream,
start: BlockId,
limit: u64,
) -> Result<Vec<Header>, DownloadError> {
let request_id = rand::random();
let request = HeadersRequest { start, limit, reverse: true };
let _ = self.client().send_header_request(request_id, request).await;

// Filter stream by request id and non empty headers content
let stream = stream
.filter(|resp| request_id == resp.id && !resp.headers.is_empty())
.timeout(self.timeout());

// Pop the first item.
match Box::pin(stream).try_next().await {
Ok(Some(item)) => Ok(item.headers),
_ => return Err(DownloadError::NoHeaderResponse { request_id }),
}
}

/// Validate whether the header is valid in relation to it's parent
///
/// Returns Ok(false) if the
fn validate(&self, header: &HeaderLocked, parent: &HeaderLocked) -> Result<(), DownloadError> {
if !(parent.hash() == header.parent_hash && parent.number + 1 == header.number) {
return Err(DownloadError::MismatchedHeaders {
header_number: header.number.into(),
parent_number: parent.number.into(),
header_hash: header.hash(),
parent_hash: parent.hash(),
})
}

self.consensus().validate_header(header, parent).map_err(|e| {
DownloadError::HeaderValidation { hash: parent.hash(), details: e.to_string() }
})?;
Ok(())
}
}
11 changes: 11 additions & 0 deletions crates/interfaces/src/p2p/headers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/// Trait definition for [`HeadersClient`]
///
/// [`HeadersClient`]: client::HeadersClient
pub mod client;

/// A downloader that receives and verifies block headers, is generic
/// over the Consensus and the HeadersClient being used.
///
/// [`Consensus`]: crate::consensus::Consensus
/// [`HeadersClient`]: client::HeadersClient
pub mod downloader;
13 changes: 13 additions & 0 deletions crates/interfaces/src/p2p/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/// Traits for implementing P2P Header Clients. Also includes implementations
/// of a Linear and a Parallel downloader generic over the [`Consensus`] and
/// [`HeadersClient`].
///
/// [`Consensus`]: crate::consensus::Consensus
/// [`HeadersClient`]: crate::p2p::headers::HeadersClient
pub mod headers;

use futures::Stream;
use std::pin::Pin;

/// The stream of responses from the connected peers, generic over the response type.
pub type MessageStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
Loading