Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dht seed node script and dockerfile #3436

Merged
merged 8 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions docker/Dockerfile.dht_node
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Image that runs scripts/dht_node.py as its entrypoint.
FROM debian:10-slim

# Unprivileged account the node runs as, and the directory the sources are copied to.
ARG user=lbry
ARG projects_dir=/home/$user

# Build metadata supplied at build time and exposed to later steps/containers via ENV.
ARG DOCKER_TAG
ARG DOCKER_COMMIT=docker
ENV DOCKER_TAG=$DOCKER_TAG DOCKER_COMMIT=$DOCKER_COMMIT

# Install the toolchain and Python packages, then drop the apt lists in the same
# layer so they never end up in the image.
RUN apt-get update && \
    apt-get -y --no-install-recommends install \
      wget \
      automake libtool \
      tar unzip \
      build-essential \
      pkg-config \
      libleveldb-dev \
      python3.7 \
      python3-dev \
      python3-pip \
      python3-wheel \
      python3-setuptools && \
    update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1 && \
    rm -rf /var/lib/apt/lists/*

# Fixed uid/gid (999) for the service account.
RUN groupadd -g 999 $user && useradd -m -u 999 -g $user $user

COPY . $projects_dir
RUN chown -R $user:$user $projects_dir

USER $user
WORKDIR $projects_dir

# Install, stamp the build and clean up in ONE layer: files deleted by a later
# RUN are still stored in the earlier layer, so a separate `rm` step would not
# make the image any smaller.
RUN make install && \
    python3 docker/set_build.py && \
    rm -rf ~/.cache
ENTRYPOINT ["python3", "scripts/dht_node.py"]

2 changes: 2 additions & 0 deletions lbry/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,8 @@ class Config(CLIConfig):
('spv19.lbry.com', 50001),
])
known_dht_nodes = Servers("Known nodes for bootstrapping connection to the DHT", [
('dht.lbry.grin.io', 4444), # Grin
('dht.lbry.madiator.com', 4444), # Madiator
('lbrynet1.lbry.com', 4444), # US EAST
('lbrynet2.lbry.com', 4444), # US WEST
('lbrynet3.lbry.com', 4444), # EU
Expand Down
7 changes: 3 additions & 4 deletions lbry/dht/node.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
import asyncio
import typing
import binascii
import socket
from lbry.utils import resolve_host
from lbry.dht import constants
Expand Down Expand Up @@ -80,7 +79,7 @@ async def refresh_node(self, force_once=False):
await fut

async def announce_blob(self, blob_hash: str) -> typing.List[bytes]:
hash_value = binascii.unhexlify(blob_hash.encode())
hash_value = bytes.fromhex(blob_hash)
assert len(hash_value) == constants.HASH_LENGTH
peers = await self.peer_search(hash_value)

Expand All @@ -95,7 +94,7 @@ async def announce_blob(self, blob_hash: str) -> typing.List[bytes]:
stored_to = [node_id for node_id, contacted in stored_to_tup if contacted]
if stored_to:
log.debug(
"Stored %s to %i of %i attempted peers", binascii.hexlify(hash_value).decode()[:8],
"Stored %s to %i of %i attempted peers", hash_value.hex()[:8],
len(stored_to), len(peers)
)
else:
Expand Down Expand Up @@ -223,7 +222,7 @@ async def put_into_result_queue_after_pong(_peer):
# prioritize peers who reply to a dht ping first
# this minimizes attempting to make tcp connections that won't work later to dead or unreachable peers

async for results in self.get_iterative_value_finder(binascii.unhexlify(blob_hash.encode())):
async for results in self.get_iterative_value_finder(bytes.fromhex(blob_hash)):
to_put = []
for peer in results:
if peer.address == self.protocol.external_ip and self.protocol.peer_port == peer.tcp_port:
Expand Down
3 changes: 1 addition & 2 deletions lbry/dht/peer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import typing
import asyncio
import logging
from binascii import hexlify
from dataclasses import dataclass, field
from functools import lru_cache
from lbry.utils import is_valid_public_ipv4 as _is_valid_public_ipv4, LRUCache
Expand Down Expand Up @@ -154,7 +153,7 @@ class KademliaPeer:
def __post_init__(self):
if self._node_id is not None:
if not len(self._node_id) == constants.HASH_LENGTH:
raise ValueError("invalid node_id: {}".format(hexlify(self._node_id).decode()))
raise ValueError("invalid node_id: {}".format(self._node_id.hex()))
if self.udp_port is not None and not 1024 <= self.udp_port <= 65535:
raise ValueError(f"invalid udp port: {self.address}:{self.udp_port}")
if self.tcp_port is not None and not 1024 <= self.tcp_port <= 65535:
Expand Down
3 changes: 3 additions & 0 deletions lbry/dht/protocol/data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager')
self._peer_manager = peer_manager
self.completed_blobs: typing.Set[str] = set()

def __len__(self):
    """Return the number of entries currently held in the underlying data store."""
    return len(self._data_store)

def removed_expired_peers(self):
now = self.loop.time()
keys = list(self._data_store.keys())
Expand Down
5 changes: 2 additions & 3 deletions lbry/dht/protocol/iterative_find.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
from binascii import hexlify
from itertools import chain
from collections import defaultdict
import typing
Expand Down Expand Up @@ -198,7 +197,7 @@ async def _search_round(self):
added += 1
log.debug("running %d probes", len(self.running_probes))
if not added and not self.running_probes:
log.debug("search for %s exhausted", hexlify(self.key)[:8])
log.debug("search for %s exhausted", self.key.hex()[:8])
self.search_exhausted()

def _schedule_probe(self, peer: 'KademliaPeer'):
Expand Down Expand Up @@ -271,7 +270,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
self.yielded_peers: typing.Set['KademliaPeer'] = set()

async def send_probe(self, peer: 'KademliaPeer') -> FindNodeResponse:
log.debug("probing %s:%d %s", peer.address, peer.udp_port, hexlify(peer.node_id)[:8] if peer.node_id else '')
log.debug("probing %s:%d %s", peer.address, peer.udp_port, peer.node_id.hex()[:8] if peer.node_id else '')
response = await self.protocol.get_rpc_peer(peer).find_node(self.key)
return FindNodeResponse(self.key, response)

Expand Down
15 changes: 7 additions & 8 deletions lbry/dht/protocol/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import hashlib
import asyncio
import typing
import binascii
import random
from asyncio.protocols import DatagramProtocol
from asyncio.transports import DatagramTransport
Expand Down Expand Up @@ -97,7 +96,7 @@ def find_value(self, rpc_contact: 'KademliaPeer', key: bytes, page: int = 0):
if not rpc_contact.tcp_port or peer.compact_address_tcp() != rpc_contact.compact_address_tcp()
]
# if we don't have k storing peers to return and we have this hash locally, include our contact information
if len(peers) < constants.K and binascii.hexlify(key).decode() in self.protocol.data_store.completed_blobs:
if len(peers) < constants.K and key.hex() in self.protocol.data_store.completed_blobs:
peers.append(self.compact_address())
if not peers:
response[PAGE_KEY] = 0
Expand Down Expand Up @@ -415,8 +414,8 @@ async def routing_table_task(self):
self._wakeup_routing_task.clear()

def _handle_rpc(self, sender_contact: 'KademliaPeer', message: RequestDatagram):
assert sender_contact.node_id != self.node_id, (binascii.hexlify(sender_contact.node_id)[:8].decode(),
binascii.hexlify(self.node_id)[:8].decode())
assert sender_contact.node_id != self.node_id, (sender_contact.node_id.hex()[:8],
self.node_id.hex()[:8])
method = message.method
if method not in [b'ping', b'store', b'findNode', b'findValue']:
raise AttributeError('Invalid method: %s' % message.method.decode())
Expand Down Expand Up @@ -561,7 +560,7 @@ def datagram_received(self, datagram: bytes, address: typing.Tuple[str, int]) ->
message = decode_datagram(datagram)
except (ValueError, TypeError, DecodeError):
self.peer_manager.report_failure(address[0], address[1])
log.warning("Couldn't decode dht datagram from %s: %s", address, binascii.hexlify(datagram).decode())
log.warning("Couldn't decode dht datagram from %s: %s", address, datagram.hex())
return

if isinstance(message, RequestDatagram):
Expand Down Expand Up @@ -603,7 +602,7 @@ def _send(self, peer: 'KademliaPeer', message: typing.Union[RequestDatagram, Res
if len(data) > constants.MSG_SIZE_LIMIT:
log.warning("cannot send datagram larger than %i bytes (packet is %i bytes)",
constants.MSG_SIZE_LIMIT, len(data))
log.debug("Packet is too large to send: %s", binascii.hexlify(data[:3500]).decode())
log.debug("Packet is too large to send: %s", data[:3500].hex())
raise ValueError(
f"cannot send datagram larger than {constants.MSG_SIZE_LIMIT} bytes (packet is {len(data)} bytes)"
)
Expand Down Expand Up @@ -663,13 +662,13 @@ async def __store():
res = await self.get_rpc_peer(peer).store(hash_value)
if res != b"OK":
raise ValueError(res)
log.debug("Stored %s to %s", binascii.hexlify(hash_value).decode()[:8], peer)
log.debug("Stored %s to %s", hash_value.hex()[:8], peer)
return peer.node_id, True

try:
return await __store()
except asyncio.TimeoutError:
log.debug("Timeout while storing blob_hash %s at %s", binascii.hexlify(hash_value).decode()[:8], peer)
log.debug("Timeout while storing blob_hash %s at %s", hash_value.hex()[:8], peer)
return peer.node_id, False
except ValueError as err:
log.error("Unexpected response: %s", err)
Expand Down
48 changes: 48 additions & 0 deletions scripts/dht_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import asyncio
import argparse
import logging
from typing import Optional

from lbry.dht.constants import generate_id
from lbry.dht.node import Node
from lbry.dht.peer import PeerManager
from lbry.extras.daemon.storage import SQLiteStorage
from lbry.conf import Config

# Configure root logging at INFO with a timestamped format; `log` is this module's own logger.
logging.basicConfig(
    format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s",
    level=logging.INFO,
)
log = logging.getLogger(__name__)


async def main(host: str, port: int, db_file_path: str, bootstrap_node: Optional[str]):
    """Start a single DHT node and log its statistics every 10 seconds, forever.

    :param host: address handed to ``Node.start`` as the interface to listen on.
    :param port: port number passed to ``Node`` for both of its port arguments.
    :param db_file_path: path of the SQLite database file opened through ``SQLiteStorage``.
    :param bootstrap_node: optional ``host:port`` of the single node to bootstrap from;
        when empty or ``None`` the ``known_dht_nodes`` of the default ``Config`` are used.
    :raises ValueError: if ``bootstrap_node`` is given but is not of the form ``host:port``
        with an integer port.
    """
    # We are inside a coroutine, so a running loop always exists.
    loop = asyncio.get_running_loop()
    conf = Config()
    if bootstrap_node:
        # Validate before touching the database so a bad argument fails fast with a clear message.
        parts = bootstrap_node.split(':')
        if len(parts) != 2 or not parts[0]:
            raise ValueError(f"invalid bootstrap node {bootstrap_node!r}: expected host:port")
        try:
            nodes = [(parts[0], int(parts[1]))]
        except ValueError:
            raise ValueError(f"invalid bootstrap node {bootstrap_node!r}: expected host:port") from None
    else:
        nodes = conf.known_dht_nodes
    storage = SQLiteStorage(conf, db_file_path, loop, loop.time)
    await storage.open()
    # NOTE(review): 3333 and None are presumably the advertised TCP peer port and the
    # external IP - confirm against the Node constructor signature.
    node = Node(
        loop, PeerManager(loop), generate_id(), port, port, 3333, None,
        storage=storage
    )
    node.start(host, nodes)
    while True:
        await asyncio.sleep(10)
        log.info("Known peers: %d. Storing contact information for %d blobs from %d peers.",
                 len(node.protocol.routing_table.get_peers()), len(node.protocol.data_store),
                 len(node.protocol.data_store.get_storing_contacts()))


if __name__ == '__main__':
    cli = argparse.ArgumentParser(
        description="Starts a single DHT node, which then can be used as a seed node or just a contributing node.")
    # One row per command line option: (flag, default, type, help text), in display order.
    option_table = (
        ("--host", '0.0.0.0', str, "Host to listen for requests. Default: 0.0.0.0"),
        ("--port", 4444, int, "Port to listen for requests. Default: 4444"),
        ("--db_file", '/tmp/dht.db', str, "DB file to save peers. Default: /tmp/dht.db"),
        ("--bootstrap_node", None, str,
         "Node to connect for bootstraping this node. Leave unset to use the default ones. "
         "Format: host:port Example: lbrynet1.lbry.com:4444"),
    )
    for flag, default_value, value_type, help_text in option_table:
        cli.add_argument(flag, default=default_value, type=value_type, help=help_text)
    options = cli.parse_args()
    # Run the node until the process is interrupted.
    asyncio.run(main(
        host=options.host,
        port=options.port,
        db_file_path=options.db_file,
        bootstrap_node=options.bootstrap_node,
    ))