Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New core #568

Merged
merged 5 commits into from Oct 17, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 15 additions & 22 deletions core/src/either.rs
Expand Up @@ -18,10 +18,11 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use futures::{prelude::*, future};
use futures::prelude::*;
use muxing::{Shutdown, StreamMuxer};
use std::io::{Error as IoError, Read, Write};
use tokio_io::{AsyncRead, AsyncWrite};
use Multiaddr;

/// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to
/// either `First` or `Second`.
Expand Down Expand Up @@ -243,52 +244,44 @@ pub enum EitherListenStream<A, B> {

impl<AStream, BStream, AInner, BInner> Stream for EitherListenStream<AStream, BStream>
where
AStream: Stream<Item = AInner, Error = IoError>,
BStream: Stream<Item = BInner, Error = IoError>,
AStream: Stream<Item = (AInner, Multiaddr), Error = IoError>,
BStream: Stream<Item = (BInner, Multiaddr), Error = IoError>,
{
type Item = EitherListenUpgrade<AInner, BInner>;
type Item = (EitherFuture<AInner, BInner>, Multiaddr);
type Error = IoError;

#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self {
&mut EitherListenStream::First(ref mut a) => a.poll()
.map(|i| i.map(|v| v.map(EitherListenUpgrade::First))),
.map(|i| (i.map(|v| (v.map(|(o, addr)| (EitherFuture::First(o), addr)))))),
&mut EitherListenStream::Second(ref mut a) => a.poll()
.map(|i| i.map(|v| v.map(EitherListenUpgrade::Second))),
.map(|i| (i.map(|v| (v.map(|(o, addr)| (EitherFuture::Second(o), addr)))))),
}
}
}

// TODO: This type is needed because of the lack of `impl Trait` in stable Rust.
// If Rust had impl Trait we could use the Either enum from the futures crate and add some
// modifiers to it. This custom enum is a combination of Either and these modifiers.
/// Implements `Future` and dispatches all method calls to either `First` or `Second`.
#[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"]
pub enum EitherListenUpgrade<A, B> {
pub enum EitherFuture<A, B> {
First(A),
Second(B),
}

impl<A, B, Ao, Bo, Af, Bf> Future for EitherListenUpgrade<A, B>
impl<AFuture, BFuture, AInner, BInner> Future for EitherFuture<AFuture, BFuture>
where
A: Future<Item = (Ao, Af), Error = IoError>,
B: Future<Item = (Bo, Bf), Error = IoError>,
AFuture: Future<Item = AInner, Error = IoError>,
BFuture: Future<Item = BInner, Error = IoError>,
{
type Item = (EitherOutput<Ao, Bo>, future::Either<Af, Bf>);
type Item = EitherOutput<AInner, BInner>;
type Error = IoError;

#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self {
&mut EitherListenUpgrade::First(ref mut a) => {
let (item, addr) = try_ready!(a.poll());
Ok(Async::Ready((EitherOutput::First(item), future::Either::A(addr))))
}
&mut EitherListenUpgrade::Second(ref mut b) => {
let (item, addr) = try_ready!(b.poll());
Ok(Async::Ready((EitherOutput::Second(item), future::Either::B(addr))))
}
&mut EitherFuture::First(ref mut a) => a.poll().map(|v| v.map(EitherOutput::First)),
&mut EitherFuture::Second(ref mut a) => a.poll().map(|v| v.map(EitherOutput::Second)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can remove ref mut and &mut now that rustc (>= 1.26) can handle this in match expressions?

}
}
}
4 changes: 2 additions & 2 deletions core/src/lib.rs
Expand Up @@ -142,7 +142,7 @@
//! // TODO: right now the only available protocol is ping, but we want to replace it with
//! // something that is more simple to use
//! .dial("127.0.0.1:12345".parse::<libp2p_core::Multiaddr>().unwrap()).unwrap_or_else(|_| panic!())
//! .and_then(|(out, _)| {
//! .and_then(|out| {
//! match out {
//! PingOutput::Ponger(processing) => Box::new(processing) as Box<Future<Item = _, Error = _>>,
//! PingOutput::Pinger(mut pinger) => {
Expand Down Expand Up @@ -220,5 +220,5 @@ pub use self::multiaddr::Multiaddr;
pub use self::muxing::StreamMuxer;
pub use self::peer_id::PeerId;
pub use self::public_key::PublicKey;
pub use self::transport::{MuxedTransport, Transport};
pub use self::transport::Transport;
pub use self::upgrade::{ConnectionUpgrade, Endpoint};
95 changes: 48 additions & 47 deletions core/src/nodes/collection.rs
Expand Up @@ -25,16 +25,15 @@ use nodes::node::Substream;
use nodes::handled_node_tasks::{HandledNodesEvent, HandledNodesTasks};
use nodes::handled_node_tasks::{Task as HandledNodesTask, TaskId};
use nodes::handled_node::NodeHandler;
use std::{collections::hash_map::Entry, fmt, mem};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use {Multiaddr, PeerId};
use std::{collections::hash_map::Entry, fmt, io, mem};
use PeerId;

