Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DHT: improve iterative find #3562

Merged
merged 6 commits into from Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion lbry/dht/constants.py
Expand Up @@ -20,7 +20,6 @@
CHECK_REFRESH_INTERVAL = REFRESH_INTERVAL / 5
RPC_ID_LENGTH = 20
PROTOCOL_VERSION = 1
BOTTOM_OUT_LIMIT = 3
MSG_SIZE_LIMIT = 1400


Expand Down
10 changes: 4 additions & 6 deletions lbry/dht/node.py
Expand Up @@ -202,25 +202,23 @@ def start(self, interface: str, known_node_urls: typing.Optional[typing.List[typ
self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls))

def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
bottom_out_limit: int = constants.BOTTOM_OUT_LIMIT,
max_results: int = constants.K) -> IterativeNodeFinder:

return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
key, bottom_out_limit, max_results, None, shortlist)
key, max_results, None, shortlist)

def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
bottom_out_limit: int = 40,
max_results: int = -1) -> IterativeValueFinder:

return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
key, bottom_out_limit, max_results, None, shortlist)
key, max_results, None, shortlist)

async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2,
bottom_out_limit=60, shortlist: typing.Optional[typing.List['KademliaPeer']] = None
shortlist: typing.Optional[typing.List['KademliaPeer']] = None
) -> typing.List['KademliaPeer']:
peers = []
async for iteration_peers in self.get_iterative_node_finder(
node_id, shortlist=shortlist, bottom_out_limit=bottom_out_limit, max_results=max_results):
node_id, shortlist=shortlist, max_results=max_results):
peers.extend(iteration_peers)
distance = Distance(node_id)
peers.sort(key=lambda peer: distance(peer.node_id))
Expand Down
139 changes: 40 additions & 99 deletions lbry/dht/protocol/iterative_find.py
Expand Up @@ -74,7 +74,7 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
class IterativeFinder:
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
if len(key) != constants.HASH_LENGTH:
Expand All @@ -85,46 +85,27 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
self.protocol = protocol

self.key = key
self.bottom_out_limit = bottom_out_limit
self.max_results = max_results
self.max_results = max(constants.K, max_results)
self.exclude = exclude or []

self.active: typing.Set['KademliaPeer'] = set()
self.active: typing.Dict['KademliaPeer', int] = {} # peer: distance, sorted
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be an `OrderedDict`, to make the reliance on insertion order explicit.

self.contacted: typing.Set['KademliaPeer'] = set()
self.distance = Distance(key)

self.closest_peer: typing.Optional['KademliaPeer'] = None
self.prev_closest_peer: typing.Optional['KademliaPeer'] = None

self.iteration_queue = asyncio.Queue(loop=self.loop)

self.running_probes: typing.Set[asyncio.Task] = set()
self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {}
self.iteration_count = 0
self.bottom_out_count = 0
self.running = False
self.tasks: typing.List[asyncio.Task] = []
self.delayed_calls: typing.List[asyncio.Handle] = []
self.delayed_call: asyncio.Handle = None
for peer in get_shortlist(routing_table, key, shortlist):
if peer.node_id:
self._add_active(peer, force=True)
else:
# seed nodes
self._schedule_probe(peer)

@property
def is_closest_peer_ready(self):
if not self.closest_peer or not self.prev_closest_peer:
return False
return self.closest_peer in self.contacted and self.peer_manager.peer_is_good(self.closest_peer)

@property
def are_k_closest_peers_ready(self):
if not self.is_closest_peer_ready:
return False
to_probe = list(self.active)
to_probe.sort(key=lambda peer: self.distance(peer.node_id))
return all(self.peer_manager.peer_is_good(peer) for peer in to_probe[:self.max_results])

async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
"""
Send the rpc request to the peer and return an object with the FindResponse interface
Expand Down Expand Up @@ -152,27 +133,14 @@ def get_initial_result(self) -> typing.List['KademliaPeer']: #pylint: disable=n
"""
return []

def _is_closer(self, peer: 'KademliaPeer') -> bool:
return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id)

