Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
fix: send messages with correct MessageKind byte
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam authored and joshuef committed Feb 4, 2021
1 parent 0b28d19 commit 6756b43
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 27 deletions.
5 changes: 2 additions & 3 deletions src/routing/approved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,13 @@ impl Approved {
return Err(Error::InvalidDstLocation);
};
let response = InfrastructureQuery::GetSectionResponse(response);
let response = bincode::serialize(&response)?.into();

debug!("Sending {:?} to {}", response, sender);

Ok(vec![Command::SendMessage {
recipients: vec![sender],
delivery_group_size: 1,
kind: MessageKind::Infrastructure,
message: response,
message: bincode::serialize(&response)?.into(),
}])
}
InfrastructureQuery::GetSectionResponse(_) => {
Expand Down
51 changes: 32 additions & 19 deletions src/routing/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::{
net::SocketAddr,
};
use tokio::sync::mpsc;
use tracing::Instrument;
use xor_name::{Prefix, XorName};

const BACKLOG_CAPACITY: usize = 100;
Expand All @@ -49,12 +50,15 @@ pub(crate) async fn initial(
let (send_tx, send_rx) = mpsc::channel(1);
let recv_rx = MessageReceiver::Raw(incoming_conns);

let span = trace_span!("bootstrap::initial", name = %node.name());

let state = State::new(node, send_tx, recv_rx)?;

future::join(
state.run(vec![bootstrap_addr], None),
send_messages(send_rx, comm),
)
.instrument(span)
.await
.0
}
Expand All @@ -74,12 +78,15 @@ pub(crate) async fn relocate(
let (send_tx, send_rx) = mpsc::channel(1);
let recv_rx = MessageReceiver::Deserialized(recv_rx);

let span = trace_span!("bootstrap::relocate", name = %node.name());

let state = State::new(node, send_tx, recv_rx)?;

future::join(
state.run(bootstrap_addrs, Some(relocate_details)),
send_messages(send_rx, comm),
)
.instrument(span)
.await
.0
}
Expand Down Expand Up @@ -153,8 +160,8 @@ impl<'a> State<'a> {
}
GetSectionResponse::Redirect(new_bootstrap_addrs) => {
info!(
"{} Bootstrapping redirected to another set of peers: {:?}",
self.node, new_bootstrap_addrs,
"Bootstrapping redirected to another set of peers: {:?}",
new_bootstrap_addrs,
);
bootstrap_addrs = new_bootstrap_addrs.to_vec();
}
Expand All @@ -167,6 +174,11 @@ impl<'a> State<'a> {
recipients: Vec<SocketAddr>,
relocate_details: Option<&SignedRelocateDetails>,
) -> Result<()> {
debug!(
"{} Sending GetSectionRequest to {:?}",
self.node, recipients
);

let destination = match relocate_details {
Some(details) => *details.destination(),
None => self.node.name(),
Expand All @@ -175,8 +187,6 @@ impl<'a> State<'a> {
let message = InfrastructureQuery::GetSectionRequest(destination);
let message = bincode::serialize(&message)?.into();

debug!("{} Sending BootstrapRequest to {:?}", self.node, recipients);

let _ = self
.send_tx
.send((MessageKind::Infrastructure.prepend_to(message), recipients))
Expand Down Expand Up @@ -210,7 +220,7 @@ impl<'a> State<'a> {
}
}

error!("{} Message sender unexpectedly closed", self.node);
error!("Message sender unexpectedly closed");
// TODO: consider more specific error here (e.g. `BootstrapInterrupted`)
Err(Error::InvalidState)
}
Expand All @@ -235,7 +245,7 @@ impl<'a> State<'a> {
let relocate_payload =
RelocatePayload::new(relocate_details, &new_name, &self.node.keypair);

info!("{} Changing name to {}.", self.node, new_name);
info!("Changing name to {}", new_name);
self.node = Node::new(new_keypair, self.node.addr).with_age(age);

relocate_payload
Expand Down Expand Up @@ -285,8 +295,8 @@ impl<'a> State<'a> {

if elders_info.prefix.matches(&self.node.name()) {
info!(
"{} Newer Join response for our prefix {:?} from {:?}",
self.node, elders_info, sender
"Newer Join response for our prefix {:?} from {:?}",
elders_info, sender
);
section_key = new_section_key;
let join_request = JoinRequest {
Expand Down Expand Up @@ -336,15 +346,15 @@ impl<'a> State<'a> {
join_request: JoinRequest,
recipients: Vec<SocketAddr>,
) -> Result<()> {
info!(
"{} Sending {:?} to {:?}",
self.node, join_request, recipients
);
info!("Sending {:?} to {:?}", join_request, recipients);

let variant = Variant::JoinRequest(Box::new(join_request));
let message = Message::single_src(&self.node, DstLocation::Direct, variant, None, None)?;

let _ = self.send_tx.send((message.to_bytes(), recipients)).await;
let _ = self
.send_tx
.send((MessageKind::Node.prepend_to(message.to_bytes()), recipients))
.await;

Ok(())
}
Expand Down Expand Up @@ -424,8 +434,8 @@ impl<'a> State<'a> {
let section_chain = message.proof_chain()?.clone();

info!(
"{} This node has been approved to join the network at {:?}!",
self.node, elders_info.value.prefix,
"This node has been approved to join the network at {:?}!",
elders_info.value.prefix,
);

return Ok((
Expand All @@ -442,7 +452,7 @@ impl<'a> State<'a> {
}
}

error!("{} Message sender unexpectedly closed", self.node);
error!("Message sender unexpectedly closed");
// TODO: consider more specific error here (e.g. `BootstrapInterrupted`)
Err(Error::InvalidState)
}
Expand Down Expand Up @@ -596,7 +606,8 @@ mod tests {

// Receive JoinRequest
let (bytes, recipients) = send_rx.try_recv()?;
let message = Message::from_bytes(&bytes)?;
let message = Envelope::from_bytes(&bytes)?;
let message = assert_matches!(message, Envelope::Node(message) => message);

itertools::assert_equal(&recipients, elders_info.peers().map(Peer::addr));
assert_matches!(message.variant(), Variant::JoinRequest(request) => {
Expand Down Expand Up @@ -843,7 +854,8 @@ mod tests {
task::yield_now().await;

let (bytes, _) = send_rx.try_recv()?;
let message = Message::from_bytes(&bytes)?;
let message = Envelope::from_bytes(&bytes)?;
let message = assert_matches!(message, Envelope::Node(message) => message);
assert_matches!(message.variant(), Variant::JoinRequest(_));

// Send `Rejoin` with bad prefix
Expand Down Expand Up @@ -878,7 +890,8 @@ mod tests {
task::yield_now().await;

let (bytes, _) = send_rx.try_recv()?;
let message = Message::from_bytes(&bytes)?;
let message = Envelope::from_bytes(&bytes)?;
let message = assert_matches!(message, Envelope::Node(message) => message);
assert_matches!(message.variant(), Variant::JoinRequest(_));

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions src/routing/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,11 @@ impl Debug for Command {
kind,
message,
} => f
.debug_struct("SendMessageToNodes")
.debug_struct("SendMessage")
.field("recipients", recipients)
.field("delivery_group_size", delivery_group_size)
.field("kind", kind)
.field("message", message)
.field("message", &format_args!("{:10}", hex_fmt::HexFmt(message)))
.finish(),
Self::SendUserMessage { src, dst, content } => f
.debug_struct("SendUserMessage")
Expand Down
14 changes: 11 additions & 3 deletions tests/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
mod utils;

use anyhow::{format_err, Result};
use bytes::Bytes;
use bytes::{BufMut, Bytes, BytesMut};
use qp2p::QuicP2p;
use sn_routing::{Config, DstLocation, Error, Event, NodeElderChange, SrcLocation};
use sn_routing::{Config, DstLocation, Error, Event, MessageKind, NodeElderChange, SrcLocation};
use std::net::{IpAddr, Ipv4Addr};
use utils::*;

Expand Down Expand Up @@ -54,7 +54,15 @@ async fn test_messages_client_node() -> Result<()> {
let client = QuicP2p::with_config(Some(config), &[node_addr], false)?;
let client_endpoint = client.new_endpoint()?;
let (conn, _) = client_endpoint.connect_to(&node_addr).await?;
let (_, mut recv) = conn.send_bi(Bytes::from_static(msg)).await?;

let buffer = {
let mut buffer = BytesMut::new();
buffer.put_u8(MessageKind::Client as u8);
buffer.put_slice(msg);
buffer.freeze()
};

let (_, mut recv) = conn.send_bi(buffer).await?;

// just await for node to respond to client
node_handler.await??;
Expand Down

0 comments on commit 6756b43

Please sign in to comment.