-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: --debug.etherscan for fake consensus client #8082
Changes from 7 commits
826722f
603db3e
7789774
90bea52
e11d850
aa2acf9
9807037
5e8116c
ef3b066
ed22651
7c40a26
0e6cae9
cc12182
8cdb032
9f96d0b
c3d56e6
64a0788
9b24ae7
223ccf9
06497af
bbb0420
2ecb0f9
1b231f2
d1d3ec8
ffbf684
b7d4744
6bd3617
f837517
c2da02b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
[package] | ||
name = "reth-consensus-debug-client" | ||
version.workspace = true | ||
edition.workspace = true | ||
rust-version.workspace = true | ||
license.workspace = true | ||
homepage.workspace = true | ||
repository.workspace = true | ||
|
||
[lints] | ||
workspace = true | ||
|
||
[dependencies] | ||
reth-node-api.workspace = true | ||
reth-node-core.workspace = true | ||
reth-rpc-api.workspace = true | ||
reth-rpc-types.workspace = true | ||
reth-rpc-builder.workspace = true | ||
|
||
alloy-consensus = { workspace = true, features = ["serde"] } | ||
alloy-eips = { workspace = true } | ||
alloy-provider = { workspace = true, features = ["ws"] } | ||
|
||
futures.workspace = true | ||
jsonrpsee.workspace = true | ||
reqwest = { workspace = true, features = ["default-tls", "json"] } | ||
serde = { workspace = true, features = ["derive"] } | ||
tokio = { workspace = true } | ||
|
||
ringbuffer = "0.15.0" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,207 @@ | ||
use alloy_consensus::TxEnvelope; | ||
use alloy_eips::eip2718::Encodable2718; | ||
use reth_node_api::EngineTypes; | ||
use reth_node_core::{ | ||
primitives::B256, | ||
rpc::types::{BlockTransactions, ExecutionPayloadV2, ExecutionPayloadV3, RichBlock}, | ||
}; | ||
use reth_rpc_builder::auth::AuthServerHandle; | ||
use reth_rpc_types::ExecutionPayloadV1; | ||
use ringbuffer::{AllocRingBuffer, RingBuffer}; | ||
use std::{future::Future, sync::Arc}; | ||
use tokio::sync::mpsc; | ||
|
||
/// Supplies consensus client with new blocks sent in `tx` and a callback to find specific blocks | ||
/// by number to fetch past finalized and safe blocks. | ||
pub trait BlockProvider: Send + Sync + 'static { | ||
/// Spawn a block provider to send new blocks to the given sender. | ||
fn spawn(&self, tx: mpsc::Sender<RichBlock>) -> impl Future<Output = ()> + Send; | ||
/// Get a past block by number. | ||
fn get_block(&self, block_number: u64) -> impl Future<Output = RichBlock> + Send; | ||
} | ||
|
||
/// Debyg consensus client that sends FCUs and new payloads using recent blocks from an external | ||
/// provider like Etherscan or an RPC endpoint. | ||
#[derive(Debug)] | ||
pub struct DebugConsensusClient<P: BlockProvider> { | ||
/// Handle to execution client. | ||
auth_server: AuthServerHandle, | ||
/// Provider to get consensus blocks from. | ||
block_provider: Arc<P>, | ||
} | ||
|
||
impl<P: BlockProvider> DebugConsensusClient<P> { | ||
/// Create a new debug consensus client with the given handle to execution | ||
/// client and block provider. | ||
pub fn new(auth_server: AuthServerHandle, block_provider: Arc<P>) -> Self { | ||
Self { auth_server, block_provider } | ||
} | ||
|
||
/// Spawn the client to start sending FCUs and new payloads by periodically fetching recent | ||
/// blocks. | ||
pub async fn spawn<T: EngineTypes>(&self) { | ||
// TODO: Add logs | ||
let execution_client = self.auth_server.http_client(); | ||
let mut previous_block_hashes = AllocRingBuffer::new(64); | ||
mattsse marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
let mut block_stream = { | ||
let (tx, rx) = mpsc::channel::<RichBlock>(64); | ||
let block_provider = self.block_provider.clone(); | ||
tokio::spawn(async move { | ||
block_provider.spawn(tx).await; | ||
}); | ||
rx | ||
}; | ||
|
||
while let Some(block) = block_stream.recv().await { | ||
let payload = rich_block_to_execution_payload_v3(block); | ||
|
||
let block_hash = payload.block_hash(); | ||
let block_number = payload.block_number(); | ||
|
||
// Send new events to execution client | ||
reth_rpc_api::EngineApiClient::<T>::new_payload_v3( | ||
&execution_client, | ||
payload.execution_payload_v3, | ||
payload.versioned_hashes, | ||
payload.parent_beacon_block_root, | ||
) | ||
.await | ||
.unwrap(); | ||
|
||
previous_block_hashes.push(block_hash); | ||
|
||
// Load previous block hashes. We're using (head - 32) and (head - 64) as the safe and | ||
// finalized block hashes. | ||
// todo: chdeck for off by ones in offset | ||
let safe_block_hash = get_or_fetch_previous_block( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should be a function of self There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated, if understood this comment correctly. if not — ping again pls. |
||
self.block_provider.as_ref(), | ||
&previous_block_hashes, | ||
block_number, | ||
32, | ||
); | ||
let finalized_block_hash = get_or_fetch_previous_block( | ||
self.block_provider.as_ref(), | ||
&previous_block_hashes, | ||
block_number, | ||
64, | ||
); | ||
let (safe_block_hash, finalized_block_hash) = | ||
tokio::join!(safe_block_hash, finalized_block_hash); | ||
reth_rpc_api::EngineApiClient::<T>::fork_choice_updated_v3( | ||
&execution_client, | ||
reth_rpc_types::engine::ForkchoiceState { | ||
head_block_hash: block_hash, | ||
safe_block_hash, | ||
finalized_block_hash, | ||
}, | ||
None, | ||
) | ||
.await | ||
.unwrap(); | ||
} | ||
} | ||
} | ||
|
||
/// Get previous block hash using previous block hash buffer. If it isn't available (buffer | ||
/// started more recently than `offset`), fetch it from block provider. | ||
async fn get_or_fetch_previous_block<P: BlockProvider>( | ||
block_provider: &P, | ||
previous_block_hashes: &AllocRingBuffer<B256>, | ||
current_block_number: u64, | ||
offset: usize, | ||
) -> B256 { | ||
let stored_hash = previous_block_hashes | ||
.len() | ||
.checked_sub(offset) | ||
.and_then(|index| previous_block_hashes.get(index)); | ||
if let Some(hash) = stored_hash { | ||
return *hash; | ||
} | ||
|
||
// Return default hash if the chain isn't long enough to have the block at the offset. | ||
let previous_block_number = match current_block_number.checked_sub(offset as u64) { | ||
Some(number) => number, | ||
None => return B256::default(), | ||
}; | ||
let block = block_provider.get_block(previous_block_number).await; | ||
block.header.hash.unwrap() | ||
} | ||
|
||
/// Cancun "new payload" event. | ||
#[derive(Debug)] | ||
struct ExecutionNewPayload { | ||
execution_payload_v3: ExecutionPayloadV3, | ||
versioned_hashes: Vec<B256>, | ||
parent_beacon_block_root: B256, | ||
} | ||
|
||
impl ExecutionNewPayload { | ||
/// Get block hash from block in the payload | ||
fn block_hash(&self) -> B256 { | ||
self.execution_payload_v3.payload_inner.payload_inner.block_hash | ||
} | ||
|
||
/// Get block number from block in the payload | ||
fn block_number(&self) -> u64 { | ||
self.execution_payload_v3.payload_inner.payload_inner.block_number | ||
} | ||
} | ||
|
||
/// Convert a rich block from RPC / Etherscan to params for an execution client's "new payload" | ||
/// method. Assumes that the block contains full transactions. | ||
fn rich_block_to_execution_payload_v3(block: RichBlock) -> ExecutionNewPayload { | ||
let transactions = match &block.transactions { | ||
BlockTransactions::Full(txs) => txs.clone(), | ||
// Empty array gets deserialized as BlockTransactions::Hashes. | ||
BlockTransactions::Hashes(txs) if txs.is_empty() => vec![], | ||
BlockTransactions::Hashes(_) | BlockTransactions::Uncle => { | ||
panic!("Received uncle block or hash-only transactions from Etherscan API") | ||
} | ||
}; | ||
|
||
// Concatenate all blob hashes from all transactions in order | ||
// https://github.com/ethereum/execution-apis/blob/main/src/engine/cancun.md#specification | ||
let versioned_hashes = transactions | ||
.iter() | ||
.flat_map(|tx| tx.blob_versioned_hashes.clone().unwrap_or_default()) | ||
.collect(); | ||
|
||
let payload: ExecutionPayloadV3 = ExecutionPayloadV3 { | ||
payload_inner: ExecutionPayloadV2 { | ||
payload_inner: ExecutionPayloadV1 { | ||
parent_hash: block.header.parent_hash, | ||
fee_recipient: block.header.miner, | ||
state_root: block.header.state_root, | ||
receipts_root: block.header.receipts_root, | ||
logs_bloom: block.header.logs_bloom, | ||
prev_randao: block.header.mix_hash.unwrap(), | ||
block_number: block.header.number.unwrap(), | ||
gas_limit: block.header.gas_limit.try_into().unwrap(), | ||
gas_used: block.header.gas_used.try_into().unwrap(), | ||
timestamp: block.header.timestamp, | ||
extra_data: block.header.extra_data.clone(), | ||
base_fee_per_gas: block.header.base_fee_per_gas.unwrap().try_into().unwrap(), | ||
block_hash: block.header.hash.unwrap(), | ||
transactions: transactions | ||
.into_iter() | ||
.map(|tx| { | ||
let envelope: TxEnvelope = tx.try_into().unwrap(); | ||
let mut buffer: Vec<u8> = vec![]; | ||
envelope.encode_2718(&mut buffer); | ||
buffer.into() | ||
}) | ||
.collect(), | ||
}, | ||
withdrawals: block.withdrawals.clone().unwrap(), | ||
}, | ||
blob_gas_used: block.header.blob_gas_used.unwrap().try_into().unwrap(), | ||
excess_blob_gas: block.header.excess_blob_gas.unwrap().try_into().unwrap(), | ||
}; | ||
|
||
ExecutionNewPayload { | ||
execution_payload_v3: payload, | ||
versioned_hashes, | ||
parent_beacon_block_root: block.header.parent_beacon_block_root.unwrap(), | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
//! Debug consensus client. | ||
//! | ||
//! This is a worker that sends FCUs and new payloads by fetching recent blocks from an external | ||
//! provider like Etherscan or an RPC endpoint. This allows to quickly test the execution client | ||
//! without running a consensus node. | ||
mod client; | ||
mod providers; | ||
|
||
pub use client::{BlockProvider, DebugConsensusClient}; | ||
pub use providers::{EtherscanBlockProvider, RpcBlockProvider}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
mod etherscan; | ||
mod rpc; | ||
|
||
pub use etherscan::EtherscanBlockProvider; | ||
pub use rpc::RpcBlockProvider; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
use crate::BlockProvider; | ||
use alloy_eips::BlockNumberOrTag; | ||
use reqwest::Client; | ||
use reth_node_core::rpc::types::RichBlock; | ||
use serde::Deserialize; | ||
use std::time::Duration; | ||
use tokio::{sync::mpsc, time::sleep}; | ||
|
||
/// Block provider that fetches new blocks from Etherscan API. | ||
#[derive(Debug)] | ||
pub struct EtherscanBlockProvider { | ||
http_client: Client, | ||
base_url: String, | ||
api_key: String, | ||
} | ||
|
||
impl EtherscanBlockProvider { | ||
/// Create a new Etherscan block provider with the given base URL and API key. | ||
pub fn new(base_url: String, api_key: String) -> Self { | ||
EtherscanBlockProvider { http_client: Client::new(), base_url, api_key } | ||
} | ||
} | ||
|
||
impl BlockProvider for EtherscanBlockProvider { | ||
async fn spawn(&self, tx: mpsc::Sender<RichBlock>) { | ||
let mut last_block_number: Option<u64> = None; | ||
loop { | ||
let block = load_etherscan_block( | ||
&self.http_client, | ||
&self.base_url, | ||
&self.api_key, | ||
BlockNumberOrTag::Latest, | ||
) | ||
.await; | ||
let block_number = block.header.number.unwrap(); | ||
if Some(block_number) == last_block_number { | ||
// TODO: Make configurable | ||
sleep(Duration::from_secs(3)).await; | ||
continue; | ||
} | ||
|
||
tx.send(block).await.unwrap(); | ||
sleep(Duration::from_secs(3)).await; | ||
last_block_number = Some(block_number); | ||
} | ||
} | ||
|
||
async fn get_block(&self, block_number: u64) -> RichBlock { | ||
load_etherscan_block( | ||
&self.http_client, | ||
&self.base_url, | ||
&self.api_key, | ||
BlockNumberOrTag::Number(block_number), | ||
) | ||
.await | ||
} | ||
} | ||
|
||
#[derive(Deserialize, Debug)] | ||
struct EtherscanBlockResponse { | ||
result: RichBlock, | ||
} | ||
|
||
/// Load block using Etherscan API. Note: only BlockNumberOrTag::Latest, BlockNumberOrTag::Earliest, | ||
/// BlockNumberOrTag::Pending, BlockNumberOrTag::Number(u64) are supported. | ||
async fn load_etherscan_block( | ||
http_client: &Client, | ||
base_url: &str, | ||
api_key: &str, | ||
block_number_or_tag: BlockNumberOrTag, | ||
) -> RichBlock { | ||
let block: EtherscanBlockResponse = http_client | ||
.get(base_url) | ||
.query(&[ | ||
("module", "proxy"), | ||
("action", "eth_getBlockByNumber"), | ||
("tag", &block_number_or_tag.to_string()), | ||
("boolean", "true"), | ||
("apikey", api_key), | ||
]) | ||
.send() | ||
.await | ||
.unwrap() | ||
.json() | ||
.await | ||
// TODO: Handle errors gracefully and do not stop the loop | ||
sevazhidkov marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.unwrap(); | ||
block.result | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would you prefer, this or
impl Stream<Output = RichBlock>
? IMOimpl Stream<Output = RichBlock>
is much less nicer to implement (bc of manual state machine building vs justasync fn
), but can change it