def _add_active(self, peer, force=False):
if not force and self.peer_manager.peer_is_good(peer) is False:
return
if peer in self.contacted:
return
if self.closest_peer and self.peer_manager.peer_is_good(self.closest_peer) is False:
log.debug("[%s] closest peer went bad", self.key.hex()[:8])
if self.prev_closest_peer and self.peer_manager.peer_is_good(self.prev_closest_peer) is not False:
log.debug("[%s] previous closest was bad too", self.key.hex()[:8])
self.closest_peer = self.prev_closest_peer
else:
self.closest_peer = None
self.prev_closest_peer = None
if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
self.active.add(peer)
if self._is_closer(peer):
self.prev_closest_peer = self.closest_peer
self.closest_peer = peer
self.active[peer] = self.distance(peer.node_id)
self.active = dict(sorted(self.active.items(), key=lambda item: item[1]))

async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
self._add_active(peer)
Expand All @@ -187,11 +155,8 @@ async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindRespons
self._log_state()

def _reset_closest(self, peer):
self.active.discard(peer)
if peer == self.prev_closest_peer:
self.prev_closest_peer = None
if peer == self.closest_peer:
self.closest_peer = self.prev_closest_peer
if peer in self.active:
del self.active[peer]

async def _send_probe(self, peer: 'KademliaPeer'):
try:
Expand All @@ -210,17 +175,21 @@ async def _send_probe(self, peer: 'KademliaPeer'):
return
return await self._handle_probe_result(peer, response)

async def _search_round(self):
def _search_round(self):
"""
Send up to constants.alpha (5) probes to closest active peers
"""

added = 0
to_probe = list(self.active - self.contacted)
to_probe.sort(key=lambda peer: self.distance(peer.node_id))
log.debug("closest to probe: %s", to_probe[0].node_id.hex()[:8] if to_probe else None)
for peer in to_probe:
if added >= constants.ALPHA:
for index, peer in enumerate(self.active.keys()):
if index == 0:
log.debug("closest to probe: %s", peer.node_id.hex()[:8])

if peer in self.contacted:
continue
if len(self.running_probes) >= constants.ALPHA:
break
if index > (constants.K + len(self.running_probes)):
break
origin_address = (peer.address, peer.udp_port)
if origin_address in self.exclude:
Expand All @@ -242,35 +211,31 @@ def _schedule_probe(self, peer: 'KademliaPeer'):
t = self.loop.create_task(self._send_probe(peer))

def callback(_):
self.running_probes.difference_update({
probe for probe in self.running_probes if probe.done() or probe == t
})
if not self.running_probes:
self.tasks.append(self.loop.create_task(self._search_task(0.0)))
for peer in [peer for peer, task in self.running_probes.items() if task.done() or task == t]:
del self.running_probes[peer]
self._search_task(0.0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The `delay` argument is no longer needed.


t.add_done_callback(callback)
self.running_probes.add(t)
self.running_probes[peer] = t

async def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY):
def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY):
try:
if self.running:
await self._search_round()
if self.running:
self.delayed_calls.append(self.loop.call_later(delay, self._search))
if self.delayed_call:
self.delayed_call.cancel() # ensure anything scheduled gets cancelled
self._search_round()
#if self.running:
# self.delayed_call = self.loop.call_later(delay, self._search)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can `delayed_call` be deleted, now that the `call_later` scheduling is commented out?

except (asyncio.CancelledError, StopAsyncIteration, TransportNotConnected):
if self.running:
self.loop.call_soon(self.aclose)

def _log_state(self):
log.debug("[%s] check result: %i active nodes %i contacted %i bottomed count",
self.key.hex()[:8], len(self.active), len(self.contacted), self.bottom_out_count)
if self.closest_peer and self.prev_closest_peer:
log.debug("[%s] best node id: %s (contacted: %s, good: %s), previous best: %s",
self.key.hex()[:8], self.closest_peer.node_id.hex()[:8], self.closest_peer in self.contacted,
self.peer_manager.peer_is_good(self.closest_peer), self.prev_closest_peer.node_id.hex()[:8])
log.debug("[%s] check result: %i active nodes %i contacted",
self.key.hex()[:8], len(self.active), len(self.contacted))

def _search(self):
self.tasks.append(self.loop.create_task(self._search_task()))
self._search_task()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this method (`_search`) be deleted? It now only forwards to `_search_task()`.


def __aiter__(self):
if self.running:
Expand All @@ -296,20 +261,20 @@ async def __anext__(self) -> typing.List['KademliaPeer']:
def aclose(self):
self.running = False
self.iteration_queue.put_nowait(None)
for task in chain(self.tasks, self.running_probes, self.delayed_calls):
for task in chain(self.tasks, self.running_probes.values(), filter(None, [self.delayed_call])):
task.cancel()
self.tasks.clear()
self.running_probes.clear()
self.delayed_calls.clear()
self.delayed_call = None


