Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat(connectivity): refactor handling of lost connections
Browse files Browse the repository at this point in the history
- when sending a message to a node fails, signal all elders of our
section to test connectivity to that node and vote it offline if
required
- also adds an external API to signal elders to do the connectivity
check

BREAKING CHANGE: sn_messaging includes a breaking change
  • Loading branch information
lionel-faber committed Jun 10, 2021
1 parent 2818905 commit 96aecd9
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ qp2p = "~0.12.0"
rand = "~0.7.3"
rand_chacha = "~0.2.2"
resource_proof = "0.8.0"
sn_messaging = "34.0.0"
sn_messaging = "35.0.0"
sn_data_types = "~0.18.3"
thiserror = "1.0.23"
tokio = "1.3.0"
Expand Down
9 changes: 9 additions & 0 deletions src/routing/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ pub(crate) enum Command {
},
/// Proposes a peer as offline
ProposeOffline(XorName),
/// Send a signal to all Elders to
/// test the connectivity to a specific node
StartConnectivityTest(XorName),
/// Test Connectivity
TestConnectivity(XorName),
}

impl Command {
Expand Down Expand Up @@ -210,6 +215,10 @@ impl Debug for Command {
.field("previous_name", previous_name)
.finish(),
Self::ProposeOffline(name) => f.debug_tuple("ProposeOffline").field(name).finish(),
Self::TestConnectivity(name) => f.debug_tuple("TestConnectivity").field(name).finish(),
Self::StartConnectivityTest(name) => {
f.debug_tuple("StartConnectivityTest").field(name).finish()
}
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/routing/core/connectivity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ impl Core {
return Ok(vec![]);
}

self.propose_offline(name)
let mut commands = self.propose_offline(name)?;
commands.push(Command::StartConnectivityTest(name));
Ok(commands)
}

pub fn propose_offline(&self, name: XorName) -> Result<Vec<Command>> {
Expand Down
2 changes: 1 addition & 1 deletion src/routing/core/messaging/handling/decisions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use xor_name::XorName;
impl Core {
pub(crate) fn decide_message_status(&self, msg: &RoutingMsg) -> Result<MessageStatus> {
match &msg.variant {
Variant::SectionKnowledge { .. } => {
Variant::SectionKnowledge { .. } | Variant::StartConnectivityTest(_) => {
if !self.is_elder() {
return Ok(MessageStatus::Useless);
}
Expand Down
1 change: 1 addition & 0 deletions src/routing/core/messaging/handling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ impl Core {
Variant::RelocatePromise(promise) => {
self.handle_relocate_promise(*promise, msg.clone()).await
}
Variant::StartConnectivityTest(name) => Ok(vec![Command::TestConnectivity(*name)]),
Variant::JoinRequest(join_request) => {
let sender = sender.ok_or(Error::InvalidSrcLocation)?;
self.handle_join_request(msg.src.peer(sender)?, *join_request.clone())
Expand Down
53 changes: 49 additions & 4 deletions src/routing/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@

use super::{bootstrap, Comm, Command, Core};
use crate::{
error::Result, event::Event, peer::PeerUtils, routing::comm::SendStatus, section::SectionUtils,
Error, XorName,
error::Result, event::Event, messages::RoutingMsgUtils, peer::PeerUtils,
routing::comm::SendStatus, section::SectionUtils, Error, XorName,
};
use itertools::Itertools;
use sn_data_types::PublicKey;
use sn_messaging::{
node::{JoinRejectionReason, JoinResponse, SignedRelocateDetails, SrcAuthority, Variant},
MessageType,
node::{
JoinRejectionReason, JoinResponse, RoutingMsg, SignedRelocateDetails, SrcAuthority, Variant,
},
DstLocation, MessageType,
};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::{
Expand Down Expand Up @@ -217,6 +220,48 @@ impl Dispatcher {
.await
}
Command::ProposeOffline(name) => self.core.read().await.propose_offline(name),
Command::StartConnectivityTest(name) => {
let msg = {
let core = self.core.read().await;
let node = core.node();
let section_key = *core.section().chain.last_key();
RoutingMsg::single_src(
node,
DstLocation::Section(core.node().name()),
Variant::StartConnectivityTest(name),
section_key,
)?
};
let our_name = self.core.read().await.node().name();
let peers = self
.core
.read()
.await
.section()
.active_members()
.filter(|peer| peer.name() != &name && peer.name() != &our_name)
.cloned()
.collect_vec();
Ok(self.core.read().await.send_or_handle(msg, &peers))
}
Command::TestConnectivity(name) => {
let mut commands = vec![];
if let Some(peer) = self
.core
.read()
.await
.section()
.members
.members
.get(&name)
.map(|member_info| member_info.value.peer)
{
if self.comm.is_reachable(peer.addr()).await.is_err() {
commands.push(Command::ProposeOffline(*peer.name()));
}
}
Ok(commands)
}
}
}

Expand Down
9 changes: 9 additions & 0 deletions src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,15 @@ impl Routing {
self.dispatcher.clone().handle_commands(command).await
}

/// Signals the Elders of our section to test connectivity to a node.
pub async fn start_connectivity_test(&self, name: XorName) -> Result<()> {
if !self.is_elder().await {
return Err(Error::InvalidState);
}
let command = Command::StartConnectivityTest(name);
self.dispatcher.clone().handle_commands(command).await
}

/// Returns the current age of this node.
pub async fn age(&self) -> u8 {
self.dispatcher.core.read().await.node().age()
Expand Down
7 changes: 5 additions & 2 deletions tests/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
mod utils;

use self::utils::*;
use anyhow::Result;
use anyhow::{anyhow, Result};
use bytes::Bytes;
use sn_messaging::{Aggregation, DstLocation, Itinerary, SrcLocation};
use sn_routing::{Event, NodeElderChange};
Expand Down Expand Up @@ -50,7 +50,10 @@ async fn test_node_drop() -> Result<()> {

tracing::info!("Dropped {} at {}", dropped_name, dropped_addr);

for (node, _) in &mut nodes {
// A failed send_message from any node should
// trigger voting by all nodes
{
let (node, _) = nodes.get(0).ok_or_else(|| anyhow!("Missing Node"))?;
let itinerary = Itinerary {
src: SrcLocation::Node(node.name().await),
dst: DstLocation::Node(dropped_name),
Expand Down

0 comments on commit 96aecd9

Please sign in to comment.