Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat: add bootstrap message backlog
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Nov 3, 2020
1 parent 009fc35 commit 75f0a5c
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 61 deletions.
26 changes: 5 additions & 21 deletions src/consensus/dkg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl DkgVoter {
) -> Vec<DkgCommand> {
if let Some(session) = &self.participant {
if session.dkg_key == dkg_key && session.elders_info.is_some() {
trace!("{} DKG for {} already in progress", our_name, elders_info);
trace!("DKG for {} already in progress", elders_info);
return vec![];
}
}
Expand All @@ -124,12 +124,7 @@ impl DkgVoter {

match KeyGen::initialize(our_name, threshold, participants) {
Ok((key_gen, message)) => {
trace!(
"{} DKG for {:?} {} starting",
our_name,
dkg_key,
elders_info
);
trace!("DKG for {} starting", elders_info);

let mut session = Participant {
dkg_key,
Expand Down Expand Up @@ -194,7 +189,7 @@ impl DkgVoter {

let dkg_key = session.dkg_key;

trace!("{} DKG for {} progressing", our_name, elders_info);
trace!("DKG for {} progressing", elders_info);

match session
.key_gen
Expand Down Expand Up @@ -361,12 +356,7 @@ impl Participant {
return vec![];
}

trace!(
"{} process DKG message {:?} {:?}",
our_name,
dkg_key,
message
);
trace!("process DKG message {:?}", message);
let responses = self
.key_gen
.handle_message(&mut rand::thread_rng(), message)
Expand Down Expand Up @@ -394,13 +384,7 @@ impl Participant {
.copied()
.collect();

trace!(
"{} broadcasting DKG message {:?} {:?} to {:?}",
our_name,
dkg_key,
message,
recipients
);
trace!("broadcasting DKG message {:?} to {:?}", message, recipients);

let mut commands = vec![];
commands.push(DkgCommand::SendMessage {
Expand Down
11 changes: 3 additions & 8 deletions src/routing/approved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,7 @@ impl Approved {
dkg_key: DkgKey,
new_elders_info: EldersInfo,
) -> Result<Vec<Command>> {
trace!("{} Received DKGStart for {}", self.node, new_elders_info);
trace!("Received DKGStart for {}", new_elders_info);
self.dkg_voter
.start_participating(self.node.name(), dkg_key, new_elders_info)
.into_commands(&self.node)
Expand Down Expand Up @@ -1577,14 +1577,9 @@ impl Approved {
}

fn send_dkg_start(&mut self, new_elders_info: EldersInfo) -> Result<Vec<Command>> {
let dkg_key = DkgKey::new(&new_elders_info);
trace!("Send DKGStart for {}", new_elders_info);

trace!(
"{} Send DKGStart for {:?} {}",
self.node,
dkg_key,
new_elders_info
);
let dkg_key = DkgKey::new(&new_elders_info);

// Send to all participants.
let recipients: Vec<_> = new_elders_info.elders.values().copied().collect();
Expand Down
49 changes: 26 additions & 23 deletions src/routing/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ use crate::{
};
use bytes::Bytes;
use futures::future::{self, Either};
use std::{future::Future, mem, net::SocketAddr};
use std::{collections::VecDeque, future::Future, mem, net::SocketAddr};
use tokio::sync::mpsc;
use xor_name::Prefix;

const BACKLOG_CAPACITY: usize = 100;

/// Bootstrap into the network as an infant node.
///
/// NOTE: It's not guaranteed this function ever returns. This can happen due to messages being
Expand All @@ -35,7 +37,7 @@ pub(crate) async fn infant(
comm: &Comm,
incoming_messages: &mut IncomingMessages,
bootstrap_addr: SocketAddr,
) -> Result<(Node, Section)> {
) -> Result<(Node, Section, Vec<(Message, SocketAddr)>)> {
let (send_tx, send_rx) = mpsc::channel(1);
let (recv_tx, recv_rx) = mpsc::channel(1);

Expand All @@ -62,7 +64,7 @@ pub(crate) async fn relocate(
recv_rx: mpsc::Receiver<(Message, SocketAddr)>,
bootstrap_addrs: Vec<SocketAddr>,
relocate_details: SignedRelocateDetails,
) -> Result<(Node, Section)> {
) -> Result<(Node, Section, Vec<(Message, SocketAddr)>)> {
let (send_tx, send_rx) = mpsc::channel(1);
let state = State::new(node, send_tx, recv_rx)?;

Expand All @@ -79,6 +81,8 @@ struct State {
// Receiver for incoming messages.
recv_rx: mpsc::Receiver<(Message, SocketAddr)>,
node: Node,
// Backlog for unknown messages
backlog: VecDeque<(Message, SocketAddr)>,
}

impl State {
Expand All @@ -91,14 +95,15 @@ impl State {
send_tx,
recv_rx,
node,
backlog: VecDeque::with_capacity(BACKLOG_CAPACITY),
})
}

async fn run(
mut self,
bootstrap_addrs: Vec<SocketAddr>,
relocate_details: Option<SignedRelocateDetails>,
) -> Result<(Node, Section)> {
) -> Result<(Node, Section, Vec<(Message, SocketAddr)>)> {
let (elders_info, section_key) = self
.bootstrap(bootstrap_addrs, relocate_details.as_ref())
.await?;
Expand Down Expand Up @@ -187,14 +192,7 @@ impl State {

return Ok((response.clone(), sender));
}
_ => {
trace!(
"{} Useless message {:?} from {}",
self.node,
message,
sender,
);
}
_ => self.backlog_message(message, sender),
}
}

Expand Down Expand Up @@ -236,7 +234,7 @@ impl State {
mut elders_info: EldersInfo,
mut section_key: bls::PublicKey,
relocate_payload: Option<RelocatePayload>,
) -> Result<(Node, Section)> {
) -> Result<(Node, Section, Vec<(Message, SocketAddr)>)> {
loop {
self.send_join_requests(&elders_info, section_key, relocate_payload.as_ref())
.await?;
Expand All @@ -250,7 +248,11 @@ impl State {
elders_info,
section_chain,
} => {
return Ok((self.node, Section::new(section_chain, elders_info)?));
return Ok((
self.node,
Section::new(section_chain, elders_info)?,
self.backlog.into_iter().collect(),
));
}
JoinResponse::Rejoin {
elders_info: new_elders_info,
Expand Down Expand Up @@ -359,14 +361,7 @@ impl State {
));
}

_ => {
trace!(
"{} Useless message {:?} from {}",
self.node,
message,
sender,
);
}
_ => self.backlog_message(message, sender),
}
}

Expand Down Expand Up @@ -397,6 +392,14 @@ impl State {
}
}
}

fn backlog_message(&mut self, message: Message, sender: SocketAddr) {
while self.backlog.len() >= BACKLOG_CAPACITY {
let _ = self.backlog.pop_front();
}

self.backlog.push_back((message, sender))
}
}

enum JoinResponse {
Expand Down Expand Up @@ -543,7 +546,7 @@ mod tests {
};

// Drive both tasks to completion concurrently (but on the same thread).
let ((_node, section), _) = future::try_join(bootstrap, others).await?;
let ((_node, section, _backlog), _) = future::try_join(bootstrap, others).await?;

assert_eq!(*section.elders_info(), elders_info);
assert_eq!(*section.chain().last_key(), pk);
Expand Down
19 changes: 15 additions & 4 deletions src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl Routing {

let (event_tx, event_rx) = mpsc::unbounded_channel();

let (state, comm, incoming_msgs) = if config.first {
let (state, comm, incoming_msgs, backlog) = if config.first {
info!("{} Starting a new network as the seed node.", node_name);
let comm = Comm::new(config.transport_config)?;
let incoming_msgs = comm.listen()?;
Expand All @@ -99,26 +99,37 @@ impl Routing {
state.send_event(Event::Connected(Connected::First));
state.send_event(Event::PromotedToElder);

(state, comm, incoming_msgs)
(state, comm, incoming_msgs, vec![])
} else {
info!("{} Bootstrapping a new node.", node_name);
let (comm, bootstrap_addr) = Comm::from_bootstrapping(config.transport_config).await?;
let mut incoming_msgs = comm.listen()?;

let node = Node::new(keypair, comm.our_connection_info()?);
let (node, section) =
let (node, section, backlog) =
bootstrap::infant(node, &comm, &mut incoming_msgs, bootstrap_addr).await?;
let state = Approved::new(node, section, None, event_tx);

state.send_event(Event::Connected(Connected::First));

(state, comm, incoming_msgs)
(state, comm, incoming_msgs, backlog)
};

let stage = Arc::new(Stage::new(state, comm));
let executor = Executor::new(stage.clone(), incoming_msgs).await;
let event_stream = EventStream::new(event_rx);

// Process message backlog
for (message, sender) in backlog {
stage
.clone()
.handle_commands(Command::HandleMessage {
message,
sender: Some(sender),
})
.await?;
}

let routing = Self {
stage,
_executor: executor,
Expand Down
16 changes: 11 additions & 5 deletions src/routing/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ impl Stage {
message_rx,
} => {
self.handle_relocate(bootstrap_addrs, details, message_rx)
.await?;
Ok(vec![])
.await
}
}
}
Expand Down Expand Up @@ -155,11 +154,11 @@ impl Stage {
bootstrap_addrs: Vec<SocketAddr>,
details: SignedRelocateDetails,
message_rx: mpsc::Receiver<(Message, SocketAddr)>,
) -> Result<()> {
) -> Result<Vec<Command>> {
let node = self.state.lock().await.node().clone();
let previous_name = node.name();

let (node, section) =
let (node, section, backlog) =
bootstrap::relocate(node, &self.comm, message_rx, bootstrap_addrs, details).await?;

let mut state = self.state.lock().await;
Expand All @@ -168,6 +167,13 @@ impl Stage {

state.send_event(Event::Connected(Connected::Relocate { previous_name }));

Ok(())
let commands = backlog
.into_iter()
.map(|(message, sender)| Command::HandleMessage {
message,
sender: Some(sender),
})
.collect();
Ok(commands)
}
}

0 comments on commit 75f0a5c

Please sign in to comment.