diff --git a/sync/src/relayer/mod.rs b/sync/src/relayer/mod.rs index afc1401843..ec57fca2f8 100644 --- a/sync/src/relayer/mod.rs +++ b/sync/src/relayer/mod.rs @@ -43,6 +43,7 @@ use std::time::{Duration, Instant}; pub const TX_PROPOSAL_TOKEN: u64 = 0; pub const ASK_FOR_TXS_TOKEN: u64 = 1; pub const TX_HASHES_TOKEN: u64 = 2; +pub const SEARCH_ORPHAN_POOL_TOKEN: u64 = 3; pub const MAX_RELAY_PEERS: usize = 128; pub const MAX_RELAY_TXS_NUM_PER_BATCH: usize = 32767; @@ -606,6 +607,9 @@ impl CKBProtocolHandler for Relayer { .expect("set_notify at init is ok"); nc.set_notify(Duration::from_millis(300), TX_HASHES_TOKEN) .expect("set_notify at init is ok"); + // todo: remove when the asynchronous verification is completed + nc.set_notify(Duration::from_secs(5), SEARCH_ORPHAN_POOL_TOKEN) + .expect("set_notify at init is ok"); } fn received( @@ -711,6 +715,12 @@ impl CKBProtocolHandler for Relayer { } ASK_FOR_TXS_TOKEN => self.ask_for_txs(nc.as_ref()), TX_HASHES_TOKEN => self.send_bulk_of_tx_hashes(nc.as_ref()), + SEARCH_ORPHAN_POOL_TOKEN => tokio::task::block_in_place(|| { + self.shared.try_search_orphan_pool( + &self.chain, + &self.shared.active_chain().tip_header().hash(), + ) + }), _ => unreachable!(), } trace_target!( diff --git a/sync/src/synchronizer/block_fetcher.rs b/sync/src/synchronizer/block_fetcher.rs index 0f9d83c030..4a1cf87a82 100644 --- a/sync/src/synchronizer/block_fetcher.rs +++ b/sync/src/synchronizer/block_fetcher.rs @@ -148,18 +148,12 @@ impl<'a> BlockFetcher<'a> { let hash = header.hash(); let status = self.active_chain.get_block_status(&hash); - if status == BlockStatus::BLOCK_STORED { + if status.contains(BlockStatus::BLOCK_STORED) { // If the block is stored, its ancestor must on store // So we can skip the search of this space directly break; - } else if self.ibd.into() && status.contains(BlockStatus::BLOCK_RECEIVED) { - // NOTE: NO-IBD Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding - // stopping synchronization even when orphan_pool maintains dirty items by bugs. - // TODO: If async validation is achieved, then the IBD state judgement here can be removed - - // On IBD, BLOCK_RECEIVED means this block had been received, so this block doesn't need to fetch - // On NO-IBD, because of the model, this block has to be requested again - // But all of these can do nothing on this branch + } else if status.contains(BlockStatus::BLOCK_RECEIVED) { + // Do not download repeatedly } else if inflight.insert(self.peer, (header.number(), hash).into()) { fetch.push(header) } @@ -186,6 +180,19 @@ impl<'a> BlockFetcher<'a> { inflight.mark_slow_block(tip); } + if fetch.is_empty() { + debug!( + "[block fetch empty] fixed_last_common_header = {} \ + best_known_header = {}, tip = {}, inflight_len = {}, \ + inflight_state = {:?}", + fixed_last_common_header.number(), + best_known_header.number(), + tip, + inflight.total_inflight_count(), + *inflight + ) + } + Some( fetch .chunks(crate::INIT_BLOCKS_IN_TRANSIT_PER_PEER) diff --git a/sync/src/types.rs b/sync/src/types.rs index 1acf762668..7d6ef2f098 100644 --- a/sync/src/types.rs +++ b/sync/src/types.rs @@ -398,7 +398,7 @@ struct DebugHashSet<'a>(&'a HashSet); impl<'a> fmt::Debug for DebugHashSet<'a> { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_set() - .entries(self.0.iter().map(|h| format!("{}", h.hash))) + .entries(self.0.iter().map(|h| format!("{}, {}", h.number, h.hash))) .finish() } } @@ -416,7 +416,7 @@ impl fmt::Debug for InflightBlocks { .entries( self.inflight_states .iter() - .map(|(k, v)| (format!("{}", k.hash), v)), + .map(|(k, v)| (format!("{}, {}", k.number, k.hash), v)), ) .finish() } @@ -1019,7 +1019,12 @@ impl SyncShared { // The above block has been accepted. Attempt to accept its descendant blocks in orphan pool. // The returned blocks of `remove_blocks_by_parent` are in topology order by parents - let descendants = self.state.remove_orphan_by_parent(&block.as_ref().hash()); + self.try_search_orphan_pool(chain, &block.as_ref().hash()); + ret + } + + pub fn try_search_orphan_pool(&self, chain: &ChainController, parent_hash: &Byte32) { + let descendants = self.state.remove_orphan_by_parent(parent_hash); for (peer, block) in descendants { // If we can not find the block's parent in database, that means it was failed to accept // its parent, so we treat it as an invalid block as well. @@ -1042,7 +1047,6 @@ impl SyncShared { ); } } - ret } fn accept_block( @@ -1309,8 +1313,15 @@ impl SyncState { // Return true when the block is that we have requested and received first time. pub fn new_block_received(&self, block: &core::BlockView) -> bool { - self.write_inflight_blocks() + if self + .write_inflight_blocks() .remove_by_block((block.number(), block.hash()).into()) + { + self.insert_block_status(block.hash(), BlockStatus::BLOCK_RECEIVED); + true + } else { + false + } } pub fn insert_inflight_proposals(&self, ids: Vec) -> Vec { diff --git a/test/src/main.rs b/test/src/main.rs index a7c02ca49f..4458ae2187 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -390,6 +390,7 @@ fn all_specs() -> SpecMap { Box::new(DAOVerify), Box::new(AvoidDuplicatedProposalsWithUncles), Box::new(TemplateTxSelect), + Box::new(BlockSyncRelayerCollaboration), ]; specs.into_iter().map(|spec| (spec.name(), spec)).collect() } diff --git a/test/src/specs/sync/block_sync.rs b/test/src/specs/sync/block_sync.rs index 89e74fa2f2..7a2d156a9a 100644 --- a/test/src/specs/sync/block_sync.rs +++ b/test/src/specs/sync/block_sync.rs @@ -1,5 +1,6 @@ use crate::utils::{ - build_block, build_get_blocks, build_header, new_block_with_template, wait_until, + build_block, build_compact_block, build_get_blocks, build_header, new_block_with_template, + wait_until, }; use crate::{Net, Node, Spec, TestProtocol}; use ckb_jsonrpc_types::ChainInfo; @@ -296,6 +297,78 @@ impl Spec for BlockSyncOrphanBlocks { } } +/// test case: +/// 1. generate 1-17 block +/// 2. sync 1-16 header to node +/// 3. sync 2-16 block to node +/// 4. sync 1 block to node +/// 5. relay 17 block to node +pub struct BlockSyncRelayerCollaboration; + +impl Spec for BlockSyncRelayerCollaboration { + crate::name!("block_sync_relayer_collaboration"); + + crate::setup!( + num_nodes: 2, + connect_all: false, + protocols: vec![TestProtocol::sync(), TestProtocol::relay()], + ); + + fn run(&self, net: &mut Net) { + let node0 = &net.nodes[0]; + let node1 = &net.nodes[1]; + net.exit_ibd_mode(); + + // Generate some blocks from node1 + let mut blocks: Vec = (1..=17) + .map(|_| { + let block = node1.new_block(None, None, None); + node1.submit_block(&block); + block + }) + .collect(); + + net.connect(node0); + let (peer_id, _, _) = net + .receive_timeout(Duration::new(10, 0)) + .expect("net receive timeout"); + let rpc_client = node0.rpc_client(); + let tip_number = rpc_client.get_tip_block_number(); + + let last = blocks.pop().unwrap(); + + // Send headers to node0, keep blocks body + blocks.iter().for_each(|block| { + sync_header(&net, peer_id, block); + }); + + // Wait for block fetch timer + let (_, _, _) = net + .receive_timeout(Duration::new(10, 0)) + .expect("net receive timeout"); + + // Skip the next block, send the rest blocks to node0 + let first = blocks.remove(0); + blocks.into_iter().for_each(|block| { + sync_block(&net, peer_id, &block); + }); + + let ret = wait_until(5, || rpc_client.get_tip_block_number() > tip_number); + assert!(!ret, "node0 should stay the same"); + + sync_block(&net, peer_id, &first); + net.send( + NetworkProtocol::RELAY.into(), + peer_id, + build_compact_block(&last), + ); + + let ret = wait_until(10, || rpc_client.get_tip_block_number() >= tip_number + 17); + log::info!("{}", rpc_client.get_tip_block_number()); + assert!(ret, "node0 should grow up"); + } +} + pub struct BlockSyncNonAncestorBestBlocks; impl Spec for BlockSyncNonAncestorBestBlocks {