Skip to content

Commit

Permalink
Sync: handle incompatible peers
Browse files Browse the repository at this point in the history
  • Loading branch information
styppo committed Apr 5, 2024
1 parent 46702f7 commit e7a0319
Show file tree
Hide file tree
Showing 19 changed files with 311 additions and 96 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions blockchain-interface/src/abstract_blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ pub trait AbstractBlockchain {
self.head().block_number()
}

/// Returns the epoch number at the head of the main chain.
fn batch_number(&self) -> u32 {
self.head().batch_number()
}

/// Returns the epoch number at the head of the main chain.
fn epoch_number(&self) -> u32 {
self.head().epoch_number()
Expand Down
4 changes: 4 additions & 0 deletions blockchain/src/blockchain/abstract_blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ impl AbstractBlockchain for Blockchain {
self.state.main_chain.head.block_number()
}

fn batch_number(&self) -> u32 {
self.state.main_chain.head.batch_number()
}

fn epoch_number(&self) -> u32 {
self.state.main_chain.head.epoch_number()
}
Expand Down
1 change: 1 addition & 0 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ serde = "1.0"
thiserror = "1.0"
tokio = { version = "1.37", features = ["rt", "sync", "time"] }
tokio-stream = { version = "0.1", features = ["sync"] }
wasm-timer = "0.2"

nimiq-account = { workspace = true, default-features = false }
nimiq-block = { workspace = true }
Expand Down
9 changes: 5 additions & 4 deletions consensus/src/consensus/head_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ use nimiq_blockchain_proxy::BlockchainProxy;
use nimiq_hash::Blake2bHash;
use nimiq_network_interface::{network::Network, request::RequestError};

use crate::messages::{BlockError, RequestBlock, RequestHead};
use crate::messages::{BlockError, RequestBlock, RequestHead, ResponseHead};

/// Requests the head blocks for a set of peers.
/// Calculates the number of known/unknown blocks and a vector of unknown blocks.
pub struct HeadRequests<TNetwork: Network + 'static> {
peers: Vec<TNetwork::PeerId>,
head_hashes: FuturesUnordered<BoxFuture<'static, (usize, Result<Blake2bHash, RequestError>)>>,
head_hashes: FuturesUnordered<BoxFuture<'static, (usize, Result<ResponseHead, RequestError>)>>,
head_blocks: FuturesUnordered<
BoxFuture<
'static,
Expand Down Expand Up @@ -87,7 +87,7 @@ impl<TNetwork: Network + 'static> HeadRequests<TNetwork> {
async fn request_head(
network: Arc<TNetwork>,
peer_id: TNetwork::PeerId,
) -> Result<Blake2bHash, RequestError> {
) -> Result<ResponseHead, RequestError> {
network
.request::<RequestHead>(RequestHead {}, peer_id)
.await
Expand Down Expand Up @@ -119,7 +119,8 @@ impl<TNetwork: Network + 'static> Future for HeadRequests<TNetwork> {
while let Poll::Ready(Some((i, result))) = self.head_hashes.poll_next_unpin(cx) {
// If we got a result, check it and classify it as known block/unknown block.
match result {
Ok(hash) => {
Ok(head) => {
let hash = head.micro;
if self.blockchain.read().get_block(&hash, false).is_ok() {
self.num_known_blocks += 1;
} else {
Expand Down
9 changes: 7 additions & 2 deletions consensus/src/messages/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,13 @@ impl RequestMissingBlocks {
}

impl<N: Network> Handle<N, BlockchainProxy> for RequestHead {
fn handle(&self, _peer_id: N::PeerId, blockchain: &BlockchainProxy) -> Blake2bHash {
blockchain.read().head_hash()
fn handle(&self, _peer_id: N::PeerId, blockchain: &BlockchainProxy) -> ResponseHead {
let blockchain = blockchain.read();
ResponseHead {
micro: blockchain.head_hash(),
r#macro: blockchain.macro_head_hash(),
election: blockchain.election_head_hash(),
}
}
}

Expand Down
9 changes: 8 additions & 1 deletion consensus/src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,10 +340,17 @@ impl RequestCommon for RequestMissingBlocks {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RequestHead {}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResponseHead {
pub micro: Blake2bHash,
pub r#macro: Blake2bHash,
pub election: Blake2bHash,
}

impl RequestCommon for RequestHead {
type Kind = RequestMarker;
const TYPE_ID: u16 = 210;
type Response = Blake2bHash;
type Response = ResponseHead;
const MAX_REQUESTS: u32 = MAX_REQUEST_RESPONSE_HEAD;
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/src/sync/history/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl<TNetwork: Network> HistoryMacroSync<TNetwork> {
}

impl<TNetwork: Network> MacroSync<TNetwork::PeerId> for HistoryMacroSync<TNetwork> {
fn add_peer(&self, peer_id: TNetwork::PeerId) {
fn add_peer(&mut self, peer_id: TNetwork::PeerId) {
// Ignore peer if we already know it.
if self.peers.contains_key(&peer_id) {
return;
Expand Down
4 changes: 4 additions & 0 deletions consensus/src/sync/history/sync_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ impl<TNetwork: Network> HistoryMacroSync<TNetwork> {
if self.network.peer_provides_required_services(peer_id) {
// Request epoch_ids from the peer that joined.
self.add_peer(peer_id);
} else {
// We can't sync with this peer as it doesn't provide the services that we need.
// Emit the peer as incompatible.
return Poll::Ready(Some(MacroSyncReturn::Incompatible(peer_id)));
}
}
Ok(_) => {}
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/sync/light/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ impl<TNetwork: Network> LightMacroSync<TNetwork> {
}

impl<TNetwork: Network> MacroSync<TNetwork::PeerId> for LightMacroSync<TNetwork> {
fn add_peer(&self, peer_id: TNetwork::PeerId) {
fn add_peer(&mut self, peer_id: TNetwork::PeerId) {
info!(%peer_id, "Requesting zkp from peer");

self.zkp_requests
Expand Down
4 changes: 4 additions & 0 deletions consensus/src/sync/light/sync_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ impl<TNetwork: Network> LightMacroSync<TNetwork> {
if self.network.peer_provides_required_services(peer_id) {
// Request zkps and start the macro sync process
self.add_peer(peer_id);
} else {
// We can't sync with this peer as it doesn't provide the services that we need.
// Emit the peer as incompatible.
return Poll::Ready(Some(MacroSyncReturn::Incompatible(peer_id)));
}
}
Ok(_) => {}
Expand Down

0 comments on commit e7a0319

Please sign in to comment.