Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add request and response ID to every p2p message and route messages a… #311

Merged
merged 6 commits into from Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 7 additions & 7 deletions consensus/core/src/config/genesis.rs
Expand Up @@ -143,10 +143,10 @@ pub const TESTNET_GENESIS: GenesisBlock = GenesisBlock {

pub const TESTNET11_GENESIS: GenesisBlock = GenesisBlock {
hash: Hash::from_bytes([
0xf0, 0xa3, 0x15, 0x04, 0xfb, 0x4b, 0xf8, 0x83, 0xd0, 0xdd, 0x2a, 0x75, 0x1c, 0x7d, 0xac, 0x06, 0x1a, 0xb4, 0xc6, 0x33, 0x00,
0xc5, 0x4f, 0xd2, 0x64, 0xd8, 0xe3, 0x06, 0x99, 0xda, 0x51, 0x54,
0x5a, 0x90, 0xf8, 0x71, 0x09, 0x32, 0x3d, 0x61, 0x41, 0xff, 0x51, 0x04, 0xa2, 0xd5, 0xf8, 0xd8, 0x85, 0x7a, 0x6f, 0x39, 0x2e,
0xb4, 0x90, 0x5c, 0xe3, 0x55, 0x5e, 0xc9, 0x12, 0xcd, 0xfb, 0x9c,
]),
bits: 520421375, // see `gen_testnet11_genesis`
bits: 504155340, // see `gen_testnet11_genesis`
..TESTNET_GENESIS
};

Expand Down Expand Up @@ -227,11 +227,11 @@ mod tests {
let bps = Testnet11Bps::bps();
let mut genesis = TESTNET_GENESIS;
let target = kaspa_math::Uint256::from_compact_target_bits(genesis.bits);
let scaled_up_target = target * bps;
let scaled_up_bits = scaled_up_target.compact_target_bits();
genesis.bits = scaled_up_bits;
let scaled_target = target * bps / 100;
let scaled_bits = scaled_target.compact_target_bits();
genesis.bits = scaled_bits;
if genesis.bits != TESTNET11_GENESIS.bits {
panic!("Testnet 11: new bits: {}\nnew hash: {:#04x?}", scaled_up_bits, Block::from(&genesis).hash().as_bytes());
panic!("Testnet 11: new bits: {}\nnew hash: {:#04x?}", scaled_bits, Block::from(&genesis).hash().as_bytes());
}
}

Expand Down
10 changes: 5 additions & 5 deletions protocol/flows/src/flow_context.rs
@@ -1,5 +1,5 @@
use crate::flowcontext::{orphans::OrphanBlocksPool, process_queue::ProcessQueue, transactions::TransactionsSpread};
use crate::v5;
use crate::{v5, v6};
use async_trait::async_trait;
use futures::future::join_all;
use kaspa_addressmanager::AddressManager;
Expand Down Expand Up @@ -54,7 +54,7 @@ use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};
use uuid::Uuid;

/// The P2P protocol version. Currently the only one supported.
const PROTOCOL_VERSION: u32 = 5;
const PROTOCOL_VERSION: u32 = 6;

