Skip to content

Commit

Permalink
Revert "replace Vec with FuturesUnordered (libp2p#163)"
Browse files Browse the repository at this point in the history
This reverts commit 9674744.
  • Loading branch information
twittner committed May 2, 2018
1 parent 2347161 commit 31dba46
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 77 deletions.
47 changes: 26 additions & 21 deletions swarm/src/connection_reuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
use fnv::FnvHashMap;
use futures::future::{self, FutureResult, IntoFuture};
use futures::{Async, Future, Poll, Stream};
use futures::stream::FuturesUnordered;
use futures::stream::Fuse as StreamFuse;
use futures::sync::mpsc;
use multiaddr::Multiaddr;
Expand Down Expand Up @@ -143,7 +142,7 @@ where
let listener = ConnectionReuseListener {
shared: self.shared.clone(),
listener: listener.fuse(),
current_upgrades: FuturesUnordered::new(),
current_upgrades: Vec::new(),
connections: Vec::new(),
};

Expand Down Expand Up @@ -235,7 +234,7 @@ where
{
// The main listener. `S` is from the underlying transport.
listener: StreamFuse<S>,
current_upgrades: FuturesUnordered<F>,
current_upgrades: Vec<F>,
connections: Vec<(M, <M as StreamMuxer>::InboundSubstream, Multiaddr)>,

// Shared between the whole connection reuse mechanism.
Expand Down Expand Up @@ -274,27 +273,33 @@ where
}
};

// Check whether any upgrade (to a muxer) on an incoming connection is ready.
// We extract everything at the start, then insert back the elements that we still want at
// the next iteration.
match self.current_upgrades.poll() {
Ok(Async::Ready(Some((muxer, client_addr)))) => {
let next_incoming = muxer.clone().inbound();
self.connections
.push((muxer.clone(), next_incoming, client_addr.clone()));
// We overwrite any current active connection to that multiaddr because we
// are the freshest possible connection.
self.shared
.lock()
.active_connections
.insert(client_addr, muxer);
}
Err(err) => {
// Insert the rest of the pending upgrades, but not the current one.
debug!(target: "libp2p-swarm", "error while upgrading listener connection: \
{:?}", err);
return Ok(Async::Ready(Some(future::err(err))));
for n in (0..self.current_upgrades.len()).rev() {
let mut current_upgrade = self.current_upgrades.swap_remove(n);
match current_upgrade.poll() {
Ok(Async::Ready((muxer, client_addr))) => {
let next_incoming = muxer.clone().inbound();
self.connections
.push((muxer.clone(), next_incoming, client_addr.clone()));
// We overwrite any current active connection to that multiaddr because we
// are the freshest possible connection.
self.shared
.lock()
.active_connections
.insert(client_addr, muxer);
}
Ok(Async::NotReady) => {
self.current_upgrades.push(current_upgrade);
}
Err(err) => {
// Insert the rest of the pending upgrades, but not the current one.
debug!(target: "libp2p-swarm", "error while upgrading listener connection: \
{:?}", err);
return Ok(Async::Ready(Some(future::err(err))));
}
}
_ => {}
}

// Check whether any incoming substream is ready.
Expand Down
129 changes: 73 additions & 56 deletions swarm/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
use std::fmt;
use std::io::Error as IoError;
use futures::{future, Async, Future, IntoFuture, Poll, Stream};
use futures::stream::{FuturesUnordered, StreamFuture};
use futures::sync::mpsc;
use transport::UpgradedNode;
use {ConnectionUpgrade, Multiaddr, MuxedTransport};
Expand Down Expand Up @@ -57,11 +56,11 @@ where
handler: handler,
new_listeners: new_listeners_rx,
next_incoming: upgraded.clone().next_incoming(),
listeners: FuturesUnordered::new(),
listeners_upgrade: FuturesUnordered::new(),
dialers: FuturesUnordered::new(),
listeners: Vec::new(),
listeners_upgrade: Vec::new(),
dialers: Vec::new(),
new_dialers: new_dialers_rx,
to_process: FuturesUnordered::new(),
to_process: Vec::new(),
new_toprocess: new_toprocess_rx,
};

Expand Down Expand Up @@ -224,22 +223,19 @@ where
next_incoming: Box<
Future<Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>, Error = IoError>,
>,
listeners: FuturesUnordered<
StreamFuture<
Box<
Stream<
Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>,
Error = IoError,
>,
listeners: Vec<
Box<
Stream<
Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>,
Error = IoError,
>,
>,
>,
listeners_upgrade:
FuturesUnordered<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>,
dialers: FuturesUnordered<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>,
listeners_upgrade: Vec<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>,
dialers: Vec<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>,
new_dialers:
mpsc::UnboundedReceiver<Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>>,
to_process: FuturesUnordered<future::Either<F, Box<Future<Item = (), Error = IoError>>>>,
to_process: Vec<future::Either<F, Box<Future<Item = (), Error = IoError>>>>,
new_toprocess: mpsc::UnboundedReceiver<Box<Future<Item = (), Error = IoError>>>,
}

