From 03d2b45bb19e47ccbece1b1b33d050a7c55445d4 Mon Sep 17 00:00:00 2001 From: Stanimal Date: Sun, 17 Oct 2021 20:47:11 +0400 Subject: [PATCH] fix: remove unbounded vec allocations from base node grpc/p2p messaging - replaces unbounded height lists with bound height ranges in internal base node service interfaces and p2p messaging - removes unused and deprecated base node p2p messaging requests/responses - load chain headers when requesting headers, this simplifies and reduces the database load when calling grpc get_blocks - use iterator with almost no footprint for paging in grpc calls - implement `DoubleSidedIterator` for `NonOverlappingIntegerPairIter` - add overflow protection to `NonOverlappingIntegerPairIter` and additional tests --- Cargo.lock | 1 + applications/tari_base_node/Cargo.toml | 1 + .../tari_base_node/src/command_handler.rs | 36 +-- .../src/grpc/base_node_grpc_server.rs | 273 ++++++++---------- .../tari_base_node/src/grpc/blocks.rs | 11 +- .../comms_interface/comms_request.rs | 16 +- .../comms_interface/comms_response.rs | 6 +- .../comms_interface/inbound_handlers.rs | 54 +--- .../comms_interface/local_interface.rs | 24 +- .../core/src/base_node/comms_interface/mod.rs | 1 + .../comms_interface/outbound_interface.rs | 167 +---------- base_layer/core/src/base_node/mod.rs | 2 +- .../core/src/base_node/proto/request.proto | 32 +- .../core/src/base_node/proto/request.rs | 91 +----- .../core/src/base_node/proto/response.proto | 22 -- .../core/src/base_node/proto/response.rs | 72 +---- .../core/src/base_node/service/service.rs | 79 ++--- .../state_machine_service/initializer.rs | 3 - .../state_machine_service/state_machine.rs | 5 +- base_layer/core/src/blocks/block_header.rs | 8 - base_layer/core/src/iterators/chunk.rs | 141 ++++++++- base_layer/core/tests/helpers/nodes.rs | 2 +- base_layer/core/tests/node_comms_interface.rs | 163 ++--------- base_layer/core/tests/node_service.rs | 173 +---------- base_layer/core/tests/node_state_machine.rs | 2 - 25 files changed, 435 insertions(+), 950 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0264fd4ec6f..d5480b9a498 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4224,6 +4224,7 @@ dependencies = [ "bincode", "chrono", "config", + "either", "futures 0.3.16", "log 0.4.14", "num_cpus", diff --git a/applications/tari_base_node/Cargo.toml b/applications/tari_base_node/Cargo.toml index c1fa83a5ba3..a6b20a05f9a 100644 --- a/applications/tari_base_node/Cargo.toml +++ b/applications/tari_base_node/Cargo.toml @@ -25,6 +25,7 @@ anyhow = "1.0.32" bincode = "1.3.1" chrono = "0.4" config = { version = "0.9.3" } +either = "1.6.1" futures = { version = "^0.3.16", default-features = false, features = ["alloc"] } log = { version = "0.4.8", features = ["std"] } num_cpus = "1" diff --git a/applications/tari_base_node/src/command_handler.rs b/applications/tari_base_node/src/command_handler.rs index fc99c70990c..2a279ce87c7 100644 --- a/applications/tari_base_node/src/command_handler.rs +++ b/applications/tari_base_node/src/command_handler.rs @@ -128,14 +128,9 @@ impl CommandHandler { status_line.add_field("State", state_info.borrow().state_info.short_desc()); let metadata = node.get_metadata().await.unwrap(); - - let last_header = node - .get_headers(vec![metadata.height_of_longest_chain()]) - .await - .unwrap() - .pop() - .unwrap(); - let last_block_time = DateTime::::from(last_header.timestamp); + let height = metadata.height_of_longest_chain(); + let last_header = node.get_headers(height, height).await.unwrap().pop().unwrap(); + let last_block_time = DateTime::::from(last_header.header().timestamp); status_line.add_field( "Tip", format!( @@ -866,8 +861,9 @@ impl CommandHandler { io::stdout().flush().unwrap(); // we can only check till the pruning horizon, 0 is archive node so it needs to check every block. if height > horizon_height { - match node.get_blocks(vec![height]).await { - Err(_err) => { + match node.get_blocks(height, height).await { + Err(err) => { + error!(target: LOG_TARGET, "{}", err); missing_blocks.push(height); }, Ok(mut data) => match data.pop() { @@ -879,8 +875,8 @@ impl CommandHandler { }; } height -= 1; - let next_header = node.get_headers(vec![height]).await; - if next_header.is_err() { + let next_header = node.get_headers(height, height).await.ok().filter(|h| !h.is_empty()); + if next_header.is_none() { // this header is missing, so we stop here and need to ask for this header missing_headers.push(height); }; @@ -917,9 +913,9 @@ impl CommandHandler { print!("{}", height); io::stdout().flush().unwrap(); - let block = match node.get_blocks(vec![height]).await { - Err(_err) => { - println!("Error in db, could not get block"); + let block = match node.get_blocks(height, height).await { + Err(err) => { + println!("Error in db, could not get block: {}", err); break; }, Ok(mut data) => match data.pop() { @@ -927,14 +923,14 @@ impl CommandHandler { // logging it. Some(historical_block) => historical_block, None => { - println!("Error in db, could not get block"); + println!("Error in db, block not found at height {}", height); break; }, }, }; - let prev_block = match node.get_blocks(vec![height - 1]).await { - Err(_err) => { - println!("Error in db, could not get block"); + let prev_block = match node.get_blocks(height - 1, height - 1).await { + Err(err) => { + println!("Error in db, could not get block: {}", err); break; }, Ok(mut data) => match data.pop() { @@ -942,7 +938,7 @@ impl CommandHandler { // logging it. Some(historical_block) => historical_block, None => { - println!("Error in db, could not get block"); + println!("Error in db, block not found at height {}", height - 1); break; }, }, diff --git a/applications/tari_base_node/src/grpc/base_node_grpc_server.rs b/applications/tari_base_node/src/grpc/base_node_grpc_server.rs index 7db518cbcc8..6608a7a0d23 100644 --- a/applications/tari_base_node/src/grpc/base_node_grpc_server.rs +++ b/applications/tari_base_node/src/grpc/base_node_grpc_server.rs @@ -26,6 +26,7 @@ use crate::{ helpers::{mean, median}, }, }; +use either::Either; use futures::{channel::mpsc, SinkExt}; use log::*; use std::{ @@ -49,6 +50,7 @@ use tari_core::{ chain_storage::ChainStorageError, consensus::{emission::Emission, ConsensusManager, NetworkConsensus}, crypto::tari_utilities::{hex::Hex, ByteArray}, + iterators::NonOverlappingIntegerPairIter, mempool::{service::LocalMempoolService, TxStorageResponse}, proof_of_work::PowAlgorithm, transactions::transaction::Transaction, @@ -64,7 +66,7 @@ const GET_TOKENS_IN_CIRCULATION_PAGE_SIZE: usize = 1_000; // The maximum number of difficulty ints that can be requested at a time. These will be streamed to the // client, so memory is not really a concern here, but a malicious client could request a large // number here to keep the node busy -const GET_DIFFICULTY_MAX_HEIGHTS: usize = 10_000; +const GET_DIFFICULTY_MAX_HEIGHTS: u64 = 10_000; const GET_DIFFICULTY_PAGE_SIZE: usize = 1_000; // The maximum number of headers a client can request at a time. If the client requests more than // this, this is the maximum that will be returned. @@ -104,7 +106,7 @@ impl BaseNodeGrpcServer { pub async fn get_heights( request: &tari_rpc::HeightRequest, handler: LocalNodeCommsInterface, -) -> Result, Status> { +) -> Result<(u64, u64), Status> { block_heights(handler, request.start_height, request.end_height, request.from_tip).await } @@ -132,111 +134,74 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { request.end_height ); let mut handler = self.node_service.clone(); - let mut heights: Vec = get_heights(&request, handler.clone()).await?; - heights = heights - .drain(..cmp::min(heights.len(), GET_DIFFICULTY_MAX_HEIGHTS)) - .collect(); - let (mut tx, rx) = mpsc::channel(GET_DIFFICULTY_MAX_HEIGHTS); + let (start_height, end_height) = get_heights(&request, handler.clone()).await?; + // Overflow safety: checked in get_heights + let num_requested = end_height - start_height; + if num_requested > GET_DIFFICULTY_MAX_HEIGHTS { + return Err(Status::invalid_argument(format!( + "Number of headers requested exceeds maximum. Expected less than {} but got {}", + GET_DIFFICULTY_MAX_HEIGHTS, num_requested + ))); + } + let (mut tx, rx) = mpsc::channel(cmp::min(num_requested as usize, GET_DIFFICULTY_PAGE_SIZE)); task::spawn(async move { - let mut page: Vec = heights - .drain(..cmp::min(heights.len(), GET_DIFFICULTY_PAGE_SIZE)) - .collect(); - while !page.is_empty() { - let mut difficulties = match handler.get_headers(page.clone()).await { + let page_iter = NonOverlappingIntegerPairIter::new(start_height, end_height + 1, GET_DIFFICULTY_PAGE_SIZE); + for (start, end) in page_iter { + // headers are returned by height + let headers = match handler.get_headers(start, end).await { + Ok(headers) => headers, Err(err) => { - warn!( - target: LOG_TARGET, - "Error communicating with local base node: {:?}", err, - ); + warn!(target: LOG_TARGET, "Base node service error: {:?}", err,); + let _ = tx + .send(Err(Status::internal("Internal error when fetching blocks"))) + .await; return; }, - Ok(mut data) => { - data.sort_by(|a, b| a.height.cmp(&b.height)); - let mut iter = data.iter().peekable(); - let mut result = Vec::new(); - while let Some(next) = iter.next() { - match handler.get_blocks(vec![next.height]).await { - Err(err) => { - warn!( - target: LOG_TARGET, - "Error communicating with local base node: {:?}", err, - ); - return; - }, - Ok(blocks) => { - match blocks.first() { - Some(block) => { - let current_difficulty: u64 = - block.accumulated_data.target_difficulty.as_u64(); - let current_timestamp = next.timestamp.as_u64(); - let current_height = next.height; - let pow_algo = next.pow.pow_algo.as_u64(); - let estimated_hash_rate = if let Some(peek) = iter.peek() { - let peeked_timestamp = peek.timestamp.as_u64(); - // Sometimes blocks can have the same timestamp, lucky miner and some - // clock drift. - if peeked_timestamp > current_timestamp { - current_difficulty / (peeked_timestamp - current_timestamp) - } else { - 0 - } - } else { - 0 - }; - result.push(( - current_difficulty, - estimated_hash_rate, - current_height, - current_timestamp, - pow_algo, - )) - }, - None => { - return; - }, - } - }, - }; - } - result - }, }; - difficulties.sort_by(|a, b| b.2.cmp(&a.2)); - let result_size = difficulties.len(); - for difficulty in difficulties { - match tx - .send(Ok({ - tari_rpc::NetworkDifficultyResponse { - difficulty: difficulty.0, - estimated_hash_rate: difficulty.1, - height: difficulty.2, - timestamp: difficulty.3, - pow_algo: difficulty.4, - } - })) - .await - { - Ok(_) => (), - Err(err) => { - warn!(target: LOG_TARGET, "Error sending difficulty via GRPC: {}", err); - match tx.send(Err(Status::unknown("Error sending data"))).await { - Ok(_) => (), - Err(send_err) => { - warn!(target: LOG_TARGET, "Error sending error to GRPC client: {}", send_err) - }, - } - return; - }, - } + if headers.is_empty() { + let _ = tx.send(Err(Status::invalid_argument(format!( + "No blocks found within range {} - {}", + start, end + )))); + return; } - if result_size < GET_DIFFICULTY_PAGE_SIZE { - break; + + let mut headers_iter = headers.iter().peekable(); + + while let Some(chain_header) = headers_iter.next() { + let current_difficulty = chain_header.accumulated_data().target_difficulty.as_u64(); + let current_timestamp = chain_header.header().timestamp.as_u64(); + let current_height = chain_header.header().height; + let pow_algo = chain_header.header().pow.pow_algo.as_u64(); + + let estimated_hash_rate = headers_iter + .peek() + .map(|chain_header| chain_header.header().timestamp.as_u64()) + .and_then(|peeked_timestamp| { + // Sometimes blocks can have the same timestamp, lucky miner and some + // clock drift. + peeked_timestamp + .checked_sub(current_timestamp) + .filter(|td| *td > 0) + .map(|time_diff| current_timestamp / time_diff) + }) + .unwrap_or(0); + + let difficulty = tari_rpc::NetworkDifficultyResponse { + difficulty: current_difficulty, + estimated_hash_rate, + height: current_height, + timestamp: current_timestamp, + pow_algo, + }; + + if let Err(err) = tx.send(Ok(difficulty)).await { + warn!(target: LOG_TARGET, "Error sending difficulties via GRPC: {}", err); + return; + } } - page = heights - .drain(..cmp::min(heights.len(), GET_DIFFICULTY_PAGE_SIZE)) - .collect(); } }); @@ -326,21 +291,18 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { let from_height = cmp::min(request.from_height, tip); - let headers: Vec = if from_height != 0 { + let (header_range, is_reversed) = if from_height != 0 { match sorting { Sorting::Desc => { let from = match from_height.overflowing_sub(num_headers) { (_, true) => 0, (res, false) => res + 1, }; - (from..=from_height).rev().collect() + (from..=from_height, true) }, Sorting::Asc => { - let to = match from_height.overflowing_add(num_headers) { - (_, true) => u64::MAX, - (res, false) => res, - }; - (from_height..to).collect() + let to = from_height.saturating_add(num_headers).saturating_sub(1); + (from_height..=to, false) }, } } else { @@ -350,34 +312,50 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { (_, true) => 0, (res, false) => res + 1, }; - (from..=tip).rev().collect() + (from..=tip, true) }, - Sorting::Asc => (0..num_headers).collect(), + Sorting::Asc => (0..=num_headers.saturating_sub(1), false), } }; task::spawn(async move { - trace!(target: LOG_TARGET, "Starting base node request"); - let mut headers = headers; - trace!(target: LOG_TARGET, "Headers:{:?}", headers); - let mut page: Vec = headers - .drain(..cmp::min(headers.len(), LIST_HEADERS_PAGE_SIZE)) - .collect(); - while !page.is_empty() { - trace!(target: LOG_TARGET, "Page: {:?}", page); - let result_headers = match handler.get_headers(page).await { + debug!( + target: LOG_TARGET, + "Starting base node request {}-{}", + header_range.start(), + header_range.end() + ); + let page_iter = NonOverlappingIntegerPairIter::new( + *header_range.start(), + *header_range.end() + 1, + LIST_HEADERS_PAGE_SIZE, + ); + let page_iter = if is_reversed { + Either::Left(page_iter.rev()) + } else { + Either::Right(page_iter) + }; + for (start, end) in page_iter { + debug!(target: LOG_TARGET, "Page: {}-{}", start, end); + let result_headers = match handler.get_headers(start, end).await { Err(err) => { - warn!(target: LOG_TARGET, "Error communicating with base node: {}", err,); + warn!(target: LOG_TARGET, "Internal base node service error: {}", err); return; }, - Ok(data) => data, + Ok(data) => { + if is_reversed { + data.into_iter().rev().collect::>() + } else { + data + } + }, }; - trace!(target: LOG_TARGET, "Result headers: {}", result_headers.len()); let result_size = result_headers.len(); + debug!(target: LOG_TARGET, "Result headers: {}", result_size); for header in result_headers { - trace!(target: LOG_TARGET, "Sending block header: {}", header.height); - match tx.send(Ok(header.into())).await { + debug!(target: LOG_TARGET, "Sending block header: {}", header.height()); + match tx.send(Ok(header.into_header().into())).await { Ok(_) => (), Err(err) => { warn!(target: LOG_TARGET, "Error sending block header via GRPC: {}", err); @@ -391,12 +369,6 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { }, } } - if result_size < LIST_HEADERS_PAGE_SIZE { - break; - } - page = headers - .drain(..cmp::min(headers.len(), LIST_HEADERS_PAGE_SIZE)) - .collect(); } }); @@ -670,18 +642,24 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { target: LOG_TARGET, "Incoming GRPC request for GetBlocks: {:?}", request.heights ); + let mut heights = request.heights; - heights = heights - .drain(..cmp::min(heights.len(), GET_BLOCKS_MAX_HEIGHTS)) - .collect(); + if heights.is_empty() { + return Err(Status::invalid_argument("heights cannot be empty")); + } + + heights.truncate(GET_BLOCKS_MAX_HEIGHTS); + heights.sort_unstable(); + // unreachable panic: `heights` is not empty + let start = *heights.first().expect("unreachable"); + let end = *heights.last().expect("unreachable"); let mut handler = self.node_service.clone(); let (mut tx, rx) = mpsc::channel(GET_BLOCKS_PAGE_SIZE); task::spawn(async move { - let mut page: Vec = heights.drain(..cmp::min(heights.len(), GET_BLOCKS_PAGE_SIZE)).collect(); - - while !page.is_empty() { - let blocks = match handler.get_blocks(page.clone()).await { + let page_iter = NonOverlappingIntegerPairIter::new(start, end + 1, GET_BLOCKS_PAGE_SIZE); + for (start, end) in page_iter { + let blocks = match handler.get_blocks(start, end).await { Err(err) => { warn!( target: LOG_TARGET, @@ -689,10 +667,19 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { ); return; }, - Ok(data) => data, + Ok(data) => { + // TODO: Change this interface to a start-end ranged one (clients like the block explorer + // convert start end ranges to integer lists anyway) + data.into_iter().filter(|b| heights.contains(&b.header().height)) + }, }; - let result_size = blocks.len(); + for block in blocks { + debug!( + target: LOG_TARGET, + "GetBlock GRPC sending block #{}", + block.header().height + ); match tx .send( block @@ -714,10 +701,6 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { }, } } - if result_size < GET_BLOCKS_PAGE_SIZE { - break; - } - page = heights.drain(..cmp::min(heights.len(), GET_BLOCKS_PAGE_SIZE)).collect(); } }); @@ -888,10 +871,10 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { ); let mut handler = self.node_service.clone(); - let heights: Vec = get_heights(&request, handler.clone()).await?; + let (start, end) = get_heights(&request, handler.clone()).await?; - let headers = match handler.get_headers(heights).await { - Ok(headers) => headers, + let headers = match handler.get_headers(start, end).await { + Ok(headers) => headers.into_iter().map(|h| h.into_header()).collect::>(), Err(err) => { warn!(target: LOG_TARGET, "Error getting headers for GRPC client: {}", err); Vec::new() @@ -1177,9 +1160,9 @@ async fn get_block_group( height_request.end_height ); - let heights = get_heights(&height_request, handler.clone()).await?; + let (start, end) = get_heights(&height_request, handler.clone()).await?; - let blocks = match handler.get_blocks(heights).await { + let blocks = match handler.get_blocks(start, end).await { Err(err) => { warn!( target: LOG_TARGET, diff --git a/applications/tari_base_node/src/grpc/blocks.rs b/applications/tari_base_node/src/grpc/blocks.rs index 0fe5a735c93..ee96a38b61d 100644 --- a/applications/tari_base_node/src/grpc/blocks.rs +++ b/applications/tari_base_node/src/grpc/blocks.rs @@ -21,7 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use std::cmp; -use tari_core::{base_node::LocalNodeCommsInterface, blocks::BlockHeader, chain_storage::HistoricalBlock}; +use tari_core::{base_node::LocalNodeCommsInterface, chain_storage::HistoricalBlock}; use tonic::Status; // The maximum number of blocks that can be requested at a time. These will be streamed to the @@ -43,9 +43,12 @@ pub async fn block_heights( start_height: u64, end_height: u64, from_tip: u64, -) -> Result, Status> { +) -> Result<(u64, u64), Status> { if end_height > 0 { - Ok(BlockHeader::get_height_range(start_height, end_height)) + if start_height > end_height { + return Err(Status::invalid_argument("Start height was greater than end height")); + } + Ok((start_height, end_height)) } else if from_tip > 0 { let metadata = handler .get_metadata() @@ -55,7 +58,7 @@ pub async fn block_heights( // Avoid overflow let height_from_tip = cmp::min(tip, from_tip); let start = cmp::max(tip - height_from_tip, 0); - Ok(BlockHeader::get_height_range(start, tip)) + Ok((start, tip)) } else { Err(Status::invalid_argument("Invalid arguments provided")) } diff --git a/base_layer/core/src/base_node/comms_interface/comms_request.rs b/base_layer/core/src/base_node/comms_interface/comms_request.rs index eef287d8f17..001223817e6 100644 --- a/base_layer/core/src/base_node/comms_interface/comms_request.rs +++ b/base_layer/core/src/base_node/comms_interface/comms_request.rs @@ -38,13 +38,13 @@ pub struct MmrStateRequest { #[derive(Debug, Serialize, Deserialize)] pub enum NodeCommsRequest { GetChainMetadata, - FetchHeaders(Vec), + FetchHeaders { start: u64, end_inclusive: u64 }, FetchHeadersWithHashes(Vec), FetchHeadersAfter(Vec, HashOutput), FetchMatchingUtxos(Vec), FetchMatchingTxos(Vec), - FetchMatchingBlocks(Vec), - FetchBlocksWithHashes(Vec), + FetchMatchingBlocks { start: u64, end_inclusive: u64 }, + FetchBlocksByHash(Vec), FetchBlocksWithKernels(Vec), FetchBlocksWithUtxos(Vec), GetHeaderByHash(HashOutput), @@ -65,13 +65,17 @@ impl Display for NodeCommsRequest { use NodeCommsRequest::*; match self { GetChainMetadata => write!(f, "GetChainMetadata"), - FetchHeaders(v) => write!(f, "FetchHeaders (n={})", v.len()), + FetchHeaders { start, end_inclusive } => { + write!(f, "FetchHeaders ({}-{})", start, end_inclusive) + }, FetchHeadersWithHashes(v) => write!(f, "FetchHeadersWithHashes (n={})", v.len()), FetchHeadersAfter(v, _hash) => write!(f, "FetchHeadersAfter (n={})", v.len()), FetchMatchingUtxos(v) => write!(f, "FetchMatchingUtxos (n={})", v.len()), FetchMatchingTxos(v) => write!(f, "FetchMatchingTxos (n={})", v.len()), - FetchMatchingBlocks(v) => write!(f, "FetchMatchingBlocks (n={})", v.len()), - FetchBlocksWithHashes(v) => write!(f, "FetchBlocksWithHashes (n={})", v.len()), + FetchMatchingBlocks { start, end_inclusive } => { + write!(f, "FetchMatchingBlocks ({}-{})", start, end_inclusive) + }, + FetchBlocksByHash(v) => write!(f, "FetchBlocksByHash (n={})", v.len()), FetchBlocksWithKernels(v) => write!(f, "FetchBlocksWithKernels (n={})", v.len()), FetchBlocksWithUtxos(v) => write!(f, "FetchBlocksWithUtxos (n={})", v.len()), GetHeaderByHash(v) => write!(f, "GetHeaderByHash({})", v.to_hex()), diff --git a/base_layer/core/src/base_node/comms_interface/comms_response.rs b/base_layer/core/src/base_node/comms_interface/comms_response.rs index 8f7ec1b9e50..6fa9e52bbb8 100644 --- a/base_layer/core/src/base_node/comms_interface/comms_response.rs +++ b/base_layer/core/src/base_node/comms_interface/comms_response.rs @@ -22,7 +22,7 @@ use crate::{ blocks::{block_header::BlockHeader, Block, NewBlockTemplate}, - chain_storage::HistoricalBlock, + chain_storage::{ChainHeader, HistoricalBlock}, proof_of_work::Difficulty, transactions::transaction::{TransactionKernel, TransactionOutput}, }; @@ -35,8 +35,8 @@ use tari_common_types::{chain_metadata::ChainMetadata, types::HashOutput}; pub enum NodeCommsResponse { ChainMetadata(ChainMetadata), TransactionKernels(Vec), - BlockHeaders(Vec), - BlockHeader(Option), + BlockHeaders(Vec), + BlockHeader(Option), TransactionOutputs(Vec), HistoricalBlocks(Vec), HistoricalBlock(Box>), diff --git a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs index f760c5146c7..432e47fc0c8 100644 --- a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs +++ b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs @@ -20,13 +20,11 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use crate::{ - base_node::{ - comms_interface::{ - error::CommsInterfaceError, - local_interface::BlockEventSender, - NodeCommsRequest, - NodeCommsResponse, - }, + base_node::comms_interface::{ + error::CommsInterfaceError, + local_interface::BlockEventSender, + NodeCommsRequest, + NodeCommsResponse, OutboundNodeCommsInterface, }, blocks::{block_header::BlockHeader, Block, NewBlock, NewBlockTemplate}, @@ -128,27 +126,15 @@ where T: BlockchainBackend + 'static NodeCommsRequest::GetChainMetadata => Ok(NodeCommsResponse::ChainMetadata( self.blockchain_db.get_chain_metadata().await?, )), - NodeCommsRequest::FetchHeaders(block_nums) => { - let mut block_headers = Vec::::with_capacity(block_nums.len()); - for block_num in block_nums { - match self.blockchain_db.fetch_header(block_num).await { - Ok(Some(block_header)) => { - block_headers.push(block_header); - }, - Ok(None) => return Err(CommsInterfaceError::BlockHeaderNotFound(block_num)), - Err(err) => { - error!(target: LOG_TARGET, "Could not fetch headers: {}", err.to_string()); - return Err(err.into()); - }, - } - } - Ok(NodeCommsResponse::BlockHeaders(block_headers)) + NodeCommsRequest::FetchHeaders { start, end_inclusive } => { + let headers = self.blockchain_db.fetch_chain_headers(start..=end_inclusive).await?; + Ok(NodeCommsResponse::BlockHeaders(headers)) }, NodeCommsRequest::FetchHeadersWithHashes(block_hashes) => { - let mut block_headers = Vec::::with_capacity(block_hashes.len()); + let mut block_headers = Vec::with_capacity(block_hashes.len()); for block_hash in block_hashes { let block_hex = block_hash.to_hex(); - match self.blockchain_db.fetch_header_by_block_hash(block_hash).await? { + match self.blockchain_db.fetch_chain_header_by_block_hash(block_hash).await? { Some(block_header) => { block_headers.push(block_header); }, @@ -248,23 +234,11 @@ where T: BlockchainBackend + 'static .collect(); Ok(NodeCommsResponse::TransactionOutputs(res)) }, - NodeCommsRequest::FetchMatchingBlocks(block_nums) => { - let mut blocks = Vec::with_capacity(block_nums.len()); - for block_num in block_nums { - debug!(target: LOG_TARGET, "A peer has requested block {}", block_num); - match self.blockchain_db.fetch_block(block_num).await { - Ok(block) => blocks.push(block), - // We need to suppress the error as another node might ask for a block we dont have, so we - // return ok([]) - Err(e) => debug!( - target: LOG_TARGET, - "Could not provide requested block {} to peer because: {}", block_num, e - ), - } - } + NodeCommsRequest::FetchMatchingBlocks { start, end_inclusive } => { + let blocks = self.blockchain_db.fetch_blocks(start..=end_inclusive).await?; Ok(NodeCommsResponse::HistoricalBlocks(blocks)) }, - NodeCommsRequest::FetchBlocksWithHashes(block_hashes) => { + NodeCommsRequest::FetchBlocksByHash(block_hashes) => { let mut blocks = Vec::with_capacity(block_hashes.len()); for block_hash in block_hashes { let block_hex = block_hash.to_hex(); @@ -340,7 +314,7 @@ where T: BlockchainBackend + 'static Ok(NodeCommsResponse::HistoricalBlocks(blocks)) }, NodeCommsRequest::GetHeaderByHash(hash) => { - let header = self.blockchain_db.fetch_header_by_block_hash(hash).await?; + let header = self.blockchain_db.fetch_chain_header_by_block_hash(hash).await?; Ok(NodeCommsResponse::BlockHeader(header)) }, NodeCommsRequest::GetBlockByHash(hash) => { diff --git a/base_layer/core/src/base_node/comms_interface/local_interface.rs b/base_layer/core/src/base_node/comms_interface/local_interface.rs index 0a270a78e25..251c6fb3e9b 100644 --- a/base_layer/core/src/base_node/comms_interface/local_interface.rs +++ b/base_layer/core/src/base_node/comms_interface/local_interface.rs @@ -28,7 +28,7 @@ use crate::{ NodeCommsRequest, NodeCommsResponse, }, - blocks::{Block, BlockHeader, NewBlockTemplate}, + blocks::{Block, NewBlockTemplate}, chain_storage::HistoricalBlock, proof_of_work::PowAlgorithm, transactions::transaction::TransactionKernel, @@ -42,6 +42,7 @@ pub type BlockEventSender = broadcast::Sender>; pub type BlockEventReceiver = broadcast::Receiver>; use crate::{ base_node::comms_interface::comms_request::GetNewBlockTemplateRequest, + chain_storage::ChainHeader, transactions::transaction::TransactionOutput, }; use tari_common_types::types::{Commitment, HashOutput, Signature}; @@ -82,10 +83,14 @@ impl LocalNodeCommsInterface { } /// Request the block header of the current tip at the block height - pub async fn get_blocks(&mut self, block_heights: Vec) -> Result, CommsInterfaceError> { + pub async fn get_blocks( + &mut self, + start: u64, + end_inclusive: u64, + ) -> Result, CommsInterfaceError> { match self .request_sender - .call(NodeCommsRequest::FetchMatchingBlocks(block_heights)) + .call(NodeCommsRequest::FetchMatchingBlocks { start, end_inclusive }) .await?? { NodeCommsResponse::HistoricalBlocks(blocks) => Ok(blocks), @@ -93,11 +98,16 @@ impl LocalNodeCommsInterface { } } - /// Request the block header of the current tip at the block height - pub async fn get_headers(&mut self, block_heights: Vec) -> Result, CommsInterfaceError> { + /// Request the block headers with the given range of heights. The returned headers are ordered from lowest to + /// highest block height + pub async fn get_headers( + &mut self, + start: u64, + end_inclusive: u64, + ) -> Result, CommsInterfaceError> { match self .request_sender - .call(NodeCommsRequest::FetchHeaders(block_heights)) + .call(NodeCommsRequest::FetchHeaders { start, end_inclusive }) .await?? { NodeCommsResponse::BlockHeaders(headers) => Ok(headers), @@ -204,7 +214,7 @@ impl LocalNodeCommsInterface { } /// Return header matching the given hash. If the header cannot be found `Ok(None)` is returned. - pub async fn get_header_by_hash(&mut self, hash: HashOutput) -> Result, CommsInterfaceError> { + pub async fn get_header_by_hash(&mut self, hash: HashOutput) -> Result, CommsInterfaceError> { match self .request_sender .call(NodeCommsRequest::GetHeaderByHash(hash)) diff --git a/base_layer/core/src/base_node/comms_interface/mod.rs b/base_layer/core/src/base_node/comms_interface/mod.rs index fca07c92f1e..66185bdb015 100644 --- a/base_layer/core/src/base_node/comms_interface/mod.rs +++ b/base_layer/core/src/base_node/comms_interface/mod.rs @@ -35,5 +35,6 @@ pub use inbound_handlers::{BlockEvent, Broadcast, InboundNodeCommsHandlers}; mod local_interface; pub use local_interface::{BlockEventReceiver, BlockEventSender, LocalNodeCommsInterface}; +// TODO: Remove this entirely when able mod outbound_interface; pub use outbound_interface::OutboundNodeCommsInterface; diff --git a/base_layer/core/src/base_node/comms_interface/outbound_interface.rs b/base_layer/core/src/base_node/comms_interface/outbound_interface.rs index 433b5325de4..263e1b6265b 100644 --- a/base_layer/core/src/base_node/comms_interface/outbound_interface.rs +++ b/base_layer/core/src/base_node/comms_interface/outbound_interface.rs @@ -22,21 +22,14 @@ use crate::{ base_node::comms_interface::{error::CommsInterfaceError, NodeCommsRequest, NodeCommsResponse}, - blocks::{block_header::BlockHeader, NewBlock}, + blocks::NewBlock, chain_storage::HistoricalBlock, - transactions::transaction::TransactionOutput, -}; -use log::*; -use tari_common_types::{ - chain_metadata::ChainMetadata, - types::{BlockHash, HashOutput}, }; +use tari_common_types::types::BlockHash; use tari_comms::peer_manager::NodeId; use tari_service_framework::{reply_channel::SenderService, Service}; use tokio::sync::mpsc::UnboundedSender; -pub const LOG_TARGET: &str = "c::bn::comms_interface::outbound_interface"; - /// The OutboundNodeCommsInterface provides an interface to request information from remove nodes. #[derive(Clone)] pub struct OutboundNodeCommsInterface { @@ -59,160 +52,6 @@ impl OutboundNodeCommsInterface { } } - /// Request metadata from remote base nodes. - pub async fn get_metadata(&mut self) -> Result { - self.request_metadata_from_peer(None).await - } - - /// Request metadata from a specific base node, if None is provided as a node_id then a random base node will be - /// queried. - pub async fn request_metadata_from_peer( - &mut self, - node_id: Option, - ) -> Result { - if let NodeCommsResponse::ChainMetadata(metadata) = self - .request_sender - .call((NodeCommsRequest::GetChainMetadata, node_id)) - .await?? - { - trace!(target: LOG_TARGET, "Remote metadata requested: {:?}", metadata,); - Ok(metadata) - } else { - // TODO: Potentially ban peer - Err(CommsInterfaceError::UnexpectedApiResponse) - } - } - - /// Fetch the block headers corresponding to the provided block numbers from remote base nodes. - pub async fn fetch_headers(&mut self, block_nums: Vec) -> Result, CommsInterfaceError> { - self.request_headers_from_peer(block_nums, None).await - } - - /// Fetch the block headers corresponding to the provided block numbers from a specific base node, if None is - /// provided as a node_id then a random base node will be queried. - pub async fn request_headers_from_peer( - &mut self, - block_nums: Vec, - node_id: Option, - ) -> Result, CommsInterfaceError> { - if let NodeCommsResponse::BlockHeaders(headers) = self - .request_sender - .call((NodeCommsRequest::FetchHeaders(block_nums), node_id)) - .await?? - { - Ok(headers) - } else { - Err(CommsInterfaceError::UnexpectedApiResponse) - } - } - - /// Fetch the Headers corresponding to the provided block hashes from remote base nodes. - pub async fn fetch_headers_with_hashes( - &mut self, - block_hashes: Vec, - ) -> Result, CommsInterfaceError> { - self.request_headers_with_hashes_from_peer(block_hashes, None).await - } - - /// Fetch the Headers corresponding to the provided block hashes from a specific base node, if None is provided as a - /// node_id then a random base node will be queried. - pub async fn request_headers_with_hashes_from_peer( - &mut self, - block_hashes: Vec, - node_id: Option, - ) -> Result, CommsInterfaceError> { - if let NodeCommsResponse::BlockHeaders(headers) = self - .request_sender - .call((NodeCommsRequest::FetchHeadersWithHashes(block_hashes), node_id)) - .await?? - { - Ok(headers) - } else { - Err(CommsInterfaceError::UnexpectedApiResponse) - } - } - - /// Fetch the UTXOs with the provided hashes from remote base nodes. - pub async fn fetch_utxos( - &mut self, - hashes: Vec, - ) -> Result, CommsInterfaceError> { - self.request_utxos_from_peer(hashes, None).await - } - - /// Fetch the UTXOs with the provided hashes from a specific base node, if None is provided as a node_id then a - /// random base node will be queried. - pub async fn request_utxos_from_peer( - &mut self, - hashes: Vec, - node_id: Option, - ) -> Result, CommsInterfaceError> { - if let NodeCommsResponse::TransactionOutputs(utxos) = self - .request_sender - .call((NodeCommsRequest::FetchMatchingUtxos(hashes), node_id)) - .await?? - { - Ok(utxos) - } else { - Err(CommsInterfaceError::UnexpectedApiResponse) - } - } - - /// Fetch the UTXOs or STXOs with the provided hashes from remote base nodes. - pub async fn fetch_txos(&mut self, hashes: Vec) -> Result, CommsInterfaceError> { - self.request_txos_from_peer(hashes, None).await - } - - /// Fetch the UTXOs or STXOS with the provided hashes from a specific base node, if None is provided as a node_id - /// then a random base node will be queried. - pub async fn request_txos_from_peer( - &mut self, - hashes: Vec, - node_id: Option, - ) -> Result, CommsInterfaceError> { - if let NodeCommsResponse::TransactionOutputs(txos) = self - .request_sender - .call((NodeCommsRequest::FetchMatchingTxos(hashes), node_id)) - .await?? - { - Ok(txos) - } else { - Err(CommsInterfaceError::UnexpectedApiResponse) - } - } - - /// Fetch the Historical Blocks corresponding to the provided block numbers from remote base nodes. - pub async fn fetch_blocks(&mut self, block_nums: Vec) -> Result, CommsInterfaceError> { - self.request_blocks_from_peer(block_nums, None).await - } - - /// Fetch the Historical Blocks corresponding to the provided block numbers from a specific base node, if None is - /// provided as a node_id then a random base node will be queried. - pub async fn request_blocks_from_peer( - &mut self, - block_nums: Vec, - node_id: Option, - ) -> Result, CommsInterfaceError> { - if let NodeCommsResponse::HistoricalBlocks(blocks) = self - .request_sender - .call((NodeCommsRequest::FetchMatchingBlocks(block_nums), node_id)) - .await?? - { - Ok(blocks) - } else { - Err(CommsInterfaceError::UnexpectedApiResponse) - } - } - - /// Fetch the Blocks corresponding to the provided block hashes from remote base nodes. The requested blocks could - /// be chain blocks or orphan blocks. - pub async fn fetch_blocks_with_hashes( - &mut self, - block_hashes: Vec, - ) -> Result, CommsInterfaceError> { - self.request_blocks_with_hashes_from_peer(block_hashes, None).await - } - /// Fetch the Blocks corresponding to the provided block hashes from a specific base node. The requested blocks /// could be chain blocks or orphan blocks. pub async fn request_blocks_with_hashes_from_peer( @@ -222,7 +61,7 @@ impl OutboundNodeCommsInterface { ) -> Result, CommsInterfaceError> { if let NodeCommsResponse::HistoricalBlocks(blocks) = self .request_sender - .call((NodeCommsRequest::FetchBlocksWithHashes(block_hashes), node_id)) + .call((NodeCommsRequest::FetchBlocksByHash(block_hashes), node_id)) .await?? { Ok(blocks) diff --git a/base_layer/core/src/base_node/mod.rs b/base_layer/core/src/base_node/mod.rs index 826a0447048..0275c4e9d00 100644 --- a/base_layer/core/src/base_node/mod.rs +++ b/base_layer/core/src/base_node/mod.rs @@ -38,7 +38,7 @@ pub mod chain_metadata_service; #[cfg(feature = "base_node")] pub mod comms_interface; #[cfg(feature = "base_node")] -pub use comms_interface::{LocalNodeCommsInterface, OutboundNodeCommsInterface}; +pub use comms_interface::LocalNodeCommsInterface; #[cfg(feature = "base_node")] pub mod service; diff --git a/base_layer/core/src/base_node/proto/request.proto b/base_layer/core/src/base_node/proto/request.proto index 323c6c6a648..3f7893abe43 100644 --- a/base_layer/core/src/base_node/proto/request.proto +++ b/base_layer/core/src/base_node/proto/request.proto @@ -10,36 +10,8 @@ package tari.base_node; message BaseNodeServiceRequest { uint64 request_key = 1; oneof request { - // Indicates a GetChainMetadata request. The value of the bool should be ignored. - bool get_chain_metadata = 2; - // Indicates a FetchHeaders request. - BlockHeights fetch_headers = 4; - // Indicates a FetchHeadersWithHashes request. - HashOutputs fetch_headers_with_hashes = 5; - // Indicates a FetchMatchingUtxos request. - HashOutputs fetch_matching_utxos = 6; - // Indicates a FetchMatchingBlocks request. - BlockHeights fetch_matching_blocks = 7; - // Indicates a FetchBlocksWithHashes request. - HashOutputs fetch_blocks_with_hashes = 8; - // Indicates a GetNewBlockTemplate request. - NewBlockTemplateRequest get_new_block_template = 9; - // Indicates a GetNewBlock request. - tari.core.NewBlockTemplate get_new_block = 10; - // Get headers in best chain following any headers in this list - FetchHeadersAfter fetch_headers_after = 12; - // Indicates a FetchMatchingTxos request. - HashOutputs fetch_matching_txos = 15; - // Indicates a Fetch block with kernels request - Signatures fetch_blocks_with_kernels = 16; - // Indicates a Fetch block with kernels request - Commitments fetch_blocks_with_utxos = 18; - // Indicates a Fetch kernel by excess signature request - tari.types.Signature fetch_kernel_by_excess_sig = 19; - // Indicates a GetHeaderByHash request. - bytes get_header_by_hash = 20; - // Indicates a GetBlockByHash request. - bytes get_block_by_hash = 21; + // Indicates a FetchBlocksByHash request. + HashOutputs fetch_blocks_by_hash = 8; } } diff --git a/base_layer/core/src/base_node/proto/request.rs b/base_layer/core/src/base_node/proto/request.rs index bf766bffc23..79eccc40dc9 100644 --- a/base_layer/core/src/base_node/proto/request.rs +++ b/base_layer/core/src/base_node/proto/request.rs @@ -21,21 +21,11 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use crate::{ - base_node::{comms_interface as ci, comms_interface::GetNewBlockTemplateRequest}, - proof_of_work::PowAlgorithm, - proto::{ - base_node as proto, - base_node::{ - base_node_service_request::Request as ProtoNodeCommsRequest, - BlockHeights, - FetchHeadersAfter as ProtoFetchHeadersAfter, - HashOutputs, - }, - }, + base_node::comms_interface as ci, + proto::base_node::{base_node_service_request::Request as ProtoNodeCommsRequest, BlockHeights, HashOutputs}, }; use std::convert::{From, TryFrom, TryInto}; -use tari_common_types::types::{Commitment, HashOutput, Signature}; -use tari_crypto::tari_utilities::ByteArrayError; +use tari_common_types::types::HashOutput; //---------------------------------- BaseNodeRequest --------------------------------------------// impl TryInto for ProtoNodeCommsRequest { @@ -44,81 +34,20 @@ impl TryInto for ProtoNodeCommsRequest { fn try_into(self) -> Result { use ProtoNodeCommsRequest::*; let request = match self { - // Field was not specified - GetChainMetadata(_) => ci::NodeCommsRequest::GetChainMetadata, - FetchHeaders(block_heights) => ci::NodeCommsRequest::FetchHeaders(block_heights.heights), - FetchHeadersWithHashes(block_hashes) => ci::NodeCommsRequest::FetchHeadersWithHashes(block_hashes.outputs), - FetchHeadersAfter(request) => { - ci::NodeCommsRequest::FetchHeadersAfter(request.hashes, request.stopping_hash) - }, - FetchMatchingUtxos(hash_outputs) => ci::NodeCommsRequest::FetchMatchingUtxos(hash_outputs.outputs), - FetchMatchingTxos(hash_outputs) => ci::NodeCommsRequest::FetchMatchingTxos(hash_outputs.outputs), - FetchMatchingBlocks(block_heights) => ci::NodeCommsRequest::FetchMatchingBlocks(block_heights.heights), - FetchBlocksWithHashes(block_hashes) => ci::NodeCommsRequest::FetchBlocksWithHashes(block_hashes.outputs), - FetchBlocksWithKernels(signatures) => { - let mut sigs = Vec::new(); - for sig in signatures.sigs { - sigs.push(Signature::try_from(sig).map_err(|err: ByteArrayError| err.to_string())?) - } - ci::NodeCommsRequest::FetchBlocksWithKernels(sigs) - }, - FetchBlocksWithUtxos(commitments) => { - let mut commits = Vec::new(); - for stxo in commitments.commitments { - commits.push(Commitment::try_from(stxo).map_err(|err: ByteArrayError| err.to_string())?) - } - ci::NodeCommsRequest::FetchBlocksWithUtxos(commits) - }, - GetHeaderByHash(hash) => ci::NodeCommsRequest::GetHeaderByHash(hash), - GetBlockByHash(hash) => ci::NodeCommsRequest::GetBlockByHash(hash), - GetNewBlockTemplate(message) => { - let request = GetNewBlockTemplateRequest { - algo: PowAlgorithm::try_from(message.algo)?, - max_weight: message.max_weight, - }; - ci::NodeCommsRequest::GetNewBlockTemplate(request) - }, - GetNewBlock(block_template) => ci::NodeCommsRequest::GetNewBlock(block_template.try_into()?), - FetchKernelByExcessSig(sig) => ci::NodeCommsRequest::FetchKernelByExcessSig( - Signature::try_from(sig).map_err(|err: ByteArrayError| err.to_string())?, - ), + FetchBlocksByHash(block_hashes) => ci::NodeCommsRequest::FetchBlocksByHash(block_hashes.outputs), }; Ok(request) } } -impl From for ProtoNodeCommsRequest { - fn from(request: ci::NodeCommsRequest) -> Self { +impl TryFrom for ProtoNodeCommsRequest { + type Error = String; + + fn try_from(request: ci::NodeCommsRequest) -> Result { use ci::NodeCommsRequest::*; match request { - GetChainMetadata => ProtoNodeCommsRequest::GetChainMetadata(true), - FetchHeaders(block_heights) => ProtoNodeCommsRequest::FetchHeaders(block_heights.into()), - FetchHeadersWithHashes(block_hashes) => ProtoNodeCommsRequest::FetchHeadersWithHashes(block_hashes.into()), - FetchHeadersAfter(hashes, stopping_hash) => { - ProtoNodeCommsRequest::FetchHeadersAfter(ProtoFetchHeadersAfter { hashes, stopping_hash }) - }, - FetchMatchingUtxos(hash_outputs) => ProtoNodeCommsRequest::FetchMatchingUtxos(hash_outputs.into()), - FetchMatchingTxos(hash_outputs) => ProtoNodeCommsRequest::FetchMatchingTxos(hash_outputs.into()), - FetchMatchingBlocks(block_heights) => ProtoNodeCommsRequest::FetchMatchingBlocks(block_heights.into()), - FetchBlocksWithHashes(block_hashes) => ProtoNodeCommsRequest::FetchBlocksWithHashes(block_hashes.into()), - FetchBlocksWithKernels(signatures) => { - let sigs = signatures.into_iter().map(Into::into).collect(); - ProtoNodeCommsRequest::FetchBlocksWithKernels(proto::Signatures { sigs }) - }, - FetchBlocksWithUtxos(commitments) => { - let commits = commitments.into_iter().map(Into::into).collect(); - ProtoNodeCommsRequest::FetchBlocksWithUtxos(proto::Commitments { commitments: commits }) - }, - GetHeaderByHash(hash) => ProtoNodeCommsRequest::GetHeaderByHash(hash), - GetBlockByHash(hash) => ProtoNodeCommsRequest::GetBlockByHash(hash), - GetNewBlockTemplate(request) => { - ProtoNodeCommsRequest::GetNewBlockTemplate(proto::NewBlockTemplateRequest { - algo: request.algo as u64, - max_weight: request.max_weight, - }) - }, - GetNewBlock(block_template) => ProtoNodeCommsRequest::GetNewBlock(block_template.into()), - FetchKernelByExcessSig(signature) => ProtoNodeCommsRequest::FetchKernelByExcessSig(signature.into()), + FetchBlocksByHash(block_hashes) => Ok(ProtoNodeCommsRequest::FetchBlocksByHash(block_hashes.into())), + e => Err(format!("{} request is not supported", e)), } } } diff --git a/base_layer/core/src/base_node/proto/response.proto b/base_layer/core/src/base_node/proto/response.proto index fd581e3fef8..985225f58ae 100644 --- a/base_layer/core/src/base_node/proto/response.proto +++ b/base_layer/core/src/base_node/proto/response.proto @@ -10,30 +10,8 @@ package tari.base_node; message BaseNodeServiceResponse { uint64 request_key = 1; oneof response { - // Indicates a ChainMetadata response. - ChainMetadata chain_metadata = 2; - // Indicates a TransactionKernels response. - TransactionKernels transaction_kernels = 3; - // Indicates a BlockHeaders response. - BlockHeaders block_headers = 4; - // Indicates a TransactionOutputs response. - TransactionOutputs transaction_outputs = 5; // Indicates a HistoricalBlocks response. HistoricalBlocks historical_blocks = 6; - // Indicates a NewBlockTemplate response. - tari.core.NewBlockTemplate new_block_template = 7; - // Indicates a NewBlock response. - NewBlockResponse new_block = 8; - // Indicates a TargetDifficulty response. - uint64 target_difficulty = 9; - // Block headers in range response - BlockHeaders fetch_headers_after_response = 10; - // Indicates a MmrNodes response - MmrNodes MmrNodes = 12; - // A single header response - BlockHeaderResponse block_header = 14; - // A single historical block response - HistoricalBlockResponse historical_block = 15; } bool is_synced = 13; } diff --git a/base_layer/core/src/base_node/proto/response.rs b/base_layer/core/src/base_node/proto/response.rs index ef88be684f5..0036d6017a6 100644 --- a/base_layer/core/src/base_node/proto/response.rs +++ b/base_layer/core/src/base_node/proto/response.rs @@ -25,15 +25,12 @@ use crate::{ base_node::comms_interface as ci, blocks::BlockHeader, chain_storage::HistoricalBlock, - proof_of_work::Difficulty, proto, proto::{ base_node as base_node_proto, base_node::{ BlockHeaders as ProtoBlockHeaders, HistoricalBlocks as ProtoHistoricalBlocks, - MmrNodes as ProtoMmrNodes, - NewBlockResponse as ProtoNewBlockResponse, TransactionKernels as ProtoTransactionKernels, TransactionOutputs as ProtoTransactionOutputs, }, @@ -42,7 +39,7 @@ use crate::{ tari_utilities::convert::try_convert_all, }; use std::{ - convert::TryInto, + convert::{TryFrom, TryInto}, iter::{FromIterator, Iterator}, }; @@ -52,81 +49,28 @@ impl TryInto for ProtoNodeCommsResponse { fn try_into(self) -> Result { use ProtoNodeCommsResponse::*; let response = match self { - ChainMetadata(chain_metadata) => ci::NodeCommsResponse::ChainMetadata(chain_metadata.try_into()?), - TransactionKernels(kernels) => { - let kernels = try_convert_all(kernels.kernels)?; - ci::NodeCommsResponse::TransactionKernels(kernels) - }, - BlockHeaders(headers) => { - let headers = try_convert_all(headers.headers)?; - ci::NodeCommsResponse::BlockHeaders(headers) - }, - BlockHeader(header) => ci::NodeCommsResponse::BlockHeader(header.try_into()?), - HistoricalBlock(block) => ci::NodeCommsResponse::HistoricalBlock(Box::new(block.try_into()?)), - FetchHeadersAfterResponse(headers) => { - let headers = try_convert_all(headers.headers)?; - ci::NodeCommsResponse::FetchHeadersAfterResponse(headers) - }, - TransactionOutputs(outputs) => { - let outputs = try_convert_all(outputs.outputs)?; - ci::NodeCommsResponse::TransactionOutputs(outputs) - }, HistoricalBlocks(blocks) => { let blocks = try_convert_all(blocks.blocks)?; ci::NodeCommsResponse::HistoricalBlocks(blocks) }, - NewBlockTemplate(block_template) => ci::NodeCommsResponse::NewBlockTemplate(block_template.try_into()?), - NewBlock(block) => ci::NodeCommsResponse::NewBlock { - success: block.success, - error: Some(block.error), - block: match block.block { - Some(b) => Some(b.try_into()?), - None => None, - }, - }, - TargetDifficulty(difficulty) => ci::NodeCommsResponse::TargetDifficulty(Difficulty::from(difficulty)), - MmrNodes(response) => ci::NodeCommsResponse::MmrNodes(response.added, response.deleted), }; Ok(response) } } -impl From for ProtoNodeCommsResponse { - fn from(response: ci::NodeCommsResponse) -> Self { +impl TryFrom for ProtoNodeCommsResponse { + type Error = String; + + fn try_from(response: ci::NodeCommsResponse) -> Result { use ci::NodeCommsResponse::*; match response { - ChainMetadata(chain_metadata) => ProtoNodeCommsResponse::ChainMetadata(chain_metadata.into()), - TransactionKernels(kernels) => { - let kernels = kernels.into_iter().map(Into::into).collect(); - ProtoNodeCommsResponse::TransactionKernels(kernels) - }, - BlockHeaders(headers) => { - let block_headers = headers.into_iter().map(Into::into).collect(); - ProtoNodeCommsResponse::BlockHeaders(block_headers) - }, - BlockHeader(header) => ProtoNodeCommsResponse::BlockHeader(header.into()), - HistoricalBlock(block) => ProtoNodeCommsResponse::HistoricalBlock((*block).into()), - FetchHeadersAfterResponse(headers) => { - let block_headers = headers.into_iter().map(Into::into).collect(); - ProtoNodeCommsResponse::FetchHeadersAfterResponse(block_headers) - }, - TransactionOutputs(outputs) => { - let outputs = outputs.into_iter().map(Into::into).collect(); - ProtoNodeCommsResponse::TransactionOutputs(outputs) - }, HistoricalBlocks(historical_blocks) => { let historical_blocks = historical_blocks.into_iter().map(Into::into).collect(); - ProtoNodeCommsResponse::HistoricalBlocks(historical_blocks) + Ok(ProtoNodeCommsResponse::HistoricalBlocks(historical_blocks)) }, - NewBlockTemplate(block_template) => ProtoNodeCommsResponse::NewBlockTemplate(block_template.into()), - NewBlock { success, error, block } => ProtoNodeCommsResponse::NewBlock(ProtoNewBlockResponse { - success, - error: error.unwrap_or_else(|| "".to_string()), - block: block.map(|b| b.into()), - }), - TargetDifficulty(difficulty) => ProtoNodeCommsResponse::TargetDifficulty(difficulty.as_u64()), - MmrNodes(added, deleted) => ProtoNodeCommsResponse::MmrNodes(ProtoMmrNodes { added, deleted }), + // This would only occur if a programming error sent out the unsupported response + resp => Err(format!("Response not supported {:?}", resp)), } } } diff --git a/base_layer/core/src/base_node/service/service.rs b/base_layer/core/src/base_node/service/service.rs index 1d66cbf1b1d..20ea949f6c0 100644 --- a/base_layer/core/src/base_node/service/service.rs +++ b/base_layer/core/src/base_node/service/service.rs @@ -36,7 +36,7 @@ use crate::{ blocks::{Block, NewBlock}, chain_storage::BlockchainBackend, proto as shared_protos, - proto::{base_node as proto, base_node::base_node_service_request::Request}, + proto::base_node as proto, }; use futures::{pin_mut, stream::StreamExt, Stream}; use log::*; @@ -400,7 +400,7 @@ async fn handle_incoming_request( let message = proto::BaseNodeServiceResponse { request_key: inner_msg.request_key, - response: Some(response.into()), + response: Some(response.try_into().map_err(BaseNodeServiceError::InvalidResponse)?), is_synced, }; @@ -501,7 +501,7 @@ async fn handle_outbound_request( let request_key = generate_request_key(&mut OsRng); let service_request = proto::BaseNodeServiceRequest { request_key, - request: Some(request.into()), + request: Some(request.try_into().map_err(CommsInterfaceError::ApiError)?), }; let mut send_msg_params = SendMessageParams::new(); @@ -533,39 +533,46 @@ async fn handle_outbound_request( // Wait for matching responses to arrive waiting_requests.insert(request_key, reply_tx).await; // Spawn timeout for waiting_request - if let Some(r) = service_request.request.clone() { - match r { - Request::FetchMatchingBlocks(_) | - Request::FetchBlocksWithHashes(_) | - Request::FetchBlocksWithKernels(_) | - Request::FetchBlocksWithUtxos(_) => { - trace!( - target: LOG_TARGET, - "Timeout for service request FetchBlocks... ({}) set at {:?}", - request_key, - config.fetch_blocks_timeout - ); - spawn_request_timeout(timeout_sender, request_key, config.fetch_blocks_timeout) - }, - Request::FetchMatchingUtxos(_) => { - trace!( - target: LOG_TARGET, - "Timeout for service request FetchMatchingUtxos ({}) set at {:?}", - request_key, - config.fetch_utxos_timeout - ); - spawn_request_timeout(timeout_sender, request_key, config.fetch_utxos_timeout) - }, - _ => { - trace!( - target: LOG_TARGET, - "Timeout for service request ... ({}) set at {:?}", - request_key, - config.service_request_timeout - ); - spawn_request_timeout(timeout_sender, request_key, config.service_request_timeout) - }, - }; + if service_request.request.is_some() { + trace!( + target: LOG_TARGET, + "Timeout for service request ... ({}) set at {:?}", + request_key, + config.service_request_timeout + ); + spawn_request_timeout(timeout_sender, request_key, config.service_request_timeout) + // match r { + // Request::FetchMatchingBlocks(_) | + // Request::FetchBlocksByHash(_) | + // Request::FetchBlocksWithKernels(_) | + // Request::FetchBlocksWithUtxos(_) => { + // trace!( + // target: LOG_TARGET, + // "Timeout for service request FetchBlocks... ({}) set at {:?}", + // request_key, + // config.fetch_blocks_timeout + // ); + // spawn_request_timeout(timeout_sender, request_key, config.fetch_blocks_timeout) + // }, + // Request::FetchMatchingUtxos(_) => { + // trace!( + // target: LOG_TARGET, + // "Timeout for service request FetchMatchingUtxos ({}) set at {:?}", + // request_key, + // config.fetch_utxos_timeout + // ); + // spawn_request_timeout(timeout_sender, request_key, config.fetch_utxos_timeout) + // }, + // _ => { + // trace!( + // target: LOG_TARGET, + // "Timeout for service request ... ({}) set at {:?}", + // request_key, + // config.service_request_timeout + // ); + // spawn_request_timeout(timeout_sender, request_key, config.service_request_timeout) + // }, + // }; }; // Log messages let msg_tag = send_states[0].tag; diff --git a/base_layer/core/src/base_node/state_machine_service/initializer.rs b/base_layer/core/src/base_node/state_machine_service/initializer.rs index d731f6e0cdc..b3dea2f6af2 100644 --- a/base_layer/core/src/base_node/state_machine_service/initializer.rs +++ b/base_layer/core/src/base_node/state_machine_service/initializer.rs @@ -38,7 +38,6 @@ use crate::{ }, sync::SyncValidators, LocalNodeCommsInterface, - OutboundNodeCommsInterface, }, chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend}, consensus::ConsensusManager, @@ -95,7 +94,6 @@ where B: BlockchainBackend + 'static let config = self.config.clone(); context.spawn_when_ready(move |handles| async move { - let outbound_interface = handles.expect_handle::(); let chain_metadata_service = handles.expect_handle::(); let node_local_interface = handles.expect_handle::(); let connectivity = handles.expect_handle::(); @@ -113,7 +111,6 @@ where B: BlockchainBackend + 'static let node = BaseNodeStateMachine::new( db, node_local_interface, - outbound_interface, connectivity, peer_manager, chain_metadata_service.get_event_stream(), diff --git a/base_layer/core/src/base_node/state_machine_service/state_machine.rs b/base_layer/core/src/base_node/state_machine_service/state_machine.rs index 35125655b35..f886bb8fe77 100644 --- a/base_layer/core/src/base_node/state_machine_service/state_machine.rs +++ b/base_layer/core/src/base_node/state_machine_service/state_machine.rs @@ -22,7 +22,7 @@ use crate::{ base_node::{ chain_metadata_service::ChainMetadataEvent, - comms_interface::{LocalNodeCommsInterface, OutboundNodeCommsInterface}, + comms_interface::LocalNodeCommsInterface, state_machine_service::{ states, states::{BaseNodeState, HorizonSyncConfig, StateEvent, StateInfo, StatusInfo, SyncPeerConfig, SyncStatus}, @@ -83,7 +83,6 @@ impl Default for BaseNodeStateMachineConfig { pub struct BaseNodeStateMachine { pub(super) db: AsyncBlockchainDb, pub(super) local_node_interface: LocalNodeCommsInterface, - pub(super) _outbound_nci: OutboundNodeCommsInterface, pub(super) connectivity: ConnectivityRequester, pub(super) peer_manager: Arc, pub(super) metadata_event_stream: broadcast::Receiver>, @@ -104,7 +103,6 @@ impl BaseNodeStateMachine { pub fn new( db: AsyncBlockchainDb, local_node_interface: LocalNodeCommsInterface, - outbound_nci: OutboundNodeCommsInterface, connectivity: ConnectivityRequester, peer_manager: Arc, metadata_event_stream: broadcast::Receiver>, @@ -119,7 +117,6 @@ impl BaseNodeStateMachine { Self { db, local_node_interface, - _outbound_nci: outbound_nci, connectivity, peer_manager, metadata_event_stream, diff --git a/base_layer/core/src/blocks/block_header.rs b/base_layer/core/src/blocks/block_header.rs index fb7aad681e6..f8365104ae2 100644 --- a/base_layer/core/src/blocks/block_header.rs +++ b/base_layer/core/src/blocks/block_header.rs @@ -164,14 +164,6 @@ impl BlockHeader { BlockBuilder::new(self.version).with_header(self) } - /// Returns a height range in descending order - pub fn get_height_range(start: u64, end_inclusive: u64) -> Vec { - let mut heights: Vec = - (std::cmp::min(start, end_inclusive)..=std::cmp::max(start, end_inclusive)).collect(); - heights.reverse(); - heights - } - /// Given a slice of headers (in reverse order), calculate the maximum, minimum and average periods between them pub fn timing_stats(headers: &[BlockHeader]) -> (u64, u64, f64) { if headers.len() < 2 { diff --git a/base_layer/core/src/iterators/chunk.rs b/base_layer/core/src/iterators/chunk.rs index 59044bc7c62..1d36e2f1428 100644 --- a/base_layer/core/src/iterators/chunk.rs +++ b/base_layer/core/src/iterators/chunk.rs @@ -28,7 +28,7 @@ pub struct VecChunkIter { inner: NonOverlappingIntegerPairIter, } -impl VecChunkIter { +impl VecChunkIter { pub fn new(start: Idx, end_exclusive: Idx, chunk_size: usize) -> Self { Self { inner: NonOverlappingIntegerPairIter::new(start, end_exclusive, chunk_size), @@ -56,15 +56,21 @@ vec_chunk_impl!(usize); /// Iterator that produces non-overlapping integer pairs. pub struct NonOverlappingIntegerPairIter { current: Idx, + current_end: Idx, end: Idx, size: usize, } -impl NonOverlappingIntegerPairIter { +impl NonOverlappingIntegerPairIter { + /// Create a new iterator that emits non-overlapping integers. + /// + /// ## Panics + /// Panics if start > end_exclusive pub fn new(start: Idx, end_exclusive: Idx, chunk_size: usize) -> Self { assert!(start <= end_exclusive, "`start` must be less than `end`"); Self { current: start, + current_end: end_exclusive, end: end_exclusive, size: chunk_size, } @@ -80,20 +86,73 @@ macro_rules! non_overlapping_iter_impl { if self.size == 0 { return None; } + if self.current == <$ty>::MAX { + return None; + } + if self.current == self.end { + return None; + } + let size = self.size as $ty; + match self.current.checked_add(size) { + Some(next) => { + let next = cmp::min(next, self.end); + + if self.current == next { + return None; + } + let chunk = (self.current, next - 1); + self.current = next; + Some(chunk) + }, + None => { + let chunk = (self.current, <$ty>::MAX); + self.current = <$ty>::MAX; + Some(chunk) + }, + } + } + } + impl DoubleEndedIterator for NonOverlappingIntegerPairIter<$ty> { + fn next_back(&mut self) -> Option { + if self.size == 0 || self.current_end == 0 { + return None; + } + + let size = self.size as $ty; + // Is this the first iteration? + if self.end == self.current_end { + let rem = (self.end - self.current) % size; - let next = cmp::min(self.current + self.size as $ty, self.end); + // Would there be an overflow (if iterating from the forward to back) + if self.current_end.saturating_sub(rem).checked_add(size).is_none() { + self.current_end = self.current_end.saturating_sub(rem); + let chunk = (self.current_end, <$ty>::MAX); + return Some(chunk); + } - if self.current == next { + if rem > 0 { + self.current_end = self.end - rem; + let chunk = (self.current_end, self.end - 1); + return Some(chunk); + } + } + + // Check if end will go beyond start + if self.current_end == self.current { return None; } - let chunk = (self.current, next - 1); - self.current = next; + + let next = self.current_end.saturating_sub(size); + let chunk = (next, self.current_end - 1); + self.current_end = next; Some(chunk) } } }; } +non_overlapping_iter_impl!(u8); +non_overlapping_iter_impl!(u16); non_overlapping_iter_impl!(u32); non_overlapping_iter_impl!(u64); non_overlapping_iter_impl!(usize); @@ -101,6 +160,7 @@ non_overlapping_iter_impl!(usize); #[cfg(test)] mod test { use super::*; + use rand::{rngs::OsRng, Rng}; #[test] fn zero_size() { let mut iter = NonOverlappingIntegerPairIter::new(10u32, 10, 0); @@ -134,19 +194,26 @@ mod test { #[test] fn chunk_size_not_multiple_of_end() { - let mut iter = NonOverlappingIntegerPairIter::new(0u32, 11, 3); + let mut iter = NonOverlappingIntegerPairIter::new(0u32, 10, 3); assert_eq!(iter.next().unwrap(), (0, 2)); assert_eq!(iter.next().unwrap(), (3, 5)); assert_eq!(iter.next().unwrap(), (6, 8)); - assert_eq!(iter.next().unwrap(), (9, 10)); + assert_eq!(iter.next().unwrap(), (9, 9)); assert!(iter.next().is_none()); - let mut iter = VecChunkIter::new(0u32, 11, 3); + let mut iter = VecChunkIter::new(0u32, 10, 3); assert_eq!(iter.next().unwrap(), vec![0, 1, 2]); assert_eq!(iter.next().unwrap(), vec![3, 4, 5]); assert_eq!(iter.next().unwrap(), vec![6, 7, 8]); - assert_eq!(iter.next().unwrap(), vec![9, 10]); + assert_eq!(iter.next().unwrap(), vec![9]); assert!(iter.next().is_none()); + + let mut iter = NonOverlappingIntegerPairIter::new(0u32, 16, 5); + assert_eq!(iter.next().unwrap(), (0, 4)); + assert_eq!(iter.next().unwrap(), (5, 9)); + assert_eq!(iter.next().unwrap(), (10, 14)); + assert_eq!(iter.next().unwrap(), (15, 15)); + assert_eq!(iter.next(), None); } #[test] @@ -164,4 +231,58 @@ mod test { assert_eq!(iter.next().unwrap(), vec![19, 20]); assert!(iter.next().is_none()); } + + #[test] + fn overflow() { + let mut iter = NonOverlappingIntegerPairIter::new(250u8, 255, 3); + assert_eq!(iter.next().unwrap(), (250, 252)); + assert_eq!(iter.next().unwrap(), (253, 255)); + assert!(iter.next().is_none()); + } + + #[test] + fn double_ended() { + let mut iter = NonOverlappingIntegerPairIter::new(0u32, 9, 3).rev(); + assert_eq!(iter.next().unwrap(), (6, 8)); + assert_eq!(iter.next().unwrap(), (3, 5)); + assert_eq!(iter.next().unwrap(), (0, 2)); + assert!(iter.next().is_none()); + + let mut iter = NonOverlappingIntegerPairIter::new(0u32, 10, 3).rev(); + assert_eq!(iter.next().unwrap(), (9, 9)); + assert_eq!(iter.next().unwrap(), (6, 8)); + assert_eq!(iter.next().unwrap(), (3, 5)); + assert_eq!(iter.next().unwrap(), (0, 2)); + assert!(iter.next().is_none()); + + let mut iter = NonOverlappingIntegerPairIter::new(0u32, 16, 5).rev(); + assert_eq!(iter.next().unwrap(), (15, 15)); + assert_eq!(iter.next().unwrap(), (10, 14)); + assert_eq!(iter.next().unwrap(), (5, 9)); + assert_eq!(iter.next().unwrap(), (0, 4)); + assert!(iter.next().is_none()); + + let mut iter = NonOverlappingIntegerPairIter::new(1001u32, 4000, 1000).rev(); + assert_eq!(iter.next().unwrap(), (3001, 3999)); + assert_eq!(iter.next().unwrap(), (2001, 3000)); + assert_eq!(iter.next().unwrap(), (1001, 2000)); + assert!(iter.next().is_none()); + } + + #[test] + fn iterator_symmetry() { + let size = OsRng.gen_range(3usize..=10); + let rand_start = OsRng.gen::(); + let rand_end = OsRng.gen::().saturating_add(rand_start); + let iter_rev = NonOverlappingIntegerPairIter::new(rand_start, rand_end, size).rev(); + let iter = NonOverlappingIntegerPairIter::new(rand_start, rand_end, size); + assert_eq!( + iter.collect::>(), + iter_rev.collect::>().into_iter().rev().collect::>(), + "Failed with rand_start = {}, rand_end = {}, size = {}", + rand_start, + rand_end, + size + ); + } } diff --git a/base_layer/core/tests/helpers/nodes.rs b/base_layer/core/tests/helpers/nodes.rs index 06f2d5f8e0d..8e0efdd6952 100644 --- a/base_layer/core/tests/helpers/nodes.rs +++ b/base_layer/core/tests/helpers/nodes.rs @@ -34,9 +34,9 @@ use tari_comms_dht::{outbound::OutboundMessageRequester, Dht}; use tari_core::{ base_node::{ chain_metadata_service::{ChainMetadataHandle, ChainMetadataServiceInitializer}, + comms_interface::OutboundNodeCommsInterface, service::{BaseNodeServiceConfig, BaseNodeServiceInitializer}, LocalNodeCommsInterface, - OutboundNodeCommsInterface, StateMachineHandle, }, chain_storage::{BlockchainDatabase, Validators}, diff --git a/base_layer/core/tests/node_comms_interface.rs b/base_layer/core/tests/node_comms_interface.rs index a6096b8dc9e..83d16793ad0 100644 --- a/base_layer/core/tests/node_comms_interface.rs +++ b/base_layer/core/tests/node_comms_interface.rs @@ -20,21 +20,19 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::sync::Arc; - -use futures::StreamExt; use helpers::block_builders::append_block; +use std::sync::Arc; use tari_common::configuration::Network; -use tari_common_types::{chain_metadata::ChainMetadata, types::PublicKey}; -use tari_comms::peer_manager::NodeId; +use tari_common_types::types::PublicKey; use tari_core::{ - base_node::{ - comms_interface::{CommsInterfaceError, InboundNodeCommsHandlers, NodeCommsRequest, NodeCommsResponse}, + base_node::comms_interface::{ + InboundNodeCommsHandlers, + NodeCommsRequest, + NodeCommsResponse, OutboundNodeCommsInterface, }, - blocks::{BlockBuilder, BlockHeader}, - chain_storage::{BlockchainDatabaseConfig, DbTransaction, HistoricalBlock, Validators}, - consensus::{ConsensusManager, NetworkConsensus}, + chain_storage::{BlockchainDatabaseConfig, DbTransaction, Validators}, + consensus::ConsensusManager, mempool::{Mempool, MempoolConfig}, test_helpers::blockchain::{create_store_with_consensus_and_validators_and_config, create_test_blockchain_db}, transactions::{ @@ -53,42 +51,16 @@ use tari_crypto::{ script::TariScript, tari_utilities::hash::Hashable, }; -use tari_service_framework::{reply_channel, reply_channel::Receiver}; -use tokio::sync::broadcast; - -use tokio::sync::mpsc; +use tari_service_framework::reply_channel; +use tokio::sync::{broadcast, mpsc}; #[allow(dead_code)] mod helpers; -// use crate::helpers::database::create_test_db; - -async fn test_request_responder( - receiver: &mut Receiver<(NodeCommsRequest, Option), Result>, - response: NodeCommsResponse, -) { - let req_context = receiver.next().await.unwrap(); - req_context.reply(Ok(response)).unwrap() -} fn new_mempool() -> Mempool { let mempool_validator = MockValidator::new(true); Mempool::new(MempoolConfig::default(), Arc::new(mempool_validator)) } -#[tokio::test] -async fn outbound_get_metadata() { - let (request_sender, mut request_receiver) = reply_channel::unbounded(); - let (block_sender, _) = mpsc::unbounded_channel(); - let mut outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender); - - let metadata = ChainMetadata::new(5, vec![0u8], 3, 0, 5); - let metadata_response = NodeCommsResponse::ChainMetadata(metadata.clone()); - let (received_metadata, _) = futures::join!( - outbound_nci.get_metadata(), - test_request_responder(&mut request_receiver, metadata_response) - ); - assert_eq!(received_metadata.unwrap(), metadata); -} - #[tokio::test] async fn inbound_get_metadata() { let store = create_test_blockchain_db(); @@ -152,24 +124,6 @@ async fn inbound_fetch_kernel_by_excess_sig() { } } -#[tokio::test] -async fn outbound_fetch_headers() { - let (request_sender, mut request_receiver) = reply_channel::unbounded(); - let (block_sender, _) = mpsc::unbounded_channel(); - let mut outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender); - - let mut header = BlockHeader::new(0); - header.height = 0; - let header_response = NodeCommsResponse::BlockHeaders(vec![header.clone()]); - let (received_headers, _) = futures::join!( - outbound_nci.fetch_headers(vec![0]), - test_request_responder(&mut request_receiver, header_response) - ); - let received_headers = received_headers.unwrap(); - assert_eq!(received_headers.len(), 1); - assert_eq!(received_headers[0], header); -} - #[tokio::test] async fn inbound_fetch_headers() { let store = create_test_blockchain_db(); @@ -190,40 +144,19 @@ async fn inbound_fetch_headers() { let header = store.fetch_block(0).unwrap().header().clone(); if let Ok(NodeCommsResponse::BlockHeaders(received_headers)) = inbound_nch - .handle_request(NodeCommsRequest::FetchHeaders(vec![0])) + .handle_request(NodeCommsRequest::FetchHeaders { + start: 0, + end_inclusive: 0, + }) .await { assert_eq!(received_headers.len(), 1); - assert_eq!(received_headers[0], header); + assert_eq!(*received_headers[0].header(), header); } else { panic!(); } } -#[tokio::test] -async fn outbound_fetch_utxos() { - let factories = CryptoFactories::default(); - let (request_sender, mut request_receiver) = reply_channel::unbounded(); - let (block_sender, _) = mpsc::unbounded_channel(); - let mut outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender); - - let (utxo, _, _) = create_utxo( - MicroTari(10_000), - &factories, - Default::default(), - &TariScript::default(), - ); - let hash = utxo.hash(); - let utxo_response = NodeCommsResponse::TransactionOutputs(vec![utxo.clone()]); - let (received_utxos, _) = futures::join!( - outbound_nci.fetch_utxos(vec![hash]), - test_request_responder(&mut request_receiver, utxo_response) - ); - let received_utxos = received_utxos.unwrap(); - assert_eq!(received_utxos.len(), 1); - assert_eq!(received_utxos[0], utxo); -} - #[tokio::test] async fn inbound_fetch_utxos() { let factories = CryptoFactories::default(); @@ -267,38 +200,6 @@ async fn inbound_fetch_utxos() { } } -#[tokio::test] -async fn outbound_fetch_txos() { - let factories = CryptoFactories::default(); - let (request_sender, mut request_receiver) = reply_channel::unbounded(); - let (block_sender, _) = mpsc::unbounded_channel(); - let mut outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender); - - let (txo1, _, _) = create_utxo( - MicroTari(10_000), - &factories, - Default::default(), - &TariScript::default(), - ); - let (txo2, _, _) = create_utxo( - MicroTari(15_000), - &factories, - Default::default(), - &TariScript::default(), - ); - let hash1 = txo1.hash(); - let hash2 = txo2.hash(); - let txo_response = NodeCommsResponse::TransactionOutputs(vec![txo1.clone(), txo2.clone()]); - let (received_txos, _) = futures::join!( - outbound_nci.fetch_txos(vec![hash1, hash2]), - test_request_responder(&mut request_receiver, txo_response) - ); - let received_txos = received_txos.unwrap(); - assert_eq!(received_txos.len(), 2); - assert_eq!(received_txos[0], txo1); - assert_eq!(received_txos[1], txo2); -} - #[tokio::test] async fn inbound_fetch_txos() { let factories = CryptoFactories::default(); @@ -369,25 +270,6 @@ async fn inbound_fetch_txos() { } } -#[tokio::test] -async fn outbound_fetch_blocks() { - let (request_sender, mut request_receiver) = reply_channel::unbounded(); - let (block_sender, _) = mpsc::unbounded_channel(); - let mut outbound_nci = OutboundNodeCommsInterface::new(request_sender, block_sender); - let network = Network::LocalNet; - let consensus_constants = NetworkConsensus::from(network).create_consensus_constants(); - let gb = BlockBuilder::new(consensus_constants[0].blockchain_version()).build(); - let block = HistoricalBlock::new(gb, 0, Default::default(), vec![], 0); - let block_response = NodeCommsResponse::HistoricalBlocks(vec![block.clone()]); - let (received_blocks, _) = futures::join!( - outbound_nci.fetch_blocks(vec![0]), - test_request_responder(&mut request_receiver, block_response) - ); - let received_blocks = received_blocks.unwrap(); - assert_eq!(received_blocks.len(), 1); - assert_eq!(received_blocks[0], block); -} - #[tokio::test] async fn inbound_fetch_blocks() { let store = create_test_blockchain_db(); @@ -408,7 +290,10 @@ async fn inbound_fetch_blocks() { let block = store.fetch_block(0).unwrap().block().clone(); if let Ok(NodeCommsResponse::HistoricalBlocks(received_blocks)) = inbound_nch - .handle_request(NodeCommsRequest::FetchMatchingBlocks(vec![0])) + .handle_request(NodeCommsRequest::FetchMatchingBlocks { + start: 0, + end_inclusive: 0, + }) .await { assert_eq!(received_blocks.len(), 1); @@ -485,7 +370,10 @@ async fn inbound_fetch_blocks_before_horizon_height() { let _block5 = append_block(&store, &block4, vec![], &consensus_manager, 1.into()).unwrap(); if let Ok(NodeCommsResponse::HistoricalBlocks(received_blocks)) = inbound_nch - .handle_request(NodeCommsRequest::FetchMatchingBlocks(vec![1])) + .handle_request(NodeCommsRequest::FetchMatchingBlocks { + start: 1, + end_inclusive: 1, + }) .await { assert_eq!(received_blocks.len(), 1); @@ -495,7 +383,10 @@ async fn inbound_fetch_blocks_before_horizon_height() { } if let Ok(NodeCommsResponse::HistoricalBlocks(received_blocks)) = inbound_nch - .handle_request(NodeCommsRequest::FetchMatchingBlocks(vec![2])) + .handle_request(NodeCommsRequest::FetchMatchingBlocks { + start: 2, + end_inclusive: 2, + }) .await { assert_eq!(received_blocks.len(), 1); diff --git a/base_layer/core/tests/node_service.rs b/base_layer/core/tests/node_service.rs index c064fe95188..1243d0462a8 100644 --- a/base_layer/core/tests/node_service.rs +++ b/base_layer/core/tests/node_service.rs @@ -24,21 +24,9 @@ mod helpers; use crate::helpers::block_builders::{construct_chained_blocks, create_coinbase}; use helpers::{ - block_builders::{ - append_block, - chain_block, - create_genesis_block, - create_genesis_block_with_utxos, - generate_block, - }, + block_builders::{append_block, chain_block, create_genesis_block, create_genesis_block_with_utxos}, event_stream::event_stream_next, - nodes::{ - create_network_with_2_base_nodes_with_config, - create_network_with_3_base_nodes_with_config, - random_node_identity, - wait_until_online, - BaseNodeBuilder, - }, + nodes::{create_network_with_2_base_nodes_with_config, random_node_identity, wait_until_online, BaseNodeBuilder}, }; use randomx_rs::RandomXFlag; use std::{sync::Arc, time::Duration}; @@ -73,154 +61,6 @@ use tari_p2p::services::liveness::LivenessConfig; use tari_test_utils::unpack_enum; use tempfile::tempdir; -#[tokio::test] -async fn request_response_get_metadata() { - let factories = CryptoFactories::default(); - let temp_dir = tempdir().unwrap(); - let network = Network::LocalNet; - let consensus_constants = ConsensusConstantsBuilder::new(network) - .with_emission_amounts(100_000_000.into(), &EMISSION, 100.into()) - .build(); - let (block0, _) = create_genesis_block(&factories, &consensus_constants); - let consensus_manager = ConsensusManager::builder(network) - .add_consensus_constants(consensus_constants) - .with_block(block0) - .build(); - let (mut alice_node, bob_node, carol_node, _consensus_manager) = create_network_with_3_base_nodes_with_config( - BaseNodeServiceConfig::default(), - MempoolServiceConfig::default(), - LivenessConfig::default(), - consensus_manager, - temp_dir.path().to_str().unwrap(), - ) - .await; - - let received_metadata = alice_node.outbound_nci.get_metadata().await.unwrap(); - assert_eq!(received_metadata.height_of_longest_chain(), 0); - - alice_node.shutdown().await; - bob_node.shutdown().await; - carol_node.shutdown().await; -} - -#[tokio::test] -async fn request_and_response_fetch_blocks() { - let factories = CryptoFactories::default(); - let temp_dir = tempdir().unwrap(); - let network = Network::LocalNet; - let consensus_constants = ConsensusConstantsBuilder::new(network) - .with_emission_amounts(100_000_000.into(), &EMISSION, 100.into()) - .build(); - let (block0, _) = create_genesis_block(&factories, &consensus_constants); - let consensus_manager = ConsensusManager::builder(network) - .add_consensus_constants(consensus_constants) - .with_block(block0.clone()) - .build(); - let (mut alice_node, mut bob_node, carol_node, _) = create_network_with_3_base_nodes_with_config( - BaseNodeServiceConfig::default(), - MempoolServiceConfig::default(), - LivenessConfig::default(), - consensus_manager.clone(), - temp_dir.path().to_str().unwrap(), - ) - .await; - - let mut blocks = vec![block0]; - let db = &mut bob_node.blockchain_db; - generate_block(db, &mut blocks, vec![], &consensus_manager).unwrap(); - generate_block(db, &mut blocks, vec![], &consensus_manager).unwrap(); - generate_block(db, &mut blocks, vec![], &consensus_manager).unwrap(); - - carol_node - .blockchain_db - .add_block(blocks[1].to_arc_block()) - .unwrap() - .assert_added(); - carol_node - .blockchain_db - .add_block(blocks[2].to_arc_block()) - .unwrap() - .assert_added(); - - let received_blocks = alice_node.outbound_nci.fetch_blocks(vec![0]).await.unwrap(); - assert_eq!(received_blocks.len(), 1); - assert_eq!(received_blocks[0].block(), blocks[0].block()); - - let received_blocks = alice_node.outbound_nci.fetch_blocks(vec![0, 1]).await.unwrap(); - assert_eq!(received_blocks.len(), 2); - assert_ne!(*received_blocks[0].block(), *received_blocks[1].block()); - assert!(received_blocks[0].block() == blocks[0].block() || received_blocks[1].block() == blocks[0].block()); - assert!(received_blocks[0].block() == blocks[1].block() || received_blocks[1].block() == blocks[1].block()); - - alice_node.shutdown().await; - bob_node.shutdown().await; - carol_node.shutdown().await; -} - -#[tokio::test] -async fn request_and_response_fetch_blocks_with_hashes() { - let factories = CryptoFactories::default(); - let temp_dir = tempdir().unwrap(); - let network = Network::LocalNet; - let consensus_constants = ConsensusConstantsBuilder::new(network) - .with_emission_amounts(100_000_000.into(), &EMISSION, 100.into()) - .build(); - let (block0, _) = create_genesis_block(&factories, &consensus_constants); - let consensus_manager = ConsensusManager::builder(network) - .add_consensus_constants(consensus_constants) - .with_block(block0.clone()) - .build(); - let (mut alice_node, mut bob_node, carol_node, _) = create_network_with_3_base_nodes_with_config( - BaseNodeServiceConfig::default(), - MempoolServiceConfig::default(), - LivenessConfig::default(), - consensus_manager.clone(), - temp_dir.path().to_str().unwrap(), - ) - .await; - - let mut blocks = vec![block0]; - let db = &mut bob_node.blockchain_db; - generate_block(db, &mut blocks, vec![], &consensus_manager).unwrap(); - generate_block(db, &mut blocks, vec![], &consensus_manager).unwrap(); - generate_block(db, &mut blocks, vec![], &consensus_manager).unwrap(); - let block0_hash = blocks[0].hash(); - let block1_hash = blocks[1].hash(); - - carol_node - .blockchain_db - .add_block(blocks[1].to_arc_block()) - .unwrap() - .assert_added(); - carol_node - .blockchain_db - .add_block(blocks[2].to_arc_block()) - .unwrap() - .assert_added(); - - let received_blocks = alice_node - .outbound_nci - .fetch_blocks_with_hashes(vec![block0_hash.clone()]) - .await - .unwrap(); - assert_eq!(received_blocks.len(), 1); - assert_eq!(received_blocks[0].block(), blocks[0].block()); - - let received_blocks = alice_node - .outbound_nci - .fetch_blocks_with_hashes(vec![block0_hash.clone(), block1_hash.clone()]) - .await - .unwrap(); - assert_eq!(received_blocks.len(), 2); - assert_ne!(received_blocks[0], received_blocks[1]); - assert!(received_blocks[0].block() == blocks[0].block() || received_blocks[1].block() == blocks[0].block()); - assert!(received_blocks[0].block() == blocks[1].block() || received_blocks[1].block() == blocks[1].block()); - - alice_node.shutdown().await; - bob_node.shutdown().await; - carol_node.shutdown().await; -} - #[tokio::test] async fn propagate_and_forward_many_valid_blocks() { let temp_dir = tempdir().unwrap(); @@ -584,9 +424,16 @@ async fn service_request_timeout() { ) .await; + let bob_node_id = bob_node.node_identity.node_id().clone(); // Bob should not be reachable bob_node.shutdown().await; - unpack_enum!(CommsInterfaceError::RequestTimedOut = alice_node.outbound_nci.get_metadata().await.unwrap_err()); + unpack_enum!( + CommsInterfaceError::RequestTimedOut = alice_node + .outbound_nci + .request_blocks_with_hashes_from_peer(vec![], Some(bob_node_id)) + .await + .unwrap_err() + ); alice_node.shutdown().await; } diff --git a/base_layer/core/tests/node_state_machine.rs b/base_layer/core/tests/node_state_machine.rs index b7d11e7c176..a126a3c145b 100644 --- a/base_layer/core/tests/node_state_machine.rs +++ b/base_layer/core/tests/node_state_machine.rs @@ -89,7 +89,6 @@ async fn test_listening_lagging() { let mut alice_state_machine = BaseNodeStateMachine::new( alice_node.blockchain_db.clone().into(), alice_node.local_nci.clone(), - alice_node.outbound_nci.clone(), alice_node.comms.connectivity(), alice_node.comms.peer_manager(), alice_node.chain_metadata_handle.get_event_stream(), @@ -148,7 +147,6 @@ async fn test_event_channel() { let state_machine = BaseNodeStateMachine::new( db.into(), node.local_nci.clone(), - node.outbound_nci.clone(), node.comms.connectivity(), node.comms.peer_manager(), mock.subscription(),