From 826722ff63ad76eb873b3eb6b06409924b47994c Mon Sep 17 00:00:00 2001 From: Seva Zhidkov Date: Fri, 3 May 2024 23:28:23 +0100 Subject: [PATCH 01/21] feat: --debug.etherscan for fake consensus client --- Cargo.lock | 118 ++++++++++++++++ Cargo.toml | 2 + crates/consensus/rpc/Cargo.toml | 25 ++++ crates/consensus/rpc/src/lib.rs | 194 ++++++++++++++++++++++++++ crates/node-core/src/args/debug.rs | 5 + crates/node/builder/Cargo.toml | 1 + crates/node/builder/src/launch/mod.rs | 19 +++ 7 files changed, 364 insertions(+) create mode 100644 crates/consensus/rpc/Cargo.toml create mode 100644 crates/consensus/rpc/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 1828b2a2faef..221d644f6fc5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3097,6 +3097,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -3782,6 +3797,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.3.1", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.3" @@ -5153,6 +5184,24 @@ dependencies = [ "unsigned-varint 0.7.2", ] +[[package]] +name = "native-tls" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "network" version = "0.0.0" @@ -5410,12 +5459,50 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" +[[package]] +name = "openssl" +version = "0.10.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" +dependencies = [ + "bitflags 2.5.0", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c597637d56fbc83893a35eb0dd04b2b8e7a50c91e64e9493e398b5df4fb45fa2" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -6318,11 +6405,13 @@ dependencies = [ "http-body-util", "hyper 1.3.1", "hyper-rustls 0.26.0", + "hyper-tls", "hyper-util", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -6335,6 +6424,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper", "tokio", + "tokio-native-tls", "tokio-rustls 0.25.0", "tower-service", "url", @@ -6628,6 +6718,23 @@ dependencies = [ "reth-provider", ] +[[package]] +name = "reth-consensus-rpc" +version = "0.2.0-beta.6" +dependencies = [ + "alloy-consensus 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=af788af)", + "alloy-eips 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=af788af)", + "jsonrpsee", + "reqwest 0.12.4", + "reth-node-api", + "reth-node-core", + "reth-rpc-api", + "reth-rpc-builder", + "reth-rpc-types", + "serde", + "tokio", +] + [[package]] name = "reth-db" version = "0.2.0-beta.6" @@ -7265,6 +7372,7 @@ dependencies = [ "reth-blockchain-tree", "reth-config", "reth-consensus", + "reth-consensus-rpc", "reth-db", "reth-downloaders", "reth-evm", @@ -9477,6 +9585,16 @@ dependencies = [ "syn 2.0.60", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" diff --git a/Cargo.toml b/Cargo.toml index 0aca2afbbaaa..f3414c2c2046 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "crates/consensus/beacon-core/", "crates/consensus/common/", "crates/consensus/consensus/", + "crates/consensus/rpc/", "crates/ethereum-forks/", "crates/e2e-test-utils/", "crates/etl/", @@ -217,6 +218,7 @@ reth-codecs = { path = "crates/storage/codecs" } reth-config = { path = "crates/config" } reth-consensus = { path = "crates/consensus/consensus" } reth-consensus-common = { path = "crates/consensus/common" } +reth-consensus-rpc = { path = "crates/consensus/rpc" } reth-db = { path = "crates/storage/db" } reth-discv4 = { path = "crates/net/discv4" } reth-discv5 = { path = "crates/net/discv5" } diff --git a/crates/consensus/rpc/Cargo.toml b/crates/consensus/rpc/Cargo.toml new file mode 100644 index 000000000000..5b7818c3d34d --- /dev/null +++ b/crates/consensus/rpc/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "reth-consensus-rpc" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[lints] +workspace = true + +[dependencies] +reth-node-api.workspace = true +reth-node-core.workspace = true +reth-rpc-api.workspace = true +reth-rpc-types.workspace = true +reth-rpc-builder.workspace = true + +alloy-consensus = { workspace = true, features = ["serde"] } +alloy-eips = { workspace = true } +jsonrpsee.workspace = true +reqwest = { workspace = true, features = ["default-tls", "json"] } +serde = { workspace = true, features = ["derive"] } +tokio = { workspace = true } diff --git a/crates/consensus/rpc/src/lib.rs b/crates/consensus/rpc/src/lib.rs new file mode 100644 index 000000000000..6f109620b655 --- /dev/null +++ b/crates/consensus/rpc/src/lib.rs @@ -0,0 +1,194 @@ +/// TODO: Crate description + +use alloy_consensus::TxEnvelope; +use alloy_eips::eip2718::Encodable2718; +use reth_node_api::EngineTypes; +use reth_node_core::{ + primitives::{B256}, + rpc::types::{BlockTransactions, ExecutionPayloadV2, ExecutionPayloadV3, RichBlock}, +}; +use reth_rpc_builder::auth::AuthServerHandle; +use reth_rpc_types::ExecutionPayloadV1; +use serde::Deserialize; +use std::time::Duration; +use tokio::time::sleep; + +/// Fake consensus client that sends FCUs and new payloads using recent blocks from an external +/// provider, starting with Etherscan. +/// TODO: Naming - maybe "ExternalConsensusClient"? Or maybe simpler and just +/// "EtherscanConsensusClient"? +#[derive(Debug)] +pub struct RpcConsensusClient { + /// HTTP client to fetch blocks + http_client: reqwest::Client, + /// Handle to execution client + auth_server: AuthServerHandle, + /// Etherscan API key + etherscan_api_key: String, + /// Etherscan base API URL + etherscan_base_api_url: String, +} + +impl RpcConsensusClient { + /// Create a new fake consensus client that should sent FCUs and new payloads to `auth_server`. + pub fn new( + auth_server: AuthServerHandle, + etherscan_api_key: String, + etherscan_base_api_url: String, + ) -> Self { + Self { + http_client: reqwest::Client::new(), + auth_server, + etherscan_api_key, + etherscan_base_api_url, + } + } + + /// Spawn the client to start sending FCUs and new payloads by periodically fetching recent blocks. + pub async fn spawn(&self) { + // TODO: Add logs + // TODO: Generalize over block-fetching code to support different sources + + let execution_client = self.auth_server.http_client(); + let mut last_block_number: Option = None; + + loop { + let block: EtherscanBlockResponse = self + .http_client + .get(&self.etherscan_base_api_url) + .query(&[ + ("module", "proxy"), + ("action", "eth_getBlockByNumber"), + ("tag", "latest"), + ("boolean", "true"), + ("apikey", &self.etherscan_api_key), + ]) + .send() + .await + .unwrap() + .json() + .await + .unwrap(); + + // Sleep if no new block is available + if block.result.header.number == last_block_number { + // TODO: Allow to configure this + sleep(Duration::from_secs(3)).await; + continue; + } + + let payload = rich_block_to_execution_payload_v3(block.result); + + let block_hash = payload.block_hash(); + let block_number = payload.block_number(); + + // Send new events to execution client + reth_rpc_api::EngineApiClient::::new_payload_v3( + &execution_client, + payload.execution_payload_v3, + payload.versioned_hashes, + payload.parent_beacon_block_root, + ) + .await + .unwrap(); + reth_rpc_api::EngineApiClient::::fork_choice_updated_v3( + &execution_client, + reth_rpc_types::engine::ForkchoiceState { + head_block_hash: block_hash, + safe_block_hash: block_hash, + finalized_block_hash: block_hash, + }, + None, + ) + .await + .unwrap(); + + last_block_number = Some(block_number); + + // TODO: Allow to configure this + sleep(Duration::from_secs(3)).await; + } + } +} + +/// Context for a Cancun "new payload" with additional metadata. +#[derive(Debug)] +struct ExecutionNewPayload { + execution_payload_v3: ExecutionPayloadV3, + versioned_hashes: Vec, + parent_beacon_block_root: B256, +} + +impl ExecutionNewPayload { + fn block_hash(&self) -> B256 { + self.execution_payload_v3.payload_inner.payload_inner.block_hash + } + + fn block_number(&self) -> u64 { + self.execution_payload_v3.payload_inner.payload_inner.block_number + } +} + +/// Convert a rich block from RPC / Etherscan to params for an execution client's "new payload" +/// method. Assumes that the block contains full transactions. +fn rich_block_to_execution_payload_v3(block: RichBlock) -> ExecutionNewPayload { + let transactions = match &block.transactions { + BlockTransactions::Full(txs) => txs.clone(), + // Empty array gets deserialized as BlockTransactions::Hashes. + BlockTransactions::Hashes(txs) if txs.is_empty() => vec![], + BlockTransactions::Hashes(_) | BlockTransactions::Uncle => { + panic!("Received uncle block or hash-only transactions from Etherscan API") + } + }; + + // Concatenate all blob hashes from all transactions in order + // https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#specification + let versioned_hashes = transactions + .iter() + .flat_map(|tx| (&tx.blob_versioned_hashes).clone().unwrap_or_default()) + .collect(); + + // TODO: Do we want to handle errors more gracefully here or this is fine? + let payload: ExecutionPayloadV3 = ExecutionPayloadV3 { + payload_inner: ExecutionPayloadV2 { + payload_inner: ExecutionPayloadV1 { + parent_hash: block.header.parent_hash, + fee_recipient: block.header.miner, + state_root: block.header.state_root, + receipts_root: block.header.receipts_root, + logs_bloom: block.header.logs_bloom, + prev_randao: block.header.mix_hash.unwrap(), + block_number: block.header.number.unwrap().try_into().unwrap(), + gas_limit: block.header.gas_limit.try_into().unwrap(), + gas_used: block.header.gas_used.try_into().unwrap(), + timestamp: block.header.timestamp.try_into().unwrap(), + extra_data: block.header.extra_data.clone(), + base_fee_per_gas: block.header.base_fee_per_gas.unwrap().try_into().unwrap(), + block_hash: block.header.hash.unwrap().into(), + transactions: transactions + .into_iter() + .map(|tx| { + let envelope: TxEnvelope = tx.try_into().unwrap(); + let mut buffer: Vec = vec![]; + envelope.encode_2718(&mut buffer); + buffer.into() + }) + .collect(), + }, + withdrawals: (&block.withdrawals).clone().unwrap(), + }, + blob_gas_used: block.header.blob_gas_used.unwrap().try_into().unwrap(), + excess_blob_gas: block.header.excess_blob_gas.unwrap().try_into().unwrap(), + }; + + ExecutionNewPayload { + execution_payload_v3: payload, + versioned_hashes, + parent_beacon_block_root: block.header.parent_beacon_block_root.unwrap(), + } +} + +#[derive(Deserialize, Debug)] +struct EtherscanBlockResponse { + result: RichBlock, +} diff --git a/crates/node-core/src/args/debug.rs b/crates/node-core/src/args/debug.rs index d1c4e9b73854..a14760bbdf6e 100644 --- a/crates/node-core/src/args/debug.rs +++ b/crates/node-core/src/args/debug.rs @@ -28,6 +28,11 @@ pub struct DebugArgs { #[arg(long = "debug.max-block", help_heading = "Debug")] pub max_block: Option, + /// Runs a fake consensus client that advances the chain using recent block hashes + /// on Etherscan. If specified, requires an `ETHERSCAN_API_KEY` environment + #[arg(long = "debug.etherscan", help_heading = "Debug", conflicts_with = "tip")] + pub etherscan: bool, + /// If provided, the engine will skip `n` consecutive FCUs. #[arg(long = "debug.skip-fcu", help_heading = "Debug")] pub skip_fcu: Option, diff --git a/crates/node/builder/Cargo.toml b/crates/node/builder/Cargo.toml index 26635e536deb..eb540ee11ecb 100644 --- a/crates/node/builder/Cargo.toml +++ b/crates/node/builder/Cargo.toml @@ -38,6 +38,7 @@ reth-config.workspace = true reth-downloaders.workspace = true reth-node-events.workspace = true reth-consensus.workspace = true +reth-consensus-rpc.workspace = true ## async futures.workspace = true diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index e8c5b2967eb5..51211f4ef258 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -40,6 +40,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream; pub mod common; pub use common::LaunchContext; +use reth_consensus_rpc::RpcConsensusClient; /// A general purpose trait that launches a new node of any kind. /// @@ -443,6 +444,24 @@ where let _ = tx.send(res); }); + if ctx.node_config().debug.etherscan { + let chain = ctx.node_config().chain.chain; + let rpc_consensus_client = RpcConsensusClient::new( + rpc_server_handles.auth.clone(), + chain + .etherscan_api_key() + .expect("etherscan api key not found for rpc consensus client"), + chain + .etherscan_urls() + .expect("etherscan urls not found for rpc consensus client") + .0 + .to_owned(), + ); + ctx.task_executor().spawn_critical("rpc consensus client", async move { + rpc_consensus_client.spawn::().await + }); + } + let full_node = FullNode { evm_config: node_adapter.components.evm_config().clone(), pool: node_adapter.components.pool().clone(), From 603db3e42537468edd432ec7ab514583d0829562 Mon Sep 17 00:00:00 2001 From: Seva Zhidkov Date: Fri, 3 May 2024 23:31:37 +0100 Subject: [PATCH 02/21] add todo to handle errors gracefully --- crates/consensus/rpc/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/consensus/rpc/src/lib.rs b/crates/consensus/rpc/src/lib.rs index 6f109620b655..f7af73a91958 100644 --- a/crates/consensus/rpc/src/lib.rs +++ b/crates/consensus/rpc/src/lib.rs @@ -68,6 +68,7 @@ impl RpcConsensusClient { .unwrap() .json() .await + // TODO: Handle errors gracefully and do not stop the loop .unwrap(); // Sleep if no new block is available From 7789774aaee1d4586a7dbbe54cd7956a5c3430a2 Mon Sep 17 00:00:00 2001 From: Seva Zhidkov Date: Fri, 3 May 2024 23:34:42 +0100 Subject: [PATCH 03/21] fix clippy --- crates/consensus/rpc/src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/consensus/rpc/src/lib.rs b/crates/consensus/rpc/src/lib.rs index f7af73a91958..09eec890bf64 100644 --- a/crates/consensus/rpc/src/lib.rs +++ b/crates/consensus/rpc/src/lib.rs @@ -146,7 +146,7 @@ fn rich_block_to_execution_payload_v3(block: RichBlock) -> ExecutionNewPayload { // https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#specification let versioned_hashes = transactions .iter() - .flat_map(|tx| (&tx.blob_versioned_hashes).clone().unwrap_or_default()) + .flat_map(|tx| tx.blob_versioned_hashes.clone().unwrap_or_default()) .collect(); // TODO: Do we want to handle errors more gracefully here or this is fine? @@ -159,13 +159,13 @@ fn rich_block_to_execution_payload_v3(block: RichBlock) -> ExecutionNewPayload { receipts_root: block.header.receipts_root, logs_bloom: block.header.logs_bloom, prev_randao: block.header.mix_hash.unwrap(), - block_number: block.header.number.unwrap().try_into().unwrap(), + block_number: block.header.number.unwrap(), gas_limit: block.header.gas_limit.try_into().unwrap(), gas_used: block.header.gas_used.try_into().unwrap(), - timestamp: block.header.timestamp.try_into().unwrap(), + timestamp: block.header.timestamp, extra_data: block.header.extra_data.clone(), base_fee_per_gas: block.header.base_fee_per_gas.unwrap().try_into().unwrap(), - block_hash: block.header.hash.unwrap().into(), + block_hash: block.header.hash.unwrap(), transactions: transactions .into_iter() .map(|tx| { @@ -176,7 +176,7 @@ fn rich_block_to_execution_payload_v3(block: RichBlock) -> ExecutionNewPayload { }) .collect(), }, - withdrawals: (&block.withdrawals).clone().unwrap(), + withdrawals: block.withdrawals.clone().unwrap(), }, blob_gas_used: block.header.blob_gas_used.unwrap().try_into().unwrap(), excess_blob_gas: block.header.excess_blob_gas.unwrap().try_into().unwrap(), From e11d8506354689c15e0ef282d1be07dfde05c289 Mon Sep 17 00:00:00 2001 From: Seva Zhidkov Date: Fri, 10 May 2024 14:34:03 +0100 Subject: [PATCH 04/21] add genericness over block provider --- Cargo.lock | 163 ++++++++++++++++- Cargo.toml | 4 +- .../{rpc => debug-client}/Cargo.toml | 7 +- .../src/lib.rs => debug-client/src/client.rs} | 168 ++++++++++-------- crates/consensus/debug-client/src/lib.rs | 10 ++ .../consensus/debug-client/src/providers.rs | 5 + .../debug-client/src/providers/etherscan.rs | 89 ++++++++++ .../debug-client/src/providers/rpc.rs | 61 +++++++ crates/node-core/src/args/debug.rs | 29 ++- crates/node/builder/Cargo.toml | 2 +- crates/node/builder/src/launch/mod.rs | 28 ++- 11 files changed, 469 insertions(+), 97 deletions(-) rename crates/consensus/{rpc => debug-client}/Cargo.toml (81%) rename crates/consensus/{rpc/src/lib.rs => debug-client/src/client.rs} (55%) create mode 100644 crates/consensus/debug-client/src/lib.rs create mode 100644 crates/consensus/debug-client/src/providers.rs create mode 100644 crates/consensus/debug-client/src/providers/etherscan.rs create mode 100644 crates/consensus/debug-client/src/providers/rpc.rs diff --git a/Cargo.lock b/Cargo.lock index 039adce8893c..cad55cc8cf3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -322,11 +322,13 @@ dependencies = [ "alloy-json-rpc", "alloy-network", "alloy-primitives", + "alloy-pubsub", "alloy-rpc-client", "alloy-rpc-types 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=77c1240)", "alloy-rpc-types-trace", "alloy-transport", "alloy-transport-http", + "alloy-transport-ws", "async-stream", "async-trait", "auto_impl", @@ -341,6 +343,24 @@ dependencies = [ "url", ] +[[package]] +name = "alloy-pubsub" +version = "0.1.0" +source = "git+https://github.com/alloy-rs/alloy?rev=77c1240#77c1240533b411ed0eb5533f94396eba8d7f6ab6" +dependencies = [ + "alloy-json-rpc", + "alloy-primitives", + "alloy-transport", + "bimap", + "futures", + "serde", + "serde_json", + "tokio", + "tokio-stream", + "tower", + "tracing", +] + [[package]] name = "alloy-rlp" version = "0.3.4" @@ -369,8 +389,11 @@ version = "0.1.0" source = "git+https://github.com/alloy-rs/alloy?rev=77c1240#77c1240533b411ed0eb5533f94396eba8d7f6ab6" dependencies = [ "alloy-json-rpc", + "alloy-primitives", + "alloy-pubsub", "alloy-transport", "alloy-transport-http", + "alloy-transport-ws", "futures", "pin-project", "reqwest 0.12.4", @@ -617,6 +640,22 @@ dependencies = [ "url", ] +[[package]] +name = "alloy-transport-ws" +version = "0.1.0" +source = "git+https://github.com/alloy-rs/alloy?rev=77c1240#77c1240533b411ed0eb5533f94396eba8d7f6ab6" +dependencies = [ + "alloy-pubsub", + "alloy-transport", + "futures", + "http 0.2.12", + "serde_json", + "tokio", + "tokio-tungstenite", + "tracing", + "ws_stream_wasm", +] + [[package]] name = "alloy-trie" version = "0.3.1" @@ -988,6 +1027,17 @@ dependencies = [ "syn 2.0.60", ] +[[package]] +name = "async_io_stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c" +dependencies = [ + "futures", + "pharos", + "rustc_version 0.4.0", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -1124,6 +1174,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bimap" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" + [[package]] name = "bincode" version = "1.3.3" @@ -3295,7 +3351,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" dependencies = [ "gloo-timers", - "send_wrapper", + "send_wrapper 0.4.0", ] [[package]] @@ -4387,7 +4443,7 @@ dependencies = [ "tokio-util", "tracing", "url", - "webpki-roots", + "webpki-roots 0.26.1", ] [[package]] @@ -5436,7 +5492,7 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "681030a937600a36906c185595136d26abfebb4aa9c65701cefcaf8578bb982b" dependencies = [ - "proc-macro-crate 2.0.0", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", "syn 2.0.60", @@ -5713,6 +5769,16 @@ dependencies = [ "wyhash", ] +[[package]] +name = "pharos" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414" +dependencies = [ + "futures", + "rustc_version 0.4.0", +] + [[package]] name = "phf" version = "0.11.2" @@ -6464,7 +6530,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "webpki-roots", + "webpki-roots 0.26.1", "winreg 0.52.0", ] @@ -6742,11 +6808,13 @@ dependencies = [ ] [[package]] -name = "reth-consensus-rpc" +name = "reth-consensus-debug-client" version = "0.2.0-beta.6" dependencies = [ - "alloy-consensus 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=af788af)", - "alloy-eips 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=af788af)", + "alloy-consensus 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=77c1240)", + "alloy-eips 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=77c1240)", + "alloy-provider", + "futures", "jsonrpsee", "reqwest 0.12.4", "reth-node-api", @@ -6754,6 +6822,7 @@ dependencies = [ "reth-rpc-api", "reth-rpc-builder", "reth-rpc-types", + "ringbuffer", "serde", "tokio", ] @@ -7409,7 +7478,7 @@ dependencies = [ "reth-blockchain-tree", "reth-config", "reth-consensus", - "reth-consensus-rpc", + "reth-consensus-debug-client", "reth-db", "reth-downloaders", "reth-evm", @@ -8299,6 +8368,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "ringbuffer" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3df6368f71f205ff9c33c076d170dd56ebf68e8161c733c0caa07a7a5509ed53" + [[package]] name = "ripemd" version = "0.1.3" @@ -8748,6 +8823,12 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f638d531eccd6e23b980caf34876660d38e265409d8e99b397ab71eb3612fad0" +[[package]] +name = "send_wrapper" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" + [[package]] name = "serde" version = "1.0.200" @@ -9659,6 +9740,21 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" +dependencies = [ + "futures-util", + "log", + "rustls 0.21.12", + "tokio", + "tokio-rustls 0.24.1", + "tungstenite", + "webpki-roots 0.25.4", +] + [[package]] name = "tokio-util" version = "0.7.11" @@ -10036,6 +10132,26 @@ dependencies = [ "toml", ] +[[package]] +name = "tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 0.2.12", + "httparse", + "log", + "rand 0.8.5", + "rustls 0.21.12", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "txpool-tracing" version = "0.0.0" @@ -10165,6 +10281,12 @@ dependencies = [ "serde", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf16_iter" version = "1.0.5" @@ -10365,6 +10487,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.25.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" + [[package]] name = "webpki-roots" version = "0.26.1" @@ -10631,6 +10759,25 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dad7bb64b8ef9c0aa27b6da38b452b0ee9fd82beaf276a87dd796fb55cbae14e" +[[package]] +name = "ws_stream_wasm" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7999f5f4217fe3818726b66257a4475f71e74ffd190776ad053fa159e50737f5" +dependencies = [ + "async_io_stream", + "futures", + "js-sys", + "log", + "pharos", + "rustc_version 0.4.0", + "send_wrapper 0.6.0", + "thiserror", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wyhash" version = "0.5.0" diff --git a/Cargo.toml b/Cargo.toml index 4575d4a9350f..e820e6b554c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ members = [ "crates/ethereum/consensus/", "crates/consensus/common/", "crates/consensus/consensus/", - "crates/consensus/rpc/", + "crates/consensus/debug-client/", "crates/ethereum-forks/", "crates/e2e-test-utils/", "crates/etl/", @@ -216,7 +216,7 @@ reth-codecs = { path = "crates/storage/codecs" } reth-config = { path = "crates/config" } reth-consensus = { path = "crates/consensus/consensus" } reth-consensus-common = { path = "crates/consensus/common" } -reth-consensus-rpc = { path = "crates/consensus/rpc" } +reth-consensus-debug-client = { path = "crates/consensus/debug-client" } reth-db = { path = "crates/storage/db" } reth-discv4 = { path = "crates/net/discv4" } reth-discv5 = { path = "crates/net/discv5" } diff --git a/crates/consensus/rpc/Cargo.toml b/crates/consensus/debug-client/Cargo.toml similarity index 81% rename from crates/consensus/rpc/Cargo.toml rename to crates/consensus/debug-client/Cargo.toml index 5b7818c3d34d..be2892e98279 100644 --- a/crates/consensus/rpc/Cargo.toml +++ b/crates/consensus/debug-client/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "reth-consensus-rpc" +name = "reth-consensus-debug-client" version.workspace = true edition.workspace = true rust-version.workspace = true @@ -19,7 +19,12 @@ reth-rpc-builder.workspace = true alloy-consensus = { workspace = true, features = ["serde"] } alloy-eips = { workspace = true } +alloy-provider = { workspace = true, features = ["ws"] } + +futures.workspace = true jsonrpsee.workspace = true reqwest = { workspace = true, features = ["default-tls", "json"] } serde = { workspace = true, features = ["derive"] } tokio = { workspace = true } + +ringbuffer = "0.15.0" diff --git a/crates/consensus/rpc/src/lib.rs b/crates/consensus/debug-client/src/client.rs similarity index 55% rename from crates/consensus/rpc/src/lib.rs rename to crates/consensus/debug-client/src/client.rs index 09eec890bf64..c47039a36d6a 100644 --- a/crates/consensus/rpc/src/lib.rs +++ b/crates/consensus/debug-client/src/client.rs @@ -1,84 +1,60 @@ -/// TODO: Crate description - use alloy_consensus::TxEnvelope; use alloy_eips::eip2718::Encodable2718; use reth_node_api::EngineTypes; use reth_node_core::{ - primitives::{B256}, + primitives::B256, rpc::types::{BlockTransactions, ExecutionPayloadV2, ExecutionPayloadV3, RichBlock}, }; use reth_rpc_builder::auth::AuthServerHandle; use reth_rpc_types::ExecutionPayloadV1; -use serde::Deserialize; -use std::time::Duration; -use tokio::time::sleep; - -/// Fake consensus client that sends FCUs and new payloads using recent blocks from an external -/// provider, starting with Etherscan. -/// TODO: Naming - maybe "ExternalConsensusClient"? Or maybe simpler and just -/// "EtherscanConsensusClient"? +use ringbuffer::{AllocRingBuffer, RingBuffer}; +use std::{future::Future, sync::Arc}; +use tokio::sync::mpsc; + +/// Supplies consensus client with new blocks sent in `tx` and a callback to find specific blocks +/// by number to fetch past finalized and safe blocks. +pub trait BlockProvider: Send + Sync + 'static { + /// Spawn a block provider to send new blocks to the given sender. + fn spawn(&self, tx: mpsc::Sender) -> impl Future + Send; + /// Get a past block by number. + fn get_block(&self, block_number: u64) -> impl Future + Send; +} + +/// Debyg consensus client that sends FCUs and new payloads using recent blocks from an external +/// provider like Etherscan or an RPC endpoint. #[derive(Debug)] -pub struct RpcConsensusClient { - /// HTTP client to fetch blocks - http_client: reqwest::Client, - /// Handle to execution client +pub struct DebugConsensusClient { + /// Handle to execution client. auth_server: AuthServerHandle, - /// Etherscan API key - etherscan_api_key: String, - /// Etherscan base API URL - etherscan_base_api_url: String, + /// Provider to get consensus blocks from. + block_provider: Arc