class IterativeNodeFinder(IterativeFinder):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
shortlist)
self.yielded_peers: typing.Set['KademliaPeer'] = set()

Expand All @@ -320,7 +285,7 @@ async def send_probe(self, peer: 'KademliaPeer') -> FindNodeResponse:
return FindNodeResponse(self.key, response)

def search_exhausted(self):
self.put_result(self.active, finish=True)
self.put_result(self.active.keys(), finish=True)

def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False):
not_yet_yielded = [
Expand All @@ -342,26 +307,16 @@ def check_result_ready(self, response: FindNodeResponse):

if found:
log.debug("found")
return self.put_result(self.active, finish=True)
elif self.is_closest_peer_ready:
self.bottom_out_count += 1
else:
self.bottom_out_count = 0

if self.are_k_closest_peers_ready:
self.put_result(self.active, True)
elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
log.debug("peer search bottomed out.")
self.put_result(self.active, True)
return self.put_result(self.active.keys(), finish=True)


class IterativeValueFinder(IterativeFinder):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
shortlist)
self.blob_peers: typing.Set['KademliaPeer'] = set()
# this tracks the index of the most recent page we requested from each peer
Expand Down Expand Up @@ -406,26 +361,12 @@ def check_result_ready(self, response: FindValueResponse):
blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
for compact_addr in response.found_compact_addresses]
to_yield = []
self.bottom_out_count = 0
for blob_peer in blob_peers:
if blob_peer not in self.blob_peers:
self.blob_peers.add(blob_peer)
to_yield.append(blob_peer)
if to_yield:
# log.info("found %i new peers for blob", len(to_yield))
self.iteration_queue.put_nowait(to_yield)
# if self.max_results and len(self.blob_peers) >= self.max_results:
# log.info("enough blob peers found")
# if not self.finished.is_set():
# self.finished.set()
elif self.is_closest_peer_ready:
self.bottom_out_count += 1
if self.are_k_closest_peers_ready:
log.info("blob peer search finished for %s", self.key.hex()[:8])
self.iteration_queue.put_nowait(None)
elif self.bottom_out_count >= self.bottom_out_limit:
log.info("blob peer search bottomed out for %s", self.key.hex()[:8])
self.iteration_queue.put_nowait(None)

def get_initial_result(self) -> typing.List['KademliaPeer']:
if self.protocol.data_store.has_peers_for_blob(self.key):
Expand Down
13 changes: 1 addition & 12 deletions lbry/extras/daemon/daemon.py
Expand Up @@ -4885,20 +4885,16 @@ async def jsonrpc_blob_delete(self, blob_hash):
"""

@requires(DHT_COMPONENT)
async def jsonrpc_peer_list(self, blob_hash, search_bottom_out_limit=None, page=None, page_size=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the `search_bottom_out_limit` parameter from `peer_list` should be documented as a breaking change.

async def jsonrpc_peer_list(self, blob_hash, page=None, page_size=None):
"""
Get peers for blob hash

Usage:
peer_list (<blob_hash> | --blob_hash=<blob_hash>)
[<search_bottom_out_limit> | --search_bottom_out_limit=<search_bottom_out_limit>]
[--page=<page>] [--page_size=<page_size>]

Options:
--blob_hash=<blob_hash> : (str) find available peers for this blob hash
--search_bottom_out_limit=<search_bottom_out_limit> : (int) the number of search probes in a row
that don't find any new peers
before giving up and returning
--page=<page> : (int) page to return during paginating
--page_size=<page_size> : (int) number of items on page during pagination

Expand All @@ -4910,13 +4906,6 @@ async def jsonrpc_peer_list(self, blob_hash, search_bottom_out_limit=None, page=
if not is_valid_blobhash(blob_hash):
# TODO: use error from lbry.error
raise Exception("invalid blob hash")
if search_bottom_out_limit is not None:
search_bottom_out_limit = int(search_bottom_out_limit)
if search_bottom_out_limit <= 0:
# TODO: use error from lbry.error
raise Exception("invalid bottom out limit")
else:
search_bottom_out_limit = 4
peers = []
peer_q = asyncio.Queue(loop=self.component_manager.loop)
await self.dht_node._peers_for_value_producer(blob_hash, peer_q)
Expand Down