feat(net): add discv4 crate #113

Merged
merged 38 commits into from
Oct 25, 2022
Changes from 2 commits
Commits
3a350e9
port kad
mattsse Oct 21, 2022
ee57a6b
feat: port kad bucket
mattsse Oct 21, 2022
97e322e
feat: add discv4
mattsse Oct 21, 2022
90707ff
chore: rustfmt
mattsse Oct 21, 2022
b993672
cargo update
mattsse Oct 21, 2022
2ad6feb
Merge branch 'main' into matt/discv4
mattsse Oct 21, 2022
980e9de
just reuse discv5 table
mattsse Oct 21, 2022
9ef7ee6
test: add rlp tests
mattsse Oct 21, 2022
8146f5d
message encoding
mattsse Oct 21, 2022
50a4125
feat: impl codec roundtrip testing
mattsse Oct 22, 2022
70a7940
more work in message handling
mattsse Oct 23, 2022
011bd4f
implement ping
mattsse Oct 23, 2022
837a11b
feat: impl commands
mattsse Oct 23, 2022
6303c29
cleanup
mattsse Oct 23, 2022
590a69a
more cleanup
mattsse Oct 23, 2022
05c705b
trim config
mattsse Oct 23, 2022
8ac45f1
more docs
mattsse Oct 23, 2022
3d19def
feat: implement recursive lookup
mattsse Oct 23, 2022
7bab43d
docs
mattsse Oct 23, 2022
ec6105f
cleanup config
mattsse Oct 23, 2022
6e2e858
feat: implement update stream
mattsse Oct 23, 2022
f007ddd
Merge branch 'main' into matt/discv4
mattsse Oct 24, 2022
91c4819
chore: config cleanup
mattsse Oct 24, 2022
6b94e5a
docs: add crate docs
mattsse Oct 24, 2022
7959fb5
feat: more testing
mattsse Oct 24, 2022
d180691
Merge branch 'main' into matt/discv4
mattsse Oct 24, 2022
9c28a08
fix deny
mattsse Oct 24, 2022
abffb85
clarify ring
mattsse Oct 24, 2022
9815b13
docs: more docs
mattsse Oct 24, 2022
4a7e923
Merge branch 'main' into matt/discv4
mattsse Oct 25, 2022
90dcbaf
use discv5 master
mattsse Oct 25, 2022
147c024
Merge branch 'main' into matt/discv4
mattsse Oct 25, 2022
d422275
docs: address review and add comments
mattsse Oct 25, 2022
53f2924
update readme
mattsse Oct 25, 2022
a609bff
Merge branch 'main' into matt/discv4
mattsse Oct 25, 2022
afa9a05
rustmft
mattsse Oct 25, 2022
a8203c7
Merge branch 'main' into matt/discv4
mattsse Oct 25, 2022
d261944
chore(clippy): make clippy happy
mattsse Oct 25, 2022
24 changes: 3 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default.

10 changes: 8 additions & 2 deletions crates/net/discv4/README.md
@@ -1,4 +1,10 @@
# <h1 align="center"> discv4 </h1>

This is a rust implementation of the [Discovery v4](https://github.com/ethereum/devp2p/blob/master/discv4.md)
peer discovery protocol.
This is a rust implementation of
the [Discovery v4](https://github.com/ethereum/devp2p/blob/40ab248bf7e017e83cc9812a4e048446709623e8/discv4.md)
peer discovery protocol.

For comparison to Discovery v5,
see [discv5#comparison-with-node-discovery-v4](https://github.com/ethereum/devp2p/blob/40ab248bf7e017e83cc9812a4e048446709623e8/discv5/discv5.md#comparison-with-node-discovery-v4)

This is inspired by the [discv5](https://github.com/sigp/discv5) crate and reuses its kademlia implementation.
2 changes: 2 additions & 0 deletions crates/net/discv4/src/bootnodes.rs
@@ -1,5 +1,7 @@
//! Various known bootstrap nodes for networks

// <https://github.com/ledgerwatch/erigon/blob/610e648dc43ec8cd6563313e28f06f534a9091b3/params/bootnodes.go>

use crate::node::NodeRecord;

/// Ethereum Foundation Go Bootnodes
Expand Down
134 changes: 96 additions & 38 deletions crates/net/discv4/src/lib.rs
@@ -19,7 +19,7 @@
//! [`TableUpdate`] that listeners will receive.
use crate::{
error::{DecodePacketError, Discv4Error},
node::{kad_key, NodeKey, NodeRecord},
node::{kad_key, NodeKey},
proto::{FindNode, Message, Neighbours, Packet, Ping, Pong},
};
use bytes::Bytes;
@@ -34,7 +34,7 @@ use reth_primitives::{H256, H512};
use secp256k1::SecretKey;
use std::{
cell::RefCell,
collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, HashSet, VecDeque},
collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
future::Future,
io,
net::SocketAddr,
@@ -54,35 +54,43 @@ use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tracing::{debug, instrument, trace, warn};

pub mod bootnodes;
mod config;
pub mod error;
mod node;
mod proto;

mod config;
pub use config::Discv4Config;
mod node;
pub use node::NodeRecord;

/// reexport to get public ip.
pub use public_ip;

pub use config::Discv4Config;

/// Identifier for nodes.
pub type NodeId = H512;

/// The default port for discv4 via UDP
///
/// Note: the default TCP port is the same.
pub const DEFAULT_DISCOVERY_PORT: u16 = 30303;

/// The maximum size of any packet is 1280 bytes.
const MAX_PACKET_SIZE: usize = 1280;

/// Length of the packet-header: Hash + Signature + Packet Type
/// Length of the UDP datagram packet-header: Hash(32b) + Signature(65b) + Packet Type(1b)
const MIN_PACKET_SIZE: usize = 32 + 65 + 1;

/// Concurrency factor for `FindNode` requests
/// Concurrency factor for `FindNode` requests to pick `ALPHA` closest nodes, <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>
const ALPHA: usize = 3;

/// Maximum number of nodes to ping at once
const MAX_NODES_PING: usize = 32;
/// Maximum number of nodes to ping concurrently: 2 full `Neighbours` responses with 16 _new_
/// nodes each. This applies some backpressure in recursive lookups.
const MAX_NODES_PING: usize = 2 * MAX_NODES_PER_BUCKET;

/// The size of the datagram is limited, so we chunk here the max number that fit in the datagram is
/// 12: (MAX_PACKET_SIZE - (header + expire + rlp overhead) / rlplength(NodeRecord). The unhappy
/// case is all IPV6 IPS
const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = 12usize;
/// The size of a datagram is limited to [`MAX_PACKET_SIZE`], so the 16 nodes that discv4 specifies
/// do not fit in one datagram. The safe number of nodes that always fit in a datagram is 12, with
/// the worst case being all of them IPv6 nodes. This is calculated by `(MAX_PACKET_SIZE - (header +
/// expire + rlp overhead)) / size(rlp(Node_IPv6))`
const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91;

/// The timeout used to identify expired nodes
const NODE_LAST_SEEN_TIMEOUT: Duration = Duration::from_secs(24 * 60 * 60);
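
The datagram arithmetic in this hunk can be checked with a small standalone sketch. The constants `1280`, `109` and `91` come from the diff; the names `NEIGHBOURS_OVERHEAD`, `RLP_NODE_IPV6`, `safe_max_neighbour_records` and `datagrams_needed` are hypothetical, and the per-field breakdown of the 91 bytes is an assumption based on the general RLP encoding rules, not something the PR states.

```rust
/// Maximum size of any discv4 packet, as in the diff.
const MAX_PACKET_SIZE: usize = 1280;

/// Packet header: hash (32) + signature (65) + packet type (1), as in the diff.
const MIN_PACKET_SIZE: usize = 32 + 65 + 1;

/// Header + expiration + RLP overhead of a `Neighbours` packet. The diff uses the
/// literal `109`; the name is hypothetical. `109 - MIN_PACKET_SIZE` leaves 11 bytes
/// for the expiration timestamp and the RLP list prefixes.
const NEIGHBOURS_OVERHEAD: usize = 109;

/// Assumed RLP size of one IPv6 node record (the diff uses the literal `91`):
/// list prefix (2) + address (1 + 16) + UDP port (1 + 2) + TCP port (1 + 2) + id (2 + 64).
const RLP_NODE_IPV6: usize = 2 + (1 + 16) + (1 + 2) + (1 + 2) + (2 + 64);

/// Number of node records that always fit into one `Neighbours` datagram.
const fn safe_max_neighbour_records() -> usize {
    (MAX_PACKET_SIZE - NEIGHBOURS_OVERHEAD) / RLP_NODE_IPV6
}

/// Number of `Neighbours` datagrams needed to send `nodes` records (rounding up).
fn datagrams_needed(nodes: usize) -> usize {
    let per_datagram = safe_max_neighbour_records();
    (nodes + per_datagram - 1) / per_datagram
}
```

Under these assumptions a full bucket of 16 nodes does not fit into a single datagram and has to be split across two `Neighbours` packets, which matches the doc comment on `SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS`.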
@@ -107,7 +115,7 @@ pub struct Discv4 {
// === impl Discv4 ===

impl Discv4 {
/// Sames as [`Self::bind`] but also spawns the service onto a new task.
/// Same as [`Self::bind`] but also spawns the service onto a new task, see [`Discv4Service::spawn()`].
pub async fn spawn(
local_address: SocketAddr,
local_enr: NodeRecord,
@@ -122,6 +130,42 @@ impl Discv4 {
}

/// Binds a new UdpSocket and creates the service
///
/// ```
/// # use std::io;
/// use std::net::SocketAddr;
/// use std::str::FromStr;
/// use rand::thread_rng;
/// use secp256k1::SECP256K1;
/// use reth_discv4::{Discv4, Discv4Config, NodeId, NodeRecord};
/// # async fn t() -> io::Result<()> {
/// // generate a (random) keypair
/// let mut rng = thread_rng();
/// let (secret_key, pk) = SECP256K1.generate_keypair(&mut rng);
/// let id = NodeId::from_slice(&pk.serialize_uncompressed()[1..]);
///
/// let socket = SocketAddr::from_str("0.0.0.0:0").unwrap();
/// let local_enr = NodeRecord {
/// address: socket.ip(),
/// tcp_port: socket.port(),
/// udp_port: socket.port(),
/// id,
/// };
/// let config = Discv4Config::default();
Comment on lines +136 to +155 (Member):

sick

///
/// let (discv4, mut service) = Discv4::bind(socket, local_enr, secret_key, config).await.unwrap();
///
/// // get an update stream
/// let mut updates = service.update_stream();
///
/// let _handle = service.spawn();
///
/// // lookup the local node in the DHT
/// let _discovered = discv4.lookup_self().await.unwrap();
///
/// # Ok(())
/// # }
/// ```
pub async fn bind(
local_address: SocketAddr,
mut local_enr: NodeRecord,
@@ -133,7 +177,8 @@ impl Discv4 {
local_enr.udp_port = local_addr.port();
trace!(?local_addr, target = "net::disc", "opened UDP socket");

let (to_service, rx) = mpsc::channel(128);
// We don't expect many commands, so the buffer can be quite small here.
let (to_service, rx) = mpsc::channel(5);
let service =
Discv4Service::new(socket, local_addr, local_enr, secret_key, config, Some(rx));
let discv4 = Discv4 { local_addr, to_service };
@@ -186,7 +231,6 @@ impl Discv4 {

/// Manages discv4 peer discovery over UDP.
#[must_use = "Stream does nothing unless polled"]
#[allow(unused)]
pub struct Discv4Service {
/// Local address of the UDP socket.
local_address: SocketAddr,
@@ -195,17 +239,15 @@ pub struct Discv4Service {
/// The secret key used to sign payloads
secret_key: SecretKey,
/// The UDP socket for sending and receiving messages.
socket: Arc<UdpSocket>,
/// The boot node identifiers
bootstrap_node_ids: HashSet<NodeId>,
_socket: Arc<UdpSocket>,
/// The spawned UDP tasks.
///
/// Note: If dropped, the spawned send+receive tasks are aborted.
_tasks: JoinSet<()>,
/// The routing table.
kbuckets: KBucketsTable<NodeKey, NodeEntry>,
/// Whether to respect timestamps
check_timestamps: bool,
/// The spawned UDP tasks.
///
/// Note: If dropped, the spawned tasks are aborted.
tasks: JoinSet<()>,
/// Receiver for incoming messages
ingress: IngressReceiver,
/// Sender for sending outgoing messages
@@ -247,9 +289,13 @@ impl Discv4Service {
config: Discv4Config,
commands_rx: Option<mpsc::Receiver<Discv4Command>>,
) -> Self {
// Heuristic limit for channel buffer size, which is correlated with the number of
// concurrent requests and bucket size. This should be large enough to cover multiple
// lookups while also anticipating incoming requests.
const UDP_CHANNEL_BUFFER: usize = MAX_NODES_PER_BUCKET * ALPHA * (ALPHA * 2);
let socket = Arc::new(socket);
let (ingress_tx, ingress_rx) = mpsc::channel(1024);
let (egress_tx, egress_rx) = mpsc::channel(1024);
let (ingress_tx, ingress_rx) = mpsc::channel(UDP_CHANNEL_BUFFER);
let (egress_tx, egress_rx) = mpsc::channel(UDP_CHANNEL_BUFFER);
let mut tasks = JoinSet::<()>::new();

let udp = Arc::clone(&socket);
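
To make the buffer sizing in this hunk concrete, here is a minimal sketch of the heuristic and of what a bounded channel does once it is full. It assumes `MAX_NODES_PER_BUCKET` is 16 (as the lookup comments in this PR suggest) and swaps in `std::sync::mpsc::sync_channel` for tokio's `mpsc::channel` so the example has no dependencies; `fill_until_full` is a hypothetical helper. With tokio, `send().await` would wait for capacity instead of returning an error.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Assumed bucket size; the PR takes this constant from the reused kademlia table.
const MAX_NODES_PER_BUCKET: usize = 16;
/// Concurrency factor for `FindNode` requests, as in the diff.
const ALPHA: usize = 3;
/// Same heuristic as in the diff: 16 * 3 * 6 = 288 buffered messages.
const UDP_CHANNEL_BUFFER: usize = MAX_NODES_PER_BUCKET * ALPHA * (ALPHA * 2);

/// Fills a bounded channel without draining it and returns how many messages were
/// accepted before the channel reported that it is full.
fn fill_until_full(capacity: usize) -> usize {
    // `_rx` keeps the receiver alive for the whole function, so sends never see a
    // disconnected channel.
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    loop {
        match tx.try_send(0) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
}
```

The same mechanism explains the command channel shrinking from 128 to 5 slots in `Discv4::bind`: a small buffer is enough when few commands are expected, and a full buffer pushes back on the sender rather than growing without bound.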
@@ -277,22 +323,19 @@ impl Discv4Service {

let evict_expired_requests_interval = tokio::time::interval(config.find_node_timeout);

let bootstrap_node_ids = config.bootstrap_nodes.iter().map(|n| n.id).collect();

Discv4Service {
local_address,
local_enr,
socket,
_socket: socket,
kbuckets,
secret_key,
tasks,
_tasks: tasks,
ingress: ingress_rx,
egress: egress_tx,
queued_pings: Default::default(),
pending_pings: Default::default(),
pending_find_nodes: Default::default(),
check_timestamps: false,
bootstrap_node_ids,
commands_rx,
update_listeners: Vec::with_capacity(1),
lookup_interval: self_lookup_interval,
@@ -341,12 +384,12 @@ impl Discv4Service {
ReceiverStream::new(rx)
}

/// Looks up the local node itself.
/// Looks up the local node in the DHT.
pub fn lookup_self(&mut self) {
self.lookup(self.local_enr.id)
}

/// Looks up the given node.
/// Looks up the given node in the DHT
///
/// A FindNode packet requests information about nodes close to target. The target is a 64-byte
/// secp256k1 public key. When FindNode is received, the recipient should reply with Neighbors
@@ -358,11 +401,21 @@ impl Discv4Service {
self.lookup_with(target, None)
}

/// Starts the recursive lookup process for the given target, <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>.
///
/// At first the `ALPHA` (==3, defined concurrency factor) nodes that are closest to the target
/// in the underlying DHT are selected to seed the lookup via `FindNode` requests. In the
/// recursive step, the initiator resends FindNode to nodes it has learned about from previous
/// queries.
///
/// This takes an optional Sender through which all successfully discovered nodes are sent once
/// the request has finished.
#[instrument(skip_all, fields(?target), target = "net::discv4")]
fn lookup_with(&mut self, target: NodeId, tx: Option<NodeRecordSender>) {
trace!("Starting lookup");
let key = kad_key(target);

// Start a lookup context with the 16 (MAX_NODES_PER_BUCKET) closest nodes
let ctx = LookupContext::new(
target,
self.kbuckets
@@ -372,6 +425,7 @@ impl Discv4Service {
tx,
);

// From those 16, pick the 3 closest to start the lookup.
let closest = ctx.closest(ALPHA);

trace!(num = closest.len(), "Start lookup closest nodes");
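
The recursive lookup described in the doc comment of `lookup_with` can be sketched in isolation. This is a toy model, not the PR's `LookupContext`: ids are `u64` instead of 64-byte keys, distance is a plain XOR of the ids (discv4 XORs the Keccak-256 hashes of the ids), and network requests are replaced by a caller-supplied `find_node` closure. `ToyId`, `ToyLookup`, `next_closest` and `run_lookup` are hypothetical names.

```rust
use std::collections::BTreeMap;

/// Concurrency factor, as in the diff.
const ALPHA: usize = 3;

/// Toy node id; the real `NodeId` is a 64-byte public key (`H512`).
type ToyId = u64;

/// XOR distance between two toy ids.
fn distance(a: ToyId, b: ToyId) -> u64 {
    a ^ b
}

struct ToyLookup {
    target: ToyId,
    /// Candidates ordered by distance to the target; the flag records whether a
    /// `FindNode` request was already sent to that node.
    closest: BTreeMap<u64, (ToyId, bool)>,
}

impl ToyLookup {
    fn new(target: ToyId, seeds: Vec<ToyId>) -> Self {
        let mut this = ToyLookup { target, closest: BTreeMap::new() };
        for seed in seeds {
            this.add_node(seed);
        }
        this
    }

    /// Records a discovered node; nodes that are already known keep their state.
    fn add_node(&mut self, id: ToyId) {
        self.closest.entry(distance(self.target, id)).or_insert((id, false));
    }

    /// Returns up to `num` closest nodes that were not queried yet and marks them as queried.
    fn next_closest(&mut self, num: usize) -> Vec<ToyId> {
        self.closest
            .values_mut()
            .filter(|entry| !entry.1)
            .take(num)
            .map(|entry| {
                entry.1 = true;
                entry.0
            })
            .collect()
    }
}

/// Queries the `ALPHA` closest unqueried nodes round by round until nothing new is
/// left, then returns every known node ordered by distance to the target.
fn run_lookup(
    target: ToyId,
    seeds: Vec<ToyId>,
    find_node: impl Fn(ToyId) -> Vec<ToyId>,
) -> Vec<ToyId> {
    let mut ctx = ToyLookup::new(target, seeds);
    loop {
        let batch = ctx.next_closest(ALPHA);
        if batch.is_empty() {
            break;
        }
        for peer in batch {
            for discovered in find_node(peer) {
                ctx.add_node(discovered);
            }
        }
    }
    ctx.closest.values().map(|entry| entry.0).collect()
}
```

The sketch processes each round sequentially, whereas the service keeps up to `ALPHA` requests in flight and continues whenever a `Neighbours` response arrives. It also keeps every discovered node; a real lookup only tracks a bounded number of closest candidates.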
@@ -618,14 +672,15 @@ impl Discv4Service {

let our_key = kad_key(self.local_enr.id);

// This is the recursive lookup step where we initiate new FindNode requests for the
// This is the recursive lookup step where we initiate new FindNode requests for new nodes
// that were discovered.
for node in msg.nodes {
let key = kad_key(node.id);
let distance = our_key.distance(&key);
ctx.add_node(distance, node);
}

// get the next closest nodes
// get the next closest nodes that have not been queried yet and start over.
let closest = ctx.closest(ALPHA);

for closest in closest {
@@ -1030,10 +1085,13 @@ impl LookupContext {
}
}

// SAFETY: the shared context is only accessed mutably when a `Neighbour` response is processed,
// meaning the context is only ever accesses mutably
// SAFETY: The [`Discv4Service`] is intended to be spawned as a task, which requires `Send`.
// The `LookupContext` is shared by all active `FindNode` requests that are part of the lookup
// step, and each of them can modify the context. The shared context is only ever accessed mutably
// when a `Neighbour` response is processed, and all clones are stored inside [`Discv4Service`]. In
// other words, it is guaranteed that there is only 1 owner ([`Discv4Service`]) of all possible
// [`Rc`] clones of [`LookupContext`].
unsafe impl Send for LookupContext {}
unsafe impl Sync for LookupContext {}

struct LookupContextInner {
target: NodeId,
Expand Down
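
The sharing pattern that the SAFETY comment on `LookupContext` describes can be illustrated with a small sketch. It assumes the context wraps its state in `Rc<RefCell<..>>`, which is what the comment's mention of `Rc` clones and the `RefCell` import suggest; the type `SharedLookup`, its fields and the `u64` ids are hypothetical simplifications.

```rust
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;

/// Toy stand-in for a lookup context that several in-flight requests hold a handle to.
#[derive(Clone)]
struct SharedLookup {
    /// Discovered nodes keyed by distance; every clone points at the same map.
    inner: Rc<RefCell<BTreeMap<u64, u64>>>,
}

impl SharedLookup {
    fn new() -> Self {
        SharedLookup { inner: Rc::new(RefCell::new(BTreeMap::new())) }
    }

    /// Mutable access is short-lived: the borrow ends when this call returns.
    fn add_node(&self, distance: u64, id: u64) {
        self.inner.borrow_mut().insert(distance, id);
    }

    /// Returns the ids of the `num` closest nodes seen so far.
    fn closest(&self, num: usize) -> Vec<u64> {
        self.inner.borrow().values().copied().take(num).collect()
    }

    /// Number of handles that currently share the context.
    fn handles(&self) -> usize {
        Rc::strong_count(&self.inner)
    }
}
```

`Rc` is not `Send`, so a struct that stores such handles cannot be moved to another task by default. That is why the PR needs the `unsafe impl Send` together with the argument that every clone lives inside the single `Discv4Service` and therefore moves between threads as one unit.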
6 changes: 5 additions & 1 deletion crates/net/discv4/src/node.rs
@@ -34,12 +34,16 @@ pub(crate) fn kad_key(node: NodeId) -> discv5::Key<NodeKey> {
discv5::kbucket::Key::from(NodeKey::from(node))
}

/// Represents a ENR in discv4
/// Represents an ENR in discv4.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct NodeRecord {
/// The Address of a node.
pub address: IpAddr,
/// TCP port that accepts connections.
pub tcp_port: u16,
/// UDP discovery port.
pub udp_port: u16,
/// Public key of the discovery service
pub id: NodeId,
}
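
The `id` field holds the node's public key, and the doc example for `Discv4::bind` derives it with `NodeId::from_slice(&pk.serialize_uncompressed()[1..])`. The sketch below shows that step without the `secp256k1` crate: an uncompressed SEC1 public key is 65 bytes, a `0x04` tag followed by the 32-byte X and Y coordinates, and the node id is the 64 bytes after the tag. `ToyNodeId` and `node_id_from_uncompressed` are hypothetical names.

```rust
/// 64-byte node id, standing in for the crate's `NodeId` (`H512`).
type ToyNodeId = [u8; 64];

/// Strips the leading `0x04` tag from a 65-byte uncompressed secp256k1 public key.
///
/// Returns `None` if the first byte is not the uncompressed-key tag.
fn node_id_from_uncompressed(public_key: &[u8; 65]) -> Option<ToyNodeId> {
    if public_key[0] != 0x04 {
        return None;
    }
    let mut id = [0u8; 64];
    // Copy X || Y, skipping the tag byte.
    id.copy_from_slice(&public_key[1..]);
    Some(id)
}
```

The tag check is an addition for the sketch; the doc example slices unconditionally because `serialize_uncompressed` always produces the `0x04` form.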

Expand Down