Skip to content

Commit

Permalink
Merge f952c3e into 87ec7ac
Browse files Browse the repository at this point in the history
  • Loading branch information
shyba committed Feb 8, 2022
2 parents 87ec7ac + f952c3e commit e40a455
Showing 1 changed file with 26 additions and 17 deletions.
43 changes: 26 additions & 17 deletions lbry/dht/protocol/iterative_find.py
Expand Up @@ -86,10 +86,10 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',

self.key = key
self.bottom_out_limit = bottom_out_limit
self.max_results = max_results
self.max_results = max(constants.K, max_results)
self.exclude = exclude or []

self.active: typing.Set['KademliaPeer'] = set()
self.active: typing.Dict['KademliaPeer', int] = {} # peer: distance, sorted
self.contacted: typing.Set['KademliaPeer'] = set()
self.distance = Distance(key)

Expand Down Expand Up @@ -119,11 +119,12 @@ def is_closest_peer_ready(self):

@property
def are_k_closest_peers_ready(self):
if not self.is_closest_peer_ready:
if not self.is_closest_peer_ready or len(self.active) < self.max_results:
return False
to_probe = list(self.active)
to_probe.sort(key=lambda peer: self.distance(peer.node_id))
return all(self.peer_manager.peer_is_good(peer) for peer in to_probe[:self.max_results])
for peer in list(self.active.keys())[:self.max_results]:
if peer not in self.contacted or not self.peer_manager.peer_is_good(peer):
return False
return True

async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
"""
Expand Down Expand Up @@ -169,7 +170,8 @@ def _add_active(self, peer, force=False):
self.closest_peer = None
self.prev_closest_peer = None
if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
self.active.add(peer)
self.active[peer] = self.distance(peer.node_id)
self.active = dict(sorted(self.active.items(), key=lambda item: item[1]))
if self._is_closer(peer):
self.prev_closest_peer = self.closest_peer
self.closest_peer = peer
Expand All @@ -187,7 +189,8 @@ async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindRespons
self._log_state()

def _reset_closest(self, peer):
self.active.discard(peer)
if peer in self.active:
del self.active[peer]
if peer == self.prev_closest_peer:
self.prev_closest_peer = None
if peer == self.closest_peer:
Expand Down Expand Up @@ -216,12 +219,18 @@ async def _search_round(self):
"""

added = 0
to_probe = list(self.active - self.contacted)
to_probe.sort(key=lambda peer: self.distance(peer.node_id))
log.debug("closest to probe: %s", to_probe[0].node_id.hex()[:8] if to_probe else None)
for peer in to_probe:
for index, peer in enumerate(self.active.keys()):
if index == 0:
log.debug("closest to probe: %s", peer.node_id.hex()[:8])
if self.closest_peer != peer:
self.prev_closest_peer = self.closest_peer
self.closest_peer = peer
if peer in self.contacted:
continue
if added >= constants.ALPHA:
break
if index > self.max_results:
break
origin_address = (peer.address, peer.udp_port)
if origin_address in self.exclude:
continue
Expand Down Expand Up @@ -320,7 +329,7 @@ async def send_probe(self, peer: 'KademliaPeer') -> FindNodeResponse:
return FindNodeResponse(self.key, response)

def search_exhausted(self):
self.put_result(self.active, finish=True)
self.put_result(self.active.keys(), finish=True)

def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False):
not_yet_yielded = [
Expand All @@ -342,17 +351,17 @@ def check_result_ready(self, response: FindNodeResponse):

if found:
log.debug("found")
return self.put_result(self.active, finish=True)
return self.put_result(self.active.keys(), finish=True)
elif self.is_closest_peer_ready:
self.bottom_out_count += 1
else:
self.bottom_out_count = 0

if self.are_k_closest_peers_ready:
self.put_result(self.active, True)
self.put_result(self.active.keys(), True)
elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
log.debug("peer search bottomed out.")
self.put_result(self.active, True)
log.warning("peer search bottomed out.")
self.put_result([], True)


class IterativeValueFinder(IterativeFinder):
Expand Down

0 comments on commit e40a455

Please sign in to comment.