Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Maid-1977 churn test with tunnel #1396

Merged
merged 7 commits into from Mar 20, 2017
4 changes: 3 additions & 1 deletion src/mock_crust/support.rs
Expand Up @@ -120,7 +120,9 @@ impl Network {
let service_1 = unwrap!(self.find_service(node_1),
"Cannot fetch service of {:?}.",
node_1);
let _ = service_1.borrow_mut().remove_connection_by_endpoint(node_2);
if service_1.borrow_mut().remove_connection_by_endpoint(node_2).is_none() {
return;
}
let service_2 = unwrap!(self.find_service(node_2),
"Cannot fetch service of {:?}.",
node_2);
Expand Down
48 changes: 29 additions & 19 deletions src/peer_manager.rs
Expand Up @@ -48,8 +48,6 @@ pub const RESOURCE_PROOF_DURATION_SECS: u64 = 300;
const CANDIDATE_ACCEPT_TIMEOUT_SECS: u64 = 60;
/// Time (in seconds) the node waits for connection from an expected node.
const NODE_CONNECT_TIMEOUT_SECS: u64 = 60;
/// Number of close nodes we try to use to establish a tunnel
const NUM_TUNNEL_VIA_NODES: usize = 10;

pub type SectionMap = BTreeMap<Prefix<XorName>, BTreeSet<PublicId>>;

Expand Down Expand Up @@ -1219,24 +1217,34 @@ impl PeerManager {
let _ = self.set_state(peer_id, PeerState::Routing(RoutingConnection::Direct));
}

/// Returns the `NUM_TUNNEL_VIA_NODES` closest peers from our section which can be potential
/// tunnel node.
pub fn potential_tunnel_nodes(&self) -> Vec<(XorName, PeerId)> {
let our_section = self.routing_table
.other_close_names(self.our_public_id.name())
.unwrap_or_default();
self.peer_map
.peers()
.filter_map(|peer| if our_section.contains(peer.name()) &&
peer.state.can_tunnel_for() {
peer.peer_id.map_or(None, |peer_id| Some((*peer.name(), peer_id)))
} else {
None
/// Returns direct-connected peers in our section and in the peer's section.
pub fn potential_tunnel_nodes(&self, name: &XorName) -> Vec<(XorName, PeerId)> {
let potential_tunnel_nodes =
self.routing_table.our_section() |
self.routing_table.get_section(name).unwrap_or(&BTreeSet::new());
potential_tunnel_nodes.iter()
.filter_map(|name| {
if name == self.our_public_id.name() {
return None;
}
self.peer_map.get_by_name(name).and_then(|peer| if peer.state.can_tunnel_for() {
peer.peer_id().and_then(|peer_id| Some((*name, *peer_id)))
} else {
None
})
})
.take(NUM_TUNNEL_VIA_NODES)
.collect()
}

/// Returns true if peer is direct-connected and in our section or in tunnel_client's section.
pub fn is_potential_tunnel_node(&self, peer: &PublicId, tunnel_client: &XorName) -> bool {
    // We never tunnel via ourselves.
    if self.our_public_id == *peer {
        return false;
    }
    // The candidate must share a section with us, or share one with the
    // client that needs the tunnel.
    let in_our_section = self.routing_table.our_prefix().matches(peer.name());
    let in_client_section = self.routing_table
        .get_section(peer.name())
        .map_or(false, |section| {
            Some(section) == self.routing_table.get_section(tunnel_client)
        });
    if !in_our_section && !in_client_section {
        return false;
    }
    // Finally, its connection state must allow it to tunnel for others.
    self.get_state_by_name(peer.name()).map_or(false, PeerState::can_tunnel_for)
}

/// Sets the given peer to state `SearchingForTunnel` and returns querying candidates.
/// Returns empty vector of candidates if it is already in Routing state.
pub fn set_searching_for_tunnel(&mut self,
Expand All @@ -1253,7 +1261,7 @@ impl PeerManager {
}

let _ = self.insert_peer(pub_id, Some(peer_id), PeerState::SearchingForTunnel);
self.potential_tunnel_nodes()
self.potential_tunnel_nodes(pub_id.name())
}

/// Inserts the given connection info in the map to wait for the peer's info, or returns both
Expand Down Expand Up @@ -1404,11 +1412,13 @@ impl PeerManager {
}

/// Returns all peers we are looking for a tunnel to.
pub fn peers_needing_tunnel(&self) -> Vec<PeerId> {
pub fn peers_needing_tunnel(&self) -> Vec<(PeerId, XorName)> {
self.peer_map
.peers()
.filter_map(|peer| match peer.state {
PeerState::SearchingForTunnel => peer.peer_id,
PeerState::SearchingForTunnel => {
peer.peer_id.and_then(|peer_id| Some((peer_id, *peer.name())))
}
_ => None,
})
.collect()
Expand Down
25 changes: 16 additions & 9 deletions src/states/node.rs
Expand Up @@ -1459,6 +1459,7 @@ impl Node {
peer_id: &PeerId,
outbox: &mut EventBox) {
let want_to_merge = self.we_want_to_merge() || self.they_want_to_merge();
let mut need_split = false;
match self.peer_mgr.add_to_routing_table(public_id, peer_id, want_to_merge) {
Err(RoutingTableError::AlreadyExists) => return, // already in RT
Err(error) => {
Expand All @@ -1476,6 +1477,7 @@ impl Node {
// `send_section_split()` here and also check whether another round of splitting is
// required in `handle_section_split()` so splitting becomes recursive like merging.
if our_prefix.matches(public_id.name()) {
need_split = true;
self.send_section_split(our_prefix, *public_id.name());
}
}
Expand All @@ -1500,9 +1502,9 @@ impl Node {
outbox.send_event(Event::NodeAdded(*public_id.name(),
self.peer_mgr.routing_table().clone()));

// TODO: we probably don't need to send this if we're splitting, but in that case
// we should send something else instead. This will do for now.
self.send_section_update(None);
if !need_split && self.our_prefix().matches(public_id.name()) {
self.send_section_update(None);
}

if let Some(prefix) = self.peer_mgr
.routing_table()
Expand All @@ -1517,10 +1519,8 @@ impl Node {
}
}

let peers_needing_tunnel = self.peer_mgr.peers_needing_tunnel();
if !peers_needing_tunnel.is_empty() &&
self.peer_mgr.potential_tunnel_nodes().contains(&(*public_id.name(), *peer_id)) {
for dst_id in peers_needing_tunnel {
for (dst_id, peer_name) in self.peer_mgr.peers_needing_tunnel() {
if self.peer_mgr.is_potential_tunnel_node(public_id, &peer_name) {
trace!("{:?} Asking {:?} to serve as a tunnel for {:?}",
self,
peer_id,
Expand All @@ -1539,11 +1539,15 @@ impl Node {
// or split a section.
fn send_section_update(&mut self, dst_prefix: Option<Prefix<XorName>>) {
if !self.peer_mgr.routing_table().is_valid() {
trace!("{:?} Not sending section update since RT invariant not held.",
warn!("{:?} Not sending section update since RT invariant not held.",
self);
return;
} else if self.they_want_to_merge() || self.we_want_to_merge() {
trace!("{:?} Not sending section update since we are in the process of merging.",
self);
return;
}
trace!("{:?} Sending section update", self);

let members = self.peer_mgr.get_pub_ids(self.peer_mgr.routing_table().our_section());

let content = MessageContent::SectionUpdate {
Expand All @@ -1556,6 +1560,9 @@ impl Node {
Some(prefix) => iter::once(prefix).collect(),
None => self.peer_mgr.routing_table().other_prefixes(),
};

trace!("{:?} Sending section update to {:?}", self, neighbours);

for neighbour_pfx in neighbours {
let src = Authority::Section(self.our_prefix().lower_bound());
let dst = Authority::PrefixSection(neighbour_pfx);
Expand Down
106 changes: 84 additions & 22 deletions tests/mock_crust/churn.rs
Expand Up @@ -20,7 +20,8 @@ use rand::Rng;
use routing::{Authority, DataIdentifier, Event, EventStream, MessageId, QUORUM, Request, XorName};
use routing::mock_crust::{Config, Network};
use std::cmp;
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeSet, HashMap, HashSet};
use std::iter;
use super::{TestClient, TestNode, create_connected_clients, create_connected_nodes,
gen_range_except, poll_and_resend, verify_invariant_for_all_nodes};

Expand All @@ -39,9 +40,7 @@ fn drop_random_nodes<R: Rng>(rng: &mut R, nodes: &mut Vec<TestNode>, min_section

// Any network must allow at least one node to be lost:
let num_excess = cmp::max(1,
cmp::min(nodes[i].routing_table().our_section().len() -
min_quorum,
len - min_section_size));
nodes[i].routing_table().our_section().len() - min_section_size);
assert!(num_excess > 0);

let mut removed = 0;
Expand Down Expand Up @@ -84,11 +83,22 @@ fn add_random_node<R: Rng>(rng: &mut R,
let config = Config::with_contacts(&[nodes[proxy].handle.endpoint()]);

nodes.insert(index, TestNode::builder(network).config(config).create());
if index <= proxy {
let (new_node, proxy) = if index <= proxy {
(index, proxy + 1)
} else {
(index, proxy)
};

if len > (2 * min_section_size) {
let exclude = vec![new_node, proxy].into_iter().collect();
let block_peer = gen_range_except(rng, 0, nodes.len(), &exclude);
network.block_connection(nodes[new_node].handle.endpoint(),
nodes[block_peer].handle.endpoint());
network.block_connection(nodes[block_peer].handle.endpoint(),
nodes[new_node].handle.endpoint());
}

(new_node, proxy)
}

// Randomly adds or removes some nodes, causing churn.
Expand All @@ -107,11 +117,33 @@ fn random_churn<R: Rng>(rng: &mut R,

None
} else {
let proxy = rng.gen_range(0, len);
let mut proxy = rng.gen_range(0, len);
let index = rng.gen_range(1, len + 1);

if nodes.len() > 2 * network.min_section_size() {
let peer_1 = rng.gen_range(1, len);
let peer_2 = gen_range_except(rng, 1, len, &iter::once(peer_1).collect());
network.lost_connection(nodes[peer_1].handle.endpoint(),
nodes[peer_2].handle.endpoint());
}

let config = Config::with_contacts(&[nodes[proxy].handle.endpoint()]);

nodes.insert(index, TestNode::builder(network).config(config).create());

if nodes.len() > 2 * network.min_section_size() {
if index <= proxy {
// When new node sits before the proxy node, proxy index increases by 1
proxy += 1;
}
let exclude = vec![index, proxy].into_iter().collect();
let block_peer = gen_range_except(rng, 0, nodes.len(), &exclude);
network.block_connection(nodes[index].handle.endpoint(),
nodes[block_peer].handle.endpoint());
network.block_connection(nodes[block_peer].handle.endpoint(),
nodes[index].handle.endpoint());
}

Some(index)
}
}
Expand Down Expand Up @@ -190,18 +222,30 @@ impl ExpectedGets {
})
.collect();
let mut section_msgs_received = HashMap::new(); // The count of received section messages.
let mut unexpected_receive = BTreeSet::new();
for node in nodes {
while let Ok(event) = node.try_next_ev() {
if let Event::Request { request: Request::Get(data_id, msg_id), src, dst } = event {
let key = (data_id, msg_id, src, dst);
if dst.is_multiple() {
assert!(self.sections
.get(&key.3)
.map_or(false, |entry| entry.contains(&node.name())),
"Unexpected request for node {:?}: {:?} / {:?}",
node.name(),
key,
self.sections);
if !self.sections
.get(&key.3)
.map_or(false, |entry| entry.contains(&node.name())) {
// Unexpected receive shall only happen for group (only used NaeManager
// in this test), and shall have at most one for each message.
if let Authority::NaeManager(_) = dst {
assert!(unexpected_receive.insert(msg_id),
"Unexpected request for node {:?}: {:?} / {:?}",
node.name(),
key,
self.sections);
} else {
panic!("Unexpected request for node {:?}: {:?} / {:?}",
node.name(),
key,
self.sections);
}
}
*section_msgs_received.entry(key).or_insert(0usize) += 1;
} else {
assert_eq!(node.name(), dst.name());
Expand Down Expand Up @@ -244,8 +288,9 @@ fn send_and_receive<R: Rng>(mut rng: &mut R,
added_index: Option<usize>) {
// Create random data ID and pick random sending and receiving nodes.
let data_id = DataIdentifier::Immutable(rng.gen());
let index0 = gen_range_except(&mut rng, 0, nodes.len(), added_index);
let index1 = gen_range_except(&mut rng, 0, nodes.len(), added_index);
let exclude = added_index.map_or(BTreeSet::new(), |index| iter::once(index).collect());
let index0 = gen_range_except(&mut rng, 0, nodes.len(), &exclude);
let index1 = gen_range_except(&mut rng, 0, nodes.len(), &exclude);
let auth_n0 = Authority::ManagedNode(nodes[index0].name());
let auth_n1 = Authority::ManagedNode(nodes[index1].name());
let auth_g0 = Authority::NaeManager(rng.gen());
Expand Down Expand Up @@ -357,8 +402,18 @@ fn aggressive_churn() {
nodes.len(),
count_sections(&nodes));
while count_sections(&nodes) <= 5 || nodes.len() < 50 {
if nodes.len() > (2 * min_section_size) {
let peer_1 = rng.gen_range(0, nodes.len());
let peer_2 = gen_range_except(&mut rng, 0, nodes.len(), &iter::once(peer_1).collect());
info!("lost connection between {:?} and {:?}",
nodes[peer_1].name(),
nodes[peer_2].name());
network.lost_connection(nodes[peer_1].handle.endpoint(),
nodes[peer_2].handle.endpoint());
}
let (added_index, _) = add_random_node(&mut rng, &network, &mut nodes, min_section_size);
poll_and_resend(&mut nodes, &mut []);
info!("added {:?}", nodes[added_index].name());
verify_invariant_for_all_nodes(&nodes);
verify_section_list_signatures(&nodes);
send_and_receive(&mut rng, &mut nodes, min_section_size, Some(added_index));
Expand All @@ -372,13 +427,18 @@ fn aggressive_churn() {
let (added_index, proxy_index) =
add_random_node(&mut rng, &network, &mut nodes, min_section_size);
poll_and_resend(&mut nodes, &mut []);

info!("simultaneous added {:?}", nodes[added_index].name());
// A candidate could be blocked if it connected to a pre-merge minority section.
// Or be rejected when the proxy node's RT is not large enough due to a lost tunnel.
// In that case, a restart of candidate shall be carried out.
if nodes[added_index].inner.try_next_ev().is_err() {
let config = Config::with_contacts(&[nodes[proxy_index].handle.endpoint()]);
nodes[added_index] = TestNode::builder(&network).config(config).create();
poll_and_resend(&mut nodes, &mut []);
match nodes[added_index].inner.try_next_ev() {
Err(_) |
Ok(Event::Terminate) => {
let config = Config::with_contacts(&[nodes[proxy_index].handle.endpoint()]);
nodes[added_index] = TestNode::builder(&network).config(config).create();
poll_and_resend(&mut nodes, &mut []);
}
Ok(_) => {}
}

verify_invariant_for_all_nodes(&nodes);
Expand All @@ -392,6 +452,7 @@ fn aggressive_churn() {
nodes.len(),
count_sections(&nodes));
while nodes.len() > min_section_size {
info!("dropping ------ {}", nodes.len());
drop_random_nodes(&mut rng, &mut nodes, min_section_size);
poll_and_resend(&mut nodes, &mut []);
verify_invariant_for_all_nodes(&nodes);
Expand Down Expand Up @@ -424,8 +485,9 @@ fn messages_during_churn() {

// Create random data ID and pick random sending and receiving nodes.
let data_id = DataIdentifier::Immutable(rng.gen());
let index0 = gen_range_except(&mut rng, 0, nodes.len(), added_index);
let index1 = gen_range_except(&mut rng, 0, nodes.len(), added_index);
let exclude = added_index.map_or(BTreeSet::new(), |index| iter::once(index).collect());
let index0 = gen_range_except(&mut rng, 0, nodes.len(), &exclude);
let index1 = gen_range_except(&mut rng, 0, nodes.len(), &exclude);
let auth_n0 = Authority::ManagedNode(nodes[index0].name());
let auth_n1 = Authority::ManagedNode(nodes[index1].name());
let auth_g0 = Authority::NaeManager(rng.gen());
Expand Down
15 changes: 6 additions & 9 deletions tests/mock_crust/utils.rs
Expand Up @@ -40,18 +40,15 @@ const BALANCED_POLLING: bool = true;
pub fn gen_range_except<T: Rng>(rng: &mut T,
low: usize,
high: usize,
exclude: Option<usize>)
exclude: &BTreeSet<usize>)
-> usize {
match exclude {
None => rng.gen_range(low, high),
Some(exclude) => {
let mut r = rng.gen_range(low, high - 1);
if r >= exclude {
r += 1
}
r
let mut x = rng.gen_range(low, high - exclude.len());
for e in exclude {
if x >= *e {
x += 1;
}
}
x
}


Expand Down