diff --git a/lbry/dht/protocol/iterative_find.py b/lbry/dht/protocol/iterative_find.py index 31e6ab56a7..dfeb4795a1 100644 --- a/lbry/dht/protocol/iterative_find.py +++ b/lbry/dht/protocol/iterative_find.py @@ -86,10 +86,10 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', self.key = key self.bottom_out_limit = bottom_out_limit - self.max_results = max_results + self.max_results = max(constants.K, max_results) self.exclude = exclude or [] - self.active: typing.Set['KademliaPeer'] = set() + self.active: typing.Dict['KademliaPeer', int] = {} # peer: distance, sorted self.contacted: typing.Set['KademliaPeer'] = set() self.distance = Distance(key) @@ -119,11 +119,12 @@ def is_closest_peer_ready(self): @property def are_k_closest_peers_ready(self): - if not self.is_closest_peer_ready: + if not self.is_closest_peer_ready or len(self.active) < self.max_results: return False - to_probe = list(self.active) - to_probe.sort(key=lambda peer: self.distance(peer.node_id)) - return all(self.peer_manager.peer_is_good(peer) for peer in to_probe[:self.max_results]) + for peer in list(self.active.keys())[:self.max_results]: + if peer not in self.contacted or not self.peer_manager.peer_is_good(peer): + return False + return True async def send_probe(self, peer: 'KademliaPeer') -> FindResponse: """ @@ -169,7 +170,8 @@ def _add_active(self, peer, force=False): self.closest_peer = None self.prev_closest_peer = None if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id: - self.active.add(peer) + self.active[peer] = self.distance(peer.node_id) + self.active = dict(sorted(self.active.items(), key=lambda item: item[1])) if self._is_closer(peer): self.prev_closest_peer = self.closest_peer self.closest_peer = peer @@ -187,7 +189,8 @@ async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindRespons self._log_state() def _reset_closest(self, peer): - self.active.discard(peer) + if peer in self.active: + del self.active[peer] if peer == self.prev_closest_peer: self.prev_closest_peer = None if peer == self.closest_peer: @@ -216,12 +219,18 @@ async def _search_round(self): """ added = 0 - to_probe = list(self.active - self.contacted) - to_probe.sort(key=lambda peer: self.distance(peer.node_id)) - log.debug("closest to probe: %s", to_probe[0].node_id.hex()[:8] if to_probe else None) - for peer in to_probe: + for index, peer in enumerate(self.active.keys()): + if index == 0: + log.debug("closest to probe: %s", peer.node_id.hex()[:8]) + if self.closest_peer != peer: + self.prev_closest_peer = self.closest_peer + self.closest_peer = peer + if peer in self.contacted: + continue if added >= constants.ALPHA: break + if index > self.max_results: + break origin_address = (peer.address, peer.udp_port) if origin_address in self.exclude: continue @@ -320,7 +329,7 @@ async def send_probe(self, peer: 'KademliaPeer') -> FindNodeResponse: return FindNodeResponse(self.key, response) def search_exhausted(self): - self.put_result(self.active, finish=True) + self.put_result(self.active.keys(), finish=True) def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False): not_yet_yielded = [ @@ -342,17 +351,17 @@ def check_result_ready(self, response: FindNodeResponse): if found: log.debug("found") - return self.put_result(self.active, finish=True) + return self.put_result(self.active.keys(), finish=True) elif self.is_closest_peer_ready: self.bottom_out_count += 1 else: self.bottom_out_count = 0 if self.are_k_closest_peers_ready: - self.put_result(self.active, True) + self.put_result(self.active.keys(), True) elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit: - log.debug("peer search bottomed out.") - self.put_result(self.active, True) + log.warning("peer search bottomed out.") + self.put_result([], True) class IterativeValueFinder(IterativeFinder):