
Commit

Merge 4c7e345 into 87ec7ac
shyba committed Feb 12, 2022
2 parents 87ec7ac + 4c7e345 commit 15e8c14
Showing 4 changed files with 45 additions and 118 deletions.
1 change: 0 additions & 1 deletion lbry/dht/constants.py
@@ -20,7 +20,6 @@
CHECK_REFRESH_INTERVAL = REFRESH_INTERVAL / 5
RPC_ID_LENGTH = 20
PROTOCOL_VERSION = 1
BOTTOM_OUT_LIMIT = 3
MSG_SIZE_LIMIT = 1400


10 changes: 4 additions & 6 deletions lbry/dht/node.py
@@ -202,25 +202,23 @@ def start(self, interface: str, known_node_urls: typing.Optional[typing.List[typ
self._join_task = self.loop.create_task(self.join_network(interface, known_node_urls))

def get_iterative_node_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
bottom_out_limit: int = constants.BOTTOM_OUT_LIMIT,
max_results: int = constants.K) -> IterativeNodeFinder:

return IterativeNodeFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
key, bottom_out_limit, max_results, None, shortlist)
key, max_results, None, shortlist)

def get_iterative_value_finder(self, key: bytes, shortlist: typing.Optional[typing.List['KademliaPeer']] = None,
bottom_out_limit: int = 40,
max_results: int = -1) -> IterativeValueFinder:

return IterativeValueFinder(self.loop, self.protocol.peer_manager, self.protocol.routing_table, self.protocol,
key, bottom_out_limit, max_results, None, shortlist)
key, max_results, None, shortlist)

async def peer_search(self, node_id: bytes, count=constants.K, max_results=constants.K * 2,
bottom_out_limit=60, shortlist: typing.Optional[typing.List['KademliaPeer']] = None
shortlist: typing.Optional[typing.List['KademliaPeer']] = None
) -> typing.List['KademliaPeer']:
peers = []
async for iteration_peers in self.get_iterative_node_finder(
node_id, shortlist=shortlist, bottom_out_limit=bottom_out_limit, max_results=max_results):
node_id, shortlist=shortlist, max_results=max_results):
peers.extend(iteration_peers)
distance = Distance(node_id)
peers.sort(key=lambda peer: distance(peer.node_id))
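
For orientation, a rough usage sketch (not part of this commit): after this change a caller passes at most a shortlist and max_results, and there is no bottom-out argument. The Node instance and running event loop are assumed to already exist.

import typing

from lbry.dht import constants

async def find_closest_peers(node, target_id: bytes) -> typing.List['KademliaPeer']:
    # `node` is assumed to be a started lbry.dht.node.Node.
    peers = []
    # The finder decides on its own when the search is exhausted; no
    # bottom_out_limit is passed any more.
    async for batch in node.get_iterative_node_finder(target_id, max_results=constants.K):
        peers.extend(batch)
    return peers
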
139 changes: 40 additions & 99 deletions lbry/dht/protocol/iterative_find.py
@@ -74,7 +74,7 @@ def get_shortlist(routing_table: 'TreeRoutingTable', key: bytes,
class IterativeFinder:
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
if len(key) != constants.HASH_LENGTH:
@@ -85,46 +85,27 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
self.protocol = protocol

self.key = key
self.bottom_out_limit = bottom_out_limit
self.max_results = max_results
self.max_results = max(constants.K, max_results)
self.exclude = exclude or []

self.active: typing.Set['KademliaPeer'] = set()
self.active: typing.Dict['KademliaPeer', int] = {} # peer: distance, sorted
self.contacted: typing.Set['KademliaPeer'] = set()
self.distance = Distance(key)

self.closest_peer: typing.Optional['KademliaPeer'] = None
self.prev_closest_peer: typing.Optional['KademliaPeer'] = None

self.iteration_queue = asyncio.Queue(loop=self.loop)

self.running_probes: typing.Set[asyncio.Task] = set()
self.running_probes: typing.Dict['KademliaPeer', asyncio.Task] = {}
self.iteration_count = 0
self.bottom_out_count = 0
self.running = False
self.tasks: typing.List[asyncio.Task] = []
self.delayed_calls: typing.List[asyncio.Handle] = []
self.delayed_call: asyncio.Handle = None
for peer in get_shortlist(routing_table, key, shortlist):
if peer.node_id:
self._add_active(peer, force=True)
else:
# seed nodes
self._schedule_probe(peer)

@property
def is_closest_peer_ready(self):
if not self.closest_peer or not self.prev_closest_peer:
return False
return self.closest_peer in self.contacted and self.peer_manager.peer_is_good(self.closest_peer)

@property
def are_k_closest_peers_ready(self):
if not self.is_closest_peer_ready:
return False
to_probe = list(self.active)
to_probe.sort(key=lambda peer: self.distance(peer.node_id))
return all(self.peer_manager.peer_is_good(peer) for peer in to_probe[:self.max_results])

async def send_probe(self, peer: 'KademliaPeer') -> FindResponse:
"""
Send the rpc request to the peer and return an object with the FindResponse interface
@@ -152,27 +133,14 @@ def get_initial_result(self) -> typing.List['KademliaPeer']: #pylint: disable=n
"""
return []

def _is_closer(self, peer: 'KademliaPeer') -> bool:
return not self.closest_peer or self.distance.is_closer(peer.node_id, self.closest_peer.node_id)

def _add_active(self, peer, force=False):
if not force and self.peer_manager.peer_is_good(peer) is False:
return
if peer in self.contacted:
return
if self.closest_peer and self.peer_manager.peer_is_good(self.closest_peer) is False:
log.debug("[%s] closest peer went bad", self.key.hex()[:8])
if self.prev_closest_peer and self.peer_manager.peer_is_good(self.prev_closest_peer) is not False:
log.debug("[%s] previous closest was bad too", self.key.hex()[:8])
self.closest_peer = self.prev_closest_peer
else:
self.closest_peer = None
self.prev_closest_peer = None
if peer not in self.active and peer.node_id and peer.node_id != self.protocol.node_id:
self.active.add(peer)
if self._is_closer(peer):
self.prev_closest_peer = self.closest_peer
self.closest_peer = peer
self.active[peer] = self.distance(peer.node_id)
self.active = dict(sorted(self.active.items(), key=lambda item: item[1]))

async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindResponse):
self._add_active(peer)
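
As a standalone illustration of the idiom _add_active now uses (toy code, not lbry's): keep a peer-to-distance dict whose iteration order is always closest first. XOR distance is computed inline here instead of going through lbry's Distance helper, and 20-byte ids stand in for real node ids.

def xor_distance(key: bytes, node_id: bytes) -> int:
    # Kademlia distance: bytewise XOR of the two ids, read as a big integer.
    return int.from_bytes(bytes(a ^ b for a, b in zip(key, node_id)), 'big')

def add_active(active: dict, key: bytes, node_id: bytes) -> dict:
    if node_id not in active:
        active[node_id] = xor_distance(key, node_id)
    # dicts preserve insertion order, so rebuilding from sorted items makes a
    # plain iteration over the dict walk peers from closest to farthest.
    return dict(sorted(active.items(), key=lambda item: item[1]))

active = {}
active = add_active(active, b'\x00' * 20, b'\x0f' * 20)
active = add_active(active, b'\x00' * 20, b'\x01' * 20)
assert next(iter(active)) == b'\x01' * 20  # the closer peer iterates first
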
@@ -187,11 +155,8 @@ async def _handle_probe_result(self, peer: 'KademliaPeer', response: FindRespons
self._log_state()

def _reset_closest(self, peer):
self.active.discard(peer)
if peer == self.prev_closest_peer:
self.prev_closest_peer = None
if peer == self.closest_peer:
self.closest_peer = self.prev_closest_peer
if peer in self.active:
del self.active[peer]

async def _send_probe(self, peer: 'KademliaPeer'):
try:
@@ -210,17 +175,21 @@ async def _send_probe(self, peer: 'KademliaPeer'):
return
return await self._handle_probe_result(peer, response)

async def _search_round(self):
def _search_round(self):
"""
Send up to constants.alpha (5) probes to closest active peers
"""

added = 0
to_probe = list(self.active - self.contacted)
to_probe.sort(key=lambda peer: self.distance(peer.node_id))
log.debug("closest to probe: %s", to_probe[0].node_id.hex()[:8] if to_probe else None)
for peer in to_probe:
if added >= constants.ALPHA:
for index, peer in enumerate(self.active.keys()):
if index == 0:
log.debug("closest to probe: %s", peer.node_id.hex()[:8])

if peer in self.contacted:
continue
if len(self.running_probes) >= constants.ALPHA:
break
if index > (constants.K + len(self.running_probes)):
break
origin_address = (peer.address, peer.udp_port)
if origin_address in self.exclude:
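
A compact sketch of the selection rule the new _search_round applies (toy code, not lbry's): walk the distance-sorted candidates, keep at most ALPHA probes in flight, and stop looking past roughly the K closest entries. ALPHA is 5 per the docstring above; K = 8 is assumed to match lbry.dht.constants.

ALPHA = 5  # per the docstring above
K = 8      # assumed value of constants.K

def pick_peers_to_probe(active_closest_first, contacted, running_probes):
    # `active_closest_first` plays the role of self.active: already sorted by distance.
    to_probe = []
    for index, peer in enumerate(active_closest_first):
        if peer in contacted:
            continue
        if len(running_probes) + len(to_probe) >= ALPHA:
            break  # enough probes in flight for this round
        if index > K + len(running_probes):
            break  # don't wander past (roughly) the K closest candidates
        to_probe.append(peer)
    return to_probe
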
@@ -242,35 +211,31 @@ def _schedule_probe(self, peer: 'KademliaPeer'):
t = self.loop.create_task(self._send_probe(peer))

def callback(_):
self.running_probes.difference_update({
probe for probe in self.running_probes if probe.done() or probe == t
})
if not self.running_probes:
self.tasks.append(self.loop.create_task(self._search_task(0.0)))
for peer in [peer for peer, task in self.running_probes.items() if task.done() or task == t]:
del self.running_probes[peer]
self._search_task(0.0)

t.add_done_callback(callback)
self.running_probes.add(t)
self.running_probes[peer] = t

async def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY):
def _search_task(self, delay: typing.Optional[float] = constants.ITERATIVE_LOOKUP_DELAY):
try:
if self.running:
await self._search_round()
if self.running:
self.delayed_calls.append(self.loop.call_later(delay, self._search))
if self.delayed_call:
self.delayed_call.cancel() # ensure anything scheduled gets cancelled
self._search_round()
#if self.running:
# self.delayed_call = self.loop.call_later(delay, self._search)
except (asyncio.CancelledError, StopAsyncIteration, TransportNotConnected):
if self.running:
self.loop.call_soon(self.aclose)

def _log_state(self):
log.debug("[%s] check result: %i active nodes %i contacted %i bottomed count",
self.key.hex()[:8], len(self.active), len(self.contacted), self.bottom_out_count)
if self.closest_peer and self.prev_closest_peer:
log.debug("[%s] best node id: %s (contacted: %s, good: %s), previous best: %s",
self.key.hex()[:8], self.closest_peer.node_id.hex()[:8], self.closest_peer in self.contacted,
self.peer_manager.peer_is_good(self.closest_peer), self.prev_closest_peer.node_id.hex()[:8])
log.debug("[%s] check result: %i active nodes %i contacted",
self.key.hex()[:8], len(self.active), len(self.contacted))

def _search(self):
self.tasks.append(self.loop.create_task(self._search_task()))
self._search_task()

def __aiter__(self):
if self.running:
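
To show the new probe bookkeeping in isolation (again a toy, not lbry code): probes live in a peer-to-task dict, and each probe's done-callback prunes finished entries and immediately kicks off another search round instead of waiting for a timer.

import asyncio
import typing

class ProbePool:
    def __init__(self, loop: asyncio.AbstractEventLoop, on_round: typing.Callable[[], None]):
        self.loop = loop
        self.on_round = on_round
        self.running: typing.Dict[str, asyncio.Task] = {}

    def schedule(self, peer: str, probe_coro: typing.Coroutine) -> None:
        task = self.loop.create_task(probe_coro)

        def callback(_):
            # Drop every finished probe, including this one, then run the next
            # search round right away.
            for done_peer in [p for p, t in self.running.items() if t.done() or t is task]:
                del self.running[done_peer]
            self.on_round()

        task.add_done_callback(callback)
        self.running[peer] = task
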
@@ -296,20 +261,20 @@ async def __anext__(self) -> typing.List['KademliaPeer']:
def aclose(self):
self.running = False
self.iteration_queue.put_nowait(None)
for task in chain(self.tasks, self.running_probes, self.delayed_calls):
for task in chain(self.tasks, self.running_probes.values(), filter(None, [self.delayed_call])):
task.cancel()
self.tasks.clear()
self.running_probes.clear()
self.delayed_calls.clear()
self.delayed_call = None


class IterativeNodeFinder(IterativeFinder):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
shortlist)
self.yielded_peers: typing.Set['KademliaPeer'] = set()

@@ -320,7 +285,7 @@ async def send_probe(self, peer: 'KademliaPeer') -> FindNodeResponse:
return FindNodeResponse(self.key, response)

def search_exhausted(self):
self.put_result(self.active, finish=True)
self.put_result(self.active.keys(), finish=True)

def put_result(self, from_iter: typing.Iterable['KademliaPeer'], finish=False):
not_yet_yielded = [
@@ -342,26 +307,16 @@ def check_result_ready(self, response: FindNodeResponse):

if found:
log.debug("found")
return self.put_result(self.active, finish=True)
elif self.is_closest_peer_ready:
self.bottom_out_count += 1
else:
self.bottom_out_count = 0

if self.are_k_closest_peers_ready:
self.put_result(self.active, True)
elif self.bottom_out_count >= self.bottom_out_limit or self.iteration_count >= self.bottom_out_limit:
log.debug("peer search bottomed out.")
self.put_result(self.active, True)
return self.put_result(self.active.keys(), finish=True)


class IterativeValueFinder(IterativeFinder):
def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
routing_table: 'TreeRoutingTable', protocol: 'KademliaProtocol', key: bytes,
bottom_out_limit: typing.Optional[int] = 2, max_results: typing.Optional[int] = constants.K,
max_results: typing.Optional[int] = constants.K,
exclude: typing.Optional[typing.List[typing.Tuple[str, int]]] = None,
shortlist: typing.Optional[typing.List['KademliaPeer']] = None):
super().__init__(loop, peer_manager, routing_table, protocol, key, bottom_out_limit, max_results, exclude,
super().__init__(loop, peer_manager, routing_table, protocol, key, max_results, exclude,
shortlist)
self.blob_peers: typing.Set['KademliaPeer'] = set()
# this tracks the index of the most recent page we requested from each peer
@@ -406,26 +361,12 @@ def check_result_ready(self, response: FindValueResponse):
blob_peers = [self.peer_manager.decode_tcp_peer_from_compact_address(compact_addr)
for compact_addr in response.found_compact_addresses]
to_yield = []
self.bottom_out_count = 0
for blob_peer in blob_peers:
if blob_peer not in self.blob_peers:
self.blob_peers.add(blob_peer)
to_yield.append(blob_peer)
if to_yield:
# log.info("found %i new peers for blob", len(to_yield))
self.iteration_queue.put_nowait(to_yield)
# if self.max_results and len(self.blob_peers) >= self.max_results:
# log.info("enough blob peers found")
# if not self.finished.is_set():
# self.finished.set()
elif self.is_closest_peer_ready:
self.bottom_out_count += 1
if self.are_k_closest_peers_ready:
log.info("blob peer search finished for %s", self.key.hex()[:8])
self.iteration_queue.put_nowait(None)
elif self.bottom_out_count >= self.bottom_out_limit:
log.info("blob peer search bottomed out for %s", self.key.hex()[:8])
self.iteration_queue.put_nowait(None)

def get_initial_result(self) -> typing.List['KademliaPeer']:
if self.protocol.data_store.has_peers_for_blob(self.key):
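
Putting the pieces together, a simplified serial model of how the search now ends (not the real, concurrent implementation): probe the closest uncontacted candidates in small batches and stop when none remain, returning the K closest peers seen. There are no bottom-out counters; the asyncio plumbing and the exploration window from _search_round are omitted for brevity.

async def iterative_find(seed_peers, probe, k=8, alpha=5):
    """seed_peers: iterable of (peer, distance); probe(peer) -> list of (peer, distance)."""
    active = dict(seed_peers)  # peer -> XOR distance to the target key
    contacted = set()
    while True:
        closest_first = sorted(active, key=active.get)
        to_probe = [p for p in closest_first if p not in contacted][:alpha]
        if not to_probe:
            # Search exhausted: nobody left to ask, return the k closest seen.
            return closest_first[:k]
        for peer in to_probe:
            contacted.add(peer)
            for new_peer, distance in await probe(peer):
                active.setdefault(new_peer, distance)
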
13 changes: 1 addition & 12 deletions lbry/extras/daemon/daemon.py
@@ -4885,20 +4885,16 @@ async def jsonrpc_blob_delete(self, blob_hash):
"""

@requires(DHT_COMPONENT)
async def jsonrpc_peer_list(self, blob_hash, search_bottom_out_limit=None, page=None, page_size=None):
async def jsonrpc_peer_list(self, blob_hash, page=None, page_size=None):
"""
Get peers for blob hash
Usage:
peer_list (<blob_hash> | --blob_hash=<blob_hash>)
[<search_bottom_out_limit> | --search_bottom_out_limit=<search_bottom_out_limit>]
[--page=<page>] [--page_size=<page_size>]
Options:
--blob_hash=<blob_hash> : (str) find available peers for this blob hash
--search_bottom_out_limit=<search_bottom_out_limit> : (int) the number of search probes in a row
that don't find any new peers
before giving up and returning
--page=<page> : (int) page to return during paginating
--page_size=<page_size> : (int) number of items on page during pagination
@@ -4910,13 +4906,6 @@ async def jsonrpc_peer_list(self, blob_hash, search_bottom_out_limit=None, page=
if not is_valid_blobhash(blob_hash):
# TODO: use error from lbry.error
raise Exception("invalid blob hash")
if search_bottom_out_limit is not None:
search_bottom_out_limit = int(search_bottom_out_limit)
if search_bottom_out_limit <= 0:
# TODO: use error from lbry.error
raise Exception("invalid bottom out limit")
else:
search_bottom_out_limit = 4
peers = []
peer_q = asyncio.Queue(loop=self.component_manager.loop)
await self.dht_node._peers_for_value_producer(blob_hash, peer_q)
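
On the API side, peer_list now takes only the blob hash and pagination arguments. A hedged example of calling it over the SDK's JSON-RPC interface, assuming the daemon listens at its usual http://localhost:5279 address; the blob hash in the commented call is a placeholder.

import json
import urllib.request

def peer_list(blob_hash: str, page: int = 1, page_size: int = 50) -> dict:
    payload = json.dumps({
        "method": "peer_list",
        # search_bottom_out_limit is gone; only blob_hash and paging remain.
        "params": {"blob_hash": blob_hash, "page": page, "page_size": page_size},
    }).encode()
    request = urllib.request.Request(
        "http://localhost:5279",  # assumed default lbrynet API address
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode())

# peers = peer_list("<blob hash to look up>")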