, } -impl RpcConsensusClient { - /// Create a new fake consensus client that should sent FCUs and new payloads to `auth_server`. - pub fn new( - auth_server: AuthServerHandle, - etherscan_api_key: String, - etherscan_base_api_url: String, - ) -> Self { - Self { - http_client: reqwest::Client::new(), - auth_server, - etherscan_api_key, - etherscan_base_api_url, - } +impl DebugConsensusClient

{ + /// Create a new debug consensus client with the given handle to execution + /// client and block provider. + pub fn new(auth_server: AuthServerHandle, block_provider: Arc

) -> Self { + Self { auth_server, block_provider } } - /// Spawn the client to start sending FCUs and new payloads by periodically fetching recent blocks. + /// Spawn the client to start sending FCUs and new payloads by periodically fetching recent + /// blocks. pub async fn spawn(&self) { // TODO: Add logs - // TODO: Generalize over block-fetching code to support different sources - let execution_client = self.auth_server.http_client(); - let mut last_block_number: Option = None; - - loop { - let block: EtherscanBlockResponse = self - .http_client - .get(&self.etherscan_base_api_url) - .query(&[ - ("module", "proxy"), - ("action", "eth_getBlockByNumber"), - ("tag", "latest"), - ("boolean", "true"), - ("apikey", &self.etherscan_api_key), - ]) - .send() - .await - .unwrap() - .json() - .await - // TODO: Handle errors gracefully and do not stop the loop - .unwrap(); - - // Sleep if no new block is available - if block.result.header.number == last_block_number { - // TODO: Allow to configure this - sleep(Duration::from_secs(3)).await; - continue; - } - - let payload = rich_block_to_execution_payload_v3(block.result); + let mut previous_block_hashes = AllocRingBuffer::new(64); + + let mut block_stream = { + let (tx, rx) = mpsc::channel::(64); + let block_provider = self.block_provider.clone(); + tokio::spawn(async move { + block_provider.spawn(tx).await; + }); + rx + }; + + while let Some(block) = block_stream.recv().await { + let payload = rich_block_to_execution_payload_v3(block); let block_hash = payload.block_hash(); let block_number = payload.block_number(); @@ -92,27 +68,67 @@ impl RpcConsensusClient { ) .await .unwrap(); + + previous_block_hashes.push(block_hash); + + // Load previous block hashes. We're using (head - 32) and (head - 64) as the safe and + // finalized block hashes. + // todo: chdeck for off by ones in offset + let safe_block_hash = get_or_fetch_previous_block( + self.block_provider.as_ref(), + &previous_block_hashes, + block_number, + 32, + ); + let finalized_block_hash = get_or_fetch_previous_block( + self.block_provider.as_ref(), + &previous_block_hashes, + block_number, + 64, + ); + let (safe_block_hash, finalized_block_hash) = + tokio::join!(safe_block_hash, finalized_block_hash); reth_rpc_api::EngineApiClient::::fork_choice_updated_v3( &execution_client, reth_rpc_types::engine::ForkchoiceState { head_block_hash: block_hash, - safe_block_hash: block_hash, - finalized_block_hash: block_hash, + safe_block_hash, + finalized_block_hash, }, None, ) .await .unwrap(); - - last_block_number = Some(block_number); - - // TODO: Allow to configure this - sleep(Duration::from_secs(3)).await; } } } -/// Context for a Cancun "new payload" with additional metadata. +/// Get previous block hash using previous block hash buffer. If it isn't available (buffer +/// started more recently than `offset`), fetch it from block provider. +async fn get_or_fetch_previous_block( + block_provider: &P, + previous_block_hashes: &AllocRingBuffer, + current_block_number: u64, + offset: usize, +) -> B256 { + let stored_hash = previous_block_hashes + .len() + .checked_sub(offset) + .and_then(|index| previous_block_hashes.get(index)); + if let Some(hash) = stored_hash { + return *hash; + } + + // Return default hash if the chain isn't long enough to have the block at the offset. + let previous_block_number = match current_block_number.checked_sub(offset as u64) { + Some(number) => number, + None => return B256::default(), + }; + let block = block_provider.get_block(previous_block_number).await; + block.header.hash.unwrap() +} + +/// Cancun "new payload" event. #[derive(Debug)] struct ExecutionNewPayload { execution_payload_v3: ExecutionPayloadV3, @@ -121,10 +137,12 @@ struct ExecutionNewPayload { } impl ExecutionNewPayload { + /// Get block hash from block in the payload fn block_hash(&self) -> B256 { self.execution_payload_v3.payload_inner.payload_inner.block_hash } + /// Get block number from block in the payload fn block_number(&self) -> u64 { self.execution_payload_v3.payload_inner.payload_inner.block_number } @@ -149,7 +167,6 @@ fn rich_block_to_execution_payload_v3(block: RichBlock) -> ExecutionNewPayload { .flat_map(|tx| tx.blob_versioned_hashes.clone().unwrap_or_default()) .collect(); - // TODO: Do we want to handle errors more gracefully here or this is fine? let payload: ExecutionPayloadV3 = ExecutionPayloadV3 { payload_inner: ExecutionPayloadV2 { payload_inner: ExecutionPayloadV1 { @@ -188,8 +205,3 @@ fn rich_block_to_execution_payload_v3(block: RichBlock) -> ExecutionNewPayload { parent_beacon_block_root: block.header.parent_beacon_block_root.unwrap(), } } - -#[derive(Deserialize, Debug)] -struct EtherscanBlockResponse { - result: RichBlock, -} diff --git a/crates/consensus/debug-client/src/lib.rs b/crates/consensus/debug-client/src/lib.rs new file mode 100644 index 000000000000..99ec71ae02f7 --- /dev/null +++ b/crates/consensus/debug-client/src/lib.rs @@ -0,0 +1,10 @@ +//! Debug consensus client. +//! +//! This is a worker that sends FCUs and new payloads by fetching recent blocks from an external +//! provider like Etherscan or an RPC endpoint. This allows to quickly test the execution client +//! without running a consensus node. +mod client; +mod providers; + +pub use client::{BlockProvider, DebugConsensusClient}; +pub use providers::{EtherscanBlockProvider, RpcBlockProvider}; diff --git a/crates/consensus/debug-client/src/providers.rs b/crates/consensus/debug-client/src/providers.rs new file mode 100644 index 000000000000..3099350f27fd --- /dev/null +++ b/crates/consensus/debug-client/src/providers.rs @@ -0,0 +1,5 @@ +mod etherscan; +mod rpc; + +pub use etherscan::EtherscanBlockProvider; +pub use rpc::RpcBlockProvider; diff --git a/crates/consensus/debug-client/src/providers/etherscan.rs b/crates/consensus/debug-client/src/providers/etherscan.rs new file mode 100644 index 000000000000..d37170131c53 --- /dev/null +++ b/crates/consensus/debug-client/src/providers/etherscan.rs @@ -0,0 +1,89 @@ +use crate::BlockProvider; +use alloy_eips::BlockNumberOrTag; +use reqwest::Client; +use reth_node_core::rpc::types::RichBlock; +use serde::Deserialize; +use std::time::Duration; +use tokio::{sync::mpsc, time::sleep}; + +/// Block provider that fetches new blocks from Etherscan API. +#[derive(Debug)] +pub struct EtherscanBlockProvider { + http_client: Client, + base_url: String, + api_key: String, +} + +impl EtherscanBlockProvider { + /// Create a new Etherscan block provider with the given base URL and API key. + pub fn new(base_url: String, api_key: String) -> Self { + EtherscanBlockProvider { http_client: Client::new(), base_url, api_key } + } +} + +impl BlockProvider for EtherscanBlockProvider { + async fn spawn(&self, tx: mpsc::Sender) { + let mut last_block_number: Option = None; + loop { + let block = load_etherscan_block( + &self.http_client, + &self.base_url, + &self.api_key, + BlockNumberOrTag::Latest, + ) + .await; + let block_number = block.header.number.unwrap(); + if Some(block_number) == last_block_number { + // TODO: Make configurable + sleep(Duration::from_secs(3)).await; + continue; + } + + tx.send(block).await.unwrap(); + sleep(Duration::from_secs(3)).await; + last_block_number = Some(block_number); + } + } + + async fn get_block(&self, block_number: u64) -> RichBlock { + load_etherscan_block( + &self.http_client, + &self.base_url, + &self.api_key, + BlockNumberOrTag::Number(block_number), + ) + .await + } +} + +#[derive(Deserialize, Debug)] +struct EtherscanBlockResponse { + result: RichBlock, +} + +/// Load block using Etherscan API. Note: only BlockNumberOrTag::Latest, BlockNumberOrTag::Earliest, +/// BlockNumberOrTag::Pending, BlockNumberOrTag::Number(u64) are supported. +async fn load_etherscan_block( + http_client: &Client, + base_url: &str, + api_key: &str, + block_number_or_tag: BlockNumberOrTag, +) -> RichBlock { + let block: EtherscanBlockResponse = http_client + .get(base_url) + .query(&[ + ("module", "proxy"), + ("action", "eth_getBlockByNumber"), + ("tag", &block_number_or_tag.to_string()), + ("boolean", "true"), + ("apikey", api_key), + ]) + .send() + .await + .unwrap() + .json() + .await + // TODO: Handle errors gracefully and do not stop the loop + .unwrap(); + block.result +} diff --git a/crates/consensus/debug-client/src/providers/rpc.rs b/crates/consensus/debug-client/src/providers/rpc.rs new file mode 100644 index 000000000000..26212c113f2b --- /dev/null +++ b/crates/consensus/debug-client/src/providers/rpc.rs @@ -0,0 +1,61 @@ +use crate::BlockProvider; +use alloy_eips::BlockNumberOrTag; +use alloy_provider::{Provider, ProviderBuilder}; +use futures::StreamExt; +use reth_node_core::rpc::types::RichBlock; +use tokio::sync::mpsc::Sender; + +/// Block provider that fetches new blocks from an RPC endpoint using a websocket connection. +/// HTTP provider is used to fetch full blocks by hash and past blocks by number. +#[derive(Debug)] +pub struct RpcBlockProvider { + http_rpc_url: String, + ws_rpc_url: String, +} + +impl RpcBlockProvider { + /// Create a new RPC block provider with the given HTTP and WS RPC URLs. + pub fn new(http_rpc_url: String, ws_rpc_url: String) -> Self { + Self { http_rpc_url, ws_rpc_url } + } +} + +impl BlockProvider for RpcBlockProvider { + async fn spawn(&self, tx: Sender) { + let http_provider = ProviderBuilder::new() + .on_builtin(&self.http_rpc_url) + .await + .expect("failed to create HTTP provider"); + let ws_provider = ProviderBuilder::new() + .on_builtin(&self.ws_rpc_url) + .await + .expect("failed to create WS provider"); + let mut stream = ws_provider + .subscribe_blocks() + .await + .expect("failed to subscribe on new blocks") + .into_stream(); + + while let Some(block) = stream.next().await { + let full_block = http_provider + .get_block_by_hash(block.header.hash.unwrap(), true) + .await + .expect("failed to get block") + .expect("block not found"); + tx.send(full_block.into()).await.unwrap(); + } + } + + async fn get_block(&self, block_number: u64) -> RichBlock { + let http_provider = ProviderBuilder::new() + .on_builtin(&self.http_rpc_url) + .await + .expect("failed to create HTTP provider"); + http_provider + .get_block_by_number(BlockNumberOrTag::Number(block_number), true) + .await + .expect("failed to get block") + .expect("block not found") + .into() + } +} diff --git a/crates/node-core/src/args/debug.rs b/crates/node-core/src/args/debug.rs index a14760bbdf6e..f4e67a476be0 100644 --- a/crates/node-core/src/args/debug.rs +++ b/crates/node-core/src/args/debug.rs @@ -30,9 +30,36 @@ pub struct DebugArgs { /// Runs a fake consensus client that advances the chain using recent block hashes /// on Etherscan. If specified, requires an `ETHERSCAN_API_KEY` environment - #[arg(long = "debug.etherscan", help_heading = "Debug", conflicts_with = "tip")] + #[arg( + long = "debug.etherscan", + help_heading = "Debug", + conflicts_with = "tip", + conflicts_with = "rpc_consensus_http" + )] pub etherscan: bool, + /// Runs a fake consensus client using blocks from an RPC endpoint. HTTP endpoint for getting + /// past blocks. + #[arg( + long = "debug.rpc-consensus-http", + help_heading = "Debug", + conflicts_with = "tip", + conflicts_with = "etherscan", + requires = "rpc_consensus_ws" + )] + pub rpc_consensus_http: Option, + + /// Runs a fake consensus client using blocks from an RPC endpoint. WS endpoint for getting new + /// blocks. + #[arg( + long = "debug.rpc-consensus-ws", + help_heading = "Debug", + conflicts_with = "tip", + conflicts_with = "etherscan", + requires = "rpc_consensus_http" + )] + pub rpc_consensus_ws: Option, + /// If provided, the engine will skip `n` consecutive FCUs. #[arg(long = "debug.skip-fcu", help_heading = "Debug")] pub skip_fcu: Option, diff --git a/crates/node/builder/Cargo.toml b/crates/node/builder/Cargo.toml index eb540ee11ecb..74f0b74d79d9 100644 --- a/crates/node/builder/Cargo.toml +++ b/crates/node/builder/Cargo.toml @@ -38,7 +38,7 @@ reth-config.workspace = true reth-downloaders.workspace = true reth-node-events.workspace = true reth-consensus.workspace = true -reth-consensus-rpc.workspace = true +reth-consensus-debug-client.workspace = true ## async futures.workspace = true diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index 13e8432a8acc..fd52e49946bc 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -40,7 +40,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream; pub mod common; pub use common::LaunchContext; -use reth_consensus_rpc::RpcConsensusClient; +use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider, RpcBlockProvider}; /// A general purpose trait that launches a new node of any kind. /// @@ -446,16 +446,32 @@ where if ctx.node_config().debug.etherscan { let chain = ctx.node_config().chain.chain; - let rpc_consensus_client = RpcConsensusClient::new( - rpc_server_handles.auth.clone(), - chain - .etherscan_api_key() - .expect("etherscan api key not found for rpc consensus client"), + // todo: move this piping into a helperfunction + let block_provider = EtherscanBlockProvider::new( chain .etherscan_urls() .expect("etherscan urls not found for rpc consensus client") .0 .to_owned(), + chain + .etherscan_api_key() + .expect("etherscan api key not found for rpc consensus client"), + ); + let rpc_consensus_client = DebugConsensusClient::new( + rpc_server_handles.auth.clone(), + Arc::new(block_provider), + ); + ctx.task_executor().spawn_critical("etherscan consensus client", async move { + rpc_consensus_client.spawn::().await + }); + } + if let (Some(rpc_http_url), Some(rpc_ws_url)) = + (&ctx.node_config().debug.rpc_consensus_http, &ctx.node_config().debug.rpc_consensus_ws) + { + let block_provider = RpcBlockProvider::new(rpc_http_url.clone(), rpc_ws_url.clone()); + let rpc_consensus_client = DebugConsensusClient::new( + rpc_server_handles.auth.clone(), + Arc::new(block_provider), ); ctx.task_executor().spawn_critical("rpc consensus client", async move { rpc_consensus_client.spawn::().await From 9807037598a0b857ab16c667238600a8dcc74566 Mon Sep 17 00:00:00 2001 From: Seva Zhidkov Date: Fri, 10 May 2024 14:52:21 +0100 Subject: [PATCH 05/21] lock update --- Cargo.lock | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 79feb4294937..a665df8cf27b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -346,7 +346,7 @@ dependencies = [ [[package]] name = "alloy-pubsub" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=77c1240#77c1240533b411ed0eb5533f94396eba8d7f6ab6" +source = "git+https://github.com/alloy-rs/alloy?rev=899fc51#899fc51af8b5f4de6df1605ca3ffe8d8d6fa8c69" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -643,7 +643,7 @@ dependencies = [ [[package]] name = "alloy-transport-ws" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=77c1240#77c1240533b411ed0eb5533f94396eba8d7f6ab6" +source = "git+https://github.com/alloy-rs/alloy?rev=899fc51#899fc51af8b5f4de6df1605ca3ffe8d8d6fa8c69" dependencies = [ "alloy-pubsub", "alloy-transport", @@ -6800,10 +6800,10 @@ dependencies = [ [[package]] name = "reth-consensus-debug-client" -version = "0.2.0-beta.6" +version = "0.2.0-beta.7" dependencies = [ - "alloy-consensus 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=77c1240)", - "alloy-eips 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=77c1240)", + "alloy-consensus 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=899fc51)", + "alloy-eips 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=899fc51)", "alloy-provider", "futures", "jsonrpsee", From ef3b066cd30ab2130e3170685f69a8e4fdd75e6f Mon Sep 17 00:00:00 2001 From: sevazhidkov Date: Thu, 16 May 2024 20:50:13 -0700 Subject: [PATCH 06/21] move get_or_fetch_previous_block to associated trait method of block provider --- Cargo.lock | 10 ++-- crates/consensus/debug-client/src/client.rs | 61 ++++++++++----------- 2 files changed, 35 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ed4969da0031..fe0a1c2ba81d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -346,7 +346,7 @@ dependencies = [ [[package]] name = "alloy-pubsub" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=899fc51#899fc51af8b5f4de6df1605ca3ffe8d8d6fa8c69" +source = "git+https://github.com/alloy-rs/alloy?rev=dd7a999#dd7a999d9efe259c47a34dde046952de795a8f6a" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -643,7 +643,7 @@ dependencies = [ [[package]] name = "alloy-transport-ws" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=899fc51#899fc51af8b5f4de6df1605ca3ffe8d8d6fa8c69" +source = "git+https://github.com/alloy-rs/alloy?rev=dd7a999#dd7a999d9efe259c47a34dde046952de795a8f6a" dependencies = [ "alloy-pubsub", "alloy-transport", @@ -5568,7 +5568,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.60", + "syn 2.0.63", ] [[package]] @@ -6798,8 +6798,8 @@ dependencies = [ name = "reth-consensus-debug-client" version = "0.2.0-beta.7" dependencies = [ - "alloy-consensus 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=899fc51)", - "alloy-eips 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=899fc51)", + "alloy-consensus 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=dd7a999)", + "alloy-eips 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=dd7a999)", "alloy-provider", "futures", "jsonrpsee", diff --git a/crates/consensus/debug-client/src/client.rs b/crates/consensus/debug-client/src/client.rs index c47039a36d6a..6ad7533c6be9 100644 --- a/crates/consensus/debug-client/src/client.rs +++ b/crates/consensus/debug-client/src/client.rs @@ -16,8 +16,36 @@ use tokio::sync::mpsc; pub trait BlockProvider: Send + Sync + 'static { /// Spawn a block provider to send new blocks to the given sender. fn spawn(&self, tx: mpsc::Sender) -> impl Future + Send; + /// Get a past block by number. fn get_block(&self, block_number: u64) -> impl Future + Send; + + /// Get previous block hash using previous block hash buffer. If it isn't available (buffer + /// started more recently than `offset`), fetch it using `get_block`. + fn get_or_fetch_previous_block( + &self, + previous_block_hashes: &AllocRingBuffer, + current_block_number: u64, + offset: usize, + ) -> impl std::future::Future + Send { + async move { + let stored_hash = previous_block_hashes + .len() + .checked_sub(offset) + .and_then(|index| previous_block_hashes.get(index)); + if let Some(hash) = stored_hash { + return *hash; + } + + // Return zero hash if the chain isn't long enough to have the block at the offset. + let previous_block_number = match current_block_number.checked_sub(offset as u64) { + Some(number) => number, + None => return B256::default(), + }; + let block = self.get_block(previous_block_number).await; + block.header.hash.unwrap() + } + } } /// Debyg consensus client that sends FCUs and new payloads using recent blocks from an external @@ -40,7 +68,6 @@ impl DebugConsensusClient

{ /// Spawn the client to start sending FCUs and new payloads by periodically fetching recent /// blocks. pub async fn spawn(&self) { - // TODO: Add logs let execution_client = self.auth_server.http_client(); let mut previous_block_hashes = AllocRingBuffer::new(64); @@ -73,15 +100,12 @@ impl DebugConsensusClient

{ // Load previous block hashes. We're using (head - 32) and (head - 64) as the safe and // finalized block hashes. - // todo: chdeck for off by ones in offset - let safe_block_hash = get_or_fetch_previous_block( - self.block_provider.as_ref(), + let safe_block_hash = self.block_provider.get_or_fetch_previous_block( &previous_block_hashes, block_number, 32, ); - let finalized_block_hash = get_or_fetch_previous_block( - self.block_provider.as_ref(), + let finalized_block_hash = self.block_provider.get_or_fetch_previous_block( &previous_block_hashes, block_number, 64, @@ -103,31 +127,6 @@ impl DebugConsensusClient

{ } } -/// Get previous block hash using previous block hash buffer. If it isn't available (buffer -/// started more recently than `offset`), fetch it from block provider. -async fn get_or_fetch_previous_block( - block_provider: &P, - previous_block_hashes: &AllocRingBuffer, - current_block_number: u64, - offset: usize, -) -> B256 { - let stored_hash = previous_block_hashes - .len() - .checked_sub(offset) - .and_then(|index| previous_block_hashes.get(index)); - if let Some(hash) = stored_hash { - return *hash; - } - - // Return default hash if the chain isn't long enough to have the block at the offset. - let previous_block_number = match current_block_number.checked_sub(offset as u64) { - Some(number) => number, - None => return B256::default(), - }; - let block = block_provider.get_block(previous_block_number).await; - block.header.hash.unwrap() -} - /// Cancun "new payload" event. #[derive(Debug)] struct ExecutionNewPayload { From ed22651b1e237cca1eee4d81fbc597ee61e6a167 Mon Sep 17 00:00:00 2001 From: sevazhidkov Date: Thu, 16 May 2024 21:18:32 -0700 Subject: [PATCH 07/21] more gentle error handling for etherscan --- Cargo.lock | 2 ++ crates/consensus/debug-client/Cargo.toml | 4 ++- crates/consensus/debug-client/src/client.rs | 25 ++++++++++---- .../debug-client/src/providers/etherscan.rs | 33 +++++++++++-------- .../debug-client/src/providers/rpc.rs | 12 +++---- 5 files changed, 49 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fe0a1c2ba81d..bd081c8aad96 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6801,6 +6801,7 @@ dependencies = [ "alloy-consensus 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=dd7a999)", "alloy-eips 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=dd7a999)", "alloy-provider", + "eyre", "futures", "jsonrpsee", "reqwest 0.12.4", @@ -6809,6 +6810,7 @@ dependencies = [ "reth-rpc-api", "reth-rpc-builder", "reth-rpc-types", + "reth-tracing", "ringbuffer", "serde", "tokio", diff --git a/crates/consensus/debug-client/Cargo.toml b/crates/consensus/debug-client/Cargo.toml index be2892e98279..36b29501555f 100644 --- a/crates/consensus/debug-client/Cargo.toml +++ b/crates/consensus/debug-client/Cargo.toml @@ -16,15 +16,17 @@ reth-node-core.workspace = true reth-rpc-api.workspace = true reth-rpc-types.workspace = true reth-rpc-builder.workspace = true +reth-tracing.workspace = true alloy-consensus = { workspace = true, features = ["serde"] } alloy-eips = { workspace = true } alloy-provider = { workspace = true, features = ["ws"] } futures.workspace = true +eyre.workspace = true jsonrpsee.workspace = true reqwest = { workspace = true, features = ["default-tls", "json"] } serde = { workspace = true, features = ["derive"] } -tokio = { workspace = true } +tokio = { workspace = true, features = ["time"] } ringbuffer = "0.15.0" diff --git a/crates/consensus/debug-client/src/client.rs b/crates/consensus/debug-client/src/client.rs index 6ad7533c6be9..365aa39d9236 100644 --- a/crates/consensus/debug-client/src/client.rs +++ b/crates/consensus/debug-client/src/client.rs @@ -7,6 +7,7 @@ use reth_node_core::{ }; use reth_rpc_builder::auth::AuthServerHandle; use reth_rpc_types::ExecutionPayloadV1; +use reth_tracing::tracing::warn; use ringbuffer::{AllocRingBuffer, RingBuffer}; use std::{future::Future, sync::Arc}; use tokio::sync::mpsc; @@ -18,7 +19,7 @@ pub trait BlockProvider: Send + Sync + 'static { fn spawn(&self, tx: mpsc::Sender) -> impl Future + Send; /// Get a past block by number. - fn get_block(&self, block_number: u64) -> impl Future + Send; + fn get_block(&self, block_number: u64) -> impl Future> + Send; /// Get previous block hash using previous block hash buffer. If it isn't available (buffer /// started more recently than `offset`), fetch it using `get_block`. @@ -27,23 +28,23 @@ pub trait BlockProvider: Send + Sync + 'static { previous_block_hashes: &AllocRingBuffer, current_block_number: u64, offset: usize, - ) -> impl std::future::Future + Send { + ) -> impl Future> + Send { async move { let stored_hash = previous_block_hashes .len() .checked_sub(offset) .and_then(|index| previous_block_hashes.get(index)); if let Some(hash) = stored_hash { - return *hash; + return Ok(*hash); } // Return zero hash if the chain isn't long enough to have the block at the offset. let previous_block_number = match current_block_number.checked_sub(offset as u64) { Some(number) => number, - None => return B256::default(), + None => return Ok(B256::default()), }; - let block = self.get_block(previous_block_number).await; - block.header.hash.unwrap() + let block = self.get_block(previous_block_number).await?; + Ok(block.header.hash.ok_or_else(|| eyre::eyre!("previous block does not have hash"))?) } } } @@ -112,6 +113,18 @@ impl DebugConsensusClient

{ ); let (safe_block_hash, finalized_block_hash) = tokio::join!(safe_block_hash, finalized_block_hash); + let (safe_block_hash, finalized_block_hash) = match ( + safe_block_hash, + finalized_block_hash, + ) { + (Ok(safe_block_hash), Ok(finalized_block_hash)) => { + (safe_block_hash, finalized_block_hash) + } + (safe_block_hash, finalized_block_hash) => { + warn!(target: "consensus::debug-client", ?safe_block_hash, ?finalized_block_hash, "failed to fetch safe or finalized hash from etherscan"); + continue; + } + }; reth_rpc_api::EngineApiClient::::fork_choice_updated_v3( &execution_client, reth_rpc_types::engine::ForkchoiceState { diff --git a/crates/consensus/debug-client/src/providers/etherscan.rs b/crates/consensus/debug-client/src/providers/etherscan.rs index d37170131c53..4bcff39b2105 100644 --- a/crates/consensus/debug-client/src/providers/etherscan.rs +++ b/crates/consensus/debug-client/src/providers/etherscan.rs @@ -2,9 +2,10 @@ use crate::BlockProvider; use alloy_eips::BlockNumberOrTag; use reqwest::Client; use reth_node_core::rpc::types::RichBlock; +use reth_tracing::tracing::warn; use serde::Deserialize; use std::time::Duration; -use tokio::{sync::mpsc, time::sleep}; +use tokio::{sync::mpsc, time::interval}; /// Block provider that fetches new blocks from Etherscan API. #[derive(Debug)] @@ -24,28 +25,35 @@ impl EtherscanBlockProvider { impl BlockProvider for EtherscanBlockProvider { async fn spawn(&self, tx: mpsc::Sender) { let mut last_block_number: Option = None; + // TODO: make interval configurable + let mut interval = interval(Duration::from_secs(3)); loop { - let block = load_etherscan_block( + interval.tick().await; + let block = match load_etherscan_block( &self.http_client, &self.base_url, &self.api_key, BlockNumberOrTag::Latest, ) - .await; + .await + { + Ok(block) => block, + Err(err) => { + warn!(target: "consensus::debug-client", %err, "failed to fetch a block from Etherscan"); + continue + } + }; let block_number = block.header.number.unwrap(); if Some(block_number) == last_block_number { - // TODO: Make configurable - sleep(Duration::from_secs(3)).await; continue; } tx.send(block).await.unwrap(); - sleep(Duration::from_secs(3)).await; last_block_number = Some(block_number); } } - async fn get_block(&self, block_number: u64) -> RichBlock { + async fn get_block(&self, block_number: u64) -> eyre::Result { load_etherscan_block( &self.http_client, &self.base_url, @@ -68,7 +76,7 @@ async fn load_etherscan_block( base_url: &str, api_key: &str, block_number_or_tag: BlockNumberOrTag, -) -> RichBlock { +) -> eyre::Result { let block: EtherscanBlockResponse = http_client .get(base_url) .query(&[ @@ -79,11 +87,8 @@ async fn load_etherscan_block( ("apikey", api_key), ]) .send() - .await - .unwrap() + .await? .json() - .await - // TODO: Handle errors gracefully and do not stop the loop - .unwrap(); - block.result + .await?; + Ok(block.result) } diff --git a/crates/consensus/debug-client/src/providers/rpc.rs b/crates/consensus/debug-client/src/providers/rpc.rs index 26212c113f2b..93aa902afd56 100644 --- a/crates/consensus/debug-client/src/providers/rpc.rs +++ b/crates/consensus/debug-client/src/providers/rpc.rs @@ -46,16 +46,16 @@ impl BlockProvider for RpcBlockProvider { } } - async fn get_block(&self, block_number: u64) -> RichBlock { + async fn get_block(&self, block_number: u64) -> eyre::Result { let http_provider = ProviderBuilder::new() .on_builtin(&self.http_rpc_url) .await .expect("failed to create HTTP provider"); - http_provider + let block: RichBlock = http_provider .get_block_by_number(BlockNumberOrTag::Number(block_number), true) - .await - .expect("failed to get block") - .expect("block not found") - .into() + .await? + .ok_or_else(|| eyre::eyre!("block not found by number {}", block_number))? + .into(); + Ok(block) } } From 7c40a26bbdfab37ad93c20253648309ac9cc72ea Mon Sep 17 00:00:00 2001 From: sevazhidkov Date: Thu, 16 May 2024 21:19:16 -0700 Subject: [PATCH 08/21] remove todo --- crates/node/builder/src/launch/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index 2253adcc72ec..6bd5f7aa0dbb 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -445,7 +445,6 @@ where if ctx.node_config().debug.etherscan { let chain = ctx.node_config().chain.chain; - // todo: move this piping into a helperfunction let block_provider = EtherscanBlockProvider::new( chain .etherscan_urls() From 0e6cae9abed247aa476b2f9e7aa7cd21be1c1cb9 Mon Sep 17 00:00:00 2001 From: sevazhidkov Date: Thu, 16 May 2024 21:24:04 -0700 Subject: [PATCH 09/21] clippy --- crates/consensus/debug-client/src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/consensus/debug-client/src/client.rs b/crates/consensus/debug-client/src/client.rs index 365aa39d9236..6f4033e26b92 100644 --- a/crates/consensus/debug-client/src/client.rs +++ b/crates/consensus/debug-client/src/client.rs @@ -44,7 +44,7 @@ pub trait BlockProvider: Send + Sync + 'static { None => return Ok(B256::default()), }; let block = self.get_block(previous_block_number).await?; - Ok(block.header.hash.ok_or_else(|| eyre::eyre!("previous block does not have hash"))?) + block.header.hash.ok_or_else(|| eyre::eyre!("previous block does not have hash")) } } } From cc121828456a1eaa2c1a69b3b322d7d1477d771a Mon Sep 17 00:00:00 2001 From: sevazhidkov Date: Thu, 16 May 2024 21:32:48 -0700 Subject: [PATCH 10/21] remove dependency on opentls --- Cargo.lock | 100 ----------------------- crates/consensus/debug-client/Cargo.toml | 2 +- 2 files changed, 1 insertion(+), 101 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bd081c8aad96..5a4778788bd9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3178,21 +3178,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "foreign-types" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" -dependencies = [ - "foreign-types-shared", -] - -[[package]] -name = "foreign-types-shared" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" - [[package]] name = "form_urlencoded" version = "1.2.1" @@ -3878,22 +3863,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "hyper-tls" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" -dependencies = [ - "bytes", - "http-body-util", - "hyper 1.3.1", - "hyper-util", - "native-tls", - "tokio", - "tokio-native-tls", - "tower-service", -] - [[package]] name = "hyper-util" version = "0.1.3" @@ -5272,24 +5241,6 @@ dependencies = [ "unsigned-varint 0.7.2", ] -[[package]] -name = "native-tls" -version = "0.2.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" -dependencies = [ - "lazy_static", - "libc", - "log", - "openssl", - "openssl-probe", - "openssl-sys", - "schannel", - "security-framework", - "security-framework-sys", - "tempfile", -] - [[package]] name = "network" version = "0.0.0" @@ -5545,50 +5496,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" -[[package]] -name = "openssl" -version = "0.10.64" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" -dependencies = [ - "bitflags 2.5.0", - "cfg-if", - "foreign-types", - "libc", - "once_cell", - "openssl-macros", - "openssl-sys", -] - -[[package]] -name = "openssl-macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.63", -] - [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" -[[package]] -name = "openssl-sys" -version = "0.9.102" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c597637d56fbc83893a35eb0dd04b2b8e7a50c91e64e9493e398b5df4fb45fa2" -dependencies = [ - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "option-ext" version = "0.2.0" @@ -6492,13 +6405,11 @@ dependencies = [ "http-body-util", "hyper 1.3.1", "hyper-rustls 0.26.0", - "hyper-tls", "hyper-util", "ipnet", "js-sys", "log", "mime", - "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -6511,7 +6422,6 @@ dependencies = [ "serde_urlencoded", "sync_wrapper", "tokio", - "tokio-native-tls", "tokio-rustls 0.25.0", "tower-service", "url", @@ -9733,16 +9643,6 @@ dependencies = [ "syn 2.0.63", ] -[[package]] -name = "tokio-native-tls" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" -dependencies = [ - "native-tls", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.24.1" diff --git a/crates/consensus/debug-client/Cargo.toml b/crates/consensus/debug-client/Cargo.toml index 36b29501555f..1a7e0a2299ce 100644 --- a/crates/consensus/debug-client/Cargo.toml +++ b/crates/consensus/debug-client/Cargo.toml @@ -25,7 +25,7 @@ alloy-provider = { workspace = true, features = ["ws"] } futures.workspace = true eyre.workspace = true jsonrpsee.workspace = true -reqwest = { workspace = true, features = ["default-tls", "json"] } +reqwest = { workspace = true, features = ["rustls-tls", "json"] } serde = { workspace = true, features = ["derive"] } tokio = { workspace = true, features = ["time"] } From 8cdb0323367c28dcf80dc0489f0794198358fbf2 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 22 May 2024 12:09:57 +0200 Subject: [PATCH 11/21] chore: smol touchups --- Cargo.lock | 1 - crates/consensus/debug-client/Cargo.toml | 5 ++-- crates/consensus/debug-client/src/client.rs | 22 ++++++++------- crates/consensus/debug-client/src/lib.rs | 9 +++++++ .../debug-client/src/providers/etherscan.rs | 27 ++++++++++++++----- .../src/{providers.rs => providers/mod.rs} | 0 .../debug-client/src/providers/rpc.rs | 9 ++++--- crates/node/builder/src/launch/mod.rs | 4 +-- 8 files changed, 54 insertions(+), 23 deletions(-) rename crates/consensus/debug-client/src/{providers.rs => providers/mod.rs} (100%) diff --git a/Cargo.lock b/Cargo.lock index 5a4778788bd9..7a5dd92ca6cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6713,7 +6713,6 @@ dependencies = [ "alloy-provider", "eyre", "futures", - "jsonrpsee", "reqwest 0.12.4", "reth-node-api", "reth-node-core", diff --git a/crates/consensus/debug-client/Cargo.toml b/crates/consensus/debug-client/Cargo.toml index 1a7e0a2299ce..e5ae9c5f09aa 100644 --- a/crates/consensus/debug-client/Cargo.toml +++ b/crates/consensus/debug-client/Cargo.toml @@ -11,6 +11,7 @@ repository.workspace = true workspace = true [dependencies] +# reth reth-node-api.workspace = true reth-node-core.workspace = true reth-rpc-api.workspace = true @@ -18,13 +19,13 @@ reth-rpc-types.workspace = true reth-rpc-builder.workspace = true reth-tracing.workspace = true +# ethereum alloy-consensus = { workspace = true, features = ["serde"] } -alloy-eips = { workspace = true } +alloy-eips.workspace = true alloy-provider = { workspace = true, features = ["ws"] } futures.workspace = true eyre.workspace = true -jsonrpsee.workspace = true reqwest = { workspace = true, features = ["rustls-tls", "json"] } serde = { workspace = true, features = ["derive"] } tokio = { workspace = true, features = ["time"] } diff --git a/crates/consensus/debug-client/src/client.rs b/crates/consensus/debug-client/src/client.rs index 6f4033e26b92..614f21ac2313 100644 --- a/crates/consensus/debug-client/src/client.rs +++ b/crates/consensus/debug-client/src/client.rs @@ -9,14 +9,16 @@ use reth_rpc_builder::auth::AuthServerHandle; use reth_rpc_types::ExecutionPayloadV1; use reth_tracing::tracing::warn; use ringbuffer::{AllocRingBuffer, RingBuffer}; -use std::{future::Future, sync::Arc}; +use std::future::Future; use tokio::sync::mpsc; /// Supplies consensus client with new blocks sent in `tx` and a callback to find specific blocks /// by number to fetch past finalized and safe blocks. pub trait BlockProvider: Send + Sync + 'static { - /// Spawn a block provider to send new blocks to the given sender. - fn spawn(&self, tx: mpsc::Sender) -> impl Future + Send; + /// Runs a block provider to send new blocks to the given sender. + /// + /// Note: This is expected to be spawned in a separate task. + fn subscribe_blocks(&self, tx: mpsc::Sender) -> impl Future + Send; /// Get a past block by number. fn get_block(&self, block_number: u64) -> impl Future> + Send; @@ -49,26 +51,28 @@ pub trait BlockProvider: Send + Sync + 'static { } } -/// Debyg consensus client that sends FCUs and new payloads using recent blocks from an external +/// Debug consensus client that sends FCUs and new payloads using recent blocks from an external /// provider like Etherscan or an RPC endpoint. #[derive(Debug)] pub struct DebugConsensusClient { /// Handle to execution client. auth_server: AuthServerHandle, /// Provider to get consensus blocks from. - block_provider: Arc

, + block_provider: P, } impl DebugConsensusClient

{ /// Create a new debug consensus client with the given handle to execution /// client and block provider. - pub fn new(auth_server: AuthServerHandle, block_provider: Arc

) -> Self { + pub fn new(auth_server: AuthServerHandle, block_provider: P) -> Self { Self { auth_server, block_provider } } +} +impl DebugConsensusClient

{ /// Spawn the client to start sending FCUs and new payloads by periodically fetching recent /// blocks. - pub async fn spawn(&self) { + pub async fn run(self) { let execution_client = self.auth_server.http_client(); let mut previous_block_hashes = AllocRingBuffer::new(64); @@ -76,7 +80,7 @@ impl DebugConsensusClient

{ let (tx, rx) = mpsc::channel::(64); let block_provider = self.block_provider.clone(); tokio::spawn(async move { - block_provider.spawn(tx).await; + block_provider.subscribe_blocks(tx).await; }); rx }; @@ -205,7 +209,7 @@ fn rich_block_to_execution_payload_v3(block: RichBlock) -> ExecutionNewPayload { }) .collect(), }, - withdrawals: block.withdrawals.clone().unwrap(), + withdrawals: block.withdrawals.clone().unwrap_or_default(), }, blob_gas_used: block.header.blob_gas_used.unwrap().try_into().unwrap(), excess_blob_gas: block.header.excess_blob_gas.unwrap().try_into().unwrap(), diff --git a/crates/consensus/debug-client/src/lib.rs b/crates/consensus/debug-client/src/lib.rs index 99ec71ae02f7..bc244fafeb07 100644 --- a/crates/consensus/debug-client/src/lib.rs +++ b/crates/consensus/debug-client/src/lib.rs @@ -3,6 +3,15 @@ //! This is a worker that sends FCUs and new payloads by fetching recent blocks from an external //! provider like Etherscan or an RPC endpoint. This allows to quickly test the execution client //! without running a consensus node. + +#![doc( + html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png", + html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", + issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/" +)] +#![cfg_attr(not(test), warn(unused_crate_dependencies))] +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + mod client; mod providers; diff --git a/crates/consensus/debug-client/src/providers/etherscan.rs b/crates/consensus/debug-client/src/providers/etherscan.rs index 4bcff39b2105..4724fd8fb45f 100644 --- a/crates/consensus/debug-client/src/providers/etherscan.rs +++ b/crates/consensus/debug-client/src/providers/etherscan.rs @@ -8,25 +8,36 @@ use std::time::Duration; use tokio::{sync::mpsc, time::interval}; /// Block provider that fetches new blocks from Etherscan API. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct EtherscanBlockProvider { http_client: Client, base_url: String, api_key: String, + interval: Duration, } impl EtherscanBlockProvider { /// Create a new Etherscan block provider with the given base URL and API key. pub fn new(base_url: String, api_key: String) -> Self { - EtherscanBlockProvider { http_client: Client::new(), base_url, api_key } + EtherscanBlockProvider { + http_client: Client::new(), + base_url, + api_key, + interval: Duration::from_secs(3), + } + } + + /// Sets the interval at which the provider fetches new blocks. + pub fn with_interval(mut self, interval: Duration) -> Self { + self.interval = interval; + self } } impl BlockProvider for EtherscanBlockProvider { - async fn spawn(&self, tx: mpsc::Sender) { + async fn subscribe_blocks(&self, tx: mpsc::Sender) { let mut last_block_number: Option = None; - // TODO: make interval configurable - let mut interval = interval(Duration::from_secs(3)); + let mut interval = interval(self.interval); loop { interval.tick().await; let block = match load_etherscan_block( @@ -48,7 +59,11 @@ impl BlockProvider for EtherscanBlockProvider { continue; } - tx.send(block).await.unwrap(); + if tx.send(block).await.is_err() { + // channel closed + break; + } + last_block_number = Some(block_number); } } diff --git a/crates/consensus/debug-client/src/providers.rs b/crates/consensus/debug-client/src/providers/mod.rs similarity index 100% rename from crates/consensus/debug-client/src/providers.rs rename to crates/consensus/debug-client/src/providers/mod.rs diff --git a/crates/consensus/debug-client/src/providers/rpc.rs b/crates/consensus/debug-client/src/providers/rpc.rs index 93aa902afd56..0ddb1f294f3f 100644 --- a/crates/consensus/debug-client/src/providers/rpc.rs +++ b/crates/consensus/debug-client/src/providers/rpc.rs @@ -7,7 +7,7 @@ use tokio::sync::mpsc::Sender; /// Block provider that fetches new blocks from an RPC endpoint using a websocket connection. /// HTTP provider is used to fetch full blocks by hash and past blocks by number. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RpcBlockProvider { http_rpc_url: String, ws_rpc_url: String, @@ -21,7 +21,7 @@ impl RpcBlockProvider { } impl BlockProvider for RpcBlockProvider { - async fn spawn(&self, tx: Sender) { + async fn subscribe_blocks(&self, tx: Sender) { let http_provider = ProviderBuilder::new() .on_builtin(&self.http_rpc_url) .await @@ -42,7 +42,10 @@ impl BlockProvider for RpcBlockProvider { .await .expect("failed to get block") .expect("block not found"); - tx.send(full_block.into()).await.unwrap(); + if tx.send(full_block.into()).await.is_err() { + // channel closed + break; + } } } diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index 6bd5f7aa0dbb..4a127c4a7c6c 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -460,7 +460,7 @@ where Arc::new(block_provider), ); ctx.task_executor().spawn_critical("etherscan consensus client", async move { - rpc_consensus_client.spawn::().await + rpc_consensus_client.run::().await }); } if let (Some(rpc_http_url), Some(rpc_ws_url)) = @@ -472,7 +472,7 @@ where Arc::new(block_provider), ); ctx.task_executor().spawn_critical("rpc consensus client", async move { - rpc_consensus_client.spawn::().await + rpc_consensus_client.run::().await }); } From c3d56e634030eeae320cc3a3d00536511241be66 Mon Sep 17 00:00:00 2001 From: sevazhidkov Date: Thu, 30 May 2024 21:46:15 +0100 Subject: [PATCH 12/21] review fixes --- Cargo.lock | 8 +++---- crates/consensus/debug-client/src/client.rs | 17 ++++++++++++-- crates/node-core/src/args/debug.rs | 5 +++-- crates/node/builder/src/launch/mod.rs | 25 +++++++++++++-------- 4 files changed, 38 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8efd865e1638..ed3a3a528f2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -345,7 +345,7 @@ dependencies = [ [[package]] name = "alloy-pubsub" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=dd7a999#dd7a999d9efe259c47a34dde046952de795a8f6a" +source = "git+https://github.com/alloy-rs/alloy?rev=64feb9b#64feb9bc51c8021ea08535694c44de84222f474e" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -655,7 +655,7 @@ dependencies = [ [[package]] name = "alloy-transport-ws" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=dd7a999#dd7a999d9efe259c47a34dde046952de795a8f6a" +source = "git+https://github.com/alloy-rs/alloy?rev=64feb9b#64feb9bc51c8021ea08535694c44de84222f474e" dependencies = [ "alloy-pubsub", "alloy-transport", @@ -6688,8 +6688,8 @@ dependencies = [ name = "reth-consensus-debug-client" version = "0.2.0-beta.7" dependencies = [ - "alloy-consensus 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=dd7a999)", - "alloy-eips 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=dd7a999)", + "alloy-consensus 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=64feb9b)", + "alloy-eips 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=64feb9b)", "alloy-provider", "eyre", "futures", diff --git a/crates/consensus/debug-client/src/client.rs b/crates/consensus/debug-client/src/client.rs index 614f21ac2313..341bd8ea8d25 100644 --- a/crates/consensus/debug-client/src/client.rs +++ b/crates/consensus/debug-client/src/client.rs @@ -9,8 +9,8 @@ use reth_rpc_builder::auth::AuthServerHandle; use reth_rpc_types::ExecutionPayloadV1; use reth_tracing::tracing::warn; use ringbuffer::{AllocRingBuffer, RingBuffer}; -use std::future::Future; -use tokio::sync::mpsc; +use std::{future::Future, sync::Arc}; +use tokio::sync::{mpsc, mpsc::Sender}; /// Supplies consensus client with new blocks sent in `tx` and a callback to find specific blocks /// by number to fetch past finalized and safe blocks. @@ -51,6 +51,19 @@ pub trait BlockProvider: Send + Sync + 'static { } } +impl

BlockProvider for Arc

+where + P: BlockProvider, +{ + fn subscribe_blocks(&self, tx: Sender) -> impl Future + Send { + self.as_ref().subscribe_blocks(tx) + } + + fn get_block(&self, block_number: u64) -> impl Future> + Send { + self.as_ref().get_block(block_number) + } +} + /// Debug consensus client that sends FCUs and new payloads using recent blocks from an external /// provider like Etherscan or an RPC endpoint. #[derive(Debug)] diff --git a/crates/node-core/src/args/debug.rs b/crates/node-core/src/args/debug.rs index f4e67a476be0..0770b4140a17 100644 --- a/crates/node-core/src/args/debug.rs +++ b/crates/node-core/src/args/debug.rs @@ -29,14 +29,15 @@ pub struct DebugArgs { pub max_block: Option, /// Runs a fake consensus client that advances the chain using recent block hashes - /// on Etherscan. If specified, requires an `ETHERSCAN_API_KEY` environment + /// on Etherscan. If specified, requires an `ETHERSCAN_API_KEY` environment variable. + /// If a string provided to this argument, it will be used as a custom Etherscan API url. #[arg( long = "debug.etherscan", help_heading = "Debug", conflicts_with = "tip", conflicts_with = "rpc_consensus_http" )] - pub etherscan: bool, + pub etherscan: Option>, /// Runs a fake consensus client using blocks from an RPC endpoint. HTTP endpoint for getting /// past blocks. diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index cfa2a9a99ecf..e728e2e30f77 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -443,17 +443,24 @@ where let _ = tx.send(res); }); - if ctx.node_config().debug.etherscan { + if let Some(custom_etherscan_url) = &ctx.node_config().debug.etherscan { let chain = ctx.node_config().chain.chain; + let etherscan_url = match custom_etherscan_url { + Some(url) => url.to_owned(), + None => { + // If URL isn't provided, use default Etherscan URL for the chain + chain + .etherscan_urls() + .ok_or_else(|| eyre::eyre!("failed to get etherscan url for chain"))? + .0 + .to_owned() + } + }; let block_provider = EtherscanBlockProvider::new( - chain - .etherscan_urls() - .expect("etherscan urls not found for rpc consensus client") - .0 - .to_owned(), - chain - .etherscan_api_key() - .expect("etherscan api key not found for rpc consensus client"), + etherscan_url, + chain.etherscan_api_key().ok_or_else(|| { + eyre::eyre!("etherscan api key not found for rpc consensus client") + })?, ); let rpc_consensus_client = DebugConsensusClient::new( rpc_server_handles.auth.clone(), From 9b24ae7e75104d416a9a61940d9a071d48f41a46 Mon Sep 17 00:00:00 2001 From: sevazhidkov Date: Thu, 30 May 2024 22:00:37 +0100 Subject: [PATCH 13/21] use only ws endpoint --- Cargo.lock | 8 +++---- .../debug-client/src/providers/rpc.rs | 22 +++++++------------ crates/node-core/src/args/debug.rs | 19 +++------------- crates/node/builder/src/launch/mod.rs | 6 ++--- 4 files changed, 17 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4b214d0c4a97..5ab02a996a7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -343,7 +343,7 @@ dependencies = [ [[package]] name = "alloy-pubsub" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=64feb9b#64feb9bc51c8021ea08535694c44de84222f474e" +source = "git+https://github.com/alloy-rs/alloy?rev=7320d4c#7320d4ca3878bd059a96273ae33e52730618f1a0" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -653,7 +653,7 @@ dependencies = [ [[package]] name = "alloy-transport-ws" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=64feb9b#64feb9bc51c8021ea08535694c44de84222f474e" +source = "git+https://github.com/alloy-rs/alloy?rev=7320d4c#7320d4ca3878bd059a96273ae33e52730618f1a0" dependencies = [ "alloy-pubsub", "alloy-transport", @@ -6522,8 +6522,8 @@ dependencies = [ name = "reth-consensus-debug-client" version = "0.2.0-beta.7" dependencies = [ - "alloy-consensus 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=64feb9b)", - "alloy-eips 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=64feb9b)", + "alloy-consensus 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=7320d4c)", + "alloy-eips 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=7320d4c)", "alloy-provider", "eyre", "futures", diff --git a/crates/consensus/debug-client/src/providers/rpc.rs b/crates/consensus/debug-client/src/providers/rpc.rs index 0ddb1f294f3f..1e6fe9fc5712 100644 --- a/crates/consensus/debug-client/src/providers/rpc.rs +++ b/crates/consensus/debug-client/src/providers/rpc.rs @@ -6,26 +6,20 @@ use reth_node_core::rpc::types::RichBlock; use tokio::sync::mpsc::Sender; /// Block provider that fetches new blocks from an RPC endpoint using a websocket connection. -/// HTTP provider is used to fetch full blocks by hash and past blocks by number. #[derive(Debug, Clone)] pub struct RpcBlockProvider { - http_rpc_url: String, ws_rpc_url: String, } impl RpcBlockProvider { - /// Create a new RPC block provider with the given HTTP and WS RPC URLs. - pub fn new(http_rpc_url: String, ws_rpc_url: String) -> Self { - Self { http_rpc_url, ws_rpc_url } + /// Create a new RPC block provider with the given WS RPC URL. + pub fn new(ws_rpc_url: String) -> Self { + Self { ws_rpc_url } } } impl BlockProvider for RpcBlockProvider { async fn subscribe_blocks(&self, tx: Sender) { - let http_provider = ProviderBuilder::new() - .on_builtin(&self.http_rpc_url) - .await - .expect("failed to create HTTP provider"); let ws_provider = ProviderBuilder::new() .on_builtin(&self.ws_rpc_url) .await @@ -37,7 +31,7 @@ impl BlockProvider for RpcBlockProvider { .into_stream(); while let Some(block) = stream.next().await { - let full_block = http_provider + let full_block = ws_provider .get_block_by_hash(block.header.hash.unwrap(), true) .await .expect("failed to get block") @@ -50,11 +44,11 @@ impl BlockProvider for RpcBlockProvider { } async fn get_block(&self, block_number: u64) -> eyre::Result { - let http_provider = ProviderBuilder::new() - .on_builtin(&self.http_rpc_url) + let ws_provider = ProviderBuilder::new() + .on_builtin(&self.ws_rpc_url) .await - .expect("failed to create HTTP provider"); - let block: RichBlock = http_provider + .expect("failed to create WS provider"); + let block: RichBlock = ws_provider .get_block_by_number(BlockNumberOrTag::Number(block_number), true) .await? .ok_or_else(|| eyre::eyre!("block not found by number {}", block_number))? diff --git a/crates/node-core/src/args/debug.rs b/crates/node-core/src/args/debug.rs index 0770b4140a17..af487a130df2 100644 --- a/crates/node-core/src/args/debug.rs +++ b/crates/node-core/src/args/debug.rs @@ -35,29 +35,16 @@ pub struct DebugArgs { long = "debug.etherscan", help_heading = "Debug", conflicts_with = "tip", - conflicts_with = "rpc_consensus_http" + conflicts_with = "rpc_consensus_ws" )] pub etherscan: Option>, - /// Runs a fake consensus client using blocks from an RPC endpoint. HTTP endpoint for getting - /// past blocks. - #[arg( - long = "debug.rpc-consensus-http", - help_heading = "Debug", - conflicts_with = "tip", - conflicts_with = "etherscan", - requires = "rpc_consensus_ws" - )] - pub rpc_consensus_http: Option, - - /// Runs a fake consensus client using blocks from an RPC endpoint. WS endpoint for getting new - /// blocks. + /// Runs a fake consensus client using blocks fetched from an RPC WebSocket endpoint. #[arg( long = "debug.rpc-consensus-ws", help_heading = "Debug", conflicts_with = "tip", - conflicts_with = "etherscan", - requires = "rpc_consensus_http" + conflicts_with = "etherscan" )] pub rpc_consensus_ws: Option, diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index 77f527a8fbce..acd2b4638203 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -479,10 +479,8 @@ where rpc_consensus_client.run::().await }); } - if let (Some(rpc_http_url), Some(rpc_ws_url)) = - (&ctx.node_config().debug.rpc_consensus_http, &ctx.node_config().debug.rpc_consensus_ws) - { - let block_provider = RpcBlockProvider::new(rpc_http_url.clone(), rpc_ws_url.clone()); + if let Some(rpc_ws_url) = &ctx.node_config().debug.rpc_consensus_ws { + let block_provider = RpcBlockProvider::new(rpc_ws_url.clone()); let rpc_consensus_client = DebugConsensusClient::new( rpc_server_handles.auth.clone(), Arc::new(block_provider), From 223ccf944f3885bd55b2bfe957df839dd8767f1b Mon Sep 17 00:00:00 2001 From: sevazhidkov Date: Thu, 30 May 2024 22:07:34 +0100 Subject: [PATCH 14/21] clippy --- crates/consensus/debug-client/src/client.rs | 6 +++--- crates/consensus/debug-client/src/providers/etherscan.rs | 9 ++------- crates/consensus/debug-client/src/providers/rpc.rs | 2 +- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/crates/consensus/debug-client/src/client.rs b/crates/consensus/debug-client/src/client.rs index 341bd8ea8d25..eebacf51e48d 100644 --- a/crates/consensus/debug-client/src/client.rs +++ b/crates/consensus/debug-client/src/client.rs @@ -77,7 +77,7 @@ pub struct DebugConsensusClient { impl DebugConsensusClient

{ /// Create a new debug consensus client with the given handle to execution /// client and block provider. - pub fn new(auth_server: AuthServerHandle, block_provider: P) -> Self { + pub const fn new(auth_server: AuthServerHandle, block_provider: P) -> Self { Self { auth_server, block_provider } } } @@ -167,12 +167,12 @@ struct ExecutionNewPayload { impl ExecutionNewPayload { /// Get block hash from block in the payload - fn block_hash(&self) -> B256 { + const fn block_hash(&self) -> B256 { self.execution_payload_v3.payload_inner.payload_inner.block_hash } /// Get block number from block in the payload - fn block_number(&self) -> u64 { + const fn block_number(&self) -> u64 { self.execution_payload_v3.payload_inner.payload_inner.block_number } } diff --git a/crates/consensus/debug-client/src/providers/etherscan.rs b/crates/consensus/debug-client/src/providers/etherscan.rs index 4724fd8fb45f..2eb995be6af6 100644 --- a/crates/consensus/debug-client/src/providers/etherscan.rs +++ b/crates/consensus/debug-client/src/providers/etherscan.rs @@ -19,16 +19,11 @@ pub struct EtherscanBlockProvider { impl EtherscanBlockProvider { /// Create a new Etherscan block provider with the given base URL and API key. pub fn new(base_url: String, api_key: String) -> Self { - EtherscanBlockProvider { - http_client: Client::new(), - base_url, - api_key, - interval: Duration::from_secs(3), - } + Self { http_client: Client::new(), base_url, api_key, interval: Duration::from_secs(3) } } /// Sets the interval at which the provider fetches new blocks. - pub fn with_interval(mut self, interval: Duration) -> Self { + pub const fn with_interval(mut self, interval: Duration) -> Self { self.interval = interval; self } diff --git a/crates/consensus/debug-client/src/providers/rpc.rs b/crates/consensus/debug-client/src/providers/rpc.rs index 1e6fe9fc5712..e706090ed3c2 100644 --- a/crates/consensus/debug-client/src/providers/rpc.rs +++ b/crates/consensus/debug-client/src/providers/rpc.rs @@ -13,7 +13,7 @@ pub struct RpcBlockProvider { impl RpcBlockProvider { /// Create a new RPC block provider with the given WS RPC URL. - pub fn new(ws_rpc_url: String) -> Self { + pub const fn new(ws_rpc_url: String) -> Self { Self { ws_rpc_url } } } From 06497afb8f62a4c9e08763c641366b16d6f3af1e Mon Sep 17 00:00:00 2001 From: sevazhidkov Date: Thu, 30 May 2024 22:13:52 +0100 Subject: [PATCH 15/21] update cli field descriptions --- book/cli/reth/node.md | 6 ++++++ crates/node-core/src/args/debug.rs | 6 +++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/book/cli/reth/node.md b/book/cli/reth/node.md index 999601f044bd..f85193497426 100644 --- a/book/cli/reth/node.md +++ b/book/cli/reth/node.md @@ -454,6 +454,12 @@ Debug: --debug.max-block Runs the sync only up to the specified block + --debug.etherscan [] + Runs a fake consensus client that advances the chain using recent block hashes on Etherscan. If specified, requires an `ETHERSCAN_API_KEY` environment variable + + --debug.rpc-consensus-ws + Runs a fake consensus client using blocks fetched from an RPC WebSocket endpoint + --debug.skip-fcu If provided, the engine will skip `n` consecutive FCUs diff --git a/crates/node-core/src/args/debug.rs b/crates/node-core/src/args/debug.rs index af487a130df2..466c91ddd410 100644 --- a/crates/node-core/src/args/debug.rs +++ b/crates/node-core/src/args/debug.rs @@ -30,12 +30,12 @@ pub struct DebugArgs { /// Runs a fake consensus client that advances the chain using recent block hashes /// on Etherscan. If specified, requires an `ETHERSCAN_API_KEY` environment variable. - /// If a string provided to this argument, it will be used as a custom Etherscan API url. #[arg( long = "debug.etherscan", help_heading = "Debug", conflicts_with = "tip", - conflicts_with = "rpc_consensus_ws" + conflicts_with = "rpc_consensus_ws", + value_name = "ETHERSCAN_API_URL", )] pub etherscan: Option>, @@ -44,7 +44,7 @@ pub struct DebugArgs { long = "debug.rpc-consensus-ws", help_heading = "Debug", conflicts_with = "tip", - conflicts_with = "etherscan" + conflicts_with = "etherscan", )] pub rpc_consensus_ws: Option, From bbb0420c54212d6b5c04e498059e3556c1654c8a Mon Sep 17 00:00:00 2001 From: sevazhidkov Date: Thu, 30 May 2024 22:21:44 +0100 Subject: [PATCH 16/21] fmt --- crates/node-core/src/args/debug.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/node-core/src/args/debug.rs b/crates/node-core/src/args/debug.rs index 466c91ddd410..7528120ece8d 100644 --- a/crates/node-core/src/args/debug.rs +++ b/crates/node-core/src/args/debug.rs @@ -35,7 +35,7 @@ pub struct DebugArgs { help_heading = "Debug", conflicts_with = "tip", conflicts_with = "rpc_consensus_ws", - value_name = "ETHERSCAN_API_URL", + value_name = "ETHERSCAN_API_URL" )] pub etherscan: Option>, @@ -44,7 +44,7 @@ pub struct DebugArgs { long = "debug.rpc-consensus-ws", help_heading = "Debug", conflicts_with = "tip", - conflicts_with = "etherscan", + conflicts_with = "etherscan" )] pub rpc_consensus_ws: Option, From 2ecb0f9443a0767935e1e246f0cb382d68b9b468 Mon Sep 17 00:00:00 2001 From: sevazhidkov Date: Thu, 30 May 2024 22:27:39 +0100 Subject: [PATCH 17/21] autoimpl for trait --- Cargo.lock | 1 + crates/consensus/debug-client/Cargo.toml | 1 + crates/consensus/debug-client/src/client.rs | 18 +++--------------- 3 files changed, 5 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5ab02a996a7d..5f26d6d5b6d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6525,6 +6525,7 @@ dependencies = [ "alloy-consensus 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=7320d4c)", "alloy-eips 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=7320d4c)", "alloy-provider", + "auto_impl", "eyre", "futures", "reqwest 0.12.4", diff --git a/crates/consensus/debug-client/Cargo.toml b/crates/consensus/debug-client/Cargo.toml index e5ae9c5f09aa..bb31de5bebe9 100644 --- a/crates/consensus/debug-client/Cargo.toml +++ b/crates/consensus/debug-client/Cargo.toml @@ -24,6 +24,7 @@ alloy-consensus = { workspace = true, features = ["serde"] } alloy-eips.workspace = true alloy-provider = { workspace = true, features = ["ws"] } +auto_impl.workspace = true futures.workspace = true eyre.workspace = true reqwest = { workspace = true, features = ["rustls-tls", "json"] } diff --git a/crates/consensus/debug-client/src/client.rs b/crates/consensus/debug-client/src/client.rs index eebacf51e48d..d627b630106f 100644 --- a/crates/consensus/debug-client/src/client.rs +++ b/crates/consensus/debug-client/src/client.rs @@ -9,11 +9,12 @@ use reth_rpc_builder::auth::AuthServerHandle; use reth_rpc_types::ExecutionPayloadV1; use reth_tracing::tracing::warn; use ringbuffer::{AllocRingBuffer, RingBuffer}; -use std::{future::Future, sync::Arc}; -use tokio::sync::{mpsc, mpsc::Sender}; +use std::future::Future; +use tokio::sync::mpsc; /// Supplies consensus client with new blocks sent in `tx` and a callback to find specific blocks /// by number to fetch past finalized and safe blocks. +#[auto_impl::auto_impl(&, Arc, Box)] pub trait BlockProvider: Send + Sync + 'static { /// Runs a block provider to send new blocks to the given sender. /// @@ -51,19 +52,6 @@ pub trait BlockProvider: Send + Sync + 'static { } } -impl

BlockProvider for Arc

-where - P: BlockProvider, -{ - fn subscribe_blocks(&self, tx: Sender) -> impl Future + Send { - self.as_ref().subscribe_blocks(tx) - } - - fn get_block(&self, block_number: u64) -> impl Future> + Send { - self.as_ref().get_block(block_number) - } -} - /// Debug consensus client that sends FCUs and new payloads using recent blocks from an external /// provider like Etherscan or an RPC endpoint. #[derive(Debug)] From ffbf684331a62a8e388096d9d0058761876bcf5c Mon Sep 17 00:00:00 2001 From: sevazhidkov Date: Sun, 9 Jun 2024 22:52:29 +0300 Subject: [PATCH 18/21] lock bump --- Cargo.lock | 173 ++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 170 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 540e5dfa2bcb..5eba0acdbcf2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -317,11 +317,13 @@ dependencies = [ "alloy-json-rpc", "alloy-network", "alloy-primitives", + "alloy-pubsub", "alloy-rpc-client", "alloy-rpc-types 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=cc68b93)", "alloy-rpc-types-trace", "alloy-transport", "alloy-transport-http", + "alloy-transport-ws", "async-stream", "async-trait", "auto_impl", @@ -338,6 +340,24 @@ dependencies = [ "url", ] +[[package]] +name = "alloy-pubsub" +version = "0.1.0" +source = "git+https://github.com/alloy-rs/alloy?rev=cc68b93#cc68b93605f4521c2b0bce1a7efaeff2046cf07c" +dependencies = [ + "alloy-json-rpc", + "alloy-primitives", + "alloy-transport", + "bimap", + "futures", + "serde", + "serde_json", + "tokio", + "tokio-stream", + "tower", + "tracing", +] + [[package]] name = "alloy-rlp" version = "0.3.5" @@ -366,8 +386,11 @@ version = "0.1.0" source = "git+https://github.com/alloy-rs/alloy?rev=cc68b93#cc68b93605f4521c2b0bce1a7efaeff2046cf07c" dependencies = [ "alloy-json-rpc", + "alloy-primitives", + "alloy-pubsub", "alloy-transport", "alloy-transport-http", + "alloy-transport-ws", "futures", "pin-project", "reqwest 0.12.4", @@ -627,6 +650,22 @@ dependencies = [ "url", ] +[[package]] +name = "alloy-transport-ws" +version = "0.1.0" +source = "git+https://github.com/alloy-rs/alloy?rev=cc68b93#cc68b93605f4521c2b0bce1a7efaeff2046cf07c" +dependencies = [ + "alloy-pubsub", + "alloy-transport", + "futures", + "http 0.2.12", + "serde_json", + "tokio", + "tokio-tungstenite", + "tracing", + "ws_stream_wasm", +] + [[package]] name = "alloy-trie" version = "0.4.1" @@ -986,6 +1025,17 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "async_io_stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c" +dependencies = [ + "futures", + "pharos", + "rustc_version 0.4.0", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -1127,6 +1177,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bimap" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" + [[package]] name = "bincode" version = "1.3.3" @@ -3124,7 +3180,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" dependencies = [ "gloo-timers", - "send_wrapper", + "send_wrapper 0.4.0", ] [[package]] @@ -4147,7 +4203,7 @@ dependencies = [ "tokio-util", "tracing", "url", - "webpki-roots", + "webpki-roots 0.26.2", ] [[package]] @@ -5413,6 +5469,16 @@ dependencies = [ "wyhash", ] +[[package]] +name = "pharos" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414" +dependencies = [ + "futures", + "rustc_version 0.4.0", +] + [[package]] name = "phf" version = "0.11.2" @@ -6139,7 +6205,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "webpki-roots", + "webpki-roots 0.26.2", "winreg 0.52.0", ] @@ -6446,6 +6512,28 @@ dependencies = [ "reth-storage-api", ] +[[package]] +name = "reth-consensus-debug-client" +version = "0.2.0-beta.9" +dependencies = [ + "alloy-consensus 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=cc68b93)", + "alloy-eips 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=cc68b93)", + "alloy-provider", + "auto_impl", + "eyre", + "futures", + "reqwest 0.12.4", + "reth-node-api", + "reth-node-core", + "reth-rpc-api", + "reth-rpc-builder", + "reth-rpc-types", + "reth-tracing", + "ringbuffer", + "serde", + "tokio", +] + [[package]] name = "reth-db" version = "0.2.0-beta.9" @@ -7229,6 +7317,7 @@ dependencies = [ "reth-blockchain-tree", "reth-config", "reth-consensus", + "reth-consensus-debug-client", "reth-db", "reth-db-api", "reth-db-common", @@ -8246,6 +8335,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "ringbuffer" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3df6368f71f205ff9c33c076d170dd56ebf68e8161c733c0caa07a7a5509ed53" + [[package]] name = "ripemd" version = "0.1.3" @@ -8714,6 +8809,12 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f638d531eccd6e23b980caf34876660d38e265409d8e99b397ab71eb3612fad0" +[[package]] +name = "send_wrapper" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" + [[package]] name = "serde" version = "1.0.203" @@ -9618,6 +9719,21 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" +dependencies = [ + "futures-util", + "log", + "rustls 0.21.12", + "tokio", + "tokio-rustls 0.24.1", + "tungstenite", + "webpki-roots 0.25.4", +] + [[package]] name = "tokio-util" version = "0.7.11" @@ -9937,6 +10053,26 @@ dependencies = [ "toml", ] +[[package]] +name = "tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 0.2.12", + "httparse", + "log", + "rand 0.8.5", + "rustls 0.21.12", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "txpool-tracing" version = "0.0.0" @@ -10076,6 +10212,12 @@ dependencies = [ "serde", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf16_iter" version = "1.0.5" @@ -10276,6 +10418,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.25.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" + [[package]] name = "webpki-roots" version = "0.26.2" @@ -10595,6 +10743,25 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" +[[package]] +name = "ws_stream_wasm" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7999f5f4217fe3818726b66257a4475f71e74ffd190776ad053fa159e50737f5" +dependencies = [ + "async_io_stream", + "futures", + "js-sys", + "log", + "pharos", + "rustc_version 0.4.0", + "send_wrapper 0.6.0", + "thiserror", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wyhash" version = "0.5.0" From b7d47443d65cb05435727073bd92a32cda543893 Mon Sep 17 00:00:00 2001 From: sevazhidkov Date: Sun, 9 Jun 2024 23:33:11 +0300 Subject: [PATCH 19/21] clippy --- book/cli/reth/node.md | 2 +- crates/node-core/src/args/debug.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/book/cli/reth/node.md b/book/cli/reth/node.md index 02296e958f13..cd07d0692f58 100644 --- a/book/cli/reth/node.md +++ b/book/cli/reth/node.md @@ -467,7 +467,7 @@ Debug: Runs a fake consensus client that advances the chain using recent block hashes on Etherscan. If specified, requires an `ETHERSCAN_API_KEY` environment variable --debug.rpc-consensus-ws - Runs a fake consensus client using blocks fetched from an RPC WebSocket endpoint + Runs a fake consensus client using blocks fetched from an RPC `WebSocket` endpoint --debug.skip-fcu If provided, the engine will skip `n` consecutive FCUs diff --git a/crates/node-core/src/args/debug.rs b/crates/node-core/src/args/debug.rs index 7528120ece8d..b132eb3a611d 100644 --- a/crates/node-core/src/args/debug.rs +++ b/crates/node-core/src/args/debug.rs @@ -39,7 +39,7 @@ pub struct DebugArgs { )] pub etherscan: Option>, - /// Runs a fake consensus client using blocks fetched from an RPC WebSocket endpoint. + /// Runs a fake consensus client using blocks fetched from an RPC `WebSocket` endpoint. #[arg( long = "debug.rpc-consensus-ws", help_heading = "Debug", From 6bd3617dfd53a31715c75be7df4d3cf2d4675628 Mon Sep 17 00:00:00 2001 From: sevazhidkov Date: Sun, 9 Jun 2024 23:35:38 +0300 Subject: [PATCH 20/21] clippy --- crates/consensus/debug-client/src/providers/etherscan.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/consensus/debug-client/src/providers/etherscan.rs b/crates/consensus/debug-client/src/providers/etherscan.rs index 2eb995be6af6..e189913d1264 100644 --- a/crates/consensus/debug-client/src/providers/etherscan.rs +++ b/crates/consensus/debug-client/src/providers/etherscan.rs @@ -79,8 +79,9 @@ struct EtherscanBlockResponse { result: RichBlock, } -/// Load block using Etherscan API. Note: only BlockNumberOrTag::Latest, BlockNumberOrTag::Earliest, -/// BlockNumberOrTag::Pending, BlockNumberOrTag::Number(u64) are supported. +/// Load block using Etherscan API. Note: only `BlockNumberOrTag::Latest`, +/// `BlockNumberOrTag::Earliest`, `BlockNumberOrTag::Pending`, `BlockNumberOrTag::Number(u64)` are +/// supported. async fn load_etherscan_block( http_client: &Client, base_url: &str, From c2da02b366d0925ebb2ce1e489e22e56fec68707 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 10 Jun 2024 18:27:14 +0200 Subject: [PATCH 21/21] chore: touchups --- crates/node/builder/src/launch/mod.rs | 34 +++++++++++++++------------ 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index a7722ae17096..cd9971ad7806 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -398,23 +398,24 @@ where let _ = tx.send(res); }); - if let Some(custom_etherscan_url) = &ctx.node_config().debug.etherscan { + if let Some(maybe_custom_etherscan_url) = ctx.node_config().debug.etherscan.clone() { + info!(target: "reth::cli", "Using etherscan as consensus client"); + let chain = ctx.node_config().chain.chain; - let etherscan_url = match custom_etherscan_url { - Some(url) => url.to_owned(), - None => { - // If URL isn't provided, use default Etherscan URL for the chain - chain - .etherscan_urls() - .ok_or_else(|| eyre::eyre!("failed to get etherscan url for chain"))? - .0 - .to_owned() - } - }; + let etherscan_url = maybe_custom_etherscan_url.map(Ok).unwrap_or_else(|| { + // If URL isn't provided, use default Etherscan URL for the chain if it is known + chain + .etherscan_urls() + .map(|urls| urls.0.to_string()) + .ok_or_else(|| eyre::eyre!("failed to get etherscan url for chain: {chain}")) + })?; + let block_provider = EtherscanBlockProvider::new( etherscan_url, chain.etherscan_api_key().ok_or_else(|| { - eyre::eyre!("etherscan api key not found for rpc consensus client") + eyre::eyre!( + "etherscan api key not found for rpc consensus client for chain: {chain}" + ) })?, ); let rpc_consensus_client = DebugConsensusClient::new( @@ -425,8 +426,11 @@ where rpc_consensus_client.run::().await }); } - if let Some(rpc_ws_url) = &ctx.node_config().debug.rpc_consensus_ws { - let block_provider = RpcBlockProvider::new(rpc_ws_url.clone()); + + if let Some(rpc_ws_url) = ctx.node_config().debug.rpc_consensus_ws.clone() { + info!(target: "reth::cli", "Using rpc provider as consensus client"); + + let block_provider = RpcBlockProvider::new(rpc_ws_url); let rpc_consensus_client = DebugConsensusClient::new( rpc_server_handles.auth.clone(), Arc::new(block_provider),