Skip to content

Commit

Permalink
feat: add blog propagation handlers (#205)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse committed Nov 15, 2022
1 parent f8fddcd commit 6b336c6
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 11 deletions.
31 changes: 31 additions & 0 deletions crates/net/eth-wire/src/types/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,20 @@ pub struct NewBlockHashes(
pub Vec<BlockHashNumber>,
);

// === impl NewBlockHashes ===

impl NewBlockHashes {
/// Returns the latest block in the list of blocks.
pub fn latest(&self) -> Option<&BlockHashNumber> {
self.0.iter().fold(None, |latest, block| {
if let Some(latest) = latest {
return if latest.number > block.number { Some(latest) } else { Some(block) }
}
Some(block)
})
}
}

/// A block hash _and_ a block number.
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable)]
pub struct BlockHashNumber {
Expand Down Expand Up @@ -87,3 +101,20 @@ impl From<Vec<H256>> for NewPooledTransactionHashes {
NewPooledTransactionHashes(v)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn can_return_latest_block() {
let mut blocks = NewBlockHashes(vec![BlockHashNumber { hash: H256::random(), number: 0 }]);
let latest = blocks.latest().unwrap();
assert_eq!(latest.number, 0);

blocks.0.push(BlockHashNumber { hash: H256::random(), number: 100 });
blocks.0.push(BlockHashNumber { hash: H256::random(), number: 2 });
let latest = blocks.latest().unwrap();
assert_eq!(latest.number, 100);
}
}
18 changes: 16 additions & 2 deletions crates/net/network/src/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl StateFetcher {
&mut self,
peer_id: PeerId,
best_hash: H256,
best_number: Option<u64>,
best_number: u64,
) {
self.peers.insert(peer_id, Peer { state: PeerState::Idle, best_hash, best_number });
}
Expand All @@ -61,6 +61,20 @@ impl StateFetcher {
}
}

/// Updates the block information for the peer.
///
/// Returns `true` if this a newer block
pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: H256, number: u64) -> bool {
if let Some(peer) = self.peers.get_mut(peer_id) {
if number > peer.best_number {
peer.best_hash = hash;
peer.best_number = number;
return true
}
}
false
}

/// Invoked when an active session is about to be disconnected.
pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) {
if let Some(peer) = self.peers.get_mut(peer_id) {
Expand Down Expand Up @@ -246,7 +260,7 @@ struct Peer {
/// Best known hash that the peer has
best_hash: H256,
/// Tracks the best number of the peer.
best_number: Option<u64>,
best_number: u64,
}

/// Tracks the state of an individual peer
Expand Down
26 changes: 24 additions & 2 deletions crates/net/network/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,33 @@ pub struct BlockImportOutcome {
/// Sender of the `NewBlock` message.
pub peer: PeerId,
/// The result after validating the block
pub result: Result<NewBlockMessage, BlockImportError>,
pub result: Result<BlockValidation, BlockImportError>,
}

/// Represents the successful validation of a received `NewBlock` message.
#[derive(Debug)]
pub enum BlockValidation {
/// Basic Header validity check, after which the block should be relayed to peers via a
/// `NewBlock` message
ValidHeader {
/// received block
block: NewBlockMessage,
},
/// Successfully imported: state-root matches after execution. The block should be relayed via
/// `NewBlockHashes`
ValidBlock {
/// validated block.
block: NewBlockMessage,
},
}

/// Represents the error case of a failed block import
pub enum BlockImportError {}
#[derive(Debug, thiserror::Error)]
pub enum BlockImportError {
/// Consensus error
#[error(transparent)]
Consensus(#[from] reth_interfaces::consensus::Error),
}

/// An implementation of `BlockImport` that does nothing
#[derive(Debug, Default)]
Expand Down
25 changes: 21 additions & 4 deletions crates/net/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
config::NetworkConfig,
discovery::Discovery,
error::NetworkError,
import::{BlockImport, BlockImportOutcome},
import::{BlockImport, BlockImportOutcome, BlockValidation},
listener::ConnectionListener,
message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender},
network::{NetworkHandle, NetworkHandleMessage},
Expand Down Expand Up @@ -192,10 +192,30 @@ where
}
}

/// Invoked after a `NewBlock` message from the peer was validated
fn on_block_import_result(&mut self, outcome: BlockImportOutcome) {
let BlockImportOutcome { peer, result } = outcome;
match result {
Ok(validated_block) => match validated_block {
BlockValidation::ValidHeader { block } => {
self.swarm.state_mut().update_peer_block(&peer, block.hash, block.number());
self.swarm.state_mut().announce_new_block(block);
}
BlockValidation::ValidBlock { block } => {
self.swarm.state_mut().announce_new_block_hash(block);
}
},
Err(_err) => {
// TODO report peer for bad block
}
}
}

