Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
chore(tokio): upgrade tokio to v1.2.0 and qp2p 0.10.0
Browse files Browse the repository at this point in the history
BREAKING CHANGE: new Tokio v1 is not backward compatible with previous runtime versions < 1.
  • Loading branch information
bochaco authored and lionel-faber committed Mar 5, 2021
1 parent 4d86608 commit e5adc1a
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 49 deletions.
10 changes: 3 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ futures = "~0.3.12"
hex_fmt = "~0.3.0"
itertools = "~0.9.0"
lru_time_cache = "~0.11.0"
qp2p = "~0.9.22"
qp2p = "~0.10.0"
rand = "~0.7.3"
rand_chacha = "~0.2.2"
thiserror = "1.0.23"
Expand All @@ -45,8 +45,8 @@ sn_data_types = "~0.16.0"
features = [ "sha3" ]

[dependencies.tokio]
version = "~0.2.24"
features = [ "rt-util", "sync", "time" ]
version = "1.2.0"
features = [ "sync", "time", "rt", "macros", "net", "rt-multi-thread" ]

[dependencies.tracing]
version = "~0.1.22"
Expand All @@ -65,7 +65,3 @@ yansi = "~0.5.0"
[dev-dependencies.rand]
version = "~0.7.3"
features = [ "small_rng" ]

