Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
fix: prevent losing incoming messages during bootstrapping
Browse files Browse the repository at this point in the history
The problem was during bootstrap, if we received the `NodeApproval` message followed immediately by some other message (say `Relocate`), then the `NodeApproval` would be processed and the bootstrap would complete, leaving the following message in the intermediary channel buffer never to be taken out of it and thus losing it.

The fix is to remove the intermediary channel and replace it with a simple wrapper around the `ConnectionEvent` receiver. Thus changing a the "push/pull" model into a simple "pull" model. This way, a message is never retrieved from the channel if we are not ready to process it.
  • Loading branch information
madadam committed Nov 24, 2020
1 parent 46bb2b8 commit 3c9357e
Showing 1 changed file with 43 additions and 47 deletions.
90 changes: 43 additions & 47 deletions src/routing/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use crate::{
SectionProofChain,
};
use bytes::Bytes;
use futures::future::{self, Either};
use std::{collections::VecDeque, future::Future, mem, net::SocketAddr};
use futures::future;
use std::{collections::VecDeque, mem, net::SocketAddr};
use tokio::sync::mpsc;
use xor_name::Prefix;

Expand All @@ -39,18 +39,16 @@ pub(crate) async fn infant(
bootstrap_addr: SocketAddr,
) -> Result<(Node, Section, Vec<(Message, SocketAddr)>)> {
let (send_tx, send_rx) = mpsc::channel(1);
let (recv_tx, recv_rx) = mpsc::channel(1);
let recv_rx = MessageReceiver::Raw(incoming_conns);

let state = State::new(node, send_tx, recv_rx)?;

run_until_first(
future::join(
state.run(vec![bootstrap_addr], None),
future::join(
send_messages(send_rx, comm),
receive_messages(incoming_conns, recv_tx),
),
send_messages(send_rx, comm),
)
.await
.0
}

/// Re-bootstrap as a relocated node.
Expand All @@ -66,30 +64,33 @@ pub(crate) async fn relocate(
relocate_details: SignedRelocateDetails,
) -> Result<(Node, Section, Vec<(Message, SocketAddr)>)> {
let (send_tx, send_rx) = mpsc::channel(1);
let recv_rx = MessageReceiver::Deserialized(recv_rx);

let state = State::new(node, send_tx, recv_rx)?;

run_until_first(
future::join(
state.run(bootstrap_addrs, Some(relocate_details)),
send_messages(send_rx, comm),
)
.await
.0
}

struct State {
struct State<'a> {
// Sender for outgoing messages.
send_tx: mpsc::Sender<(Bytes, Vec<SocketAddr>)>,
// Receiver for incoming messages.
recv_rx: mpsc::Receiver<(Message, SocketAddr)>,
recv_rx: MessageReceiver<'a>,
node: Node,
// Backlog for unknown messages
backlog: VecDeque<(Message, SocketAddr)>,
}

impl State {
impl<'a> State<'a> {
fn new(
node: Node,
send_tx: mpsc::Sender<(Bytes, Vec<SocketAddr>)>,
recv_rx: mpsc::Receiver<(Message, SocketAddr)>,
recv_rx: MessageReceiver<'a>,
) -> Result<Self> {
Ok(Self {
send_tx,
Expand Down Expand Up @@ -183,7 +184,7 @@ impl State {
}

async fn receive_bootstrap_response(&mut self) -> Result<(BootstrapResponse, SocketAddr)> {
while let Some((message, sender)) = self.recv_rx.recv().await {
while let Some((message, sender)) = self.recv_rx.next().await {
match message.variant() {
Variant::BootstrapResponse(response) => {
if !self.verify_message(&message, None) {
Expand Down Expand Up @@ -315,7 +316,7 @@ impl State {
&mut self,
relocate_payload: Option<&RelocatePayload>,
) -> Result<(JoinResponse, SocketAddr)> {
while let Some((message, sender)) = self.recv_rx.recv().await {
while let Some((message, sender)) = self.recv_rx.next().await {
match message.variant() {
Variant::BootstrapResponse(BootstrapResponse::Join {
elders_info,
Expand Down Expand Up @@ -413,25 +414,34 @@ enum JoinResponse {
},
}

// Keep receiving messages from `incoming_messages` and send them to `message_tx`.
async fn receive_messages(
incoming_conns: &mut mpsc::Receiver<ConnectionEvent>,
mut message_tx: mpsc::Sender<(Message, SocketAddr)>,
) {
while let Some(event) = incoming_conns.recv().await {
match event {
ConnectionEvent::Received(qp2p::Message::UniStream { bytes, src, .. }) => {
match Message::from_bytes(&bytes) {
Ok(message) => {
let _ = message_tx.send((message, src)).await;
// Receiver of incoming messages that can be backed either by a raw `qp2p::ConnectionEvent` receiver
// or by receiver of deserialized `Message` and provides a unified interface on top of them.
enum MessageReceiver<'a> {
Raw(&'a mut mpsc::Receiver<ConnectionEvent>),
Deserialized(mpsc::Receiver<(Message, SocketAddr)>),
}

impl<'a> MessageReceiver<'a> {
async fn next(&mut self) -> Option<(Message, SocketAddr)> {
match self {
Self::Raw(rx) => {
while let Some(event) = rx.recv().await {
match event {
ConnectionEvent::Received(qp2p::Message::UniStream {
bytes, src, ..
}) => match Message::from_bytes(&bytes) {
Ok(message) => return Some((message, src)),
Err(error) => debug!("Failed to deserialize message: {}", error),
},
ConnectionEvent::Received(qp2p::Message::BiStream { .. }) => {
trace!("Ignore bi-stream messages during bootstrap");
}
ConnectionEvent::Disconnected(_) => {}
}
Err(error) => debug!("Failed to deserialize message: {}", error),
}
None
}
ConnectionEvent::Received(qp2p::Message::BiStream { .. }) => {
trace!("Ignore bi-stream messages during bootstrap");
}
ConnectionEvent::Disconnected(_) => {}
Self::Deserialized(rx) => rx.recv().await,
}
}
}
Expand All @@ -445,22 +455,6 @@ async fn send_messages(mut rx: mpsc::Receiver<(Bytes, Vec<SocketAddr>)>, comm: &
}
}

// Runs the two futures concurrently until the first one completes. Returns the result of the first
// future, ignores the second.
async fn run_until_first<F, G>(f: F, g: G) -> F::Output
where
F: Future,
G: Future,
{
futures::pin_mut!(f);
futures::pin_mut!(g);

match future::select(f, g).await {
Either::Left((value, _)) => value,
Either::Right((_, f)) => f.await,
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -474,6 +468,7 @@ mod tests {
async fn bootstrap_as_infant() -> Result<()> {
let (send_tx, mut send_rx) = mpsc::channel(1);
let (mut recv_tx, recv_rx) = mpsc::channel(1);
let recv_rx = MessageReceiver::Deserialized(recv_rx);

let (elders_info, mut nodes) = gen_elders_info(Default::default(), ELDER_SIZE);
let bootstrap_node = nodes.remove(0);
Expand Down Expand Up @@ -561,6 +556,7 @@ mod tests {
async fn receive_bootstrap_response_rebootstrap() -> Result<()> {
let (send_tx, mut send_rx) = mpsc::channel(1);
let (mut recv_tx, recv_rx) = mpsc::channel(1);
let recv_rx = MessageReceiver::Deserialized(recv_rx);

let bootstrap_node = Node::new(crypto::gen_keypair(), gen_addr());

Expand Down

0 comments on commit 3c9357e

Please sign in to comment.