Merge pull request #2123 from lbryio/stop-timed-out-downloads
Stop timed out downloads
jackrobison committed May 10, 2019
2 parents 7d2f592 + ad2da24 commit 68ee31b
Showing 5 changed files with 44 additions and 5 deletions.
1 change: 0 additions & 1 deletion lbrynet/dht/node.py
@@ -201,7 +201,6 @@ async def _add_hashes_from_queue():
         try:
             async with peer_generator as junction:
                 yield junction
-            await peer_generator.finished.wait()
         finally:
             if add_hashes_task and not (add_hashes_task.done() or add_hashes_task.cancelled()):
                 add_hashes_task.cancel()
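With the wait removed, accumulate_peers' generator no longer blocks after its consumer exits; the `finally` cleanup runs immediately. A minimal sketch of the shape of this pattern, with hypothetical names (the real generator wraps a DHT peer-search junction):

    import asyncio
    from contextlib import asynccontextmanager

    @asynccontextmanager
    async def peer_junction():
        # stand-in for the background task that feeds hashes into the search
        add_hashes_task = asyncio.ensure_future(asyncio.sleep(3600))
        try:
            yield "junction"
            # removed step: await peer_generator.finished.wait()
        finally:
            if not (add_hashes_task.done() or add_hashes_task.cancelled()):
                add_hashes_task.cancel()

    async def main():
        async with peer_junction() as junction:
            pass  # consumer is done with the junction...
        print("exited promptly; background task cancelled")

    asyncio.run(main())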
5 changes: 4 additions & 1 deletion lbrynet/stream/downloader.py
@@ -104,7 +104,10 @@ async def start(self, node: typing.Optional['Node'] = None, connection_id: int =
     async def download_stream_blob(self, blob_info: 'BlobInfo', connection_id: int = 0) -> 'AbstractBlob':
         if not filter(lambda blob: blob.blob_hash == blob_info.blob_hash, self.descriptor.blobs[:-1]):
             raise ValueError(f"blob {blob_info.blob_hash} is not part of stream with sd hash {self.sd_hash}")
-        blob = await self.blob_downloader.download_blob(blob_info.blob_hash, blob_info.length, connection_id)
+        blob = await asyncio.wait_for(
+            self.blob_downloader.download_blob(blob_info.blob_hash, blob_info.length, connection_id),
+            self.config.blob_download_timeout * 10, loop=self.loop
+        )
         return blob

     def decrypt_blob(self, blob_info: 'BlobInfo', blob: 'AbstractBlob') -> bytes:
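The downloader now bounds each blob fetch with asyncio.wait_for, scaled up from the per-blob timeout, so a stalled transfer raises asyncio.TimeoutError instead of hanging. A self-contained sketch of the pattern; slow_download and the timeout value are illustrative stand-ins, not code from this diff:

    import asyncio

    async def slow_download() -> bytes:
        await asyncio.sleep(10)  # simulates a blob download that stalls
        return b"blob bytes"

    async def main():
        blob_download_timeout = 0.1  # per-blob setting, scaled x10 as in the diff
        try:
            data = await asyncio.wait_for(slow_download(), blob_download_timeout * 10)
            print(len(data))
        except asyncio.TimeoutError:
            # the same error is what _save_file's new TimeoutError branch catches
            print("blob download timed out")

    asyncio.run(main())

On timeout, wait_for also cancels the wrapped coroutine, so the stalled download does not keep running in the background.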
17 changes: 15 additions & 2 deletions lbrynet/stream/managed_stream.py
@@ -66,7 +66,7 @@ class ManagedStream:
         'saving',
         'finished_writing',
         'started_writing',
-
+        'finished_write_attempt'
     ]

     def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager: 'BlobManager',
@@ -100,6 +100,7 @@ def __init__(self, loop: asyncio.BaseEventLoop, config: 'Config', blob_manager:
         self.saving = asyncio.Event(loop=self.loop)
         self.finished_writing = asyncio.Event(loop=self.loop)
         self.started_writing = asyncio.Event(loop=self.loop)
+        self.finished_write_attempt = asyncio.Event(loop=self.loop)

     @property
     def descriptor(self) -> StreamDescriptor:
@@ -347,6 +348,7 @@ async def _save_file(self, output_path: str):
         log.info("save file for lbry://%s#%s (sd hash %s...) -> %s", self.claim_name, self.claim_id, self.sd_hash[:6],
                  output_path)
         self.saving.set()
+        self.finished_write_attempt.clear()
         self.finished_writing.clear()
         self.started_writing.clear()
         try:
@@ -370,11 +372,22 @@ async def _save_file(self, output_path: str):
             if os.path.isfile(output_path):
                 log.warning("removing incomplete download %s for %s", output_path, self.sd_hash)
                 os.remove(output_path)
-            if not isinstance(err, asyncio.CancelledError):
+            self.written_bytes = 0
+            if isinstance(err, asyncio.TimeoutError):
+                self.downloader.stop()
+                await self.blob_manager.storage.change_file_download_dir_and_file_name(
+                    self.stream_hash, None, None
+                )
+                self._file_name, self.download_directory = None, None
+                await self.blob_manager.storage.clear_saved_file(self.stream_hash)
+                await self.update_status(self.STATUS_STOPPED)
+                return
+            elif not isinstance(err, asyncio.CancelledError):
                 log.exception("unexpected error encountered writing file for stream %s", self.sd_hash)
             raise err
         finally:
             self.saving.clear()
+            self.finished_write_attempt.set()

     async def save_file(self, file_name: typing.Optional[str] = None, download_directory: typing.Optional[str] = None,
                         node: typing.Optional['Node'] = None):
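The new finished_write_attempt event is cleared when a save starts and set in the `finally` block, so waiters wake up whether the write finished, timed out, or failed. A minimal sketch of that signal, with hypothetical names:

    import asyncio

    class Saver:
        def __init__(self):
            self.finished_write_attempt = asyncio.Event()

        async def save_file(self, fail: bool):
            self.finished_write_attempt.clear()
            try:
                if fail:
                    raise asyncio.TimeoutError()
                await asyncio.sleep(0)  # stand-in for writing decrypted blobs
            except asyncio.TimeoutError:
                return  # analogous to the stopped/cleaned-up branch above
            finally:
                self.finished_write_attempt.set()  # always signals the attempt ended

    async def main():
        saver = Saver()
        task = asyncio.ensure_future(saver.save_file(fail=True))
        await saver.finished_write_attempt.wait()  # how the new unit test observes it
        await task

    asyncio.run(main())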
2 changes: 1 addition & 1 deletion tests/unit/dht/test_blob_announcer.py
@@ -105,7 +105,7 @@ async def test_announce_blobs(self):

         _, task = last.accumulate_peers(search_q, peer_q)
         found_peers = await peer_q.get()
-        task.cancel()
+        await task

         self.assertEqual(1, len(found_peers))
         self.assertEqual(self.node.protocol.node_id, found_peers[0].node_id)
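Because the node.py generator no longer waits on `finished`, the accumulate task completes on its own, and the test can await it; awaiting a task also re-raises any exception it hit, which task.cancel() could leave unreported. An illustrative snippet (names are stand-ins):

    import asyncio

    async def accumulate():
        return ["peer-1"]  # the task now completes instead of running until cancelled

    async def main():
        task = asyncio.ensure_future(accumulate())
        peers = await task  # finishes promptly and would surface any failure
        print(peers)

    asyncio.run(main())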
24 changes: 24 additions & 0 deletions tests/unit/stream/test_stream_manager.py
@@ -308,6 +308,30 @@ async def test_download_data_timeout(self):
         self.server_blob_manager.delete_blob(head_blob_hash)
         await self._test_download_error_analytics_on_start(DownloadDataTimeout, timeout=1)

+    async def test_non_head_data_timeout(self):
+        await self.setup_stream_manager()
+        with open(os.path.join(self.server_dir, self.sd_hash), 'r') as sdf:
+            last_blob_hash = json.loads(sdf.read())['blobs'][-2]['blob_hash']
+        self.server_blob_manager.delete_blob(last_blob_hash)
+        self.client_config.blob_download_timeout = 0.1
+        stream = await self.stream_manager.download_stream_from_uri(self.uri, self.exchange_rate_manager)
+        await stream.started_writing.wait()
+        self.assertEqual('running', stream.status)
+        self.assertIsNotNone(stream.full_path)
+        self.assertGreater(stream.written_bytes, 0)
+        await stream.finished_write_attempt.wait()
+        self.assertEqual('stopped', stream.status)
+        self.assertIsNone(stream.full_path)
+        self.assertEqual(0, stream.written_bytes)
+
+        self.stream_manager.stop()
+        await self.stream_manager.start()
+        self.assertEqual(1, len(self.stream_manager.streams))
+        stream = list(self.stream_manager.streams.values())[0]
+        self.assertEqual('stopped', stream.status)
+        self.assertIsNone(stream.full_path)
+        self.assertEqual(0, stream.written_bytes)
+
     async def test_download_then_recover_stream_on_startup(self, old_sort=False):
         expected_analytics_events = [
             'Time To First Bytes',
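The new test drives the whole change end to end: it deletes a non-head blob, lets the download time out, then waits on started_writing and finished_write_attempt to observe the stream flip to 'stopped' with its file path and byte count cleared. Waits like these can be bounded so a regression fails the suite quickly instead of hanging it; a hypothetical helper in that spirit, not part of this diff:

    import asyncio

    async def wait_or_fail(event: asyncio.Event, timeout: float = 5.0) -> None:
        # bound the wait; asyncio.TimeoutError here means the stream never signalled
        await asyncio.wait_for(event.wait(), timeout)

    async def main():
        finished_write_attempt = asyncio.Event()
        asyncio.get_event_loop().call_later(0.1, finished_write_attempt.set)
        await wait_or_fail(finished_write_attempt)
        print("write attempt observed before the deadline")

    asyncio.run(main())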
