Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge of #116 and #111 #117

Merged
merged 38 commits into from
Mar 7, 2018
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
b5aa092
Implement ConnectionReuse correctly
tomaka Jan 24, 2018
b92fb7c
Add some tests and fixes
tomaka Jan 24, 2018
a67c3b0
Remove useless boolean in active_connections
tomaka Jan 24, 2018
84458ae
Correctly run tests
tomaka Jan 24, 2018
d17c32a
Optimize the processing
tomaka Jan 25, 2018
f669efb
Next incoming is now in two steps
tomaka Jan 25, 2018
545b225
Remove log
tomaka Jan 26, 2018
c374bc5
Fix dialing a node even if we already have a connection
tomaka Jan 26, 2018
a7e7b93
Add a proper PeerId to Peerstore
tomaka Jan 26, 2018
459a666
Turn identify into a transport layer
tomaka Jan 26, 2018
9c469e9
Expose the dialed multiaddress
tomaka Jan 29, 2018
7f88c0a
Add identified nodes to the peerstore
tomaka Jan 30, 2018
7061c75
Allow configuring the TTL of the addresses
tomaka Jan 30, 2018
ad1e082
Split identify in two modules
tomaka Jan 30, 2018
9825635
Some comments and tweaks
tomaka Jan 30, 2018
34c3d99
Run rustfmt
tomaka Jan 30, 2018
c40cf3b
Add test and bugfix
tomaka Jan 30, 2018
eca1252
Merge branch 'connection-reuse-correct' into identify-merge-connec-reuse
tomaka Jan 30, 2018
fdabefc
Fix wrong address reported when dialing
tomaka Feb 1, 2018
6455d95
Merge branch 'identify-rework' into identify-merge-connec-reuse
tomaka Feb 1, 2018
aefd574
Merge remote-tracking branch 'upstream/master' into identify-merge-co…
tomaka Feb 4, 2018
08aadaf
Merge remote-tracking branch 'upstream/master' into identify-rework
tomaka Feb 7, 2018
432c94e
Merge remote-tracking branch 'upstream/master' into connection-reuse-…
tomaka Feb 7, 2018
0aa1bae
Merge branch 'identify-rework' into identify-merge-connec-reuse
tomaka Feb 7, 2018
da20541
Merge branch 'connection-reuse-correct' into identify-merge-connec-reuse
tomaka Feb 7, 2018
77b74bd
Fix websocket browser code
tomaka Feb 7, 2018
b22f500
Merge branch 'identify-rework' into identify-merge-connec-reuse
tomaka Feb 7, 2018
7f951f9
Ignore errors in the swarm
tomaka Feb 8, 2018
819b790
Merge remote-tracking branch 'upstream/master' into connection-reuse-…
tomaka Feb 8, 2018
29d1371
Fix multiplex test
tomaka Feb 8, 2018
ba35e25
Fix some style concerns
tomaka Feb 15, 2018
e5de1fb
Merge remote-tracking branch 'upstream/master' into connection-reuse-…
tomaka Feb 15, 2018
dedccd7
Merge branch 'master' into connection-reuse-correct
tomaka Feb 15, 2018
b592e2f
Fix concerns
tomaka Mar 6, 2018
c17398b
Merge remote-tracking branch 'upstream/master' into connection-reuse-…
tomaka Mar 6, 2018
ad8e89c
Merge remote-tracking branch 'origin/connection-reuse-correct' into c…
tomaka Mar 7, 2018
e406fc6
Merge remote-tracking branch 'upstream/master' into identify-merge-co…
tomaka Mar 7, 2018
e6b704d
Merge branch 'connection-reuse-correct' into identify-merge-connec-reuse
tomaka Mar 7, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion example/examples/echo-dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ fn main() {
// We now use the controller to dial to the address.
let (finished_tx, finished_rx) = oneshot::channel();
swarm_controller
.dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), proto, |echo| {
.dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), proto, |echo, _| {
// `echo` is what the closure used when initializing `proto` returns.
// Consequently, please note that the `send` method is available only because the type
// `length_delimited::Framed` has a `send` method.
Expand Down
2 changes: 1 addition & 1 deletion example/examples/ping-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ fn main() {
let (tx, rx) = oneshot::channel();
swarm_controller
.dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), ping::Ping,
|(mut pinger, future)| {
|(mut pinger, future), _| {
let ping = pinger.ping().map_err(|_| unreachable!()).inspect(|_| {
println!("Received pong from the remote");
let _ = tx.send(());
Expand Down
4 changes: 2 additions & 2 deletions example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ extern crate libp2p_peerstore;
extern crate libp2p_swarm;
extern crate multiaddr;

use libp2p_peerstore::{PeerAccess, Peerstore};
use libp2p_peerstore::{PeerId, PeerAccess, Peerstore};
use multiaddr::Multiaddr;
use std::time::Duration;

Expand Down Expand Up @@ -58,7 +58,7 @@ where

peer_store
.clone()
.peer_or_create(&public_key)
.peer_or_create(&PeerId::from_bytes(public_key).unwrap())
.add_addr(multiaddr, ttl.clone());
}
}
230 changes: 44 additions & 186 deletions libp2p-identify/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,200 +23,58 @@
//!
//! When two nodes connect to each other, the listening half sends a message to the dialing half,
//! indicating the information, and then the protocol stops.
//!
//! # Usage
//!
//! Both low-level and high-level usages are available.
//!
//! ## High-level usage through the `IdentifyTransport` struct
//!
//! This crate provides the `IdentifyTransport` struct, which wraps around a `Transport` and an
//! implementation of `Peerstore`. `IdentifyTransport` is itself a transport that accepts
//! multiaddresses of the form `/ipfs/...`.
//!
//! If you dial a multiaddr of the form `/ipfs/...`, then the `IdentifyTransport` will look into
//! the `Peerstore` for any known multiaddress for this peer and try to dial them using the
//! underlying transport. If you dial any other multiaddr, then it will dial this multiaddr using
//! the underlying transport, then negotiate the *identify* protocol with the remote in order to
//! obtain its ID, then add it to the peerstore, and finally dial the same multiaddr again and
//! return the connection.
//!
//! Listening doesn't support multiaddresses of the form `/ipfs/...` (because that wouldn't make
//! sense). Any address passed to `listen_on` will be passed directly to the underlying transport.
//!
//! Whenever a remote connects to us, either through listening or through `next_incoming`, the
//! `IdentifyTransport` dials back the remote, upgrades the connection to the *identify* protocol
//! in order to obtain the ID of the remote, stores the information in the peerstore, and finally
//! only returns the connection. From the exterior, the multiaddress of the remote is of the form
//! `/ipfs/...`. If the remote doesn't support the *identify* protocol, then the socket is closed.
//!
//! Because of the behaviour of `IdentifyProtocol`, it is recommended to build it on top of a
//! `ConnectionReuse`.
//!
//! ## Low-level usage through the `IdentifyProtocolConfig` struct
//!
//! The `IdentifyProtocolConfig` struct implements the `ConnectionUpgrade` trait. Using it will
//! negotiate the *identify* protocol.
//!
//! The output of the upgrade is a `IdentifyOutput`. If we are the dialer, then `IdentifyOutput`
//! will contain the information sent by the remote. If we are the listener, then it will contain
//! a `IdentifySender` struct that can be used to transmit back to the remote the information about
//! it.

extern crate bytes;
extern crate futures;
extern crate multiaddr;
extern crate libp2p_peerstore;
extern crate libp2p_swarm;
extern crate multiaddr;
extern crate protobuf;
extern crate tokio_io;
extern crate varint;

use bytes::{Bytes, BytesMut};
use futures::{Future, Stream, Sink};
use libp2p_swarm::{ConnectionUpgrade, Endpoint};
use multiaddr::Multiaddr;
use protobuf::Message as ProtobufMessage;
use protobuf::core::parse_from_bytes as protobuf_parse_from_bytes;
use protobuf::repeated::RepeatedField;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::iter;
use tokio_io::{AsyncRead, AsyncWrite};
use varint::VarintCodec;
pub use self::protocol::{IdentifyInfo, IdentifyOutput, IdentifyProtocolConfig, IdentifySender};
pub use self::transport::IdentifyTransport;

mod protocol;
mod structs_proto;

/// Prototype for an upgrade to the identity protocol.
#[derive(Debug, Clone)]
pub struct IdentifyProtocol {
/// Our public key to report to the remote.
pub public_key: Vec<u8>,
/// Version of the "global" protocol, eg. `ipfs/1.0.0` or `polkadot/1.0.0`.
pub protocol_version: String,
/// Name and version of the client. Can be thought as similar to the `User-Agent` header
/// of HTTP.
pub agent_version: String,
/// Addresses that we are listening on.
pub listen_addrs: Vec<Multiaddr>,
/// Protocols supported by us.
pub protocols: Vec<String>,
}

/// Information sent from the listener to the dialer.
#[derive(Debug, Clone)]
pub struct IdentifyInfo {
/// Public key of the node.
pub public_key: Vec<u8>,
/// Version of the "global" protocol, eg. `ipfs/1.0.0` or `polkadot/1.0.0`.
pub protocol_version: String,
/// Name and version of the client. Can be thought as similar to the `User-Agent` header
/// of HTTP.
pub agent_version: String,
/// Addresses that the remote is listening on.
pub listen_addrs: Vec<Multiaddr>,
/// Our own address as reported by the remote.
pub observed_addr: Multiaddr,
/// Protocols supported by the remote.
pub protocols: Vec<String>,
}

impl<C> ConnectionUpgrade<C> for IdentifyProtocol
where C: AsyncRead + AsyncWrite + 'static
{
type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = ();
type Output = Option<IdentifyInfo>;
type Future = Box<Future<Item = Self::Output, Error = IoError>>;

#[inline]
fn protocol_names(&self) -> Self::NamesIter {
iter::once((Bytes::from("/ipfs/id/1.0.0"), ()))
}

fn upgrade(self, socket: C, _: (), ty: Endpoint, remote_addr: &Multiaddr) -> Self::Future {
let socket = socket.framed(VarintCodec::default());

match ty {
Endpoint::Dialer => {
let future = socket.into_future()
.map(|(msg, _)| msg)
.map_err(|(err, _)| err)
.and_then(|msg| if let Some(msg) = msg {
Ok(Some(parse_proto_msg(msg)?))
} else {
Ok(None)
});

Box::new(future) as Box<_>
}

Endpoint::Listener => {
let listen_addrs = self.listen_addrs
.into_iter()
.map(|addr| addr.to_string().into_bytes())
.collect();

let mut message = structs_proto::Identify::new();
message.set_agentVersion(self.agent_version);
message.set_protocolVersion(self.protocol_version);
message.set_publicKey(self.public_key);
message.set_listenAddrs(listen_addrs);
message.set_observedAddr(remote_addr.to_string().into_bytes());
message.set_protocols(RepeatedField::from_vec(self.protocols));

let bytes = message.write_to_bytes()
.expect("writing protobuf failed ; should never happen");

// On the server side, after sending the information to the client we make the
// future produce a `None`. If we were on the client side, this would contain the
// information received by the server.
let future = socket.send(bytes).map(|_| None);
Box::new(future) as Box<_>
}
}
}
}

// Turns a protobuf message into an `IdentifyInfo`. If something bad happens, turn it into
// an `IoError`.
fn parse_proto_msg(msg: BytesMut) -> Result<IdentifyInfo, IoError> {
match protobuf_parse_from_bytes::<structs_proto::Identify>(&msg) {
Ok(mut msg) => {
let listen_addrs = {
let mut addrs = Vec::new();
for addr in msg.take_listenAddrs().into_iter() {
addrs.push(bytes_to_multiaddr(addr)?);
}
addrs
};

let observed_addr = bytes_to_multiaddr(msg.take_observedAddr())?;

Ok(IdentifyInfo {
public_key: msg.take_publicKey(),
protocol_version: msg.take_protocolVersion(),
agent_version: msg.take_agentVersion(),
listen_addrs: listen_addrs,
observed_addr: observed_addr,
protocols: msg.take_protocols().into_vec(),
})
}

Err(err) => {
Err(IoError::new(IoErrorKind::InvalidData, err))
}
}
}

// Turn a `Vec<u8>` into a `Multiaddr`. If something bad happens, turn it into an `IoError`.
fn bytes_to_multiaddr(bytes: Vec<u8>) -> Result<Multiaddr, IoError> {
String::from_utf8(bytes)
.map_err(|err| {
IoError::new(IoErrorKind::InvalidData, err)
})
.and_then(|s| {
s.parse()
.map_err(|err| IoError::new(IoErrorKind::InvalidData, err))
})
}

#[cfg(test)]
mod tests {
extern crate libp2p_tcp_transport;
extern crate tokio_core;

use self::libp2p_tcp_transport::TcpConfig;
use self::tokio_core::reactor::Core;
use IdentifyProtocol;
use futures::{IntoFuture, Future, Stream};
use libp2p_swarm::Transport;

#[test]
fn basic() {
let mut core = Core::new().unwrap();
let tcp = TcpConfig::new(core.handle());
let with_proto = tcp.with_upgrade(IdentifyProtocol {
public_key: vec![1, 2, 3, 4],
protocol_version: "ipfs/1.0.0".to_owned(),
agent_version: "agent/version".to_owned(),
listen_addrs: vec!["/ip4/5.6.7.8/tcp/12345".parse().unwrap()],
protocols: vec!["ping".to_owned(), "kad".to_owned()],
});

let (server, addr) = with_proto.clone()
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();
let server = server.into_future()
.map_err(|(err, _)| err)
.and_then(|(n, _)| n.unwrap().0);
let dialer = with_proto.dial(addr)
.unwrap()
.into_future();

let (recv, should_be_empty) = core.run(dialer.join(server)).unwrap();
assert!(should_be_empty.is_none());
let recv = recv.unwrap();
assert_eq!(recv.public_key, &[1, 2, 3, 4]);
}
}
mod transport;