Skip to content

Commit

Permalink
hash announcer: X continuous tasks instead of batch gather
Browse files Browse the repository at this point in the history
  • Loading branch information
shyba committed Feb 9, 2022
1 parent c96d1d9 commit 4a36628
Showing 1 changed file with 38 additions and 30 deletions.
68 changes: 38 additions & 30 deletions lbry/dht/blob_announcer.py
Expand Up @@ -25,23 +25,29 @@ def __init__(self, loop: asyncio.AbstractEventLoop, node: 'Node', storage: 'SQLi
self.loop = loop
self.node = node
self.storage = storage
self.announce_task: asyncio.Task = None
self.announce_queue: typing.List[str] = []
self.announce_tasks: typing.List[asyncio.Task] = []
self.announce_queue: asyncio.Queue = asyncio.Queue()
self.announced = set()

async def _submit_announcement(self, blob_hash):
try:

peers = len(await self.node.announce_blob(blob_hash))
self.announcements_sent_metric.labels(peers=peers, error=False).inc()
if peers > 4:
return blob_hash
else:
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
except Exception as err:
self.announcements_sent_metric.labels(peers=0, error=True).inc()
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise err
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
async def worker_task(self):
while True:
try:
blob_hash = await self.announce_queue.get()
peers = len(await self.node.announce_blob(blob_hash))
self.announcements_sent_metric.labels(peers=peers, error=False).inc()
if peers <= 4:
log.debug("failed to announce %s, could only find %d peers, retrying soon.", blob_hash[:8], peers)
self.announce_queue.put_nowait(blob_hash)
else:
self.announced.add(blob_hash)
except Exception as err:
self.announce_queue.put_nowait(blob_hash)
self.announcements_sent_metric.labels(peers=0, error=True).inc()
if isinstance(err, asyncio.CancelledError): # TODO: remove when updated to 3.8
raise err
log.warning("error announcing %s: %s", blob_hash[:8], str(err))
finally:
self.announce_queue.task_done()

async def _announce(self, batch_size: typing.Optional[int] = 10):
while batch_size:
Expand All @@ -51,24 +57,26 @@ async def _announce(self, batch_size: typing.Optional[int] = 10):
if not self.node.protocol.routing_table.get_peers():
log.warning("No peers in DHT, announce round skipped")
continue
self.announce_queue.extend(await self.storage.get_blobs_to_announce())
self.announcement_queue_size_metric.labels(scope="global").set(len(self.announce_queue))
log.debug("announcer task wake up, %d blobs to announce", len(self.announce_queue))
while len(self.announce_queue) > 0:
log.info("%i blobs to announce", len(self.announce_queue))
announced = await asyncio.gather(*[
self._submit_announcement(
self.announce_queue.pop()) for _ in range(batch_size) if self.announce_queue
], loop=self.loop)
announced = list(filter(None, announced))
for blob_hash in await self.storage.get_blobs_to_announce():
self.announce_queue.put_nowait(blob_hash)
self.announcement_queue_size_metric.labels(scope="global").set(self.announce_queue.qsize())
log.debug("announcer task wake up, %d blobs to announce", self.announce_queue.qsize())
while self.announce_queue.qsize() > 0:
log.info("%i blobs to announce", self.announce_queue.qsize())
await self.announce_queue.join()
announced = list(filter(None, self.announced))
if announced:
await self.storage.update_last_announced_blobs(announced)
log.info("announced %i blobs", len(announced))

def start(self, batch_size: typing.Optional[int] = 10):
assert not self.announce_task or self.announce_task.done(), "already running"
self.announce_task = self.loop.create_task(self._announce(batch_size))
assert not self.announce_tasks or self.announce_tasks[0].done(), "already running"
self.announce_tasks.append(self.loop.create_task(self._announce(batch_size)))
for _ in range(batch_size):
self.announce_tasks.append(asyncio.create_task(self.worker_task()))

def stop(self):
if self.announce_task and not self.announce_task.done():
self.announce_task.cancel()
while self.announce_tasks:
task = self.announce_tasks.pop()
if not task.done():
task.cancel()

0 comments on commit 4a36628

Please sign in to comment.