From d1d0ea255ac126e84e9b604645cd37d04d071e1e Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 27 Sep 2021 03:26:34 -0300 Subject: [PATCH 1/8] add dht seed node script --- lbry/dht/protocol/data_store.py | 3 +++ scripts/dht_node.py | 38 +++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+) create mode 100644 scripts/dht_node.py diff --git a/lbry/dht/protocol/data_store.py b/lbry/dht/protocol/data_store.py index 6a614680fe..3d937e84e2 100644 --- a/lbry/dht/protocol/data_store.py +++ b/lbry/dht/protocol/data_store.py @@ -16,6 +16,9 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager') self._peer_manager = peer_manager self.completed_blobs: typing.Set[str] = set() + def __len__(self): + return self._data_store.__len__() + def removed_expired_peers(self): now = self.loop.time() keys = list(self._data_store.keys()) diff --git a/scripts/dht_node.py b/scripts/dht_node.py new file mode 100644 index 0000000000..9321c5f527 --- /dev/null +++ b/scripts/dht_node.py @@ -0,0 +1,38 @@ +import asyncio +import argparse +import logging + +from lbry.dht.constants import generate_id +from lbry.dht.node import Node +from lbry.dht.peer import PeerManager +from lbry.extras.daemon.storage import SQLiteStorage +from lbry.conf import Config + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s") +log = logging.getLogger(__name__) + + +async def main(host: str, port: int): + loop = asyncio.get_event_loop() + conf = Config() + storage = SQLiteStorage(conf, ":memory:", loop, loop.time) + await storage.open() + node = Node( + loop, PeerManager(loop), generate_id(), port, port, 3333, None, + storage=storage + ) + node.start(host, conf.known_dht_nodes) + while True: + await asyncio.sleep(10) + log.info("Known peers: %d. Storing contact information for %d blobs from %d peers.", + len(node.protocol.routing_table.get_peers()), len(node.protocol.data_store), + len(node.protocol.data_store.get_storing_contacts())) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description="Starts a single DHT node, which then can be used as a seed node or just a contributing node.") + parser.add_argument("--host", default='0.0.0.0', help="Host to listen for requests. Default: 0.0.0.0") + parser.add_argument("--port", default=4444, help="Port to listen for requests. Default: 4444") + args = parser.parse_args() + asyncio.run(main(args.host, args.port)) From 9cc094191c1ca742b30ea0ba3d0f41283c4606b8 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 27 Sep 2021 13:33:10 -0300 Subject: [PATCH 2/8] define arg types --- scripts/dht_node.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/dht_node.py b/scripts/dht_node.py index 9321c5f527..476d2107dc 100644 --- a/scripts/dht_node.py +++ b/scripts/dht_node.py @@ -32,7 +32,7 @@ async def main(host: str, port: int): if __name__ == '__main__': parser = argparse.ArgumentParser( description="Starts a single DHT node, which then can be used as a seed node or just a contributing node.") - parser.add_argument("--host", default='0.0.0.0', help="Host to listen for requests. Default: 0.0.0.0") - parser.add_argument("--port", default=4444, help="Port to listen for requests. Default: 4444") + parser.add_argument("--host", default='0.0.0.0', type=str, help="Host to listen for requests. Default: 0.0.0.0") + parser.add_argument("--port", default=4444, type=int, help="Port to listen for requests. Default: 4444") args = parser.parse_args() asyncio.run(main(args.host, args.port)) From 4cf5e06fbe6ec523f14e581a2412d1862fcd09e9 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 28 Sep 2021 03:58:31 -0300 Subject: [PATCH 3/8] configure where to save peers --- scripts/dht_node.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/scripts/dht_node.py b/scripts/dht_node.py index 476d2107dc..9531591910 100644 --- a/scripts/dht_node.py +++ b/scripts/dht_node.py @@ -12,10 +12,10 @@ log = logging.getLogger(__name__) -async def main(host: str, port: int): +async def main(host: str, port: int, db_file_path): loop = asyncio.get_event_loop() conf = Config() - storage = SQLiteStorage(conf, ":memory:", loop, loop.time) + storage = SQLiteStorage(conf, db_file_path, loop, loop.time) await storage.open() node = Node( loop, PeerManager(loop), generate_id(), port, port, 3333, None, @@ -34,5 +34,6 @@ async def main(host: str, port: int): description="Starts a single DHT node, which then can be used as a seed node or just a contributing node.") parser.add_argument("--host", default='0.0.0.0', type=str, help="Host to listen for requests. Default: 0.0.0.0") parser.add_argument("--port", default=4444, type=int, help="Port to listen for requests. Default: 4444") + parser.add_argument("--db_file", default='/tmp/dht.db', type=str, help="DB file to save peers. Default: /tmp/dht.db") args = parser.parse_args() - asyncio.run(main(args.host, args.port)) + asyncio.run(main(args.host, args.port, args.db_file)) From c952ac5da0cffcc6524ef02921fdc327579dc7d3 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 28 Sep 2021 18:43:28 -0300 Subject: [PATCH 4/8] add dockerfile for dht node --- docker/Dockerfile.dht_node | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 docker/Dockerfile.dht_node diff --git a/docker/Dockerfile.dht_node b/docker/Dockerfile.dht_node new file mode 100644 index 0000000000..d44370fc59 --- /dev/null +++ b/docker/Dockerfile.dht_node @@ -0,0 +1,38 @@ +FROM debian:10-slim + +ARG user=lbry +ARG projects_dir=/home/$user + +ARG DOCKER_TAG +ARG DOCKER_COMMIT=docker +ENV DOCKER_TAG=$DOCKER_TAG DOCKER_COMMIT=$DOCKER_COMMIT + +RUN apt-get update && \ + apt-get -y --no-install-recommends install \ + wget \ + automake libtool \ + tar unzip \ + build-essential \ + pkg-config \ + libleveldb-dev \ + python3.7 \ + python3-dev \ + python3-pip \ + python3-wheel \ + python3-setuptools && \ + update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1 && \ + rm -rf /var/lib/apt/lists/* + +RUN groupadd -g 999 $user && useradd -m -u 999 -g $user $user + +COPY . $projects_dir +RUN chown -R $user:$user $projects_dir + +USER $user +WORKDIR $projects_dir + +RUN make install +RUN python3 docker/set_build.py +RUN rm ~/.cache -rf +ENTRYPOINT ["python3", "scripts/dht_node.py"] + From c5eb78a71a6603cf14b0b6b3416effbba045ad4d Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 28 Sep 2021 18:52:23 -0300 Subject: [PATCH 5/8] add option to set bootstrap_node --- scripts/dht_node.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/scripts/dht_node.py b/scripts/dht_node.py index 9531591910..5b123829b0 100644 --- a/scripts/dht_node.py +++ b/scripts/dht_node.py @@ -1,6 +1,7 @@ import asyncio import argparse import logging +from typing import Optional from lbry.dht.constants import generate_id from lbry.dht.node import Node @@ -12,16 +13,21 @@ log = logging.getLogger(__name__) -async def main(host: str, port: int, db_file_path): +async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional[str]): loop = asyncio.get_event_loop() conf = Config() storage = SQLiteStorage(conf, db_file_path, loop, loop.time) + if bootstrap_node: + nodes = bootstrap_node.split(':') + nodes = [(nodes[0], int(nodes[1]))] + else: + nodes = conf.known_dht_nodes await storage.open() node = Node( loop, PeerManager(loop), generate_id(), port, port, 3333, None, storage=storage ) - node.start(host, conf.known_dht_nodes) + node.start(host, nodes) while True: await asyncio.sleep(10) log.info("Known peers: %d. Storing contact information for %d blobs from %d peers.", @@ -35,5 +41,8 @@ async def main(host: str, port: int, db_file_path): parser.add_argument("--host", default='0.0.0.0', type=str, help="Host to listen for requests. Default: 0.0.0.0") parser.add_argument("--port", default=4444, type=int, help="Port to listen for requests. Default: 4444") parser.add_argument("--db_file", default='/tmp/dht.db', type=str, help="DB file to save peers. Default: /tmp/dht.db") + parser.add_argument("--bootstrap_node", default=None, type=str, + help="Node to connect for bootstraping this node. Leave unset to use the default ones. " + "Format: host:port Example: lbrynet1.lbry.com:4444") args = parser.parse_args() - asyncio.run(main(args.host, args.port, args.db_file)) + asyncio.run(main(args.host, args.port, args.db_file, args.bootstrap_node)) From bd0b9dc271172ec5f4190133a0e07edeb58e1381 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 29 Sep 2021 00:24:51 -0300 Subject: [PATCH 6/8] add madiator to known dht nodes --- lbry/conf.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lbry/conf.py b/lbry/conf.py index bc23f0984f..92bb731c5d 100644 --- a/lbry/conf.py +++ b/lbry/conf.py @@ -692,6 +692,7 @@ class Config(CLIConfig): ('spv19.lbry.com', 50001), ]) known_dht_nodes = Servers("Known nodes for bootstrapping connection to the DHT", [ + ('dht.lbry.madiator.com', 4444), # Madiator ('lbrynet1.lbry.com', 4444), # US EAST ('lbrynet2.lbry.com', 4444), # US WEST ('lbrynet3.lbry.com', 4444), # EU From d86b8a2bc40b9cce77e793975b4e175e4ca4cf52 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 29 Sep 2021 11:49:53 -0300 Subject: [PATCH 7/8] add grin to dht known list --- lbry/conf.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lbry/conf.py b/lbry/conf.py index 92bb731c5d..6a099afb31 100644 --- a/lbry/conf.py +++ b/lbry/conf.py @@ -692,6 +692,7 @@ class Config(CLIConfig): ('spv19.lbry.com', 50001), ]) known_dht_nodes = Servers("Known nodes for bootstrapping connection to the DHT", [ + ('dht.lbry.grin.io', 4444), # Grin ('dht.lbry.madiator.com', 4444), # Madiator ('lbrynet1.lbry.com', 4444), # US EAST ('lbrynet2.lbry.com', 4444), # US WEST From 55c37acd3efe57a54578dc541042b33180addebe Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 29 Sep 2021 15:33:43 -0300 Subject: [PATCH 8/8] dht: use bytes hex/fromhex instead of binascii --- lbry/dht/node.py | 7 +++---- lbry/dht/peer.py | 3 +-- lbry/dht/protocol/iterative_find.py | 5 ++--- lbry/dht/protocol/protocol.py | 15 +++++++-------- 4 files changed, 13 insertions(+), 17 deletions(-) diff --git a/lbry/dht/node.py b/lbry/dht/node.py index 745789e897..d44ce5cd9b 100644 --- a/lbry/dht/node.py +++ b/lbry/dht/node.py @@ -1,7 +1,6 @@ import logging import asyncio import typing -import binascii import socket from lbry.utils import resolve_host from lbry.dht import constants @@ -80,7 +79,7 @@ async def refresh_node(self, force_once=False): await fut async def announce_blob(self, blob_hash: str) -> typing.List[bytes]: - hash_value = binascii.unhexlify(blob_hash.encode()) + hash_value = bytes.fromhex(blob_hash) assert len(hash_value) == constants.HASH_LENGTH peers = await self.peer_search(hash_value) @@ -95,7 +94,7 @@ async def announce_blob(self, blob_hash: str) -> typing.List[bytes]: stored_to = [node_id for node_id, contacted in stored_to_tup if contacted] if stored_to: log.debug( - "Stored %s to %i of %i attempted peers", binascii.hexlify(hash_value).decode()[:8], + "Stored %s to %i of %i attempted peers", hash_value.hex()[:8], len(stored_to), len(peers) ) else: @@ -223,7 +222,7 @@ async def put_into_result_queue_after_pong(_peer): # prioritize peers who reply to a dht ping first # this minimizes attempting to make tcp connections that won't work later to dead or unreachable peers - async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())): + async for results in self.get_iterative_value_finder(bytes.fromhex(blob_hash)): to_put = [] for peer in results: if peer.address == self.protocol.external_ip and self.protocol.peer_port == peer.tcp_port: diff --git a/lbry/dht/peer.py b/lbry/dht/peer.py index b1abaa8fb5..c69f5001c5 100644 --- a/lbry/dht/peer.py +++ b/lbry/dht/peer.py @@ -1,7 +1,6 @@ import typing import asyncio import logging -from binascii import hexlify from dataclasses import dataclass, field from functools import lru_cache from lbry.utils import is_valid_public_ipv4 as _is_valid_public_ipv4, LRUCache @@ -154,7 +153,7 @@ class KademliaPeer: def __post_init__(self): if self._node_id is not None: if not len(self._node_id) == constants.HASH_LENGTH: - raise ValueError("invalid node_id: {}".format(hexlify(self._node_id).decode())) + raise ValueError("invalid node_id: {}".format(self._node_id.hex())) if self.udp_port is not None and not 1024 <= self.udp_port <= 65535: raise ValueError(f"invalid udp port: {self.address}:{self.udp_port}") if self.tcp_port is not None and not 1024 <= self.tcp_port <= 65535: diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py index 7d49996f9f..9e11438bd7 100644 --- a/lbry/dht/protocol/iterative_find.py +++ b/lbry/dht/protocol/iterative_find.py @@ -1,5 +1,4 @@ import asyncio -from binascii import hexlify from itertools import chain from collections import defaultdict import typing @@ -198,7 +197,7 @@ async def _search_round(self): added += 1 log.debug("running %d probes", len(self.running_probes)) if not added and not self.running_probes: - log.debug("search for %s exhausted", hexlify(self.key)[:8]) + log.debug("search for %s exhausted", self.key.hex()[:8]) self.search_exhausted() def _schedule_probe(self, peer: 'KademliaPeer'): @@ -271,7 +270,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', self.yielded_peers: typing.Set['KademliaPeer'] = set() async def send_probe(self, peer: 'KademliaPeer') -> FindNodeResponse: - log.debug("probing %s:%d %s", peer.address, peer.udp_port, hexlify(peer.node_id)[:8] if peer.node_id else '') + log.debug("probing %s:%d %s", peer.address, peer.udp_port, peer.node_id.hex()[:8] if peer.node_id else '') response = await self.protocol.get_rpc_peer(peer).find_node(self.key) return FindNodeResponse(self.key, response) diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py index d6c0d33475..66165740b4 100644 --- a/lbry/dht/protocol/protocol.py +++ b/lbry/dht/protocol/protocol.py @@ -4,7 +4,6 @@ import hashlib import asyncio import typing -import binascii import random from asyncio.protocols import DatagramProtocol from asyncio.transports import DatagramTransport @@ -97,7 +96,7 @@ def find_value(self, rpc_contact: 'KademliaPeer', key: bytes, page: int = 0): if not rpc_contact.tcp_port or peer.compact_address_tcp() != rpc_contact.compact_address_tcp() ] # if we don't have k storing peers to return and we have this hash locally, include our contact information - if len(peers) < constants.K and binascii.hexlify(key).decode() in self.protocol.data_store.completed_blobs: + if len(peers) < constants.K and key.hex() in self.protocol.data_store.completed_blobs: peers.append(self.compact_address()) if not peers: response[PAGE_KEY] = 0 @@ -415,8 +414,8 @@ async def routing_table_task(self): self._wakeup_routing_task.clear() def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram): - assert sender_contact.node_id != self.node_id, (binascii.hexlify(sender_contact.node_id)[:8].decode(), - binascii.hexlify(self.node_id)[:8].decode()) + assert sender_contact.node_id != self.node_id, (sender_contact.node_id.hex()[:8], + self.node_id.hex()[:8]) method = message.method if method not in [b'ping', b'store', b'findNode', b'findValue']: raise AttributeError('Invalid method: %s' % message.method.decode()) @@ -561,7 +560,7 @@ def datagram_received(self, datagram: bytes, address: typing.Tuple[str, int]) -> message = decode_datagram(datagram) except (ValueError, TypeError, DecodeError): self.peer_manager.report_failure(address[0], address[1]) - log.warning("Couldn't decode dht datagram from %s: %s", address, binascii.hexlify(datagram).decode()) + log.warning("Couldn't decode dht datagram from %s: %s", address, datagram.hex()) return if isinstance(message, RequestDatagram): @@ -603,7 +602,7 @@ def _send(self, peer: 'KademliaPeer', message: typing.Union[RequestDatagram, Res if len(data) > constants.MSG_SIZE_LIMIT: log.warning("cannot send datagram larger than %i bytes (packet is %i bytes)", constants.MSG_SIZE_LIMIT, len(data)) - log.debug("Packet is too large to send: %s", binascii.hexlify(data[:3500]).decode()) + log.debug("Packet is too large to send: %s", data[:3500].hex()) raise ValueError( f"cannot send datagram larger than {constants.MSG_SIZE_LIMIT} bytes (packet is {len(data)} bytes)" ) @@ -663,13 +662,13 @@ async def __store(): res = await self.get_rpc_peer(peer).store(hash_value) if res != b"OK": raise ValueError(res) - log.debug("Stored %s to %s", binascii.hexlify(hash_value).decode()[:8], peer) + log.debug("Stored %s to %s", hash_value.hex()[:8], peer) return peer.node_id, True try: return await __store() except asyncio.TimeoutError: - log.debug("Timeout while storing blob_hash %s at %s", binascii.hexlify(hash_value).decode()[:8], peer) + log.debug("Timeout while storing blob_hash %s at %s", hash_value.hex()[:8], peer) return peer.node_id, False except ValueError as err: log.error("Unexpected response: %s", err)