From ca957594f197fe84c84c72b424c90b7b2376a298 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Cig=C3=A1nek?= Date: Thu, 25 Apr 2019 11:32:32 +0200 Subject: [PATCH] Extract EstablishingNode from Node --- src/parsec.rs | 35 ++- src/peer_manager.rs | 18 -- src/state_machine.rs | 35 ++- src/states/establishing_node.rs | 463 ++++++++++++++++++++++++++++++++ src/states/mod.rs | 24 ++ src/states/node.rs | 335 +++++++---------------- src/states/proving_node.rs | 14 +- 7 files changed, 652 insertions(+), 272 deletions(-) create mode 100644 src/states/establishing_node.rs diff --git a/src/parsec.rs b/src/parsec.rs index a38baf418a..4fe60cdd3b 100644 --- a/src/parsec.rs +++ b/src/parsec.rs @@ -35,10 +35,9 @@ pub struct ParsecMap { } impl ParsecMap { - pub fn new(full_id: FullId, gen_pfx_info: &GenesisPfxInfo) -> Self { - let parsec = create(full_id, gen_pfx_info); + pub fn new(version: u64, parsec: Parsec) -> Self { let mut map = BTreeMap::new(); - let _ = map.insert(*gen_pfx_info.first_info.version(), parsec); + let _ = map.insert(version, parsec); Self { map } } @@ -68,7 +67,7 @@ impl ParsecMap { let response = parsec .handle_request(&pub_id, request) - .map(|response| Message::Direct(DirectMessage::ParsecResponse(msg_version, response))) + .map(|response| response.into_message(msg_version)) .map_err(|err| { debug!("{} - Error handling parsec request: {:?}", log_ident, err); err @@ -100,11 +99,13 @@ impl ParsecMap { } pub fn create_gossip(&mut self, version: u64, target: &id::PublicId) -> Option { - let parsec = self.map.get_mut(&version)?; - parsec - .create_gossip(target) - .ok() - .map(|request| Message::Direct(DirectMessage::ParsecRequest(version, request))) + Some( + self.map + .get_mut(&version)? + .create_gossip(target) + .ok()? + .into_message(version), + ) } pub fn vote_for(&mut self, event: chain::NetworkEvent, log_ident: &str) { @@ -167,6 +168,22 @@ impl ParsecMap { } } +pub trait GossipMessage { + fn into_message(self, version: u64) -> Message; +} + +impl GossipMessage for Request { + fn into_message(self, version: u64) -> Message { + Message::Direct(DirectMessage::ParsecRequest(version, self)) + } +} + +impl GossipMessage for Response { + fn into_message(self, version: u64) -> Message { + Message::Direct(DirectMessage::ParsecResponse(version, self)) + } +} + /// Create Parsec instance. pub fn create(full_id: FullId, gen_pfx_info: &GenesisPfxInfo) -> Parsec { if gen_pfx_info diff --git a/src/peer_manager.rs b/src/peer_manager.rs index 8ab473ea65..82e788ecf6 100644 --- a/src/peer_manager.rs +++ b/src/peer_manager.rs @@ -318,7 +318,6 @@ pub struct PeerManager { our_public_id: PublicId, candidate: Candidate, disable_client_rate_limiter: bool, - established: bool, } impl PeerManager { @@ -330,7 +329,6 @@ impl PeerManager { our_public_id: our_public_id, candidate: Candidate::None, disable_client_rate_limiter: disable_client_rate_limiter, - established: false, } } @@ -653,12 +651,7 @@ impl PeerManager { } /// Remove and return `PublicId`s of expired peers. - /// Will only be active once we are established. pub fn remove_expired_peers(&mut self) -> Vec { - if !self.established { - return vec![]; - } - let remove_candidate = if self.candidate.is_expired() { match self.candidate { Candidate::None => None, @@ -1001,17 +994,6 @@ impl PeerManager { self.peers.remove(pub_id).is_some() || remove_candidate } - - /// Sets this peer as established. - /// Expired peers will be purged once established. - pub fn set_established(&mut self) { - self.established = true; - } - - /// Returns whether this peer is established. - pub fn is_established(&self) -> bool { - self.established - } } impl fmt::Display for PeerManager { diff --git a/src/state_machine.rs b/src/state_machine.rs index c6910b0d07..0cbfae74ee 100644 --- a/src/state_machine.rs +++ b/src/state_machine.rs @@ -8,12 +8,12 @@ use crate::{ action::Action, - chain::GenesisPfxInfo, + chain::{GenesisPfxInfo, SectionInfo}, id::{FullId, PublicId}, outbox::EventBox, routing_table::Prefix, states::common::Base, - states::{Bootstrapping, Client, Node, ProvingNode, RelocatingNode}, + states::{Bootstrapping, Client, EstablishingNode, Node, ProvingNode, RelocatingNode}, timer::Timer, types::RoutingActionSender, xor_name::XorName, @@ -41,6 +41,7 @@ macro_rules! state_dispatch { State::Client($state) => $expr, State::RelocatingNode($state) => $expr, State::ProvingNode($state) => $expr, + State::EstablishingNode($state) => $expr, State::Node($state) => $expr, State::Terminated => $term_expr, } @@ -67,6 +68,7 @@ pub enum State { Client(Client), RelocatingNode(RelocatingNode), ProvingNode(ProvingNode), + EstablishingNode(EstablishingNode), Node(Node), Terminated, } @@ -123,6 +125,7 @@ impl State { #[cfg(feature = "mock")] fn chain(&self) -> Option<&Chain> { match *self { + State::EstablishingNode(ref state) => Some(state.chain()), State::Node(ref state) => Some(state.chain()), _ => None, } @@ -188,6 +191,7 @@ impl State { State::Client(ref mut state) => state.get_timed_out_tokens(), State::RelocatingNode(ref mut state) => state.get_timed_out_tokens(), State::ProvingNode(ref mut state) => state.get_timed_out_tokens(), + State::EstablishingNode(ref mut state) => state.get_timed_out_tokens(), State::Node(ref mut state) => state.get_timed_out_tokens(), } } @@ -207,6 +211,7 @@ impl State { | State::Client(_) | State::RelocatingNode(_) | State::ProvingNode(_) => false, + State::EstablishingNode(ref state) => state.has_unconsensused_observations(), State::Node(ref state) => state.has_unconsensused_observations(), } } @@ -225,6 +230,7 @@ impl State { State::Client(ref state) => state.in_authority(auth), State::RelocatingNode(ref state) => state.in_authority(auth), State::ProvingNode(ref state) => state.in_authority(auth), + State::EstablishingNode(ref state) => state.in_authority(auth), State::Node(ref state) => state.in_authority(auth), } } @@ -235,6 +241,7 @@ impl State { State::Client(ref state) => state.ack_mgr().has_unacked_msg(), State::RelocatingNode(ref state) => state.ack_mgr().has_unacked_msg(), State::ProvingNode(ref state) => state.ack_mgr().has_unacked_msg(), + State::EstablishingNode(ref state) => state.ack_mgr().has_unacked_msg(), State::Node(ref state) => state.ack_mgr().has_unacked_msg(), } } @@ -254,10 +261,15 @@ pub enum Transition { new_id: FullId, our_section: (Prefix, BTreeSet), }, - // `ProvingNode` state transitioning to `Node`. - IntoNode { + // `ProvingNode` state transitioning to `EstablishingNode`. + IntoEstablishingNode { gen_pfx_info: GenesisPfxInfo, }, + // `EstablishingNode` state transition to `Node`. + IntoNode { + sec_info: SectionInfo, + old_pfx: Prefix, + }, Terminate, } @@ -415,9 +427,20 @@ impl StateMachine { }; self.state = new_state; } - IntoNode { gen_pfx_info } => { + IntoEstablishingNode { gen_pfx_info } => { + let new_state = match mem::replace(&mut self.state, State::Terminated) { + State::ProvingNode(proving_node) => { + proving_node.into_establishing_node(gen_pfx_info) + } + _ => unreachable!(), + }; + self.state = new_state + } + IntoNode { sec_info, old_pfx } => { let new_state = match mem::replace(&mut self.state, State::Terminated) { - State::ProvingNode(proving_node) => proving_node.into_node(gen_pfx_info), + State::EstablishingNode(establishing_node) => { + establishing_node.into_node(sec_info, old_pfx, outbox) + } _ => unreachable!(), }; self.state = new_state diff --git a/src/states/establishing_node.rs b/src/states/establishing_node.rs new file mode 100644 index 0000000000..c9c2aaeb94 --- /dev/null +++ b/src/states/establishing_node.rs @@ -0,0 +1,463 @@ +// Copyright 2019 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +use super::{ + common::{ + proxied, Approved, Base, Bootstrapped, NotEstablished, Relocated, RelocatedNotEstablished, + }, + node::Node, +}; +use crate::{ + ack_manager::AckManager, + cache::Cache, + chain::{Chain, GenesisPfxInfo, ProvingSection, SectionInfo}, + error::RoutingError, + id::{FullId, PublicId}, + messages::{DirectMessage, HopMessage, Message, RoutingMessage}, + outbox::EventBox, + parsec::{self, Block, GossipMessage, Parsec}, + peer_manager::{Peer, PeerManager, PeerState}, + routing_message_filter::RoutingMessageFilter, + routing_table::{Authority, Prefix}, + state_machine::{State, Transition}, + time::{Duration, Instant}, + timer::Timer, + xor_name::XorName, + Service, +}; +use itertools::Itertools; +use std::{ + collections::BTreeSet, + fmt::{self, Display, Formatter}, +}; + +const POKE_TIMEOUT: Duration = Duration::from_secs(60); + +pub struct EstablishingNode { + ack_mgr: AckManager, + cache: Box, + chain: Chain, + crust_service: Service, + full_id: FullId, + gen_pfx_info: GenesisPfxInfo, + /// Routing messages addressed to us that we cannot handle until we are established. + msg_backlog: Vec, + notified_nodes: BTreeSet, + parsec: Parsec, + peer_mgr: PeerManager, + poke_timer_token: u64, + routing_msg_filter: RoutingMessageFilter, + timer: Timer, +} + +impl EstablishingNode { + #[allow(clippy::too_many_arguments)] + pub fn from_proving_node( + ack_mgr: AckManager, + cache: Box, + crust_service: Service, + full_id: FullId, + gen_pfx_info: GenesisPfxInfo, + min_section_size: usize, + msg_backlog: Vec, + notified_nodes: BTreeSet, + peer_mgr: PeerManager, + routing_msg_filter: RoutingMessageFilter, + timer: Timer, + ) -> Self { + let public_id = *full_id.public_id(); + let poke_timer_token = timer.schedule(POKE_TIMEOUT); + + let parsec = parsec::create(full_id.clone(), &gen_pfx_info); + let chain = Chain::new(min_section_size, public_id, gen_pfx_info.clone()); + + let node = Self { + ack_mgr, + cache, + chain, + crust_service, + full_id, + gen_pfx_info, + msg_backlog, + notified_nodes, + parsec, + peer_mgr, + poke_timer_token, + routing_msg_filter, + timer, + }; + + debug!("{} - State changed to EstablishingNode.", node); + node + } + + pub fn into_node( + self, + sec_info: SectionInfo, + old_pfx: Prefix, + outbox: &mut EventBox, + ) -> State { + let node = Node::from_establishing_node( + self.ack_mgr, + self.cache, + self.chain, + self.crust_service, + self.full_id, + self.gen_pfx_info, + self.msg_backlog.into_iter().collect(), + self.notified_nodes, + old_pfx, + self.parsec, + self.peer_mgr, + self.routing_msg_filter, + sec_info, + self.timer, + outbox, + ); + + match node { + Ok(node) => State::Node(node), + Err(_) => State::Terminated, + } + } + + #[cfg(feature = "mock")] + pub fn chain(&self) -> &Chain { + &self.chain + } + + fn dispatch_routing_message( + &mut self, + msg: RoutingMessage, + outbox: &mut EventBox, + ) -> Result { + self.handle_routing_message(msg, outbox) + .map(|()| Transition::Stay) + } + + fn handle_parsec_request( + &mut self, + msg_version: u64, + par_request: parsec::Request, + pub_id: PublicId, + outbox: &mut EventBox, + ) -> Result { + if !self.check_parsec_version(msg_version) { + return Ok(Transition::Stay); + } + + match self.parsec.handle_request(&pub_id, par_request) { + Ok(response) => self.send_message(&pub_id, response.into_message(msg_version)), + Err(err) => debug!("{} - Error handling Parsec request: {:?}", self, err), + } + + self.parsec_poll_all(outbox) + } + + fn handle_parsec_response( + &mut self, + msg_version: u64, + par_response: parsec::Response, + pub_id: PublicId, + outbox: &mut EventBox, + ) -> Result { + if !self.check_parsec_version(msg_version) { + return Ok(Transition::Stay); + } + + match self.parsec.handle_response(&pub_id, par_response) { + Ok(()) => (), + Err(err) => debug!("{} - Error handling Parsec response: {:?}", self, err), + } + + self.parsec_poll_all(outbox) + } + + fn check_parsec_version(&self, version: u64) -> bool { + if version == *self.gen_pfx_info.first_info.version() { + true + } else { + debug!( + "{} - Unexpected Parsec message version: {} instead of {}", + self, + version, + self.gen_pfx_info.first_info.version() + ); + false + } + } + + // Sends a `ParsecPoke` message to trigger a gossip request from current section members to us. + // + // TODO: Should restrict targets to few(counter churn-threshold)/single. + // Currently this can result in incoming spam of gossip history from everyone. + // Can also just be a single target once node-ageing makes Offline votes Opaque which should + // remove invalid test failures for unaccumulated parsec::Remove blocks. + fn send_parsec_poke(&mut self) { + let version = *self.gen_pfx_info.first_info.version(); + let recipients = self + .gen_pfx_info + .latest_info + .members() + .iter() + .cloned() + .collect_vec(); + + for recipient in recipients { + self.send_message( + &recipient, + Message::Direct(DirectMessage::ParsecPoke(version)), + ); + } + } +} + +#[cfg(feature = "mock")] +impl EstablishingNode { + pub fn get_timed_out_tokens(&mut self) -> Vec { + self.timer.get_timed_out_tokens() + } + + pub fn has_unconsensused_observations(&self) -> bool { + self.parsec.has_unconsensused_observations() + } +} + +impl Base for EstablishingNode { + fn crust_service(&self) -> &Service { + &self.crust_service + } + + fn full_id(&self) -> &FullId { + &self.full_id + } + + fn in_authority(&self, auth: &Authority) -> bool { + if let Authority::Client { ref client_id, .. } = *auth { + client_id == self.full_id.public_id() + } else { + false + } + } + + fn min_section_size(&self) -> usize { + self.chain.min_sec_size() + } + + fn handle_timeout(&mut self, token: u64, _: &mut EventBox) -> Transition { + if self.poke_timer_token == token { + self.send_parsec_poke(); + self.poke_timer_token = self.timer.schedule(POKE_TIMEOUT); + } else { + self.resend_unacknowledged_timed_out_msgs(token); + } + + Transition::Stay + } + + fn handle_connect_success(&mut self, pub_id: PublicId, outbox: &mut EventBox) -> Transition { + Relocated::handle_connect_success(self, pub_id, outbox) + } + + fn handle_connect_failure(&mut self, pub_id: PublicId, _: &mut EventBox) -> Transition { + RelocatedNotEstablished::handle_connect_failure(self, pub_id) + } + + fn handle_direct_message( + &mut self, + msg: DirectMessage, + pub_id: PublicId, + outbox: &mut EventBox, + ) -> Result { + self.check_direct_message_sender(&msg, &pub_id)?; + + use crate::messages::DirectMessage::*; + match msg { + ParsecRequest(version, par_request) => { + self.handle_parsec_request(version, par_request, pub_id, outbox) + } + ParsecResponse(version, par_response) => { + self.handle_parsec_response(version, par_response, pub_id, outbox) + } + _ => { + debug!("{} Unhandled direct message: {:?}", self, msg); + Ok(Transition::Stay) + } + } + } + + fn handle_hop_message( + &mut self, + hop_msg: HopMessage, + pub_id: PublicId, + outbox: &mut EventBox, + ) -> Result { + match self.peer_mgr.get_peer(&pub_id).map(Peer::state) { + Some(&PeerState::Connected) | Some(&PeerState::Proxy) => (), + _ => return Err(RoutingError::UnknownConnection(pub_id)), + } + + if let Some(routing_msg) = self.filter_hop_message(hop_msg, pub_id)? { + self.dispatch_routing_message(routing_msg, outbox) + } else { + Ok(Transition::Stay) + } + } +} + +impl Bootstrapped for EstablishingNode { + fn ack_mgr(&self) -> &AckManager { + &self.ack_mgr + } + + fn ack_mgr_mut(&mut self) -> &mut AckManager { + &mut self.ack_mgr + } + + fn routing_msg_filter(&mut self) -> &mut RoutingMessageFilter { + &mut self.routing_msg_filter + } + + fn timer(&mut self) -> &mut Timer { + &mut self.timer + } + + fn send_routing_message_via_route( + &mut self, + routing_msg: RoutingMessage, + route: u8, + expires_at: Option, + ) -> Result<(), RoutingError> { + self.send_routing_message_via_proxy(routing_msg, route, expires_at) + } +} + +impl Relocated for EstablishingNode { + fn peer_mgr(&self) -> &PeerManager { + &self.peer_mgr + } + + fn peer_mgr_mut(&mut self) -> &mut PeerManager { + &mut self.peer_mgr + } + + fn process_connection(&mut self, pub_id: PublicId, outbox: &mut EventBox) { + if self.chain.is_peer_valid(&pub_id) { + self.add_to_routing_table(&pub_id, outbox); + } + } + + fn is_peer_valid(&self, _pub_id: &PublicId) -> bool { + true + } + + fn add_to_notified_nodes(&mut self, pub_id: PublicId) -> bool { + self.notified_nodes.insert(pub_id) + } + + fn remove_from_notified_nodes(&mut self, pub_id: &PublicId) -> bool { + self.notified_nodes.remove(pub_id) + } + + fn add_to_routing_table_success(&mut self, _: &PublicId) {} + + fn add_to_routing_table_failure(&mut self, pub_id: &PublicId) { + self.disconnect_peer(pub_id) + } +} + +impl NotEstablished for EstablishingNode { + const SEND_ACK: bool = true; + + fn get_proxy_public_id(&self, proxy_name: &XorName) -> Result<&PublicId, RoutingError> { + proxied::find_proxy_public_id(self, &self.peer_mgr, proxy_name) + } +} + +impl RelocatedNotEstablished for EstablishingNode { + fn our_prefix(&self) -> &Prefix { + self.chain.our_prefix() + } + + fn push_message_to_backlog(&mut self, msg: RoutingMessage) { + self.msg_backlog.push(msg) + } +} + +impl Approved for EstablishingNode { + fn parsec_poll_one(&mut self) -> Option { + self.parsec.poll() + } + + fn chain_mut(&mut self) -> &mut Chain { + &mut self.chain + } + + fn handle_online_event( + &mut self, + new_pub_id: PublicId, + _: Authority, + _: &mut EventBox, + ) -> Result<(), RoutingError> { + let _ = self.chain.add_member(new_pub_id)?; + Ok(()) + } + + fn handle_offline_event( + &mut self, + pub_id: PublicId, + _: &mut EventBox, + ) -> Result<(), RoutingError> { + let _ = self.chain.remove_member(pub_id)?; + Ok(()) + } + + fn handle_section_info_event( + &mut self, + sec_info: SectionInfo, + old_pfx: Prefix, + _: &mut EventBox, + ) -> Result { + if self.chain.is_member() { + Ok(Transition::IntoNode { sec_info, old_pfx }) + } else { + debug!("{} - Unhandled SectionInfo event", self); + Ok(Transition::Stay) + } + } + + fn handle_our_merge_event(&mut self) -> Result<(), RoutingError> { + debug!("{} - Unhandled OurMerge event", self); + Ok(()) + } + + fn handle_neighbour_merge_event(&mut self) -> Result<(), RoutingError> { + debug!("{} - Unhandled NeighbourMerge event", self); + Ok(()) + } + + fn handle_proving_sections_event( + &mut self, + _: Vec, + _: SectionInfo, + ) -> Result<(), RoutingError> { + debug!("{} - Unhandled ProvingSections event", self); + Ok(()) + } +} + +impl Display for EstablishingNode { + fn fmt(&self, formatter: &mut Formatter) -> fmt::Result { + write!( + formatter, + "EstablishingNode({}({:b}))", + self.name(), + self.chain.our_prefix() + ) + } +} diff --git a/src/states/mod.rs b/src/states/mod.rs index ac48612393..0a0d154c9d 100644 --- a/src/states/mod.rs +++ b/src/states/mod.rs @@ -9,6 +9,7 @@ mod bootstrapping; mod client; pub mod common; +mod establishing_node; mod node; mod proving_node; mod relocating_node; @@ -16,6 +17,7 @@ mod relocating_node; pub use self::{ bootstrapping::{Bootstrapping, TargetState as BootstrappingTargetState}, client::{Client, RATE_EXCEED_RETRY}, + establishing_node::EstablishingNode, node::Node, proving_node::ProvingNode, relocating_node::RelocatingNode, @@ -38,8 +40,30 @@ pub use self::{ // │ └────────────────┘ └─────────────┘ // │ │ // │ │ +// │ ▼ +// │ ┌──────────────────┐ +// │ │ EstablishingNode │ +// │ └──────────────────┘ +// │ │ +// │ │ // ▼ ▼ // ┌────────┐ ┌──────┐ // │ Client │ │ Node │ // └────────┘ └──────┘ // +// +// # Common traits +// Bootstrapping +// │ Client +// │ │ RelocatingNode +// │ │ │ ProvingNode +// │ │ │ │ EstablishingNode +// │ │ │ │ │ Node +// │ │ │ │ │ │ +// Base * * * * * * +// Bootstrapped * * * * * +// NotEstablished * * * * +// Relocated * * * +// RelocatedNotEstablished * * +// Approved * * +// diff --git a/src/states/node.rs b/src/states/node.rs index 3c7185a555..7062c27c14 100644 --- a/src/states/node.rs +++ b/src/states/node.rs @@ -24,7 +24,7 @@ use crate::{ UserMessage, UserMessageCache, DEFAULT_PRIORITY, MAX_PARTS, MAX_PART_LEN, }, outbox::EventBox, - parsec::{self, Block, ParsecMap}, + parsec::{self, Block, Parsec, ParsecMap}, peer_manager::{Peer, PeerManager, PeerState}, rate_limiter::RateLimiter, resource_prover::RESOURCE_PROOF_DURATION, @@ -59,7 +59,6 @@ use std::{ /// Time after which a `Tick` event is sent. const TICK_TIMEOUT: Duration = Duration::from_secs(15); -const POKE_TIMEOUT: Duration = Duration::from_secs(60); const GOSSIP_TIMEOUT: Duration = Duration::from_secs(2); const RECONNECT_PEER_TIMEOUT: Duration = Duration::from_secs(20); //const MAX_IDLE_ROUNDS: u64 = 100; @@ -96,12 +95,10 @@ pub struct Node { next_relocation_dst: Option, /// Interval used for relocation in mock crust tests. next_relocation_interval: Option<(XorName, XorName)>, - /// `RoutingMessage`s affecting the routing table that arrived before `NodeApproval`. - routing_msg_backlog: Vec, /// The timer token for accepting a new candidate. candidate_timer_token: Option, /// The timer token for displaying the current candidate status. - candidate_status_token: Option, + candidate_status_token: u64, /// Limits the rate at which clients can pass messages through this node when it acts as their /// proxy. clients_rate_limiter: RateLimiter, @@ -118,7 +115,6 @@ pub struct Node { disable_resource_proof: bool, parsec_map: ParsecMap, gen_pfx_info: GenesisPfxInfo, - poke_timer_token: u64, gossip_timer_token: u64, chain: Chain, // Peers we want to try reconnecting to @@ -143,87 +139,93 @@ impl Node { first_info: create_first_section_info(public_id).ok()?, latest_info: SectionInfo::default(), }; + let parsec = parsec::create(full_id.clone(), &gen_pfx_info); + let chain = Chain::new(min_section_size, public_id, gen_pfx_info.clone()); let mut node = Self::new( AckManager::new(), cache, + chain, crust_service, full_id, gen_pfx_info, true, - min_section_size, Default::default(), Default::default(), + parsec, PeerManager::new(public_id, dev_config.disable_client_rate_limiter), RoutingMessageFilter::new(), timer, ); if let Err(error) = node.crust_service.start_listening_tcp() { - error!("{} Failed to start listening: {:?}", node, error); + error!("{} - Failed to start listening: {:?}", node, error); None } else { - debug!("{} State changed to Node.", node); - info!("{} Started a new network as a seed node.", node); + debug!("{} - State changed to Node.", node); + info!("{} - Started a new network as a seed node.", node); Some(node) } } #[allow(clippy::too_many_arguments)] - pub fn from_proving_node( + pub fn from_establishing_node( ack_mgr: AckManager, cache: Box, + chain: Chain, crust_service: Service, full_id: FullId, gen_pfx_info: GenesisPfxInfo, - min_section_size: usize, msg_queue: VecDeque, notified_nodes: BTreeSet, + old_pfx: Prefix, + parsec: Parsec, peer_mgr: PeerManager, routing_msg_filter: RoutingMessageFilter, + sec_info: SectionInfo, timer: Timer, - ) -> Self { - let node = Self::new( + outbox: &mut EventBox, + ) -> Result { + let mut node = Self::new( ack_mgr, cache, + chain, crust_service, full_id, gen_pfx_info, false, - min_section_size, msg_queue, notified_nodes, + parsec, peer_mgr, routing_msg_filter, timer, ); - - debug!("{} State changed to Node.", node); - node + node.init(sec_info, old_pfx, outbox)?; + Ok(node) } #[allow(clippy::too_many_arguments)] fn new( ack_mgr: AckManager, cache: Box, + chain: Chain, crust_service: Service, full_id: FullId, gen_pfx_info: GenesisPfxInfo, is_first_node: bool, - min_section_size: usize, msg_queue: VecDeque, notified_nodes: BTreeSet, + parsec: Parsec, peer_mgr: PeerManager, routing_msg_filter: RoutingMessageFilter, timer: Timer, ) -> Self { let dev_config = config_handler::get_config().dev.unwrap_or_default(); - let public_id = *full_id.public_id(); - let tick_timer_token = timer.schedule(TICK_TIMEOUT); - let poke_timer_token = timer.schedule(POKE_TIMEOUT); let gossip_timer_token = timer.schedule(GOSSIP_TIMEOUT); + let candidate_status_token = timer.schedule(CANDIDATE_STATUS_INTERVAL); let reconnect_peers_token = timer.schedule(RECONNECT_PEER_TIMEOUT); Self { @@ -244,19 +246,17 @@ impl Node { user_msg_cache: UserMessageCache::with_expiry_duration(USER_MSG_CACHE_EXPIRY_DURATION), next_relocation_dst: None, next_relocation_interval: None, - routing_msg_backlog: vec![], candidate_timer_token: None, - candidate_status_token: None, + candidate_status_token, clients_rate_limiter: RateLimiter::new(dev_config.disable_client_rate_limiter), banned_client_ips: LruCache::with_expiry_duration(CLIENT_BAN_DURATION), dropped_clients: LruCache::with_expiry_duration(DROPPED_CLIENT_TIMEOUT), proxy_load_amount: 0, disable_resource_proof: dev_config.disable_resource_proof, - parsec_map: ParsecMap::new(full_id, &gen_pfx_info), + parsec_map: ParsecMap::new(*gen_pfx_info.first_info.version(), parsec), gen_pfx_info: gen_pfx_info.clone(), - poke_timer_token, gossip_timer_token, - chain: Chain::new(min_section_size, public_id, gen_pfx_info), + chain, reconnect_peers: Default::default(), reconnect_peers_token, notified_nodes, @@ -271,7 +271,7 @@ impl Node { self, self.chain.valid_peers().len() ); - let network_estimate = match self.chain().network_size_estimate() { + let network_estimate = match self.chain.network_size_estimate() { (n, true) => format!("Exact network size: {}", n), (n, false) => format!("Estimated network size: {}", n), }; @@ -284,19 +284,57 @@ impl Node { } } + // Initialise regular node + fn init( + &mut self, + sec_info: SectionInfo, + old_pfx: Prefix, + outbox: &mut EventBox, + ) -> Result<(), RoutingError> { + debug!("{} - State changed to Node.", self); + trace!( + "{} - Node Established. Prefixes: {:?}", + self, + self.chain.prefixes() + ); + + // We have just become established. Now we can supply our votes for all latest neighbour + // infos that have accumulated so far. + let neighbour_info_events = self + .chain + .neighbour_infos() + .map(|info| info.clone().into_network_event()) + .collect_vec(); + + neighbour_info_events.into_iter().for_each(|event| { + self.vote_for_event(event); + }); + + outbox.send_event(Event::Connected); + + // Handle the SectionInfo event which triggered us becoming established node. + let _ = self.handle_section_info_event(sec_info, old_pfx, outbox)?; + + // Allow other peers to bootstrap via us. + if let Err(err) = self.crust_service.set_accept_bootstrap(true) { + warn!("{} Unable to accept bootstrap connections. {:?}", self, err); + } + self.crust_service.set_service_discovery_listen(true); + + Ok(()) + } + // Initialises the first node of the network fn init_first_node(&mut self, outbox: &mut EventBox) -> Result<(), RoutingError> { outbox.send_event(Event::Connected); - self.peer_mgr.set_established(); self.crust_service.set_accept_bootstrap(true)?; self.crust_service.set_service_discovery_listen(true); Ok(()) } - /// Routing table of this node. - #[allow(unused)] + #[cfg(feature = "mock")] pub fn chain(&self) -> &Chain { &self.chain } @@ -381,11 +419,6 @@ impl Node { // Peers no longer required currently connected as PeerState::Routing are disconnected // Establish connection to peers missing from peer manager fn update_peer_states(&mut self, outbox: &mut EventBox) { - // If we are not yet established, do not try to update any RT peer states - if !self.chain.is_member() { - return; - } - let mut peers_to_add = Vec::new(); let mut peers_to_remove = Vec::new(); @@ -578,7 +611,7 @@ impl Node { warn!("{} Unexpected section infos in {:?}", self, signed_msg); return Err(RoutingError::InvalidProvingSection); } - } else if self.chain.is_member() { + } else { // Remove any untrusted trailing section infos. // TODO: remove wasted clone. Only useful when msg isnt trusted for log msg. let msg_clone = signed_msg.clone(); @@ -654,34 +687,6 @@ impl Node { use crate::messages::MessageContent::*; use crate::Authority::{Client, ManagedNode, PrefixSection, Section}; - if !self.chain.is_member() { - match routing_msg.content { - ExpectCandidate { .. } - | AcceptAsCandidate { .. } - | NeighbourInfo(..) - | NeighbourConfirm(..) - | Merge(..) - | UserMessagePart { .. } => { - // These messages should not be handled before node becomes established - self.add_routing_message_to_backlog(routing_msg); - return Ok(()); - } - ConnectionInfoRequest { .. } => { - if !self.chain.our_prefix().matches(&routing_msg.src.name()) { - self.add_routing_message_to_backlog(routing_msg); - return Ok(()); - } - } - Relocate { .. } - | ConnectionInfoResponse { .. } - | RelocateResponse { .. } - | Ack(..) - | NodeApproval { .. } => { - // Handle like normal - } - } - } - match routing_msg.content { Ack(..) | UserMessagePart { .. } => (), _ => trace!("{} Got routing message {:?}.", self, routing_msg), @@ -815,16 +820,6 @@ impl Node { } } - // Backlog the message to be processed once we are approved. - fn add_routing_message_to_backlog(&mut self, msg: RoutingMessage) { - trace!( - "{} Not approved yet. Delaying message handling: {:?}", - self, - msg - ); - self.routing_msg_backlog.push(msg); - } - fn handle_candidate_approval( &mut self, new_pub_id: PublicId, @@ -888,35 +883,6 @@ impl Node { .init(self.full_id.clone(), &self.gen_pfx_info, &log_ident) } - // Completes a Node startup process and allows a node to behave as a `ManagedNode`. - // A given node is considered "established" when it exists in `chain.our_info().members()` - // first node: this method is skipped entirely as it behaves as a Node from startup. - fn node_established(&mut self, outbox: &mut EventBox) { - self.peer_mgr.set_established(); - outbox.send_event(Event::Connected); - - trace!( - "{} Node Established. Prefixes: {:?}", - self, - self.chain.prefixes() - ); - - self.update_peer_states(outbox); - - // Allow other peers to bootstrap via us. - if let Err(err) = self.crust_service.set_accept_bootstrap(true) { - warn!("{} Unable to accept bootstrap connections. {:?}", self, err); - } - self.crust_service.set_service_discovery_listen(true); - - let backlog = mem::replace(&mut self.routing_msg_backlog, vec![]); - backlog - .into_iter() - .rev() - .foreach(|msg| self.msg_queue.push_front(msg)); - self.candidate_status_token = Some(self.timer.schedule(CANDIDATE_STATUS_INTERVAL)); - } - fn handle_resource_proof_response( &mut self, pub_id: PublicId, @@ -1141,19 +1107,6 @@ impl Node { return Err(RoutingError::FailedSignature); } - if !self.chain.is_member() { - debug!( - "{} Client {:?} rejected: We are not an established node yet.", - self, pub_id - ); - self.send_direct_message( - pub_id, - DirectMessage::BootstrapResponse(Err(BootstrapResponseError::NotApproved)), - ); - self.disconnect_peer(&pub_id); - return Ok(()); - } - if (peer_kind == CrustUser::Client || !self.is_first_node) && self.chain.len() < self.min_section_size() - 1 { @@ -1426,16 +1379,11 @@ impl Node { self.peer_mgr .accept_as_candidate(old_pub_id, target_interval); - let own_section = if self.chain().is_member() { - let our_info = self.chain().our_info(); + let own_section = { + let our_info = self.chain.our_info(); (*our_info.prefix(), our_info.members().clone()) - } else { - //FIXME: do this properly - ( - Default::default(), - iter::once(*self.full_id().public_id()).collect::>(), - ) }; + let response_content = MessageContent::RelocateResponse { target_interval: target_interval, section: own_section, @@ -1552,30 +1500,6 @@ impl Node { } } - // Sends a `ParsecPoke` message to trigger a gossip request from current section members to us. - // - // TODO: Should restrict targets to few(counter churn-threshold)/single. - // Currently this can result in incoming spam of gossip history from everyone. - // Can also just be a single target once node-ageing makes Offline votes Opaque which should - // remove invalid test failures for unaccumulated parsec::Remove blocks. - fn send_parsec_poke(&mut self) { - let version = *self.gen_pfx_info.first_info.version(); - let recipients = self - .gen_pfx_info - .latest_info - .members() - .iter() - .cloned() - .collect_vec(); - - for recipient in recipients { - self.send_message( - &recipient, - Message::Direct(DirectMessage::ParsecPoke(version)), - ); - } - } - fn send_candidate_approval(&mut self) { let (new_id, client_auth) = match self.peer_mgr.verified_candidate_info() { Err(_) => { @@ -1651,18 +1575,6 @@ impl Node { } } - // Until we're established do not relay any messages. - let src = signed_msg.routing_message().src; - if !self.chain.is_member() { - if let Authority::Client { ref client_id, .. } = src { - if self.name() != client_id.name() { - return Ok(()); - } - } else { - return Ok(()); - } - } - let (new_sent_to, target_pub_ids) = self.get_targets(signed_msg.routing_message(), route, hop, sent_to)?; @@ -1731,9 +1643,7 @@ impl Node { /// may be us or another node. If our signature is not required, this returns `None`. fn get_signature_target(&self, src: &Authority, route: u8) -> Option { use crate::Authority::*; - if !self.chain().is_member() { - return Some(*self.name()); - } + let list: Vec = match *src { ClientManager(_) | NaeManager(_) | NodeManager(_) => { let mut v = self @@ -1784,7 +1694,7 @@ impl Node { _ => false, }; - if self.chain.is_member() && !force_via_proxy { + if !force_via_proxy { // TODO: even if having chain reply based on connected_state, // we remove self in targets info and can do same by not // chaining us to conn_peer list here? @@ -1863,7 +1773,7 @@ impl Node { ) -> bool { let _ = self.peer_mgr.remove_peer(pub_id); - if self.chain.is_member() && self.notified_nodes.remove(pub_id) { + if self.notified_nodes.remove(pub_id) { info!("{} Dropped {} from the routing table.", self, pub_id.name()); outbox.send_event(Event::NodeLost(*pub_id.name())); @@ -1878,7 +1788,6 @@ impl Node { .filter(|p| p.is_routing()) .count() == 0 - && self.chain().is_member() { debug!("{} Lost all routing connections.", self); if !self.is_first_node { @@ -1887,7 +1796,7 @@ impl Node { } } - if try_reconnect && self.chain.is_member() && self.chain.is_peer_valid(pub_id) { + if try_reconnect && self.chain.is_peer_valid(pub_id) { debug!("{} Caching {:?} to reconnect.", self, pub_id); self.reconnect_peers.push(*pub_id); } @@ -1965,7 +1874,7 @@ impl Base for Node { client_id == self.full_id.public_id() } else { let conn_peers = self.connected_peers(); - self.chain.is_member() && self.chain().in_authority(auth, &conn_peers) + self.chain.in_authority(auth, &conn_peers) } } @@ -2001,22 +1910,17 @@ impl Base for Node { } else if self.candidate_timer_token == Some(token) { self.candidate_timer_token = None; self.send_candidate_approval(); - } else if self.candidate_status_token == Some(token) { - self.candidate_status_token = Some(self.timer.schedule(CANDIDATE_STATUS_INTERVAL)); + } else if self.candidate_status_token == token { + self.candidate_status_token = self.timer.schedule(CANDIDATE_STATUS_INTERVAL); self.peer_mgr.show_candidate_status(); } else if self.reconnect_peers_token == token { self.reconnect_peers_token = self.timer.schedule(RECONNECT_PEER_TIMEOUT); self.reconnect_peers(outbox); - } else if self.poke_timer_token == token { - if !self.peer_mgr.is_established() { - self.send_parsec_poke(); - self.poke_timer_token = self.timer.schedule(POKE_TIMEOUT); - } } else if self.gossip_timer_token == token { self.gossip_timer_token = self.timer.schedule(GOSSIP_TIMEOUT); - // If we're the only node then invoke parsec_poll directly - if self.chain.is_member() && self.chain.our_info().members().len() == 1 { + // If we're the only node then invoke parsec_poll_all directly + if self.chain.our_info().members().len() == 1 { let _ = self.parsec_poll_all(outbox); } @@ -2085,7 +1989,7 @@ impl Base for Node { fn handle_connect_failure(&mut self, pub_id: PublicId, outbox: &mut EventBox) -> Transition { self.log_connect_failure(&pub_id); let _ = self.dropped_peer(&pub_id, outbox, true); - if self.chain.is_member() && self.chain.our_info().members().contains(&pub_id) { + if self.chain.our_info().members().contains(&pub_id) { self.vote_for_event(NetworkEvent::Offline(pub_id)); } @@ -2379,12 +2283,10 @@ impl Bootstrapped for Node { } use crate::routing_table::Authority::*; let sending_sec = match routing_msg.src { - ClientManager(_) | NaeManager(_) | NodeManager(_) | ManagedNode(_) | Section(_) - if self.chain.is_member() => - { + ClientManager(_) | NaeManager(_) | NodeManager(_) | ManagedNode(_) | Section(_) => { Some(self.chain.our_info().clone()) } - PrefixSection(ref pfx) if self.chain.is_member() => { + PrefixSection(ref pfx) => { let src_section = match self.chain.our_info_for_prefix(pfx) { Some(a) => a.clone(), None => { @@ -2395,10 +2297,6 @@ impl Bootstrapped for Node { Some(src_section) } Client { .. } => None, - _ => { - // Cannot send routing msgs as a Node until established. - return Ok(()); - } }; if route > 0 { @@ -2461,7 +2359,6 @@ impl Relocated for Node { &mut self.peer_mgr } - /// Adds a newly connected peer to the routing table. Must only be called for connected peers. fn process_connection(&mut self, pub_id: PublicId, outbox: &mut EventBox) { if self.chain.is_peer_valid(&pub_id) { self.add_to_routing_table(&pub_id, outbox); @@ -2469,7 +2366,7 @@ impl Relocated for Node { } fn is_peer_valid(&self, pub_id: &PublicId) -> bool { - !self.chain.is_member() || self.chain.is_peer_valid(pub_id) + self.chain.is_peer_valid(pub_id) } fn add_to_routing_table_success(&mut self, _: &PublicId) { @@ -2506,15 +2403,12 @@ impl Approved for Node { new_client_auth: Authority, outbox: &mut EventBox, ) -> Result<(), RoutingError> { - let should_act = self.chain.is_member(); let to_vote_infos = self.chain.add_member(new_pub_id)?; - if should_act { - let _ = self.handle_candidate_approval(new_pub_id, new_client_auth, outbox); - to_vote_infos - .into_iter() - .map(NetworkEvent::SectionInfo) - .for_each(|sec_info| self.vote_for_event(sec_info)); - } + let _ = self.handle_candidate_approval(new_pub_id, new_client_auth, outbox); + to_vote_infos + .into_iter() + .map(NetworkEvent::SectionInfo) + .for_each(|sec_info| self.vote_for_event(sec_info)); Ok(()) } @@ -2524,15 +2418,13 @@ impl Approved for Node { pub_id: PublicId, outbox: &mut EventBox, ) -> Result<(), RoutingError> { - let should_act = self.chain.is_member(); let self_info = self.chain.remove_member(pub_id)?; - if should_act { - self.vote_for_event(NetworkEvent::SectionInfo(self_info)); - if let Some(&pub_id) = self.peer_mgr.get_pub_id(pub_id.name()) { - let _ = self.dropped_peer(&pub_id, outbox, false); - self.disconnect_peer(&pub_id); - } + self.vote_for_event(NetworkEvent::SectionInfo(self_info)); + if let Some(&pub_id) = self.peer_mgr.get_pub_id(pub_id.name()) { + let _ = self.dropped_peer(&pub_id, outbox, false); + self.disconnect_peer(&pub_id); } + Ok(()) } @@ -2550,22 +2442,6 @@ impl Approved for Node { old_pfx: Prefix, outbox: &mut EventBox, ) -> Result { - if !self.peer_mgr.is_established() && self.chain.is_member() { - // We have just found a `SectionInfo` block that contains us. Now we can supply our - // votes for all latest neighbour infos that have accumulated so far. - let ni_events = self - .chain - .neighbour_infos() - .map(|ni| ni.clone().into_network_event()) - .collect_vec(); - - ni_events.into_iter().for_each(|ni_event| { - self.vote_for_event(ni_event); - }); - - self.node_established(outbox); - } - if sec_info.prefix().is_extension_of(&old_pfx) { self.finalise_prefix_change()?; outbox.send_event(Event::SectionSplit(*sec_info.prefix())); @@ -2575,19 +2451,16 @@ impl Approved for Node { outbox.send_event(Event::SectionMerge(*sec_info.prefix())); } - let our_name = *self.full_id.public_id().name(); - let self_sec_update = sec_info.prefix().matches(&our_name); + let self_sec_update = sec_info.prefix().matches(self.name()); - if self.chain.is_member() { - self.update_peer_states(outbox); + self.update_peer_states(outbox); - if self_sec_update { - self.send_neighbour_infos(); - } else { - // Vote for neighbour update if we haven't done so already. - // vote_for_event is expected to only generate a new vote if required. - self.vote_for_event(sec_info.into_network_event()); - } + if self_sec_update { + self.send_neighbour_infos(); + } else { + // Vote for neighbour update if we haven't done so already. + // vote_for_event is expected to only generate a new vote if required. + self.vote_for_event(sec_info.into_network_event()); } let _ = self.merge_if_necessary(); diff --git a/src/states/proving_node.rs b/src/states/proving_node.rs index c31e56085f..66fa1af9d4 100644 --- a/src/states/proving_node.rs +++ b/src/states/proving_node.rs @@ -8,7 +8,7 @@ use super::{ common::{proxied, Base, Bootstrapped, NotEstablished, Relocated, RelocatedNotEstablished}, - node::Node, + establishing_node::EstablishingNode, }; use crate::{ ack_manager::AckManager, @@ -136,22 +136,20 @@ impl ProvingNode { } } - pub fn into_node(self, gen_pfx_info: GenesisPfxInfo) -> State { - let node = Node::from_proving_node( + pub fn into_establishing_node(self, gen_pfx_info: GenesisPfxInfo) -> State { + State::EstablishingNode(EstablishingNode::from_proving_node( self.ack_mgr, self.cache, self.crust_service, self.full_id, gen_pfx_info, self.min_section_size, - self.msg_backlog.into_iter().collect(), + self.msg_backlog, self.notified_nodes, self.peer_mgr, self.routing_msg_filter, self.timer, - ); - - State::Node(node) + )) } fn dispatch_routing_message( @@ -180,7 +178,7 @@ impl ProvingNode { self ); - Transition::IntoNode { gen_pfx_info } + Transition::IntoEstablishingNode { gen_pfx_info } } #[cfg(feature = "mock")]