Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tcp #1845

Closed
wants to merge 4 commits into from
Closed

Tcp #1845

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ default = [
"pnet",
"request-response",
"secp256k1",
"tcp-async-std",
"tcp",
"uds",
"wasm-ext",
"websocket",
Expand All @@ -44,8 +44,7 @@ ping = ["libp2p-ping"]
plaintext = ["libp2p-plaintext"]
pnet = ["libp2p-pnet"]
request-response = ["libp2p-request-response"]
tcp-async-std = ["libp2p-tcp", "libp2p-tcp/async-std"]
tcp-tokio = ["libp2p-tcp", "libp2p-tcp/tokio"]
tcp = ["libp2p-tcp"]
uds = ["libp2p-uds"]
wasm-ext = ["libp2p-wasm-ext"]
wasm-ext-websocket = ["wasm-ext", "libp2p-wasm-ext/websocket"]
Expand Down Expand Up @@ -91,7 +90,7 @@ libp2p-tcp = { version = "0.25.1", path = "transports/tcp", optional = true }
libp2p-websocket = { version = "0.26.0", path = "transports/websocket", optional = true }

[dev-dependencies]
async-std = "1.6.2"
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.8.1"
tokio = { version = "0.3", features = ["io-util", "io-std", "stream", "macros", "rt", "rt-multi-thread"] }

Expand Down Expand Up @@ -121,7 +120,3 @@ members = [
"transports/websocket",
"transports/wasm-ext"
]

[[example]]
name = "chat-tokio"
required-features = ["tcp-tokio", "mdns"]
4 changes: 2 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ zeroize = "1"
ring = { version = "0.16.9", features = ["alloc", "std"], default-features = false }

[dev-dependencies]
async-std = "1.6.2"
async-std = { version = "1.6.2", features = ["attributes"] }
libp2p-mplex = { path = "../muxers/mplex" }
libp2p-noise = { path = "../protocols/noise" }
libp2p-tcp = { path = "../transports/tcp", features = ["async-std"] }
libp2p-tcp = { path = "../transports/tcp" }
multihash = { version = "0.13", default-features = false, features = ["arb"] }
quickcheck = "0.9.0"
wasm-timer = "0.2"
Expand Down
9 changes: 8 additions & 1 deletion core/src/connection/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ use std::{collections::VecDeque, fmt, pin::Pin};
/// # Example
///
/// ```no_run
/// # #[async_std::main]
/// # async fn main() {
/// use futures::prelude::*;
/// use libp2p_core::connection::{ListenersEvent, ListenersStream};
///
/// let mut listeners = ListenersStream::new(libp2p_tcp::TcpConfig::new());
/// let mut listeners = ListenersStream::new(libp2p_tcp::TcpConfig::new().await.unwrap());
///
/// // Ask the `listeners` to start listening on the given multiaddress.
/// listeners.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap();
Expand Down Expand Up @@ -74,6 +76,7 @@ use std::{collections::VecDeque, fmt, pin::Pin};
/// }
/// }
/// })
/// # }
/// ```
pub struct ListenersStream<TTrans>
where
Expand Down Expand Up @@ -428,6 +431,8 @@ mod tests {
fn dial(self, _: Multiaddr) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}

fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> { None }
}

async_std::task::block_on(async move {
Expand Down Expand Up @@ -466,6 +471,8 @@ mod tests {
fn dial(self, _: Multiaddr) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}

fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> { None }
}

async_std::task::block_on(async move {
Expand Down
7 changes: 7 additions & 0 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,4 +477,11 @@ where
},
}
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
match self {
EitherTransport::Left(a) => a.address_translation(server, observed),
EitherTransport::Right(b) => b.address_translation(server, observed),
}
}
}
4 changes: 2 additions & 2 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::{
Executor,
Multiaddr,
PeerId,
address_translation,
connection::{
ConnectionId,
ConnectionLimit,
Expand Down Expand Up @@ -198,8 +197,9 @@ where
TMuxer: 'a,
THandler: 'a,
{
let transport = self.listeners.transport();
let mut addrs: Vec<_> = self.listen_addrs()
.filter_map(move |server| address_translation(server, observed_addr))
.filter_map(move |server| transport.address_translation(server, observed_addr))
.collect();

// remove duplicates
Expand Down
3 changes: 3 additions & 0 deletions core/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ pub trait Transport {
where
Self: Sized;

/// Perform transport specific multiaddr translation.
fn address_translation(&self, _server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what was discussed with @romanb

/// Boxes the transport, including custom transport errors.
fn boxed(self) -> boxed::Boxed<Self::Output>
where
Expand Down
4 changes: 4 additions & 0 deletions core/src/transport/and_then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ where
};
Ok(future)
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.address_translation(server, observed)
}
}

/// Custom `Stream` to avoid boxing.
Expand Down
9 changes: 9 additions & 0 deletions core/src/transport/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type ListenerUpgrade<O> = Pin<Box<dyn Future<Output = io::Result<O>> + Send>>;
trait Abstract<O> {
fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O>, TransportError<io::Error>>;
fn dial(&self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>>;
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;
}

impl<T, O> Abstract<O> for T
Expand Down Expand Up @@ -78,6 +79,10 @@ where
.map_err(|e| e.map(box_err))?;
Ok(Box::pin(fut) as Dial<_>)
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
Transport::address_translation(self, server, observed)
}
}

impl<O> fmt::Debug for Boxed<O> {
Expand Down Expand Up @@ -108,6 +113,10 @@ impl<O> Transport for Boxed<O> {
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.inner.dial(addr)
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed)
}
}

