Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add block propagation handlers #205

Merged
merged 1 commit into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 31 additions & 0 deletions crates/net/eth-wire/src/types/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,20 @@ pub struct NewBlockHashes(
pub Vec<BlockHashNumber>,
);

// === impl NewBlockHashes ===

impl NewBlockHashes {
/// Returns the latest block in the list of blocks.
pub fn latest(&self) -> Option<&BlockHashNumber> {
self.0.iter().fold(None, |latest, block| {
if let Some(latest) = latest {
return if latest.number > block.number { Some(latest) } else { Some(block) }
}
Some(block)
})
}
}

/// A block hash _and_ a block number.
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable)]
pub struct BlockHashNumber {
Expand Down Expand Up @@ -87,3 +101,20 @@ impl From<Vec<H256>> for NewPooledTransactionHashes {
NewPooledTransactionHashes(v)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn can_return_latest_block() {
let mut blocks = NewBlockHashes(vec![BlockHashNumber { hash: H256::random(), number: 0 }]);
let latest = blocks.latest().unwrap();
assert_eq!(latest.number, 0);

blocks.0.push(BlockHashNumber { hash: H256::random(), number: 100 });
blocks.0.push(BlockHashNumber { hash: H256::random(), number: 2 });
let latest = blocks.latest().unwrap();
assert_eq!(latest.number, 100);
}
}
18 changes: 16 additions & 2 deletions crates/net/network/src/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl StateFetcher {
&mut self,
peer_id: PeerId,
best_hash: H256,
best_number: Option<u64>,
best_number: u64,
) {
self.peers.insert(peer_id, Peer { state: PeerState::Idle, best_hash, best_number });
}
Expand All @@ -61,6 +61,20 @@ impl StateFetcher {
}
}

/// Updates the block information for the peer.
///
/// Returns `true` if this a newer block
pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: H256, number: u64) -> bool {
if let Some(peer) = self.peers.get_mut(peer_id) {
if number > peer.best_number {
peer.best_hash = hash;
peer.best_number = number;
return true
}
}
false
}

/// Invoked when an active session is about to be disconnected.
pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) {
if let Some(peer) = self.peers.get_mut(peer_id) {
Expand Down Expand Up @@ -246,7 +260,7 @@ struct Peer {
/// Best known hash that the peer has
best_hash: H256,
/// Tracks the best number of the peer.
best_number: Option<u64>,
best_number: u64,
}

/// Tracks the state of an individual peer
Expand Down
26 changes: 24 additions & 2 deletions crates/net/network/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,33 @@ pub struct BlockImportOutcome {
/// Sender of the `NewBlock` message.
pub peer: PeerId,
/// The result after validating the block
pub result: Result<NewBlockMessage, BlockImportError>,
pub result: Result<BlockValidation, BlockImportError>,
}

/// Represents the successful validation of a received `NewBlock` message.
#[derive(Debug)]
pub enum BlockValidation {
/// Basic Header validity check, after which the block should be relayed to peers via a
/// `NewBlock` message
ValidHeader {
/// received block
block: NewBlockMessage,
},
/// Successfully imported: state-root matches after execution. The block should be relayed via
/// `NewBlockHashes`
ValidBlock {
/// validated block.
block: NewBlockMessage,
},
}

/// Represents the error case of a failed block import
pub enum BlockImportError {}
#[derive(Debug, thiserror::Error)]
pub enum BlockImportError {
/// Consensus error
#[error(transparent)]
Consensus(#[from] reth_interfaces::consensus::Error),
}

/// An implementation of `BlockImport` that does nothing
#[derive(Debug, Default)]
Expand Down
25 changes: 21 additions & 4 deletions crates/net/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
config::NetworkConfig,
discovery::Discovery,
error::NetworkError,
import::{BlockImport, BlockImportOutcome},
import::{BlockImport, BlockImportOutcome, BlockValidation},
listener::ConnectionListener,
message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender},
network::{NetworkHandle, NetworkHandleMessage},
Expand Down Expand Up @@ -192,10 +192,30 @@ where
}
}

/// Invoked after a `NewBlock` message from the peer was validated
fn on_block_import_result(&mut self, outcome: BlockImportOutcome) {
let BlockImportOutcome { peer, result } = outcome;
match result {
Ok(validated_block) => match validated_block {
BlockValidation::ValidHeader { block } => {
self.swarm.state_mut().update_peer_block(&peer, block.hash, block.number());
self.swarm.state_mut().announce_new_block(block);
}
BlockValidation::ValidBlock { block } => {
self.swarm.state_mut().announce_new_block_hash(block);
}
},
Err(_err) => {
// TODO report peer for bad block
}
}
}

