Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
fix: ignore invalid bootstrap response
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Jan 18, 2021
1 parent dbf9807 commit 3d8cfd5
Showing 1 changed file with 268 additions and 43 deletions.
311 changes: 268 additions & 43 deletions src/routing/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,6 @@ impl<'a> State<'a> {
return Ok((elders_info, section_key));
}
BootstrapResponse::Rebootstrap(new_bootstrap_addrs) => {
if new_bootstrap_addrs.is_empty() {
error!("{} Invalid rebootstrap response: missing peers", self.node);
return Err(Error::InvalidMessage);
}

info!(
"{} Bootstrapping redirected to another set of peers: {:?}",
self.node, new_bootstrap_addrs,
Expand Down Expand Up @@ -190,6 +185,20 @@ impl<'a> State<'a> {
while let Some((message, sender)) = self.recv_rx.next().await {
match message.variant() {
Variant::BootstrapResponse(response) => {
match response {
BootstrapResponse::Rebootstrap(addrs) if addrs.is_empty() => {
error!("Invalid Rebootstrap response: missing peers");
continue;
}
BootstrapResponse::Join { elders_info, .. }
if !elders_info.prefix.matches(&self.node.name()) =>
{
error!("Invalid Join response: bad prefix");
continue;
}
BootstrapResponse::Join { .. } | BootstrapResponse::Rebootstrap(_) => (),
}

if !self.verify_message(&message, None) {
continue;
}
Expand Down Expand Up @@ -528,8 +537,8 @@ mod tests {
};
use anyhow::{Error, Result};
use assert_matches::assert_matches;
use futures::future;
use tokio::task;
use futures::future::{self, Either};
use tokio::{sync::mpsc::error::TryRecvError, task};

#[tokio::test]
async fn bootstrap_as_adult() -> Result<()> {
Expand Down Expand Up @@ -633,53 +642,269 @@ mod tests {
let bootstrap_node = Node::new(crypto::gen_keypair(), gen_addr());

let node = Node::new(crypto::gen_keypair(), gen_addr());
let state = State::new(node, send_tx, recv_rx)?;
let mut state = State::new(node, send_tx, recv_rx)?;

let bootstrap_task = state.bootstrap(vec![bootstrap_node.addr], None);
let test_task = async {
task::yield_now().await;

// Receive BootstrapRequest
let (bytes, recipients) = send_rx.try_recv()?;
let message = Message::from_bytes(&bytes)?;

assert_eq!(recipients, vec![bootstrap_node.addr]);
assert_matches!(message.variant(), Variant::BootstrapRequest(_));

// Send Rebootstrap BootstrapResponse
let new_bootstrap_addrs: Vec<_> = (0..ELDER_SIZE).map(|_| gen_addr()).collect();

let message = Message::single_src(
&bootstrap_node,
DstLocation::Direct,
Variant::BootstrapResponse(BootstrapResponse::Rebootstrap(
new_bootstrap_addrs.clone(),
)),
None,
None,
)?;

recv_tx.try_send((message, bootstrap_node.addr))?;
task::yield_now().await;

// Receive new BootstrapRequests
let (bytes, recipients) = send_rx.try_recv()?;
let message = Message::from_bytes(&bytes)?;

assert_eq!(recipients, new_bootstrap_addrs);
assert_matches!(message.variant(), Variant::BootstrapRequest(_));

Ok(())
};

futures::pin_mut!(bootstrap_task);
futures::pin_mut!(test_task);

match future::select(bootstrap_task, test_task).await {
Either::Left(_) => unreachable!(),
Either::Right((output, _)) => output,
}
}

#[tokio::test]
async fn invalid_bootstrap_response_rebootstrap() -> Result<()> {
let (send_tx, mut send_rx) = mpsc::channel(1);
let (mut recv_tx, recv_rx) = mpsc::channel(1);
let recv_rx = MessageReceiver::Deserialized(recv_rx);

let bootstrap_node = Node::new(crypto::gen_keypair(), gen_addr());

let node = Node::new(crypto::gen_keypair(), gen_addr());
let mut state = State::new(node, send_tx, recv_rx)?;

let bootstrap_task = state.bootstrap(vec![bootstrap_node.addr], None);
let test_task = async {
task::yield_now().await;

let (bytes, _) = send_rx.try_recv()?;
let message = Message::from_bytes(&bytes)?;
assert_matches!(message.variant(), Variant::BootstrapRequest(_));

let message = Message::single_src(
&bootstrap_node,
DstLocation::Direct,
Variant::BootstrapResponse(BootstrapResponse::Rebootstrap(vec![])),
None,
None,
)?;

recv_tx.try_send((message, bootstrap_node.addr))?;
task::yield_now().await;
assert_matches!(send_rx.try_recv(), Err(TryRecvError::Empty));

let addrs = (0..ELDER_SIZE).map(|_| gen_addr()).collect();
let message = Message::single_src(
&bootstrap_node,
DstLocation::Direct,
Variant::BootstrapResponse(BootstrapResponse::Rebootstrap(addrs)),
None,
None,
)?;

recv_tx.try_send((message, bootstrap_node.addr))?;
task::yield_now().await;

let (bytes, _) = send_rx.try_recv()?;
let message = Message::from_bytes(&bytes)?;
assert_matches!(message.variant(), Variant::BootstrapRequest(_));

Ok(())
};

futures::pin_mut!(bootstrap_task);
futures::pin_mut!(test_task);

match future::select(bootstrap_task, test_task).await {
Either::Left(_) => unreachable!(),
Either::Right((output, _)) => output,
}
}

#[tokio::test]
async fn invalid_bootstrap_response_join() -> Result<()> {
let (send_tx, mut send_rx) = mpsc::channel(1);
let (mut recv_tx, recv_rx) = mpsc::channel(1);
let recv_rx = MessageReceiver::Deserialized(recv_rx);

let bootstrap_node = Node::new(crypto::gen_keypair(), gen_addr());
let node = Node::new(crypto::gen_keypair(), gen_addr());

// Spawn the bootstrap task on a `LocalSet` so that it runs concurrently with the main test
// task, but is aborted when the test task finishes because we don't need it to complete
// for the purpose of this test.
let local_set = task::LocalSet::new();
let (good_prefix, bad_prefix) = {
let p0 = Prefix::default().pushed(false);
let p1 = Prefix::default().pushed(true);

let _ = local_set.spawn_local(state.run(vec![bootstrap_node.addr], None));
if node.name().bit(0) {
(p1, p0)
} else {
(p0, p1)
}
};

local_set
.run_until(async {
task::yield_now().await;
let mut state = State::new(node, send_tx, recv_rx)?;

// Receive BootstrapRequest
let (bytes, recipients) = send_rx.try_recv()?;
let message = Message::from_bytes(&bytes)?;
let bootstrap_task = state.bootstrap(vec![bootstrap_node.addr], None);

assert_eq!(recipients, vec![bootstrap_node.addr]);
assert_matches!(message.variant(), Variant::BootstrapRequest(_));
// Send an invalid `BootstrapResponse::Join` followed by a valid one. The invalid one is
// ignored and the valid one processed normally.
let test_task = async {
task::yield_now().await;

// Send Rebootstrap BootstrapResponse
let new_bootstrap_addrs: Vec<_> = (0..ELDER_SIZE).map(|_| gen_addr()).collect();
let (bytes, _) = send_rx.try_recv()?;
let message = Message::from_bytes(&bytes)?;
assert_matches!(message.variant(), Variant::BootstrapRequest(_));

let message = Message::single_src(
&bootstrap_node,
DstLocation::Direct,
Variant::BootstrapResponse(BootstrapResponse::Rebootstrap(
new_bootstrap_addrs.clone(),
)),
None,
None,
)?;
let (elders_info, _) = gen_elders_info(bad_prefix, ELDER_SIZE);
let section_key = bls::SecretKey::random().public_key();
let message = Message::single_src(
&bootstrap_node,
DstLocation::Direct,
Variant::BootstrapResponse(BootstrapResponse::Join {
elders_info,
section_key,
}),
None,
None,
)?;

recv_tx.try_send((message, bootstrap_node.addr))?;
task::yield_now().await;
assert_matches!(send_rx.try_recv(), Err(TryRecvError::Empty));

recv_tx.try_send((message, bootstrap_node.addr))?;
task::yield_now().await;
let (elders_info, _) = gen_elders_info(good_prefix, ELDER_SIZE);
let section_key = bls::SecretKey::random().public_key();
let message = Message::single_src(
&bootstrap_node,
DstLocation::Direct,
Variant::BootstrapResponse(BootstrapResponse::Join {
elders_info,
section_key,
}),
None,
None,
)?;

// Receive new BootstrapRequests
let (bytes, recipients) = send_rx.try_recv()?;
let message = Message::from_bytes(&bytes)?;
recv_tx.try_send((message, bootstrap_node.addr))?;

assert_eq!(recipients, new_bootstrap_addrs);
assert_matches!(message.variant(), Variant::BootstrapRequest(_));
Ok(())
};

Ok(())
})
.await
let (bootstrap_result, test_result) = future::join(bootstrap_task, test_task).await;
let _ = bootstrap_result?;
test_result
}

// TODO: add test for bootstrap as relocated node
#[tokio::test]
async fn invalid_join_response_rejoin() -> Result<()> {
let (send_tx, mut send_rx) = mpsc::channel(1);
let (mut recv_tx, recv_rx) = mpsc::channel(1);
let recv_rx = MessageReceiver::Deserialized(recv_rx);

let bootstrap_node = Node::new(crypto::gen_keypair(), gen_addr());
let node = Node::new(crypto::gen_keypair(), gen_addr());

let (good_prefix, bad_prefix) = {
let p0 = Prefix::default().pushed(false);
let p1 = Prefix::default().pushed(true);

if node.name().bit(0) {
(p1, p0)
} else {
(p0, p1)
}
};

let state = State::new(node, send_tx, recv_rx)?;

let (elders_info, _) = gen_elders_info(good_prefix, ELDER_SIZE);
let section_key = bls::SecretKey::random().public_key();
let join_task = state.join(elders_info, section_key, None);

let test_task = async {
task::yield_now().await;

let (bytes, _) = send_rx.try_recv()?;
let message = Message::from_bytes(&bytes)?;
assert_matches!(message.variant(), Variant::JoinRequest(_));

// Send `BootstrapResponse::Join` with bad prefix
let (elders_info, _) = gen_elders_info(bad_prefix, ELDER_SIZE);
let section_key = bls::SecretKey::random().public_key();

let message = Message::single_src(
&bootstrap_node,
DstLocation::Direct,
Variant::BootstrapResponse(BootstrapResponse::Join {
elders_info,
section_key,
}),
None,
None,
)?;

recv_tx.try_send((message, bootstrap_node.addr))?;
task::yield_now().await;
assert_matches!(send_rx.try_recv(), Err(TryRecvError::Empty));

// Send `BootstrapResponse::Join` with good prefix
let (elders_info, _) = gen_elders_info(good_prefix, ELDER_SIZE);
let section_key = bls::SecretKey::random().public_key();

let message = Message::single_src(
&bootstrap_node,
DstLocation::Direct,
Variant::BootstrapResponse(BootstrapResponse::Join {
elders_info,
section_key,
}),
None,
None,
)?;

recv_tx.try_send((message, bootstrap_node.addr))?;
task::yield_now().await;

let (bytes, _) = send_rx.try_recv()?;
let message = Message::from_bytes(&bytes)?;
assert_matches!(message.variant(), Variant::JoinRequest(_));

Ok(())
};

futures::pin_mut!(join_task);
futures::pin_mut!(test_task);

match future::select(join_task, test_task).await {
Either::Left(_) => unreachable!(),
Either::Right((output, _)) => output,
}
}
}

0 comments on commit 3d8cfd5

Please sign in to comment.