Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix collaboration issues between the two protocols #2075

Merged
merged 2 commits into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions sync/src/relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use std::time::{Duration, Instant};
pub const TX_PROPOSAL_TOKEN: u64 = 0;
pub const ASK_FOR_TXS_TOKEN: u64 = 1;
pub const TX_HASHES_TOKEN: u64 = 2;
pub const SEARCH_ORPHAN_POOL_TOKEN: u64 = 3;

pub const MAX_RELAY_PEERS: usize = 128;
pub const MAX_RELAY_TXS_NUM_PER_BATCH: usize = 32767;
Expand Down Expand Up @@ -606,6 +607,9 @@ impl CKBProtocolHandler for Relayer {
.expect("set_notify at init is ok");
nc.set_notify(Duration::from_millis(300), TX_HASHES_TOKEN)
.expect("set_notify at init is ok");
// todo: remove when the asynchronous verification is completed
nc.set_notify(Duration::from_secs(5), SEARCH_ORPHAN_POOL_TOKEN)
.expect("set_notify at init is ok");
}

fn received(
Expand Down Expand Up @@ -711,6 +715,12 @@ impl CKBProtocolHandler for Relayer {
}
ASK_FOR_TXS_TOKEN => self.ask_for_txs(nc.as_ref()),
TX_HASHES_TOKEN => self.send_bulk_of_tx_hashes(nc.as_ref()),
SEARCH_ORPHAN_POOL_TOKEN => tokio::task::block_in_place(|| {
self.shared.try_search_orphan_pool(
&self.chain,
&self.shared.active_chain().tip_header().hash(),
)
}),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/nervosnetwork/p2p/blob/master/src/protocol_handle_stream.rs#L197
p2p already issues the notify handle as a blocking call, so I think `tokio::task::block_in_place(...)` is unnecessary here — the same applies to the TX_PROPOSAL_TOKEN handler.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/nervosnetwork/ckb/blob/develop/ckb-bin/src/subcommand/run.rs#L91
For performance and control reasons, p2p's built-in blocking is disabled at the CKB level; CKB manages blocking itself, so the explicit `block_in_place` is still needed here.

_ => unreachable!(),
}
trace_target!(
Expand Down
25 changes: 16 additions & 9 deletions sync/src/synchronizer/block_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,18 +148,12 @@ impl<'a> BlockFetcher<'a> {
let hash = header.hash();

let status = self.active_chain.get_block_status(&hash);
if status == BlockStatus::BLOCK_STORED {
if status.contains(BlockStatus::BLOCK_STORED) {
// If the block is stored, its ancestor must on store
// So we can skip the search of this space directly
break;
} else if self.ibd.into() && status.contains(BlockStatus::BLOCK_RECEIVED) {
// NOTE: NO-IBD Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding
// stopping synchronization even when orphan_pool maintains dirty items by bugs.
// TODO: If async validation is achieved, then the IBD state judgement here can be removed

// On IBD, BLOCK_RECEIVED means this block had been received, so this block doesn't need to fetch
// On NO-IBD, because of the model, this block has to be requested again
// But all of these can do nothing on this branch
} else if status.contains(BlockStatus::BLOCK_RECEIVED) {
// Do not download repeatedly
} else if inflight.insert(self.peer, (header.number(), hash).into()) {
fetch.push(header)
}
Expand All @@ -186,6 +180,19 @@ impl<'a> BlockFetcher<'a> {
inflight.mark_slow_block(tip);
}

if fetch.is_empty() {
debug!(
"[block fetch empty] fixed_last_common_header = {} \
best_known_header = {}, tip = {}, inflight_len = {}, \
inflight_state = {:?}",
fixed_last_common_header.number(),
best_known_header.number(),
tip,
inflight.total_inflight_count(),
*inflight
)
}

Some(
fetch
.chunks(crate::INIT_BLOCKS_IN_TRANSIT_PER_PEER)
Expand Down
21 changes: 16 additions & 5 deletions sync/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ struct DebugHashSet<'a>(&'a HashSet<BlockNumberAndHash>);
impl<'a> fmt::Debug for DebugHashSet<'a> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_set()
.entries(self.0.iter().map(|h| format!("{}", h.hash)))
.entries(self.0.iter().map(|h| format!("{}, {}", h.number, h.hash)))
.finish()
}
}
Expand All @@ -416,7 +416,7 @@ impl fmt::Debug for InflightBlocks {
.entries(
self.inflight_states
.iter()
.map(|(k, v)| (format!("{}", k.hash), v)),
.map(|(k, v)| (format!("{}, {}", k.number, k.hash), v)),
)
.finish()
}
Expand Down Expand Up @@ -1019,7 +1019,12 @@ impl SyncShared {

// The above block has been accepted. Attempt to accept its descendant blocks in orphan pool.
// The returned blocks of `remove_blocks_by_parent` are in topology order by parents
let descendants = self.state.remove_orphan_by_parent(&block.as_ref().hash());
self.try_search_orphan_pool(chain, &block.as_ref().hash());
ret
}

pub fn try_search_orphan_pool(&self, chain: &ChainController, parent_hash: &Byte32) {
let descendants = self.state.remove_orphan_by_parent(parent_hash);
for (peer, block) in descendants {
// If we can not find the block's parent in database, that means it was failed to accept
// its parent, so we treat it as an invalid block as well.
Expand All @@ -1042,7 +1047,6 @@ impl SyncShared {
);
}
}
ret
}

