Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
p2p: add custom messages (#425)
Browse files Browse the repository at this point in the history
  • Loading branch information
p0lunin committed Apr 24, 2020
1 parent fb39682 commit 8cb43db
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 75 deletions.
24 changes: 23 additions & 1 deletion demo/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use rand::thread_rng;

/// Handle to interact with the p2p networking stack.
pub struct P2PHandle {
node_handle: Option<NodeHandle>,
node_handle: Option<NodeHandle<Message>>,
tokio_handle: tokio::runtime::Handle,
}

Expand Down Expand Up @@ -124,3 +124,25 @@ pub fn launch_p2p() -> P2PHandle {
});
receiver.recv().unwrap()
}

use p2p::reexport::{BufMut, Bytes, BytesMut};
use p2p::CustomMessage;
use std::convert::Infallible;

#[derive(Debug, Clone)]
struct Message(pub Vec<u8>);

impl CustomMessage for Message {
type Error = Infallible;

fn decode(src: &mut Bytes) -> Result<Self, Self::Error>
where
Self: Sized,
{
Ok(Self(Vec::from(src.as_ref())))
}

fn encode(self, dst: &mut BytesMut) -> Result<(), Self::Error> {
Ok(dst.put(self.0.as_slice()))
}
}
39 changes: 35 additions & 4 deletions p2p/examples/chatter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ fn main() {
heartbeat_interval_sec: 3600,
};

let (node, mut notifications_channel) = Node::spawn(host_privkey, config)
let (node, mut notifications_channel) = Node::<Message>::spawn(host_privkey, config)
.await
.expect("Should bind normally.");

Expand Down Expand Up @@ -87,11 +87,11 @@ enum UserCommand {
}

pub struct Console {
node: NodeHandle,
node: NodeHandle<Message>,
}