/// Handles a received Message from the peer.
fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage) {
match msg {
PeerMessage::NewBlockHashes(hashes) => {
let hashes = Arc::try_unwrap(hashes).unwrap_or_else(|arc| (*arc).clone());
// update peer's state, to track what blocks this peer has seen
self.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0)
}
Expand Down Expand Up @@ -240,9 +260,6 @@ where
.send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
}
}

/// Invoked after a `NewBlock` message from the peer was validated
fn on_block_import_result(&mut self, _outcome: BlockImportOutcome) {}
}

impl<C> Future for NetworkManager<C>
Expand Down
11 changes: 10 additions & 1 deletion crates/net/network/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,20 @@ pub struct NewBlockMessage {
pub block: Arc<NewBlock>,
}

// === impl NewBlockMessage ===

impl NewBlockMessage {
/// Returns the block number of the block
pub fn number(&self) -> u64 {
self.block.block.header.number
}
}

/// Represents all messages that can be sent to a peer session
#[derive(Debug)]
pub enum PeerMessage {
/// Announce new block hashes
NewBlockHashes(NewBlockHashes),
NewBlockHashes(Arc<NewBlockHashes>),
/// Broadcast new block.
NewBlock(NewBlockMessage),
/// Broadcast transactions.
Expand Down
47 changes: 45 additions & 2 deletions crates/net/network/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
},
peers::{PeerAction, PeersManager},
};
use reth_eth_wire::{capability::Capabilities, BlockHashNumber, Status};
use reth_eth_wire::{capability::Capabilities, BlockHashNumber, NewBlockHashes, Status};
use reth_interfaces::provider::BlockProvider;
use reth_primitives::{PeerId, H256};
use std::{
Expand Down Expand Up @@ -93,7 +93,8 @@ where
debug_assert!(self.connected_peers.contains_key(&peer), "Already connected; not possible");

// find the corresponding block number
let block_number = self.client.block_number(status.blockhash).ok().flatten();
let block_number =
self.client.block_number(status.blockhash).ok().flatten().unwrap_or_default();
self.state_fetcher.new_connected_peer(peer, status.blockhash, block_number);

self.connected_peers.insert(
Expand Down Expand Up @@ -129,6 +130,7 @@ where
// number of peers)
let num_propagate = (self.connected_peers.len() as f64).sqrt() as u64 + 1;

let number = msg.block.block.header.number;
let mut count = 0;
for (peer_id, peer) in self.connected_peers.iter_mut() {
if peer.blocks.contains(&msg.hash) {
Expand All @@ -141,6 +143,11 @@ where
self.queued_messages
.push_back(StateAction::NewBlock { peer_id: *peer_id, block: msg.clone() });

// update peer block info
if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
peer.best_hash = msg.hash;
}

// mark the block as seen by the peer
peer.blocks.insert(msg.hash);

Expand All @@ -153,6 +160,36 @@ where
}
}

/// Completes the block propagation process started in [`NetworkState::announce_new_block()`]
/// but sending `NewBlockHash` broadcast to all peers that haven't seen it yet.
pub(crate) fn announce_new_block_hash(&mut self, msg: NewBlockMessage) {
let number = msg.block.block.header.number;
let hashes = Arc::new(NewBlockHashes(vec![BlockHashNumber { hash: msg.hash, number }]));
for (peer_id, peer) in self.connected_peers.iter_mut() {
if peer.blocks.contains(&msg.hash) {
// skip peers which already reported the block
continue
}

if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
peer.best_hash = msg.hash;
}

self.queued_messages.push_back(StateAction::NewBlockHashes {
peer_id: *peer_id,
hashes: Arc::clone(&hashes),
});
}
}

/// Updates the block information for the peer.
pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: H256, number: u64) {
if let Some(peer) = self.connected_peers.get_mut(peer_id) {
peer.best_hash = hash;
}
self.state_fetcher.update_peer_block(peer_id, hash, number);
}

/// Invoked after a `NewBlock` message was received by the peer.
///
/// This will keep track of blocks we know a peer has
Expand Down Expand Up @@ -342,6 +379,12 @@ pub enum StateAction {
/// The `NewBlock` message
block: NewBlockMessage,
},
NewBlockHashes {
/// Target of the message
peer_id: PeerId,
/// `NewBlockHashes` message to send to the peer.
hashes: Arc<NewBlockHashes>,
},
/// Create a new connection to the given node.
Connect { remote_addr: SocketAddr, node_id: PeerId },
/// Disconnect an existing connection
Expand Down
4 changes: 4 additions & 0 deletions crates/net/network/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ where
let msg = PeerMessage::NewBlock(msg);
self.sessions.send_message(&peer_id, msg);
}
StateAction::NewBlockHashes { peer_id, hashes } => {
let msg = PeerMessage::NewBlockHashes(hashes);
self.sessions.send_message(&peer_id, msg);
}
}
None
}
Expand Down