This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Maid-1977 churn test with tunnel #1396

Merged
merged 7 commits into from Mar 20, 2017
37 changes: 20 additions & 17 deletions src/peer_manager.rs
@@ -22,7 +22,7 @@ use itertools::Itertools;
use rand;
use resource_proof::ResourceProof;
use routing_table::{Authority, OtherMergeDetails, OwnMergeDetails, OwnMergeState, Prefix,
RemovalDetails, RoutingTable, Xorable};
RemovalDetails, RoutingTable};
use routing_table::Error as RoutingTableError;
use rust_sodium::crypto::hash::sha256;
use rust_sodium::crypto::sign;
@@ -1213,30 +1213,33 @@ impl PeerManager {
let _ = self.set_state(peer_id, PeerState::Routing(RoutingConnection::Direct));
}

/// Returns the closest section to the peer and the closest section to own.
/// Returns direct-connected peers in our section and in the peer's section.
pub fn potential_tunnel_nodes(&self, name: &XorName) -> Vec<(XorName, PeerId)> {
self.routing_table
.our_section()
.iter()
.chain(self.routing_table.closest_section(name).1.iter())
.sorted_by(|name0, name1| name.cmp_distance(name0, name1))
.into_iter()
let potential_tunnel_nodes =
self.routing_table.our_section() |
self.routing_table.get_section(name).unwrap_or(&BTreeSet::new());
potential_tunnel_nodes.iter()
.filter_map(|name| {
if name == self.our_public_id.name() {
return None;
}
self.peer_map.get_by_name(name).map_or(None,
|peer| if peer.state.can_tunnel_for() {
peer.peer_id().map_or(None, |peer_id| {
Some((*name, *peer_id))
})
} else {
None
})
self.peer_map.get_by_name(name).and_then(|peer| if peer.state.can_tunnel_for() {
peer.peer_id().and_then(|peer_id| Some((*name, *peer_id)))
} else {
None
})
})
.collect()
}

/// Returns true if peer is direct-connected and in our section or in tunnel_client's section.
pub fn is_potential_tunnel_node(&self, peer: &PublicId, tunnel_client: &XorName) -> bool {
(self.routing_table.our_prefix().matches(peer.name()) ||
(self.routing_table.get_section(peer.name()) ==
self.routing_table.get_section(tunnel_client))) &&
self.get_state_by_name(peer.name()).map_or(false, PeerState::can_tunnel_for)
}

/// Sets the given peer to state `SearchingForTunnel` and returns querying candidates.
/// Returns empty vector of candidates if it is already in Routing state.
pub fn set_searching_for_tunnel(&mut self,
@@ -1409,7 +1412,7 @@ impl PeerManager {
.peers()
.filter_map(|peer| match peer.state {
PeerState::SearchingForTunnel => {
peer.peer_id.map_or(None, |peer_id| Some((peer_id, *peer.name())))
peer.peer_id.and_then(|peer_id| Some((peer_id, *peer.name())))
}
_ => None,
})
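The rewritten `potential_tunnel_nodes` above builds its candidate pool as the union of our own section and the peer's section, using the `|` operator on `&BTreeSet` references, then filters out our own name and any peer whose state cannot tunnel. A minimal standalone sketch of that union idiom (hypothetical values, not the real routing-table types):

use std::collections::BTreeSet;

fn main() {
    // `|` on two `&BTreeSet`s allocates a new set holding their union.
    let our_section: BTreeSet<u64> = [1, 2, 3].iter().cloned().collect();
    let peer_section: BTreeSet<u64> = [3, 4, 5].iter().cloned().collect();
    let candidates = &our_section | &peer_section;
    // Names present in both sections are only considered once.
    assert_eq!(candidates.len(), 5);
}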
2 changes: 1 addition & 1 deletion src/routing_table/mod.rs
@@ -989,7 +989,7 @@ impl<T: Binary + Clone + Copy + Debug + Default + Hash + Xorable> RoutingTable<T>

/// Returns the prefix of the closest non-empty section to `name`, regardless of whether `name`
/// belongs in that section or not, and the section itself.
pub fn closest_section(&self, name: &T) -> (&Prefix<T>, &BTreeSet<T>) {
fn closest_section(&self, name: &T) -> (&Prefix<T>, &BTreeSet<T>) {
let mut result = (&self.our_prefix, &self.our_section);
for (prefix, section) in &self.sections {
if !section.is_empty() && result.0.cmp_distance(prefix, name) == Ordering::Greater {
4 changes: 1 addition & 3 deletions src/states/node.rs
@@ -1458,9 +1458,7 @@ impl Node {
}

for (dst_id, peer_name) in self.peer_mgr.peers_needing_tunnel() {
if self.peer_mgr
.potential_tunnel_nodes(&peer_name)
.contains(&(*public_id.name(), *peer_id)) {
if self.peer_mgr.is_potential_tunnel_node(public_id, &peer_name) {
trace!("{:?} Asking {:?} to serve as a tunnel for {:?}",
self,
peer_id,
62 changes: 37 additions & 25 deletions tests/mock_crust/churn.rs
@@ -20,7 +20,7 @@ use rand::Rng;
use routing::{Authority, DataIdentifier, Event, EventStream, MessageId, QUORUM, Request, XorName};
use routing::mock_crust::{Config, Network};
use std::cmp;
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeSet, HashMap, HashSet};
use super::{TestClient, TestNode, create_connected_clients, create_connected_nodes,
gen_range_except, poll_and_resend, verify_invariant_for_all_nodes};

@@ -39,9 +39,7 @@ fn drop_random_nodes<R: Rng>(rng: &mut R, nodes: &mut Vec<TestNode>, min_section

// Any network must allow at least one node to be lost:
let num_excess = cmp::max(1,
cmp::min(nodes[i].routing_table().our_section().len() -
min_quorum,
len - min_section_size));
nodes[i].routing_table().our_section().len() - min_section_size);
assert!(num_excess > 0);

let mut removed = 0;
@@ -91,9 +89,9 @@ fn add_random_node<R: Rng>(rng: &mut R,
};

if len > (2 * min_section_size) {
let mut block_peer = gen_range_except(rng, 0, nodes.len(), Some(new_node));
while block_peer == proxy {
block_peer = gen_range_except(rng, 0, nodes.len(), Some(new_node));
let mut block_peer = gen_range_except(rng, 0, nodes.len() - 1, Some(new_node));
if block_peer == proxy {
Contributor:
That should be >= proxy. (Also below.)

block_peer += 1;
}
Contributor:
You can avoid the potentially unlimited number of random number generations:

let mut block_peer = gen_range_except(rng, 0, nodes.len() - 1, Some(new_node));
if block_peer >= proxy {
    block_peer += 1;
}

(Also below.)
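
To make the suggestion concrete, here is a hypothetical helper (illustration only, not part of the PR) showing the single-exclusion skip technique: draw from a range that is one element short, then bump any result at or above the excluded index. This keeps the pick uniform over all remaining indices with exactly one RNG call, whereas a plain equality check can skew the distribution and can leave the top index unreachable.

use rand::Rng;

// Assumes the rand 0.3-style `gen_range(low, high)` API used elsewhere in these
// tests, `len >= 2`, and `excluded < len`.
fn gen_index_except<R: Rng>(rng: &mut R, len: usize, excluded: usize) -> usize {
    let mut index = rng.gen_range(0, len - 1);
    if index >= excluded {
        // Skip over the excluded index; every value at or above it shifts up by one.
        index += 1;
    }
    index
}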

network.block_connection(nodes[new_node].handle.endpoint(),
nodes[block_peer].handle.endpoint());
@@ -123,9 +121,9 @@ fn random_churn<R: Rng>(rng: &mut R,
let mut proxy = rng.gen_range(0, len);
let index = rng.gen_range(1, len + 1);

if nodes.len() > 16 {
let peer_1 = rng.gen_range(0, len);
let peer_2 = gen_range_except(rng, 0, len, Some(peer_1));
if nodes.len() > 2 * network.min_section_size() {
let peer_1 = rng.gen_range(1, len);
let peer_2 = gen_range_except(rng, 1, len, Some(peer_1));
network.lost_connection(nodes[peer_1].handle.endpoint(),
nodes[peer_2].handle.endpoint());
}
@@ -134,14 +132,15 @@

nodes.insert(index, TestNode::builder(network).config(config).create());

if nodes.len() > 16 {
if nodes.len() > 2 * network.min_section_size() {
if index <= proxy {
// When the new node sits before the proxy node, the proxy index increases by 1
proxy += 1;
}
Contributor:
What's the purpose of this? Some comment would be useful here.

Member Author:
The new node may get inserted into a position before the proxy, hence the proxy node's index is pushed up by one.

Contributor (@fizyk20, Mar 17, 2017):
Oh, I get it! Makes sense 👍 But a comment would still be nice ;)
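
The exchange above is about a simple invariant: `Vec::insert` shifts every element at or after the insertion point one slot to the right, so the saved proxy index has to be bumped whenever the new node lands at or before it. A tiny self-contained illustration (hypothetical values):

fn main() {
    let mut nodes = vec!["a", "b", "c"];
    let mut proxy = 1; // index of the proxy node, "b"
    let index = 1; // the new node is inserted at or before the proxy
    nodes.insert(index, "new");
    if index <= proxy {
        // Keep `proxy` pointing at the same node after the shift.
        proxy += 1;
    }
    assert_eq!(nodes[proxy], "b");
}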


let mut block_peer = gen_range_except(rng, 0, nodes.len(), Some(index));
while block_peer == proxy {
block_peer = gen_range_except(rng, 0, nodes.len(), Some(index));
let mut block_peer = gen_range_except(rng, 1, nodes.len() - 1, Some(index));
if block_peer == proxy {
block_peer += 1;
}
network.block_connection(nodes[index].handle.endpoint(),
nodes[block_peer].handle.endpoint());
@@ -227,7 +226,7 @@ impl ExpectedGets {
})
.collect();
let mut section_msgs_received = HashMap::new(); // The count of received section messages.
let mut unexpected_receive = 0;
let mut unexpected_receive = BTreeSet::new();
for node in nodes {
while let Ok(event) = node.try_next_ev() {
if let Event::Request { request: Request::Get(data_id, msg_id), src, dst } = event {
@@ -236,11 +235,20 @@
if !self.sections
.get(&key.3)
.map_or(false, |entry| entry.contains(&node.name())) {
trace!("Unexpected request for node {:?}: {:?} / {:?}",
node.name(),
key,
self.sections);
unexpected_receive += 1;
// An unexpected receive shall only happen for a group authority (only NaeManager is
// used in this test), and there shall be at most one per message.
if let Authority::NaeManager(_) = dst {
assert!(unexpected_receive.insert(msg_id),
"Unexpected request for node {:?}: {:?} / {:?}",
node.name(),
key,
self.sections);
} else {
panic!("Unexpected request for node {:?}: {:?} / {:?}",
node.name(),
key,
self.sections);
}
}
*section_msgs_received.entry(key).or_insert(0usize) += 1;
} else {
@@ -253,7 +261,6 @@
}
}
}
assert!(unexpected_receive <= self.sections.len());
for client in clients {
while let Ok(event) = client.inner.try_next_ev() {
if let Event::Request { request: Request::Get(data_id, msg_id), src, dst } = event {
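
The dedup bookkeeping added earlier in this file relies on a standard idiom: `BTreeSet::insert` returns `false` when the value is already present, so asserting on its result enforces that each message id triggers an unexpected group receive at most once. A minimal sketch of that idiom:

use std::collections::BTreeSet;

fn main() {
    let mut unexpected = BTreeSet::new();
    assert!(unexpected.insert("msg-1")); // first occurrence is recorded
    assert!(!unexpected.insert("msg-1")); // a repeat makes `insert` return false
}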
@@ -425,11 +432,16 @@
poll_and_resend(&mut nodes, &mut []);
info!("simultaneous added {:?}", nodes[added_index].name());
// A candidate could be blocked if it connected to a pre-merge minority section,
// or be rejected when the proxy node's RT is not large enough due to a lost tunnel.
// In that case, the candidate shall be restarted.
if nodes[added_index].inner.try_next_ev().is_err() {
let config = Config::with_contacts(&[nodes[proxy_index].handle.endpoint()]);
nodes[added_index] = TestNode::builder(&network).config(config).create();
poll_and_resend(&mut nodes, &mut []);
match nodes[added_index].inner.try_next_ev() {
Err(_) |
Ok(Event::Terminate) => {
let config = Config::with_contacts(&[nodes[proxy_index].handle.endpoint()]);
nodes[added_index] = TestNode::builder(&network).config(config).create();
poll_and_resend(&mut nodes, &mut []);
}
Ok(_) => {}
}

verify_invariant_for_all_nodes(&nodes);