Skip to content

Commit

Permalink
revert libp2p#163, it breaks protocol upgrade in connection reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
quake committed Apr 20, 2018
1 parent 9674744 commit 4107fb1
Showing 1 changed file with 25 additions and 20 deletions.
45 changes: 25 additions & 20 deletions swarm/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use std::fmt;
use std::io::Error as IoError;
use futures::{future, Async, Future, IntoFuture, Poll, Stream};
use futures::stream::{FuturesUnordered, StreamFuture};
use futures::stream::FuturesUnordered;
use futures::sync::mpsc;
use {ConnectionUpgrade, Multiaddr, MuxedTransport, UpgradedNode};

Expand Down Expand Up @@ -56,7 +56,7 @@ where
handler: handler,
new_listeners: new_listeners_rx,
next_incoming: upgraded.clone().next_incoming(),
listeners: FuturesUnordered::new(),
listeners: Vec::new(),
listeners_upgrade: FuturesUnordered::new(),
dialers: FuturesUnordered::new(),
new_dialers: new_dialers_rx,
Expand Down Expand Up @@ -223,13 +223,11 @@ where
next_incoming: Box<
Future<Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>, Error = IoError>,
>,
listeners: FuturesUnordered<
StreamFuture<
Box<
Stream<
Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>,
Error = IoError,
>,
listeners: Vec<
Box<
Stream<
Item = Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>,
Error = IoError,
>,
>,
>,
Expand Down Expand Up @@ -274,7 +272,7 @@ where

match self.new_listeners.poll() {
Ok(Async::Ready(Some(new_listener))) => {
self.listeners.push(new_listener.into_future());
self.listeners.push(new_listener);
}
Ok(Async::Ready(None)) | Err(_) => {
// New listener sender has been closed.
Expand Down Expand Up @@ -302,16 +300,23 @@ where
Ok(Async::NotReady) => {}
};

match self.listeners.poll() {
Ok(Async::Ready(Some((Some(upgrade), remaining)))) => {
trace!(target: "libp2p-swarm", "Swarm received new connection on listener socket");
self.listeners_upgrade.push(upgrade);
self.listeners.push(remaining.into_future());
}
Err((err, _)) => {
warn!(target: "libp2p-swarm", "Error in listener: {:?}", err);
}
_ => {}
for n in (0..self.listeners.len()).rev() {
let mut listener = self.listeners.swap_remove(n);
match listener.poll() {
Ok(Async::Ready(Some(upgrade))) => {
trace!(target: "libp2p-swarm", "Swarm received new connection on \
listener socket");
self.listeners.push(listener);
self.listeners_upgrade.push(upgrade);
}
Ok(Async::NotReady) => {
self.listeners.push(listener);
}
Ok(Async::Ready(None)) => {}
Err(err) => {
warn!(target: "libp2p-swarm", "Error in listener: {:?}", err);
}
};
}

match self.listeners_upgrade.poll() {
Expand Down

0 comments on commit 4107fb1

Please sign in to comment.