Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dht_bug' into temp_dht_supermerge
Browse files Browse the repository at this point in the history
  • Loading branch information
shyba committed Feb 23, 2022
2 parents f68ea01 + f5b3e9b commit f4fa217
Show file tree
Hide file tree
Showing 11 changed files with 171 additions and 197 deletions.
2 changes: 1 addition & 1 deletion lbry/conf.py
Expand Up @@ -622,7 +622,7 @@ class Config(CLIConfig):
"Routing table bucket index below which we always split the bucket if given a new key to add to it and "
"the bucket is full. As this value is raised the depth of the routing table (and number of peers in it) "
"will increase. This setting is used by seed nodes, you probably don't want to change it during normal "
"use.", 1
"use.", 2
)

# protocol timeouts
Expand Down
44 changes: 25 additions & 19 deletions lbry/dht/blob_announcer.py
Expand Up @@ -27,21 +27,24 @@ def __init__(self, loop: asyncio.AbstractEventLoop, node: 'Node', storage: 'SQLi
self.storage = storage
self.announce_task: asyncio.Task = None
self.announce_queue: typing.List[str] = []
self._done = asyncio.Event()
self.announced = set()

async def _submit_announcement(self, blob_hash):
    """Announce one blob and report whether enough peers were reached.

    Awaits ``self.node.announce_blob(blob_hash)`` and counts the peers it
    returns. The count is recorded on ``self.announcements_sent_metric``.

    Returns ``blob_hash`` when more than 4 peers were reached. Otherwise a
    debug message is logged and the coroutine falls through, returning
    ``None``. Any ``Exception`` is recorded on the metric and logged as a
    warning, except ``asyncio.CancelledError``, which is re-raised.
    """
    try:
        reached = await self.node.announce_blob(blob_hash)
        peer_count = len(reached)
        self.announcements_sent_metric.labels(peers=peer_count, error=False).inc()
        if peer_count > 4:
            return blob_hash
        log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peer_count)
    except Exception as exc:
        self.announcements_sent_metric.labels(peers=0, error=True).inc()
        # Before Python 3.8 CancelledError is an Exception subclass, so it lands here.
        if isinstance(exc, asyncio.CancelledError):  # TODO: remove when updated to 3.8
            raise exc
        log.warning("error announcing %s: %s", blob_hash[:8], str(exc))
async def _run_consumer(self):
    """Drain ``self.announce_queue``, announcing one blob per iteration.

    Each blob hash is popped from the end of the queue and passed to
    ``self.node.announce_blob``. The number of peers returned is recorded on
    ``self.announcements_sent_metric``. Hashes that reached more than 4 peers
    are added to ``self.announced``; the others only produce a debug log.

    Any ``Exception`` raised while handling a blob is recorded on the metric
    and logged as a warning, after which the loop moves on to the next blob.
    ``asyncio.CancelledError`` is re-raised instead of being swallowed.
    """
    while self.announce_queue:
        try:
            blob_hash = self.announce_queue.pop()
            reached = await self.node.announce_blob(blob_hash)
            peer_count = len(reached)
            self.announcements_sent_metric.labels(peers=peer_count, error=False).inc()
            if peer_count <= 4:
                log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peer_count)
            else:
                self.announced.add(blob_hash)
        except Exception as exc:
            self.announcements_sent_metric.labels(peers=0, error=True).inc()
            # Before Python 3.8 CancelledError is an Exception subclass, so it lands here.
            if isinstance(exc, asyncio.CancelledError):  # TODO: remove when updated to 3.8
                raise exc
            log.warning("error announcing %s: %s", blob_hash[:8], str(exc))

async def _announce(self, batch_size: typing.Optional[int] = 10):
while batch_size:
Expand All @@ -56,14 +59,14 @@ async def _announce(self, batch_size: typing.Optional[int] = 10):
log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
while len(self.announce_queue) > 0:
log.info("%i blobs to announce", len(self.announce_queue))
announced = await asyncio.gather(*[
self._submit_announcement(
self.announce_queue.pop()) for _ in range(batch_size) if self.announce_queue
], loop=self.loop)
announced = list(filter(None, announced))
await asyncio.gather(*[self._run_consumer() for _ in range(batch_size)], loop=self.loop)
announced = list(filter(None, self.announced))
if announced:
await self.storage.update_last_announced_blobs(announced)
log.info("announced %i blobs", len(announced))
self.announced.clear()
self._done.set()
self._done.clear()

