diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index 81b6269823..5790bccf18 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -5,6 +5,7 @@ import asyncio import binascii import time +from operator import itemgetter from typing import Optional from lbry.wallet import SQLiteMixin from lbry.conf import Config @@ -635,6 +636,15 @@ def update_db_removed(transaction: sqlite3.Connection, removed): def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]: return self.db.run(get_all_lbry_files) + async def get_all_torrent_files(self) -> typing.List[typing.Dict]: + def _get_all_torrent_files(transaction): + cursor = transaction.execute("select * from file join torrent on file.bt_infohash=torrent.bt_infohash") + return [ + {field: value for field, value in zip(list(map(itemgetter(0), cursor.description)), row)} + for row in cursor.fetchall() + ] + return await self.db.run(_get_all_torrent_files) + def change_file_status(self, stream_hash: str, new_status: str): log.debug("update file status %s -> %s", stream_hash, new_status) return self.db.execute_fetchall("update file set status=? where stream_hash=?", (new_status, stream_hash)) @@ -907,7 +917,7 @@ async def get_content_claim(self, stream_hash: str, include_supports: typing.Opt async def get_content_claim_for_torrent(self, bt_infohash): claims = await self.db.run(get_claims_from_torrent_info_hashes, [bt_infohash]) - return claims[bt_infohash].as_dict() if claims else None + return claims[bt_infohash] if claims else None # # # # # # # # # reflector functions # # # # # # # # # diff --git a/lbry/file/file_manager.py b/lbry/file/file_manager.py index 311a89ded4..e50a3fd182 100644 --- a/lbry/file/file_manager.py +++ b/lbry/file/file_manager.py @@ -138,7 +138,7 @@ async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManag existing[0].identifier, outpoint, existing[0].torrent_length, existing[0].torrent_name ) claim_info = await self.storage.get_content_claim_for_torrent(existing[0].identifier) - existing[0].set_claim(claim_info, claim) + existing[0].set_claim(claim_info.as_dict() if claim_info else None, claim) else: await self.storage.save_content_claim( existing[0].stream_hash, outpoint @@ -238,7 +238,7 @@ async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManag stream.identifier, outpoint, stream.torrent_length, stream.torrent_name ) claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier) - stream.set_claim(claim_info, claim) + stream.set_claim(claim_info.as_dict() if claim_info else None, claim) if save_file: await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download), loop=self.loop) diff --git a/lbry/file/source_manager.py b/lbry/file/source_manager.py index 2f199f159c..2ef1780075 100644 --- a/lbry/file/source_manager.py +++ b/lbry/file/source_manager.py @@ -84,6 +84,7 @@ async def create(self, file_path: str, key: Optional[bytes] = None, raise NotImplementedError() async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False): + await self.storage.delete_torrent(source.identifier) self.remove(source) if delete_file and source.output_file_exists: os.remove(source.full_path) diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index 1e91d14109..76e4bc32e8 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -161,7 +161,7 @@ async def recover_streams(self, file_infos: typing.List[typing.Dict]): async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[str], download_directory: Optional[str], status: str, claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'], - added_on: Optional[int]): + added_on: Optional[int], **kwargs): stream = TorrentSource( self.loop, self.config, self.storage, identifier=bt_infohash, file_name=file_name, download_directory=download_directory, status=status, claim=claim, rowid=rowid, @@ -171,7 +171,9 @@ async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[s self.add(stream) async def initialize_from_database(self): - pass + for file in await self.storage.get_all_torrent_files(): + claim = await self.storage.get_content_claim_for_torrent(file['bt_infohash']) + await self._load_stream(None, claim=claim, **file) async def start(self): await super().start() diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index f0185d2b8d..35d7046ef4 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -66,6 +66,14 @@ async def test_download_torrent(self): # claim now points to another torrent, update to it self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent'))) self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, new_btih) + self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) + + # restart and verify that only one updated stream was recovered + self.daemon.file_manager.stop() + await self.daemon.file_manager.start() + self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, new_btih) + self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) + self.assertIn(new_btih, self.client_session._handles) self.assertNotIn(btih, self.client_session._handles) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1)