This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

feat: implement lost peer detection
madadam committed Sep 29, 2020
1 parent 0fbced8 commit cbc57ba
Showing 11 changed files with 208 additions and 158 deletions.
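
In brief: all of `Approved`'s outgoing traffic now goes through new internal send wrappers (`send_direct_message`, `send_message_to_target`, `send_message_to_targets`). The lowest-level wrapper inspects the delivery status returned by the comm layer and calls the previously commented-out `handle_peer_lost` for every recipient that could not be reached. Active members are also sent `Variant::Ping` to detect recently lost nodes for which the section might need an `Offline` vote. Supporting changes: a new `Error::FailedSend` variant, a tokio bump to `~0.2.22`, and a larger `type_length_limit`.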
Cargo.toml (1 addition & 1 deletion)
@@ -37,7 +37,7 @@ rand_xorshift = "~0.2.0"
bls_signature_aggregator = { git = "https://github.com/madadam/bls_signature_aggregator.git", branch = "repetition" }
serde = { version = "1.0.111", features = ["derive" ,"rc"] }
tiny-keccak = { version = "2.0.2", features = ["sha3"] }
-tokio = { version = "~0.2.5", features = ["sync", "time", "rt-util"] }
+tokio = { version = "~0.2.22", features = ["sync", "time", "rt-util"] }
xor_name = "1.1.0"

[dev-dependencies]
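(A tilde requirement like `~0.2.5` already allows any `0.2.z >= 0.2.5`, so `0.2.22` was admissible before this change; the bump raises the minimum version the crate accepts, presumably for a fix or feature the new code relies on.)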
src/error.rs (2 additions & 0 deletions)
@@ -37,4 +37,6 @@ pub enum Error {
    InvalidSignatureShare,
    #[error(display = "An Elder DKG result is invalid.")]
    InvalidElderDkgResult,
+    #[error(display = "Failed to send a message.")]
+    FailedSend,
}
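
The new variant is not constructed in the hunks shown here. As a rough, hypothetical sketch of the intended use (the `Comm` handle and its `send_message_to_target` are taken from the call sites visible in this diff; the helper itself is not part of the commit):

```rust
use bytes::Bytes;
use std::net::SocketAddr;

// Hypothetical helper, not from this commit: surface a comm-level send
// failure to callers as the new `Error::FailedSend` variant.
async fn send_or_fail(comm: &mut Comm, recipient: &SocketAddr, msg: Bytes) -> Result<(), Error> {
    comm.send_message_to_target(recipient, msg)
        .await
        .map_err(|_| Error::FailedSend)
}
```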
src/lib.rs (1 addition & 1 deletion)
@@ -64,7 +64,7 @@
    clippy::needless_borrow
)]
// FIXME: find a way to not need this.
-#![type_length_limit = "2174929"]
+#![type_length_limit = "2259754"]

#[macro_use]
extern crate log;
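
(The `type_length_limit` bump is most likely a knock-on effect of the new async send wrappers: each extra layer of `async fn` indirection deepens the generated future types, pushing their names past rustc's previous limit, hence the FIXME above.)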
src/node/stage/approved.rs (76 additions & 80 deletions)
@@ -32,7 +32,7 @@ use async_recursion::async_recursion;
use bls_dkg::key_gen::message::Message as DkgMessage;
use bytes::Bytes;
use itertools::Itertools;
-use std::{net::SocketAddr, time::Duration};
+use std::{net::SocketAddr, slice, time::Duration};
use xor_name::{Prefix, XorName};

// Interval to progress DKG timed phase
@@ -170,8 +170,7 @@ impl Approved {
            .map(P2pNode::peer_addr)
            .copied()
            .collect();
-        self.comm
-            .send_message_to_targets(&recipients, recipients.len(), message.to_bytes())
+        self.send_message_to_targets(&recipients, recipients.len(), message.to_bytes())
            .await?;

        // We need to relay it to ourself as well
@@ -198,15 +197,13 @@
            && public_key != *self.shared_state.our_history.last_key()
        {
            // The key is recognized as non-last, indicating the peer is lagging.
-            self.comm
-                .send_direct_message(
-                    &self.node_info.full_id,
-                    peer,
-                    // TODO: consider sending only those parts of the shared state that are new
-                    // since `public_key` was the latest key.
-                    Variant::Sync(self.shared_state.clone()),
-                )
-                .await?;
+            self.send_direct_message(
+                peer,
+                // TODO: consider sending only those parts of the shared state that are new
+                // since `public_key` was the latest key.
+                Variant::Sync(self.shared_state.clone()),
+            )
+            .await?;
        }

        Ok(())
@@ -239,9 +236,6 @@
    }
    */

-    // TODO: review if we still need to call this function which used to be
-    // called when a message to a peer wasn't sent even after retrying.
-    /*
    async fn handle_peer_lost(&mut self, peer_addr: SocketAddr) -> Result<()> {
        let name = if let Some(node) = self.shared_state.find_p2p_node_from_addr(&peer_addr) {
            debug!("Lost known peer {}", node);
@@ -262,7 +256,6 @@

        Ok(())
    }
-    */

    async fn check_dkg(&mut self, dkg_key: DkgKey) -> Result<()> {
        match self.dkg_voter.check_dkg() {
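
With the send wrappers introduced at the bottom of this file, every recipient the comm layer reports as unreachable is now funnelled into `handle_peer_lost`, so the function above is un-commented and its stale TODO removed.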
@@ -580,7 +573,7 @@ impl Approved {
        let bounce_msg = bounce_msg.to_bytes();

        if let Some(sender) = sender {
-            self.comm.send_message_to_target(&sender, bounce_msg).await
+            self.send_message_to_target(&sender, bounce_msg).await
        } else {
            self.send_message_to_our_elders(bounce_msg).await
        }
Expand Down Expand Up @@ -615,7 +608,7 @@ impl Approved {
                .any(|p2p_node| p2p_node.peer_addr() == sender)
        });
        if let Some(sender) = our_elder_sender {
-            self.comm.send_message_to_target(&sender, bounce_msg).await
+            self.send_message_to_target(&sender, bounce_msg).await
        } else {
            self.send_message_to_our_elders(bounce_msg).await
        }
Expand Down Expand Up @@ -644,8 +637,7 @@ impl Approved {
            };

            trace!(" ...resending with extended proof");
-            self.comm
-                .send_message_to_target(sender.peer_addr(), resend_msg.to_bytes())
+            self.send_message_to_target(sender.peer_addr(), resend_msg.to_bytes())
                .await
        } else {
            trace!(" ...missing dst key, discarding");
Expand Down Expand Up @@ -680,15 +672,9 @@ impl Approved {
        // First send Sync to update the peer, then resend the message itself. If the messages
        // arrive in the same order they were sent, the Sync should update the peer so it will then
        // be able to handle the resent message. If not, the peer will bounce the message again.
-        self.comm
-            .send_direct_message(
-                &self.node_info.full_id,
-                sender.peer_addr(),
-                Variant::Sync(self.shared_state.clone()),
-            )
+        self.send_direct_message(sender.peer_addr(), Variant::Sync(self.shared_state.clone()))
            .await?;
-        self.comm
-            .send_message_to_target(sender.peer_addr(), bounced_msg_bytes)
+        self.send_message_to_target(sender.peer_addr(), bounced_msg_bytes)
            .await
    }

@@ -872,12 +858,7 @@
        };

        debug!("Sending BootstrapResponse {:?} to {}", response, p2p_node);
-        self.comm
-            .send_direct_message(
-                &self.node_info.full_id,
-                p2p_node.peer_addr(),
-                Variant::BootstrapResponse(response),
-            )
+        self.send_direct_message(p2p_node.peer_addr(), Variant::BootstrapResponse(response))
            .await
    }

@@ -895,12 +876,7 @@
        };
        trace!("Resending BootstrapResponse {:?} to {}", response, p2p_node,);
        return self
-            .comm
-            .send_direct_message(
-                &self.node_info.full_id,
-                p2p_node.peer_addr(),
-                Variant::BootstrapResponse(response),
-            )
+            .send_direct_message(p2p_node.peer_addr(), Variant::BootstrapResponse(response))
            .await;
    }

@@ -1508,13 +1484,13 @@

            // Ping all members to detect recent lost nodes for which the section might need
            // our Offline vote.
-            for p2p_node in self.shared_state.active_members() {
-                self.comm
-                    .send_direct_message(
-                        &self.node_info.full_id,
-                        p2p_node.peer_addr(),
-                        Variant::Ping,
-                    )
+            for p2p_node in self
+                .shared_state
+                .active_members()
+                .cloned()
+                .collect::<Vec<_>>()
+            {
+                self.send_direct_message(p2p_node.peer_addr(), Variant::Ping)
                    .await?;
            }
        }
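
The reshaped loop is not just style: `send_direct_message` is now a method on `self` taking `&mut self` (it may mutate state via `handle_peer_lost`), so the loop can no longer iterate over `self.shared_state.active_members()` directly while calling it. Cloning the members into an owned `Vec` first ends the immutable borrow. A minimal standalone illustration of the borrow conflict, with toy types rather than this codebase's:

```rust
struct Section {
    members: Vec<String>,
}

impl Section {
    fn ping(&mut self, _member: &str) {
        // on a real node this may mutate `self`, e.g. record a failed send
    }

    fn ping_all(&mut self) {
        // This would not compile: the iterator keeps an immutable borrow of
        // `self` alive across the `&mut self` call:
        //
        //     for m in self.members.iter() { self.ping(m); } // error[E0502]
        //
        // Materializing an owned copy first releases that borrow:
        for m in self.members.clone() {
            self.ping(&m);
        }
    }
}
```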
@@ -1585,8 +1561,7 @@
            Some(proof_chain),
            None,
        )?;
-        self.comm
-            .send_message_to_target(p2p_node.peer_addr(), message.to_bytes())
+        self.send_message_to_target(p2p_node.peer_addr(), message.to_bytes())
            .await?;
        Ok(())
    }
@@ -1612,8 +1587,7 @@
            None,
            None,
        )?;
-        self.comm
-            .send_message_to_target(p2p_node.peer_addr(), message.to_bytes())
+        self.send_message_to_target(p2p_node.peer_addr(), message.to_bytes())
            .await?;
    }

@@ -1638,12 +1612,7 @@

        // Message accumulated at destination.
        let message = self.to_accumulating_message(dst, variant, Some(knowledge_index))?;
-        self.comm
-            .send_direct_message(
-                &self.node_info.full_id,
-                &recipient,
-                Variant::MessageSignature(Box::new(message)),
-            )
+        self.send_direct_message(&recipient, Variant::MessageSignature(Box::new(message)))
            .await
    }

@@ -1660,12 +1629,7 @@

        // Message accumulated at destination
        let message = self.to_accumulating_message(dst, variant, None)?;
-        self.comm
-            .send_direct_message(
-                &self.node_info.full_id,
-                &recipient,
-                Variant::MessageSignature(Box::new(message)),
-            )
+        self.send_direct_message(&recipient, Variant::MessageSignature(Box::new(message)))
            .await?;

        Ok(())
@@ -1705,8 +1669,7 @@
            None,
        )?;

-        self.comm
-            .send_message_to_targets(&recipients, recipients.len(), message.to_bytes())
+        self.send_message_to_targets(&recipients, recipients.len(), message.to_bytes())
            .await?;

        self.dkg_voter.start_observing(
@@ -1742,8 +1705,7 @@
            None,
            None,
        )?;
-        self.comm
-            .send_message_to_targets(&recipients, recipients.len(), message.to_bytes())
+        self.send_message_to_targets(&recipients, recipients.len(), message.to_bytes())
            .await?;

        Ok(())
@@ -1777,8 +1739,7 @@
            .copied()
            .collect();

-        self.comm
-            .send_message_to_targets(&recipients, recipients.len(), message.to_bytes())
+        self.send_message_to_targets(&recipients, recipients.len(), message.to_bytes())
            .await?;

        // TODO: remove the recursion caused by this call.
@@ -1815,8 +1776,7 @@
        trace!("relay {:?} to {:?}", msg, targets);

        let targets: Vec<_> = targets.into_iter().map(|node| *node.peer_addr()).collect();
-        self.comm
-            .send_message_to_targets(&targets, dg_size, msg.to_bytes())
+        self.send_message_to_targets(&targets, dg_size, msg.to_bytes())
            .await?;

        Ok(())
@@ -1873,13 +1833,11 @@
                    self.handle_accumulated_message(msg).await?;
                }
            } else {
-                self.comm
-                    .send_direct_message(
-                        &self.node_info.full_id,
-                        target.peer_addr(),
-                        Variant::MessageSignature(Box::new(accumulating_msg.clone())),
-                    )
-                    .await?;
+                self.send_direct_message(
+                    target.peer_addr(),
+                    Variant::MessageSignature(Box::new(accumulating_msg.clone())),
+                )
+                .await?;
            }
        }

@@ -1926,6 +1884,22 @@
        Ok(AccumulatingMessage::new(content, proof_chain, proof_share))
    }

+    pub async fn send_direct_message(
+        &mut self,
+        recipient: &SocketAddr,
+        variant: Variant,
+    ) -> Result<()> {
+        let message = Message::single_src(
+            &self.node_info.full_id,
+            DstLocation::Direct,
+            variant,
+            None,
+            None,
+        )?;
+        self.send_message_to_target(recipient, message.to_bytes())
+            .await
+    }

    // TODO: consider changing this so it sends only to a subset of the elders
    // (say 1/3 of the ones closest to our name or so)
    async fn send_message_to_our_elders(&mut self, msg_bytes: Bytes) -> Result<()> {
@@ -1936,8 +1910,30 @@
            .map(P2pNode::peer_addr)
            .copied()
            .collect();
-        self.comm
-            .send_message_to_targets(&targets, targets.len(), msg_bytes)
+        self.send_message_to_targets(&targets, targets.len(), msg_bytes)
            .await
    }

+    async fn send_message_to_targets(
+        &mut self,
+        recipients: &[SocketAddr],
+        delivery_group_size: usize,
+        msg: Bytes,
+    ) -> Result<()> {
+        let status = self
+            .comm
+            .send_message_to_targets(recipients, delivery_group_size, msg)
+            .await;
+
+        for addr in status.failed_recipients {
+            self.handle_peer_lost(addr).await?;
+        }
+
+        Ok(())
+    }
+
+    async fn send_message_to_target(&mut self, recipient: &SocketAddr, msg: Bytes) -> Result<()> {
+        self.send_message_to_targets(slice::from_ref(recipient), 1, msg)
+            .await
+    }
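
This wrapper is where lost peers are actually detected: `Comm::send_message_to_targets` now returns a delivery status rather than an opaque failure, and every recipient it could not reach is escalated to `handle_peer_lost`. Note also that the new `send_direct_message` supplies `&self.node_info.full_id` itself, which is why every call site above dropped that argument. The status type is defined in the comm module, outside this diff; inferred from its single use here, it presumably looks roughly like:

```rust
use std::net::SocketAddr;

// Assumed shape, inferred from `status.failed_recipients` above; the real
// definition lives in the comm module, which this diff does not show.
pub struct SendStatus {
    /// Recipients that stayed unreachable even after the comm layer's retries.
    pub failed_recipients: Vec<SocketAddr>,
}
```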