def start(self, batch_size: typing.Optional[int] = 10):
assert not self.announce_task or self.announce_task.done(), "already running"
Expand All @@ -72,3 +75,6 @@ def start(self, batch_size: typing.Optional[int] = 10):
def stop(self):
    """Cancel the announce task, if there is one and it has not finished."""
    task = self.announce_task
    if not task or task.done():
        # Nothing was started, or it already completed: nothing to cancel.
        return
    task.cancel()

def wait(self):
    """Return the awaitable produced by ``self._done.wait()``.

    The caller is expected to await the result; this method itself does
    not block.
    """
    done_event = self._done
    return done_event.wait()
1 change: 0 additions & 1 deletion lbry/dht/constants.py
Expand Up @@ -20,7 +20,6 @@
CHECK_REFRESH_INTERVAL = REFRESH_INTERVAL / 5
RPC_ID_LENGTH = 20
PROTOCOL_VERSION = 1
BOTTOM_OUT_LIMIT = 3
MSG_SIZE_LIMIT = 1400


Expand Down
10 changes: 4 additions & 6 deletions lbry/dht/node.py
Expand Up @@ -202,25 +202,23 @@ def start(self, interface: str, known_node_urls: typing.Optional[typing.List[typ
self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls))

def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
bottom_out_limit: int = constants.BOTTOM_OUT_LIMIT,
max_results: int = constants.K) -> IterativeNodeFinder:

return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
key, bottom_out_limit, max_results, None, shortlist)
key, max_results, None, shortlist)

def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
bottom_out_limit: int = 40,
max_results: int = -1) -> IterativeValueFinder:

return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
key, bottom_out_limit, max_results, None, shortlist)
key, max_results, None, shortlist)

async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2,
bottom_out_limit=20, shortlist: typing.Optional[typing.List['KademliaPeer']] = None
shortlist: typing.Optional[typing.List['KademliaPeer']] = None
) -> typing.List['KademliaPeer']:
peers = []
async for iteration_peers in self.get_iterative_node_finder(
node_id, shortlist=shortlist, bottom_out_limit=bottom_out_limit, max_results=max_results):
node_id, shortlist=shortlist, max_results=max_results):
peers.extend(iteration_peers)
distance = Distance(node_id)
peers.sort(key=lambda peer: distance(peer.node_id))
Expand Down
3 changes: 3 additions & 0 deletions lbry/dht/peer.py
Expand Up @@ -190,3 +190,6 @@ def compact_address_tcp(self) -> bytearray:

def compact_ip(self):
    """Return ``self.address`` encoded by ``make_compact_ip``."""
    address = self.address
    return make_compact_ip(address)

def __str__(self):
    """Return a short, log-friendly description of this peer.

    The format is ``ClassName(<id prefix>@<address>:<udp_port>-<tcp_port>)``,
    where ``<id prefix>`` is the first 8 hex characters of ``node_id``.

    If ``node_id`` is ``None`` or empty, the prefix is rendered as an empty
    string. Without that guard, formatting such a peer (for example in a
    log call) would raise ``AttributeError``.
    """
    node_id_prefix = self.node_id.hex()[:8] if self.node_id else ""
    return f"{self.__class__.__name__}({node_id_prefix}@{self.address}:{self.udp_port}-{self.tcp_port})"
122 changes: 46 additions & 76 deletions lbry/dht/protocol/iterative_find.py
@@ -1,6 +1,6 @@
import asyncio
from itertools import chain
from collections import defaultdict
from collections import defaultdict, OrderedDict
import typing
import logging
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -74,7 +74,7 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
class IterativeFinder:
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
if len(key) != constants.HASH_LENGTH:
Expand All @@ -85,28 +85,22 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
self.protocol = protocol

self.key = key
self.bottom_out_limit = bottom_out_limit
self.max_results = max_results
self.max_results = max(constants.K, max_results)
self.exclude = exclude or []

self.active: typing.Set['KademliaPeer'] = set()
self.active: typing.Dict['KademliaPeer', int] = OrderedDict() # peer: distance, sorted
self.contacted: typing.Set['KademliaPeer'] = set()
self.distance = Distance(key)

self.closest_peer: typing.Optional['KademliaPeer'] = None
self.prev_closest_peer: typing.Optional['KademliaPeer'] = None