impl Console {
pub fn spawn(node: NodeHandle) -> task::JoinHandle<Result<(), String>> {
pub fn spawn(node: NodeHandle<Message>) -> task::JoinHandle<Result<(), String>> {
task::spawn_local(async move {
let mut stdin = io::BufReader::new(io::stdin());
let mut console = Console { node };
Expand Down Expand Up @@ -148,7 +148,7 @@ impl Console {
}
UserCommand::Broadcast(msg) => {
println!("=> Broadcasting: {:?}", &msg);
self.node.broadcast(msg.as_bytes().to_vec()).await;
self.node.broadcast(Message(msg.as_bytes().to_vec())).await;
}
UserCommand::ListPeers => {
let peer_infos = self.node.list_peers().await;
Expand Down Expand Up @@ -205,3 +205,34 @@ impl Console {
}
}
}

use p2p::reexport::{BufMut, Bytes, BytesMut};
use p2p::CustomMessage;
use std::convert::Infallible;
use std::ops::Deref;

#[derive(Debug, Clone)]
pub struct Message(pub Vec<u8>);

impl Deref for Message {
type Target = Vec<u8>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl CustomMessage for Message {
type Error = Infallible;

fn decode(src: &mut Bytes) -> Result<Self, Self::Error>
where
Self: Sized,
{
Ok(Self(Vec::from(src.as_ref())))
}

fn encode(self, dst: &mut BytesMut) -> Result<(), Self::Error> {
Ok(dst.put(self.as_slice()))
}
}
83 changes: 59 additions & 24 deletions p2p/src/codec.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use crate::cybershake::PublicKey;
use crate::peer::PeerAddr;
use crate::peer::{CustomMessage, PeerAddr};
use crate::{PeerID, PeerMessage};
use bytes::{Buf, BufMut, BytesMut};
use curve25519_dalek::ristretto::CompressedRistretto;
use std::convert::TryFrom;
use std::io;
use std::marker::PhantomData;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use tokio_util::codec::{Decoder, Encoder};

pub struct MessageEncoder;
pub struct MessageEncoder<T: CustomMessage> {
marker: PhantomData<T>,
}

impl Encoder<PeerMessage> for MessageEncoder {
impl<T: CustomMessage> Encoder<PeerMessage<T>> for MessageEncoder<T> {
type Error = io::Error;

fn encode(&mut self, item: PeerMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, item: PeerMessage<T>, dst: &mut BytesMut) -> Result<(), Self::Error> {
match item {
PeerMessage::Hello(u) => {
dst.put_u8(0); // Message type
Expand All @@ -32,50 +35,52 @@ impl Encoder<PeerMessage> for MessageEncoder {
}
PeerMessage::Data(data) => {
dst.put_u8(2); // Message type
let len = u32::try_from(data.len()).map_err(|_| {
dst.put_u32_le(0); // We put here length after
data.encode(dst).map_err(|e| {
io::Error::new(
io::ErrorKind::InvalidInput,
format!(
"Max length {} but try to put {} bytes",
u32::max_value(),
data.len()
),
io::ErrorKind::InvalidData,
format!("An error occured when encode body: {}", e),
)
})?;
dst.put_u32_le(len);
dst.put(data.as_slice());
let body_len = (dst.len() - 5) as u32;
dst[1..5].copy_from_slice(&body_len.to_le_bytes()[..])
}
}
Ok(())
}
}

impl MessageEncoder {
impl<T: CustomMessage> MessageEncoder<T> {
pub fn new() -> Self {
Self {}
Self {
marker: PhantomData,
}
}
}

pub struct MessageDecoder {
pub struct MessageDecoder<T: CustomMessage> {
state: DecodeState,
marker: PhantomData<T>,
}

impl MessageDecoder {
impl<T: CustomMessage> MessageDecoder<T> {
pub fn new() -> Self {
MessageDecoder {
state: DecodeState::MessageType,
marker: PhantomData,
}
}
}

#[derive(Debug, PartialEq)]
enum DecodeState {
MessageType,
Len(u8),
Body(u8, usize),
}

impl Decoder for MessageDecoder {
type Item = PeerMessage;
impl<T: CustomMessage> Decoder for MessageDecoder<T> {
type Item = PeerMessage<T>;
type Error = io::Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
Expand Down Expand Up @@ -116,11 +121,11 @@ impl Decoder for MessageDecoder {
}
}

fn read_message_body(
fn read_message_body<T: CustomMessage>(
message_type: u8,
len: usize,
src: &mut BytesMut,
) -> Result<PeerMessage, io::Error> {
) -> Result<PeerMessage<T>, io::Error> {
match message_type {
0 => {
if len != 2 {
Expand All @@ -141,7 +146,16 @@ fn read_message_body(
}
Ok(PeerMessage::Peers(peers))
}
2 => Ok(PeerMessage::Data(src.split_to(len).to_vec())),
2 => {
let body = src.split_to(len);
match T::decode(&mut body.freeze()) {
Ok(data) => Ok(PeerMessage::Data(data)),
Err(e) => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("An error occured when decode body: {}", e),
)),
}
}
m => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Unknown message type: {}", m),
Expand Down Expand Up @@ -238,7 +252,7 @@ mod tests {

#[test]
fn code_hello() {
let msg = PeerMessage::Hello(20);
let msg = PeerMessage::<Vec<u8>>::Hello(20);
let mut bytes = BytesMut::new();
MessageEncoder::new()
.encode(msg.clone(), &mut bytes)
Expand All @@ -253,7 +267,7 @@ mod tests {

#[test]
fn code_peers() {
let msg = PeerMessage::Peers(vec![
let msg = PeerMessage::<Vec<u8>>::Peers(vec![
PeerAddr {
id: PeerID(PublicKey::from(CompressedRistretto([0u8; 32]))),
addr: SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from([30; 16]), 40, 12, 24)),
Expand All @@ -274,4 +288,25 @@ mod tests {

assert_eq!(msg, res);
}

#[test]
fn code_custom() {
let msg = PeerMessage::Data(vec![1, 2, 3, 4, 5, 6]);
let mut bytes = BytesMut::new();

let mut encoder = MessageEncoder::new();
let mut decoder = MessageDecoder::new();

encoder
.encode(msg.clone(), &mut bytes)
.expect("Must be encoded");
let res = decoder
.decode(&mut bytes)
.expect("Message must be decoded without errors")
.expect("message must be encoded to end");

assert_eq!(msg, res);
assert_eq!(decoder.state, DecodeState::MessageType);
assert!(bytes.is_empty())
}
}
6 changes: 5 additions & 1 deletion p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,9 @@ mod peer;
mod priority;

pub use self::node::{Direction, Node, NodeConfig, NodeHandle, NodeNotification, PeerInfo};
pub use self::peer::{PeerID, PeerLink, PeerMessage, PeerNotification};
pub use self::peer::{CustomMessage, PeerID, PeerLink, PeerMessage, PeerNotification};
pub use self::priority::Priority;

pub mod reexport {
pub use bytes::{Buf, BufMut, Bytes, BytesMut};
}

0 comments on commit 8cb43db

Please sign in to comment.