Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
fix/churn_test_with_tunnel: address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Mar 19, 2017
1 parent 600b014 commit 40449ba
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 40 deletions.
32 changes: 18 additions & 14 deletions src/peer_manager.rs
Expand Up @@ -22,7 +22,7 @@ use itertools::Itertools;
use rand;
use resource_proof::ResourceProof;
use routing_table::{Authority, OtherMergeDetails, OwnMergeDetails, OwnMergeState, Prefix,
RemovalDetails, RoutingTable, Xorable};
RemovalDetails, RoutingTable};
use routing_table::Error as RoutingTableError;
use rust_sodium::crypto::hash::sha256;
use rust_sodium::crypto::sign;
Expand Down Expand Up @@ -1213,30 +1213,34 @@ impl PeerManager {
let _ = self.set_state(peer_id, PeerState::Routing(RoutingConnection::Direct));
}

/// Returns direct-connected peers in our section and in the peer's section.
///
/// Yields `(name, peer_id)` pairs for every peer that is in our own section
/// or in the section of `name` (if we have that section in our routing
/// table), excluding ourself, and whose connection state reports
/// `can_tunnel_for` — i.e. peers that could serve as a tunnel node.
pub fn potential_tunnel_nodes(&self, name: &XorName) -> Vec<(XorName, PeerId)> {
    self.routing_table
        .our_section()
        .iter()
        // If the peer's section is unknown, fall back to an empty set so we
        // still consider our own section's members.
        .chain(self.routing_table
                   .get_section(name)
                   .unwrap_or(&BTreeSet::new())
                   .iter())
        .filter_map(|name| {
            // Never offer ourself as a tunnel node.
            if name == self.our_public_id.name() {
                return None;
            }
            self.peer_map
                .get_by_name(name)
                .and_then(|peer| if peer.state.can_tunnel_for() {
                              // `map` rather than `and_then(.. Some(..))`:
                              // the closure cannot fail.
                              peer.peer_id().map(|peer_id| (*name, *peer_id))
                          } else {
                              None
                          })
        })
        .collect()
}

/// Returns true if peer is direct-connected and in our section or in tunnel_client's section.
pub fn is_potential_tunnel_node(&self, peer: &PublicId, tunnel_client: &XorName) -> bool {
    // The peer's connection state must allow tunnelling at all.
    let can_tunnel = self.get_state_by_name(peer.name())
        .map_or(false, PeerState::can_tunnel_for);
    if !can_tunnel {
        return false;
    }
    // Additionally, the peer must share a section with us, or be in the same
    // section as the tunnel client.
    let in_our_section = self.routing_table.our_section().contains(peer.name());
    let with_client = self.routing_table
        .get_section(peer.name())
        .map_or(false, |section| section.contains(tunnel_client));
    in_our_section || with_client
}

