From f06d804578617442559d49f57f07e8ce06735fc9 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 7 Sep 2022 18:59:06 -0300 Subject: [PATCH] piece prioritization and deadlines --- lbry/torrent/session.py | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index 88507d5154..dc112501f7 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -108,9 +108,12 @@ async def stream_range_as_completed(self, file_index, start, end): piece_size = self._torrent_info.piece_length() log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d): %s", first_piece.piece, final_piece.piece, start, end, self.name) + self.prioritize(file_index, start, end) + await self.resume() for piece_index in range(first_piece.piece, final_piece.piece + 1): while not self._handle.have_piece(piece_index): log.info("Waiting for piece %d: %s", piece_index, self.name) + self._handle.set_piece_deadline(piece_index, 0) await asyncio.sleep(0.2) log.info("Streaming piece offset %d / %d for torrent %s", piece_index, final_piece.piece, self.name) yield piece_size - start_piece_offset @@ -128,31 +131,38 @@ def _show_status(self): self.metadata_completed.set() self._torrent_info = self._handle.torrent_file() log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name) + # prioritize first 2mb + self.prioritize(self.largest_file_index, 0, 2 * 1024 * 1024) self._base_path = status.save_path first_piece = self.torrent_file.piece_index_at_file(self.largest_file_index) if not self.started.is_set(): if self._handle.have_piece(first_piece): + log.debug("Got first piece, set started - %s", self.name) self.started.set() - else: - # prioritize it - self._handle.set_piece_deadline(first_piece, 100) - prios = self._handle.piece_priorities() - prios[first_piece] = 7 - self._handle.prioritize_pieces(prios) - if not status.is_seeding: - log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s', - status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000, - status.num_peers, status.num_seeds, status.state, status.save_path) - elif not self.finished.is_set(): + log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s', + status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000, + status.num_peers, status.num_seeds, status.state, status.save_path) + if (status.is_finished or status.is_seeding) and not self.finished.is_set(): self.finished.set() log.info("Torrent finished: %s", self.name) + def prioritize(self, file_index, start, end, cleanup=False): + first_piece, last_piece = self.byte_range_to_piece_range(file_index, start, end) + priorities = self._handle.get_piece_priorities() + priorities = [0 if cleanup else 1 for _ in priorities] + self._handle.clear_piece_deadlines() + for idx, piece_number in enumerate(range(first_piece.piece, last_piece.piece + 1)): + priorities[piece_number] = 7 - idx if 0 <= idx <= 6 else 1 + self._handle.set_piece_deadline(piece_number, idx) + log.debug("Prioritizing pieces for %s: %s", self.name, priorities) + self._handle.prioritize_pieces(priorities) + async def status_loop(self): while True: self._show_status() if self.finished.is_set(): break - await asyncio.sleep(0.1, loop=self._loop) + await asyncio.sleep(0.5, loop=self._loop) async def pause(self): await self._loop.run_in_executor(