// TODO: make generic over PeerId

/// Implementation of `Stream` that handles a collection of nodes.
pub struct CollectionStream<TInEvent, TOutEvent> {
pub struct CollectionStream<TInEvent, TOutEvent, THandler> {
/// Object that handles the tasks.
inner: HandledNodesTasks<TInEvent, TOutEvent>,
inner: HandledNodesTasks<TInEvent, TOutEvent, THandler>,
/// List of nodes, with the task id that handles this node. The corresponding entry in `tasks`
/// must always be in the `Connected` state.
nodes: FnvHashMap<PeerId, TaskId>,
Expand All @@ -43,7 +42,7 @@ pub struct CollectionStream<TInEvent, TOutEvent> {
tasks: FnvHashMap<TaskId, TaskState>,
}

impl<TInEvent, TOutEvent> fmt::Debug for CollectionStream<TInEvent, TOutEvent> {
impl<TInEvent, TOutEvent, THandler> fmt::Debug for CollectionStream<TInEvent, TOutEvent, THandler> {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
let mut list = f.debug_list();
for (id, task) in &self.tasks {
Expand All @@ -70,10 +69,10 @@ enum TaskState {
}

/// Event that can happen on the `CollectionStream`.
pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a> {
pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a> {
/// A connection to a node has succeeded. You must use the provided event in order to accept
/// the connection.
NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent>),
NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent, THandler>),

/// A connection to a node has been closed.
///
Expand All @@ -85,19 +84,23 @@ pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a> {
},

/// A connection to a node has errored.
///
/// Can only happen after a node has been successfully reached.
NodeError {
/// Identifier of the node.
peer_id: PeerId,
/// The error that happened.
error: IoError,
error: io::Error,
},

/// An error happened on the future that was trying to reach a node.
ReachError {
/// Identifier of the reach attempt that failed.
id: ReachAttemptId,
/// Error that happened on the future.
error: IoError,
error: io::Error,
/// The handler that was passed to `add_reach_attempt`.
handler: THandler,
},

/// A node has produced an event.
Expand All @@ -109,7 +112,7 @@ pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a> {
},
}

impl<'a, TInEvent, TOutEvent> fmt::Debug for CollectionEvent<'a, TInEvent, TOutEvent>
impl<'a, TInEvent, TOutEvent, THandler> fmt::Debug for CollectionEvent<'a, TInEvent, TOutEvent, THandler>
where TOutEvent: fmt::Debug
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
Expand All @@ -130,7 +133,7 @@ where TOutEvent: fmt::Debug
.field("error", error)
.finish()
},
CollectionEvent::ReachError { ref id, ref error } => {
CollectionEvent::ReachError { ref id, ref error, .. } => {
f.debug_struct("CollectionEvent::ReachError")
.field("id", id)
.field("error", error)
Expand All @@ -148,16 +151,16 @@ where TOutEvent: fmt::Debug

/// Event that happens when we reach a node.
#[must_use = "The node reached event is used to accept the newly-opened connection"]
pub struct CollectionReachEvent<'a, TInEvent: 'a, TOutEvent: 'a> {
pub struct CollectionReachEvent<'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a> {
/// Peer id we connected to.
peer_id: PeerId,
/// The task id that reached the node.
id: TaskId,
/// The `CollectionStream` we are referencing.
parent: &'a mut CollectionStream<TInEvent, TOutEvent>,
parent: &'a mut CollectionStream<TInEvent, TOutEvent, THandler>,
}

impl<'a, TInEvent, TOutEvent> CollectionReachEvent<'a, TInEvent, TOutEvent> {
impl<'a, TInEvent, TOutEvent, THandler> CollectionReachEvent<'a, TInEvent, TOutEvent, THandler> {
/// Returns the peer id the node that has been reached.
#[inline]
pub fn peer_id(&self) -> &PeerId {
Expand Down Expand Up @@ -220,7 +223,7 @@ impl<'a, TInEvent, TOutEvent> CollectionReachEvent<'a, TInEvent, TOutEvent> {
}
}

impl<'a, TInEvent, TOutEvent> fmt::Debug for CollectionReachEvent<'a, TInEvent, TOutEvent> {
impl<'a, TInEvent, TOutEvent, THandler> fmt::Debug for CollectionReachEvent<'a, TInEvent, TOutEvent, THandler> {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_struct("CollectionReachEvent")
.field("peer_id", &self.peer_id)
Expand All @@ -229,7 +232,7 @@ impl<'a, TInEvent, TOutEvent> fmt::Debug for CollectionReachEvent<'a, TInEvent,
}
}

impl<'a, TInEvent, TOutEvent> Drop for CollectionReachEvent<'a, TInEvent, TOutEvent> {
impl<'a, TInEvent, TOutEvent, THandler> Drop for CollectionReachEvent<'a, TInEvent, TOutEvent, THandler> {
fn drop(&mut self) {
let task_state = self.parent.tasks.remove(&self.id);
debug_assert!(if let Some(TaskState::Pending) = task_state { true } else { false });
Expand All @@ -255,7 +258,7 @@ pub enum CollectionNodeAccept {
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReachAttemptId(TaskId);

impl<TInEvent, TOutEvent> CollectionStream<TInEvent, TOutEvent> {
impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandler> {
/// Creates a new empty collection.
#[inline]
pub fn new() -> Self {
Expand All @@ -270,12 +273,11 @@ impl<TInEvent, TOutEvent> CollectionStream<TInEvent, TOutEvent> {
///
/// This method spawns a task dedicated to resolving this future and processing the node's
/// events.
pub fn add_reach_attempt<TFut, TMuxer, TAddrFut, THandler>(&mut self, future: TFut, handler: THandler)
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, handler: THandler)
-> ReachAttemptId
where
TFut: Future<Item = ((PeerId, TMuxer), TAddrFut), Error = IoError> + Send + 'static,
TAddrFut: Future<Item = Multiaddr, Error = IoError> + Send + 'static,
THandler: NodeHandler<Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
TFut: Future<Item = (PeerId, TMuxer), Error = io::Error> + Send + 'static,
THandler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
Expand Down Expand Up @@ -362,59 +364,59 @@ impl<TInEvent, TOutEvent> CollectionStream<TInEvent, TOutEvent> {
/// > **Note**: we use a regular `poll` method instead of implementing `Stream` in order to
/// > remove the `Err` variant, but also because we want the `CollectionStream` to stay
/// > borrowed if necessary.
pub fn poll(&mut self) -> Async<Option<CollectionEvent<TInEvent, TOutEvent>>> {
pub fn poll(&mut self) -> Async<CollectionEvent<TInEvent, TOutEvent, THandler>> {
let item = match self.inner.poll() {
Async::Ready(item) => item,
Async::NotReady => return Async::NotReady,
};

match item {
Some(HandledNodesEvent::TaskClosed { id, result }) => {
match (self.tasks.remove(&id), result) {
(Some(TaskState::Pending), Err(err)) => {
Async::Ready(Some(CollectionEvent::ReachError {
HandledNodesEvent::TaskClosed { id, result, handler } => {
match (self.tasks.remove(&id), result, handler) {
(Some(TaskState::Pending), Err(err), Some(handler)) => {
Async::Ready(CollectionEvent::ReachError {
id: ReachAttemptId(id),
error: err,
}))
handler,
})
},
(Some(TaskState::Pending), Ok(())) => {
(Some(TaskState::Pending), _, _) => {
// TODO: this variant shouldn't happen ; prove this
Async::Ready(Some(CollectionEvent::ReachError {
id: ReachAttemptId(id),
error: IoError::new(IoErrorKind::Other, "couldn't reach the node"),
}))
panic!()
},
(Some(TaskState::Connected(peer_id)), Ok(())) => {
(Some(TaskState::Connected(peer_id)), Ok(()), _handler) => {
debug_assert!(_handler.is_none());
let _node_task_id = self.nodes.remove(&peer_id);
debug_assert_eq!(_node_task_id, Some(id));
Async::Ready(Some(CollectionEvent::NodeClosed {
Async::Ready(CollectionEvent::NodeClosed {
peer_id,
}))
})
},
(Some(TaskState::Connected(peer_id)), Err(err)) => {
(Some(TaskState::Connected(peer_id)), Err(err), _handler) => {
debug_assert!(_handler.is_none());
let _node_task_id = self.nodes.remove(&peer_id);
debug_assert_eq!(_node_task_id, Some(id));
Async::Ready(Some(CollectionEvent::NodeError {
Async::Ready(CollectionEvent::NodeError {
peer_id,
error: err,
}))
})
},
(None, _) => {
(None, _, _) => {
panic!("self.tasks is always kept in sync with the tasks in self.inner ; \
when we add a task in self.inner we add a corresponding entry in \
self.tasks, and remove the entry only when the task is closed ; \
qed")
},
}
},
Some(HandledNodesEvent::NodeReached { id, peer_id }) => {
Async::Ready(Some(CollectionEvent::NodeReached(CollectionReachEvent {
HandledNodesEvent::NodeReached { id, peer_id } => {
Async::Ready(CollectionEvent::NodeReached(CollectionReachEvent {
parent: self,
id,
peer_id,
})))
}))
},
Some(HandledNodesEvent::NodeEvent { id, event }) => {
HandledNodesEvent::NodeEvent { id, event } => {
let peer_id = match self.tasks.get(&id) {
Some(TaskState::Connected(peer_id)) => peer_id.clone(),
_ => panic!("we can only receive NodeEvent events from a task after we \
Expand All @@ -423,12 +425,11 @@ impl<TInEvent, TOutEvent> CollectionStream<TInEvent, TOutEvent> {
self.tasks is switched to the Connected state ; qed"),
};

Async::Ready(Some(CollectionEvent::NodeEvent {
Async::Ready(CollectionEvent::NodeEvent {
peer_id,
event,
}))
})
}
None => Async::Ready(None),
}
}
}
Expand Down