fn accept_block(
Expand Down Expand Up @@ -1309,8 +1313,15 @@ impl SyncState {

// Return true when the block is that we have requested and received first time.
pub fn new_block_received(&self, block: &core::BlockView) -> bool {
self.write_inflight_blocks()
if self
.write_inflight_blocks()
.remove_by_block((block.number(), block.hash()).into())
{
self.insert_block_status(block.hash(), BlockStatus::BLOCK_RECEIVED);
true
} else {
false
}
}

pub fn insert_inflight_proposals(&self, ids: Vec<packed::ProposalShortId>) -> Vec<bool> {
Expand Down
1 change: 1 addition & 0 deletions test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ fn all_specs() -> SpecMap {
Box::new(DAOVerify),
Box::new(AvoidDuplicatedProposalsWithUncles),
Box::new(TemplateTxSelect),
Box::new(BlockSyncRelayerCollaboration),
];
specs.into_iter().map(|spec| (spec.name(), spec)).collect()
}
Expand Down
75 changes: 74 additions & 1 deletion test/src/specs/sync/block_sync.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::utils::{
build_block, build_get_blocks, build_header, new_block_with_template, wait_until,
build_block, build_compact_block, build_get_blocks, build_header, new_block_with_template,
wait_until,
};
use crate::{Net, Node, Spec, TestProtocol};
use ckb_jsonrpc_types::ChainInfo;
Expand Down Expand Up @@ -296,6 +297,78 @@ impl Spec for BlockSyncOrphanBlocks {
}
}

/// Test case exercising collaboration between the sync and relay protocols:
/// 1. Generate blocks 1-17 on node1.
/// 2. Send headers 1-16 to node0 (block bodies withheld).
/// 3. Send block bodies 2-16 to node0 (block 1 withheld, so 2-16 cannot connect).
/// 4. Send block 1 to node0 via the sync protocol.
/// 5. Relay block 17 to node0 as a compact block via the relay protocol.
/// node0 is expected to end up with all 17 blocks on its chain.
pub struct BlockSyncRelayerCollaboration;

impl Spec for BlockSyncRelayerCollaboration {
    crate::name!("block_sync_relayer_collaboration");

    crate::setup!(
        num_nodes: 2,
        connect_all: false,
        protocols: vec![TestProtocol::sync(), TestProtocol::relay()],
    );

    fn run(&self, net: &mut Net) {
        let node0 = &net.nodes[0];
        let node1 = &net.nodes[1];
        net.exit_ibd_mode();

        // Generate some blocks from node1
        let mut blocks: Vec<BlockView> = (1..=17)
            .map(|_| {
                let block = node1.new_block(None, None, None);
                node1.submit_block(&block);
                block
            })
            .collect();

        net.connect(node0);
        // Wait for the first message from node0 so we learn its peer_id.
        let (peer_id, _, _) = net
            .receive_timeout(Duration::new(10, 0))
            .expect("net receive timeout");
        let rpc_client = node0.rpc_client();
        // Record node0's tip before delivering anything, so assertions below
        // can measure growth relative to it.
        let tip_number = rpc_client.get_tip_block_number();

        // Hold back the highest block (17); it is delivered later over relay.
        let last = blocks.pop().unwrap();

        // Send headers to node0, keep blocks body
        blocks.iter().for_each(|block| {
            sync_header(&net, peer_id, block);
        });

        // Wait for block fetch timer
        let (_, _, _) = net
            .receive_timeout(Duration::new(10, 0))
            .expect("net receive timeout");

        // Skip the next block, send the rest blocks to node0
        let first = blocks.remove(0);
        blocks.into_iter().for_each(|block| {
            sync_block(&net, peer_id, &block);
        });

        // Blocks 2-16 lack their parent (block 1), so node0's tip must not
        // advance within the wait window.
        let ret = wait_until(5, || rpc_client.get_tip_block_number() > tip_number);
        assert!(!ret, "node0 should stay the same");

        // Deliver the missing parent over sync, and block 17 over relay as a
        // compact block; node0 should now be able to connect the whole chain.
        sync_block(&net, peer_id, &first);
        net.send(
            NetworkProtocol::RELAY.into(),
            peer_id,
            build_compact_block(&last),
        );

        let ret = wait_until(10, || rpc_client.get_tip_block_number() >= tip_number + 17);
        log::info!("{}", rpc_client.get_tip_block_number());
        assert!(ret, "node0 should grow up");
    }
}

pub struct BlockSyncNonAncestorBestBlocks;

impl Spec for BlockSyncNonAncestorBestBlocks {
Expand Down