Expand Down Expand Up @@ -275,7 +271,7 @@ where

match self.new_listeners.poll() {
Ok(Async::Ready(Some(new_listener))) => {
self.listeners.push(new_listener.into_future());
self.listeners.push(new_listener);
}
Ok(Async::Ready(None)) | Err(_) => {
// New listener sender has been closed.
Expand Down Expand Up @@ -303,54 +299,75 @@ where
Ok(Async::NotReady) => {}
};

match self.listeners.poll() {
Ok(Async::Ready(Some((Some(upgrade), remaining)))) => {
trace!(target: "libp2p-swarm", "Swarm received new connection on listener socket");
self.listeners_upgrade.push(upgrade);
self.listeners.push(remaining.into_future());
}
Err((err, _)) => {
warn!(target: "libp2p-swarm", "Error in listener: {:?}", err);
}
_ => {}
for n in (0..self.listeners.len()).rev() {
let mut listener = self.listeners.swap_remove(n);
match listener.poll() {
Ok(Async::Ready(Some(upgrade))) => {
trace!(target: "libp2p-swarm", "Swarm received new connection on \
listener socket");
self.listeners.push(listener);
self.listeners_upgrade.push(upgrade);
}
Ok(Async::NotReady) => {
self.listeners.push(listener);
}
Ok(Async::Ready(None)) => {}
Err(err) => {
warn!(target: "libp2p-swarm", "Error in listener: {:?}", err);
}
};
}

match self.listeners_upgrade.poll() {
Ok(Async::Ready(Some((output, client_addr)))) => {
debug!(
"Successfully upgraded incoming connection with {}",
client_addr
);
self.to_process.push(future::Either::A(
handler(output, client_addr).into_future(),
));
for n in (0..self.listeners_upgrade.len()).rev() {
let mut upgrade = self.listeners_upgrade.swap_remove(n);
match upgrade.poll() {
Ok(Async::Ready((output, client_addr))) => {
debug!(
"Successfully upgraded incoming connection with {}",
client_addr
);
self.to_process.push(future::Either::A(
handler(output, client_addr).into_future(),
));
}
Ok(Async::NotReady) => {
self.listeners_upgrade.push(upgrade);
}
Err(err) => {
debug!(target: "libp2p-swarm", "Error in listener upgrade: {:?}", err);
}
}
Err(err) => {
warn!(target: "libp2p-swarm", "Error in listener upgrade: {:?}", err);
}
_ => {}
}

match self.dialers.poll() {
Ok(Async::Ready(Some((output, addr)))) => {
trace!("Successfully upgraded dialed connection with {}", addr);
self.to_process
.push(future::Either::A(handler(output, addr).into_future()));
}
Err(err) => {
warn!(target: "libp2p-swarm", "Error in dialer upgrade: {:?}", err);
for n in (0..self.dialers.len()).rev() {
let mut dialer = self.dialers.swap_remove(n);
match dialer.poll() {
Ok(Async::Ready((output, addr))) => {
trace!("Successfully upgraded dialed connection with {}", addr);
self.to_process
.push(future::Either::A(handler(output, addr).into_future()));
}
Ok(Async::NotReady) => {
self.dialers.push(dialer);
}
Err(err) => {
debug!(target: "libp2p-swarm", "Error in dialer upgrade: {:?}", err);
}
}
_ => {}
}

match self.to_process.poll() {
Ok(Async::Ready(Some(()))) => {
trace!(target: "libp2p-swarm", "Future returned by swarm handler driven to completion");
}
Err(err) => {
warn!(target: "libp2p-swarm", "Error in processing: {:?}", err);
for n in (0..self.to_process.len()).rev() {
let mut to_process = self.to_process.swap_remove(n);
match to_process.poll() {
Ok(Async::Ready(())) => {
trace!(target: "libp2p-swarm", "Future returned by swarm handler driven to \
completion");
}
Ok(Async::NotReady) => self.to_process.push(to_process),
Err(err) => {
debug!(target: "libp2p-swarm", "Error in processing: {:?}", err);
}
}
_ => {}
}

// TODO: we never return `Ok(Ready)` because there's no way to know whether
Expand Down

0 comments on commit 31dba46

Please sign in to comment.