fn box_err<E: Error + Send + Sync + 'static>(e: E) -> io::Error {
Expand Down
8 changes: 8 additions & 0 deletions core/src/transport/choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,12 @@ where

Err(TransportError::MultiaddrNotSupported(addr))
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
if let Some(addr) = self.0.address_translation(server, observed) {
Some(addr)
} else {
self.1.address_translation(server, observed)
}
}
}
4 changes: 4 additions & 0 deletions core/src/transport/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ impl<TOut> Transport for DummyTransport<TOut> {
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
Err(TransportError::MultiaddrNotSupported(addr))
}

fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
None
}
}

/// Implementation of `AsyncRead` and `AsyncWrite`. Not meant to be instanciated.
Expand Down
4 changes: 4 additions & 0 deletions core/src/transport/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ where
let p = ConnectedPoint::Dialer { address: addr };
Ok(MapFuture { inner: future, args: Some((self.fun, p)) })
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.address_translation(server, observed)
}
}

/// Custom `Stream` implementation to avoid boxing.
Expand Down
4 changes: 4 additions & 0 deletions core/src/transport/map_err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ where
Err(err) => Err(err.map(map)),
}
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.address_translation(server, observed)
}
}

/// Listening stream for `MapErr`.
Expand Down
4 changes: 4 additions & 0 deletions core/src/transport/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ impl Transport for MemoryTransport {

DialFuture::new(port).ok_or(TransportError::Other(MemoryTransportError::Unreachable))
}

fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
None
}
}

/// Error that can be produced from the `MemoryTransport`.
Expand Down
8 changes: 8 additions & 0 deletions core/src/transport/optional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,12 @@ where
Err(TransportError::MultiaddrNotSupported(addr))
}
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
if let Some(inner) = &self.0 {
inner.address_translation(server, observed)
} else {
None
}
}
}
4 changes: 4 additions & 0 deletions core/src/transport/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ where
timer: Delay::new(self.outgoing_timeout),
})
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed)
}
}

// TODO: can be removed and replaced with an `impl Stream` once impl Trait is fully stable
Expand Down
8 changes: 8 additions & 0 deletions core/src/transport/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,10 @@ where
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
self.0.listen_on(addr)
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.0.address_translation(server, observed)
}
}

/// An inbound or outbound upgrade.
Expand Down Expand Up @@ -383,6 +387,10 @@ where
upgrade: self.upgrade
})
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed)
}
}

/// Errors produced by a transport upgrade.
Expand Down
26 changes: 14 additions & 12 deletions core/tests/network_dial_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ fn deny_incoming_connec() {
swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();

let address = async_std::task::block_on(future::poll_fn(|cx| {
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll(cx) {
Poll::Ready(listen_addr)
} else {
panic!("Was expecting the listen address to be reported")
match swarm1.poll(cx) {
Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) => {
Poll::Ready(listen_addr)
}
Poll::Pending => Poll::Pending,
_ => panic!("Was expecting the listen address to be reported"),
}
}));

Expand Down Expand Up @@ -95,15 +97,15 @@ fn dial_self() {
let mut swarm = test_network(NetworkConfig::default());
swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();

let (local_address, mut swarm) = async_std::task::block_on(
future::lazy(move |cx| {
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm.poll(cx) {
Ok::<_, void::Void>((listen_addr, swarm))
} else {
panic!("Was expecting the listen address to be reported")
let local_address = async_std::task::block_on(future::poll_fn(|cx| {
match swarm.poll(cx) {
Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) => {
Poll::Ready(listen_addr)
}
}))
.unwrap();
Poll::Pending => Poll::Pending,
_ => panic!("Was expecting the listen address to be reported"),
}
}));

swarm.dial(&local_address, TestHandler()).unwrap();

Expand Down
3 changes: 2 additions & 1 deletion core/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ pub fn test_network(cfg: NetworkConfig) -> TestNetwork {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&local_key).unwrap();
let transport: TestTransport = tcp::TcpConfig::new()
let transport: TestTransport = async_std::task::block_on(tcp::TcpConfig::new())
.unwrap()
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(mplex::MplexConfig::new())
Expand Down
Loading