diff --git a/Cargo.lock b/Cargo.lock index ea9be80aa02..ee64320b2cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -317,11 +317,13 @@ dependencies = [ "alloy-json-rpc", "alloy-network", "alloy-primitives", + "alloy-pubsub", "alloy-rpc-client", "alloy-rpc-types 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=cc68b93)", "alloy-rpc-types-trace", "alloy-transport", "alloy-transport-http", + "alloy-transport-ws", "async-stream", "async-trait", "auto_impl", @@ -338,6 +340,24 @@ dependencies = [ "url", ] +[[package]] +name = "alloy-pubsub" +version = "0.1.0" +source = "git+https://github.com/alloy-rs/alloy?rev=cc68b93#cc68b93605f4521c2b0bce1a7efaeff2046cf07c" +dependencies = [ + "alloy-json-rpc", + "alloy-primitives", + "alloy-transport", + "bimap", + "futures", + "serde", + "serde_json", + "tokio", + "tokio-stream", + "tower", + "tracing", +] + [[package]] name = "alloy-rlp" version = "0.3.5" @@ -366,8 +386,11 @@ version = "0.1.0" source = "git+https://github.com/alloy-rs/alloy?rev=cc68b93#cc68b93605f4521c2b0bce1a7efaeff2046cf07c" dependencies = [ "alloy-json-rpc", + "alloy-primitives", + "alloy-pubsub", "alloy-transport", "alloy-transport-http", + "alloy-transport-ws", "futures", "pin-project", "reqwest 0.12.4", @@ -627,6 +650,22 @@ dependencies = [ "url", ] +[[package]] +name = "alloy-transport-ws" +version = "0.1.0" +source = "git+https://github.com/alloy-rs/alloy?rev=cc68b93#cc68b93605f4521c2b0bce1a7efaeff2046cf07c" +dependencies = [ + "alloy-pubsub", + "alloy-transport", + "futures", + "http 0.2.12", + "serde_json", + "tokio", + "tokio-tungstenite", + "tracing", + "ws_stream_wasm", +] + [[package]] name = "alloy-trie" version = "0.4.1" @@ -986,6 +1025,17 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "async_io_stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c" +dependencies = [ + "futures", + "pharos", + "rustc_version 0.4.0", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -1127,6 +1177,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bimap" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" + [[package]] name = "bincode" version = "1.3.3" @@ -3124,7 +3180,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" dependencies = [ "gloo-timers", - "send_wrapper", + "send_wrapper 0.4.0", ] [[package]] @@ -4147,7 +4203,7 @@ dependencies = [ "tokio-util", "tracing", "url", - "webpki-roots", + "webpki-roots 0.26.2", ] [[package]] @@ -5413,6 +5469,16 @@ dependencies = [ "wyhash", ] +[[package]] +name = "pharos" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414" +dependencies = [ + "futures", + "rustc_version 0.4.0", +] + [[package]] name = "phf" version = "0.11.2" @@ -6139,7 +6205,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "webpki-roots", + "webpki-roots 0.26.2", "winreg 0.52.0", ] @@ -6446,6 +6512,28 @@ dependencies = [ "reth-storage-api", ] +[[package]] +name = "reth-consensus-debug-client" +version = "0.2.0-beta.9" +dependencies = [ + "alloy-consensus 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=cc68b93)", + "alloy-eips 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=cc68b93)", + "alloy-provider", + "auto_impl", + "eyre", + "futures", + "reqwest 0.12.4", + "reth-node-api", + "reth-node-core", + "reth-rpc-api", + "reth-rpc-builder", + "reth-rpc-types", + "reth-tracing", + "ringbuffer", + "serde", + "tokio", +] + [[package]] name = "reth-db" version = "0.2.0-beta.9" @@ -7231,6 +7319,7 @@ dependencies = [ "reth-blockchain-tree", "reth-config", "reth-consensus", + "reth-consensus-debug-client", "reth-db", "reth-db-api", "reth-db-common", @@ -8271,6 +8360,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "ringbuffer" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3df6368f71f205ff9c33c076d170dd56ebf68e8161c733c0caa07a7a5509ed53" + [[package]] name = "ripemd" version = "0.1.3" @@ -8739,6 +8834,12 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f638d531eccd6e23b980caf34876660d38e265409d8e99b397ab71eb3612fad0" +[[package]] +name = "send_wrapper" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" + [[package]] name = "serde" version = "1.0.203" @@ -9643,6 +9744,21 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" +dependencies = [ + "futures-util", + "log", + "rustls 0.21.12", + "tokio", + "tokio-rustls 0.24.1", + "tungstenite", + "webpki-roots 0.25.4", +] + [[package]] name = "tokio-util" version = "0.7.11" @@ -9962,6 +10078,26 @@ dependencies = [ "toml", ] +[[package]] +name = "tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 0.2.12", + "httparse", + "log", + "rand 0.8.5", + "rustls 0.21.12", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "txpool-tracing" version = "0.0.0" @@ -10101,6 +10237,12 @@ dependencies = [ "serde", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf16_iter" version = "1.0.5" @@ -10301,6 +10443,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.25.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" + [[package]] name = "webpki-roots" version = "0.26.2" @@ -10620,6 +10768,25 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" +[[package]] +name = "ws_stream_wasm" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7999f5f4217fe3818726b66257a4475f71e74ffd190776ad053fa159e50737f5" +dependencies = [ + "async_io_stream", + "futures", + "js-sys", + "log", + "pharos", + "rustc_version 0.4.0", + "send_wrapper 0.6.0", + "thiserror", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wyhash" version = "0.5.0" diff --git a/Cargo.toml b/Cargo.toml index e1f799de704..6ebb09c2835 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,8 @@ members = [ "crates/consensus/beacon/", "crates/consensus/common/", "crates/consensus/consensus/", + "crates/consensus/debug-client/", + "crates/ethereum-forks/", "crates/e2e-test-utils/", "crates/engine-primitives/", "crates/errors/", @@ -246,6 +248,7 @@ reth-codecs-derive = { path = "crates/storage/codecs/derive" } reth-config = { path = "crates/config" } reth-consensus = { path = "crates/consensus/consensus" } reth-consensus-common = { path = "crates/consensus/common" } +reth-consensus-debug-client = { path = "crates/consensus/debug-client" } reth-db = { path = "crates/storage/db", default-features = false } reth-db-api = { path = "crates/storage/db-api" } reth-db-common = { path = "crates/storage/db-common" } diff --git a/book/cli/reth/node.md b/book/cli/reth/node.md index 65fcd824d48..cd07d0692f5 100644 --- a/book/cli/reth/node.md +++ b/book/cli/reth/node.md @@ -463,6 +463,12 @@ Debug: --debug.max-block Runs the sync only up to the specified block + --debug.etherscan [] + Runs a fake consensus client that advances the chain using recent block hashes on Etherscan. If specified, requires an `ETHERSCAN_API_KEY` environment variable + + --debug.rpc-consensus-ws + Runs a fake consensus client using blocks fetched from an RPC `WebSocket` endpoint + --debug.skip-fcu If provided, the engine will skip `n` consecutive FCUs diff --git a/crates/consensus/debug-client/Cargo.toml b/crates/consensus/debug-client/Cargo.toml new file mode 100644 index 00000000000..bb31de5bebe --- /dev/null +++ b/crates/consensus/debug-client/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "reth-consensus-debug-client" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[lints] +workspace = true + +[dependencies] +# reth +reth-node-api.workspace = true +reth-node-core.workspace = true +reth-rpc-api.workspace = true +reth-rpc-types.workspace = true +reth-rpc-builder.workspace = true +reth-tracing.workspace = true + +# ethereum +alloy-consensus = { workspace = true, features = ["serde"] } +alloy-eips.workspace = true +alloy-provider = { workspace = true, features = ["ws"] } + +auto_impl.workspace = true +futures.workspace = true +eyre.workspace = true +reqwest = { workspace = true, features = ["rustls-tls", "json"] } +serde = { workspace = true, features = ["derive"] } +tokio = { workspace = true, features = ["time"] } + +ringbuffer = "0.15.0" diff --git a/crates/consensus/debug-client/src/client.rs b/crates/consensus/debug-client/src/client.rs new file mode 100644 index 00000000000..d627b630106 --- /dev/null +++ b/crates/consensus/debug-client/src/client.rs @@ -0,0 +1,224 @@ +use alloy_consensus::TxEnvelope; +use alloy_eips::eip2718::Encodable2718; +use reth_node_api::EngineTypes; +use reth_node_core::{ + primitives::B256, + rpc::types::{BlockTransactions, ExecutionPayloadV2, ExecutionPayloadV3, RichBlock}, +}; +use reth_rpc_builder::auth::AuthServerHandle; +use reth_rpc_types::ExecutionPayloadV1; +use reth_tracing::tracing::warn; +use ringbuffer::{AllocRingBuffer, RingBuffer}; +use std::future::Future; +use tokio::sync::mpsc; + +/// Supplies consensus client with new blocks sent in `tx` and a callback to find specific blocks +/// by number to fetch past finalized and safe blocks. +#[auto_impl::auto_impl(&, Arc, Box)] +pub trait BlockProvider: Send + Sync + 'static { + /// Runs a block provider to send new blocks to the given sender. + /// + /// Note: This is expected to be spawned in a separate task. + fn subscribe_blocks(&self, tx: mpsc::Sender) -> impl Future + Send; + + /// Get a past block by number. + fn get_block(&self, block_number: u64) -> impl Future> + Send; + + /// Get previous block hash using previous block hash buffer. If it isn't available (buffer + /// started more recently than `offset`), fetch it using `get_block`. + fn get_or_fetch_previous_block( + &self, + previous_block_hashes: &AllocRingBuffer, + current_block_number: u64, + offset: usize, + ) -> impl Future> + Send { + async move { + let stored_hash = previous_block_hashes + .len() + .checked_sub(offset) + .and_then(|index| previous_block_hashes.get(index)); + if let Some(hash) = stored_hash { + return Ok(*hash); + } + + // Return zero hash if the chain isn't long enough to have the block at the offset. + let previous_block_number = match current_block_number.checked_sub(offset as u64) { + Some(number) => number, + None => return Ok(B256::default()), + }; + let block = self.get_block(previous_block_number).await?; + block.header.hash.ok_or_else(|| eyre::eyre!("previous block does not have hash")) + } + } +} + +/// Debug consensus client that sends FCUs and new payloads using recent blocks from an external +/// provider like Etherscan or an RPC endpoint. +#[derive(Debug)] +pub struct DebugConsensusClient { + /// Handle to execution client. + auth_server: AuthServerHandle, + /// Provider to get consensus blocks from. + block_provider: P, +} + +impl DebugConsensusClient

{ + /// Create a new debug consensus client with the given handle to execution + /// client and block provider. + pub const fn new(auth_server: AuthServerHandle, block_provider: P) -> Self { + Self { auth_server, block_provider } + } +} + +impl DebugConsensusClient

{ + /// Spawn the client to start sending FCUs and new payloads by periodically fetching recent + /// blocks. + pub async fn run(self) { + let execution_client = self.auth_server.http_client(); + let mut previous_block_hashes = AllocRingBuffer::new(64); + + let mut block_stream = { + let (tx, rx) = mpsc::channel::(64); + let block_provider = self.block_provider.clone(); + tokio::spawn(async move { + block_provider.subscribe_blocks(tx).await; + }); + rx + }; + + while let Some(block) = block_stream.recv().await { + let payload = rich_block_to_execution_payload_v3(block); + + let block_hash = payload.block_hash(); + let block_number = payload.block_number(); + + // Send new events to execution client + reth_rpc_api::EngineApiClient::::new_payload_v3( + &execution_client, + payload.execution_payload_v3, + payload.versioned_hashes, + payload.parent_beacon_block_root, + ) + .await + .unwrap(); + + previous_block_hashes.push(block_hash); + + // Load previous block hashes. We're using (head - 32) and (head - 64) as the safe and + // finalized block hashes. + let safe_block_hash = self.block_provider.get_or_fetch_previous_block( + &previous_block_hashes, + block_number, + 32, + ); + let finalized_block_hash = self.block_provider.get_or_fetch_previous_block( + &previous_block_hashes, + block_number, + 64, + ); + let (safe_block_hash, finalized_block_hash) = + tokio::join!(safe_block_hash, finalized_block_hash); + let (safe_block_hash, finalized_block_hash) = match ( + safe_block_hash, + finalized_block_hash, + ) { + (Ok(safe_block_hash), Ok(finalized_block_hash)) => { + (safe_block_hash, finalized_block_hash) + } + (safe_block_hash, finalized_block_hash) => { + warn!(target: "consensus::debug-client", ?safe_block_hash, ?finalized_block_hash, "failed to fetch safe or finalized hash from etherscan"); + continue; + } + }; + reth_rpc_api::EngineApiClient::::fork_choice_updated_v3( + &execution_client, + reth_rpc_types::engine::ForkchoiceState { + head_block_hash: block_hash, + safe_block_hash, + finalized_block_hash, + }, + None, + ) + .await + .unwrap(); + } + } +} + +/// Cancun "new payload" event. +#[derive(Debug)] +struct ExecutionNewPayload { + execution_payload_v3: ExecutionPayloadV3, + versioned_hashes: Vec, + parent_beacon_block_root: B256, +} + +impl ExecutionNewPayload { + /// Get block hash from block in the payload + const fn block_hash(&self) -> B256 { + self.execution_payload_v3.payload_inner.payload_inner.block_hash + } + + /// Get block number from block in the payload + const fn block_number(&self) -> u64 { + self.execution_payload_v3.payload_inner.payload_inner.block_number + } +} + +/// Convert a rich block from RPC / Etherscan to params for an execution client's "new payload" +/// method. Assumes that the block contains full transactions. +fn rich_block_to_execution_payload_v3(block: RichBlock) -> ExecutionNewPayload { + let transactions = match &block.transactions { + BlockTransactions::Full(txs) => txs.clone(), + // Empty array gets deserialized as BlockTransactions::Hashes. + BlockTransactions::Hashes(txs) if txs.is_empty() => vec![], + BlockTransactions::Hashes(_) | BlockTransactions::Uncle => { + panic!("Received uncle block or hash-only transactions from Etherscan API") + } + }; + + // Concatenate all blob hashes from all transactions in order + // https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#specification + let versioned_hashes = transactions + .iter() + .flat_map(|tx| tx.blob_versioned_hashes.clone().unwrap_or_default()) + .collect(); + + let payload: ExecutionPayloadV3 = ExecutionPayloadV3 { + payload_inner: ExecutionPayloadV2 { + payload_inner: ExecutionPayloadV1 { + parent_hash: block.header.parent_hash, + fee_recipient: block.header.miner, + state_root: block.header.state_root, + receipts_root: block.header.receipts_root, + logs_bloom: block.header.logs_bloom, + prev_randao: block.header.mix_hash.unwrap(), + block_number: block.header.number.unwrap(), + gas_limit: block.header.gas_limit.try_into().unwrap(), + gas_used: block.header.gas_used.try_into().unwrap(), + timestamp: block.header.timestamp, + extra_data: block.header.extra_data.clone(), + base_fee_per_gas: block.header.base_fee_per_gas.unwrap().try_into().unwrap(), + block_hash: block.header.hash.unwrap(), + transactions: transactions + .into_iter() + .map(|tx| { + let envelope: TxEnvelope = tx.try_into().unwrap(); + let mut buffer: Vec = vec![]; + envelope.encode_2718(&mut buffer); + buffer.into() + }) + .collect(), + }, + withdrawals: block.withdrawals.clone().unwrap_or_default(), + }, + blob_gas_used: block.header.blob_gas_used.unwrap().try_into().unwrap(), + excess_blob_gas: block.header.excess_blob_gas.unwrap().try_into().unwrap(), + }; + + ExecutionNewPayload { + execution_payload_v3: payload, + versioned_hashes, + parent_beacon_block_root: block.header.parent_beacon_block_root.unwrap(), + } +} diff --git a/crates/consensus/debug-client/src/lib.rs b/crates/consensus/debug-client/src/lib.rs new file mode 100644 index 00000000000..bc244fafeb0 --- /dev/null +++ b/crates/consensus/debug-client/src/lib.rs @@ -0,0 +1,19 @@ +//! Debug consensus client. +//! +//! This is a worker that sends FCUs and new payloads by fetching recent blocks from an external +//! provider like Etherscan or an RPC endpoint. This allows to quickly test the execution client +//! without running a consensus node. + +#![doc( + html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png", + html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", + issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/" +)] +#![cfg_attr(not(test), warn(unused_crate_dependencies))] +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + +mod client; +mod providers; + +pub use client::{BlockProvider, DebugConsensusClient}; +pub use providers::{EtherscanBlockProvider, RpcBlockProvider}; diff --git a/crates/consensus/debug-client/src/providers/etherscan.rs b/crates/consensus/debug-client/src/providers/etherscan.rs new file mode 100644 index 00000000000..e189913d126 --- /dev/null +++ b/crates/consensus/debug-client/src/providers/etherscan.rs @@ -0,0 +1,105 @@ +use crate::BlockProvider; +use alloy_eips::BlockNumberOrTag; +use reqwest::Client; +use reth_node_core::rpc::types::RichBlock; +use reth_tracing::tracing::warn; +use serde::Deserialize; +use std::time::Duration; +use tokio::{sync::mpsc, time::interval}; + +/// Block provider that fetches new blocks from Etherscan API. +#[derive(Debug, Clone)] +pub struct EtherscanBlockProvider { + http_client: Client, + base_url: String, + api_key: String, + interval: Duration, +} + +impl EtherscanBlockProvider { + /// Create a new Etherscan block provider with the given base URL and API key. + pub fn new(base_url: String, api_key: String) -> Self { + Self { http_client: Client::new(), base_url, api_key, interval: Duration::from_secs(3) } + } + + /// Sets the interval at which the provider fetches new blocks. + pub const fn with_interval(mut self, interval: Duration) -> Self { + self.interval = interval; + self + } +} + +impl BlockProvider for EtherscanBlockProvider { + async fn subscribe_blocks(&self, tx: mpsc::Sender) { + let mut last_block_number: Option = None; + let mut interval = interval(self.interval); + loop { + interval.tick().await; + let block = match load_etherscan_block( + &self.http_client, + &self.base_url, + &self.api_key, + BlockNumberOrTag::Latest, + ) + .await + { + Ok(block) => block, + Err(err) => { + warn!(target: "consensus::debug-client", %err, "failed to fetch a block from Etherscan"); + continue + } + }; + let block_number = block.header.number.unwrap(); + if Some(block_number) == last_block_number { + continue; + } + + if tx.send(block).await.is_err() { + // channel closed + break; + } + + last_block_number = Some(block_number); + } + } + + async fn get_block(&self, block_number: u64) -> eyre::Result { + load_etherscan_block( + &self.http_client, + &self.base_url, + &self.api_key, + BlockNumberOrTag::Number(block_number), + ) + .await + } +} + +#[derive(Deserialize, Debug)] +struct EtherscanBlockResponse { + result: RichBlock, +} + +/// Load block using Etherscan API. Note: only `BlockNumberOrTag::Latest`, +/// `BlockNumberOrTag::Earliest`, `BlockNumberOrTag::Pending`, `BlockNumberOrTag::Number(u64)` are +/// supported. +async fn load_etherscan_block( + http_client: &Client, + base_url: &str, + api_key: &str, + block_number_or_tag: BlockNumberOrTag, +) -> eyre::Result { + let block: EtherscanBlockResponse = http_client + .get(base_url) + .query(&[ + ("module", "proxy"), + ("action", "eth_getBlockByNumber"), + ("tag", &block_number_or_tag.to_string()), + ("boolean", "true"), + ("apikey", api_key), + ]) + .send() + .await? + .json() + .await?; + Ok(block.result) +} diff --git a/crates/consensus/debug-client/src/providers/mod.rs b/crates/consensus/debug-client/src/providers/mod.rs new file mode 100644 index 00000000000..3099350f27f --- /dev/null +++ b/crates/consensus/debug-client/src/providers/mod.rs @@ -0,0 +1,5 @@ +mod etherscan; +mod rpc; + +pub use etherscan::EtherscanBlockProvider; +pub use rpc::RpcBlockProvider; diff --git a/crates/consensus/debug-client/src/providers/rpc.rs b/crates/consensus/debug-client/src/providers/rpc.rs new file mode 100644 index 00000000000..e706090ed3c --- /dev/null +++ b/crates/consensus/debug-client/src/providers/rpc.rs @@ -0,0 +1,58 @@ +use crate::BlockProvider; +use alloy_eips::BlockNumberOrTag; +use alloy_provider::{Provider, ProviderBuilder}; +use futures::StreamExt; +use reth_node_core::rpc::types::RichBlock; +use tokio::sync::mpsc::Sender; + +/// Block provider that fetches new blocks from an RPC endpoint using a websocket connection. +#[derive(Debug, Clone)] +pub struct RpcBlockProvider { + ws_rpc_url: String, +} + +impl RpcBlockProvider { + /// Create a new RPC block provider with the given WS RPC URL. + pub const fn new(ws_rpc_url: String) -> Self { + Self { ws_rpc_url } + } +} + +impl BlockProvider for RpcBlockProvider { + async fn subscribe_blocks(&self, tx: Sender) { + let ws_provider = ProviderBuilder::new() + .on_builtin(&self.ws_rpc_url) + .await + .expect("failed to create WS provider"); + let mut stream = ws_provider + .subscribe_blocks() + .await + .expect("failed to subscribe on new blocks") + .into_stream(); + + while let Some(block) = stream.next().await { + let full_block = ws_provider + .get_block_by_hash(block.header.hash.unwrap(), true) + .await + .expect("failed to get block") + .expect("block not found"); + if tx.send(full_block.into()).await.is_err() { + // channel closed + break; + } + } + } + + async fn get_block(&self, block_number: u64) -> eyre::Result { + let ws_provider = ProviderBuilder::new() + .on_builtin(&self.ws_rpc_url) + .await + .expect("failed to create WS provider"); + let block: RichBlock = ws_provider + .get_block_by_number(BlockNumberOrTag::Number(block_number), true) + .await? + .ok_or_else(|| eyre::eyre!("block not found by number {}", block_number))? + .into(); + Ok(block) + } +} diff --git a/crates/node-core/src/args/debug.rs b/crates/node-core/src/args/debug.rs index d1c4e9b7385..b132eb3a611 100644 --- a/crates/node-core/src/args/debug.rs +++ b/crates/node-core/src/args/debug.rs @@ -28,6 +28,26 @@ pub struct DebugArgs { #[arg(long = "debug.max-block", help_heading = "Debug")] pub max_block: Option, + /// Runs a fake consensus client that advances the chain using recent block hashes + /// on Etherscan. If specified, requires an `ETHERSCAN_API_KEY` environment variable. + #[arg( + long = "debug.etherscan", + help_heading = "Debug", + conflicts_with = "tip", + conflicts_with = "rpc_consensus_ws", + value_name = "ETHERSCAN_API_URL" + )] + pub etherscan: Option>, + + /// Runs a fake consensus client using blocks fetched from an RPC `WebSocket` endpoint. + #[arg( + long = "debug.rpc-consensus-ws", + help_heading = "Debug", + conflicts_with = "tip", + conflicts_with = "etherscan" + )] + pub rpc_consensus_ws: Option, + /// If provided, the engine will skip `n` consecutive FCUs. #[arg(long = "debug.skip-fcu", help_heading = "Debug")] pub skip_fcu: Option, diff --git a/crates/node/builder/Cargo.toml b/crates/node/builder/Cargo.toml index 9b8bacdfcf8..d2e7c805279 100644 --- a/crates/node/builder/Cargo.toml +++ b/crates/node/builder/Cargo.toml @@ -41,7 +41,9 @@ reth-config.workspace = true reth-downloaders.workspace = true reth-node-events.workspace = true reth-consensus.workspace = true +reth-consensus-debug-client.workspace = true reth-rpc-types.workspace = true + ## async futures.workspace = true tokio = { workspace = true, features = [ diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index 77d50a32be7..cd9971ad780 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -18,6 +18,7 @@ use reth_blockchain_tree::{ TreeExternals, }; use reth_consensus::Consensus; +use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider, RpcBlockProvider}; use reth_exex::ExExManagerHandle; use reth_network::NetworkEvents; use reth_node_api::{FullNodeComponents, FullNodeTypes}; @@ -397,6 +398,48 @@ where let _ = tx.send(res); }); + if let Some(maybe_custom_etherscan_url) = ctx.node_config().debug.etherscan.clone() { + info!(target: "reth::cli", "Using etherscan as consensus client"); + + let chain = ctx.node_config().chain.chain; + let etherscan_url = maybe_custom_etherscan_url.map(Ok).unwrap_or_else(|| { + // If URL isn't provided, use default Etherscan URL for the chain if it is known + chain + .etherscan_urls() + .map(|urls| urls.0.to_string()) + .ok_or_else(|| eyre::eyre!("failed to get etherscan url for chain: {chain}")) + })?; + + let block_provider = EtherscanBlockProvider::new( + etherscan_url, + chain.etherscan_api_key().ok_or_else(|| { + eyre::eyre!( + "etherscan api key not found for rpc consensus client for chain: {chain}" + ) + })?, + ); + let rpc_consensus_client = DebugConsensusClient::new( + rpc_server_handles.auth.clone(), + Arc::new(block_provider), + ); + ctx.task_executor().spawn_critical("etherscan consensus client", async move { + rpc_consensus_client.run::().await + }); + } + + if let Some(rpc_ws_url) = ctx.node_config().debug.rpc_consensus_ws.clone() { + info!(target: "reth::cli", "Using rpc provider as consensus client"); + + let block_provider = RpcBlockProvider::new(rpc_ws_url); + let rpc_consensus_client = DebugConsensusClient::new( + rpc_server_handles.auth.clone(), + Arc::new(block_provider), + ); + ctx.task_executor().spawn_critical("rpc consensus client", async move { + rpc_consensus_client.run::().await + }); + } + let full_node = FullNode { evm_config: node_adapter.components.evm_config().clone(), block_executor: node_adapter.components.block_executor().clone(),