Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
chore: rebase fixes atop T4.2
Browse files Browse the repository at this point in the history
  • Loading branch information
Yoga07 committed Apr 29, 2021
1 parent 64b80ec commit 41b0908
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 43 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ rand = "~0.7.3"
rand_chacha = "~0.2.2"
resource_proof = "0.8.0"
sn_messaging = { git = "https://github.com/maidsafe/sn_messaging", branch = "anti-entropy" }
sn_data_types = "~0.18.0"
sn_data_types = "~0.18.3"
thiserror = "1.0.23"
tokio = "1.3.0"
xor_name = "1.1.0"
Expand Down
2 changes: 1 addition & 1 deletion src/routing/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl<'a> State<'a> {
self.send_get_section_request(mem::take(&mut bootstrap_addrs), relocate_details)
.await?;

let (response, sender, _hdr_info) =
let (response, sender, _dest_info) =
self.receive_get_section_response(relocate_details).await?;

match response {
Expand Down
67 changes: 39 additions & 28 deletions src/routing/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,35 +461,43 @@ impl Core {
}
}

pub fn handle_connection_lost(&self, addr: SocketAddr) -> Option<Command> {
if let Some(peer) = self.section.find_joined_member_by_addr(&addr) {
debug!("Lost connection to {}", peer);
pub fn handle_connection_lost(&self, addr: SocketAddr) -> Result<Vec<Command>> {
let name = if let Some(peer) = self.section.find_joined_member_by_addr(&addr) {
debug!("Lost known peer {}", peer);
*peer.name()
} else {
if let Some(end_user) = self.get_enduser_by_addr(&addr) {
debug!("Lost connection to client {:?}", end_user);
} else {
debug!("Lost connection to unknown peer {}", addr);
}
return None;
}
trace!("Lost unknown peer {}", addr);
return Ok(vec![]);
};

if !self.is_elder() {
if let Some(peer) = self.section.find_joined_member_by_addr(&addr) {
trace!("Lost connection to {}", peer);
// Try to send a "ping" message to probe the peer connection. If it succeeds, the
// connection loss was just temporary. Otherwise the peer is assumed lost and we will vote
// it offline.
Some(Command::SendMessage {
recipients: vec![(addr, *peer.name())],
delivery_group_size: 1,
message: MessageType::Ping(DestInfo {
dest: *peer.name(),
dest_section_pk: *self.section.chain().last_key(),
}),
})
} else {
None
// When self is not an elder, then the peer has to be an elder, and we shall complaint
// the lost to other elders.
let variant = Variant::ConnectivityComplaint(name);
let recipients: Vec<_> = self
.section
.elders_info()
.elders
.values()
.filter(|peer| *peer.name() != name)
.copied()
.collect();
trace!(
"Casting connectivity complaint against {:?} {:?}",
name,
recipients
);

return self.send_message_for_dst_accumulation(
self.node.name(),
DstLocation::Direct,
variant,
None,
&recipients,
);
}

self.propose_offline(name)
}

pub fn handle_peer_lost(&self, addr: &SocketAddr) -> Result<Vec<Command>> {
Expand Down Expand Up @@ -2162,12 +2170,15 @@ impl Core {

let message =
Message::single_src(&self.node, DstLocation::Direct, variant, None, None)?;
let recipients: Vec<_> = recipients.iter().map(Peer::addr).copied().collect();

Ok(Command::send_message_to_nodes(
&recipients,
recipients.clone(),
recipients.len(),
message.to_bytes(),
DestInfo {
dest: XorName::random(),
dest_section_pk: *self.section_chain().last_key(),
},
))
};

Expand All @@ -2177,7 +2188,7 @@ impl Core {
.section
.active_members()
.filter(|peer| !self.section.is_elder(peer.name()))
.copied()
.map(|peer| (*peer.addr(), *peer.name()))
.collect();

let variant = Variant::Sync {
Expand Down
12 changes: 4 additions & 8 deletions src/routing/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

use super::{bootstrap, Comm, Command, Core};
use crate::{error::Result, event::Event, relocation::SignedRelocateDetails, XorName};
use sn_messaging::{section_info::Error as TargetSectionError, MessageType};
use sn_messaging::MessageType;
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::{
sync::{mpsc, watch, Mutex},
Expand Down Expand Up @@ -109,13 +109,9 @@ impl Dispatcher {
Command::HandleAgreement { proposal, proof } => {
self.core.lock().await.handle_agreement(proposal, proof)
}
Command::HandleConnectionLost(addr) => Ok(self
.core
.lock()
.await
.handle_connection_lost(addr)
.into_iter()
.collect()),
Command::HandleConnectionLost(addr) => {
self.core.lock().await.handle_connection_lost(addr)
}
Command::HandlePeerLost(addr) => self.core.lock().await.handle_peer_lost(&addr),
Command::HandleDkgOutcome {
elders_info,
Expand Down
17 changes: 12 additions & 5 deletions src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ impl Routing {
public_key, socket_addr
);
return self
.send_message_to_client(socket_addr, ClientMessage::from(content)?)
.send_message_to_client(socket_addr, ClientMsg::from(content)?)
.await;
} else {
debug!(
Expand Down Expand Up @@ -516,8 +516,8 @@ async fn handle_message(dispatcher: Arc<Dispatcher>, bytes: Bytes, sender: Socke
};
let _ = task::spawn(dispatcher.handle_commands(command));
}
MessageType::Node {
msg: NodeMessage(msg_bytes),
MessageType::Routing {
msg: RoutingMsg(msg_bytes),
..
} => match Message::from_bytes(Bytes::from(msg_bytes)) {
Ok(message) => {
Expand All @@ -531,6 +531,13 @@ async fn handle_message(dispatcher: Arc<Dispatcher>, bytes: Bytes, sender: Socke
error!("Failed to deserialize node message: {}", error);
}
},
MessageType::Node {
msg: _,
dest_info: _,
src_section_pk: _,
} => {
unimplemented!()
}
MessageType::Client { msg, dest_info } => {
let end_user = dispatcher
.core
Expand All @@ -548,7 +555,7 @@ async fn handle_message(dispatcher: Arc<Dispatcher>, bytes: Bytes, sender: Socke
let dest = dest_info.dest;
let client_pk = dest_info.dest_section_pk;
let command = Command::SendMessage {
recipients: vec![sender],
recipients: vec![(sender, dest)],
delivery_group_size: 1,
message: MessageType::SectionInfo {
msg: SectionInfoMsg::RegisterEndUserError(
Expand All @@ -568,7 +575,7 @@ async fn handle_message(dispatcher: Arc<Dispatcher>, bytes: Bytes, sender: Socke
}
};

let event = Event::ClientMessageReceived {
let event = Event::ClientMsgReceived {
msg: Box::new(msg),
user: end_user,
};
Expand Down

0 comments on commit 41b0908

Please sign in to comment.