/// Handles a received Message from the peer.
fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage) {
match msg {
PeerMessage::NewBlockHashes(hashes) => {
let hashes = Arc::try_unwrap(hashes).unwrap_or_else(|arc| (*arc).clone());
// update peer's state, to track what blocks this peer has seen
self.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0)
}
Expand Down Expand Up @@ -240,9 +260,6 @@ where
.send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
}
}

/// Invoked after a `NewBlock` message from the peer was validated
fn on_block_import_result(&mut self, _outcome: BlockImportOutcome) {}
}

impl<C> Future for NetworkManager<C>
Expand Down
11 changes: 10 additions & 1 deletion crates/net/network/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,20 @@ pub struct NewBlockMessage {
pub block: Arc<NewBlock>,
}

// === impl NewBlockMessage ===

impl NewBlockMessage {
/// Returns the block number of the block
pub fn number(&self) -> u64 {
self.block.block.header.number
}
}

/// Represents all messages that can be sent to a peer session
#[derive(Debug)]
pub enum PeerMessage {
/// Announce new block hashes
NewBlockHashes(NewBlockHashes),
NewBlockHashes(Arc<NewBlockHashes>),
/// Broadcast new block.
NewBlock(NewBlockMessage),
/// Broadcast transactions.
Expand Down
47 changes: 45 additions & 2 deletions crates/net/network/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
},
peers::{PeerAction, PeersManager},
};
use reth_eth_wire::{capability::Capabilities, BlockHashNumber, Status};
use reth_eth_wire::{capability::Capabilities, BlockHashNumber, NewBlockHashes, Status};
use reth_interfaces::provider::BlockProvider;
use reth_primitives::{PeerId, H256};
use std::{
Expand Down Expand Up @@ -93,7 +93,8 @@ where
debug_assert!(self.connected_peers.contains_key(&peer), "Already connected; not possible");

// find the corresponding block number
let block_number = self.client.block_number(status.blockhash).ok().flatten();
let block_number =
self.client.block_number(status.blockhash).ok().flatten().unwrap_or_default();
self.state_fetcher.new_connected_peer(peer, status.blockhash, block_number);

self.connected_peers.insert(
Expand Down Expand Up @@ -129,6 +130,7 @@ where
// number of peers)
let num_propagate = (self.connected_peers.len() as f64).sqrt() as u64 + 1;

let number = msg.block.block.header.number;
let mut count = 0;
for (peer_id, peer) in self.connected_peers.iter_mut() {
if peer.blocks.contains(&msg.hash) {
Expand All @@ -141,6 +143,11 @@ where
self.queued_messages
.push_back(StateAction::NewBlock { peer_id: *peer_id, block: msg.clone() });

// update peer block info
if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
peer.best_hash = msg.hash;
}

// mark the block as seen by the peer
peer.blocks.insert(msg.hash);

Expand All @@ -153,6 +160,36 @@ where
}
}

/// Completes the block propagation process started in [`NetworkState::announce_new_block()`]
/// but sending `NewBlockHash` broadcast to all peers that haven't seen it yet.
pub(crate) fn announce_new_block_hash(&mut self, msg: NewBlockMessage) {
let number = msg.block.block.header.number;
let hashes = Arc::new(NewBlockHashes(vec![BlockHashNumber { hash: msg.hash, number }]));
for (peer_id, peer) in self.connected_peers.iter_mut() {
if peer.blocks.contains(&msg.hash) {
// skip peers which already reported the block
continue
}

if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
peer.best_hash = msg.hash;
}

self.queued_messages.push_back(StateAction::NewBlockHashes {
peer_id: *peer_id,
hashes: Arc::clone(&hashes),
});
}
}

/// Updates the block information for the peer.
pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: H256, number: u64) {
if let Some(peer) = self.connected_peers.get_mut(peer_id) {
peer.best_hash = hash;
}
self.state_fetcher.update_peer_block(peer_id, hash, number);
}

/// Invoked after a `NewBlock` message was received by the peer.
///
/// This will keep track of blocks we know a peer has
Expand Down Expand Up @@ -342,6 +379,12 @@ pub enum StateAction {
/// The `NewBlock` message
block: NewBlockMessage,
},
NewBlockHashes {
/// Target of the message
peer_id: PeerId,
/// `NewBlockHashes` message to send to the peer.
hashes: Arc<NewBlockHashes>,
},
/// Create a new connection to the given node.
Connect { remote_addr: SocketAddr, node_id: PeerId },
/// Disconnect an existing connection
Expand Down
4 changes: 4 additions & 0 deletions crates/net/network/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ where
let msg = PeerMessage::NewBlock(msg);
self.sessions.send_message(&peer_id, msg);
}
StateAction::NewBlockHashes { peer_id, hashes } => {
let msg = PeerMessage::NewBlockHashes(hashes);
self.sessions.send_message(&peer_id, msg);
}
}
None
}
Expand Down

0 comments on commit 6b336c6

Please sign in to comment.