Add routing table metrics + tweaks + fixes (#261)
- routing table metrics + option in dcli
- only forward "seen" nodes on a findNode request
- setJustSeen & replace on ping AND findnode
- self lookup only at start
- revalidate 10x more
- use bitsPerHop (b) of 5
- small fix in resolve
- small fix in bucket split
kdeme committed Jun 30, 2020
1 parent 0d591c6 commit 9a46722
Showing 6 changed files with 142 additions and 57 deletions.
42 changes: 39 additions & 3 deletions eth/p2p/discoveryv5/dcli.nim
@@ -1,6 +1,6 @@
import
sequtils, options, strutils, chronos, chronicles, chronicles/topics_registry,
stew/byteutils, stew/shims/net, confutils,
stew/byteutils, confutils, confutils/std/net, metrics,
eth/keys, eth/trie/db, eth/net/nat,
eth/p2p/discoveryv5/[protocol, discovery_db, enr, node]

@@ -30,6 +30,26 @@ type
"Must be one of: any, none, upnp, pmp, extip:<IP>."
defaultValue: "any" .}: string

nodeKey* {.
desc: "P2P node private key as hex.",
defaultValue: PrivateKey.random().expect("Properly initialized private key")
name: "nodekey" .}: PrivateKey

metricsEnabled* {.
defaultValue: false
desc: "Enable the metrics server."
name: "metrics" .}: bool

metricsAddress* {.
defaultValue: ValidIpAddress.init("127.0.0.1")
desc: "Listening address of the metrics server."
name: "metrics-address" .}: ValidIpAddress

metricsPort* {.
defaultValue: 8008
desc: "Listening HTTP port of the metrics server."
name: "metrics-port" .}: Port

case cmd* {.
command
defaultValue: noCommand }: DiscoveryCmd
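
For reference, a hypothetical invocation exercising the new options (the binary name, key, and target are placeholders, and the exact flag syntax depends on confutils, but the option names match the `name:` pragmas above):

./dcli --nodekey:&lt;64-hex-char-private-key&gt; --metrics --metrics-address:127.0.0.1 --metrics-port:8008 ping &lt;target-node&gt;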
@@ -76,6 +96,15 @@ proc parseCmdArg*(T: type Node, p: TaintedString): T =
proc completeCmdArg*(T: type Node, val: TaintedString): seq[string] =
return @[]

proc parseCmdArg*(T: type PrivateKey, p: TaintedString): T =
try:
result = PrivateKey.fromHex(string(p)).tryGet()
except CatchableError:
raise newException(ConfigurationError, "Invalid private key")

proc completeCmdArg*(T: type PrivateKey, val: TaintedString): seq[string] =
return @[]

proc setupNat(conf: DiscoveryConf): tuple[ip: Option[ValidIpAddress],
tcpPort: Port,
udpPort: Port] {.gcsafe.} =
@@ -116,14 +145,21 @@ proc setupNat(conf: DiscoveryConf): tuple[ip: Option[ValidIpAddress],
proc run(config: DiscoveryConf) =
let
(ip, tcpPort, udpPort) = setupNat(config)
privKey = PrivateKey.random().expect("Properly initialized private key")
ddb = DiscoveryDB.init(newMemoryDB())
# TODO: newProtocol should allow for no tcpPort
d = newProtocol(privKey, ddb, ip, tcpPort, udpPort,
d = newProtocol(config.nodeKey, ddb, ip, tcpPort, udpPort,
bootstrapRecords = config.bootnodes)

d.open()

when defined(insecure):
if config.metricsEnabled:
let
address = config.metricsAddress
port = config.metricsPort
info "Starting metrics HTTP server", address, port
metrics.startHttpServer($address, port)

case config.cmd
of ping:
let pong = waitFor d.ping(config.pingTarget)
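When the binary is built with `-d:insecure` (required by the `when defined(insecure)` guard above) and started with metrics enabled, the nim-metrics HTTP server exposes the gauges in Prometheus text format. A hypothetical scrape, with invented values (the exact rendering of the unlabeled versus `state="seen"` series depends on the nim-metrics version):

curl -s http://127.0.0.1:8008/metrics | grep routing_table_nodes
routing_table_nodes 23.0
routing_table_nodes{state="seen"} 17.0
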
2 changes: 2 additions & 0 deletions eth/p2p/discoveryv5/node.nim
@@ -16,6 +16,8 @@ type
pubkey*: PublicKey
address*: Option[Address]
record*: Record
seen*: bool ## Indicates if there was at least one successful
## request-response with this node.

proc toNodeId*(pk: PublicKey): NodeId =
readUintBE[256](keccak256.digest(pk.toRaw()).data)
78 changes: 42 additions & 36 deletions eth/p2p/discoveryv5/protocol.nim
@@ -76,7 +76,7 @@ import
std/[tables, sets, options, math, random],
stew/shims/net as stewNet, json_serialization/std/net,
stew/[byteutils, endians2], chronicles, chronos, stint,
eth/[rlp, keys], types, encoding, node, routing_table, enr
eth/[rlp, keys, async_utils], types, encoding, node, routing_table, enr

import nimcrypto except toHex

@@ -95,9 +95,11 @@ const
lookupInterval = 60.seconds ## Interval of launching a random lookup to
## populate the routing table. go-ethereum seems to do 3 runs every 30
## minutes. Trinity starts one every minute.
revalidateMax = 1000 ## A peer revalidation happens after a random delay
## between 0 and this value, in milliseconds
handshakeTimeout* = 2.seconds ## timeout for the reply on the
## whoareyou message
responseTimeout* = 2.seconds ## timeout for the response of a request-response
responseTimeout* = 4.seconds ## timeout for the response of a request-response
## call
magicSize = 32 ## size of the magic which is the start of the whoareyou
## message
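
A quick sanity check on the "revalidate 10x more" bullet from the commit message: the revalidation loop sleeps `rand(revalidateMax).milliseconds` between pings (see `revalidateLoop` further down), so assuming a uniform `rand`, the mean delay drops from about E[rand(10 * 1000)] ≈ 5 s in the old code to E[rand(1000)] ≈ 0.5 s, i.e. roughly ten times as many revalidation pings per unit of time.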
@@ -303,7 +305,7 @@ proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address,
else:
let distance = min(fn.distance, 256)
d.sendNodes(fromId, fromAddr, reqId,
d.routingTable.neighboursAtDistance(distance))
d.routingTable.neighboursAtDistance(distance, seenOnly = true))

proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe,
raises: [
@@ -455,6 +457,22 @@ proc validIp(sender, address: IpAddress): bool {.raises: [Defect].} =
# https://www.iana.org/assignments/iana-ipv6-special-registry/iana-ipv6-special-registry.xhtml
return true

proc replaceNode(d: Protocol, n: Node) =
if n.record notin d.bootstrapRecords:
d.routingTable.replaceNode(n)
# Remove shared secrets when removing the node from routing table.
# TODO: This might be too direct, so we could keep these longer. But better
# would be to simply not remove the nodes immediately but use an LRU cache.
# Also because some shared secrets will be with nodes not eligible for
# the routing table, and these don't get deleted now, see issue:
# https://github.com/status-im/nim-eth/issues/242
discard d.codec.db.deleteKeys(n.id, n.address.get())
else:
# For now we never remove bootstrap nodes. It might make sense to actually
# do so and to retry them only in case we drop to a really low amount of
# peers in the routing table.
debug "Revalidation of bootstrap node failed", enr = toURI(n.record)

# TODO: This could be improved to do the clean-up immediately in case a
# non-whoareyou response does arrive, but we would need to store the AuthTag
# somewhere
@@ -497,12 +515,12 @@ proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId):
if op.isSome and op.get.kind == nodes:
res.addNodesFromENRs(op.get.nodes.enrs)
else:
# No error on this as we received some nodes.
break
return ok(res)
else:
return err("Nodes message not received in time")


proc sendMessage*[T: SomeMessage](d: Protocol, toNode: Node, m: T):
DiscResult[RequestId] {.raises: [Exception, Defect].} =
doAssert(toNode.address.isSome())
@@ -524,8 +542,10 @@ proc ping*(d: Protocol, toNode: Node):
let resp = await d.waitMessage(toNode, reqId[])

if resp.isSome() and resp.get().kind == pong:
d.routingTable.setJustSeen(toNode)
return ok(resp.get().pong)
else:
d.replaceNode(toNode)
return err("Pong message not received in time")

proc findNode*(d: Protocol, toNode: Node, distance: uint32):
@@ -538,12 +558,18 @@ proc findNode*(d: Protocol, toNode: Node, distance: uint32):
if nodes.isOk:
var res = newSeq[Node]()
for n in nodes[]:
# Check if the node has an address and if the address is public, or from
# the same local or loopback network as the sender. The latter allows
# for local testing.
# Any port is allowed, including the so-called "well-known" ports.
if n.address.isSome() and
validIp(toNode.address.get().ip, n.address.get().ip):
res.add(n)
# TODO: Check ports

d.routingTable.setJustSeen(toNode)
return ok(res)
else:
d.replaceNode(toNode)
return err(nodes.error)

proc lookupDistances(target, dest: NodeId): seq[uint32] {.raises: [Defect].} =
@@ -577,7 +603,8 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]]
## Perform a lookup for the given target, return the closest n nodes to the
## target. Maximum value for n is `BUCKET_SIZE`.
# TODO: Sort the returned nodes on distance
result = d.routingTable.neighbours(target, BUCKET_SIZE)
# Also use unseen nodes as a form of validation.
result = d.routingTable.neighbours(target, BUCKET_SIZE, seenOnly = false)
var asked = initHashSet[NodeId]()
asked.incl(d.localNode.id)
var seen = asked
@@ -646,57 +673,36 @@ proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]]
else:
return some(n)

return node

proc revalidateNode*(d: Protocol, n: Node)
{.async, raises: [Exception, Defect].} = # TODO: Exception
trace "Ping to revalidate node", node = $n
let pong = await d.ping(n)

if pong.isOK():
if pong.get().enrSeq > n.record.seqNum:
# TODO: Request new ENR
discard

d.routingTable.setJustSeen(n)
trace "Revalidated node", node = $n
else:
# TODO: Handle failures better. E.g. don't remove nodes on failures
# other than timeout
# For now we never remove bootstrap nodes. It might make sense to actually
# do so and to retry them only in case we drop to a really low amount of
# peers in the DHT
if n.record notin d.bootstrapRecords:
trace "Revalidation of node failed, removing node", record = n.record
d.routingTable.replaceNode(n)
# Remove shared secrets when removing the node from routing table.
# This might be too direct, so we could keep these longer. But better
# would be to simply not remove the nodes immediately but only after x
# amount of failures.
doAssert(n.address.isSome())
discard d.codec.db.deleteKeys(n.id, n.address.get())
else:
debug "Revalidation of bootstrap node failed", enr = toURI(n.record)

proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
# TODO: General Exception raised.
try:
randomize()
while true:
await sleepAsync(rand(10 * 1000).milliseconds)
await sleepAsync(rand(revalidateMax).milliseconds)
let n = d.routingTable.nodeToRevalidate()
if not n.isNil:
# TODO: Should we do these in parallel and/or async to be certain of how
# often nodes are revalidated?
await d.revalidateNode(n)
traceAsyncErrors d.revalidateNode(n)
except CancelledError:
trace "revalidateLoop canceled"

proc lookupLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
# TODO: General Exception raised.
try:
# lookup self (neighbour nodes)
let selfLookup = await d.lookup(d.localNode.id)
trace "Discovered nodes in self lookup", nodes = $selfLookup
while true:
# lookup self (neighbour nodes)
let selfLookup = await d.lookup(d.localNode.id)
trace "Discovered nodes in self lookup", nodes = $selfLookup

let randomLookup = await d.lookupRandom()
if randomLookup.isOK:
trace "Discovered nodes in random lookup", nodes = $randomLookup[]
@@ -733,7 +739,7 @@ proc newProtocol*(privKey: PrivateKey, db: Database,
codec: Codec(localNode: node, privKey: privKey, db: db),
bootstrapRecords: @bootstrapRecords)

result.routingTable.init(node)
result.routingTable.init(node, 5)

proc open*(d: Protocol) {.raises: [Exception, Defect].} =
info "Starting discovery node", node = $d.localNode,
36 changes: 27 additions & 9 deletions eth/p2p/discoveryv5/routing_table.nim
@@ -1,12 +1,15 @@
import
std/[algorithm, times, sequtils, bitops, random, sets, options],
stint, chronicles,
stint, chronicles, metrics,
node

export options

{.push raises: [Defect].}

declarePublicGauge routing_table_nodes,
"Discovery routing table nodes", labels = ["state"]

type
RoutingTable* = object
thisNode: Node
@@ -108,6 +111,7 @@ proc add(k: KBucket, n: Node): Node =
return nil
elif k.len < BUCKET_SIZE:
k.nodes.add(n)
routing_table_nodes.inc()
return nil
else:
return k.tail
@@ -130,7 +134,9 @@ proc addReplacement(k: KBucket, n: Node) =

proc removeNode(k: KBucket, n: Node) =
let i = k.nodes.find(n)
if i != -1: k.nodes.delete(i)
if i != -1:
k.nodes.delete(i)
routing_table_nodes.dec()

proc split(k: KBucket): tuple[lower, upper: KBucket] =
## Split at the median id
@@ -139,7 +145,7 @@ proc split(k: KBucket): tuple[lower, upper: KBucket] =
result.upper = newKBucket(splitid + 1.u256, k.iend)
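# Note: nodes are appended directly instead of going through `KBucket.add`,
# which would re-run the size check (silently dropping the tail if all
# nodes fall into the same half) and double-count the already counted
# nodes in the routing_table_nodes gauge.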
for node in k.nodes:
let bucket = if node.id <= splitid: result.lower else: result.upper
discard bucket.add(node)
bucket.nodes.add(node)
for node in k.replacementCache:
let bucket = if node.id <= splitid: result.lower else: result.upper
bucket.replacementCache.add(node)
@@ -243,9 +249,14 @@ proc replaceNode*(r: var RoutingTable, n: Node) =
let b = r.bucketForNode(n.id)
let idx = b.nodes.find(n)
if idx != -1:
routing_table_nodes.dec()
if b.nodes[idx].seen:
routing_table_nodes.dec(labelValues = ["seen"])
b.nodes.delete(idx)

if b.replacementCache.len > 0:
b.nodes.add(b.replacementCache[high(b.replacementCache)])
routing_table_nodes.inc()
b.replacementCache.delete(high(b.replacementCache))

proc getNode*(r: RoutingTable, id: NodeId): Option[Node] =
@@ -259,15 +270,18 @@ proc contains*(r: RoutingTable, n: Node): bool = n in r.bucketForNode(n.id)
proc bucketsByDistanceTo(r: RoutingTable, id: NodeId): seq[KBucket] =
sortedByIt(r.buckets, it.distanceTo(id))

proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE): seq[Node] =
proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE,
seenOnly = false): seq[Node] =
## Return up to k neighbours of the given node.
result = newSeqOfCap[Node](k * 2)
block addNodes:
for bucket in r.bucketsByDistanceTo(id):
for n in bucket.nodesByDistanceTo(id):
result.add(n)
if result.len == k * 2:
break addNodes
# Only provide actively seen nodes when `seenOnly` is set
if not seenOnly or n.seen:
result.add(n)
if result.len == k * 2:
break addNodes

# TODO: is this sort still needed? Can we get nodes closer from the "next"
# bucket?
@@ -284,8 +298,8 @@ proc idAtDistance*(id: NodeId, dist: uint32): NodeId =
id xor (1.stuint(256) shl (dist.int - 1))

proc neighboursAtDistance*(r: RoutingTable, distance: uint32,
k: int = BUCKET_SIZE): seq[Node] =
result = r.neighbours(idAtDistance(r.thisNode.id, distance), k)
k: int = BUCKET_SIZE, seenOnly = false): seq[Node] =
result = r.neighbours(idAtDistance(r.thisNode.id, distance), k, seenOnly)
# This is a bit silly: first get the closest nodes, then keep only the
# ones that are at exactly the requested distance.
keepIf(result, proc(n: Node): bool = logDist(n.id, r.thisNode.id) == distance)
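
A quick worked example of the xor trick in `idAtDistance`, as a sketch (assuming `NodeId` is stint's 256-bit unsigned integer and `logDist` is the log2-distance proc used elsewhere in this module): flipping bit `dist - 1` yields an id whose highest differing bit sits exactly at that position.

let
  id = 0.stuint(256)
  peer = idAtDistance(id, 256) # flips the most significant bit
doAssert logDist(id, peer) == 256
doAssert logDist(id, idAtDistance(id, 1)) == 1 # flips the lowest bit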
@@ -311,6 +325,10 @@ proc setJustSeen*(r: RoutingTable, n: Node) =
b.nodes.moveRight(0, idx - 1)
b.lastUpdated = epochTime()

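# `moveRight` rotated `n` to index 0, so this marks `n` itself as seen and
# bumps the "seen" gauge exactly once per node.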
if not n.seen:
b.nodes[0].seen = true
routing_table_nodes.inc(labelValues = ["seen"])

proc nodeToRevalidate*(r: RoutingTable): Node =
## Return a node to revalidate. The least recently seen node from a random
## bucket is selected.
5 changes: 5 additions & 0 deletions tests/p2p/discv5_test_helper.nim
@@ -53,3 +53,8 @@ proc nodeAtDistance*(n: Node, d: uint32): Node =
proc nodesAtDistance*(n: Node, d: uint32, amount: int): seq[Node] =
for i in 0..<amount:
result.add(nodeAtDistance(n, d))

proc addSeenNode*(d: discv5_protocol.Protocol, n: Node): bool =
# Add it as a seen node. Warning: for testing convenience only!
n.seen = true
d.addNode(n)
