Merge pull request #3500 from lbryio/fix_script
Add Prometheus metrics for DHT internals
jackrobison committed Jan 14, 2022
2 parents a4dce8c + 0618053 commit f78e382
Showing 6 changed files with 130 additions and 24 deletions.
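A note on the pattern used throughout this diff: each metric is declared once as a class attribute (prometheus_client registers collectors in a process-global registry at construction time) and is then updated from instance methods, all under the dht_node namespace. A minimal sketch of that pattern, with an invented class and metric name purely for illustration:

from prometheus_client import Gauge, generate_latest

class TrackedStore:
    # class-level declaration: the collector is registered exactly once
    items_metric = Gauge(
        "tracked_items", "Number of items held by this store",  # invented metric
        namespace="dht_node", labelnames=("scope",),
    )

    def __init__(self):
        self.items = []

    def add(self, item):
        self.items.append(item)
        self.items_metric.labels("global").set(len(self.items))

store = TrackedStore()
store.add("blob")
print(generate_latest().decode())  # Prometheus text format, as served at /metrics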
24 changes: 23 additions & 1 deletion lbry/dht/node.py
@@ -2,6 +2,9 @@
import asyncio
import typing
import socket

from prometheus_client import Gauge

from lbry.utils import resolve_host
from lbry.dht import constants
from lbry.dht.peer import make_kademlia_peer
@@ -17,6 +20,14 @@


class Node:
storing_peers_metric = Gauge(
"storing_peers", "Number of peers storing blobs announced to this node", namespace="dht_node",
labelnames=("scope",),
)
stored_blob_with_x_bytes_colliding = Gauge(
"stored_blobs_x_bytes_colliding", "Number of blobs with at least X bytes colliding with this node id prefix",
namespace="dht_node", labelnames=("amount",)
)
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, udp_port: int,
internal_udp_port: int, peer_port: int, external_ip: str, rpc_timeout: float = constants.RPC_TIMEOUT,
split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX,
@@ -44,7 +55,18 @@ async def refresh_node(self, force_once=False):
# add all peers in the routing table
total_peers.extend(self.protocol.routing_table.get_peers())
# add all the peers who have announced blobs to us
total_peers.extend(self.protocol.data_store.get_storing_contacts())
storing_peers = self.protocol.data_store.get_storing_contacts()
self.storing_peers_metric.labels("global").set(len(storing_peers))
total_peers.extend(storing_peers)

counts = {0: 0, 1: 0, 2: 0}
node_id = self.protocol.node_id
for blob_hash in self.protocol.data_store.keys():
bytes_colliding = 0 if blob_hash[0] != node_id[0] else 2 if blob_hash[1] == node_id[1] else 1
counts[bytes_colliding] += 1
self.stored_blob_with_x_bytes_colliding.labels(amount=0).set(counts[0])
self.stored_blob_with_x_bytes_colliding.labels(amount=1).set(counts[1])
self.stored_blob_with_x_bytes_colliding.labels(amount=2).set(counts[2])

# get ids falling in the midpoint of each bucket that hasn't been recently updated
node_ids = self.protocol.routing_table.get_refresh_list(0, True)
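The counting loop above buckets each stored blob hash by how many of its leading bytes match the local node id: 0 if the first byte differs, 2 if the first two bytes match, otherwise 1. A standalone check of that expression, with illustrative two-byte prefixes:

node_id = bytes.fromhex("ab12")  # illustrative; only the first two bytes matter
for blob_hash, expected in [
    (bytes.fromhex("cd12"), 0),  # first byte differs
    (bytes.fromhex("ab34"), 1),  # first byte matches, second differs
    (bytes.fromhex("ab12"), 2),  # first two bytes match
]:
    bytes_colliding = 0 if blob_hash[0] != node_id[0] else 2 if blob_hash[1] == node_id[1] else 1
    assert bytes_colliding == expected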
13 changes: 13 additions & 0 deletions lbry/dht/peer.py
@@ -3,6 +3,9 @@
import logging
from dataclasses import dataclass, field
from functools import lru_cache

from prometheus_client import Gauge

from lbry.utils import is_valid_public_ipv4 as _is_valid_public_ipv4, LRUCache
from lbry.dht import constants
from lbry.dht.serialization.datagram import make_compact_address, make_compact_ip, decode_compact_address
@@ -26,6 +29,10 @@ def is_valid_public_ipv4(address, allow_localhost: bool = False):


class PeerManager:
peer_manager_keys_metric = Gauge(
"peer_manager_keys", "Number of keys tracked by PeerManager dicts (sum)", namespace="dht_node",
labelnames=("scope",)
)
def __init__(self, loop: asyncio.AbstractEventLoop):
self._loop = loop
self._rpc_failures: typing.Dict[
@@ -38,6 +45,11 @@ def __init__(self, loop: asyncio.AbstractEventLoop):
self._node_id_reverse_mapping: typing.Dict[bytes, typing.Tuple[str, int]] = LRUCache(CACHE_SIZE)
self._node_tokens: typing.Dict[bytes, (float, bytes)] = LRUCache(CACHE_SIZE)

def count_cache_keys(self):
return len(self._rpc_failures) + len(self._last_replied) + len(self._last_sent) + len(
self._last_requested) + len(self._node_id_mapping) + len(self._node_id_reverse_mapping) + len(
self._node_tokens)

def reset(self):
for statistic in (self._rpc_failures, self._last_replied, self._last_sent, self._last_requested):
statistic.clear()
@@ -86,6 +98,7 @@ def update_contact_triple(self, node_id: bytes, address: str, udp_port: int):
self._node_id_mapping.pop(self._node_id_reverse_mapping.pop(node_id))
self._node_id_mapping[(address, udp_port)] = node_id
self._node_id_reverse_mapping[node_id] = (address, udp_port)
self.peer_manager_keys_metric.labels("global").set(self.count_cache_keys())

def prune(self): # TODO: periodically call this
now = self._loop.time()
39 changes: 39 additions & 0 deletions lbry/dht/protocol/protocol.py
@@ -3,11 +3,14 @@
import functools
import hashlib
import asyncio
import time
import typing
import random
from asyncio.protocols import DatagramProtocol
from asyncio.transports import DatagramTransport

from prometheus_client import Gauge, Counter, Histogram

from lbry.dht import constants
from lbry.dht.serialization.bencoding import DecodeError
from lbry.dht.serialization.datagram import decode_datagram, ErrorDatagram, ResponseDatagram, RequestDatagram
@@ -30,6 +33,11 @@


class KademliaRPC:
stored_blob_metric = Gauge(
"stored_blobs", "Number of blobs announced by other peers", namespace="dht_node",
labelnames=("scope",),
)

def __init__(self, protocol: 'KademliaProtocol', loop: asyncio.AbstractEventLoop, peer_port: int = 3333):
self.protocol = protocol
self.loop = loop
@@ -61,6 +69,7 @@ def store(self, rpc_contact: 'KademliaPeer', blob_hash: bytes, token: bytes, por
self.protocol.data_store.add_peer_to_blob(
rpc_contact, blob_hash
)
self.stored_blob_metric.labels("global").set(len(self.protocol.data_store))
return b'OK'

def find_node(self, rpc_contact: 'KademliaPeer', key: bytes) -> typing.List[typing.Tuple[bytes, str, int]]:
@@ -259,6 +268,30 @@ def stop(self):


class KademliaProtocol(DatagramProtocol):
request_sent_metric = Counter(
"request_sent", "Number of requests send from DHT RPC protocol", namespace="dht_node",
labelnames=("method",),
)
request_success_metric = Counter(
"request_success", "Number of successful requests", namespace="dht_node",
labelnames=("method",),
)
request_error_metric = Counter(
"request_error", "Number of errors returned from request to other peers", namespace="dht_node",
labelnames=("method",),
)
HISTOGRAM_BUCKETS = (
.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 3.0, 3.5, 4.0, 4.50, 5.0, 5.50, 6.0, float('inf')
)
response_time_metric = Histogram(
"response_time", "Response times of DHT RPC requests", namespace="dht_node", buckets=HISTOGRAM_BUCKETS,
labelnames=("method",)
)
received_request_metric = Counter(
"received_request", "Number of received DHT RPC requests", namespace="dht_node",
labelnames=("method",),
)

def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', node_id: bytes, external_ip: str,
udp_port: int, peer_port: int, rpc_timeout: float = constants.RPC_TIMEOUT,
split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX):
@@ -447,6 +480,7 @@ def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram):

def handle_request_datagram(self, address: typing.Tuple[str, int], request_datagram: RequestDatagram):
# This is an RPC method request
self.received_request_metric.labels(method=request_datagram.method).inc()
self.peer_manager.report_last_requested(address[0], address[1])
try:
peer = self.routing_table.get_peer(request_datagram.node_id)
@@ -575,14 +609,19 @@ async def send_request(self, peer: 'KademliaPeer', request: RequestDatagram) ->
self._send(peer, request)
response_fut = self.sent_messages[request.rpc_id][1]
try:
self.request_sent_metric.labels(method=request.method).inc()
start = time.perf_counter()
response = await asyncio.wait_for(response_fut, self.rpc_timeout)
self.response_time_metric.labels(method=request.method).observe(time.perf_counter() - start)
self.peer_manager.report_last_replied(peer.address, peer.udp_port)
self.request_success_metric.labels(method=request.method).inc()
return response
except asyncio.CancelledError:
if not response_fut.done():
response_fut.cancel()
raise
except (asyncio.TimeoutError, RemoteException):
self.request_error_metric.labels(method=request.method).inc()
self.peer_manager.report_failure(peer.address, peer.udp_port)
if self.peer_manager.peer_is_good(peer) is False:
self.remove_peer(peer)
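The send_request changes above wrap each outgoing RPC in a count-time-record sequence: increment request_sent, time the awaited response, observe the latency, then increment request_success or request_error depending on the outcome. A condensed sketch of the same pattern, detached from the lbry classes (metric names reused for illustration only):

import asyncio
import time

from prometheus_client import Counter, Histogram

REQUEST_SENT = Counter("request_sent", "Requests sent", namespace="sketch", labelnames=("method",))
REQUEST_SUCCESS = Counter("request_success", "Successful requests", namespace="sketch", labelnames=("method",))
REQUEST_ERROR = Counter("request_error", "Failed requests", namespace="sketch", labelnames=("method",))
RESPONSE_TIME = Histogram("response_time", "Response times", namespace="sketch", labelnames=("method",))

async def instrumented(method: str, coro, timeout: float):
    REQUEST_SENT.labels(method=method).inc()
    start = time.perf_counter()
    try:
        result = await asyncio.wait_for(coro, timeout)
    except asyncio.TimeoutError:
        REQUEST_ERROR.labels(method=method).inc()
        raise
    RESPONSE_TIME.labels(method=method).observe(time.perf_counter() - start)
    REQUEST_SUCCESS.labels(method=method).inc()
    return result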
27 changes: 26 additions & 1 deletion lbry/dht/protocol/routing_table.py
@@ -4,6 +4,8 @@
import typing
import itertools

from prometheus_client import Gauge

from lbry.dht import constants
from lbry.dht.protocol.distance import Distance
if typing.TYPE_CHECKING:
@@ -13,8 +15,17 @@


class KBucket:
""" Description - later
"""
Kademlia K-bucket implementation.
"""
peer_in_routing_table_metric = Gauge(
"peers_in_routing_table", "Number of peers on routing table", namespace="dht_node",
labelnames=("scope",)
)
peer_with_x_bit_colliding_metric = Gauge(
"peer_x_bit_colliding", "Number of peers with at least X bits colliding with this node id",
namespace="dht_node", labelnames=("amount",)
)

def __init__(self, peer_manager: 'PeerManager', range_min: int, range_max: int, node_id: bytes):
"""
@@ -58,6 +69,10 @@ def add_peer(self, peer: 'KademliaPeer') -> bool:
return True
if len(self.peers) < constants.K:
self.peers.append(peer)
self.peer_in_routing_table_metric.labels("global").inc()
if peer.node_id[0] == self._node_id[0]:
bits_colliding = 8 - (peer.node_id[1] ^ self._node_id[1]).bit_length()
self.peer_with_x_bit_colliding_metric.labels(amount=(bits_colliding + 8)).inc()
return True
else:
return False
@@ -124,6 +139,10 @@ def get_bad_or_unknown_peers(self) -> typing.List['KademliaPeer']:

def remove_peer(self, peer: 'KademliaPeer') -> None:
self.peers.remove(peer)
self.peer_in_routing_table_metric.labels("global").dec()
if peer.node_id[0] == self._node_id[0]:
bits_colliding = 8 - (peer.node_id[1] ^ self._node_id[1]).bit_length()
self.peer_with_x_bit_colliding_metric.labels(amount=(bits_colliding + 8)).dec()

def key_in_range(self, key: bytes) -> bool:
""" Tests whether the specified key (i.e. node ID) is in the range
@@ -162,6 +181,10 @@ class TreeRoutingTable:
ping RPC-based k-bucket eviction algorithm described in section 2.2 of
that paper.
"""
bucket_in_routing_table_metric = Gauge(
"buckets_in_routing_table", "Number of buckets on routing table", namespace="dht_node",
labelnames=("scope",)
)

def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', parent_node_id: bytes,
split_buckets_under_index: int = constants.SPLIT_BUCKETS_UNDER_INDEX):
@@ -279,6 +302,7 @@ def split_bucket(self, old_bucket_index: int) -> None:
# ...and remove them from the old bucket
for contact in new_bucket.peers:
old_bucket.remove_peer(contact)
self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets))

def join_buckets(self):
if len(self.buckets) == 1:
@@ -302,6 +326,7 @@ def join_buckets(self):
elif can_go_higher:
self.buckets[bucket_index_to_pop + 1].range_min = bucket.range_min
self.buckets.remove(bucket)
self.bucket_in_routing_table_metric.labels("global").set(len(self.buckets))
return self.join_buckets()

def contact_in_routing_table(self, address_tuple: typing.Tuple[str, int]) -> bool:
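add_peer and remove_peer above adjust a per-depth gauge: a peer is only counted once its first byte matches the local id (8 shared bits), and the extra leading bits shared in the second byte are 8 - (a ^ b).bit_length(), so the exported amount label is that count plus 8. A worked check of the arithmetic:

pairs = [
    (0b10110000, 0b10110000, 8),  # identical second bytes: all 8 extra bits shared
    (0b10110000, 0b10111111, 4),  # top four bits shared
    (0b10110000, 0b00110000, 0),  # differ at the most significant bit
]
for a, b, expected in pairs:
    assert 8 - (a ^ b).bit_length() == expected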
5 changes: 4 additions & 1 deletion lbry/extras/daemon/daemon.py
@@ -5149,10 +5149,12 @@ def jsonrpc_routing_table_get(self):
]
},
"node_id": (str) the local dht node id
"prefix_neighbors_count": (int) the amount of peers sharing the same byte prefix of the local node id
}
"""
result = {
'buckets': {}
'buckets': {},
'prefix_neighbors_count': 0
}

for i, _ in enumerate(self.dht_node.protocol.routing_table.buckets):
@@ -5165,6 +5167,7 @@ def jsonrpc_routing_table_get(self):
"node_id": hexlify(peer.node_id).decode(),
}
result['buckets'][i].append(host)
result['prefix_neighbors_count'] += 1 if peer.node_id[0] == self.dht_node.protocol.node_id[0] else 0

result['node_id'] = hexlify(self.dht_node.protocol.node_id).decode()
return result
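The new prefix_neighbors_count field adds one for every routing-table peer whose node id begins with the same byte as the local id. The same computation in isolation (two-byte ids for illustration; real ids are much longer):

local_node_id = bytes.fromhex("ab01")
peer_ids = [bytes.fromhex(h) for h in ("abff", "ab02", "cd01")]
prefix_neighbors_count = sum(1 if p[0] == local_node_id[0] else 0 for p in peer_ids)
assert prefix_neighbors_count == 2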
46 changes: 25 additions & 21 deletions scripts/dht_node.py
@@ -2,10 +2,11 @@
import argparse
import logging
import csv
import os.path
from io import StringIO
from typing import Optional
from aiohttp import web
from prometheus_client import generate_latest as prom_generate_latest, Gauge
from prometheus_client import generate_latest as prom_generate_latest

from lbry.dht.constants import generate_id
from lbry.dht.node import Node
@@ -15,22 +16,14 @@

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s")
log = logging.getLogger(__name__)
BLOBS_STORED = Gauge(
"blobs_stored", "Number of blob info received", namespace="dht_node",
labelnames=("method",)
)
PEERS = Gauge(
"known_peers", "Number of peers on routing table", namespace="dht_node",
labelnames=("method",)
)


class SimpleMetrics:
def __init__(self, port, node):
self.prometheus_port = port
self.dht_node: Node = node

async def handle_metrics_get_request(self, request: web.Request):
async def handle_metrics_get_request(self, _):
try:
return web.Response(
text=prom_generate_latest().decode(),
@@ -40,15 +33,15 @@ async def handle_metrics_get_request(self, request: web.Request):
log.exception('could not generate prometheus data')
raise

async def handle_peers_csv(self, request: web.Request):
async def handle_peers_csv(self, _):
out = StringIO()
writer = csv.DictWriter(out, fieldnames=["ip", "port", "dht_id"])
writer.writeheader()
for peer in self.dht_node.protocol.routing_table.get_peers():
writer.writerow({"ip": peer.address, "port": peer.udp_port, "dht_id": peer.node_id.hex()})
return web.Response(text=out.getvalue(), content_type='text/csv')

async def handle_blobs_csv(self, request: web.Request):
async def handle_blobs_csv(self, _):
out = StringIO()
writer = csv.DictWriter(out, fieldnames=["blob_hash"])
writer.writeheader()
@@ -59,17 +52,28 @@ async def handle_blobs_csv(self, request: web.Request):
async def start(self):
prom_app = web.Application()
prom_app.router.add_get('/metrics', self.handle_metrics_get_request)
prom_app.router.add_get('/peers.csv', self.handle_peers_csv)
prom_app.router.add_get('/blobs.csv', self.handle_blobs_csv)
if self.dht_node:
prom_app.router.add_get('/peers.csv', self.handle_peers_csv)
prom_app.router.add_get('/blobs.csv', self.handle_blobs_csv)
metrics_runner = web.AppRunner(prom_app)
await metrics_runner.setup()
prom_site = web.TCPSite(metrics_runner, "0.0.0.0", self.prometheus_port)
await prom_site.start()


async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional[str], prometheus_port: int):
async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional[str], prometheus_port: int, export: bool):
loop = asyncio.get_event_loop()
conf = Config()
if not db_file_path.startswith(':memory:'):
node_id_file_path = db_file_path + 'node_id'
if os.path.exists(node_id_file_path):
with open(node_id_file_path, 'rb') as node_id_file:
node_id = node_id_file.read()
else:
with open(node_id_file_path, 'wb') as node_id_file:
node_id = generate_id()
node_id_file.write(node_id)
else:  # in-memory db: node_id would otherwise be unbound below, so use an ephemeral one
node_id = generate_id()

storage = SQLiteStorage(conf, db_file_path, loop, loop.time)
if bootstrap_node:
nodes = bootstrap_node.split(':')
@@ -78,17 +82,16 @@ async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional
nodes = conf.known_dht_nodes
await storage.open()
node = Node(
loop, PeerManager(loop), generate_id(), port, port, 3333, None,
loop, PeerManager(loop), node_id, port, port, 3333, None,
storage=storage
)
if prometheus_port > 0:
metrics = SimpleMetrics(prometheus_port, node)
metrics = SimpleMetrics(prometheus_port, node if export else None)
await metrics.start()
node.start(host, nodes)
log.info("Peer with id %s started", node_id.hex())
while True:
await asyncio.sleep(10)
PEERS.labels('main').set(len(node.protocol.routing_table.get_peers()))
BLOBS_STORED.labels('main').set(len(node.protocol.data_store.get_storing_contacts()))
log.info("Known peers: %d. Storing contact information for %d blobs from %d peers.",
len(node.protocol.routing_table.get_peers()), len(node.protocol.data_store),
len(node.protocol.data_store.get_storing_contacts()))
@@ -103,6 +106,7 @@ async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional
parser.add_argument("--bootstrap_node", default=None, type=str,
help="Node to connect for bootstraping this node. Leave unset to use the default ones. "
"Format: host:port Example: lbrynet1.lbry.com:4444")
parser.add_argument("--metrics_port", default=0, type=int, help="Port for Prometheus and raw CSV metrics. 0 to disable. Default: 0")
parser.add_argument("--metrics_port", default=0, type=int, help="Port for Prometheus metrics. 0 to disable. Default: 0")
parser.add_argument("--enable_csv_export", action='store_true', help="Enable CSV endpoints on metrics server.")
args = parser.parse_args()
asyncio.run(main(args.host, args.port, args.db_file, args.bootstrap_node, args.metrics_port))
asyncio.run(main(args.host, args.port, args.db_file, args.bootstrap_node, args.metrics_port, args.enable_csv_export))
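Usage note: with these flags wired through, a run such as python scripts/dht_node.py --metrics_port 7070 --enable_csv_export (port value illustrative) should serve the Prometheus text endpoint at /metrics, while the /peers.csv and /blobs.csv dumps are registered only when the export flag is set.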