/// See `check_orphan_resolution_range`
const BASELINE_ORPHAN_RESOLUTION_RANGE: u32 = 5;
Expand Down Expand Up @@ -573,8 +573,8 @@ impl ConnectionInitializer for FlowContext {

// Register all flows according to version
let (flows, applied_protocol_version) = match peer_version.protocol_version {
PROTOCOL_VERSION => (v5::register(self.clone(), router.clone()), PROTOCOL_VERSION),
// TODO: different errors for obsolete (low version) vs unknown (high)
v if v >= PROTOCOL_VERSION => (v6::register(self.clone(), router.clone()), PROTOCOL_VERSION),
5 => (v5::register(self.clone(), router.clone()), 5),
v => return Err(ProtocolError::VersionMismatch(PROTOCOL_VERSION, v)),
};

Expand All @@ -592,7 +592,7 @@ impl ConnectionInitializer for FlowContext {
// Send and receive the ready signal
handshake.exchange_ready_messages().await?;

info!("Registering p2p flows for peer {} for protocol version {}", router, peer_version.protocol_version);
info!("Registering p2p flows for peer {} for protocol version {}", router, applied_protocol_version);

// Launch all flows. Note we launch only after the ready signal was exchanged
for flow in flows {
Expand Down
1 change: 1 addition & 0 deletions protocol/flows/src/lib.rs
Expand Up @@ -3,3 +3,4 @@ pub mod flow_trait;
pub mod flowcontext;
pub mod service;
pub mod v5;
pub mod v6;
38 changes: 26 additions & 12 deletions protocol/flows/src/v5/blockrelay/flow.rs
Expand Up @@ -8,9 +8,9 @@ use kaspa_core::{debug, info};
use kaspa_hashes::Hash;
use kaspa_p2p_lib::{
common::ProtocolError,
dequeue, dequeue_with_timeout, make_message,
dequeue, dequeue_with_timeout, make_message, make_request,
pb::{kaspad_message::Payload, InvRelayBlockMessage, RequestBlockLocatorMessage, RequestRelayBlocksMessage},
IncomingRoute, Router,
IncomingRoute, Router, SharedIncomingRoute,
};
use std::{collections::VecDeque, sync::Arc};
use tokio::sync::mpsc::{error::TrySendError, Sender};
Expand All @@ -22,12 +22,12 @@ pub struct RelayInvMessage {

/// Encapsulates an incoming invs route which also receives data locally
pub struct TwoWayIncomingRoute {
incoming_route: IncomingRoute,
incoming_route: SharedIncomingRoute,
indirect_invs: VecDeque<Hash>,
}

impl TwoWayIncomingRoute {
pub fn new(incoming_route: IncomingRoute) -> Self {
pub fn new(incoming_route: SharedIncomingRoute) -> Self {
Self { incoming_route, indirect_invs: VecDeque::new() }
}

Expand Down Expand Up @@ -72,7 +72,7 @@ impl HandleRelayInvsFlow {
pub fn new(
ctx: FlowContext,
router: Arc<Router>,
invs_route: IncomingRoute,
invs_route: SharedIncomingRoute,
msg_route: IncomingRoute,
ibd_sender: Sender<Block>,
) -> Self {
Expand Down Expand Up @@ -111,7 +111,7 @@ impl HandleRelayInvsFlow {
}

// We keep the request scope alive until consensus processes the block
let Some((block, request_scope)) = self.request_block(inv.hash).await? else {
let Some((block, request_scope)) = self.request_block(inv.hash, self.msg_route.id()).await? else {
debug!("Relay block {} was already requested from another peer, continuing...", inv.hash);
continue;
};
Expand Down Expand Up @@ -184,13 +184,21 @@ impl HandleRelayInvsFlow {
}
}

async fn request_block(&mut self, requested_hash: Hash) -> Result<Option<(Block, RequestScope<Hash>)>, ProtocolError> {
async fn request_block(
&mut self,
requested_hash: Hash,
request_id: u32,
) -> Result<Option<(Block, RequestScope<Hash>)>, ProtocolError> {
// Note: the request scope is returned and should be captured until block processing is completed
let Some(request_scope) = self.ctx.try_adding_block_request(requested_hash) else {
return Ok(None);
};
self.router
.enqueue(make_message!(Payload::RequestRelayBlocks, RequestRelayBlocksMessage { hashes: vec![requested_hash.into()] }))
.enqueue(make_request!(
Payload::RequestRelayBlocks,
RequestRelayBlocksMessage { hashes: vec![requested_hash.into()] },
request_id
))
.await?;
let msg = dequeue_with_timeout!(self.msg_route, Payload::Block)?;
let block: Block = msg.try_into()?;
Expand All @@ -210,7 +218,7 @@ impl HandleRelayInvsFlow {
// Add the block to the orphan pool if it's within orphan resolution range.
// If the block is indirect it means one of its descendants was already is resolution range, so
// we can avoid the query.
if is_indirect_inv || self.check_orphan_resolution_range(consensus, block.hash()).await? {
if is_indirect_inv || self.check_orphan_resolution_range(consensus, block.hash(), self.msg_route.id()).await? {
let hash = block.hash();
self.ctx.add_orphan(block).await;
self.enqueue_orphan_roots(consensus, hash).await;
Expand All @@ -230,11 +238,17 @@ impl HandleRelayInvsFlow {
/// mechanism or via IBD. This method sends a BlockLocator request to the peer with
/// a limit of `ctx.orphan_resolution_range`. In the response, if we know none of the hashes,
/// we should retrieve the given block `hash` via IBD. Otherwise, via unorphaning.
async fn check_orphan_resolution_range(&mut self, consensus: &ConsensusProxy, hash: Hash) -> Result<bool, ProtocolError> {
async fn check_orphan_resolution_range(
&mut self,
consensus: &ConsensusProxy,
hash: Hash,
request_id: u32,
) -> Result<bool, ProtocolError> {
self.router
.enqueue(make_message!(
.enqueue(make_request!(
Payload::RequestBlockLocator,
RequestBlockLocatorMessage { high_hash: Some(hash.into()), limit: self.ctx.orphan_resolution_range() }
RequestBlockLocatorMessage { high_hash: Some(hash.into()), limit: self.ctx.orphan_resolution_range() },
request_id
))
.await?;
let msg = dequeue_with_timeout!(self.msg_route, Payload::BlockLocator)?;
Expand Down
6 changes: 3 additions & 3 deletions protocol/flows/src/v5/blockrelay/handle_requests.rs
Expand Up @@ -2,7 +2,7 @@ use crate::{flow_context::FlowContext, flow_trait::Flow};
use kaspa_core::debug;
use kaspa_p2p_lib::{
common::ProtocolError,
dequeue, make_message,
dequeue_with_request_id, make_message, make_response,
pb::{kaspad_message::Payload, InvRelayBlockMessage},
IncomingRoute, Router,
};
Expand Down Expand Up @@ -36,14 +36,14 @@ impl HandleRelayBlockRequests {
// Note: in go-kaspad this was done via a dedicated one-time flow.
self.send_sink().await?;
loop {
let msg = dequeue!(self.incoming_route, Payload::RequestRelayBlocks)?;
let (msg, request_id) = dequeue_with_request_id!(self.incoming_route, Payload::RequestRelayBlocks)?;
let hashes: Vec<_> = msg.try_into()?;

let session = self.ctx.consensus().unguarded_session();

for hash in hashes {
let block = session.async_get_block(hash).await?;
self.router.enqueue(make_message!(Payload::Block, (&block).into())).await?;
self.router.enqueue(make_response!(Payload::Block, (&block).into(), request_id)).await?;
debug!("relayed block with hash {} to peer {}", hash, self.router);
}
}
Expand Down
32 changes: 17 additions & 15 deletions protocol/flows/src/v5/mod.rs
Expand Up @@ -15,22 +15,22 @@ use self::{
};
use crate::{flow_context::FlowContext, flow_trait::Flow};

use kaspa_p2p_lib::{KaspadMessagePayloadType, Router};
use kaspa_p2p_lib::{KaspadMessagePayloadType, Router, SharedIncomingRoute};
use std::sync::Arc;

mod address;
mod blockrelay;
mod ibd;
mod ping;
mod request_anticone;
mod request_block_locator;
mod request_headers;
mod request_ibd_blocks;
mod request_ibd_chain_block_locator;
mod request_pp_proof;
mod request_pruning_point_and_anticone;
mod request_pruning_point_utxo_set;
mod txrelay;
pub(crate) mod address;
pub(crate) mod blockrelay;
pub(crate) mod ibd;
pub(crate) mod ping;
pub(crate) mod request_anticone;
pub(crate) mod request_block_locator;
pub(crate) mod request_headers;
pub(crate) mod request_ibd_blocks;
pub(crate) mod request_ibd_chain_block_locator;
pub(crate) mod request_pp_proof;
pub(crate) mod request_pruning_point_and_anticone;
pub(crate) mod request_pruning_point_utxo_set;
pub(crate) mod txrelay;

pub fn register(ctx: FlowContext, router: Arc<Router>) -> Vec<Box<dyn Flow>> {
// IBD flow <-> invs flow channel requires no buffering hence the minimal size possible
Expand Down Expand Up @@ -60,7 +60,9 @@ pub fn register(ctx: FlowContext, router: Arc<Router>) -> Vec<Box<dyn Flow>> {
Box::new(HandleRelayInvsFlow::new(
ctx.clone(),
router.clone(),
router.subscribe_with_capacity(vec![KaspadMessagePayloadType::InvRelayBlock], ctx.block_invs_channel_size()),
SharedIncomingRoute::new(
router.subscribe_with_capacity(vec![KaspadMessagePayloadType::InvRelayBlock], ctx.block_invs_channel_size()),
),
router.subscribe(vec![KaspadMessagePayloadType::Block, KaspadMessagePayloadType::BlockLocator]),
ibd_sender,
)),
Expand Down
11 changes: 6 additions & 5 deletions protocol/flows/src/v5/request_anticone.rs
Expand Up @@ -4,7 +4,7 @@ use kaspa_core::debug;
use kaspa_hashes::Hash;
use kaspa_p2p_lib::{
common::ProtocolError,
dequeue, make_message,
dequeue_with_request_id, make_response,
pb::{kaspad_message::Payload, BlockHeadersMessage, DoneHeadersMessage},
IncomingRoute, Router,
};
Expand Down Expand Up @@ -34,7 +34,7 @@ impl HandleAnticoneRequests {

async fn start_impl(&mut self) -> Result<(), ProtocolError> {
loop {
let msg = dequeue!(self.incoming_route, Payload::RequestAnticone)?;
let (msg, request_id) = dequeue_with_request_id!(self.incoming_route, Payload::RequestAnticone)?;
let (block, context): (Hash, Hash) = msg.try_into()?;

debug!("received anticone request with block hash: {}, context hash: {} for peer {}", block, context, self.router);
Expand All @@ -55,12 +55,13 @@ impl HandleAnticoneRequests {
headers.sort_by(|a, b| a.blue_work.cmp(&b.blue_work));

self.router
.enqueue(make_message!(
.enqueue(make_response!(
Payload::BlockHeaders,
BlockHeadersMessage { block_headers: headers.into_iter().map(|header| header.as_ref().into()).collect() }
BlockHeadersMessage { block_headers: headers.into_iter().map(|header| header.as_ref().into()).collect() },
request_id
))
.await?;
self.router.enqueue(make_message!(Payload::DoneHeaders, DoneHeadersMessage {})).await?;
self.router.enqueue(make_response!(Payload::DoneHeaders, DoneHeadersMessage {}, request_id)).await?;
}
}
}
9 changes: 5 additions & 4 deletions protocol/flows/src/v5/request_block_locator.rs
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use kaspa_p2p_lib::{
common::ProtocolError,
dequeue, make_message,
dequeue_with_request_id, make_response,
pb::{kaspad_message::Payload, BlockLocatorMessage},
IncomingRoute, Router,
};
Expand Down Expand Up @@ -33,16 +33,17 @@ impl RequestBlockLocatorFlow {

async fn start_impl(&mut self) -> Result<(), ProtocolError> {
loop {
let msg = dequeue!(self.incoming_route, Payload::RequestBlockLocator)?;
let (msg, request_id) = dequeue_with_request_id!(self.incoming_route, Payload::RequestBlockLocator)?;
let (high, limit) = msg.try_into()?;

let locator =
self.ctx.consensus().session().await.async_create_block_locator_from_pruning_point(high, limit as usize).await?;

self.router
.enqueue(make_message!(
.enqueue(make_response!(
Payload::BlockLocator,
BlockLocatorMessage { hashes: locator.into_iter().map(|hash| hash.into()).collect() }
BlockLocatorMessage { hashes: locator.into_iter().map(|hash| hash.into()).collect() },
request_id
))
.await?;
}
Expand Down
8 changes: 4 additions & 4 deletions protocol/flows/src/v5/request_headers.rs
Expand Up @@ -4,7 +4,7 @@ use kaspa_consensus_core::api::ConsensusApi;
use kaspa_hashes::Hash;
use kaspa_p2p_lib::{
common::ProtocolError,
dequeue, make_message,
dequeue, dequeue_with_request_id, make_response,
pb::{self, kaspad_message::Payload, BlockHeadersMessage, DoneHeadersMessage},
IncomingRoute, Router,
};
Expand Down Expand Up @@ -40,7 +40,7 @@ impl RequestHeadersFlow {
let max_blocks = max(MAX_BLOCKS, self.ctx.config.mergeset_size_limit as usize + 1);

loop {
let msg = dequeue!(self.incoming_route, Payload::RequestHeaders)?;
let (msg, request_id) = dequeue_with_request_id!(self.incoming_route, Payload::RequestHeaders)?;
let (high, mut low) = msg.try_into()?;

let consensus = self.ctx.consensus();
Expand Down Expand Up @@ -68,13 +68,13 @@ impl RequestHeadersFlow {
session.spawn_blocking(move |c| Self::get_headers_between(c, low, high, max_blocks)).await?;
debug!("Got {} header hashes above {}", block_headers.len(), low);
low = last;
self.router.enqueue(make_message!(Payload::BlockHeaders, BlockHeadersMessage { block_headers })).await?;
self.router.enqueue(make_response!(Payload::BlockHeaders, BlockHeadersMessage { block_headers }, request_id)).await?;

dequeue!(self.incoming_route, Payload::RequestNextHeaders)?;
session = consensus.session().await;
}

self.router.enqueue(make_message!(Payload::DoneHeaders, DoneHeadersMessage {})).await?;
self.router.enqueue(make_response!(Payload::DoneHeaders, DoneHeadersMessage {}, request_id)).await?;
}
}

Expand Down
8 changes: 5 additions & 3 deletions protocol/flows/src/v5/request_ibd_blocks.rs
@@ -1,6 +1,8 @@
use crate::{flow_context::FlowContext, flow_trait::Flow};
use kaspa_core::debug;
use kaspa_p2p_lib::{common::ProtocolError, dequeue, make_message, pb::kaspad_message::Payload, IncomingRoute, Router};
use kaspa_p2p_lib::{
common::ProtocolError, dequeue_with_request_id, make_response, pb::kaspad_message::Payload, IncomingRoute, Router,
};
use std::sync::Arc;

pub struct HandleIbdBlockRequests {
Expand All @@ -27,15 +29,15 @@ impl HandleIbdBlockRequests {

async fn start_impl(&mut self) -> Result<(), ProtocolError> {
loop {
let msg = dequeue!(self.incoming_route, Payload::RequestIbdBlocks)?;
let (msg, request_id) = dequeue_with_request_id!(self.incoming_route, Payload::RequestIbdBlocks)?;
let hashes: Vec<_> = msg.try_into()?;

debug!("got request for {} IBD blocks", hashes.len());
let session = self.ctx.consensus().unguarded_session();

for hash in hashes {
let block = session.async_get_block(hash).await?;
self.router.enqueue(make_message!(Payload::IbdBlock, (&block).into())).await?;
self.router.enqueue(make_response!(Payload::IbdBlock, (&block).into(), request_id)).await?;
}
}
}
Expand Down