self.iteration_queue = asyncio.Queue(loop=self.loop)

self.running_probes: typing.Set[asyncio.Task] = set()
self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {}
self.iteration_count = 0
self.bottom_out_count = 0
self.running = False
self.tasks: typing.List[asyncio.Task] = []
self.delayed_calls: typing.List[asyncio.Handle] = []
for peer in get_shortlist(routing_table, key, shortlist):
if peer.node_id:
self._add_active(peer)
self._add_active(peer, force=True)
else:
# seed nodes
self._schedule_probe(peer)
Expand Down Expand Up @@ -138,15 +132,14 @@ def get_initial_result(self) -> typing.List['KademliaPeer']: #pylint: disable=n
"""
return []

def _is_closer(self, peer: 'KademliaPeer') -> bool:
    """Tell whether ``peer`` beats the currently tracked closest peer.

    Returns ``True`` when no closest peer has been recorded yet. Otherwise
    the decision is delegated to ``self.distance.is_closer``, comparing the
    node id of ``peer`` against that of ``self.closest_peer``.
    """
    current_best = self.closest_peer
    if not current_best:
        return True
    return self.distance.is_closer(peer.node_id, current_best.node_id)

def _add_active(self, peer):
def _add_active(self, peer, force=False):
if not force and self.peer_manager.peer_is_good(peer) is False:
return
if peer in self.contacted:
return
if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
self.active.add(peer)
if self._is_closer(peer):
self.prev_closest_peer = self.closest_peer
self.closest_peer = peer
self.active[peer] = self.distance(peer.node_id)
self.active = OrderedDict(sorted(self.active.items(), key=lambda item: item[1]))

async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
self._add_active(peer)
Expand All @@ -158,33 +151,43 @@ async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindRespons
log.warning("misbehaving peer %s:%i returned peer with reserved ip %s:%i", peer.address,
peer.udp_port, address, udp_port)
self.check_result_ready(response)
self._log_state()

def _reset_closest(self, peer):
    """Remove ``peer`` from ``self.active``; do nothing if it is not there."""
    self.active.pop(peer, None)

async def _send_probe(self, peer: 'KademliaPeer'):
try:
response = await self.send_probe(peer)
except asyncio.TimeoutError:
self.active.discard(peer)
self._reset_closest(peer)
return
except ValueError as err:
log.warning(str(err))
self.active.discard(peer)
self._reset_closest(peer)
return
except TransportNotConnected:
return self.aclose()
except RemoteException:
self._reset_closest(peer)
return
return await self._handle_probe_result(peer, response)

async def _search_round(self):
def _search_round(self):
"""
Send up to constants.alpha (5) probes to closest active peers
"""

added = 0
to_probe = list(self.active - self.contacted)
to_probe.sort(key=lambda peer: self.distance(self.key))
for peer in to_probe:
if added >= constants.ALPHA:
for index, peer in enumerate(self.active.keys()):
if index == 0:
log.debug("closest to probe: %s", peer.node_id.hex()[:8])
if peer in self.contacted:
continue
if len(self.running_probes) >= constants.ALPHA:
break
if index > (constants.K + len(self.running_probes)):
break
origin_address = (peer.address, peer.udp_port)
if origin_address in self.exclude:
Expand All @@ -206,33 +209,22 @@ def _schedule_probe(self, peer: 'KademliaPeer'):
t = self.loop.create_task(self._send_probe(peer))

def callback(_):
self.running_probes.difference_update({
probe for probe in self.running_probes if probe.done() or probe == t
})
if not self.running_probes:
self.tasks.append(self.loop.create_task(self._search_task(0.0)))
self.running_probes.pop(peer, None)
if self.running:
self._search_round()

t.add_done_callback(callback)
self.running_probes.add(t)

async def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY):
    """Run one search round and, if still running, schedule the next one.

    While ``self.running`` is set, awaits ``self._search_round()``. If the
    finder is still running afterwards, ``self._search`` is scheduled with
    ``loop.call_later(delay, ...)`` and the returned handle is stored in
    ``self.delayed_calls``.

    ``asyncio.CancelledError``, ``StopAsyncIteration`` and
    ``TransportNotConnected`` are absorbed. If the finder is still marked
    as running at that point, ``self.aclose`` is scheduled on the loop.
    """
    try:
        if self.running:
            await self._search_round()
        # Checked again: the round above may have stopped the finder.
        if self.running:
            handle = self.loop.call_later(delay, self._search)
            self.delayed_calls.append(handle)
    except (asyncio.CancelledError, StopAsyncIteration, TransportNotConnected):
        if not self.running:
            return
        self.loop.call_soon(self.aclose)
self.running_probes[peer] = t

def _search(self):
    """Start ``self._search_task()`` as a loop task and record it in ``self.tasks``."""
    search_task = self.loop.create_task(self._search_task())
    self.tasks.append(search_task)
def _log_state(self):
    """Emit a debug line with the key prefix and the active/contacted counts."""
    key_prefix = self.key.hex()[:8]
    active_count = len(self.active)
    contacted_count = len(self.contacted)
    log.debug("[%s] check result: %i active nodes %i contacted",
              key_prefix, active_count, contacted_count)

def __aiter__(self):
if self.running:
raise Exception("already running")
self.running = True
self._search()
self.loop.call_soon(self._search_round)
return self

async def __anext__(self) -> typing.List['KademliaPeer']:
Expand All @@ -252,20 +244,19 @@ async def __anext__(self) -> typing.List['KademliaPeer']:
def aclose(self):
self.running = False
self.iteration_queue.put_nowait(None)
for task in chain(self.tasks, self.running_probes, self.delayed_calls):
for task in chain(self.tasks, self.running_probes.values()):
task.cancel()
self.tasks.clear()
self.running_probes.clear()
self.delayed_calls.clear()


class IterativeNodeFinder(IterativeFinder):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
shortlist)
self.yielded_peers: typing.Set['KademliaPeer'] = set()

Expand All @@ -276,14 +267,14 @@ async def send_probe(self, peer: 'KademliaPeer') -> FindNodeResponse:
return FindNodeResponse(self.key, response)

def search_exhausted(self):
self.put_result(self.active, finish=True)
self.put_result(self.active.keys(), finish=True)

def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False):
not_yet_yielded = [
peer for peer in from_iter
if peer not in self.yielded_peers
and peer.node_id != self.protocol.node_id
and self.peer_manager.peer_is_good(peer) is not False
and self.peer_manager.peer_is_good(peer) is True # return only peers who answered
]
not_yet_yielded.sort(key=lambda peer: self.distance(peer.node_id))
to_yield = not_yet_yielded[:max(constants.K, self.max_results)]
Expand All @@ -298,26 +289,16 @@ def check_result_ready(self, response: FindNodeResponse):

if found:
log.debug("found")
return self.put_result(self.active, finish=True)
if self.prev_closest_peer and self.closest_peer and not self._is_closer(self.prev_closest_peer):
# log.info("improving, %i %i %i %i %i", len(self.shortlist), len(self.active), len(self.contacted),
# self.bottom_out_count, self.iteration_count)
self.bottom_out_count = 0
elif self.prev_closest_peer and self.closest_peer:
self.bottom_out_count += 1
log.info("bottom out %i %i %i", len(self.active), len(self.contacted), self.bottom_out_count)
if self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
log.info("limit hit")
self.put_result(self.active, True)
return self.put_result(self.active.keys(), finish=True)


class IterativeValueFinder(IterativeFinder):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
shortlist)
self.blob_peers: typing.Set['KademliaPeer'] = set()
# this tracks the index of the most recent page we requested from each peer
Expand Down Expand Up @@ -362,23 +343,12 @@ def check_result_ready(self, response: FindValueResponse):
blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
for compact_addr in response.found_compact_addresses]
to_yield = []
self.bottom_out_count = 0
for blob_peer in blob_peers:
if blob_peer not in self.blob_peers:
self.blob_peers.add(blob_peer)
to_yield.append(blob_peer)
if to_yield:
# log.info("found %i new peers for blob", len(to_yield))
self.iteration_queue.put_nowait(to_yield)
# if self.max_results and len(self.blob_peers) >= self.max_results:
# log.info("enough blob peers found")
# if not self.finished.is_set():
# self.finished.set()
elif self.prev_closest_peer and self.closest_peer:
self.bottom_out_count += 1
if self.bottom_out_count >= self.bottom_out_limit:
log.info("blob peer search bottomed out")
self.iteration_queue.put_nowait(None)

def get_initial_result(self) -> typing.List['KademliaPeer']:
if self.protocol.data_store.has_peers_for_blob(self.key):
Expand Down

0 comments on commit f4fa217

Please sign in to comment.