/// Sets the given peer to state `SearchingForTunnel` and returns querying candidates.
/// Returns empty vector of candidates if it is already in Routing state.
pub fn set_searching_for_tunnel(&mut self,
Expand Down Expand Up @@ -1409,7 +1413,7 @@ impl PeerManager {
.peers()
.filter_map(|peer| match peer.state {
PeerState::SearchingForTunnel => {
peer.peer_id.map_or(None, |peer_id| Some((peer_id, *peer.name())))
peer.peer_id.and_then(|peer_id| Some((peer_id, *peer.name())))
}
_ => None,
})
Expand Down
2 changes: 1 addition & 1 deletion src/routing_table/mod.rs
Expand Up @@ -989,7 +989,7 @@ impl<T: Binary + Clone + Copy + Debug + Default + Hash + Xorable> RoutingTable<T

/// Returns the prefix of the closest non-empty section to `name`, regardless of whether `name`
/// belongs in that section or not, and the section itself.
pub fn closest_section(&self, name: &T) -> (&Prefix<T>, &BTreeSet<T>) {
fn closest_section(&self, name: &T) -> (&Prefix<T>, &BTreeSet<T>) {
let mut result = (&self.our_prefix, &self.our_section);
for (prefix, section) in &self.sections {
if !section.is_empty() && result.0.cmp_distance(prefix, name) == Ordering::Greater {
Expand Down
4 changes: 1 addition & 3 deletions src/states/node.rs
Expand Up @@ -1458,9 +1458,7 @@ impl Node {
}

for (dst_id, peer_name) in self.peer_mgr.peers_needing_tunnel() {
if self.peer_mgr
.potential_tunnel_nodes(&peer_name)
.contains(&(*public_id.name(), *peer_id)) {
if self.peer_mgr.is_potential_tunnel_node(public_id, &peer_name) {
trace!("{:?} Asking {:?} to serve as a tunnel for {:?}",
self,
peer_id,
Expand Down
58 changes: 36 additions & 22 deletions tests/mock_crust/churn.rs
Expand Up @@ -20,7 +20,7 @@ use rand::Rng;
use routing::{Authority, DataIdentifier, Event, EventStream, MessageId, QUORUM, Request, XorName};
use routing::mock_crust::{Config, Network};
use std::cmp;
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeSet, HashMap, HashSet};
use super::{TestClient, TestNode, create_connected_clients, create_connected_nodes,
gen_range_except, poll_and_resend, verify_invariant_for_all_nodes};

Expand Down Expand Up @@ -91,9 +91,9 @@ fn add_random_node<R: Rng>(rng: &mut R,
};

if len > (2 * min_section_size) {
let mut block_peer = gen_range_except(rng, 0, nodes.len(), Some(new_node));
while block_peer == proxy {
block_peer = gen_range_except(rng, 0, nodes.len(), Some(new_node));
let mut block_peer = gen_range_except(rng, 0, nodes.len() - 1, Some(new_node));
if block_peer == proxy {
block_peer += 1;
}
network.block_connection(nodes[new_node].handle.endpoint(),
nodes[block_peer].handle.endpoint());
Expand Down Expand Up @@ -123,9 +123,9 @@ fn random_churn<R: Rng>(rng: &mut R,
let mut proxy = rng.gen_range(0, len);
let index = rng.gen_range(1, len + 1);

if nodes.len() > 16 {
let peer_1 = rng.gen_range(0, len);
let peer_2 = gen_range_except(rng, 0, len, Some(peer_1));
if nodes.len() > 2 * network.min_section_size() {
let peer_1 = rng.gen_range(1, len);
let peer_2 = gen_range_except(rng, 1, len, Some(peer_1));
network.lost_connection(nodes[peer_1].handle.endpoint(),
nodes[peer_2].handle.endpoint());
}
Expand All @@ -134,14 +134,15 @@ fn random_churn<R: Rng>(rng: &mut R,

nodes.insert(index, TestNode::builder(network).config(config).create());

if nodes.len() > 16 {
if nodes.len() > 2 * network.min_section_size() {
if index <= proxy {
// When new node sits before the proxy node, proxy index increases by 1
proxy += 1;
}

let mut block_peer = gen_range_except(rng, 0, nodes.len(), Some(index));
while block_peer == proxy {
block_peer = gen_range_except(rng, 0, nodes.len(), Some(index));
let mut block_peer = gen_range_except(rng, 1, nodes.len() - 1, Some(index));
if block_peer == proxy {
block_peer += 1;
}
network.block_connection(nodes[index].handle.endpoint(),
nodes[block_peer].handle.endpoint());
Expand Down Expand Up @@ -227,7 +228,7 @@ impl ExpectedGets {
})
.collect();
let mut section_msgs_received = HashMap::new(); // The count of received section messages.
let mut unexpected_receive = 0;
let mut unexpected_receive = BTreeSet::new();
for node in nodes {
while let Ok(event) = node.try_next_ev() {
if let Event::Request { request: Request::Get(data_id, msg_id), src, dst } = event {
Expand All @@ -236,11 +237,20 @@ impl ExpectedGets {
if !self.sections
.get(&key.3)
.map_or(false, |entry| entry.contains(&node.name())) {
trace!("Unexpected request for node {:?}: {:?} / {:?}",
node.name(),
key,
self.sections);
unexpected_receive += 1;
// An unexpected receive should only happen for group messages (only
// NaeManager is used in this test), and at most once per message.
if let Authority::NaeManager(_) = dst {
assert!(unexpected_receive.insert(msg_id),
"Unexpected request for node {:?}: {:?} / {:?}",
node.name(),
key,
self.sections);
} else {
panic!("Unexpected request for node {:?}: {:?} / {:?}",
node.name(),
key,
self.sections);
}
}
*section_msgs_received.entry(key).or_insert(0usize) += 1;
} else {
Expand All @@ -253,7 +263,6 @@ impl ExpectedGets {
}
}
}
assert!(unexpected_receive <= self.sections.len());
for client in clients {
while let Ok(event) = client.inner.try_next_ev() {
if let Event::Request { request: Request::Get(data_id, msg_id), src, dst } = event {
Expand Down Expand Up @@ -425,11 +434,16 @@ fn aggressive_churn() {
poll_and_resend(&mut nodes, &mut []);
info!("simultaneous added {:?}", nodes[added_index].name());
// A candidate could be blocked if it connected to a pre-merge minority section.
// Or be rejected when the proxy node's RT is not large enough due to a lost tunnel.
// In that case, a restart of candidate shall be carried out.
if nodes[added_index].inner.try_next_ev().is_err() {
let config = Config::with_contacts(&[nodes[proxy_index].handle.endpoint()]);
nodes[added_index] = TestNode::builder(&network).config(config).create();
poll_and_resend(&mut nodes, &mut []);
match nodes[added_index].inner.try_next_ev() {
Err(_) |
Ok(Event::Terminate) => {
let config = Config::with_contacts(&[nodes[proxy_index].handle.endpoint()]);
nodes[added_index] = TestNode::builder(&network).config(config).create();
poll_and_resend(&mut nodes, &mut []);
}
Ok(_) => {}
}

verify_invariant_for_all_nodes(&nodes);
Expand Down

0 comments on commit 40449ba

Please sign in to comment.