Skip to content

Commit

Permalink
Try async-trait.
Browse files Browse the repository at this point in the history
  • Loading branch information
romanb committed Jun 27, 2020
1 parent fa2ee41 commit 9c6e6b4
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 24 deletions.
1 change: 1 addition & 0 deletions protocols/request-response/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]

[dependencies]
async-trait = "0.1"
futures = "0.3.1"
libp2p-core = { version = "0.19.2", path = "../../core" }
libp2p-swarm = { version = "0.19.1", path = "../../swarm" }
Expand Down
20 changes: 11 additions & 9 deletions protocols/request-response/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@

pub use libp2p_core::ProtocolName;

use futures::{prelude::*, future::BoxFuture};
use async_trait::async_trait;
use futures::prelude::*;
use std::io;

/// A `RequestResponseCodec` defines the request and response types
/// for a [`RequestResponse`](crate::RequestResponse) protocol or
/// protocol family and how they are encoded / decoded on an I/O stream.
#[async_trait]
pub trait RequestResponseCodec {
/// The type of protocol(s) or protocol versions being negotiated.
type Protocol: ProtocolName + Send + Clone;
Expand All @@ -36,29 +38,29 @@ pub trait RequestResponseCodec {

/// Reads a request from the given I/O stream according to the
/// negotiated protocol.
fn read_request<'a, T>(&mut self, protocol: &Self::Protocol, io: &'a mut T)
-> BoxFuture<'a, Result<Self::Request, io::Error>>
async fn read_request<T>(&mut self, protocol: &Self::Protocol, io: &mut T)
-> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send;

/// Reads a response from the given I/O stream according to the
/// negotiated protocol.
fn read_response<'a, T>(&mut self, protocol: &Self::Protocol, io: &'a mut T)
-> BoxFuture<'a, Result<Self::Response, io::Error>>
async fn read_response<T>(&mut self, protocol: &Self::Protocol, io: &mut T)
-> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send;

/// Writes a request to the given I/O stream according to the
/// negotiated protocol.
fn write_request<'a, T>(&mut self, protocol: &Self::Protocol, io: &'a mut T, req: Self::Request)
-> BoxFuture<'a, Result<(), io::Error>>
async fn write_request<T>(&mut self, protocol: &Self::Protocol, io: &mut T, req: Self::Request)
-> io::Result<()>
where
T: AsyncWrite + Unpin + Send;

/// Writes a response to the given I/O stream according to the
/// negotiated protocol.
fn write_response<'a, T>(&mut self, protocol: &Self::Protocol, io: &'a mut T, res: Self::Response)
-> BoxFuture<'a, Result<(), io::Error>>
async fn write_response<T>(&mut self, protocol: &Self::Protocol, io: &mut T, res: Self::Response)
-> io::Result<()>
where
T: AsyncWrite + Unpin + Send;
}
32 changes: 17 additions & 15 deletions protocols/request-response/tests/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

//! Integration tests for the `RequestResponse` network behaviour.

use async_trait::async_trait;
use libp2p_core::{
Multiaddr,
PeerId,
Expand All @@ -32,7 +33,7 @@ use libp2p_noise::{NoiseConfig, X25519Spec, Keypair};
use libp2p_request_response::*;
use libp2p_swarm::Swarm;
use libp2p_tcp::TcpConfig;
use futures::{prelude::*, channel::mpsc, future::BoxFuture};
use futures::{prelude::*, channel::mpsc};
use rand::{self, Rng};
use std::{io, iter};

Expand Down Expand Up @@ -147,47 +148,48 @@ impl ProtocolName for PingProtocol {
}
}

#[async_trait]
impl RequestResponseCodec for PingCodec {
type Protocol = PingProtocol;
type Request = Ping;
type Response = Pong;

fn read_request<'a, T>(&mut self, _: &PingProtocol, io: &'a mut T)
-> BoxFuture<'a, Result<Self::Request, io::Error>>
async fn read_request<T>(&mut self, _: &PingProtocol, io: &mut T)
-> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send
{
read_one(io, 1024)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
.and_then(|data| future::ready(Ok(Ping(data))))
.boxed()
.map_ok(Ping)
.await
}

fn read_response<'a, T>(&mut self, _: &PingProtocol, io: &'a mut T)
-> BoxFuture<'a, Result<Self::Response, io::Error>>
async fn read_response<T>(&mut self, _: &PingProtocol, io: &mut T)
-> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send
{
read_one(io, 1024)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
.and_then(|data| future::ready(Ok(Pong(data))))
.boxed()
.map_ok(Pong)
.await
}

fn write_request<'a, T>(&mut self, _: &PingProtocol, io: &'a mut T, Ping(data): Ping)
-> BoxFuture<'a, Result<(), io::Error>>
async fn write_request<T>(&mut self, _: &PingProtocol, io: &mut T, Ping(data): Ping)
-> io::Result<()>
where
T: AsyncWrite + Unpin + Send
{
write_one(io, data).boxed()
write_one(io, data).await
}

fn write_response<'a, T>(&mut self, _: &PingProtocol, io: &'a mut T, Pong(data): Pong)
-> BoxFuture<'a, Result<(), io::Error>>
async fn write_response<T>(&mut self, _: &PingProtocol, io: &mut T, Pong(data): Pong)
-> io::Result<()>
where
T: AsyncWrite + Unpin + Send
{
write_one(io, data).boxed()
write_one(io, data).await
}
}

0 comments on commit 9c6e6b4

Please sign in to comment.