[dev-dependencies.tokio]
version = "~0.2.24"
features = [ "stream", "udp" ]
2 changes: 1 addition & 1 deletion examples/stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ async fn main() -> Result<()> {

loop {
tokio::select! {
event = event_rx.recv() => {
event = event_rx.recv().await => {
if let Some(event) = event {
network.handle_event(event).await?
} else {
Expand Down
87 changes: 61 additions & 26 deletions src/routing/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,16 +582,19 @@ mod tests {
consensus::test_utils::*, routing::tests::SecretKeySet, section::test_utils::*,
section::MemberInfo, ELDER_SIZE, MIN_AGE,
};
use anyhow::{Error, Result};
use anyhow::{anyhow, Error, Result};
use assert_matches::assert_matches;
use futures::future::{self, Either};
use futures::{
future::{self, Either},
pin_mut,
};
use sn_messaging::section_info::SectionInfo;
use tokio::{sync::mpsc::error::TryRecvError, task};
use tokio::task;

#[tokio::test]
async fn bootstrap_as_adult() -> Result<()> {
let (send_tx, mut send_rx) = mpsc::channel(1);
let (mut recv_tx, recv_rx) = mpsc::channel(1);
let (recv_tx, recv_rx) = mpsc::channel(1);
let recv_rx = MessageReceiver::Deserialized(recv_rx);

let (elders_info, mut nodes) = gen_elders_info(Default::default(), ELDER_SIZE);
Expand Down Expand Up @@ -620,7 +623,10 @@ mod tests {
task::yield_now().await;

// Receive GetSectionQuery
let (message, recipients) = send_rx.try_recv()?;
let (message, recipients) = send_rx
.recv()
.await
.ok_or_else(|| anyhow!("GetSectionQuery was not received"))?;

assert_eq!(recipients, [bootstrap_addr]);
assert_matches!(message, MessageType::SectionInfo(SectionInfoMsg::GetSectionQuery(name)) => {
Expand All @@ -643,7 +649,10 @@ mod tests {
task::yield_now().await;

// Receive JoinRequest
let (message, recipients) = send_rx.try_recv()?;
let (message, recipients) = send_rx
.recv()
.await
.ok_or_else(|| anyhow!("JoinRequest was not received"))?;
let message = assert_matches!(message, MessageType::NodeMessage(NodeMessage(bytes)) =>
Message::from_bytes(Bytes::from(bytes))?);

Expand Down Expand Up @@ -689,7 +698,7 @@ mod tests {
#[tokio::test]
async fn receive_get_section_response_redirect() -> Result<()> {
let (send_tx, mut send_rx) = mpsc::channel(1);
let (mut recv_tx, recv_rx) = mpsc::channel(1);
let (recv_tx, recv_rx) = mpsc::channel(1);
let recv_rx = MessageReceiver::Deserialized(recv_rx);

let bootstrap_node = Node::new(crypto::gen_keypair(), gen_addr());
Expand All @@ -702,7 +711,10 @@ mod tests {
task::yield_now().await;

// Receive GetSectionQuery
let (message, recipients) = send_rx.try_recv()?;
let (message, recipients) = send_rx
.recv()
.await
.ok_or_else(|| anyhow!("GetSectionQuery was not received"))?;

assert_eq!(recipients, vec![bootstrap_node.addr]);
assert_matches!(
Expand All @@ -720,7 +732,10 @@ mod tests {
task::yield_now().await;

// Receive new GetSectionQuery
let (message, recipients) = send_rx.try_recv()?;
let (message, recipients) = send_rx
.recv()
.await
.ok_or_else(|| anyhow!("GetSectionQuery was not received"))?;

assert_eq!(recipients, new_bootstrap_addrs);
assert_matches!(
Expand All @@ -731,8 +746,8 @@ mod tests {
Ok(())
};

futures::pin_mut!(bootstrap_task);
futures::pin_mut!(test_task);
pin_mut!(bootstrap_task);
pin_mut!(test_task);

match future::select(bootstrap_task, test_task).await {
Either::Left(_) => unreachable!(),
Expand All @@ -743,7 +758,7 @@ mod tests {
#[tokio::test]
async fn invalid_get_section_response_redirect() -> Result<()> {
let (send_tx, mut send_rx) = mpsc::channel(1);
let (mut recv_tx, recv_rx) = mpsc::channel(1);
let (recv_tx, recv_rx) = mpsc::channel(1);
let recv_rx = MessageReceiver::Deserialized(recv_rx);

let bootstrap_node = Node::new(crypto::gen_keypair(), gen_addr());
Expand All @@ -755,7 +770,11 @@ mod tests {
let test_task = async {
task::yield_now().await;

let (message, _) = send_rx.try_recv()?;
let (message, _) = send_rx
.recv()
.await
.ok_or_else(|| anyhow!("GetSectionQuery was not received"))?;

assert_matches!(
message,
MessageType::SectionInfo(SectionInfoMsg::GetSectionQuery(_))
Expand All @@ -765,15 +784,19 @@ mod tests {

recv_tx.try_send((MessageType::SectionInfo(message), bootstrap_node.addr))?;
task::yield_now().await;
assert_matches!(send_rx.try_recv(), Err(TryRecvError::Empty));
assert_matches!(send_rx.recv().await, None);

let addrs = (0..ELDER_SIZE).map(|_| gen_addr()).collect();
let message = SectionInfoMsg::GetSectionResponse(GetSectionResponse::Redirect(addrs));

recv_tx.try_send((MessageType::SectionInfo(message), bootstrap_node.addr))?;
task::yield_now().await;

let (message, _) = send_rx.try_recv()?;
let (message, _) = send_rx
.recv()
.await
.ok_or_else(|| anyhow!("GetSectionQuery was not received"))?;

assert_matches!(
message,
MessageType::SectionInfo(SectionInfoMsg::GetSectionQuery(_))
Expand All @@ -782,8 +805,8 @@ mod tests {
Ok(())
};

futures::pin_mut!(bootstrap_task);
futures::pin_mut!(test_task);
pin_mut!(bootstrap_task);
pin_mut!(test_task);

match future::select(bootstrap_task, test_task).await {
Either::Left(_) => unreachable!(),
Expand All @@ -794,7 +817,7 @@ mod tests {
#[tokio::test]
async fn invalid_get_section_response_success() -> Result<()> {
let (send_tx, mut send_rx) = mpsc::channel(1);
let (mut recv_tx, recv_rx) = mpsc::channel(1);
let (recv_tx, recv_rx) = mpsc::channel(1);
let recv_rx = MessageReceiver::Deserialized(recv_rx);

let bootstrap_node = Node::new(crypto::gen_keypair(), gen_addr());
Expand All @@ -820,7 +843,11 @@ mod tests {
let test_task = async {
task::yield_now().await;

let (message, _) = send_rx.try_recv()?;
let (message, _) = send_rx
.recv()
.await
.ok_or_else(|| anyhow!("GetSectionQuery was not received"))?;

assert_matches!(
message,
MessageType::SectionInfo(SectionInfoMsg::GetSectionQuery(_))
Expand All @@ -840,7 +867,7 @@ mod tests {

recv_tx.try_send((MessageType::SectionInfo(message), bootstrap_node.addr))?;
task::yield_now().await;
assert_matches!(send_rx.try_recv(), Err(TryRecvError::Empty));
assert_matches!(send_rx.recv().await, None);

let infrastructure_info = SectionInfo {
prefix: good_prefix,
Expand All @@ -867,7 +894,7 @@ mod tests {
#[tokio::test]
async fn invalid_join_response_rejoin() -> Result<()> {
let (send_tx, mut send_rx) = mpsc::channel(1);
let (mut recv_tx, recv_rx) = mpsc::channel(1);
let (recv_tx, recv_rx) = mpsc::channel(1);
let recv_rx = MessageReceiver::Deserialized(recv_rx);

let bootstrap_node = Node::new(crypto::gen_keypair(), gen_addr());
Expand Down Expand Up @@ -895,7 +922,11 @@ mod tests {
let test_task = async {
task::yield_now().await;

let (message, _) = send_rx.try_recv()?;
let (message, _) = send_rx
.recv()
.await
.ok_or_else(|| anyhow!("NodeMessage was not received"))?;

let message = assert_matches!(message, MessageType::NodeMessage(NodeMessage(bytes)) => Message::from_bytes(Bytes::from(bytes))?);
assert_matches!(message.variant(), Variant::JoinRequest(_));

Expand All @@ -916,7 +947,7 @@ mod tests {
bootstrap_node.addr,
))?;
task::yield_now().await;
assert_matches!(send_rx.try_recv(), Err(TryRecvError::Empty));
assert_matches!(send_rx.recv().await, None);

// Send `Rejoin` with good prefix
let message = Message::single_src(
Expand All @@ -936,15 +967,19 @@ mod tests {
))?;
task::yield_now().await;

let (message, _) = send_rx.try_recv()?;
let (message, _) = send_rx
.recv()
.await
.ok_or_else(|| anyhow!("NodeMessage was not received"))?;

let message = assert_matches!(message, MessageType::NodeMessage(NodeMessage(bytes)) => Message::from_bytes(Bytes::from(bytes))?);
assert_matches!(message.variant(), Variant::JoinRequest(_));

Ok(())
};

futures::pin_mut!(join_task);
futures::pin_mut!(test_task);
pin_mut!(join_task);
pin_mut!(test_task);

match future::select(join_task, test_task).await {
Either::Left(_) => unreachable!(),
Expand Down
6 changes: 3 additions & 3 deletions src/routing/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl Debug for ConnectionEvent {

async fn handle_disconnection_events(
mut disconnections: qp2p::DisconnectionEvents,
mut event_tx: mpsc::Sender<ConnectionEvent>,
event_tx: mpsc::Sender<ConnectionEvent>,
) {
while let Some(peer_addr) = disconnections.next().await {
let _ = event_tx
Expand All @@ -264,7 +264,7 @@ async fn handle_disconnection_events(

async fn handle_incoming_messages(
mut incoming_msgs: qp2p::IncomingMessages,
mut event_tx: mpsc::Sender<ConnectionEvent>,
event_tx: mpsc::Sender<ConnectionEvent>,
) {
while let Some((src, msg)) = incoming_msgs.next().await {
let _ = event_tx.send(ConnectionEvent::Received((src, msg))).await;
Expand Down Expand Up @@ -493,7 +493,7 @@ mod tests {
transport.new_endpoint().await?;
let addr = endpoint.socket_addr();

let (mut tx, rx) = mpsc::channel(1);
let (tx, rx) = mpsc::channel(1);

let _ = tokio::spawn(async move {
while let Some((_src, msg)) = incoming_messages.next().await {
Expand Down
8 changes: 4 additions & 4 deletions src/routing/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl Stage {
let (cancel_timer_tx, mut cancel_timer_rx) = watch::channel(false);

// Take out the initial value.
let _ = futures::executor::block_on(cancel_timer_rx.recv());
let _ = futures::executor::block_on(cancel_timer_rx.changed());

Self {
state: Mutex::new(state),
Expand Down Expand Up @@ -88,7 +88,7 @@ impl Stage {
// Terminate this routing instance - cancel all scheduled timers including any future ones,
// close all network connections and stop accepting new connections.
pub fn terminate(&self) {
let _ = self.cancel_timer_tx.broadcast(true);
let _ = self.cancel_timer_tx.send(true);
self.comm.terminate()
}

Expand Down Expand Up @@ -234,8 +234,8 @@ impl Stage {
}

tokio::select! {
_ = time::delay_for(duration) => Some(Command::HandleTimeout(token)),
_ = cancel_rx.recv() => None,
_ = time::sleep(duration) => Some(Command::HandleTimeout(token)),
_ = cancel_rx.changed() => None,
}
}

Expand Down
16 changes: 8 additions & 8 deletions src/routing/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ async fn handle_consensus_on_online() -> Result<()> {
let status = handle_online_command(&new_peer, &sk_set, &stage, &elders_info).await?;
assert!(status.node_approval_sent);

assert_matches!(event_rx.try_recv(), Ok(Event::MemberJoined { name, age, .. }) => {
assert_matches!(event_rx.recv().await, Some(Event::MemberJoined { name, age, .. }) => {
assert_eq!(name, *new_peer.name());
assert_eq!(age, MIN_AGE);
});
Expand Down Expand Up @@ -628,7 +628,7 @@ async fn handle_consensus_on_online_of_rejoined_node(phase: NetworkPhase, age: u

// Simulate peer with the same name is rejoin and verify resulted behaviours.
let status = handle_online_command(&peer, &sk_set, &stage, &elders_info).await?;
assert!(event_rx.try_recv().is_err());
assert!(event_rx.recv().await.is_none());

// A rejoin node with low age will be rejected.
if age / 2 <= MIN_AGE {
Expand Down Expand Up @@ -694,7 +694,7 @@ async fn handle_consensus_on_offline_of_non_elder() -> Result<()> {
.handle_command(Command::HandleConsensus { vote, proof })
.await?;

assert_matches!(event_rx.try_recv(), Ok(Event::MemberLeft { name, age, }) => {
assert_matches!(event_rx.recv().await, Some(Event::MemberLeft { name, age, }) => {
assert_eq!(name, *existing_peer.name());
assert_eq!(age, MIN_AGE);
});
Expand Down Expand Up @@ -790,7 +790,7 @@ async fn handle_consensus_on_offline_of_elder() -> Result<()> {

assert!(dkg_start_sent);

assert_matches!(event_rx.try_recv(), Ok(Event::MemberLeft { name, .. }) => {
assert_matches!(event_rx.recv().await, Some(Event::MemberLeft { name, .. }) => {
assert_eq!(name, *remove_peer.name());
});

Expand Down Expand Up @@ -1268,8 +1268,8 @@ async fn handle_sync() -> Result<()> {

// Verify our `Section` got updated.
assert_matches!(
event_rx.try_recv(),
Ok(Event::EldersChanged { key, elders, .. }) => {
event_rx.recv().await,
Some(Event::EldersChanged { key, elders, .. }) => {
assert_eq!(key, pk2);
assert_eq!(elders, new_elders);
}
Expand Down Expand Up @@ -1690,8 +1690,8 @@ async fn handle_elders_update() -> Result<()> {
assert_eq!(sync_actual_recipients, sync_expected_recipients);

assert_matches!(
event_rx.try_recv(),
Ok(Event::EldersChanged { key, elders, .. }) => {
event_rx.recv().await,
Some(Event::EldersChanged { key, elders, .. }) => {
assert_eq!(key, pk1);
assert_eq!(elders, elder_names1);
}
Expand Down

0 comments on commit e5adc1a

Please sign in to comment.