diff --git a/src/client.rs b/src/client.rs index 081a0b82a5..c21fd511a6 100644 --- a/src/client.rs +++ b/src/client.rs @@ -19,7 +19,7 @@ use crate::messages::{Request, CLIENT_GET_PRIORITY, DEFAULT_PRIORITY}; use crate::outbox::{EventBox, EventBuf}; use crate::routing_table::Authority; use crate::state_machine::{State, StateMachine}; -use crate::states::{Bootstrapping, BootstrappingTargetState}; +use crate::states::{BootstrappingPeer, TargetState}; use crate::types::{MessageId, RoutingActionSender}; use crate::xor_name::XorName; use crate::{BootstrapConfig, MIN_SECTION_SIZE}; @@ -72,16 +72,17 @@ impl Client { StateMachine::new( move |action_sender, crust_service, timer, _outbox2| { - Bootstrapping::new( + BootstrappingPeer::new( action_sender, Box::new(NullCache), - BootstrappingTargetState::Client { msg_expiry_dur }, + TargetState::Client { msg_expiry_dur }, crust_service, full_id, min_section_size, timer, ) - .map_or(State::Terminated, State::Bootstrapping) + .map(State::BootstrappingPeer) + .unwrap_or(State::Terminated) }, pub_id, bootstrap_config, diff --git a/src/mock_parsec/mod.rs b/src/mock_parsec/mod.rs index 0c3a0c87e4..60ac9aa20f 100644 --- a/src/mock_parsec/mod.rs +++ b/src/mock_parsec/mod.rs @@ -184,10 +184,14 @@ where unimplemented!() } + // TODO: rename this to `has_unpolled_observations` pub fn has_unconsensused_observations(&self) -> bool { - self.observations - .iter() - .any(|(_, obs)| obs.state != ConsensusState::Polled) + state::with::(self.section_hash, |state| { + state + .observations + .iter() + .any(|(_, observation_state)| !observation_state.consensused()) + }) || self.our_unpolled_observations().next().is_some() } pub fn our_unpolled_observations(&self) -> impl Iterator> { diff --git a/src/mock_parsec/observation.rs b/src/mock_parsec/observation.rs index d86f3423b0..0c985cc2a8 100644 --- a/src/mock_parsec/observation.rs +++ b/src/mock_parsec/observation.rs @@ -83,6 +83,10 @@ impl ObservationState

{ } } + pub fn consensused(&self) -> bool { + self.consensused + } + fn compute_consensus( &mut self, peers: &BTreeSet

, diff --git a/src/node.rs b/src/node.rs index 2bd1528aba..895cd2a2e6 100644 --- a/src/node.rs +++ b/src/node.rs @@ -22,7 +22,7 @@ use crate::messages::{ use crate::outbox::{EventBox, EventBuf}; use crate::routing_table::Authority; use crate::state_machine::{State, StateMachine}; -use crate::states::{self, Bootstrapping, BootstrappingTargetState}; +use crate::states::{self, BootstrappingPeer, TargetState}; use crate::types::{MessageId, RoutingActionSender}; use crate::xor_name::XorName; #[cfg(feature = "mock_base")] @@ -141,17 +141,9 @@ impl NodeBuilder { StateMachine::new( move |action_sender, crust_service, timer, outbox2| { if self.first { - if let Some(state) = states::Node::first( - self.cache, - crust_service, - full_id, - min_section_size, - timer, - ) { - State::Node(state) - } else { - State::Terminated - } + states::Node::first(self.cache, crust_service, full_id, min_section_size, timer) + .map(State::Node) + .unwrap_or(State::Terminated) } else if !dev_config.allow_multiple_lan_nodes && crust_service.has_peers_on_lan() { error!( "More than one routing node found on LAN. Currently this is not supported." @@ -159,16 +151,17 @@ impl NodeBuilder { outbox2.send_event(Event::Terminated); State::Terminated } else { - Bootstrapping::new( + BootstrappingPeer::new( action_sender, self.cache, - BootstrappingTargetState::RelocatingNode, + TargetState::RelocatingNode, crust_service, full_id, min_section_size, timer, ) - .map_or(State::Terminated, State::Bootstrapping) + .map(State::BootstrappingPeer) + .unwrap_or(State::Terminated) } }, pub_id, @@ -616,10 +609,10 @@ impl Node { } /// Indicates if there are any pending observations in the parsec object - pub fn has_unconsensused_observations(&self, filter_opaque: bool) -> bool { + pub fn has_unpolled_observations(&self, filter_opaque: bool) -> bool { self.machine .current() - .has_unconsensused_observations(filter_opaque) + .has_unpolled_observations(filter_opaque) } /// Indicates if a given `PublicId` is in the peer manager as a routing peer diff --git a/src/parsec.rs b/src/parsec.rs index 5a8bc0d8a1..f106188cc8 100644 --- a/src/parsec.rs +++ b/src/parsec.rs @@ -36,9 +36,11 @@ pub struct ParsecMap { impl ParsecMap { pub fn new(full_id: FullId, gen_pfx_info: &GenesisPfxInfo) -> Self { - let parsec = create(full_id, gen_pfx_info); let mut map = BTreeMap::new(); - let _ = map.insert(*gen_pfx_info.first_info.version(), parsec); + let _ = map.insert( + *gen_pfx_info.first_info.version(), + create(full_id, gen_pfx_info), + ); Self { map } } @@ -100,11 +102,10 @@ impl ParsecMap { } pub fn create_gossip(&mut self, version: u64, target: &id::PublicId) -> Option { - let parsec = self.map.get_mut(&version)?; - parsec - .create_gossip(target) - .ok() - .map(|request| Message::Direct(DirectMessage::ParsecRequest(version, request))) + let request = self.map.get_mut(&version)?.create_gossip(target).ok()?; + Some(Message::Direct(DirectMessage::ParsecRequest( + version, request, + ))) } pub fn vote_for(&mut self, event: chain::NetworkEvent, log_ident: &str) { @@ -159,7 +160,7 @@ impl ParsecMap { } #[cfg(feature = "mock_base")] - pub fn has_unconsensused_observations(&self, filter_opaque: bool) -> bool { + pub fn has_unpolled_observations(&self, filter_opaque: bool) -> bool { let parsec = if let Some(parsec) = self.map.values().last() { parsec } else { @@ -177,7 +178,7 @@ impl ParsecMap { } /// Create Parsec instance. -pub fn create(full_id: FullId, gen_pfx_info: &GenesisPfxInfo) -> Parsec { +fn create(full_id: FullId, gen_pfx_info: &GenesisPfxInfo) -> Parsec { if gen_pfx_info .first_info .members() diff --git a/src/peer_manager.rs b/src/peer_manager.rs index 4ba146cd5d..f58c26ab8e 100644 --- a/src/peer_manager.rs +++ b/src/peer_manager.rs @@ -308,7 +308,6 @@ pub struct PeerManager { our_public_id: PublicId, candidate: Candidate, disable_client_rate_limiter: bool, - established: bool, } impl PeerManager { @@ -320,7 +319,6 @@ impl PeerManager { our_public_id: our_public_id, candidate: Candidate::None, disable_client_rate_limiter: disable_client_rate_limiter, - established: false, } } @@ -633,12 +631,7 @@ impl PeerManager { } /// Remove and return `PublicId`s of expired peers. - /// Will only be active once we are established. pub fn remove_expired_peers(&mut self) -> Vec { - if !self.established { - return vec![]; - } - let remove_candidate = if self.candidate.is_expired() { match self.candidate { Candidate::None => None, @@ -979,17 +972,6 @@ impl PeerManager { self.peers.remove(pub_id).is_some() || remove_candidate } - - /// Sets this peer as established. - /// Expired peers will be purged once established. - pub fn set_established(&mut self) { - self.established = true; - } - - /// Returns whether this peer is established. - pub fn is_established(&self) -> bool { - self.established - } } impl fmt::Display for PeerManager { diff --git a/src/state_machine.rs b/src/state_machine.rs index acba16792e..c19d7439e4 100644 --- a/src/state_machine.rs +++ b/src/state_machine.rs @@ -8,12 +8,12 @@ use crate::{ action::Action, - chain::GenesisPfxInfo, + chain::{GenesisPfxInfo, SectionInfo}, id::{FullId, PublicId}, outbox::EventBox, routing_table::Prefix, states::common::Base, - states::{Bootstrapping, Client, Node, ProvingNode, RelocatingNode}, + states::{BootstrappingPeer, Client, EstablishingNode, Node, ProvingNode, RelocatingNode}, timer::Timer, types::RoutingActionSender, xor_name::XorName, @@ -37,10 +37,11 @@ use std::{ macro_rules! state_dispatch { ($self:expr, $state:pat => $expr:expr, Terminated => $term_expr:expr) => { match $self { - State::Bootstrapping($state) => $expr, + State::BootstrappingPeer($state) => $expr, State::Client($state) => $expr, State::RelocatingNode($state) => $expr, State::ProvingNode($state) => $expr, + State::EstablishingNode($state) => $expr, State::Node($state) => $expr, State::Terminated => $term_expr, } @@ -63,10 +64,11 @@ pub struct StateMachine { // FIXME - See https://maidsafe.atlassian.net/browse/MAID-2026 for info on removing this exclusion. #[allow(clippy::large_enum_variant)] pub enum State { - Bootstrapping(Bootstrapping), + BootstrappingPeer(BootstrappingPeer), Client(Client), RelocatingNode(RelocatingNode), ProvingNode(ProvingNode), + EstablishingNode(EstablishingNode), Node(Node), Terminated, } @@ -123,8 +125,13 @@ impl State { #[cfg(feature = "mock_base")] fn chain(&self) -> Option<&Chain> { match *self { + State::EstablishingNode(ref state) => Some(state.chain()), State::Node(ref state) => Some(state.chain()), - _ => None, + State::BootstrappingPeer(_) + | State::Client(_) + | State::RelocatingNode(_) + | State::ProvingNode(_) + | State::Terminated => None, } } @@ -150,13 +157,31 @@ impl State { ) } - fn replace_with(&mut self, f: F) + fn replace_with(&mut self, f: F) where - F: FnOnce(Self) -> Self, + F: FnOnce(Self) -> Result, + E: Debug, { let old_state = mem::replace(self, State::Terminated); - let new_state = f(old_state); - *self = new_state; + let old_state_log_ident = format!("{}", old_state); + + match f(old_state) { + Ok(new_state) => *self = new_state, + Err(error) => error!( + "{} - Failed state transition: {:?}", + old_state_log_ident, error + ), + } + } +} + +impl Display for State { + fn fmt(&self, formatter: &mut Formatter) -> fmt::Result { + state_dispatch!( + *self, + ref state => write!(formatter, "{}", state), + Terminated => write!(formatter, "Terminated") + ) } } @@ -193,49 +218,62 @@ impl State { pub fn get_timed_out_tokens(&mut self) -> Vec { match *self { - State::Node(ref mut state) => state.get_timed_out_tokens(), + State::BootstrappingPeer(_) | State::Terminated => vec![], State::Client(ref mut state) => state.get_timed_out_tokens(), State::RelocatingNode(ref mut state) => state.get_timed_out_tokens(), - _ => vec![], + State::ProvingNode(ref mut state) => state.get_timed_out_tokens(), + State::EstablishingNode(ref mut state) => state.get_timed_out_tokens(), + State::Node(ref mut state) => state.get_timed_out_tokens(), } } pub fn get_clients_usage(&self) -> Option> { - match *self { - State::Node(ref state) => Some(state.get_clients_usage()), - _ => None, + if let State::Node(ref state) = *self { + Some(state.get_clients_usage()) + } else { + None } } - pub fn has_unconsensused_observations(&self, filter_opaque: bool) -> bool { + pub fn has_unpolled_observations(&self, filter_opaque: bool) -> bool { match *self { - State::Node(ref state) => state.has_unconsensused_observations(filter_opaque), - _ => false, + State::Terminated + | State::BootstrappingPeer(_) + | State::Client(_) + | State::RelocatingNode(_) + | State::ProvingNode(_) => false, + State::EstablishingNode(ref state) => state.has_unpolled_observations(filter_opaque), + State::Node(ref state) => state.has_unpolled_observations(filter_opaque), } } pub fn is_routing_peer(&self, pub_id: &PublicId) -> bool { - match *self { - State::Node(ref state) => state.is_routing_peer(pub_id), - _ => false, + if let State::Node(ref state) = *self { + state.is_routing_peer(pub_id) + } else { + false } } pub fn in_authority(&self, auth: &Authority) -> bool { match *self { - State::Node(ref state) => state.in_authority(auth), + State::Terminated | State::BootstrappingPeer(_) => false, State::Client(ref state) => state.in_authority(auth), State::RelocatingNode(ref state) => state.in_authority(auth), - _ => false, + State::ProvingNode(ref state) => state.in_authority(auth), + State::EstablishingNode(ref state) => state.in_authority(auth), + State::Node(ref state) => state.in_authority(auth), } } pub fn has_unacked_msg(&self) -> bool { match *self { - State::Node(ref state) => state.ack_mgr().has_unacked_msg(), + State::Terminated | State::BootstrappingPeer(_) => false, State::Client(ref state) => state.ack_mgr().has_unacked_msg(), State::RelocatingNode(ref state) => state.ack_mgr().has_unacked_msg(), - _ => false, + State::ProvingNode(ref state) => state.ack_mgr().has_unacked_msg(), + State::EstablishingNode(ref state) => state.ack_mgr().has_unacked_msg(), + State::Node(ref state) => state.ack_mgr().has_unacked_msg(), } } } @@ -254,10 +292,15 @@ pub enum Transition { new_id: FullId, our_section: (Prefix, BTreeSet), }, - // `ProvingNode` state transitioning to `Node`. - IntoNode { + // `ProvingNode` state transitioning to `EstablishingNode`. + IntoEstablishingNode { gen_pfx_info: GenesisPfxInfo, }, + // `EstablishingNode` state transition to `Node`. + IntoNode { + sec_info: SectionInfo, + old_pfx: Prefix, + }, Terminate, } @@ -384,7 +427,7 @@ impl StateMachine { match transition { Stay => (), IntoBootstrapped { proxy_public_id } => self.state.replace_with(|state| match state { - State::Bootstrapping(src) => src.into_target_state(proxy_public_id, outbox), + State::BootstrappingPeer(src) => src.into_target_state(proxy_public_id, outbox), _ => unreachable!(), }), IntoBootstrapping { @@ -407,8 +450,12 @@ impl StateMachine { _ => unreachable!(), }) } - IntoNode { gen_pfx_info } => self.state.replace_with(|state| match state { - State::ProvingNode(src) => src.into_node(gen_pfx_info), + IntoEstablishingNode { gen_pfx_info } => self.state.replace_with(|state| match state { + State::ProvingNode(src) => src.into_establishing_node(gen_pfx_info, outbox), + _ => unreachable!(), + }), + IntoNode { sec_info, old_pfx } => self.state.replace_with(|state| match state { + State::EstablishingNode(src) => src.into_node(sec_info, old_pfx, outbox), _ => unreachable!(), }), Terminate => self.terminate(), @@ -546,6 +593,6 @@ impl StateMachine { impl Display for StateMachine { fn fmt(&self, formatter: &mut Formatter) -> fmt::Result { - self.state.fmt(formatter) + write!(formatter, "{:?}", self.state) } } diff --git a/src/states/bootstrapping.rs b/src/states/bootstrapping_peer.rs similarity index 79% rename from src/states/bootstrapping.rs rename to src/states/bootstrapping_peer.rs index 38d19454ce..8e91186fe7 100644 --- a/src/states/bootstrapping.rs +++ b/src/states/bootstrapping_peer.rs @@ -7,23 +7,25 @@ // permissions and limitations relating to use of the SAFE Network Software. use super::{ - common::{from_crust_bytes, Base}, - Client, ProvingNode, RelocatingNode, + client::{Client, ClientDetails}, + common::Base, + proving_node::{ProvingNode, ProvingNodeDetails}, + relocating_node::{RelocatingNode, RelocatingNodeDetails}, }; use crate::{ cache::Cache, crust::CrustUser, - error::InterfaceError, + error::{InterfaceError, RoutingError}, event::Event, id::{FullId, PublicId}, - messages::{DirectMessage, Message, Request, UserMessage}, + messages::{DirectMessage, HopMessage, Message, Request, UserMessage}, outbox::EventBox, routing_table::{Authority, Prefix}, state_machine::{State, Transition}, timer::Timer, types::RoutingActionSender, xor_name::XorName, - CrustBytes, Service, + Service, }; use maidsafe_utilities::serialisation; use std::{ @@ -51,20 +53,19 @@ pub enum TargetState { } // State of Client or Node while bootstrapping. -pub struct Bootstrapping { +pub struct BootstrappingPeer { action_sender: RoutingActionSender, bootstrap_blacklist: HashSet, bootstrap_connection: Option<(PublicId, u64)>, cache: Box, - target_state: TargetState, crust_service: Service, full_id: FullId, min_section_size: usize, + target_state: TargetState, timer: Timer, } -impl Bootstrapping { - #[allow(clippy::new_ret_no_self)] +impl BootstrappingPeer { pub fn new( action_sender: RoutingActionSender, cache: Box, @@ -73,7 +74,7 @@ impl Bootstrapping { full_id: FullId, min_section_size: usize, timer: Timer, - ) -> Option { + ) -> Result { match target_state { TargetState::Client { .. } => { let _ = crust_service.start_bootstrap(HashSet::new(), CrustUser::Client); @@ -81,66 +82,82 @@ impl Bootstrapping { TargetState::RelocatingNode | TargetState::ProvingNode { .. } => { if let Err(error) = crust_service.start_listening_tcp() { error!("Failed to start listening: {:?}", error); - return None; + return Err(error.into()); } } } - Some(Bootstrapping { - action_sender: action_sender, - bootstrap_blacklist: HashSet::new(), - bootstrap_connection: None, + + Ok(Self { + action_sender, cache: cache, - target_state: target_state, - crust_service: crust_service, - full_id: full_id, - min_section_size: min_section_size, + crust_service, + full_id, + min_section_size, timer: timer, + bootstrap_blacklist: HashSet::new(), + bootstrap_connection: None, + target_state, }) } - pub fn into_target_state(self, proxy_public_id: PublicId, outbox: &mut EventBox) -> State { + pub fn into_target_state( + self, + proxy_pub_id: PublicId, + outbox: &mut EventBox, + ) -> Result { match self.target_state { - TargetState::Client { msg_expiry_dur } => State::Client(Client::from_bootstrapping( - self.crust_service, - self.full_id, - self.min_section_size, - proxy_public_id, - self.timer, - msg_expiry_dur, - outbox, - )), + TargetState::Client { msg_expiry_dur } => { + Ok(State::Client(Client::from_bootstrapping( + ClientDetails { + crust_service: self.crust_service, + full_id: self.full_id, + min_section_size: self.min_section_size, + msg_expiry_dur, + proxy_pub_id, + timer: self.timer, + }, + outbox, + ))) + } TargetState::RelocatingNode => { - if let Some(node) = RelocatingNode::from_bootstrapping( - self.action_sender, - self.cache, - self.crust_service, - self.full_id, - self.min_section_size, - proxy_public_id, - self.timer, - ) { - State::RelocatingNode(node) - } else { - outbox.send_event(Event::RestartRequired); - State::Terminated - } + let details = RelocatingNodeDetails { + action_sender: self.action_sender, + cache: self.cache, + crust_service: self.crust_service, + full_id: self.full_id, + min_section_size: self.min_section_size, + proxy_pub_id, + timer: self.timer, + }; + + RelocatingNode::from_bootstrapping(details) + .map(State::RelocatingNode) + .map_err(|err| { + outbox.send_event(Event::RestartRequired); + err + }) } TargetState::ProvingNode { old_full_id, our_section, .. - } => State::ProvingNode(ProvingNode::from_bootstrapping( - our_section, - self.action_sender, - self.cache, - self.crust_service, - old_full_id, - self.full_id, - self.min_section_size, - proxy_public_id, - self.timer, - outbox, - )), + } => { + let details = ProvingNodeDetails { + action_sender: self.action_sender, + cache: self.cache, + crust_service: self.crust_service, + full_id: self.full_id, + min_section_size: self.min_section_size, + old_full_id, + our_section, + proxy_pub_id, + timer: self.timer, + }; + + Ok(State::ProvingNode(ProvingNode::from_bootstrapping( + details, outbox, + ))) + } } } @@ -151,28 +168,6 @@ impl Bootstrapping { } } - fn handle_direct_message( - &mut self, - direct_message: DirectMessage, - pub_id: PublicId, - ) -> Transition { - use self::DirectMessage::*; - match direct_message { - BootstrapResponse(Ok(())) => Transition::IntoBootstrapped { - proxy_public_id: pub_id, - }, - BootstrapResponse(Err(error)) => { - info!("{} Connection failed: {}", self, error); - self.rebootstrap(); - Transition::Stay - } - _ => { - debug!("{} - Unhandled direct message: {:?}", self, direct_message); - Transition::Stay - } - } - } - fn send_bootstrap_request(&mut self, pub_id: PublicId) { debug!("{} Sending BootstrapRequest to {}.", self, pub_id); @@ -222,7 +217,7 @@ impl Bootstrapping { } } -impl Base for Bootstrapping { +impl Base for BootstrappingPeer { fn crust_service(&self) -> &Service { &self.crust_service } @@ -323,25 +318,6 @@ impl Base for Bootstrapping { Transition::Stay } - fn handle_new_message( - &mut self, - pub_id: PublicId, - bytes: CrustBytes, - _: &mut EventBox, - ) -> Transition { - match from_crust_bytes(bytes) { - Ok(Message::Direct(direct_msg)) => self.handle_direct_message(direct_msg, pub_id), - Ok(message) => { - debug!("{} - Unhandled new message: {:?}", self, message); - Transition::Stay - } - Err(error) => { - debug!("{} - {:?}", self, error); - Transition::Stay - } - } - } - fn handle_listener_started(&mut self, port: u16, outbox: &mut EventBox) -> Transition { if self.client_restriction() { error!("{} - A client must not run a crust listener.", self); @@ -364,11 +340,44 @@ impl Base for Bootstrapping { outbox.send_event(Event::Terminated); Transition::Terminate } + + fn handle_direct_message( + &mut self, + msg: DirectMessage, + pub_id: PublicId, + _: &mut EventBox, + ) -> Result { + use self::DirectMessage::*; + match msg { + BootstrapResponse(Ok(())) => Ok(Transition::IntoBootstrapped { + proxy_public_id: pub_id, + }), + BootstrapResponse(Err(error)) => { + info!("{} Connection failed: {}", self, error); + self.rebootstrap(); + Ok(Transition::Stay) + } + _ => { + debug!("{} - Unhandled direct message: {:?}", self, msg); + Ok(Transition::Stay) + } + } + } + + fn handle_hop_message( + &mut self, + msg: HopMessage, + _: PublicId, + _: &mut EventBox, + ) -> Result { + debug!("{} - Unhandled hop message: {:?}", self, msg); + Ok(Transition::Stay) + } } -impl Display for Bootstrapping { +impl Display for BootstrappingPeer { fn fmt(&self, formatter: &mut Formatter) -> fmt::Result { - write!(formatter, "Bootstrapping({})", self.name()) + write!(formatter, "BootstrappingPeer({})", self.name()) } } @@ -423,7 +432,7 @@ mod tests { let pub_id = *full_id.public_id(); StateMachine::new( move |action_sender, crust_service, timer, _outbox2| { - Bootstrapping::new( + BootstrappingPeer::new( action_sender, Box::new(NullCache), TargetState::Client { @@ -434,7 +443,8 @@ mod tests { min_section_size, timer, ) - .map_or(State::Terminated, State::Bootstrapping) + .map(State::BootstrappingPeer) + .unwrap_or(State::Terminated) }, pub_id, Some(config), @@ -455,14 +465,14 @@ mod tests { // caused it to send a `BootstrapRequest` and add the Crust service to its // `bootstrap_blacklist`. match *state_machine.current() { - State::Bootstrapping(ref state) => assert!(state.bootstrap_blacklist.is_empty()), + State::BootstrappingPeer(ref state) => assert!(state.bootstrap_blacklist.is_empty()), _ => panic!("Should be in `Bootstrapping` state."), } network.deliver_messages(); unwrap!(state_machine.step(&mut outbox)); assert!(outbox.take_all().is_empty()); match *state_machine.current() { - State::Bootstrapping(ref state) => assert_eq!(state.bootstrap_blacklist.len(), 1), + State::BootstrappingPeer(ref state) => assert_eq!(state.bootstrap_blacklist.len(), 1), _ => panic!("Should be in `Bootstrapping` state."), } diff --git a/src/states/client.rs b/src/states/client.rs index 882aa626ac..a0023fbd0c 100644 --- a/src/states/client.rs +++ b/src/states/client.rs @@ -7,7 +7,7 @@ // permissions and limitations relating to use of the SAFE Network Software. use super::common::{ - from_crust_bytes, unrelocated, Base, Bootstrapped, Unapproved, USER_MSG_CACHE_EXPIRY_DURATION, + proxied, Base, Bootstrapped, BootstrappedNotEstablished, USER_MSG_CACHE_EXPIRY_DURATION, }; use crate::{ ack_manager::{Ack, AckManager, UnacknowledgedMessage}, @@ -16,7 +16,7 @@ use crate::{ event::Event, id::{FullId, PublicId}, messages::{ - DirectMessage, HopMessage, Message, MessageContent, Request, RoutingMessage, UserMessage, + DirectMessage, HopMessage, MessageContent, Request, RoutingMessage, UserMessage, UserMessageCache, }, outbox::EventBox, @@ -26,7 +26,7 @@ use crate::{ time::{Duration, Instant}, timer::Timer, xor_name::XorName, - CrustBytes, Service, + Service, }; use std::{ collections::BTreeMap, @@ -36,6 +36,15 @@ use std::{ /// Duration to wait before sending rate limit exceeded messages. pub const RATE_EXCEED_RETRY: Duration = Duration::from_millis(800); +pub struct ClientDetails { + pub crust_service: Service, + pub full_id: FullId, + pub min_section_size: usize, + pub msg_expiry_dur: Duration, + pub proxy_pub_id: PublicId, + pub timer: Timer, +} + /// A node connecting a user to the network, as opposed to a routing / data storage node. /// /// Each client has a _proxy_: a node through which all requests are routed. @@ -53,27 +62,18 @@ pub struct Client { } impl Client { - #[allow(clippy::too_many_arguments)] - pub fn from_bootstrapping( - crust_service: Service, - full_id: FullId, - min_section_size: usize, - proxy_pub_id: PublicId, - timer: Timer, - msg_expiry_dur: Duration, - outbox: &mut EventBox, - ) -> Self { + pub fn from_bootstrapping(details: ClientDetails, outbox: &mut EventBox) -> Self { let client = Client { ack_mgr: AckManager::new(), - crust_service: crust_service, - full_id: full_id, - min_section_size: min_section_size, - proxy_pub_id: proxy_pub_id, + crust_service: details.crust_service, + full_id: details.full_id, + min_section_size: details.min_section_size, + proxy_pub_id: details.proxy_pub_id, routing_msg_filter: RoutingMessageFilter::new(), - timer: timer, + timer: details.timer, user_msg_cache: UserMessageCache::with_expiry_duration(USER_MSG_CACHE_EXPIRY_DURATION), resend_buf: Default::default(), - msg_expiry_dur: msg_expiry_dur, + msg_expiry_dur: details.msg_expiry_dur, }; debug!("{} State changed to Client.", client); @@ -87,43 +87,6 @@ impl Client { Transition::Stay } - fn handle_hop_message( - &mut self, - hop_msg: HopMessage, - pub_id: PublicId, - outbox: &mut EventBox, - ) -> Result { - if self.proxy_pub_id != pub_id { - return Err(RoutingError::UnknownConnection(pub_id)); - } - - if let Some(routing_msg) = self.filter_hop_message(hop_msg, pub_id)? { - Ok(self.dispatch_routing_message(routing_msg, outbox)) - } else { - Ok(Transition::Stay) - } - } - - fn handle_direct_message( - &mut self, - direct_msg: DirectMessage, - ) -> Result { - if let DirectMessage::ProxyRateLimitExceeded { ack } = direct_msg { - if let Some(unack_msg) = self.ack_mgr.remove(&ack) { - let token = self.timer().schedule(RATE_EXCEED_RETRY); - let _ = self.resend_buf.insert(token, unack_msg); - } else { - debug!( - "{} Got ProxyRateLimitExceeded, but no corresponding request found", - self - ); - } - } else { - debug!("{} Unhandled direct message: {:?}", self, direct_msg); - } - Ok(Transition::Stay) - } - fn dispatch_routing_message( &mut self, routing_msg: RoutingMessage, @@ -207,6 +170,10 @@ impl Base for Client { } } + fn min_section_size(&self) -> usize { + self.min_section_size + } + fn handle_client_send_request( &mut self, dst: Authority, @@ -256,28 +223,6 @@ impl Base for Client { Transition::Stay } - fn handle_new_message( - &mut self, - pub_id: PublicId, - bytes: CrustBytes, - outbox: &mut EventBox, - ) -> Transition { - let transition = match from_crust_bytes(bytes) { - Ok(Message::Hop(hop_msg)) => self.handle_hop_message(hop_msg, pub_id, outbox), - Ok(Message::Direct(direct_msg)) => self.handle_direct_message(direct_msg), - Err(error) => Err(error), - }; - - match transition { - Ok(transition) => transition, - Err(RoutingError::FilterCheckFailed) => Transition::Stay, - Err(error) => { - debug!("{} - {:?}", self, error); - Transition::Stay - } - } - } - fn handle_lost_peer(&mut self, pub_id: PublicId, outbox: &mut EventBox) -> Transition { debug!("{} Received LostPeer - {:?}", self, pub_id); @@ -290,8 +235,43 @@ impl Base for Client { } } - fn min_section_size(&self) -> usize { - self.min_section_size + fn handle_direct_message( + &mut self, + msg: DirectMessage, + _: PublicId, + _: &mut EventBox, + ) -> Result { + if let DirectMessage::ProxyRateLimitExceeded { ack } = msg { + if let Some(unack_msg) = self.ack_mgr.remove(&ack) { + let token = self.timer().schedule(RATE_EXCEED_RETRY); + let _ = self.resend_buf.insert(token, unack_msg); + } else { + debug!( + "{} Got ProxyRateLimitExceeded, but no corresponding request found", + self + ); + } + } else { + debug!("{} Unhandled direct message: {:?}", self, msg); + } + Ok(Transition::Stay) + } + + fn handle_hop_message( + &mut self, + hop_msg: HopMessage, + pub_id: PublicId, + outbox: &mut EventBox, + ) -> Result { + if self.proxy_pub_id != pub_id { + return Err(RoutingError::UnknownConnection(pub_id)); + } + + if let Some(routing_msg) = self.filter_hop_message(hop_msg, pub_id)? { + Ok(self.dispatch_routing_message(routing_msg, outbox)) + } else { + Ok(Transition::Stay) + } } } @@ -349,11 +329,11 @@ impl Bootstrapped for Client { } } -impl Unapproved for Client { +impl BootstrappedNotEstablished for Client { const SEND_ACK: bool = false; fn get_proxy_public_id(&self, proxy_name: &XorName) -> Result<&PublicId, RoutingError> { - unrelocated::get_proxy_public_id(self, &self.proxy_pub_id, proxy_name) + proxied::get_proxy_public_id(self, &self.proxy_pub_id, proxy_name) } } diff --git a/src/states/common/approved.rs b/src/states/common/approved.rs new file mode 100644 index 0000000000..32162490f8 --- /dev/null +++ b/src/states/common/approved.rs @@ -0,0 +1,206 @@ +// Copyright 2019 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +use super::Relocated; +use crate::{ + chain::{ + Chain, ExpectCandidatePayload, NetworkEvent, Proof, ProofSet, ProvingSection, SectionInfo, + }, + error::RoutingError, + id::PublicId, + outbox::EventBox, + parsec::{self, Block, Observation, ParsecMap}, + routing_table::{Authority, Prefix}, + state_machine::Transition, + xor_name::XorName, +}; +use maidsafe_utilities::serialisation; + +/// Common functionality for node states post resource proof. +pub trait Approved: Relocated { + fn parsec_map_mut(&mut self) -> &mut ParsecMap; + fn chain_mut(&mut self) -> &mut Chain; + + /// Handles an accumulated `Online` event. + fn handle_online_event( + &mut self, + new_pub_id: PublicId, + new_client_auth: Authority, + outbox: &mut EventBox, + ) -> Result<(), RoutingError>; + + /// Handles an accumulated `Offline` event. + fn handle_offline_event( + &mut self, + pub_id: PublicId, + outbox: &mut EventBox, + ) -> Result<(), RoutingError>; + + /// Handles an accumulated `OurMerge` event. + fn handle_our_merge_event(&mut self) -> Result<(), RoutingError>; + + /// Handles an accumulated `NeighbourMerge` event. + fn handle_neighbour_merge_event(&mut self) -> Result<(), RoutingError>; + + /// Handles an accumulated `SectionInfo` event. + fn handle_section_info_event( + &mut self, + sec_info: SectionInfo, + old_pfx: Prefix, + outbox: &mut EventBox, + ) -> Result; + + // Handles an accumulated `ExpectCandidate` event. + // Context: a node is joining our section. Send the node our section. If the + // network is unbalanced, send `ExpectCandidate` on to a section with a shorter prefix. + fn handle_expect_candidate_event( + &mut self, + vote: ExpectCandidatePayload, + ) -> Result<(), RoutingError>; + + /// Handles an accumulated `ProvingSections` event. + fn handle_proving_sections_event( + &mut self, + proving_secs: Vec, + sec_info: SectionInfo, + ) -> Result<(), RoutingError>; + + fn handle_parsec_request( + &mut self, + msg_version: u64, + par_request: parsec::Request, + pub_id: PublicId, + outbox: &mut EventBox, + ) -> Result { + let log_ident = format!("{}", self); + let (response, poll) = + self.parsec_map_mut() + .handle_request(msg_version, par_request, pub_id, &log_ident); + + if let Some(response) = response { + self.send_message(&pub_id, response); + } + + if poll { + self.parsec_poll(outbox) + } else { + Ok(Transition::Stay) + } + } + + fn handle_parsec_response( + &mut self, + msg_version: u64, + par_response: parsec::Response, + pub_id: PublicId, + outbox: &mut EventBox, + ) -> Result { + let log_ident = format!("{}", self); + if self + .parsec_map_mut() + .handle_response(msg_version, par_response, pub_id, &log_ident) + { + self.parsec_poll(outbox) + } else { + Ok(Transition::Stay) + } + } + + fn parsec_poll(&mut self, outbox: &mut EventBox) -> Result { + while let Some(block) = self.parsec_map_mut().poll() { + match block.payload() { + Observation::Accusation { .. } => { + // FIXME: Handle properly + unreachable!("...") + } + Observation::Genesis(_) => { + // FIXME: Validate with Chain info. + continue; + } + Observation::OpaquePayload(event) => { + if let Some(proof) = block.proofs().iter().next().map(|p| Proof { + pub_id: *p.public_id(), + sig: *p.signature(), + }) { + trace!( + "{} Parsec OpaquePayload: {} - {:?}", + self, + proof.pub_id(), + event + ); + self.chain_mut().handle_opaque_event(event, proof)?; + } + } + Observation::Add { + peer_id, + related_info, + } => { + let event = + NetworkEvent::Online(*peer_id, serialisation::deserialise(&related_info)?); + let proof_set = to_proof_set(&block); + trace!("{} Parsec Add: - {}", self, peer_id); + self.chain_mut().handle_churn_event(&event, proof_set)?; + } + Observation::Remove { peer_id, .. } => { + let event = NetworkEvent::Offline(*peer_id); + let proof_set = to_proof_set(&block); + trace!("{} Parsec Remove: - {}", self, peer_id); + self.chain_mut().handle_churn_event(&event, proof_set)?; + } + } + + match self.chain_poll_all(outbox)? { + Transition::Stay => (), + transition => return Ok(transition), + } + } + + Ok(Transition::Stay) + } + + fn chain_poll_all(&mut self, outbox: &mut EventBox) -> Result { + let mut our_pfx = *self.chain_mut().our_prefix(); + while let Some(event) = self.chain_mut().poll()? { + trace!("{} Handle accumulated event: {:?}", self, event); + + match event { + NetworkEvent::Online(pub_id, client_auth) => { + self.handle_online_event(pub_id, client_auth, outbox)?; + } + NetworkEvent::Offline(pub_id) => { + self.handle_offline_event(pub_id, outbox)?; + } + NetworkEvent::OurMerge => self.handle_our_merge_event()?, + NetworkEvent::NeighbourMerge(_) => self.handle_neighbour_merge_event()?, + NetworkEvent::SectionInfo(sec_info) => { + match self.handle_section_info_event(sec_info, our_pfx, outbox)? { + Transition::Stay => (), + transition => return Ok(transition), + } + } + NetworkEvent::ExpectCandidate(vote) => self.handle_expect_candidate_event(vote)?, + NetworkEvent::ProvingSections(proving_secs, sec_info) => { + self.handle_proving_sections_event(proving_secs, sec_info)?; + } + } + + our_pfx = *self.chain_mut().our_prefix(); + } + + Ok(Transition::Stay) + } +} + +fn to_proof_set(block: &Block) -> ProofSet { + let sigs = block + .proofs() + .iter() + .map(|proof| (*proof.public_id(), *proof.signature())) + .collect(); + ProofSet { sigs } +} diff --git a/src/states/common/base.rs b/src/states/common/base.rs index 5fcf46fc06..e551613b4c 100644 --- a/src/states/common/base.rs +++ b/src/states/common/base.rs @@ -28,6 +28,20 @@ pub trait Base: Display { fn in_authority(&self, auth: &Authority) -> bool; fn min_section_size(&self) -> usize; + fn handle_direct_message( + &mut self, + msg: DirectMessage, + pub_id: PublicId, + outbox: &mut EventBox, + ) -> Result; + + fn handle_hop_message( + &mut self, + msg: HopMessage, + pub_id: PublicId, + outbox: &mut EventBox, + ) -> Result; + fn handle_action(&mut self, action: Action, outbox: &mut EventBox) -> Transition { match action { Action::ClientSendRequest { @@ -204,11 +218,26 @@ pub trait Base: Display { fn handle_new_message( &mut self, - _pub_id: PublicId, - _bytes: CrustBytes, - _outbox: &mut EventBox, + pub_id: PublicId, + bytes: CrustBytes, + outbox: &mut EventBox, ) -> Transition { - Transition::Stay + let result = match from_crust_bytes(bytes) { + Ok(Message::Hop(hop_msg)) => self.handle_hop_message(hop_msg, pub_id, outbox), + Ok(Message::Direct(direct_msg)) => { + self.handle_direct_message(direct_msg, pub_id, outbox) + } + Err(error) => Err(error), + }; + + match result { + Ok(transition) => transition, + Err(RoutingError::FilterCheckFailed) => Transition::Stay, + Err(err) => { + debug!("{} - {:?}", self, err); + Transition::Stay + } + } } fn finish_handle_crust_event(&mut self, _outbox: &mut EventBox) -> Transition { diff --git a/src/states/common/unapproved.rs b/src/states/common/bootstrapped_not_established.rs similarity index 89% rename from src/states/common/unapproved.rs rename to src/states/common/bootstrapped_not_established.rs index 35a9b8cd03..349e3fc159 100644 --- a/src/states/common/unapproved.rs +++ b/src/states/common/bootstrapped_not_established.rs @@ -19,7 +19,7 @@ use crate::{ }; use std::collections::BTreeSet; -pub trait Unapproved: Bootstrapped { +pub trait BootstrappedNotEstablished: Bootstrapped { // Whether acknowledge hop messages sent to us. const SEND_ACK: bool; @@ -73,11 +73,17 @@ pub trait Unapproved: Bootstrapped { // Get PublicId of the proxy node let proxy_pub_id = match routing_msg.src { Authority::Client { + ref client_id, ref proxy_node_name, - .. - } => *self.get_proxy_public_id(proxy_node_name)?, + } => { + if self.name() != client_id.name() { + return Ok(()); + } + + *self.get_proxy_public_id(proxy_node_name)? + } _ => { - error!("{} Source should be client if our state is Client", self); + error!("{} - Source should be client in this state", self); return Err(RoutingError::InvalidSource); } }; diff --git a/src/states/common/mod.rs b/src/states/common/mod.rs index d2dd5c0898..9e3373f957 100644 --- a/src/states/common/mod.rs +++ b/src/states/common/mod.rs @@ -6,15 +6,21 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. +mod approved; mod base; mod bootstrapped; +mod bootstrapped_not_established; +pub mod proxied; mod relocated; -mod unapproved; -pub mod unrelocated; +mod relocated_not_established; pub use self::{ - base::from_crust_bytes, base::Base, bootstrapped::Bootstrapped, relocated::Relocated, - unapproved::Unapproved, + approved::Approved, + base::{from_crust_bytes, Base}, + bootstrapped::Bootstrapped, + bootstrapped_not_established::BootstrappedNotEstablished, + relocated::Relocated, + relocated_not_established::RelocatedNotEstablished, }; use crate::time::Duration; diff --git a/src/states/common/unrelocated.rs b/src/states/common/proxied.rs similarity index 52% rename from src/states/common/unrelocated.rs rename to src/states/common/proxied.rs index 653fff3994..fadfbf74d3 100644 --- a/src/states/common/unrelocated.rs +++ b/src/states/common/proxied.rs @@ -6,7 +6,14 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use crate::{error::RoutingError, id::PublicId, xor_name::XorName}; +//! Utilities for node states that are connected via proxy. + +use crate::{ + error::RoutingError, + id::PublicId, + peer_manager::{Peer, PeerManager}, + xor_name::XorName, +}; use std::fmt::Display; pub fn get_proxy_public_id<'a, T: Display>( @@ -21,3 +28,24 @@ pub fn get_proxy_public_id<'a, T: Display>( Err(RoutingError::ProxyConnectionNotFound) } } + +pub fn find_proxy_public_id<'a, T: Display>( + label: &T, + peer_mgr: &'a PeerManager, + proxy_name: &XorName, +) -> Result<&'a PublicId, RoutingError> { + if let Some(pub_id) = peer_mgr.get_peer_by_name(proxy_name).map(Peer::pub_id) { + if peer_mgr.is_connected(pub_id) { + Ok(pub_id) + } else { + error!( + "{} - Unable to find connection to proxy in PeerManager.", + label + ); + Err(RoutingError::ProxyConnectionNotFound) + } + } else { + error!("{} - Unable to find proxy in PeerManager.", label); + Err(RoutingError::ProxyConnectionNotFound) + } +} diff --git a/src/states/common/relocated.rs b/src/states/common/relocated.rs index 5ce61cf359..c17ce13f1f 100644 --- a/src/states/common/relocated.rs +++ b/src/states/common/relocated.rs @@ -8,11 +8,12 @@ use super::bootstrapped::Bootstrapped; use crate::{ + ack_manager::Ack, crust::CrustError, error::RoutingError, event::Event, id::PublicId, - messages::{MessageContent, SignedMessage}, + messages::MessageContent, outbox::EventBox, peer_manager::{ConnectionInfoPreparedResult, Peer, PeerManager, PeerState}, routing_table::Authority, @@ -23,14 +24,15 @@ use crate::{ }; use log::LogLevel; use safe_crypto::SharedSecretKey; -use std::collections::BTreeSet; /// Common functionality for node states post-relocation. pub trait Relocated: Bootstrapped { - fn peer_mgr(&mut self) -> &mut PeerManager; + fn peer_mgr(&self) -> &PeerManager; + fn peer_mgr_mut(&mut self) -> &mut PeerManager; fn process_connection(&mut self, pub_id: PublicId, outbox: &mut EventBox); fn is_peer_valid(&self, pub_id: &PublicId) -> bool; fn add_to_notified_nodes(&mut self, pub_id: PublicId) -> bool; + fn remove_from_notified_nodes(&mut self, pub_id: &PublicId) -> bool; fn add_to_routing_table_success(&mut self, pub_id: &PublicId); fn add_to_routing_table_failure(&mut self, pub_id: &PublicId); @@ -45,7 +47,10 @@ pub trait Relocated: Bootstrapped { "{} Failed to prepare connection info: {:?}. Retrying.", self, err ); - let new_token = match self.peer_mgr().get_new_connection_info_token(result_token) { + let new_token = match self + .peer_mgr_mut() + .get_new_connection_info_token(result_token) + { Err(error) => { debug!( "{} Failed to prepare connection info, but no entry found in \ @@ -64,7 +69,7 @@ pub trait Relocated: Bootstrapped { let our_pub_info = our_connection_info.to_pub_connection_info(); match self - .peer_mgr() + .peer_mgr_mut() .connection_info_prepared(result_token, our_connection_info) { Err(error) => { @@ -131,7 +136,7 @@ pub trait Relocated: Bootstrapped { } use crate::peer_manager::ConnectionInfoReceivedResult::*; - match self.peer_mgr().connection_info_received( + match self.peer_mgr_mut().connection_info_received( src, dst, their_connection_info, @@ -207,7 +212,7 @@ pub trait Relocated: Bootstrapped { } use crate::peer_manager::ConnectionInfoReceivedResult::*; - match self.peer_mgr().connection_info_received( + match self.peer_mgr_mut().connection_info_received( Authority::ManagedNode(src), dst, their_connection_info, @@ -255,13 +260,24 @@ pub trait Relocated: Bootstrapped { return Transition::Stay; } - self.peer_mgr().connected_to(&pub_id); + self.peer_mgr_mut().connected_to(&pub_id); debug!("{} Received ConnectSuccess from {}.", self, pub_id); self.process_connection(pub_id, outbox); Transition::Stay } + fn handle_ack_response(&mut self, ack: Ack) { + self.ack_mgr_mut().receive(ack) + } + + fn log_connect_failure(&mut self, pub_id: &PublicId) { + if let Some(&PeerState::CrustConnecting) = self.peer_mgr().get_peer(pub_id).map(Peer::state) + { + debug!("{} Failed to connect to peer {:?}.", self, pub_id); + } + } + fn decrypt_connection_info( &self, encrypted_connection_info: &[u8], @@ -308,7 +324,7 @@ pub trait Relocated: Bootstrapped { // This will insert the peer if peer is not in peer_mgr and flag them to `valid` if let Some(token) = self - .peer_mgr() + .peer_mgr_mut() .get_connection_token(src, dst, their_public_id) { self.crust_service().prepare_connection_info(token); @@ -403,37 +419,12 @@ pub trait Relocated: Bootstrapped { self, pub_id ); let _ = self.crust_service().disconnect(pub_id); - let _ = self.peer_mgr().remove_peer(pub_id); + let _ = self.peer_mgr_mut().remove_peer(pub_id); } } - // Filter, then convert the message to a `Hop` and serialise. - // Send this byte string. - fn send_signed_message_to_peer( - &mut self, - signed_msg: SignedMessage, - target: &PublicId, - route: u8, - sent_to: BTreeSet, - ) -> Result<(), RoutingError> { - if !self.crust_service().is_connected(target) { - trace!("{} Not connected to {:?}. Dropping peer.", self, target); - self.disconnect_peer(target); - return Ok(()); - } - - if self.filter_outgoing_routing_msg(signed_msg.routing_message(), target, route) { - return Ok(()); - } - - let priority = signed_msg.priority(); - let bytes = self.to_hop_bytes(signed_msg, route, sent_to)?; - self.send_or_drop(target, bytes, priority); - Ok(()) - } - fn add_to_routing_table(&mut self, pub_id: &PublicId, outbox: &mut EventBox) { - match self.peer_mgr().add_to_routing_table(pub_id) { + match self.peer_mgr_mut().add_to_routing_table(pub_id) { Err(error) => { debug!("{} Peer {:?} was not updated: {:?}", self, pub_id, error); self.add_to_routing_table_failure(pub_id); diff --git a/src/states/common/relocated_not_established.rs b/src/states/common/relocated_not_established.rs new file mode 100644 index 0000000000..9e33ab92db --- /dev/null +++ b/src/states/common/relocated_not_established.rs @@ -0,0 +1,153 @@ +// Copyright 2019 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +use super::Relocated; +use crate::{ + error::{BootstrapResponseError, RoutingError}, + id::PublicId, + messages::{DirectMessage, RoutingMessage}, + outbox::EventBox, + peer_manager::{Peer, PeerState}, + routing_table::Prefix, + state_machine::Transition, + xor_name::XorName, +}; + +pub trait RelocatedNotEstablished: Relocated { + fn our_prefix(&self) -> &Prefix; + fn push_message_to_backlog(&mut self, msg: RoutingMessage); + + fn check_direct_message_sender( + &self, + msg: &DirectMessage, + pub_id: &PublicId, + ) -> Result<(), RoutingError> { + match self.peer_mgr().get_peer(pub_id).map(Peer::state) { + Some(&PeerState::Connected) | Some(&PeerState::Proxy) => Ok(()), + _ => { + debug!( + "{} Illegitimate direct message {:?} from {:?}.", + self, msg, pub_id + ); + Err(RoutingError::InvalidStateForOperation) + } + } + } + + fn handle_routing_message( + &mut self, + msg: RoutingMessage, + outbox: &mut EventBox, + ) -> Result<(), RoutingError> { + use crate::{messages::MessageContent::*, routing_table::Authority::*}; + + let src_name = msg.src.name(); + + match msg { + RoutingMessage { + content: + ConnectionInfoRequest { + encrypted_conn_info, + pub_id, + msg_id, + }, + src: ManagedNode(_), + dst: ManagedNode(_), + } => { + if self.our_prefix().matches(&src_name) { + self.handle_connection_info_request( + encrypted_conn_info, + pub_id, + msg_id, + msg.src, + msg.dst, + outbox, + ) + } else { + self.add_message_to_backlog(RoutingMessage { + content: ConnectionInfoRequest { + encrypted_conn_info, + pub_id, + msg_id, + }, + ..msg + }); + Ok(()) + } + } + RoutingMessage { + content: + ConnectionInfoResponse { + encrypted_conn_info, + pub_id, + msg_id, + }, + src: ManagedNode(src_name), + dst: Client { .. }, + } => self.handle_connection_info_response( + encrypted_conn_info, + pub_id, + msg_id, + src_name, + msg.dst, + ), + RoutingMessage { + content: Ack(ack, _), + .. + } => { + self.handle_ack_response(ack); + Ok(()) + } + _ => { + self.add_message_to_backlog(msg); + Ok(()) + } + } + } + + // Backlog the message to be processed once we are established. + fn add_message_to_backlog(&mut self, msg: RoutingMessage) { + trace!( + "{} Not established yet. Delaying message handling: {:?}", + self, + msg + ); + self.push_message_to_backlog(msg); + } + + fn handle_connect_failure(&mut self, pub_id: PublicId) -> Transition { + self.log_connect_failure(&pub_id); + let _ = self.dropped_peer(&pub_id); + Transition::Stay + } + + fn handle_bootstrap_request(&mut self, pub_id: PublicId) { + debug!( + "{} - Client {:?} rejected: We are not an established node yet.", + self, pub_id + ); + self.send_direct_message( + pub_id, + DirectMessage::BootstrapResponse(Err(BootstrapResponseError::NotApproved)), + ); + self.disconnect_peer(&pub_id); + } + + fn dropped_peer(&mut self, pub_id: &PublicId) -> bool { + let was_proxy = self.peer_mgr().is_proxy(pub_id); + let _ = self.peer_mgr_mut().remove_peer(pub_id); + let _ = self.remove_from_notified_nodes(pub_id); + + if was_proxy { + debug!("{} Lost connection to proxy {}.", self, pub_id); + false + } else { + true + } + } +} diff --git a/src/states/establishing_node.rs b/src/states/establishing_node.rs new file mode 100644 index 0000000000..95f71042d2 --- /dev/null +++ b/src/states/establishing_node.rs @@ -0,0 +1,435 @@ +// Copyright 2019 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +use super::{ + common::{ + proxied, Approved, Base, Bootstrapped, BootstrappedNotEstablished, Relocated, + RelocatedNotEstablished, + }, + node::{Node, NodeDetails}, +}; +use crate::{ + ack_manager::AckManager, + cache::Cache, + chain::{Chain, ExpectCandidatePayload, GenesisPfxInfo, ProvingSection, SectionInfo}, + error::RoutingError, + id::{FullId, PublicId}, + messages::{DirectMessage, HopMessage, Message, RoutingMessage}, + outbox::EventBox, + parsec::ParsecMap, + peer_manager::{Peer, PeerManager, PeerState}, + routing_message_filter::RoutingMessageFilter, + routing_table::{Authority, Prefix}, + state_machine::{State, Transition}, + time::{Duration, Instant}, + timer::Timer, + xor_name::XorName, + Service, +}; +use itertools::Itertools; +use std::{ + collections::BTreeSet, + fmt::{self, Display, Formatter}, +}; + +const POKE_TIMEOUT: Duration = Duration::from_secs(60); + +pub struct EstablishingNodeDetails { + pub ack_mgr: AckManager, + pub cache: Box, + pub crust_service: Service, + pub full_id: FullId, + pub gen_pfx_info: GenesisPfxInfo, + pub min_section_size: usize, + pub msg_backlog: Vec, + pub notified_nodes: BTreeSet, + pub peer_mgr: PeerManager, + pub routing_msg_filter: RoutingMessageFilter, + pub timer: Timer, +} + +pub struct EstablishingNode { + ack_mgr: AckManager, + cache: Box, + chain: Chain, + crust_service: Service, + full_id: FullId, + gen_pfx_info: GenesisPfxInfo, + /// Routing messages addressed to us that we cannot handle until we are established. + msg_backlog: Vec, + notified_nodes: BTreeSet, + parsec_map: ParsecMap, + peer_mgr: PeerManager, + poke_timer_token: u64, + routing_msg_filter: RoutingMessageFilter, + timer: Timer, +} + +impl EstablishingNode { + pub fn from_proving_node( + details: EstablishingNodeDetails, + outbox: &mut EventBox, + ) -> Result { + let public_id = *details.full_id.public_id(); + let poke_timer_token = details.timer.schedule(POKE_TIMEOUT); + + let parsec_map = ParsecMap::new(details.full_id.clone(), &details.gen_pfx_info); + let chain = Chain::new( + details.min_section_size, + public_id, + details.gen_pfx_info.clone(), + ); + + let mut node = Self { + ack_mgr: details.ack_mgr, + cache: details.cache, + chain, + crust_service: details.crust_service, + full_id: details.full_id, + gen_pfx_info: details.gen_pfx_info, + msg_backlog: details.msg_backlog, + notified_nodes: details.notified_nodes, + parsec_map, + peer_mgr: details.peer_mgr, + routing_msg_filter: details.routing_msg_filter, + timer: details.timer, + poke_timer_token, + }; + + node.init(outbox)?; + Ok(node) + } + + fn init(&mut self, outbox: &mut EventBox) -> Result<(), RoutingError> { + debug!("{} - State changed to EstablishingNode.", self); + + for msg in self.msg_backlog.drain(..).collect_vec() { + let _ = self.dispatch_routing_message(msg, outbox)?; + } + + Ok(()) + } + + pub fn into_node( + self, + sec_info: SectionInfo, + old_pfx: Prefix, + outbox: &mut EventBox, + ) -> Result { + let details = NodeDetails { + ack_mgr: self.ack_mgr, + cache: self.cache, + chain: self.chain, + crust_service: self.crust_service, + full_id: self.full_id, + gen_pfx_info: self.gen_pfx_info, + msg_queue: self.msg_backlog.into_iter().collect(), + notified_nodes: self.notified_nodes, + parsec_map: self.parsec_map, + peer_mgr: self.peer_mgr, + routing_msg_filter: self.routing_msg_filter, + timer: self.timer, + }; + + Node::from_establishing_node(details, sec_info, old_pfx, outbox).map(State::Node) + } + + fn dispatch_routing_message( + &mut self, + msg: RoutingMessage, + outbox: &mut EventBox, + ) -> Result { + self.handle_routing_message(msg, outbox) + .map(|()| Transition::Stay) + } + + // Sends a `ParsecPoke` message to trigger a gossip request from current section members to us. + // + // TODO: Should restrict targets to few(counter churn-threshold)/single. + // Currently this can result in incoming spam of gossip history from everyone. + // Can also just be a single target once node-ageing makes Offline votes Opaque which should + // remove invalid test failures for unaccumulated parsec::Remove blocks. + fn send_parsec_poke(&mut self) { + let version = *self.gen_pfx_info.first_info.version(); + let recipients = self + .gen_pfx_info + .latest_info + .members() + .iter() + .cloned() + .collect_vec(); + + for recipient in recipients { + self.send_message( + &recipient, + Message::Direct(DirectMessage::ParsecPoke(version)), + ); + } + } +} + +#[cfg(feature = "mock_base")] +impl EstablishingNode { + pub fn chain(&self) -> &Chain { + &self.chain + } + + pub fn get_timed_out_tokens(&mut self) -> Vec { + self.timer.get_timed_out_tokens() + } + + pub fn has_unpolled_observations(&self, filter_opaque: bool) -> bool { + self.parsec_map.has_unpolled_observations(filter_opaque) + } +} + +impl Base for EstablishingNode { + fn crust_service(&self) -> &Service { + &self.crust_service + } + + fn full_id(&self) -> &FullId { + &self.full_id + } + + fn in_authority(&self, auth: &Authority) -> bool { + if let Authority::Client { ref client_id, .. } = *auth { + client_id == self.full_id.public_id() + } else { + false + } + } + + fn min_section_size(&self) -> usize { + self.chain.min_sec_size() + } + + fn handle_timeout(&mut self, token: u64, _: &mut EventBox) -> Transition { + if self.poke_timer_token == token { + self.send_parsec_poke(); + self.poke_timer_token = self.timer.schedule(POKE_TIMEOUT); + } else { + self.resend_unacknowledged_timed_out_msgs(token); + } + + Transition::Stay + } + + fn handle_connect_success(&mut self, pub_id: PublicId, outbox: &mut EventBox) -> Transition { + Relocated::handle_connect_success(self, pub_id, outbox) + } + + fn handle_connect_failure(&mut self, pub_id: PublicId, _: &mut EventBox) -> Transition { + RelocatedNotEstablished::handle_connect_failure(self, pub_id) + } + + fn handle_direct_message( + &mut self, + msg: DirectMessage, + pub_id: PublicId, + outbox: &mut EventBox, + ) -> Result { + self.check_direct_message_sender(&msg, &pub_id)?; + + use crate::messages::DirectMessage::*; + match msg { + ParsecRequest(version, par_request) => { + self.handle_parsec_request(version, par_request, pub_id, outbox) + } + ParsecResponse(version, par_response) => { + self.handle_parsec_response(version, par_response, pub_id, outbox) + } + BootstrapRequest(_) => { + self.handle_bootstrap_request(pub_id); + Ok(Transition::Stay) + } + _ => { + debug!("{} Unhandled direct message: {:?}", self, msg); + Ok(Transition::Stay) + } + } + } + + fn handle_hop_message( + &mut self, + hop_msg: HopMessage, + pub_id: PublicId, + outbox: &mut EventBox, + ) -> Result { + match self.peer_mgr.get_peer(&pub_id).map(Peer::state) { + Some(&PeerState::Connected) | Some(&PeerState::Proxy) => (), + _ => return Err(RoutingError::UnknownConnection(pub_id)), + } + + if let Some(routing_msg) = self.filter_hop_message(hop_msg, pub_id)? { + self.dispatch_routing_message(routing_msg, outbox) + } else { + Ok(Transition::Stay) + } + } +} + +impl Bootstrapped for EstablishingNode { + fn ack_mgr(&self) -> &AckManager { + &self.ack_mgr + } + + fn ack_mgr_mut(&mut self) -> &mut AckManager { + &mut self.ack_mgr + } + + fn routing_msg_filter(&mut self) -> &mut RoutingMessageFilter { + &mut self.routing_msg_filter + } + + fn timer(&mut self) -> &mut Timer { + &mut self.timer + } + + fn send_routing_message_via_route( + &mut self, + routing_msg: RoutingMessage, + src_section: Option, + route: u8, + expires_at: Option, + ) -> Result<(), RoutingError> { + self.send_routing_message_via_proxy(routing_msg, src_section, route, expires_at) + } +} + +impl Relocated for EstablishingNode { + fn peer_mgr(&self) -> &PeerManager { + &self.peer_mgr + } + + fn peer_mgr_mut(&mut self) -> &mut PeerManager { + &mut self.peer_mgr + } + + fn process_connection(&mut self, pub_id: PublicId, outbox: &mut EventBox) { + if self.chain.is_peer_valid(&pub_id) { + self.add_to_routing_table(&pub_id, outbox); + } + } + + fn is_peer_valid(&self, _pub_id: &PublicId) -> bool { + true + } + + fn add_to_notified_nodes(&mut self, pub_id: PublicId) -> bool { + self.notified_nodes.insert(pub_id) + } + + fn remove_from_notified_nodes(&mut self, pub_id: &PublicId) -> bool { + self.notified_nodes.remove(pub_id) + } + + fn add_to_routing_table_success(&mut self, _: &PublicId) {} + + fn add_to_routing_table_failure(&mut self, pub_id: &PublicId) { + self.disconnect_peer(pub_id) + } +} + +impl BootstrappedNotEstablished for EstablishingNode { + const SEND_ACK: bool = true; + + fn get_proxy_public_id(&self, proxy_name: &XorName) -> Result<&PublicId, RoutingError> { + proxied::find_proxy_public_id(self, &self.peer_mgr, proxy_name) + } +} + +impl RelocatedNotEstablished for EstablishingNode { + fn our_prefix(&self) -> &Prefix { + self.chain.our_prefix() + } + + fn push_message_to_backlog(&mut self, msg: RoutingMessage) { + self.msg_backlog.push(msg) + } +} + +impl Approved for EstablishingNode { + fn parsec_map_mut(&mut self) -> &mut ParsecMap { + &mut self.parsec_map + } + + fn chain_mut(&mut self) -> &mut Chain { + &mut self.chain + } + + fn handle_online_event( + &mut self, + new_pub_id: PublicId, + _: Authority, + _: &mut EventBox, + ) -> Result<(), RoutingError> { + let _ = self.chain.add_member(new_pub_id)?; + Ok(()) + } + + fn handle_offline_event( + &mut self, + pub_id: PublicId, + _: &mut EventBox, + ) -> Result<(), RoutingError> { + let _ = self.chain.remove_member(pub_id)?; + Ok(()) + } + + fn handle_expect_candidate_event( + &mut self, + _: ExpectCandidatePayload, + ) -> Result<(), RoutingError> { + Ok(()) + } + + fn handle_section_info_event( + &mut self, + sec_info: SectionInfo, + old_pfx: Prefix, + _: &mut EventBox, + ) -> Result { + if self.chain.is_member() { + Ok(Transition::IntoNode { sec_info, old_pfx }) + } else { + debug!("{} - Unhandled SectionInfo event", self); + Ok(Transition::Stay) + } + } + + fn handle_our_merge_event(&mut self) -> Result<(), RoutingError> { + debug!("{} - Unhandled OurMerge event", self); + Ok(()) + } + + fn handle_neighbour_merge_event(&mut self) -> Result<(), RoutingError> { + debug!("{} - Unhandled NeighbourMerge event", self); + Ok(()) + } + + fn handle_proving_sections_event( + &mut self, + _: Vec, + _: SectionInfo, + ) -> Result<(), RoutingError> { + debug!("{} - Unhandled ProvingSections event", self); + Ok(()) + } +} + +impl Display for EstablishingNode { + fn fmt(&self, formatter: &mut Formatter) -> fmt::Result { + write!( + formatter, + "EstablishingNode({}({:b}))", + self.name(), + self.chain.our_prefix() + ) + } +} diff --git a/src/states/mod.rs b/src/states/mod.rs index ac48612393..d2dcece947 100644 --- a/src/states/mod.rs +++ b/src/states/mod.rs @@ -6,16 +6,18 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -mod bootstrapping; +mod bootstrapping_peer; mod client; pub mod common; +mod establishing_node; mod node; mod proving_node; mod relocating_node; pub use self::{ - bootstrapping::{Bootstrapping, TargetState as BootstrappingTargetState}, + bootstrapping_peer::{BootstrappingPeer, TargetState}, client::{Client, RATE_EXCEED_RETRY}, + establishing_node::EstablishingNode, node::Node, proving_node::ProvingNode, relocating_node::RelocatingNode, @@ -38,8 +40,30 @@ pub use self::{ // │ └────────────────┘ └─────────────┘ // │ │ // │ │ +// │ ▼ +// │ ┌──────────────────┐ +// │ │ EstablishingNode │ +// │ └──────────────────┘ +// │ │ +// │ │ // ▼ ▼ // ┌────────┐ ┌──────┐ // │ Client │ │ Node │ // └────────┘ └──────┘ // +// +// # Common traits +// Bootstrapping +// │ Client +// │ │ RelocatingNode +// │ │ │ ProvingNode +// │ │ │ │ EstablishingNode +// │ │ │ │ │ Node +// │ │ │ │ │ │ +// Base * * * * * * +// Bootstrapped * * * * * +// BootstrappedNotEstablished * * * * +// Relocated * * * +// RelocatedNotEstablished * * +// Approved * * +// diff --git a/src/states/node.rs b/src/states/node.rs index d3c1aeafe4..94dc35a7e0 100644 --- a/src/states/node.rs +++ b/src/states/node.rs @@ -6,9 +6,7 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use super::common::{ - from_crust_bytes, Base, Bootstrapped, Relocated, USER_MSG_CACHE_EXPIRY_DURATION, -}; +use super::common::{Approved, Base, Bootstrapped, Relocated, USER_MSG_CACHE_EXPIRY_DURATION}; use crate::{ ack_manager::{Ack, AckManager}, cache::Cache, @@ -22,8 +20,8 @@ use crate::{ event::Event, id::{FullId, PublicId}, messages::{ - DirectMessage, HopMessage, Message, MessageContent, RoutingMessage, SignedMessage, - UserMessage, UserMessageCache, DEFAULT_PRIORITY, MAX_PARTS, MAX_PART_LEN, + DirectMessage, HopMessage, MessageContent, RoutingMessage, SignedMessage, UserMessage, + UserMessageCache, DEFAULT_PRIORITY, MAX_PARTS, MAX_PART_LEN, }, outbox::EventBox, parsec::{self, ParsecMap}, @@ -41,7 +39,7 @@ use crate::{ types::MessageId, utils::{self, DisplayDuration}, xor_name::XorName, - CrustBytes, Service, + Service, }; use itertools::Itertools; use log::LogLevel; @@ -55,15 +53,13 @@ use std::{ cmp, collections::{BTreeSet, VecDeque}, fmt::{self, Display, Formatter}, - iter, mem, + iter, net::{IpAddr, SocketAddr}, }; /// Time after which a `Ticked` event is sent. const TICK_TIMEOUT: Duration = Duration::from_secs(15); -const POKE_TIMEOUT: Duration = Duration::from_secs(60); const GOSSIP_TIMEOUT: Duration = Duration::from_secs(2); -const RECONNECT_PEER_TIMEOUT: Duration = Duration::from_secs(20); //const MAX_IDLE_ROUNDS: u64 = 100; //const TICK_TIMEOUT_SECS: u64 = 60; /// The number of required leading zero bits for the resource proof @@ -77,6 +73,21 @@ const CLIENT_BAN_DURATION: Duration = Duration::from_secs(2 * 60 * 60); /// Duration for which clients' IDs we disconnected from are retained. const DROPPED_CLIENT_TIMEOUT: Duration = Duration::from_secs(2 * 60 * 60); +pub struct NodeDetails { + pub ack_mgr: AckManager, + pub cache: Box, + pub chain: Chain, + pub crust_service: Service, + pub full_id: FullId, + pub gen_pfx_info: GenesisPfxInfo, + pub msg_queue: VecDeque, + pub notified_nodes: BTreeSet, + pub parsec_map: ParsecMap, + pub peer_mgr: PeerManager, + pub routing_msg_filter: RoutingMessageFilter, + pub timer: Timer, +} + pub struct Node { ack_mgr: AckManager, cacheable_user_msg_cache: UserMessageCache, @@ -98,12 +109,10 @@ pub struct Node { next_relocation_dst: Option, /// Interval used for relocation in mock crust tests. next_relocation_interval: Option<(XorName, XorName)>, - /// `RoutingMessage`s affecting the routing table that arrived before `NodeApproval`. - routing_msg_backlog: Vec, /// The timer token for accepting a new candidate. candidate_timer_token: Option, /// The timer token for displaying the current candidate status. - candidate_status_token: Option, + candidate_status_token: u64, /// Limits the rate at which clients can pass messages through this node when it acts as their /// proxy. clients_rate_limiter: RateLimiter, @@ -120,12 +129,8 @@ pub struct Node { disable_resource_proof: bool, parsec_map: ParsecMap, gen_pfx_info: GenesisPfxInfo, - poke_timer_token: u64, gossip_timer_token: u64, chain: Chain, - // Peers we want to try reconnecting to - reconnect_peers: Vec, - reconnect_peers_token: u64, // TODO: notify without local state notified_nodes: BTreeSet, } @@ -137,131 +142,97 @@ impl Node { full_id: FullId, min_section_size: usize, timer: Timer, - ) -> Option { + ) -> Result { let dev_config = config_handler::get_config().dev.unwrap_or_default(); let public_id = *full_id.public_id(); let gen_pfx_info = GenesisPfxInfo { - first_info: create_first_section_info(public_id).ok()?, + first_info: create_first_section_info(public_id)?, latest_info: SectionInfo::default(), }; + let parsec_map = ParsecMap::new(full_id.clone(), &gen_pfx_info); + let chain = Chain::new(min_section_size, public_id, gen_pfx_info.clone()); + let peer_mgr = PeerManager::new(public_id, dev_config.disable_client_rate_limiter); - let mut node = Self::new( - AckManager::new(), + let details = NodeDetails { + ack_mgr: AckManager::new(), cache, + chain, crust_service, full_id, gen_pfx_info, - true, - min_section_size, - Default::default(), - Default::default(), - PeerManager::new(public_id, dev_config.disable_client_rate_limiter), - RoutingMessageFilter::new(), + msg_queue: VecDeque::new(), + notified_nodes: BTreeSet::new(), + parsec_map, + peer_mgr, + routing_msg_filter: RoutingMessageFilter::new(), timer, - ); + }; - if let Err(error) = node.crust_service.start_listening_tcp() { - error!("{} Failed to start listening: {:?}", node, error); - None - } else { - debug!("{} State changed to Node.", node); - info!("{} Started a new network as a seed node.", node); - Some(node) + let mut node = Self::new(details, true); + + match node.crust_service.start_listening_tcp() { + Ok(()) => { + debug!("{} - State changed to Node.", node); + info!("{} - Started a new network as a seed node.", node); + Ok(node) + } + Err(error) => { + error!("{} - Failed to start listening: {:?}", node, error); + Err(error.into()) + } } } - #[allow(clippy::too_many_arguments)] - pub fn from_proving_node( - ack_mgr: AckManager, - cache: Box, - crust_service: Service, - full_id: FullId, - gen_pfx_info: GenesisPfxInfo, - min_section_size: usize, - msg_queue: VecDeque, - notified_nodes: BTreeSet, - peer_mgr: PeerManager, - routing_msg_filter: RoutingMessageFilter, - timer: Timer, - ) -> Self { - let node = Self::new( - ack_mgr, - cache, - crust_service, - full_id, - gen_pfx_info, - false, - min_section_size, - msg_queue, - notified_nodes, - peer_mgr, - routing_msg_filter, - timer, - ); - - debug!("{} State changed to Node.", node); - node + pub fn from_establishing_node( + details: NodeDetails, + sec_info: SectionInfo, + old_pfx: Prefix, + outbox: &mut EventBox, + ) -> Result { + let mut node = Self::new(details, false); + node.init(sec_info, old_pfx, outbox)?; + Ok(node) } - #[allow(clippy::too_many_arguments)] - fn new( - ack_mgr: AckManager, - cache: Box, - crust_service: Service, - full_id: FullId, - gen_pfx_info: GenesisPfxInfo, - is_first_node: bool, - min_section_size: usize, - msg_queue: VecDeque, - notified_nodes: BTreeSet, - peer_mgr: PeerManager, - routing_msg_filter: RoutingMessageFilter, - timer: Timer, - ) -> Self { + fn new(details: NodeDetails, is_first_node: bool) -> Self { let dev_config = config_handler::get_config().dev.unwrap_or_default(); - let public_id = *full_id.public_id(); - + let timer = details.timer; let tick_timer_token = timer.schedule(TICK_TIMEOUT); - let poke_timer_token = timer.schedule(POKE_TIMEOUT); let gossip_timer_token = timer.schedule(GOSSIP_TIMEOUT); - let reconnect_peers_token = timer.schedule(RECONNECT_PEER_TIMEOUT); + let candidate_status_token = timer.schedule(CANDIDATE_STATUS_INTERVAL); Self { - ack_mgr, + ack_mgr: details.ack_mgr, cacheable_user_msg_cache: UserMessageCache::with_expiry_duration( USER_MSG_CACHE_EXPIRY_DURATION, ), - crust_service: crust_service, - full_id: full_id.clone(), + crust_service: details.crust_service, + full_id: details.full_id.clone(), is_first_node, - msg_queue, - peer_mgr, - response_cache: cache, - routing_msg_filter, + msg_queue: details.msg_queue, + peer_mgr: details.peer_mgr, + response_cache: details.cache, + routing_msg_filter: details.routing_msg_filter, sig_accumulator: Default::default(), tick_timer_token: tick_timer_token, timer: timer, user_msg_cache: UserMessageCache::with_expiry_duration(USER_MSG_CACHE_EXPIRY_DURATION), next_relocation_dst: None, next_relocation_interval: None, - routing_msg_backlog: vec![], candidate_timer_token: None, - candidate_status_token: None, + candidate_status_token, clients_rate_limiter: RateLimiter::new(dev_config.disable_client_rate_limiter), banned_client_ips: LruCache::with_expiry_duration(CLIENT_BAN_DURATION), dropped_clients: LruCache::with_expiry_duration(DROPPED_CLIENT_TIMEOUT), proxy_load_amount: 0, disable_resource_proof: dev_config.disable_resource_proof, - parsec_map: ParsecMap::new(full_id, &gen_pfx_info), - gen_pfx_info: gen_pfx_info.clone(), - poke_timer_token, + parsec_map: details.parsec_map, + gen_pfx_info: details.gen_pfx_info, gossip_timer_token, - chain: Chain::new(min_section_size, public_id, gen_pfx_info), - reconnect_peers: Default::default(), - reconnect_peers_token, - notified_nodes, + chain: details.chain, + notified_nodes: details.notified_nodes, } } @@ -273,7 +244,7 @@ impl Node { self, self.chain.valid_peers().len() ); - let network_estimate = match self.chain().network_size_estimate() { + let network_estimate = match self.chain.network_size_estimate() { (n, true) => format!("Exact network size: {}", n), (n, false) => format!("Estimated network size: {}", n), }; @@ -286,19 +257,57 @@ impl Node { } } + // Initialise regular node + fn init( + &mut self, + sec_info: SectionInfo, + old_pfx: Prefix, + outbox: &mut EventBox, + ) -> Result<(), RoutingError> { + debug!("{} - State changed to Node.", self); + trace!( + "{} - Node Established. Prefixes: {:?}", + self, + self.chain.prefixes() + ); + + // We have just become established. Now we can supply our votes for all latest neighbour + // infos that have accumulated so far. + let neighbour_info_events = self + .chain + .neighbour_infos() + .map(|info| info.clone().into_network_event()) + .collect_vec(); + + neighbour_info_events.into_iter().for_each(|event| { + self.vote_for_event(event); + }); + + outbox.send_event(Event::Connected); + + // Handle the SectionInfo event which triggered us becoming established node. + let _ = self.handle_section_info_event(sec_info, old_pfx, outbox)?; + + // Allow other peers to bootstrap via us. + if let Err(err) = self.crust_service.set_accept_bootstrap(true) { + warn!("{} Unable to accept bootstrap connections. {:?}", self, err); + } + self.crust_service.set_service_discovery_listen(true); + + Ok(()) + } + // Initialises the first node of the network fn init_first_node(&mut self, outbox: &mut EventBox) -> Result<(), RoutingError> { outbox.send_event(Event::Connected); - self.peer_mgr.set_established(); self.crust_service.set_accept_bootstrap(true)?; self.crust_service.set_service_discovery_listen(true); Ok(()) } - /// Routing table of this node. - #[allow(unused)] + #[cfg(feature = "mock_base")] pub fn chain(&self) -> &Chain { &self.chain } @@ -313,263 +322,10 @@ impl Node { } } - // Deconstruct a `DirectMessage` and handle or forward as appropriate. - fn handle_direct_message( - &mut self, - direct_message: DirectMessage, - pub_id: PublicId, - outbox: &mut EventBox, - ) -> Result<(), RoutingError> { - use crate::messages::DirectMessage::*; - if let Err(error) = self.check_direct_message_sender(&direct_message, &pub_id) { - match error { - RoutingError::ClientConnectionNotFound => (), - _ => self.ban_and_disconnect_peer(&pub_id), - } - return Err(error); - } - - match direct_message { - MessageSignature(digest, sig) => self.handle_message_signature(digest, sig, pub_id)?, - BootstrapRequest(signature) => { - if let Err(error) = self.handle_bootstrap_request(pub_id, signature) { - warn!( - "{} Invalid BootstrapRequest received ({:?}), dropping {}.", - self, error, pub_id - ); - self.ban_and_disconnect_peer(&pub_id); - } - } - CandidateInfo { - ref old_public_id, - ref new_public_id, - ref signature_using_old, - ref signature_using_new, - ref new_client_auth, - } => { - if *new_public_id != pub_id { - error!( - "{} CandidateInfo(new_public_id: {}) does not match crust id {}.", - self, new_public_id, pub_id - ); - self.disconnect_peer(&pub_id); - return Err(RoutingError::InvalidSource); - } - self.handle_candidate_info( - old_public_id, - &pub_id, - signature_using_old, - signature_using_new, - new_client_auth, - outbox, - ); - } - ResourceProofResponse { - part_index, - part_count, - proof, - leading_zero_bytes, - } => { - self.handle_resource_proof_response( - pub_id, - part_index, - part_count, - proof, - leading_zero_bytes, - ); - } - ParsecPoke(version) => self.handle_parsec_poke(version, pub_id), - ParsecRequest(version, par_request) => { - self.handle_parsec_request(version, par_request, pub_id, outbox)?; - } - ParsecResponse(version, par_response) => { - self.handle_parsec_response(version, par_response, pub_id, outbox)?; - } - BootstrapResponse(_) - | ProxyRateLimitExceeded { .. } - | ResourceProof { .. } - | ResourceProofResponseReceipt => { - debug!("{} Unhandled direct message: {:?}", self, direct_message); - } - } - Ok(()) - } - fn handle_parsec_poke(&mut self, msg_version: u64, pub_id: PublicId) { self.send_parsec_gossip(Some((msg_version, pub_id))) } - fn handle_parsec_request( - &mut self, - msg_version: u64, - par_request: parsec::Request, - pub_id: PublicId, - outbox: &mut EventBox, - ) -> Result<(), RoutingError> { - let log_ident = format!("{}", self); - let (response, poll) = - self.parsec_map - .handle_request(msg_version, par_request, pub_id, &log_ident); - - if let Some(response) = response { - self.send_message(&pub_id, response); - } - - if poll { - self.parsec_poll(outbox) - } else { - Ok(()) - } - } - - fn handle_parsec_response( - &mut self, - msg_version: u64, - par_response: parsec::Response, - pub_id: PublicId, - outbox: &mut EventBox, - ) -> Result<(), RoutingError> { - let log_ident = format!("{}", self); - if self - .parsec_map - .handle_response(msg_version, par_response, pub_id, &log_ident) - { - self.parsec_poll(outbox) - } else { - Ok(()) - } - } - - fn parsec_poll(&mut self, outbox: &mut EventBox) -> Result<(), RoutingError> { - while let Some(block) = self.parsec_map.poll() { - match block.payload() { - parsec::Observation::Accusation { .. } => { - // FIXME: Handle properly - unreachable!("...") - } - parsec::Observation::Genesis(_) => { - // FIXME: Validate with Chain info. - continue; - } - parsec::Observation::OpaquePayload(event) => { - if let Some(proof) = block.proofs().iter().next().map(|p| Proof { - pub_id: *p.public_id(), - sig: *p.signature(), - }) { - trace!( - "{} Parsec OpaquePayload: {} - {:?}", - self, - proof.pub_id(), - event - ); - self.chain.handle_opaque_event(event, proof)?; - } - } - parsec::Observation::Add { - peer_id, - related_info, - } => { - let event = - NetworkEvent::Online(*peer_id, serialisation::deserialise(&related_info)?); - let to_sig = |p: &parsec::Proof<_>| (*p.public_id(), *p.signature()); - let sigs = block.proofs().iter().map(to_sig).collect(); - let proof_set = ProofSet { sigs }; - trace!("{} Parsec Add: - {}", self, peer_id); - self.chain.handle_churn_event(&event, proof_set)?; - } - parsec::Observation::Remove { peer_id, .. } => { - let event = NetworkEvent::Offline(*peer_id); - let to_sig = |p: &parsec::Proof<_>| (*p.public_id(), *p.signature()); - let sigs = block.proofs().iter().map(to_sig).collect(); - let proof_set = ProofSet { sigs }; - trace!("{} Parsec Remove: - {}", self, peer_id); - self.chain.handle_churn_event(&event, proof_set)?; - } - } - - self.chain_poll(outbox)?; - } - - Ok(()) - } - - fn chain_poll(&mut self, outbox: &mut EventBox) -> Result<(), RoutingError> { - let mut our_pfx = *self.chain.our_prefix(); - while let Some(event) = self.chain.poll()? { - trace!("{} Handle accumulated event: {:?}", self, event); - - match event { - NetworkEvent::Online(pub_id, client_auth) => { - self.handle_online_event(pub_id, client_auth, outbox)?; - } - NetworkEvent::Offline(pub_id) => { - self.handle_offline_event(pub_id, outbox)?; - } - NetworkEvent::OurMerge => self.handle_our_merge_event()?, - NetworkEvent::NeighbourMerge(_) => self.handle_neighbour_merge_event()?, - NetworkEvent::SectionInfo(ref sec_info) => { - self.handle_section_info_event(sec_info, our_pfx, outbox)?; - } - NetworkEvent::ExpectCandidate(vote) => self.handle_expect_candidate_event(vote)?, - NetworkEvent::ProvingSections(ps, sec_info) => { - self.handle_proving_sections_event(ps, sec_info)?; - } - } - - our_pfx = *self.chain.our_prefix(); - } - - Ok(()) - } - - /// Handles an accumulated `Online` event. - fn handle_online_event( - &mut self, - new_pub_id: PublicId, - new_client_auth: Authority, - outbox: &mut EventBox, - ) -> Result<(), RoutingError> { - let should_act = self.chain.is_member(); - let to_vote_infos = self.chain.add_member(new_pub_id)?; - if should_act { - let _ = self.handle_candidate_approval(new_pub_id, new_client_auth, outbox); - to_vote_infos - .into_iter() - .map(NetworkEvent::SectionInfo) - .for_each(|sec_info| self.vote_for_event(sec_info)); - } - - Ok(()) - } - - /// Handles an accumulated `Offline` event. - fn handle_offline_event( - &mut self, - pub_id: PublicId, - outbox: &mut EventBox, - ) -> Result<(), RoutingError> { - let should_act = self.chain.is_member(); - let self_info = self.chain.remove_member(pub_id)?; - if should_act { - self.vote_for_event(NetworkEvent::SectionInfo(self_info)); - if let Some(&pub_id) = self.peer_mgr.get_pub_id(pub_id.name()) { - let _ = self.dropped_peer(&pub_id, outbox, false); - self.disconnect_peer(&pub_id); - } - } - Ok(()) - } - - /// Handles an accumulated `OurMerge` event. - fn handle_our_merge_event(&mut self) -> Result<(), RoutingError> { - self.merge_if_necessary() - } - - /// Handles an accumulated `NeighbourMerge` event. - fn handle_neighbour_merge_event(&mut self) -> Result<(), RoutingError> { - self.merge_if_necessary() - } - /// Votes for `Merge` if necessary, or for the merged `SectionInfo` if both siblings have /// already accumulated `Merge`. fn merge_if_necessary(&mut self) -> Result<(), RoutingError> { @@ -591,115 +347,20 @@ impl Node { Ok(()) } - /// Handles an accumulated `SectionInfo` event. - fn handle_section_info_event( - &mut self, - sec_info: &SectionInfo, - old_pfx: Prefix, - outbox: &mut EventBox, - ) -> Result<(), RoutingError> { - if !self.peer_mgr.is_established() && self.chain.is_member() { - // We have just found a `SectionInfo` block that contains us. Now we can supply our - // votes for all latest neighbour infos that have accumulated so far. - let ni_events = self - .chain - .neighbour_infos() - .map(|ni| ni.clone().into_network_event()) - .collect_vec(); - - ni_events.into_iter().for_each(|ni_event| { - self.vote_for_event(ni_event); - }); + // Connected peers which are valid need added to RT + // Peers no longer required currently connected as PeerState::Routing are disconnected + // Establish connection to peers missing from peer manager + fn update_peer_states(&mut self, outbox: &mut EventBox) { + let mut peers_to_add = Vec::new(); + let mut peers_to_remove = Vec::new(); - self.node_established(outbox); - } - - if sec_info.prefix().is_extension_of(&old_pfx) { - self.finalise_prefix_change()?; - - outbox.send_event(Event::SectionSplit(*sec_info.prefix())); - self.send_neighbour_infos(); - } else if old_pfx.is_extension_of(sec_info.prefix()) { - self.finalise_prefix_change()?; - outbox.send_event(Event::SectionMerged(*sec_info.prefix())); - } - - let our_name = *self.full_id.public_id().name(); - let self_sec_update = sec_info.prefix().matches(&our_name); - - if self.chain.is_member() { - self.update_peer_states(outbox); - - if self_sec_update { - self.send_neighbour_infos(); - } else { - // Vote for neighbour update if we haven't done so already. - // vote_for_event is expected to only generate a new vote if required. - self.vote_for_event(sec_info.clone().into_network_event()); - } - } - - let _ = self.merge_if_necessary(); - - Ok(()) - } - - /// Handles an accumulated `ProvingSections` event. - /// - /// Votes for all sections that it can verify using the the chain of proving sections. - fn handle_proving_sections_event( - &mut self, - proving_secs: Vec, - sec_info: SectionInfo, - ) -> Result<(), RoutingError> { - if !self.chain.is_new_neighbour(&sec_info) - && !proving_secs - .iter() - .any(|ps| self.chain.is_new_neighbour(&ps.sec_info)) - { - return Ok(()); // Nothing new to learn here. - } - let validates = |trusted: &Option, si: &SectionInfo| { - trusted.as_ref().map_or(false, |tps| { - let valid = tps.validate(&si); - if !valid { - log_or_panic!(LogLevel::Info, "Received invalid proving section: {:?}", si); - } - valid - }) - }; - let mut trusted: Option = None; - for ps in proving_secs.into_iter().rev() { - if validates(&trusted, &ps.sec_info) || self.is_trusted(&ps.sec_info)? { - let _ = self.add_new_section(&ps.sec_info); - trusted = Some(ps); - } - } - if validates(&trusted, &sec_info) || self.is_trusted(&sec_info)? { - let _ = self.add_new_section(&sec_info); - } - Ok(()) - } - - // Connected peers which are valid need added to RT - // Peers no longer required currently connected as PeerState::Routing are disconnected - // Establish connection to peers missing from peer manager - fn update_peer_states(&mut self, outbox: &mut EventBox) { - // If we are not yet established, do not try to update any RT peer states - if !self.chain.is_member() { - return; - } - - let mut peers_to_add = Vec::new(); - let mut peers_to_remove = Vec::new(); - - for peer in self.peer_mgr.connected_peers() { - let pub_id = peer.pub_id(); - if self.chain.is_peer_valid(pub_id) { - peers_to_add.push(*pub_id); - } else if peer.is_routing() && self.chain.state() == &ChainState::Normal { - peers_to_remove.push(*peer.pub_id()); - } + for peer in self.peer_mgr.connected_peers() { + let pub_id = peer.pub_id(); + if self.chain.is_peer_valid(pub_id) { + peers_to_add.push(*pub_id); + } else if peer.is_routing() && self.chain.state() == &ChainState::Normal { + peers_to_remove.push(*peer.pub_id()); + } } for pub_id in peers_to_add { self.add_to_routing_table(&pub_id, outbox); @@ -870,90 +531,6 @@ impl Node { Ok(()) } - fn handle_hop_message( - &mut self, - hop_msg: HopMessage, - pub_id: PublicId, - ) -> Result<(), RoutingError> { - hop_msg.verify(pub_id.signing_public_key())?; - let mut client_ip = None; - let mut hop_name_result = match self.peer_mgr.get_peer(&pub_id).map(Peer::state) { - Some(&PeerState::Bootstrapper { .. }) => { - warn!( - "{} Hop message received from bootstrapper {:?}, disconnecting.", - self, pub_id - ); - Err(RoutingError::InvalidStateForOperation) - } - Some(&PeerState::Client { ip, .. }) => { - client_ip = Some(ip); - Ok(*self.name()) - } - Some(&PeerState::JoiningNode) => Ok(*self.name()), - Some(&PeerState::Candidate) | Some(&PeerState::Proxy) | Some(&PeerState::Routing) => { - Ok(*pub_id.name()) - } - Some(&PeerState::ConnectionInfoPreparing { .. }) - | Some(&PeerState::ConnectionInfoReady(_)) - | Some(&PeerState::CrustConnecting) - | Some(&PeerState::Connected) - | None => { - if self.dropped_clients.contains_key(&pub_id) { - debug!( - "{} Ignoring {:?} from recently-disconnected client {:?}.", - self, hop_msg, pub_id - ); - return Ok(()); - } else { - Ok(*self.name()) - // FIXME - confirm we can return with an error here by running soak tests - // debug!("{} Invalid sender {} of {:?}", self, pub_id, hop_msg); - // return Err(RoutingError::InvalidSource); - } - } - }; - - if let Some(ip) = client_ip { - match self.check_valid_client_message(&ip, hop_msg.content.routing_message()) { - Ok(added_bytes) => { - self.proxy_load_amount += added_bytes; - self.peer_mgr.add_client_traffic(&pub_id, added_bytes); - } - Err(e) => hop_name_result = Err(e), - } - } - - match hop_name_result { - Ok(hop_name) => { - let HopMessage { - content, - route, - sent_to, - .. - } = hop_msg; - self.handle_signed_message(content, route, hop_name, &sent_to) - } - Err(RoutingError::ExceedsRateLimit(hash)) => { - trace!( - "{} Temporarily can't proxy messages from client {:?} (rate-limit hit).", - self, - pub_id - ); - self.send_direct_message( - pub_id, - DirectMessage::ProxyRateLimitExceeded { - ack: Ack::compute(hop_msg.content.routing_message())?, - }, - ); - Err(RoutingError::ExceedsRateLimit(hash)) - } - Err(error) => { - self.ban_and_disconnect_peer(&pub_id); - Err(error) - } - } - } - // Verify the message, then, if it is for us, handle the enclosed routing message and swarm it // to the rest of our section when destination is targeting multiple; if not, forward it. fn handle_signed_message( @@ -970,7 +547,7 @@ impl Node { warn!("{} Unexpected section infos in {:?}", self, signed_msg); return Err(RoutingError::InvalidProvingSection); } - } else if self.chain.is_member() { + } else { // Remove any untrusted trailing section infos. // TODO: remove wasted clone. Only useful when msg isnt trusted for log msg. let msg_clone = signed_msg.clone(); @@ -1047,33 +624,6 @@ impl Node { use crate::messages::MessageContent::*; use crate::Authority::{Client, ManagedNode, PrefixSection, Section}; - if !self.chain.is_member() { - match routing_msg.content { - ExpectCandidate { .. } - | NeighbourInfo(..) - | NeighbourConfirm(..) - | Merge(..) - | UserMessagePart { .. } => { - // These messages should not be handled before node becomes established - self.add_routing_message_to_backlog(routing_msg); - return Ok(()); - } - ConnectionInfoRequest { .. } => { - if !self.chain.our_prefix().matches(&routing_msg.src.name()) { - self.add_routing_message_to_backlog(routing_msg); - return Ok(()); - } - } - Relocate { .. } - | ConnectionInfoResponse { .. } - | RelocateResponse { .. } - | Ack(..) - | NodeApproval { .. } => { - // Handle like normal - } - } - } - match routing_msg.content { Ack(..) | UserMessagePart { .. } => (), _ => trace!("{} Got routing message {:?}.", self, routing_msg), @@ -1162,7 +712,10 @@ impl Node { Section(_), ) => self.handle_neighbour_confirm(digest, proofs, sec_infos_and_proofs), (Merge(digest), PrefixSection(_), PrefixSection(_)) => self.handle_merge(digest), - (Ack(ack, _), _, _) => self.handle_ack_response(ack), + (Ack(ack, _), _, _) => { + self.handle_ack_response(ack); + Ok(()) + } ( UserMessagePart { hash, @@ -1192,16 +745,6 @@ impl Node { } } - // Backlog the message to be processed once we are approved. - fn add_routing_message_to_backlog(&mut self, msg: RoutingMessage) { - trace!( - "{} Not approved yet. Delaying message handling: {:?}", - self, - msg - ); - self.routing_msg_backlog.push(msg); - } - fn handle_candidate_approval( &mut self, new_pub_id: PublicId, @@ -1263,35 +806,6 @@ impl Node { .init(self.full_id.clone(), &self.gen_pfx_info, &log_ident) } - // Completes a Node startup process and allows a node to behave as a `ManagedNode`. - // A given node is considered "established" when it exists in `chain.our_info().members()` - // first node: this method is skipped entirely as it behaves as a Node from startup. - fn node_established(&mut self, outbox: &mut EventBox) { - self.peer_mgr.set_established(); - outbox.send_event(Event::Connected); - - trace!( - "{} Node Established. Prefixes: {:?}", - self, - self.chain.prefixes() - ); - - self.update_peer_states(outbox); - - // Allow other peers to bootstrap via us. - if let Err(err) = self.crust_service.set_accept_bootstrap(true) { - warn!("{} Unable to accept bootstrap connections. {:?}", self, err); - } - self.crust_service.set_service_discovery_listen(true); - - let backlog = mem::replace(&mut self.routing_msg_backlog, vec![]); - backlog - .into_iter() - .rev() - .foreach(|msg| self.msg_queue.push_front(msg)); - self.candidate_status_token = Some(self.timer.schedule(CANDIDATE_STATUS_INTERVAL)); - } - fn handle_resource_proof_response( &mut self, pub_id: PublicId, @@ -1514,27 +1028,14 @@ impl Node { return Err(RoutingError::FailedSignature); } - if !self.chain.is_member() { - debug!( - "{} Client {:?} rejected: We are not an established node yet.", - self, pub_id - ); - self.send_direct_message( - pub_id, - DirectMessage::BootstrapResponse(Err(BootstrapResponseError::NotApproved)), - ); - self.disconnect_peer(&pub_id); - return Ok(()); - } - if (peer_kind == CrustUser::Client || !self.is_first_node) - && self.chain().len() < self.min_section_size() - 1 + && self.chain.len() < self.min_section_size() - 1 { debug!( "{} Client {:?} rejected: Routing table has {} entries. {} required.", self, pub_id, - self.chain().len(), + self.chain.len(), self.min_section_size() - 1 ); self.send_direct_message( @@ -1593,7 +1094,7 @@ impl Node { } else { ( RESOURCE_PROOF_DIFFICULTY, - RESOURCE_PROOF_TARGET_SIZE / (self.chain().our_section().len() + 1), + RESOURCE_PROOF_TARGET_SIZE / (self.chain.our_section().len() + 1), ) }; let seed: Vec = if cfg!(feature = "mock_base") { @@ -1698,7 +1199,7 @@ impl Node { } let close_section = self - .chain() + .chain .close_names(&dst_name) .ok_or(RoutingError::InvalidDestination)?; @@ -1730,10 +1231,6 @@ impl Node { dst_name: XorName, message_id: MessageId, ) -> Result<(), RoutingError> { - if !self.chain.is_member() { - return Ok(()); - } - self.vote_for_event(NetworkEvent::ExpectCandidate(ExpectCandidatePayload { old_public_id, old_client_auth, @@ -1743,30 +1240,6 @@ impl Node { Ok(()) } - // Handles an accumulated `ExpectCandidate` event. - // Context: a node is joining our section. Send the node our section. If the - // network is unbalanced, send `ExpectCandidate` on to a section with a shorter prefix. - fn handle_expect_candidate_event( - &mut self, - vote: ExpectCandidatePayload, - ) -> Result<(), RoutingError> { - if !self.chain.is_member() { - return Ok(()); - } - - if let Some(prefix) = self.need_to_forward_expect_candidate_to_prefix() { - return self.forward_expect_candidate_to_prefix(vote, prefix); - } - - if let Some(target_interval) = self.accept_candidate_with_interval(&vote) { - self.candidate_timer_token = Some(self.timer.schedule(RESOURCE_PROOF_DURATION)); - return self.send_relocate_response(vote, target_interval); - } - - // Nothing to do with this event. - Ok(()) - } - // Return Prefix of section with shorter prefix to resend the `ExpectCandidate` to. // Return None if we are the best section. fn need_to_forward_expect_candidate_to_prefix(&self) -> Option> { @@ -1775,7 +1248,7 @@ impl Node { return None; } - let min_len_prefix = self.chain().min_len_prefix(); + let min_len_prefix = self.chain.min_len_prefix(); if min_len_prefix == *self.our_prefix() { // Our section is the best destination. return None; @@ -1813,7 +1286,7 @@ impl Node { } let target_interval = self.next_relocation_interval.take().unwrap_or_else(|| { - utils::calculate_relocation_interval(&self.our_prefix(), &self.chain().our_section()) + utils::calculate_relocation_interval(&self.our_prefix(), &self.chain.our_section()) }); self.peer_mgr .accept_as_candidate(vote.old_public_id, target_interval); @@ -1827,15 +1300,9 @@ impl Node { vote: ExpectCandidatePayload, target_interval: (XorName, XorName), ) -> Result<(), RoutingError> { - let own_section = if self.chain().is_member() { - let our_info = self.chain().our_info(); + let own_section = { + let our_info = self.chain.our_info(); (*our_info.prefix(), our_info.members().clone()) - } else { - //FIXME: do this properly - ( - Default::default(), - iter::once(*self.full_id().public_id()).collect::>(), - ) }; let src = Authority::Section(vote.dst_name); @@ -1925,11 +1392,6 @@ impl Node { Ok(()) } - fn handle_ack_response(&mut self, ack: Ack) -> Result<(), RoutingError> { - self.ack_mgr.receive(ack); - Ok(()) - } - fn send_parsec_gossip(&mut self, target: Option<(u64, PublicId)>) { let (version, gossip_target) = match target { Some((v, p)) => (v, p), @@ -1957,30 +1419,6 @@ impl Node { } } - // Sends a `ParsecPoke` message to trigger a gossip request from current section members to us. - // - // TODO: Should restrict targets to few(counter churn-threshold)/single. - // Currently this can result in incoming spam of gossip history from everyone. - // Can also just be a single target once node-ageing makes Offline votes Opaque which should - // remove invalid test failures for unaccumulated parsec::Remove blocks. - fn send_parsec_poke(&mut self) { - let version = *self.gen_pfx_info.first_info.version(); - let recipients = self - .gen_pfx_info - .latest_info - .members() - .iter() - .cloned() - .collect_vec(); - - for recipient in recipients { - self.send_message( - &recipient, - Message::Direct(DirectMessage::ParsecPoke(version)), - ); - } - } - fn send_candidate_approval(&mut self) { let (new_id, client_auth) = match self.peer_mgr.verified_candidate_info() { Err(_) => { @@ -2056,18 +1494,6 @@ impl Node { } } - // Until we're established do not relay any messages. - let src = signed_msg.routing_message().src; - if !self.chain.is_member() { - if let Authority::Client { ref client_id, .. } = src { - if self.name() != client_id.name() { - return Ok(()); - } - } else { - return Ok(()); - } - } - let (new_sent_to, target_pub_ids) = self.get_targets(signed_msg.routing_message(), route, hop, sent_to)?; @@ -2082,13 +1508,38 @@ impl Node { Ok(()) } - // Wraps the signed message in a `HopMessage` and sends it on. - // - // In the case that the `pub_id` is unknown, an ack is sent and the message dropped. - fn relay_to_client( + // Filter, then convert the message to a `Hop` and serialise. + // Send this byte string. + fn send_signed_message_to_peer( &mut self, - signed_msg: &SignedMessage, - pub_id: &PublicId, + signed_msg: SignedMessage, + target: &PublicId, + route: u8, + sent_to: BTreeSet, + ) -> Result<(), RoutingError> { + if !self.crust_service().is_connected(target) { + trace!("{} Not connected to {:?}. Dropping peer.", self, target); + self.disconnect_peer(target); + return Ok(()); + } + + if self.filter_outgoing_routing_msg(signed_msg.routing_message(), target, route) { + return Ok(()); + } + + let priority = signed_msg.priority(); + let bytes = self.to_hop_bytes(signed_msg, route, sent_to)?; + self.send_or_drop(target, bytes, priority); + Ok(()) + } + + // Wraps the signed message in a `HopMessage` and sends it on. + // + // In the case that the `pub_id` is unknown, an ack is sent and the message dropped. + fn relay_to_client( + &mut self, + signed_msg: &SignedMessage, + pub_id: &PublicId, ) -> Result<(), RoutingError> { let priority = signed_msg.priority(); let is_client = self.peer_mgr.is_client(pub_id); @@ -2130,13 +1581,11 @@ impl Node { /// may be us or another node. If our signature is not required, this returns `None`. fn get_signature_target(&self, src: &Authority, route: u8) -> Option { use crate::Authority::*; - if !self.chain().is_member() { - return Some(*self.name()); - } + let list: Vec = match *src { ClientManager(_) | NaeManager(_) | NodeManager(_) => { let mut v = self - .chain() + .chain .our_section() .into_iter() .sorted_by(|lhs, rhs| src.name().cmp_distance(lhs, rhs)); @@ -2144,7 +1593,7 @@ impl Node { v } Section(_) => self - .chain() + .chain .our_section() .into_iter() .sorted_by(|lhs, rhs| src.name().cmp_distance(lhs, rhs)), @@ -2153,7 +1602,7 @@ impl Node { // as by ack-failure, the new node would have been accepted to the RT. // Need a better network startup separation. PrefixSection(_) => { - Iterator::flatten(self.chain().all_sections().map(|(_, si)| si.member_names())) + Iterator::flatten(self.chain.all_sections().map(|(_, si)| si.member_names())) .sorted_by(|lhs, rhs| src.name().cmp_distance(lhs, rhs)) } ManagedNode(_) | Client { .. } => return Some(*self.name()), @@ -2183,13 +1632,13 @@ impl Node { _ => false, }; - if self.chain.is_member() && !force_via_proxy { + if !force_via_proxy { // TODO: even if having chain reply based on connected_state, // we remove self in targets info and can do same by not // chaining us to conn_peer list here? let conn_peers = self.connected_peers(); let targets: BTreeSet<_> = self - .chain() + .chain .targets(&routing_msg.dst, *exclude, route as usize, &conn_peers)? .into_iter() .filter(|target| !sent_to.contains(target)) @@ -2256,18 +1705,18 @@ impl Node { /// we should terminate. fn dropped_peer( &mut self, - pub_id: &PublicId, + pub_id: PublicId, outbox: &mut EventBox, try_reconnect: bool, ) -> bool { - let _ = self.peer_mgr.remove_peer(pub_id); + let _ = self.peer_mgr.remove_peer(&pub_id); - if self.chain.is_member() && self.notified_nodes.remove(pub_id) { + if self.notified_nodes.remove(&pub_id) { info!("{} Dropped {} from the routing table.", self, pub_id.name()); outbox.send_event(Event::NodeLost(*pub_id.name())); - if self.chain().our_info().members().contains(pub_id) { - self.vote_for_event(NetworkEvent::Offline(*pub_id)); + if self.chain.our_info().members().contains(&pub_id) { + self.vote_for_event(NetworkEvent::Offline(pub_id)); } } @@ -2277,45 +1726,35 @@ impl Node { .filter(|p| p.is_routing()) .count() == 0 - && self.chain().is_member() { debug!("{} Lost all routing connections.", self); - if !self.is_first_node { + // Except network startup, restart in other cases. + if *self.chain.our_info().version() > 0 { outbox.send_event(Event::RestartRequired); return false; } } - if try_reconnect && self.chain.is_member() && self.chain.is_peer_valid(pub_id) { - debug!("{} Caching {:?} to reconnect.", self, pub_id); - self.reconnect_peers.push(*pub_id); - } - - true - } - - // Reconnect to currently cached valid peers that are not connected - fn reconnect_peers(&mut self, outbox: &mut EventBox) { - for pub_id in mem::replace(&mut self.reconnect_peers, Default::default()) { - if self.chain.is_peer_valid(&pub_id) { + if try_reconnect && self.chain.is_peer_valid(&pub_id) { + debug!( + "{} - Sending connection info to {:?} due to dropped peer.", + self, pub_id + ); + let own_name = *self.name(); + if let Err(error) = self.send_connection_info_request( + pub_id, + Authority::ManagedNode(own_name), + Authority::ManagedNode(*pub_id.name()), + outbox, + ) { debug!( - "{} Sending connection info to {:?} due to dropped peer.", - self, pub_id + "{} - Failed to send connection info to {:?}: {:?}", + self, pub_id, error ); - let own_name = *self.name(); - if let Err(error) = self.send_connection_info_request( - pub_id, - Authority::ManagedNode(own_name), - Authority::ManagedNode(*pub_id.name()), - outbox, - ) { - debug!( - "{} - Failed to send connection info to {:?}: {:?}", - self, pub_id, error - ); - } } } + + true } fn remove_expired_peers(&mut self) { @@ -2331,7 +1770,7 @@ impl Node { } fn our_prefix(&self) -> &Prefix { - self.chain().our_prefix() + self.chain.our_prefix() } // While this can theoretically be called as a result of a misbehaving client or node, we're @@ -2364,13 +1803,17 @@ impl Base for Node { client_id == self.full_id.public_id() } else { let conn_peers = self.connected_peers(); - self.chain.is_member() && self.chain().in_authority(auth, &conn_peers) + self.chain.in_authority(auth, &conn_peers) } } fn close_group(&self, name: XorName, count: usize) -> Option> { let conn_peers = self.connected_peers(); - self.chain().closest_names(&name, count, &conn_peers) + self.chain.closest_names(&name, count, &conn_peers) + } + + fn min_section_size(&self) -> usize { + self.chain.min_sec_size() } fn handle_node_send_message( @@ -2391,34 +1834,19 @@ impl Base for Node { self.tick_timer_token = self.timer.schedule(TICK_TIMEOUT); self.remove_expired_peers(); self.proxy_load_amount = 0; - - if self.chain.is_member() { - self.update_peer_states(outbox); - outbox.send_event(Event::TimerTicked); - } - - return Transition::Stay; - } - - if self.candidate_timer_token == Some(token) { + self.update_peer_states(outbox); + outbox.send_event(Event::TimerTicked); + } else if self.candidate_timer_token == Some(token) { self.candidate_timer_token = None; self.send_candidate_approval(); - } else if self.candidate_status_token == Some(token) { - self.candidate_status_token = Some(self.timer.schedule(CANDIDATE_STATUS_INTERVAL)); + } else if self.candidate_status_token == token { + self.candidate_status_token = self.timer.schedule(CANDIDATE_STATUS_INTERVAL); self.peer_mgr.show_candidate_status(); - } else if self.reconnect_peers_token == token { - self.reconnect_peers_token = self.timer.schedule(RECONNECT_PEER_TIMEOUT); - self.reconnect_peers(outbox); - } else if self.poke_timer_token == token { - if !self.peer_mgr.is_established() { - self.send_parsec_poke(); - self.poke_timer_token = self.timer.schedule(POKE_TIMEOUT); - } } else if self.gossip_timer_token == token { self.gossip_timer_token = self.timer.schedule(GOSSIP_TIMEOUT); - // If we're the only node then invoke parsec_poll directly - if self.chain.is_member() && self.chain.our_info().members().len() == 1 { + // If we're the only node then invoke parsec_poll_all directly + if self.chain.our_info().members().len() == 1 { let _ = self.parsec_poll(outbox); } @@ -2485,12 +1913,9 @@ impl Base for Node { } fn handle_connect_failure(&mut self, pub_id: PublicId, outbox: &mut EventBox) -> Transition { - if let Some(&PeerState::CrustConnecting) = self.peer_mgr.get_peer(&pub_id).map(Peer::state) - { - debug!("{} Failed to connect to peer {:?}.", self, pub_id); - } - let _ = self.dropped_peer(&pub_id, outbox, true); - if self.chain.is_member() && self.chain.our_info().members().contains(&pub_id) { + self.log_connect_failure(&pub_id); + let _ = self.dropped_peer(pub_id, outbox, true); + if self.chain.our_info().members().contains(&pub_id) { self.vote_for_event(NetworkEvent::Offline(pub_id)); } @@ -2504,35 +1929,13 @@ impl Base for Node { debug!("{} Received LostPeer - {}", self, pub_id); - if self.dropped_peer(&pub_id, outbox, true) { + if self.dropped_peer(pub_id, outbox, true) { Transition::Stay } else { Transition::Terminate } } - fn handle_new_message( - &mut self, - pub_id: PublicId, - bytes: CrustBytes, - outbox: &mut EventBox, - ) -> Transition { - let result = match from_crust_bytes(bytes) { - Ok(Message::Hop(hop_msg)) => self.handle_hop_message(hop_msg, pub_id), - Ok(Message::Direct(direct_msg)) => { - self.handle_direct_message(direct_msg, pub_id, outbox) - } - Err(error) => Err(error), - }; - - match result { - Err(RoutingError::FilterCheckFailed) | Ok(_) => (), - Err(err) => debug!("{} - {:?}", self, err), - } - - Transition::Stay - } - fn handle_connection_info_prepared( &mut self, result_token: u32, @@ -2562,8 +1965,173 @@ impl Base for Node { Transition::Stay } - fn min_section_size(&self) -> usize { - self.chain.min_sec_size() + // Deconstruct a `DirectMessage` and handle or forward as appropriate. + fn handle_direct_message( + &mut self, + direct_message: DirectMessage, + pub_id: PublicId, + outbox: &mut EventBox, + ) -> Result { + use crate::messages::DirectMessage::*; + if let Err(error) = self.check_direct_message_sender(&direct_message, &pub_id) { + match error { + RoutingError::ClientConnectionNotFound => (), + _ => self.ban_and_disconnect_peer(&pub_id), + } + return Err(error); + } + + match direct_message { + MessageSignature(digest, sig) => self.handle_message_signature(digest, sig, pub_id)?, + BootstrapRequest(signature) => { + if let Err(error) = self.handle_bootstrap_request(pub_id, signature) { + warn!( + "{} Invalid BootstrapRequest received ({:?}), dropping {}.", + self, error, pub_id + ); + self.ban_and_disconnect_peer(&pub_id); + } + } + CandidateInfo { + ref old_public_id, + ref new_public_id, + ref signature_using_old, + ref signature_using_new, + ref new_client_auth, + } => { + if *new_public_id != pub_id { + error!( + "{} CandidateInfo(new_public_id: {}) does not match crust id {}.", + self, new_public_id, pub_id + ); + self.disconnect_peer(&pub_id); + return Err(RoutingError::InvalidSource); + } + self.handle_candidate_info( + old_public_id, + &pub_id, + signature_using_old, + signature_using_new, + new_client_auth, + outbox, + ); + } + ResourceProofResponse { + part_index, + part_count, + proof, + leading_zero_bytes, + } => { + self.handle_resource_proof_response( + pub_id, + part_index, + part_count, + proof, + leading_zero_bytes, + ); + } + ParsecPoke(version) => self.handle_parsec_poke(version, pub_id), + ParsecRequest(version, par_request) => { + return self.handle_parsec_request(version, par_request, pub_id, outbox); + } + ParsecResponse(version, par_response) => { + return self.handle_parsec_response(version, par_response, pub_id, outbox); + } + BootstrapResponse(_) + | ProxyRateLimitExceeded { .. } + | ResourceProof { .. } + | ResourceProofResponseReceipt => { + debug!("{} Unhandled direct message: {:?}", self, direct_message); + } + } + + Ok(Transition::Stay) + } + + fn handle_hop_message( + &mut self, + hop_msg: HopMessage, + pub_id: PublicId, + _: &mut EventBox, + ) -> Result { + hop_msg.verify(pub_id.signing_public_key())?; + let mut client_ip = None; + let mut hop_name_result = match self.peer_mgr.get_peer(&pub_id).map(Peer::state) { + Some(&PeerState::Bootstrapper { .. }) => { + warn!( + "{} Hop message received from bootstrapper {:?}, disconnecting.", + self, pub_id + ); + Err(RoutingError::InvalidStateForOperation) + } + Some(&PeerState::Client { ip, .. }) => { + client_ip = Some(ip); + Ok(*self.name()) + } + Some(&PeerState::JoiningNode) => Ok(*self.name()), + Some(&PeerState::Candidate) | Some(&PeerState::Proxy) | Some(&PeerState::Routing) => { + Ok(*pub_id.name()) + } + Some(&PeerState::ConnectionInfoPreparing { .. }) + | Some(&PeerState::ConnectionInfoReady(_)) + | Some(&PeerState::CrustConnecting) + | Some(&PeerState::Connected) + | None => { + if self.dropped_clients.contains_key(&pub_id) { + debug!( + "{} Ignoring {:?} from recently-disconnected client {:?}.", + self, hop_msg, pub_id + ); + return Ok(Transition::Stay); + } else { + Ok(*self.name()) + // FIXME - confirm we can return with an error here by running soak tests + // debug!("{} Invalid sender {} of {:?}", self, pub_id, hop_msg); + // return Err(RoutingError::InvalidSource); + } + } + }; + + if let Some(ip) = client_ip { + match self.check_valid_client_message(&ip, hop_msg.content.routing_message()) { + Ok(added_bytes) => { + self.proxy_load_amount += added_bytes; + self.peer_mgr.add_client_traffic(&pub_id, added_bytes); + } + Err(e) => hop_name_result = Err(e), + } + } + + match hop_name_result { + Ok(hop_name) => { + let HopMessage { + content, + route, + sent_to, + .. + } = hop_msg; + self.handle_signed_message(content, route, hop_name, &sent_to) + .map(|()| Transition::Stay) + } + Err(RoutingError::ExceedsRateLimit(hash)) => { + trace!( + "{} Temporarily can't proxy messages from client {:?} (rate-limit hit).", + self, + pub_id + ); + self.send_direct_message( + pub_id, + DirectMessage::ProxyRateLimitExceeded { + ack: Ack::compute(hop_msg.content.routing_message())?, + }, + ); + Err(RoutingError::ExceedsRateLimit(hash)) + } + Err(error) => { + self.ban_and_disconnect_peer(&pub_id); + Err(error) + } + } } } @@ -2592,9 +2160,8 @@ impl Node { self.clients_rate_limiter.usage_map().clone() } - pub fn has_unconsensused_observations(&self, filter_opaque: bool) -> bool { - self.parsec_map - .has_unconsensused_observations(filter_opaque) + pub fn has_unpolled_observations(&self, filter_opaque: bool) -> bool { + self.parsec_map.has_unpolled_observations(filter_opaque) } pub fn is_routing_peer(&self, pub_id: &PublicId) -> bool { @@ -2639,16 +2206,8 @@ impl Bootstrapped for Node { let sending_sec = if route == 0 { match routing_msg.src { ClientManager(_) | NaeManager(_) | NodeManager(_) | ManagedNode(_) | Section(_) - | PrefixSection(_) - if self.chain.is_member() => - { - Some(self.chain.our_info().clone()) - } + | PrefixSection(_) => Some(self.chain.our_info().clone()), Client { .. } => None, - _ => { - // Cannot send routing msgs as a Node until established. - return Ok(()); - } } } else { src_section @@ -2714,11 +2273,14 @@ impl Bootstrapped for Node { } impl Relocated for Node { - fn peer_mgr(&mut self) -> &mut PeerManager { + fn peer_mgr(&self) -> &PeerManager { + &self.peer_mgr + } + + fn peer_mgr_mut(&mut self) -> &mut PeerManager { &mut self.peer_mgr } - /// Adds a newly connected peer to the routing table. Must only be called for connected peers. fn process_connection(&mut self, pub_id: PublicId, outbox: &mut EventBox) { if self.chain.is_peer_valid(&pub_id) { self.add_to_routing_table(&pub_id, outbox); @@ -2726,7 +2288,7 @@ impl Relocated for Node { } fn is_peer_valid(&self, pub_id: &PublicId) -> bool { - !self.chain.is_member() || self.chain.is_peer_valid(pub_id) + self.chain.is_peer_valid(pub_id) } fn add_to_routing_table_success(&mut self, _: &PublicId) { @@ -2742,6 +2304,145 @@ impl Relocated for Node { fn add_to_notified_nodes(&mut self, pub_id: PublicId) -> bool { self.notified_nodes.insert(pub_id) } + + fn remove_from_notified_nodes(&mut self, pub_id: &PublicId) -> bool { + self.notified_nodes.remove(pub_id) + } +} + +impl Approved for Node { + fn parsec_map_mut(&mut self) -> &mut ParsecMap { + &mut self.parsec_map + } + + fn chain_mut(&mut self) -> &mut Chain { + &mut self.chain + } + + fn handle_online_event( + &mut self, + new_pub_id: PublicId, + new_client_auth: Authority, + outbox: &mut EventBox, + ) -> Result<(), RoutingError> { + let to_vote_infos = self.chain.add_member(new_pub_id)?; + let _ = self.handle_candidate_approval(new_pub_id, new_client_auth, outbox); + to_vote_infos + .into_iter() + .map(NetworkEvent::SectionInfo) + .for_each(|sec_info| self.vote_for_event(sec_info)); + + Ok(()) + } + + fn handle_offline_event( + &mut self, + pub_id: PublicId, + outbox: &mut EventBox, + ) -> Result<(), RoutingError> { + let self_info = self.chain.remove_member(pub_id)?; + self.vote_for_event(NetworkEvent::SectionInfo(self_info)); + if let Some(&pub_id) = self.peer_mgr.get_pub_id(pub_id.name()) { + let _ = self.dropped_peer(pub_id, outbox, false); + self.disconnect_peer(&pub_id); + } + + Ok(()) + } + + fn handle_our_merge_event(&mut self) -> Result<(), RoutingError> { + self.merge_if_necessary() + } + + fn handle_neighbour_merge_event(&mut self) -> Result<(), RoutingError> { + self.merge_if_necessary() + } + + fn handle_expect_candidate_event( + &mut self, + vote: ExpectCandidatePayload, + ) -> Result<(), RoutingError> { + if let Some(prefix) = self.need_to_forward_expect_candidate_to_prefix() { + return self.forward_expect_candidate_to_prefix(vote, prefix); + } + + if let Some(target_interval) = self.accept_candidate_with_interval(&vote) { + self.candidate_timer_token = Some(self.timer.schedule(RESOURCE_PROOF_DURATION)); + return self.send_relocate_response(vote, target_interval); + } + + // Nothing to do with this event. + Ok(()) + } + + fn handle_section_info_event( + &mut self, + sec_info: SectionInfo, + old_pfx: Prefix, + outbox: &mut EventBox, + ) -> Result { + if sec_info.prefix().is_extension_of(&old_pfx) { + self.finalise_prefix_change()?; + outbox.send_event(Event::SectionSplit(*sec_info.prefix())); + self.send_neighbour_infos(); + } else if old_pfx.is_extension_of(sec_info.prefix()) { + self.finalise_prefix_change()?; + outbox.send_event(Event::SectionMerged(*sec_info.prefix())); + } + + let self_sec_update = sec_info.prefix().matches(self.name()); + + self.update_peer_states(outbox); + + if self_sec_update { + self.send_neighbour_infos(); + } else { + // Vote for neighbour update if we haven't done so already. + // vote_for_event is expected to only generate a new vote if required. + self.vote_for_event(sec_info.into_network_event()); + } + + let _ = self.merge_if_necessary(); + + Ok(Transition::Stay) + } + + /// Handles an accumulated `ProvingSections` event. + /// + /// Votes for all sections that it can verify using the the chain of proving sections. + fn handle_proving_sections_event( + &mut self, + proving_secs: Vec, + sec_info: SectionInfo, + ) -> Result<(), RoutingError> { + if !self.chain.is_new_neighbour(&sec_info) + && !proving_secs + .iter() + .any(|ps| self.chain.is_new_neighbour(&ps.sec_info)) + { + return Ok(()); // Nothing new to learn here. + } + let validates = |trusted: &Option, si: &SectionInfo| { + trusted.as_ref().map_or(false, |tps| { + let valid = tps.validate(&si); + if !valid { + log_or_panic!(LogLevel::Info, "Received invalid proving section: {:?}", si); + } + valid + }) + }; + let mut trusted: Option = None; + for ps in proving_secs.into_iter().rev() { + if validates(&trusted, &ps.sec_info) || self.is_trusted(&ps.sec_info)? { + let _ = self.add_new_section(&ps.sec_info); + trusted = Some(ps); + } + } + if validates(&trusted, &sec_info) || self.is_trusted(&sec_info)? { + let _ = self.add_new_section(&sec_info); + } + Ok(()) + } } impl Display for Node { diff --git a/src/states/proving_node.rs b/src/states/proving_node.rs index a6653e39d4..01b506cfc4 100644 --- a/src/states/proving_node.rs +++ b/src/states/proving_node.rs @@ -7,12 +7,13 @@ // permissions and limitations relating to use of the SAFE Network Software. use super::{ - common::{Base, Bootstrapped, Relocated, Unapproved}, - node::Node, + common::{ + proxied, Base, Bootstrapped, BootstrappedNotEstablished, Relocated, RelocatedNotEstablished, + }, + establishing_node::{EstablishingNode, EstablishingNodeDetails}, }; -use crate::states::common::from_crust_bytes; use crate::{ - ack_manager::{Ack, AckManager}, + ack_manager::AckManager, cache::Cache, chain::{GenesisPfxInfo, SectionInfo}, config_handler, @@ -20,7 +21,7 @@ use crate::{ error::RoutingError, event::Event, id::{FullId, PublicId}, - messages::{DirectMessage, HopMessage, Message, RoutingMessage}, + messages::{DirectMessage, HopMessage, RoutingMessage}, outbox::EventBox, peer_manager::{Peer, PeerManager, PeerState}, resource_prover::ResourceProver, @@ -32,7 +33,7 @@ use crate::{ timer::Timer, types::RoutingActionSender, xor_name::XorName, - CrustBytes, Service, + Service, }; use maidsafe_utilities::serialisation; use std::{ @@ -40,73 +41,77 @@ use std::{ fmt::{self, Display, Formatter}, }; +pub struct ProvingNodeDetails { + pub action_sender: RoutingActionSender, + pub cache: Box, + pub crust_service: Service, + pub full_id: FullId, + pub min_section_size: usize, + pub old_full_id: FullId, + pub our_section: (Prefix, BTreeSet), + pub proxy_pub_id: PublicId, + pub timer: Timer, +} + pub struct ProvingNode { - crust_service: Service, ack_mgr: AckManager, - /// ID from before relocating. - old_full_id: FullId, - full_id: FullId, - /// Routing messages addressed to us that we cannot handle until we are approved. - msg_backlog: Vec, - min_section_size: usize, - peer_mgr: PeerManager, cache: Box, - routing_msg_filter: RoutingMessageFilter, - timer: Timer, - resource_prover: ResourceProver, + crust_service: Service, /// Whether resource proof is disabled. disable_resource_proof: bool, + full_id: FullId, joining_prefix: Prefix, + min_section_size: usize, + /// Routing messages addressed to us that we cannot handle until we are approved. + msg_backlog: Vec, // TODO: notify without local state notified_nodes: BTreeSet, + /// ID from before relocating. + old_full_id: FullId, + peer_mgr: PeerManager, + resource_prover: ResourceProver, + routing_msg_filter: RoutingMessageFilter, + timer: Timer, } impl ProvingNode { - #[allow(clippy::too_many_arguments)] - pub fn from_bootstrapping( - our_section: (Prefix, BTreeSet), - action_sender: RoutingActionSender, - cache: Box, - crust_service: Service, - old_full_id: FullId, - new_full_id: FullId, - min_section_size: usize, - proxy_pub_id: PublicId, - timer: Timer, - outbox: &mut EventBox, - ) -> Self { + pub fn from_bootstrapping(details: ProvingNodeDetails, outbox: &mut EventBox) -> Self { let dev_config = config_handler::get_config().dev.unwrap_or_default(); - let public_id = *new_full_id.public_id(); + let public_id = *details.full_id.public_id(); let mut peer_mgr = PeerManager::new(public_id, dev_config.disable_client_rate_limiter); - peer_mgr.insert_peer(Peer::new(proxy_pub_id, PeerState::Proxy)); + peer_mgr.insert_peer(Peer::new(details.proxy_pub_id, PeerState::Proxy)); - let challenger_count = our_section.1.len(); - let resource_prover = ResourceProver::new(action_sender, timer.clone(), challenger_count); + let challenger_count = details.our_section.1.len(); + let resource_prover = ResourceProver::new( + details.action_sender, + details.timer.clone(), + challenger_count, + ); let mut node = Self { - crust_service, ack_mgr: AckManager::new(), - old_full_id, - full_id: new_full_id, + cache: details.cache, + crust_service: details.crust_service, + full_id: details.full_id, + min_section_size: details.min_section_size, msg_backlog: Vec::new(), - min_section_size, + notified_nodes: Default::default(), peer_mgr, - cache, routing_msg_filter: RoutingMessageFilter::new(), - timer, - resource_prover, + timer: details.timer, disable_resource_proof: dev_config.disable_resource_proof, - joining_prefix: our_section.0, - notified_nodes: Default::default(), + joining_prefix: details.our_section.0, + old_full_id: details.old_full_id, + resource_prover, }; - node.start(our_section.1, &proxy_pub_id, outbox); + node.init(details.our_section.1, &details.proxy_pub_id, outbox); node } /// Called immediately after construction. Sends `ConnectionInfoRequest`s to all members of /// `our_section` to then start the candidate approval process. - fn start( + fn init( &mut self, our_section: BTreeSet, proxy_pub_id: &PublicId, @@ -141,95 +146,26 @@ impl ProvingNode { } } - pub fn into_node(self, gen_pfx_info: GenesisPfxInfo) -> State { - let node = Node::from_proving_node( - self.ack_mgr, - self.cache, - self.crust_service, - self.full_id, - gen_pfx_info, - self.min_section_size, - self.msg_backlog.into_iter().collect(), - self.notified_nodes, - self.peer_mgr, - self.routing_msg_filter, - self.timer, - ); - - State::Node(node) - } - - fn handle_direct_message( - &mut self, - msg: DirectMessage, - pub_id: PublicId, - _outbox: &mut EventBox, - ) -> Result<(), RoutingError> { - self.check_direct_message_sender(&msg, &pub_id)?; - - use crate::messages::DirectMessage::*; - match msg { - ResourceProof { - seed, - target_size, - difficulty, - } => { - let log_ident = format!("{}", self); - self.resource_prover.handle_request( - pub_id, - seed, - target_size, - difficulty, - log_ident, - ); - } - ResourceProofResponseReceipt => { - if let Some(msg) = self.resource_prover.handle_receipt(pub_id) { - self.send_direct_message(pub_id, msg); - } - } - _ => { - debug!("{} Unhandled direct message: {:?}", self, msg); - } - } - - Ok(()) - } - - /// Returns `Ok` if the peer's state indicates it's allowed to send the given message type. - fn check_direct_message_sender( - &self, - msg: &DirectMessage, - pub_id: &PublicId, - ) -> Result<(), RoutingError> { - match self.peer_mgr.get_peer(pub_id).map(Peer::state) { - Some(&PeerState::Connected) | Some(&PeerState::Proxy) => Ok(()), - _ => { - debug!( - "{} Illegitimate direct message {:?} from {:?}.", - self, msg, pub_id - ); - Err(RoutingError::InvalidStateForOperation) - } - } - } - - fn handle_hop_message( - &mut self, - hop_msg: HopMessage, - pub_id: PublicId, + pub fn into_establishing_node( + self, + gen_pfx_info: GenesisPfxInfo, outbox: &mut EventBox, - ) -> Result { - match self.peer_mgr.get_peer(&pub_id).map(Peer::state) { - Some(&PeerState::Connected) | Some(&PeerState::Proxy) => (), - _ => return Err(RoutingError::UnknownConnection(pub_id)), - } + ) -> Result { + let details = EstablishingNodeDetails { + ack_mgr: self.ack_mgr, + cache: self.cache, + crust_service: self.crust_service, + full_id: self.full_id, + gen_pfx_info, + min_section_size: self.min_section_size, + msg_backlog: self.msg_backlog, + notified_nodes: self.notified_nodes, + peer_mgr: self.peer_mgr, + routing_msg_filter: self.routing_msg_filter, + timer: self.timer, + }; - if let Some(routing_msg) = self.filter_hop_message(hop_msg, pub_id)? { - self.dispatch_routing_message(routing_msg, outbox) - } else { - Ok(Transition::Stay) - } + EstablishingNode::from_proving_node(details, outbox).map(State::EstablishingNode) } fn dispatch_routing_message( @@ -238,81 +174,16 @@ impl ProvingNode { outbox: &mut EventBox, ) -> Result { use crate::{messages::MessageContent::*, routing_table::Authority::*}; - - let src_name = msg.src.name(); - match msg { - RoutingMessage { - content: - ConnectionInfoRequest { - encrypted_conn_info, - pub_id, - msg_id, - }, - src: ManagedNode(_), - dst: ManagedNode(_), - } => { - if self.joining_prefix.matches(&src_name) { - self.handle_connection_info_request( - encrypted_conn_info, - pub_id, - msg_id, - msg.src, - msg.dst, - outbox, - )? - } else { - self.add_message_to_backlog(RoutingMessage { - content: ConnectionInfoRequest { - encrypted_conn_info, - pub_id, - msg_id, - }, - ..msg - }) - } - } - RoutingMessage { - content: - ConnectionInfoResponse { - encrypted_conn_info, - pub_id, - msg_id, - }, - src: ManagedNode(src_name), - dst: Client { .. }, - } => self.handle_connection_info_response( - encrypted_conn_info, - pub_id, - msg_id, - src_name, - msg.dst, - )?, RoutingMessage { content: NodeApproval(gen_info), src: PrefixSection(_), dst: Client { .. }, - } => return Ok(self.handle_node_approval(gen_info)), - RoutingMessage { - content: Ack(ack, _), - .. - } => self.handle_ack_response(ack), - _ => { - self.add_message_to_backlog(msg); - } + } => Ok(self.handle_node_approval(gen_info)), + _ => self + .handle_routing_message(msg, outbox) + .map(|()| Transition::Stay), } - - Ok(Transition::Stay) - } - - // Backlog the message to be processed once we are approved. - fn add_message_to_backlog(&mut self, msg: RoutingMessage) { - trace!( - "{} Not approved yet. Delaying message handling: {:?}", - self, - msg - ); - self.msg_backlog.push(msg); } fn handle_node_approval(&mut self, gen_pfx_info: GenesisPfxInfo) -> Transition { @@ -323,11 +194,7 @@ impl ProvingNode { self ); - Transition::IntoNode { gen_pfx_info } - } - - fn handle_ack_response(&mut self, ack: Ack) { - self.ack_mgr.receive(ack) + Transition::IntoEstablishingNode { gen_pfx_info } } fn dropped_peer(&mut self, pub_id: &PublicId) -> bool { @@ -342,6 +209,11 @@ impl ProvingNode { true } } + + #[cfg(feature = "mock_base")] + pub fn get_timed_out_tokens(&mut self) -> Vec { + self.timer.get_timed_out_tokens() + } } impl Base for ProvingNode { @@ -361,6 +233,10 @@ impl Base for ProvingNode { } } + fn min_section_size(&self) -> usize { + self.min_section_size + } + fn handle_timeout(&mut self, token: u64, outbox: &mut EventBox) -> Transition { let log_ident = format!("{}", self); if let Some(transition) = self @@ -386,13 +262,7 @@ impl Base for ProvingNode { } fn handle_connect_failure(&mut self, pub_id: PublicId, _: &mut EventBox) -> Transition { - if let Some(&PeerState::CrustConnecting) = self.peer_mgr.get_peer(&pub_id).map(Peer::state) - { - debug!("{} Failed to connect to peer {:?}.", self, pub_id); - } - - let _ = self.dropped_peer(&pub_id); - Transition::Stay + RelocatedNotEstablished::handle_connect_failure(self, pub_id) } fn handle_lost_peer(&mut self, pub_id: PublicId, outbox: &mut EventBox) -> Transition { @@ -414,32 +284,60 @@ impl Base for ProvingNode { Relocated::handle_connection_info_prepared(self, result_token, result) } - fn handle_new_message( + fn handle_direct_message( &mut self, + msg: DirectMessage, pub_id: PublicId, - bytes: CrustBytes, - outbox: &mut EventBox, - ) -> Transition { - let result = match from_crust_bytes(bytes) { - Ok(Message::Direct(msg)) => self - .handle_direct_message(msg, pub_id, outbox) - .map(|_| Transition::Stay), - Ok(Message::Hop(msg)) => self.handle_hop_message(msg, pub_id, outbox), - Err(error) => Err(error), - }; + _outbox: &mut EventBox, + ) -> Result { + self.check_direct_message_sender(&msg, &pub_id)?; - match result { - Ok(transition) => transition, - Err(RoutingError::FilterCheckFailed) => Transition::Stay, - Err(err) => { - debug!("{} - {:?}", self, err); - Transition::Stay + use crate::messages::DirectMessage::*; + match msg { + ResourceProof { + seed, + target_size, + difficulty, + } => { + let log_ident = format!("{}", self); + self.resource_prover.handle_request( + pub_id, + seed, + target_size, + difficulty, + log_ident, + ); + } + ResourceProofResponseReceipt => { + if let Some(msg) = self.resource_prover.handle_receipt(pub_id) { + self.send_direct_message(pub_id, msg); + } + } + BootstrapRequest(_) => self.handle_bootstrap_request(pub_id), + _ => { + debug!("{} Unhandled direct message: {:?}", self, msg); } } + + Ok(Transition::Stay) } - fn min_section_size(&self) -> usize { - self.min_section_size + fn handle_hop_message( + &mut self, + hop_msg: HopMessage, + pub_id: PublicId, + outbox: &mut EventBox, + ) -> Result { + match self.peer_mgr.get_peer(&pub_id).map(Peer::state) { + Some(&PeerState::Connected) | Some(&PeerState::Proxy) => (), + _ => return Err(RoutingError::UnknownConnection(pub_id)), + } + + if let Some(routing_msg) = self.filter_hop_message(hop_msg, pub_id)? { + self.dispatch_routing_message(routing_msg, outbox) + } else { + Ok(Transition::Stay) + } } } @@ -472,7 +370,11 @@ impl Bootstrapped for ProvingNode { } impl Relocated for ProvingNode { - fn peer_mgr(&mut self) -> &mut PeerManager { + fn peer_mgr(&self) -> &PeerManager { + &self.peer_mgr + } + + fn peer_mgr_mut(&mut self) -> &mut PeerManager { &mut self.peer_mgr } @@ -531,26 +433,27 @@ impl Relocated for ProvingNode { fn add_to_notified_nodes(&mut self, pub_id: PublicId) -> bool { self.notified_nodes.insert(pub_id) } + + fn remove_from_notified_nodes(&mut self, pub_id: &PublicId) -> bool { + self.notified_nodes.remove(pub_id) + } } -impl Unapproved for ProvingNode { +impl BootstrappedNotEstablished for ProvingNode { const SEND_ACK: bool = true; fn get_proxy_public_id(&self, proxy_name: &XorName) -> Result<&PublicId, RoutingError> { - if let Some(pub_id) = self.peer_mgr.get_peer_by_name(proxy_name).map(Peer::pub_id) { - if self.peer_mgr.is_connected(pub_id) { - Ok(pub_id) - } else { - error!( - "{} Unable to find connection to proxy in PeerManager.", - self - ); - Err(RoutingError::ProxyConnectionNotFound) - } - } else { - error!("{} Unable to find proxy in PeerManager.", self); - Err(RoutingError::ProxyConnectionNotFound) - } + proxied::find_proxy_public_id(self, &self.peer_mgr, proxy_name) + } +} + +impl RelocatedNotEstablished for ProvingNode { + fn our_prefix(&self) -> &Prefix { + &self.joining_prefix + } + + fn push_message_to_backlog(&mut self, msg: RoutingMessage) { + self.msg_backlog.push(msg) } } diff --git a/src/states/relocating_node.rs b/src/states/relocating_node.rs index 6d4e03cc84..a670b7be10 100644 --- a/src/states/relocating_node.rs +++ b/src/states/relocating_node.rs @@ -7,17 +7,17 @@ // permissions and limitations relating to use of the SAFE Network Software. use super::{ - common::{from_crust_bytes, unrelocated, Base, Bootstrapped, Unapproved}, - Bootstrapping, BootstrappingTargetState, + common::{proxied, Base, Bootstrapped, BootstrappedNotEstablished}, + BootstrappingPeer, TargetState, }; use crate::{ ack_manager::{Ack, AckManager}, cache::Cache, chain::SectionInfo, - error::{Result, RoutingError}, + error::RoutingError, event::Event, id::{FullId, PublicId}, - messages::{HopMessage, Message, MessageContent, RoutingMessage}, + messages::{DirectMessage, HopMessage, MessageContent, RoutingMessage}, outbox::EventBox, resource_prover::RESOURCE_PROOF_DURATION, routing_message_filter::RoutingMessageFilter, @@ -27,7 +27,7 @@ use crate::{ timer::Timer, types::{MessageId, RoutingActionSender}, xor_name::XorName, - CrustBytes, CrustEvent, CrustEventSender, Service, + CrustEvent, CrustEventSender, Service, }; use log::LogLevel; use std::{ @@ -39,6 +39,16 @@ use std::{ /// Total time to wait for `RelocateResponse`. const RELOCATE_TIMEOUT: Duration = Duration::from_secs(60 + RESOURCE_PROOF_DURATION.as_secs()); +pub struct RelocatingNodeDetails { + pub action_sender: RoutingActionSender, + pub cache: Box, + pub crust_service: Service, + pub full_id: FullId, + pub min_section_size: usize, + pub proxy_pub_id: PublicId, + pub timer: Timer, +} + pub struct RelocatingNode { action_sender: RoutingActionSender, ack_mgr: AckManager, @@ -56,36 +66,30 @@ pub struct RelocatingNode { } impl RelocatingNode { - #[allow(clippy::too_many_arguments)] - pub fn from_bootstrapping( - action_sender: RoutingActionSender, - cache: Box, - crust_service: Service, - full_id: FullId, - min_section_size: usize, - proxy_pub_id: PublicId, - timer: Timer, - ) -> Option { - let relocation_timer_token = timer.schedule(RELOCATE_TIMEOUT); + pub fn from_bootstrapping(details: RelocatingNodeDetails) -> Result { + let relocation_timer_token = details.timer.schedule(RELOCATE_TIMEOUT); let mut node = Self { - action_sender: action_sender, + action_sender: details.action_sender, ack_mgr: AckManager::new(), - crust_service: crust_service, - full_id: full_id, - cache: cache, - min_section_size: min_section_size, - proxy_pub_id: proxy_pub_id, + crust_service: details.crust_service, + full_id: details.full_id, + cache: details.cache, + min_section_size: details.min_section_size, + proxy_pub_id: details.proxy_pub_id, routing_msg_filter: RoutingMessageFilter::new(), - relocation_timer_token: relocation_timer_token, - timer: timer, + relocation_timer_token, + timer: details.timer, }; - if let Err(error) = node.relocate() { - error!("{} Failed to start relocation: {:?}", node, error); - None - } else { - debug!("{} State changed to RelocatingNode.", node); - Some(node) + match node.relocate() { + Ok(()) => { + debug!("{} State changed to RelocatingNode.", node); + Ok(node) + } + Err(error) => { + error!("{} Failed to start relocation: {:?}", node, error); + Err(error) + } } } @@ -96,18 +100,19 @@ impl RelocatingNode { new_full_id: FullId, our_section: (Prefix, BTreeSet), outbox: &mut EventBox, - ) -> State { + ) -> Result { let service = Self::start_new_crust_service( self.crust_service, *new_full_id.public_id(), crust_rx, crust_sender, ); - let target_state = BootstrappingTargetState::ProvingNode { + let target_state = TargetState::ProvingNode { old_full_id: self.full_id, our_section: our_section, }; - if let Some(bootstrapping) = Bootstrapping::new( + + match BootstrappingPeer::new( self.action_sender, self.cache, target_state, @@ -116,10 +121,11 @@ impl RelocatingNode { self.min_section_size, self.timer, ) { - State::Bootstrapping(bootstrapping) - } else { - outbox.send_event(Event::RestartRequired); - State::Terminated + Ok(peer) => Ok(State::BootstrappingPeer(peer)), + Err(error) => { + outbox.send_event(Event::RestartRequired); + Err(error) + } } } @@ -153,18 +159,6 @@ impl RelocatingNode { old_crust_service } - fn handle_hop_message(&mut self, hop_msg: HopMessage, pub_id: PublicId) -> Result { - if self.proxy_pub_id != pub_id { - return Err(RoutingError::UnknownConnection(pub_id)); - } - - if let Some(routing_msg) = self.filter_hop_message(hop_msg, pub_id)? { - Ok(self.dispatch_routing_message(routing_msg)) - } else { - Ok(Transition::Stay) - } - } - fn dispatch_routing_message(&mut self, routing_msg: RoutingMessage) -> Transition { use crate::messages::MessageContent::*; match routing_msg.content { @@ -194,7 +188,7 @@ impl RelocatingNode { Transition::Stay } - fn relocate(&mut self) -> Result<()> { + fn relocate(&mut self) -> Result<(), RoutingError> { let request_content = MessageContent::Relocate { message_id: MessageId::new(), }; @@ -260,6 +254,10 @@ impl Base for RelocatingNode { } } + fn min_section_size(&self) -> usize { + self.min_section_size + } + fn handle_timeout(&mut self, token: u64, outbox: &mut EventBox) -> Transition { if self.relocation_timer_token == token { info!( @@ -285,33 +283,31 @@ impl Base for RelocatingNode { } } - fn handle_new_message( + fn handle_direct_message( &mut self, - pub_id: PublicId, - bytes: CrustBytes, + msg: DirectMessage, + _: PublicId, _: &mut EventBox, - ) -> Transition { - let result = match from_crust_bytes(bytes) { - Ok(Message::Hop(hop_msg)) => self.handle_hop_message(hop_msg, pub_id), - Ok(message) => { - debug!("{} - Unhandled new message: {:?}", self, message); - Ok(Transition::Stay) - } - Err(error) => Err(error), - }; + ) -> Result { + debug!("{} - Unhandled direct message: {:?}", self, msg); + Ok(Transition::Stay) + } - match result { - Ok(transition) => transition, - Err(RoutingError::FilterCheckFailed) => Transition::Stay, - Err(error) => { - debug!("{} - {:?}", self, error); - Transition::Stay - } + fn handle_hop_message( + &mut self, + hop_msg: HopMessage, + pub_id: PublicId, + _: &mut EventBox, + ) -> Result { + if self.proxy_pub_id != pub_id { + return Err(RoutingError::UnknownConnection(pub_id)); } - } - fn min_section_size(&self) -> usize { - self.min_section_size + if let Some(routing_msg) = self.filter_hop_message(hop_msg, pub_id)? { + Ok(self.dispatch_routing_message(routing_msg)) + } else { + Ok(Transition::Stay) + } } } @@ -333,7 +329,7 @@ impl Bootstrapped for RelocatingNode { src_section: Option, route: u8, expires_at: Option, - ) -> Result<()> { + ) -> Result<(), RoutingError> { self.send_routing_message_via_proxy(routing_msg, src_section, route, expires_at) } @@ -346,11 +342,11 @@ impl Bootstrapped for RelocatingNode { } } -impl Unapproved for RelocatingNode { +impl BootstrappedNotEstablished for RelocatingNode { const SEND_ACK: bool = true; - fn get_proxy_public_id(&self, proxy_name: &XorName) -> Result<&PublicId> { - unrelocated::get_proxy_public_id(self, &self.proxy_pub_id, proxy_name) + fn get_proxy_public_id(&self, proxy_name: &XorName) -> Result<&PublicId, RoutingError> { + proxied::get_proxy_public_id(self, &self.proxy_pub_id, proxy_name) } } diff --git a/tests/mock_crust/drop.rs b/tests/mock_crust/drop.rs index 7197105e24..33d56afa43 100644 --- a/tests/mock_crust/drop.rs +++ b/tests/mock_crust/drop.rs @@ -9,8 +9,8 @@ use super::{ create_connected_nodes, poll_all, poll_and_resend, verify_invariant_for_all_nodes, TestNode, }; -use routing::mock_crust::Network; -use routing::{Event, EventStream}; +use rand::Rng; +use routing::{mock_crust::Network, Event, EventStream}; // Drop node at index and verify its own section receives NodeLost. fn drop_node(nodes: &mut Vec, index: usize) { @@ -20,6 +20,8 @@ fn drop_node(nodes: &mut Vec, index: usize) { drop(node); + // Using poll_all instead of poll_and_resend here to specifically only detect + // the NodeLost event getting triggered by the remaining nodes. let _ = poll_all(nodes, &mut []); for node in nodes.iter_mut().filter(|n| close_names.contains(&n.name())) { @@ -39,8 +41,10 @@ fn node_drops() { let network = Network::new(min_section_size, None); let mut nodes = create_connected_nodes(&network, min_section_size + 2); drop_node(&mut nodes, 0); - poll_and_resend(&mut nodes, &mut []); + // Trigger poll_and_resend to allow remaining nodes to gossip and + // update their chain accordingly. + poll_and_resend(&mut nodes, &mut []); verify_invariant_for_all_nodes(&mut nodes); } @@ -50,14 +54,14 @@ fn node_restart() { // (with the exception of the first node which is special). let min_section_size = 5; let network = Network::new(min_section_size, None); + let mut rng = network.new_rng(); let mut nodes = create_connected_nodes(&network, min_section_size); - // Drop all but last node: + // Drop all but last node in random order: while nodes.len() > 1 { - drop_node(&mut nodes, 0); + let index = rng.gen_range(0, nodes.len()); + drop_node(&mut nodes, index); } - let _ = poll_all(&mut nodes, &mut []); - expect_next_event!(nodes[0], Event::RestartRequired); } diff --git a/tests/mock_crust/utils.rs b/tests/mock_crust/utils.rs index 77def2c6a8..e57e759a73 100644 --- a/tests/mock_crust/utils.rs +++ b/tests/mock_crust/utils.rs @@ -367,9 +367,7 @@ pub fn poll_and_resend(nodes: &mut [TestNode], clients: &mut [TestClient]) { // after MAX_POLL_CALLS / 2 only filter for opaque events // to avoid stalling the test due to lack of parsec voters. node.inner.has_unacked_msg() - || node - .inner - .has_unconsensused_observations(i > MAX_POLL_CALLS / 2) + || node.inner.has_unpolled_observations(i > MAX_POLL_CALLS / 2) }; let client_busy = |client: &TestClient| client.inner.has_unacked_msg(); if poll_all(nodes, clients)