From 2e20bfbce6df3f2e9b4c035c5a0c4ff2f6dce701 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 15 Oct 2021 03:09:38 -0300 Subject: [PATCH 01/30] create downloader component and initial tests --- lbry/extras/daemon/components.py | 58 +++++++++++++++++++ lbry/extras/daemon/daemon.py | 40 +++++++++++++ lbry/extras/daemon/storage.py | 19 ++++++ .../datanetwork/test_file_commands.py | 34 ++++++++++- 4 files changed, 150 insertions(+), 1 deletion(-) diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index 8e32cf6c17..a5b23d7437 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -42,6 +42,7 @@ HASH_ANNOUNCER_COMPONENT = "hash_announcer" FILE_MANAGER_COMPONENT = "file_manager" DISK_SPACE_COMPONENT = "disk_space" +BACKGROUND_DOWNLOADER_COMPONENT = "background_downloader" PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server" UPNP_COMPONENT = "upnp" EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager" @@ -377,6 +378,63 @@ async def stop(self): self.file_manager.stop() +class BackgroundDownloader(Component): + component_name = BACKGROUND_DOWNLOADER_COMPONENT + depends_on = [FILE_MANAGER_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT] + + def __init__(self, component_manager): + super().__init__(component_manager) + self.status = {'pending': 0, 'ongoing': 0} + self.task: typing.Optional[asyncio.Task] = None + self.download_loop_delay_seconds = 60 + + @property + def component(self) -> 'BackgroundDownloader': + return self + + async def get_status(self): + self.status['running'] = self.task is not None and not self.task.done() + return self.status + + async def loop(self): + return + db: SQLiteStorage = self.component_manager.get_component(DATABASE_COMPONENT) + while True: + for channel_id, download_latest, download_all in await db.get_subscriptions(): + amount = 1_000_000 if download_all else download_latest + if not amount: + continue + await self.ensure_download(channel_id, amount) + await asyncio.sleep(self.download_loop_delay_seconds) + + async def ensure_download(self, channel_id, amount): + file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT) + wallet = self.component_manager.get_component(WALLET_COMPONENT) + ledger = wallet.ledger + claims, _, _, _ = await ledger.claim_search( + ledger.accounts, channel_id=channel_id, order_by=['release_time', '^height']) + page = 0 + while claims and amount > 0: + for claim in claims: + if not claim.script.source or claim.has_price: + continue + stream = await file_manager.download_from_uri( + claim.permanent_url, None, 60.0, save_file=False, wallet=wallet + ) + amount -= 1 + if amount == 0: + break + page += 1 + claims, _, _, _ = await ledger.claim_search( + ledger.accounts, channel_id=channel_id, order_by=['release_time', '^height'], page=page) + + async def start(self): + self.task = asyncio.create_task(self.loop()) + + async def stop(self): + self.task.cancel() + + class DiskSpaceComponent(Component): component_name = DISK_SPACE_COMPONENT depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT] diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 89e56dd8fe..c9060eaa7a 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -3030,6 +3030,46 @@ async def jsonrpc_channel_export(self, channel_id=None, channel_name=None, accou } return base58.b58encode(json.dumps(export, separators=(',', ':'))) + @requires(WALLET_COMPONENT) + def jsonrpc_channel_subscribe(self, channel_id, download_latest=None, download_all=False): + 
""" + Subscribe to a channel and optionally start downloading streams proactively. + + Usage: + channel_subscribe ( | --channel_id=) [--download_latest=] + [--download_all] + + Options: + --channel_id= : (str) claim id of channel to subscribe. + --download_latest= : (int) amount of newest streams to ensure download. + --download_all : (bool) download all streams from the channel. + + Returns: + (bool) Subscription successful? (False only if channel doesn't exist) + """ + if download_all and download_latest is not None: + raise ConflictingInputValueError("Please set either download_latest or download_all, not both.") + return self.storage.add_subscription(channel_id, download_latest, download_all) + + @requires(WALLET_COMPONENT) + def jsonrpc_channel_unsubscribe(self, channel_id): + """ + Subscribe to a channel and optionally start downloading streams proactively. + + Usage: + channel_subscribe ( | --channel_id=) [--download=] + + Options: + --channel_id= : (str) claim id of channel to subscribe + --download= : (str) which strategy to use for downloads: 'all' for everything. + 'latest-X' for the latest X streams. None (default) for nothing. + + Returns: + (bool) Subscription successful? (False only if channel doesn't exist) + """ + return self.storage.remove_subscription(channel_id) + + @requires(WALLET_COMPONENT) async def jsonrpc_channel_import(self, channel_data, wallet_id=None): """ diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index 758c259700..6fc25313fd 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -235,6 +235,12 @@ class SQLiteStorage(SQLiteMixin): pragma foreign_keys=on; pragma journal_mode=WAL; + create table if not exists subscription ( + channel_id char(40) primary key not null, + download_latest integer not null default 0, + download_all integer not null default 0 + ); + create table if not exists blob ( blob_hash char(96) primary key not null, blob_length integer not null, @@ -539,6 +545,19 @@ def delete_stream(self, descriptor: 'StreamDescriptor'): async def delete_torrent(self, bt_infohash: str): return await self.db.run(delete_torrent, bt_infohash) + # # # # # # # # # subscriptions # # # # # # # # # + + def add_subscription(self, channel_id, download_latest=None, download_all=None): + return self.db.execute_fetchall( + "insert or replace into subscription(channel_id, download_latest, download_all) values (?, ?, ?)", + (channel_id, download_latest or 0, 1 if download_all else 0)) + + def remove_subscription(self, channel_id): + return self.db.execute_fetchall("delete from subscriptions where channel_id=?", (channel_id,)) + + def get_subscriptions(self): + return self.db.execute_fetchall("select channel_id, download_latest, download_all from subscription") + # # # # # # # # # file stuff # # # # # # # # # def save_downloaded_file(self, stream_hash: str, file_name: typing.Optional[str], diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 8edd2cc6a0..424bb0f2b5 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -5,7 +5,7 @@ from lbry.schema import Claim from lbry.testcase import CommandTestCase -from lbry.extras.daemon.components import TorrentSession +from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT from lbry.wallet import Transaction @@ -571,3 +571,35 @@ async def test_file_management(self): 
From a164bcfebd7574ff4497f2ca00f7b81f996b5cba Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 15 Oct 2021 03:23:45 -0300
Subject: [PATCH 02/30] download all blobs and check that on tests

---
 lbry/extras/daemon/components.py                    | 3 +++
 lbry/stream/managed_stream.py                       | 4 ++++
 tests/integration/datanetwork/test_file_commands.py | 2 +-
 3 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py
index a5b23d7437..c27cb3e09a 100644
--- a/lbry/extras/daemon/components.py
+++ b/lbry/extras/daemon/components.py
@@ -17,6 +17,7 @@
 from lbry.blob.blob_manager import BlobManager
 from lbry.blob.disk_space_manager import DiskSpaceManager
 from lbry.blob_exchange.server import BlobServer
+from lbry.stream.managed_stream import ManagedStream
 from lbry.stream.stream_manager import StreamManager
 from lbry.file.file_manager import FileManager
 from lbry.extras.daemon.component import Component
@@ -421,6 +422,8 @@ async def ensure_download(self, channel_id, amount):
                 stream = await file_manager.download_from_uri(
                     claim.permanent_url, None, 60.0, save_file=False, wallet=wallet
                 )
+                if isinstance(stream, ManagedStream):
+                    await stream.save_blobs()
                 amount -= 1
                 if amount == 0:
                     break
diff --git a/lbry/stream/managed_stream.py b/lbry/stream/managed_stream.py
index 2a85da66e4..aee4bbc72a 100644
--- a/lbry/stream/managed_stream.py
+++ b/lbry/stream/managed_stream.py
@@ -246,6 +246,10 @@ def _write_decrypted_blob(output_path: str, data: bytes):
             handle.write(data)
             handle.flush()
 
+    async def save_blobs(self):
+        async for _ in self._aiter_read_stream(0, connection_id=self.STREAMING_ID):
+            pass
+
     async def _save_file(self, output_path: str):
         log.info("save file for lbry://%s#%s (sd hash %s...)
-> %s", self.claim_name, self.claim_id, self.sd_hash[:6], output_path) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 424bb0f2b5..c8d333cf15 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -576,7 +576,7 @@ async def test_file_management(self): class TestProactiveDownloaderComponent(CommandTestCase): async def assertFileList(self, *txos): txos_names = {txo['outputs'][0]['name'] for txo in txos} - files = await self.file_list() + files = await self.file_list(blobs_remaining=0) self.assertEqual(len(txos), len(files)) file_claim_names = {file['claim_name'] for file in files} self.assertSetEqual(txos_names, file_claim_names) From 36f05790490ef91eb1b199d962f09c211c5bd003 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 15 Oct 2021 03:39:36 -0300 Subject: [PATCH 03/30] fix and test main api --- lbry/extras/daemon/components.py | 10 ++++++---- .../integration/datanetwork/test_file_commands.py | 15 +++++++++++++++ 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index c27cb3e09a..e9fdb0573e 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -388,6 +388,7 @@ def __init__(self, component_manager): self.status = {'pending': 0, 'ongoing': 0} self.task: typing.Optional[asyncio.Task] = None self.download_loop_delay_seconds = 60 + self.finished_iteration = asyncio.Event() @property def component(self) -> 'BackgroundDownloader': @@ -398,7 +399,6 @@ async def get_status(self): return self.status async def loop(self): - return db: SQLiteStorage = self.component_manager.get_component(DATABASE_COMPONENT) while True: for channel_id, download_latest, download_all in await db.get_subscriptions(): @@ -406,6 +406,8 @@ async def loop(self): if not amount: continue await self.ensure_download(channel_id, amount) + self.finished_iteration.set() + self.finished_iteration.clear() await asyncio.sleep(self.download_loop_delay_seconds) async def ensure_download(self, channel_id, amount): @@ -414,9 +416,10 @@ async def ensure_download(self, channel_id, amount): ledger = wallet.ledger claims, _, _, _ = await ledger.claim_search( ledger.accounts, channel_id=channel_id, order_by=['release_time', '^height']) - page = 0 + offset = 0 while claims and amount > 0: for claim in claims: + offset += 1 if not claim.script.source or claim.has_price: continue stream = await file_manager.download_from_uri( @@ -427,9 +430,8 @@ async def ensure_download(self, channel_id, amount): amount -= 1 if amount == 0: break - page += 1 claims, _, _, _ = await ledger.claim_search( - ledger.accounts, channel_id=channel_id, order_by=['release_time', '^height'], page=page) + ledger.accounts, channel_id=channel_id, order_by=['release_time', '^height'], offset=offset) async def start(self): self.task = asyncio.create_task(self.loop()) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index c8d333cf15..4f81081636 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -603,3 +603,18 @@ async def test_ensure_download(self): # ignores reposts await proactive_downloader.ensure_download(channel_id, 4) await self.assertFileList(content1, content2) + + await self.daemon.jsonrpc_file_delete(delete_all=True) + self.assertEqual(0, len(await 
self.file_list())) + await proactive_downloader.stop() + await self.daemon.jsonrpc_channel_subscribe(channel_id, 1) + await proactive_downloader.start() + await proactive_downloader.finished_iteration.wait() + await self.assertFileList(content1) + await self.daemon.jsonrpc_file_delete(delete_all=True) + + await self.daemon.jsonrpc_channel_subscribe(channel_id, download_all=True) + await proactive_downloader.stop() + await proactive_downloader.start() + await proactive_downloader.finished_iteration.wait() + await self.assertFileList(content1, content2) From 6a95097e9c022bb60a753cdaddd58bf389c287eb Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 15 Oct 2021 03:48:03 -0300 Subject: [PATCH 04/30] test add/remove/list subscriptions --- lbry/extras/daemon/daemon.py | 13 +++++++++++++ lbry/extras/daemon/storage.py | 2 +- tests/integration/datanetwork/test_file_commands.py | 4 ++++ 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index c9060eaa7a..a6885957b7 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -3030,6 +3030,19 @@ async def jsonrpc_channel_export(self, channel_id=None, channel_name=None, accou } return base58.b58encode(json.dumps(export, separators=(',', ':'))) + @requires(WALLET_COMPONENT) + def jsonrpc_channel_subscription_list(self): + """ + List subscribed channels and modes. + + Usage: + channel_subscription_list + + Returns: + (list) [(channel_id, download_latest, download_all)] + """ + return self.storage.get_subscriptions() + @requires(WALLET_COMPONENT) def jsonrpc_channel_subscribe(self, channel_id, download_latest=None, download_all=False): """ diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index 6fc25313fd..a9b83d4479 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -553,7 +553,7 @@ def add_subscription(self, channel_id, download_latest=None, download_all=None): (channel_id, download_latest or 0, 1 if download_all else 0)) def remove_subscription(self, channel_id): - return self.db.execute_fetchall("delete from subscriptions where channel_id=?", (channel_id,)) + return self.db.execute_fetchall("delete from subscription where channel_id=?", (channel_id,)) def get_subscriptions(self): return self.db.execute_fetchall("select channel_id, download_latest, download_all from subscription") diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 4f81081636..51520b300b 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -618,3 +618,7 @@ async def test_ensure_download(self): await proactive_downloader.start() await proactive_downloader.finished_iteration.wait() await self.assertFileList(content1, content2) + + self.assertEqual([(channel_id, 0, 1)], await self.daemon.jsonrpc_channel_subscription_list()) + await self.daemon.jsonrpc_channel_unsubscribe(channel_id) + self.assertEqual([], await self.daemon.jsonrpc_channel_subscription_list()) From 9867ce592916d01a21c9c6b67d2b872d5d37eac3 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 15 Oct 2021 04:08:18 -0300 Subject: [PATCH 05/30] fix exception arguments --- lbry/extras/daemon/daemon.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index a6885957b7..ff07db2193 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ 
-3061,7 +3061,7 @@ def jsonrpc_channel_subscribe(self, channel_id, download_latest=None, download_a (bool) Subscription successful? (False only if channel doesn't exist) """ if download_all and download_latest is not None: - raise ConflictingInputValueError("Please set either download_latest or download_all, not both.") + raise ConflictingInputValueError("download_latest", "download_all") return self.storage.add_subscription(channel_id, download_latest, download_all) @requires(WALLET_COMPONENT) From 800b67b0ed6dffda3bd5aca3bf2d25213acd125a Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 15 Oct 2021 04:14:35 -0300 Subject: [PATCH 06/30] fix tests --- tests/integration/other/test_cli.py | 4 ++-- tests/unit/components/test_component_manager.py | 8 ++++++-- tests/unit/lbrynet_daemon/test_allowed_origin.py | 4 ++-- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/tests/integration/other/test_cli.py b/tests/integration/other/test_cli.py index 9686653336..ec28bbee39 100644 --- a/tests/integration/other/test_cli.py +++ b/tests/integration/other/test_cli.py @@ -10,7 +10,7 @@ DATABASE_COMPONENT, DISK_SPACE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, FILE_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT, - LIBTORRENT_COMPONENT + LIBTORRENT_COMPONENT, BACKGROUND_DOWNLOADER_COMPONENT ) from lbry.extras.daemon.daemon import Daemon @@ -26,7 +26,7 @@ async def asyncSetUp(self): DATABASE_COMPONENT, DISK_SPACE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, FILE_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT, - LIBTORRENT_COMPONENT + LIBTORRENT_COMPONENT, BACKGROUND_DOWNLOADER_COMPONENT ) Daemon.component_attributes = {} self.daemon = Daemon(conf) diff --git a/tests/unit/components/test_component_manager.py b/tests/unit/components/test_component_manager.py index af7f19c33f..fa5ceafb48 100644 --- a/tests/unit/components/test_component_manager.py +++ b/tests/unit/components/test_component_manager.py @@ -3,7 +3,8 @@ from lbry.conf import Config from lbry.extras.daemon.componentmanager import ComponentManager -from lbry.extras.daemon.components import DATABASE_COMPONENT, DISK_SPACE_COMPONENT, DHT_COMPONENT +from lbry.extras.daemon.components import DATABASE_COMPONENT, DISK_SPACE_COMPONENT, DHT_COMPONENT, \ + BACKGROUND_DOWNLOADER_COMPONENT from lbry.extras.daemon.components import HASH_ANNOUNCER_COMPONENT, UPNP_COMPONENT from lbry.extras.daemon.components import PEER_PROTOCOL_SERVER_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT from lbry.extras.daemon import components @@ -30,6 +31,9 @@ def setUp(self): components.HashAnnouncerComponent, components.PeerProtocolServerComponent, components.WalletServerPaymentsComponent + ], + [ + components.BackgroundDownloader ] ] self.component_manager = ComponentManager(Config()) @@ -154,7 +158,7 @@ def setUp(self): Config(), skip_components=[ DATABASE_COMPONENT, DISK_SPACE_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, - PEER_PROTOCOL_SERVER_COMPONENT, UPNP_COMPONENT, + PEER_PROTOCOL_SERVER_COMPONENT, UPNP_COMPONENT, BACKGROUND_DOWNLOADER_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT], wallet=FakeDelayedWallet, file_manager=FakeDelayedFileManager, diff --git a/tests/unit/lbrynet_daemon/test_allowed_origin.py b/tests/unit/lbrynet_daemon/test_allowed_origin.py index 6316db0a04..b70fe6d3f2 100644 --- 
a/tests/unit/lbrynet_daemon/test_allowed_origin.py +++ b/tests/unit/lbrynet_daemon/test_allowed_origin.py @@ -11,7 +11,7 @@ DATABASE_COMPONENT, DISK_SPACE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, FILE_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT, - LIBTORRENT_COMPONENT + LIBTORRENT_COMPONENT, BACKGROUND_DOWNLOADER_COMPONENT ) from lbry.extras.daemon.daemon import Daemon @@ -72,7 +72,7 @@ async def asyncSetUp(self): DATABASE_COMPONENT, DISK_SPACE_COMPONENT, BLOB_COMPONENT, WALLET_COMPONENT, DHT_COMPONENT, HASH_ANNOUNCER_COMPONENT, FILE_MANAGER_COMPONENT, PEER_PROTOCOL_SERVER_COMPONENT, UPNP_COMPONENT, EXCHANGE_RATE_MANAGER_COMPONENT, WALLET_SERVER_PAYMENTS_COMPONENT, - LIBTORRENT_COMPONENT + LIBTORRENT_COMPONENT, BACKGROUND_DOWNLOADER_COMPONENT ) Daemon.component_attributes = {} self.daemon = Daemon(conf) From a1d2779b6fa2f64b59306ce7abb83ee52a1e7a7b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 18 Oct 2021 03:10:39 -0300 Subject: [PATCH 07/30] download only blobs --- lbry/extras/daemon/components.py | 22 +++++++---- .../datanetwork/test_file_commands.py | 39 ++++++++++++------- 2 files changed, 40 insertions(+), 21 deletions(-) diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index e9fdb0573e..21b6fcd4b7 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -17,7 +17,7 @@ from lbry.blob.blob_manager import BlobManager from lbry.blob.disk_space_manager import DiskSpaceManager from lbry.blob_exchange.server import BlobServer -from lbry.stream.managed_stream import ManagedStream +from lbry.stream.downloader import StreamDownloader from lbry.stream.stream_manager import StreamManager from lbry.file.file_manager import FileManager from lbry.extras.daemon.component import Component @@ -381,7 +381,7 @@ async def stop(self): class BackgroundDownloader(Component): component_name = BACKGROUND_DOWNLOADER_COMPONENT - depends_on = [FILE_MANAGER_COMPONENT, DATABASE_COMPONENT, WALLET_COMPONENT] + depends_on = [DATABASE_COMPONENT, WALLET_COMPONENT] def __init__(self, component_manager): super().__init__(component_manager) @@ -411,7 +411,6 @@ async def loop(self): await asyncio.sleep(self.download_loop_delay_seconds) async def ensure_download(self, channel_id, amount): - file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT) wallet = self.component_manager.get_component(WALLET_COMPONENT) ledger = wallet.ledger claims, _, _, _ = await ledger.claim_search( @@ -422,17 +421,24 @@ async def ensure_download(self, channel_id, amount): offset += 1 if not claim.script.source or claim.has_price: continue - stream = await file_manager.download_from_uri( - claim.permanent_url, None, 60.0, save_file=False, wallet=wallet - ) - if isinstance(stream, ManagedStream): - await stream.save_blobs() + await self.download_blobs(claim.claim.stream.source.sd_hash) amount -= 1 if amount == 0: break claims, _, _, _ = await ledger.claim_search( ledger.accounts, channel_id=channel_id, order_by=['release_time', '^height'], offset=offset) + async def download_blobs(self, sd_hash): + blob_manager = self.component_manager.get_component(BLOB_COMPONENT) + downloader = StreamDownloader(asyncio.get_running_loop(), self.conf, blob_manager, sd_hash) + node = None + if self.component_manager.has_component(DHT_COMPONENT): + node = self.component_manager.get_component(DHT_COMPONENT) + await downloader.start(node) + await 
downloader.load_descriptor()
+        for blob_info in downloader.descriptor.blobs[:-1]:
+            await downloader.download_stream_blob(blob_info)
+
     async def start(self):
         self.task = asyncio.create_task(self.loop())
 
diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py
index 51520b300b..54b917f899 100644
--- a/tests/integration/datanetwork/test_file_commands.py
+++ b/tests/integration/datanetwork/test_file_commands.py
@@ -574,50 +574,63 @@ async def test_file_management(self):
 
 
 class TestProactiveDownloaderComponent(CommandTestCase):
-    async def assertFileList(self, *txos):
-        txos_names = {txo['outputs'][0]['name'] for txo in txos}
-        files = await self.file_list(blobs_remaining=0)
-        self.assertEqual(len(txos), len(files))
-        file_claim_names = {file['claim_name'] for file in files}
-        self.assertSetEqual(txos_names, file_claim_names)
+    async def assertBlobs(self, *sd_hashes):
+        # checks that we have only the finished blobs needed for the referenced streams
+        seen = set(sd_hashes)
+        for sd_hash in sd_hashes:
+            self.assertTrue(self.daemon.blob_manager.get_blob(sd_hash).get_is_verified())
+            blobs = await self.daemon.storage.get_blobs_for_stream(
+                await self.daemon.storage.get_stream_hash_for_sd_hash(sd_hash)
+            )
+            for blob in blobs[:-1]:
+                self.assertTrue(self.daemon.blob_manager.get_blob(blob.blob_hash).get_is_verified())
+            seen.update(blob.blob_hash for blob in blobs if blob.blob_hash)
+        self.assertEqual(seen, self.daemon.blob_manager.completed_blob_hashes)
 
     async def test_ensure_download(self):
         unrelated_claim_id = self.get_claim_id(await self.stream_create('something_else', '0.01'))
         channel_id = self.get_claim_id(await self.channel_create('@cool'))
         content1 = await self.stream_create('content1', '0.01', channel_id=channel_id)
+        content1 = content1['outputs'][0]['value']['source']['sd_hash']
         content2 = await self.stream_create('content2', '0.01', channel_id=channel_id)
+        content2 = content2['outputs'][0]['value']['source']['sd_hash']
         await self.stream_create('paid', '0.01', channel_id=channel_id, fee_amount=42, fee_currency='USD')
         await self.stream_repost(unrelated_claim_id, 'repost')
         await self.daemon.jsonrpc_file_delete(delete_all=True)
         self.assertEqual(0, len(await self.file_list()))
+        self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items']))
 
         proactive_downloader = self.daemon.component_manager.get_component(BACKGROUND_DOWNLOADER_COMPONENT)
-        await self.assertFileList()
         await proactive_downloader.ensure_download(channel_id, 1)
-        await self.assertFileList(content1)
+        await self.assertBlobs(content1)
         await proactive_downloader.ensure_download(channel_id, 2)
-        await self.assertFileList(content1, content2)
+        await self.assertBlobs(content1, content2)
         # ignores paid content
         await proactive_downloader.ensure_download(channel_id, 3)
-        await self.assertFileList(content1, content2)
+        await self.assertBlobs(content1, content2)
         # ignores reposts
         await proactive_downloader.ensure_download(channel_id, 4)
-        await self.assertFileList(content1, content2)
+        await self.assertBlobs(content1, content2)
 
         await self.daemon.jsonrpc_file_delete(delete_all=True)
         self.assertEqual(0, len(await self.file_list()))
+        await self.daemon.blob_manager.delete_blobs(list(self.daemon.blob_manager.completed_blob_hashes), True)
+        self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items']))
+
         await proactive_downloader.stop()
         await self.daemon.jsonrpc_channel_subscribe(channel_id, 1)
         await proactive_downloader.start()
         await proactive_downloader.finished_iteration.wait()
-        await self.assertFileList(content1)
+        await self.assertBlobs(content1)
         await self.daemon.jsonrpc_file_delete(delete_all=True)
 
         await self.daemon.jsonrpc_channel_subscribe(channel_id, download_all=True)
         await proactive_downloader.stop()
         await proactive_downloader.start()
         await proactive_downloader.finished_iteration.wait()
-        await self.assertFileList(content1, content2)
+        await self.assertBlobs(content1, content2)
+
+        self.assertEqual(0, len(await self.file_list()))
 
         self.assertEqual([(channel_id, 0, 1)], await self.daemon.jsonrpc_channel_subscription_list())
         await self.daemon.jsonrpc_channel_unsubscribe(channel_id)
         self.assertEqual([], await self.daemon.jsonrpc_channel_subscription_list())
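With this patch the component stops going through the file manager entirely. A condensed sketch of what download_blobs amounts to, using only the calls that appear above (conf, blob_manager, and the optional DHT node are assumed to already exist; the try/except hardening arrives in a later patch):

    import asyncio
    from lbry.stream.downloader import StreamDownloader

    async def fetch_blobs_only(conf, blob_manager, sd_hash, node=None):
        # fetch the sd blob and decode the descriptor, then pull every
        # content blob; no file entry is created, so nothing will show
        # up in file_list()
        downloader = StreamDownloader(asyncio.get_running_loop(), conf, blob_manager, sd_hash)
        await downloader.start(node)
        await downloader.load_descriptor()
        # the final descriptor entry is the stream terminator and carries
        # no blob_hash, hence blobs[:-1]
        for blob_info in downloader.descriptor.blobs[:-1]:
            await downloader.download_stream_blob(blob_info)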
From 1ce371c5b42de6a818db375b28458a976fcae2bb Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Mon, 18 Oct 2021 04:04:02 -0300
Subject: [PATCH 08/30] no api yet

---
 lbry/extras/daemon/daemon.py                        | 53 ----------------
 lbry/extras/daemon/storage.py                       | 19 ------
 lbry/stream/managed_stream.py                       |  4 --
 .../datanetwork/test_file_commands.py               | 19 ------
 4 files changed, 95 deletions(-)

diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py
index ff07db2193..89e56dd8fe 100644
--- a/lbry/extras/daemon/daemon.py
+++ b/lbry/extras/daemon/daemon.py
@@ -3030,59 +3030,6 @@ async def jsonrpc_channel_export(self, channel_id=None, channel_name=None, accou
         }
         return base58.b58encode(json.dumps(export, separators=(',', ':')))
 
-    @requires(WALLET_COMPONENT)
-    def jsonrpc_channel_subscription_list(self):
-        """
-        List subscribed channels and modes.
-
-        Usage:
-            channel_subscription_list
-
-        Returns:
-            (list) [(channel_id, download_latest, download_all)]
-        """
-        return self.storage.get_subscriptions()
-
-    @requires(WALLET_COMPONENT)
-    def jsonrpc_channel_subscribe(self, channel_id, download_latest=None, download_all=False):
-        """
-        Subscribe to a channel and optionally start downloading streams proactively.
-
-        Usage:
-            channel_subscribe (<channel_id> | --channel_id=<channel_id>) [--download_latest=<download_latest>]
-                              [--download_all]
-
-        Options:
-            --channel_id=<channel_id>           : (str) claim id of channel to subscribe.
-            --download_latest=<download_latest> : (int) amount of newest streams to ensure download.
-            --download_all                      : (bool) download all streams from the channel.
-
-        Returns:
-            (bool) Subscription successful? (False only if channel doesn't exist)
-        """
-        if download_all and download_latest is not None:
-            raise ConflictingInputValueError("download_latest", "download_all")
-        return self.storage.add_subscription(channel_id, download_latest, download_all)
-
-    @requires(WALLET_COMPONENT)
-    def jsonrpc_channel_unsubscribe(self, channel_id):
-        """
-        Unsubscribe from a channel, removing it from the proactive download list.
-
-        Usage:
-            channel_unsubscribe (<channel_id> | --channel_id=<channel_id>)
-
-        Options:
-            --channel_id=<channel_id> : (str) claim id of the channel to unsubscribe.
-                                        Content already downloaded for the channel is
-                                        kept; only the subscription entry is removed.
-
-        Returns:
-            (bool) Unsubscription successful?
(False only if channel doesn't exist) - """ - return self.storage.remove_subscription(channel_id) - - @requires(WALLET_COMPONENT) async def jsonrpc_channel_import(self, channel_data, wallet_id=None): """ diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index a9b83d4479..758c259700 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -235,12 +235,6 @@ class SQLiteStorage(SQLiteMixin): pragma foreign_keys=on; pragma journal_mode=WAL; - create table if not exists subscription ( - channel_id char(40) primary key not null, - download_latest integer not null default 0, - download_all integer not null default 0 - ); - create table if not exists blob ( blob_hash char(96) primary key not null, blob_length integer not null, @@ -545,19 +539,6 @@ def delete_stream(self, descriptor: 'StreamDescriptor'): async def delete_torrent(self, bt_infohash: str): return await self.db.run(delete_torrent, bt_infohash) - # # # # # # # # # subscriptions # # # # # # # # # - - def add_subscription(self, channel_id, download_latest=None, download_all=None): - return self.db.execute_fetchall( - "insert or replace into subscription(channel_id, download_latest, download_all) values (?, ?, ?)", - (channel_id, download_latest or 0, 1 if download_all else 0)) - - def remove_subscription(self, channel_id): - return self.db.execute_fetchall("delete from subscription where channel_id=?", (channel_id,)) - - def get_subscriptions(self): - return self.db.execute_fetchall("select channel_id, download_latest, download_all from subscription") - # # # # # # # # # file stuff # # # # # # # # # def save_downloaded_file(self, stream_hash: str, file_name: typing.Optional[str], diff --git a/lbry/stream/managed_stream.py b/lbry/stream/managed_stream.py index aee4bbc72a..2a85da66e4 100644 --- a/lbry/stream/managed_stream.py +++ b/lbry/stream/managed_stream.py @@ -246,10 +246,6 @@ def _write_decrypted_blob(output_path: str, data: bytes): handle.write(data) handle.flush() - async def save_blobs(self): - async for _ in self._aiter_read_stream(0, connection_id=self.STREAMING_ID): - pass - async def _save_file(self, output_path: str): log.info("save file for lbry://%s#%s (sd hash %s...) 
-> %s", self.claim_name, self.claim_id, self.sd_hash[:6], output_path) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 54b917f899..83490b90e0 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -616,22 +616,3 @@ async def test_ensure_download(self): self.assertEqual(0, len(await self.file_list())) await self.daemon.blob_manager.delete_blobs(list(self.daemon.blob_manager.completed_blob_hashes), True) self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items'])) - - await proactive_downloader.stop() - await self.daemon.jsonrpc_channel_subscribe(channel_id, 1) - await proactive_downloader.start() - await proactive_downloader.finished_iteration.wait() - await self.assertBlobs(content1) - await self.daemon.jsonrpc_file_delete(delete_all=True) - - await self.daemon.jsonrpc_channel_subscribe(channel_id, download_all=True) - await proactive_downloader.stop() - await proactive_downloader.start() - await proactive_downloader.finished_iteration.wait() - await self.assertBlobs(content1, content2) - - self.assertEqual(0, len(await self.file_list())) - - self.assertEqual([(channel_id, 0, 1)], await self.daemon.jsonrpc_channel_subscription_list()) - await self.daemon.jsonrpc_channel_unsubscribe(channel_id) - self.assertEqual([], await self.daemon.jsonrpc_channel_subscription_list()) From 723228b886f90f9e930be958d9e3f0d3c0ab8ec4 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 18 Oct 2021 04:17:51 -0300 Subject: [PATCH 09/30] handle case where something that isn't a sd blob gets hit --- lbry/extras/daemon/components.py | 6 ++++-- tests/integration/datanetwork/test_file_commands.py | 10 ++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index 21b6fcd4b7..1f27a18128 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -434,8 +434,10 @@ async def download_blobs(self, sd_hash): node = None if self.component_manager.has_component(DHT_COMPONENT): node = self.component_manager.get_component(DHT_COMPONENT) - await downloader.start(node) - await downloader.load_descriptor() + try: + await downloader.start(node) + except ValueError: + return for blob_info in downloader.descriptor.blobs[:-1]: await downloader.download_stream_blob(blob_info) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 83490b90e0..6aebbeb85e 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -590,9 +590,9 @@ async def assertBlobs(self, *sd_hashes): async def test_ensure_download(self): unrelated_claim_id = self.get_claim_id(await self.stream_create('something_else', '0.01')) channel_id = self.get_claim_id(await self.channel_create('@cool')) - content1 = await self.stream_create('content1', '0.01', channel_id=channel_id) + content1 = await self.stream_create('content1', '0.01', channel_id=channel_id, data=bytes([0] * (2 << 23))) content1 = content1['outputs'][0]['value']['source']['sd_hash'] - content2 = await self.stream_create('content2', '0.01', channel_id=channel_id) + content2 = await self.stream_create('content2', '0.01', channel_id=channel_id, data=bytes([0] * (2 << 23))) content2 = content2['outputs'][0]['value']['source']['sd_hash'] await self.stream_create('paid', '0.01', channel_id=channel_id, 
fee_amount=42, fee_currency='USD') await self.stream_repost(unrelated_claim_id, 'repost') @@ -612,7 +612,13 @@ async def test_ensure_download(self): await proactive_downloader.ensure_download(channel_id, 4) await self.assertBlobs(content1, content2) + # tests that an attempt to download something that isn't a sd blob will download the single blob and stop + blobs = await self.daemon.storage.get_blobs_for_stream( + await self.daemon.storage.get_stream_hash_for_sd_hash(content1) + ) await self.daemon.jsonrpc_file_delete(delete_all=True) self.assertEqual(0, len(await self.file_list())) await self.daemon.blob_manager.delete_blobs(list(self.daemon.blob_manager.completed_blob_hashes), True) self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items'])) + await proactive_downloader.download_blobs(blobs[0].blob_hash) + self.assertEqual({blobs[0].blob_hash}, self.daemon.blob_manager.completed_blob_hashes) From cc1f00031f5954fb67d1bad4b0218358b5f44e86 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 18 Oct 2021 04:33:37 -0300 Subject: [PATCH 10/30] drop channel support, prepare to hook into DHT --- lbry/extras/daemon/components.py | 26 +----------- .../datanetwork/test_file_commands.py | 40 +++++++++---------- 2 files changed, 19 insertions(+), 47 deletions(-) diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index 1f27a18128..8162322e15 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -381,7 +381,7 @@ async def stop(self): class BackgroundDownloader(Component): component_name = BACKGROUND_DOWNLOADER_COMPONENT - depends_on = [DATABASE_COMPONENT, WALLET_COMPONENT] + depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT] def __init__(self, component_manager): super().__init__(component_manager) @@ -399,35 +399,11 @@ async def get_status(self): return self.status async def loop(self): - db: SQLiteStorage = self.component_manager.get_component(DATABASE_COMPONENT) while True: - for channel_id, download_latest, download_all in await db.get_subscriptions(): - amount = 1_000_000 if download_all else download_latest - if not amount: - continue - await self.ensure_download(channel_id, amount) self.finished_iteration.set() self.finished_iteration.clear() await asyncio.sleep(self.download_loop_delay_seconds) - async def ensure_download(self, channel_id, amount): - wallet = self.component_manager.get_component(WALLET_COMPONENT) - ledger = wallet.ledger - claims, _, _, _ = await ledger.claim_search( - ledger.accounts, channel_id=channel_id, order_by=['release_time', '^height']) - offset = 0 - while claims and amount > 0: - for claim in claims: - offset += 1 - if not claim.script.source or claim.has_price: - continue - await self.download_blobs(claim.claim.stream.source.sd_hash) - amount -= 1 - if amount == 0: - break - claims, _, _, _ = await ledger.claim_search( - ledger.accounts, channel_id=channel_id, order_by=['release_time', '^height'], offset=offset) - async def download_blobs(self, sd_hash): blob_manager = self.component_manager.get_component(BLOB_COMPONENT) downloader = StreamDownloader(asyncio.get_running_loop(), self.conf, blob_manager, sd_hash) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 6aebbeb85e..3d0ad61667 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -574,7 +574,7 @@ async def test_file_management(self): class 
TestProactiveDownloaderComponent(CommandTestCase):
-    async def assertBlobs(self, *sd_hashes):
+    async def assertBlobs(self, *sd_hashes, no_files=True):
         # checks that we have only the finished blobs needed for the referenced streams
         seen = set(sd_hashes)
         for sd_hash in sd_hashes:
@@ -586,39 +586,35 @@ async def assertBlobs(self, *sd_hashes):
             self.assertTrue(self.daemon.blob_manager.get_blob(blob.blob_hash).get_is_verified())
             seen.update(blob.blob_hash for blob in blobs if blob.blob_hash)
         self.assertEqual(seen, self.daemon.blob_manager.completed_blob_hashes)
+        if no_files:
+            self.assertEqual(0, len(await self.file_list()))
 
-    async def test_ensure_download(self):
-        unrelated_claim_id = self.get_claim_id(await self.stream_create('something_else', '0.01'))
-        channel_id = self.get_claim_id(await self.channel_create('@cool'))
-        content1 = await self.stream_create('content1', '0.01', channel_id=channel_id, data=bytes([0] * (2 << 23)))
-        content1 = content1['outputs'][0]['value']['source']['sd_hash']
-        content2 = await self.stream_create('content2', '0.01', channel_id=channel_id, data=bytes([0] * (2 << 23)))
-        content2 = content2['outputs'][0]['value']['source']['sd_hash']
-        await self.stream_create('paid', '0.01', channel_id=channel_id, fee_amount=42, fee_currency='USD')
-        await self.stream_repost(unrelated_claim_id, 'repost')
+    async def clear(self):
         await self.daemon.jsonrpc_file_delete(delete_all=True)
         self.assertEqual(0, len(await self.file_list()))
+        await self.daemon.blob_manager.delete_blobs(list(self.daemon.blob_manager.completed_blob_hashes), True)
         self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items']))
 
+    async def test_ensure_download(self):
+        content1 = await self.stream_create('content1', '0.01', data=bytes([0] * (2 << 23)))
+        content1 = content1['outputs'][0]['value']['source']['sd_hash']
+        content2 = await self.stream_create('content2', '0.01', data=bytes([0] * (2 << 23)))
+        content2 = content2['outputs'][0]['value']['source']['sd_hash']
+
         proactive_downloader = self.daemon.component_manager.get_component(BACKGROUND_DOWNLOADER_COMPONENT)
-        await proactive_downloader.ensure_download(channel_id, 1)
+        await self.clear()
+        await proactive_downloader.download_blobs(content1)
         await self.assertBlobs(content1)
-        await proactive_downloader.ensure_download(channel_id, 2)
-        await self.assertBlobs(content1, content2)
-        # ignores paid content
-        await proactive_downloader.ensure_download(channel_id, 3)
-        await self.assertBlobs(content1, content2)
-        # ignores reposts
-        await proactive_downloader.ensure_download(channel_id, 4)
+        await proactive_downloader.download_blobs(content2)
         await self.assertBlobs(content1, content2)
+        await self.clear()
+        await proactive_downloader.download_blobs(content2)
+        await self.assertBlobs(content2)
 
         # tests that an attempt to download something that isn't a sd blob will download the single blob and stop
         blobs = await self.daemon.storage.get_blobs_for_stream(
             await self.daemon.storage.get_stream_hash_for_sd_hash(content1)
         )
-        await self.daemon.jsonrpc_file_delete(delete_all=True)
-        self.assertEqual(0, len(await self.file_list()))
-        await self.daemon.blob_manager.delete_blobs(list(self.daemon.blob_manager.completed_blob_hashes), True)
-        self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items']))
+        await self.clear()
         await proactive_downloader.download_blobs(blobs[0].blob_hash)
         self.assertEqual({blobs[0].blob_hash}, self.daemon.blob_manager.completed_blob_hashes)
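The ValueError handling above leans on descriptor parsing: decoding an arbitrary data blob as an sd blob fails, which is what makes a non-sd hash degrade gracefully into a single-blob download. A sketch of probing a locally available, verified blob the same way (from_stream_descriptor_blob is the helper the later tests also use; the assumption here is that the decode failure surfaces as a ValueError, as the except clause above expects):

    import asyncio
    from lbry.stream.descriptor import StreamDescriptor

    async def content_blob_hashes(blob_manager, blob_hash):
        # returns the content blob hashes if blob_hash is an sd blob,
        # or None if it is just a plain data blob
        blob = blob_manager.get_blob(blob_hash)
        try:
            descriptor = await StreamDescriptor.from_stream_descriptor_blob(
                asyncio.get_running_loop(), blob_manager.blob_dir, blob)
        except ValueError:
            return None
        return [info.blob_hash for info in descriptor.blobs[:-1]]

From 2dc5eca224912051eb51c54a18e4bb941313e5af Mon Sep 17 00:00:00 2001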
From: Victor Shyba Date: Mon, 18 Oct 2021 04:59:55 -0300 Subject: [PATCH 11/30] download from DHT --- lbry/dht/protocol/data_store.py | 2 ++ lbry/dht/protocol/protocol.py | 1 + lbry/extras/daemon/components.py | 13 ++++++++++--- tests/integration/datanetwork/test_file_commands.py | 9 +++++++++ 4 files changed, 22 insertions(+), 3 deletions(-) diff --git a/lbry/dht/protocol/data_store.py b/lbry/dht/protocol/data_store.py index 3d937e84e2..b3fa6b946b 100644 --- a/lbry/dht/protocol/data_store.py +++ b/lbry/dht/protocol/data_store.py @@ -1,5 +1,6 @@ import asyncio import typing +from collections import deque from lbry.dht import constants if typing.TYPE_CHECKING: @@ -15,6 +16,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager') self.loop = loop self._peer_manager = peer_manager self.completed_blobs: typing.Set[str] = set() + self.requested_blobs: typing.Deque = deque(maxlen=10) def __len__(self): return self._data_store.__len__() diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py index 66165740b4..0717e5826c 100644 --- a/lbry/dht/protocol/protocol.py +++ b/lbry/dht/protocol/protocol.py @@ -95,6 +95,7 @@ def find_value(self, rpc_contact: 'KademliaPeer', key: bytes, page: int = 0): for peer in self.protocol.data_store.get_peers_for_blob(key) if not rpc_contact.tcp_port or peer.compact_address_tcp() != rpc_contact.compact_address_tcp() ] + self.protocol.data_store.requested_blobs.append(key.hex()) # if we don't have k storing peers to return and we have this hash locally, include our contact information if len(peers) < constants.K and key.hex() in self.protocol.data_store.completed_blobs: peers.append(self.compact_address()) diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index 8162322e15..4f36925190 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -4,6 +4,8 @@ import logging import binascii import typing +from collections import deque + import base58 from aioupnp import __version__ as aioupnp_version @@ -385,21 +387,26 @@ class BackgroundDownloader(Component): def __init__(self, component_manager): super().__init__(component_manager) - self.status = {'pending': 0, 'ongoing': 0} self.task: typing.Optional[asyncio.Task] = None self.download_loop_delay_seconds = 60 self.finished_iteration = asyncio.Event() + self.requested_blobs = deque(maxlen=10) @property def component(self) -> 'BackgroundDownloader': return self async def get_status(self): - self.status['running'] = self.task is not None and not self.task.done() - return self.status + return {'running': self.task is not None and not self.task.done(), 'enqueued': len(self.requested_blobs)} async def loop(self): while True: + if self.component_manager.has_component(DHT_COMPONENT): + node = self.component_manager.get_component(DHT_COMPONENT) + self.requested_blobs = node.protocol.data_store.requested_blobs + if self.requested_blobs: + blob_hash = self.requested_blobs.pop() + await self.download_blobs(blob_hash) self.finished_iteration.set() self.finished_iteration.clear() await asyncio.sleep(self.download_loop_delay_seconds) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 3d0ad61667..5c2ee3e0bc 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -618,3 +618,12 @@ async def test_ensure_download(self): await self.clear() await 
proactive_downloader.download_blobs(blobs[0].blob_hash)
         self.assertEqual({blobs[0].blob_hash}, self.daemon.blob_manager.completed_blob_hashes)
+
+        # trigger from requested blobs
+        await self.clear()
+        await proactive_downloader.stop()
+        proactive_downloader.requested_blobs.append(content1)
+        finished = proactive_downloader.finished_iteration.wait()
+        await proactive_downloader.start()
+        await finished
+        await self.assertBlobs(content1)

From afed30e90a41fdbacd30fe304f0cb0215f0558ea Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Tue, 19 Oct 2021 16:05:01 -0300
Subject: [PATCH 12/30] fix unit tests from component dependency chain changes

---
 tests/unit/components/test_component_manager.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/tests/unit/components/test_component_manager.py b/tests/unit/components/test_component_manager.py
index fa5ceafb48..7f901fbbb7 100644
--- a/tests/unit/components/test_component_manager.py
+++ b/tests/unit/components/test_component_manager.py
@@ -26,14 +26,12 @@ def setUp(self):
                 components.WalletComponent
             ],
             [
+                components.BackgroundDownloader,
                 components.DiskSpaceComponent,
                 components.FileManagerComponent,
                 components.HashAnnouncerComponent,
                 components.PeerProtocolServerComponent,
                 components.WalletServerPaymentsComponent
-            ],
-            [
-                components.BackgroundDownloader
             ]
         ]
         self.component_manager = ComponentManager(Config())

From aee611c65a0b2964e2cbc619197b72c17ee6fd7a Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Tue, 19 Oct 2021 22:06:41 -0300
Subject: [PATCH 13/30] schedule the download task instead

---
 lbry/extras/daemon/components.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py
index 4f36925190..44f128f5d8 100644
--- a/lbry/extras/daemon/components.py
+++ b/lbry/extras/daemon/components.py
@@ -391,6 +391,7 @@ def __init__(self, component_manager):
         self.download_loop_delay_seconds = 60
         self.finished_iteration = asyncio.Event()
         self.requested_blobs = deque(maxlen=10)
+        self.ongoing_download: typing.Optional[asyncio.Task] = None
 
     @property
     def component(self) -> 'BackgroundDownloader':
@@ -404,10 +405,10 @@ async def loop(self):
             if self.component_manager.has_component(DHT_COMPONENT):
                 node = self.component_manager.get_component(DHT_COMPONENT)
                 self.requested_blobs = node.protocol.data_store.requested_blobs
-            if self.requested_blobs:
+            if self.requested_blobs and (not self.ongoing_download or self.ongoing_download.done()):
                 blob_hash = self.requested_blobs.pop()
-                await self.download_blobs(blob_hash)
-            self.finished_iteration.set()
+                self.ongoing_download = asyncio.create_task(self.download_blobs(blob_hash))
+                self.ongoing_download.add_done_callback(lambda _: self.finished_iteration.set())
             self.finished_iteration.clear()
             await asyncio.sleep(self.download_loop_delay_seconds)
@@ -428,6 +429,8 @@ async def start(self):
         self.task = asyncio.create_task(self.loop())
 
     async def stop(self):
+        if self.ongoing_download and not self.ongoing_download.done():
+            self.ongoing_download.cancel()
         self.task.cancel()
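Since the download now runs as a scheduled task, finished_iteration is set from the task's done-callback and cleared again on the next pass of the loop, so an observer has to be waiting on the event before the callback fires. A sketch of the waiting pattern the integration test relies on (downloader stands for the BackgroundDownloader instance; ensure_future is an assumption used so the waiter is registered before the loop starts):

    import asyncio

    async def trigger_and_wait(downloader, blob_hash):
        # enqueue a hash the way the DHT data store does, then block until
        # the done-callback of the scheduled download sets the event
        downloader.requested_blobs.append(blob_hash)
        finished = asyncio.ensure_future(downloader.finished_iteration.wait())
        await downloader.start()
        await finished
        await downloader.stop()

From c5c5e4c9c17c8b88955a2a43300f6de6abd681f1 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Wed, 20 Oct 2021 01:27:36 -0300
Subject: [PATCH 14/30] add conf for network seeding space limit

---
 lbry/conf.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lbry/conf.py b/lbry/conf.py
index 6a099afb31..7dca19c5d7 100644
--- a/lbry/conf.py
+++ b/lbry/conf.py
@@ -634,6 +634,7 @@ class Config(CLIConfig):
 
     # blob announcement and download
     save_blobs = 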
Toggle("Save encrypted blob files for hosting, otherwise download blobs to memory only.", True) + network_storage_limit = Integer("Disk space in MB to be allocated for helping the P2P network. 0 = disable", 0) blob_storage_limit = Integer("Disk space in MB to be allocated for blob storage. 0 = no limit", 0) blob_lru_cache_size = Integer( "LRU cache size for decrypted downloaded blobs used to minimize re-downloading the same blobs when " From ebd307acc81a3ecc7a6eb8d3c1831865d4ad323b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 20 Oct 2021 02:18:34 -0300 Subject: [PATCH 15/30] don't save streams for network blobs and bypass disk space manager --- lbry/extras/daemon/components.py | 2 +- lbry/extras/daemon/storage.py | 21 +++++++--- lbry/stream/downloader.py | 4 +- .../datanetwork/test_file_commands.py | 39 ++++++++++++++----- 4 files changed, 48 insertions(+), 18 deletions(-) diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index 44f128f5d8..5371e74b52 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -419,7 +419,7 @@ async def download_blobs(self, sd_hash): if self.component_manager.has_component(DHT_COMPONENT): node = self.component_manager.get_component(DHT_COMPONENT) try: - await downloader.start(node) + await downloader.start(node, save_stream=False) except ValueError: return for blob_info in downloader.descriptor.blobs[:-1]: diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index 758c259700..832dab1c82 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -437,19 +437,28 @@ def delete_blobs(transaction): def get_all_blob_hashes(self): return self.run_and_return_list("select blob_hash from blob") - async def get_stored_blobs(self, is_mine: bool): + async def get_stored_blobs(self, is_mine: bool, orphans=False): is_mine = 1 if is_mine else 0 - return await self.db.execute_fetchall( - "select blob_hash, blob_length, added_on from blob where is_mine=? order by added_on asc", + sd_blobs = await self.db.execute_fetchall( + "select blob.blob_hash, blob.blob_length, blob.added_on " + "from blob join stream on blob.blob_hash=stream.sd_hash join file using (stream_hash) " + "where blob.is_mine=? order by blob.added_on asc", (is_mine,) ) + normal_blobs = await self.db.execute_fetchall( + "select blob.blob_hash, blob.blob_length, blob.added_on " + "from blob join stream_blob using (blob_hash) cross join stream using (stream_hash)" + "cross join file using (stream_hash) where blob.is_mine=? order by blob.added_on asc", + (is_mine,) + ) + return normal_blobs + sd_blobs async def get_stored_blob_disk_usage(self, is_mine: Optional[bool] = None): if is_mine is None: - sql, args = "select coalesce(sum(blob_length), 0) from blob", () + sql = "select coalesce(sum(blob_length), 0) from blob join stream_blob using (blob_hash)" else: - is_mine = 1 if is_mine else 0 - sql, args = "select coalesce(sum(blob_length), 0) from blob where is_mine=?", (is_mine,) + sql = "select coalesce(sum(blob_length), 0) from blob join stream_blob using (blob_hash) where is_mine=?" 
+        args = (1 if is_mine else 0,) if is_mine is not None else ()
         return (await self.db.execute_fetchone(sql, args))[0]
 
     async def update_blob_ownership(self, sd_hash, is_mine: bool):
diff --git a/lbry/stream/downloader.py b/lbry/stream/downloader.py
index 83ff67fef6..12c63fb96d 100644
--- a/lbry/stream/downloader.py
+++ b/lbry/stream/downloader.py
@@ -83,7 +83,7 @@ async def load_descriptor(self, connection_id: int = 0):
         )
         log.info("loaded stream manifest %s", self.sd_hash)
 
-    async def start(self, node: typing.Optional['Node'] = None, connection_id: int = 0):
+    async def start(self, node: typing.Optional['Node'] = None, connection_id: int = 0, save_stream=True):
         # set up peer accumulation
         self.node = node or self.node  # fixme: this shouldnt be set here!
         if self.node:
@@ -102,7 +102,7 @@ async def start(self, node: typing.Optional['Node'] = None, connection_id: int =
             self.search_queue.put_nowait(self.descriptor.blobs[0].blob_hash)
             log.info("added head blob to peer search for stream %s", self.sd_hash)
 
-        if not await self.blob_manager.storage.stream_exists(self.sd_hash):
+        if not await self.blob_manager.storage.stream_exists(self.sd_hash) and save_stream:
             await self.blob_manager.storage.store_stream(
                 self.blob_manager.get_blob(self.sd_hash, length=self.descriptor.length), self.descriptor
             )
diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py
index 5c2ee3e0bc..c4766983cf 100644
--- a/tests/integration/datanetwork/test_file_commands.py
+++ b/tests/integration/datanetwork/test_file_commands.py
@@ -4,6 +4,7 @@
 from binascii import hexlify
 
 from lbry.schema import Claim
+from lbry.stream.descriptor import StreamDescriptor
 from lbry.testcase import CommandTestCase
 from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT
 from lbry.wallet import Transaction
@@ -574,19 +575,24 @@ async def test_file_management(self):
 
 
 class TestProactiveDownloaderComponent(CommandTestCase):
+    async def get_blobs_from_sd_blob(self, sd_blob):
+        descriptor = await StreamDescriptor.from_stream_descriptor_blob(
+            asyncio.get_running_loop(), self.daemon.blob_manager.blob_dir, sd_blob
+        )
+        return descriptor.blobs
+
     async def assertBlobs(self, *sd_hashes, no_files=True):
         # checks that we have only the finished blobs needed for the referenced streams
         seen = set(sd_hashes)
         for sd_hash in sd_hashes:
-            self.assertTrue(self.daemon.blob_manager.get_blob(sd_hash).get_is_verified())
-            blobs = await self.daemon.storage.get_blobs_for_stream(
-                await self.daemon.storage.get_stream_hash_for_sd_hash(sd_hash)
-            )
+            sd_blob = self.daemon.blob_manager.get_blob(sd_hash)
+            self.assertTrue(sd_blob.get_is_verified())
+            blobs = await self.get_blobs_from_sd_blob(sd_blob)
             for blob in blobs[:-1]:
                 self.assertTrue(self.daemon.blob_manager.get_blob(blob.blob_hash).get_is_verified())
             seen.update(blob.blob_hash for blob in blobs if blob.blob_hash)
-        self.assertEqual(seen, self.daemon.blob_manager.completed_blob_hashes)
         if no_files:
+            self.assertEqual(seen, self.daemon.blob_manager.completed_blob_hashes)
             self.assertEqual(0, len(await self.file_list()))
 
     async def clear(self):
@@ -596,25 +602,28 @@ async def clear(self):
         self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items']))
 
     async def test_ensure_download(self):
-        content1 = await self.stream_create('content1', '0.01', data=bytes([0] * (2 << 23)))
+        content1 = await self.stream_create('content1', '0.01', data=bytes([0] * (2 << 24)))
         content1 = 
content1['outputs'][0]['value']['source']['sd_hash'] content2 = await self.stream_create('content2', '0.01', data=bytes([0] * (2 << 23))) content2 = content2['outputs'][0]['value']['source']['sd_hash'] + self.assertEqual('48', (await self.status())['disk_space']['space_used']) proactive_downloader = self.daemon.component_manager.get_component(BACKGROUND_DOWNLOADER_COMPONENT) await self.clear() + self.assertEqual('0', (await self.status())['disk_space']['space_used']) await proactive_downloader.download_blobs(content1) await self.assertBlobs(content1) + self.assertEqual('0', (await self.status())['disk_space']['space_used']) await proactive_downloader.download_blobs(content2) await self.assertBlobs(content1, content2) + self.assertEqual('0', (await self.status())['disk_space']['space_used']) await self.clear() await proactive_downloader.download_blobs(content2) await self.assertBlobs(content2) + self.assertEqual('0', (await self.status())['disk_space']['space_used']) # tests that an attempt to download something that isn't a sd blob will download the single blob and stop - blobs = await self.daemon.storage.get_blobs_for_stream( - await self.daemon.storage.get_stream_hash_for_sd_hash(content1) - ) + blobs = await self.get_blobs_from_sd_blob(self.reflector.blob_manager.get_blob(content1)) await self.clear() await proactive_downloader.download_blobs(blobs[0].blob_hash) self.assertEqual({blobs[0].blob_hash}, self.daemon.blob_manager.completed_blob_hashes) @@ -627,3 +636,15 @@ async def test_ensure_download(self): await proactive_downloader.start() await finished await self.assertBlobs(content1) + await self.clear() + # test that disk space manager doesn't delete orphan network blobs + await proactive_downloader.download_blobs(content1) + await self.daemon.storage.db.execute_fetchall("update blob set added_on=0") # so it is preferred for cleaning + await self.daemon.jsonrpc_get("content2", save_file=False) + while (await self.file_list())[0]['status'] == 'running': + await asyncio.sleep(0.5) + await self.assertBlobs(content1, no_files=False) + + self.daemon.conf.blob_storage_limit = 1 + await self.blob_clean() + await self.assertBlobs(content1, no_files=False) From 75db8a92fde61cc6c33e448b535f3a644ddd3ec2 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 22 Oct 2021 00:32:04 -0300 Subject: [PATCH 16/30] separated network seeding space metrics --- lbry/blob/disk_space_manager.py | 8 ++++---- lbry/extras/daemon/components.py | 3 ++- lbry/extras/daemon/storage.py | 11 +++++++---- tests/integration/datanetwork/test_file_commands.py | 8 ++++++-- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/lbry/blob/disk_space_manager.py b/lbry/blob/disk_space_manager.py index 96053eca0d..a0fb501ea2 100644 --- a/lbry/blob/disk_space_manager.py +++ b/lbry/blob/disk_space_manager.py @@ -15,11 +15,11 @@ def __init__(self, config, db, blob_manager, cleaning_interval=30 * 60, analytic self.task = None self.analytics = analytics - async def get_space_used_bytes(self): - return await self.db.get_stored_blob_disk_usage() + async def get_space_used_bytes(self, is_network_blob=False): + return await self.db.get_stored_blob_disk_usage(is_orphan_blob=is_network_blob) - async def get_space_used_mb(self): - return int(await self.get_space_used_bytes()/1024.0/1024.0) + async def get_space_used_mb(self, is_network_blob=False): + return int(await self.get_space_used_bytes(is_network_blob)/1024.0/1024.0) async def clean(self): space_used_bytes = await self.get_space_used_bytes() diff --git 
a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index 5371e74b52..ba8a351219 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -450,9 +450,10 @@ async def get_status(self): if self.disk_space_manager: return { 'space_used': str(await self.disk_space_manager.get_space_used_mb()), + 'network_seeding_space_used': str(await self.disk_space_manager.get_space_used_mb(True)), 'running': self.disk_space_manager.running, } - return {'space_used': '0', 'running': False} + return {'space_used': '0', 'network_seeding_space_used': '0', 'running': False} async def start(self): db = self.component_manager.get_component(DATABASE_COMPONENT) diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index 832dab1c82..43ea9459b7 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -453,11 +453,14 @@ async def get_stored_blobs(self, is_mine: bool, orphans=False): ) return normal_blobs + sd_blobs - async def get_stored_blob_disk_usage(self, is_mine: Optional[bool] = None): - if is_mine is None: - sql = "select coalesce(sum(blob_length), 0) from blob join stream_blob using (blob_hash)" + async def get_stored_blob_disk_usage(self, is_mine: Optional[bool] = None, is_orphan_blob: bool = False): + sql = "select coalesce(sum(blob_length), 0) " + if is_orphan_blob: + sql += "from blob left join stream_blob using (blob_hash) where stream_blob.stream_hash is null" else: - sql = "select coalesce(sum(blob_length), 0) from blob join stream_blob using (blob_hash) where is_mine=?" + sql += "from blob join stream_blob using (blob_hash)" + if is_mine is not None: + sql += f'{(" and " if is_orphan_blob else " where ")} is_mine=?' args = (1 if is_mine else 0,) if is_mine is not None else () return (await self.db.execute_fetchone(sql, args))[0] diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index c4766983cf..2f06254ac0 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -602,25 +602,29 @@ async def clear(self): self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items'])) async def test_ensure_download(self): - content1 = await self.stream_create('content1', '0.01', data=bytes([0] * (2 << 24))) + content1 = await self.stream_create('content1', '0.01', data=bytes([0] * 32 * 1024 * 1024)) content1 = content1['outputs'][0]['value']['source']['sd_hash'] - content2 = await self.stream_create('content2', '0.01', data=bytes([0] * (2 << 23))) + content2 = await self.stream_create('content2', '0.01', data=bytes([0] * 16 * 1024 * 1024)) content2 = content2['outputs'][0]['value']['source']['sd_hash'] self.assertEqual('48', (await self.status())['disk_space']['space_used']) proactive_downloader = self.daemon.component_manager.get_component(BACKGROUND_DOWNLOADER_COMPONENT) await self.clear() self.assertEqual('0', (await self.status())['disk_space']['space_used']) + self.assertEqual('0', (await self.status())['disk_space']['network_seeding_space_used']) await proactive_downloader.download_blobs(content1) await self.assertBlobs(content1) self.assertEqual('0', (await self.status())['disk_space']['space_used']) + self.assertEqual('32', (await self.status())['disk_space']['network_seeding_space_used']) await proactive_downloader.download_blobs(content2) await self.assertBlobs(content1, content2) self.assertEqual('0', (await self.status())['disk_space']['space_used']) + 
self.assertEqual('48', (await self.status())['disk_space']['network_seeding_space_used']) await self.clear() await proactive_downloader.download_blobs(content2) await self.assertBlobs(content2) self.assertEqual('0', (await self.status())['disk_space']['space_used']) + self.assertEqual('16', (await self.status())['disk_space']['network_seeding_space_used']) # tests that an attempt to download something that isn't a sd blob will download the single blob and stop blobs = await self.get_blobs_from_sd_blob(self.reflector.blob_manager.get_blob(content1)) From 03310434723db6747467f5597f57ebded7c6fe3d Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 22 Oct 2021 02:43:40 -0300 Subject: [PATCH 17/30] cleanup background downloader blobs from conf --- lbry/blob/disk_space_manager.py | 14 +++++++++++--- lbry/extras/daemon/components.py | 8 +++++++- lbry/extras/daemon/storage.py | 8 ++++++++ .../integration/datanetwork/test_file_commands.py | 12 ++++++++++++ tests/unit/components/test_component_manager.py | 4 +++- 5 files changed, 41 insertions(+), 5 deletions(-) diff --git a/lbry/blob/disk_space_manager.py b/lbry/blob/disk_space_manager.py index a0fb501ea2..3ce813064b 100644 --- a/lbry/blob/disk_space_manager.py +++ b/lbry/blob/disk_space_manager.py @@ -22,9 +22,17 @@ async def get_space_used_mb(self, is_network_blob=False): return int(await self.get_space_used_bytes(is_network_blob)/1024.0/1024.0) async def clean(self): - space_used_bytes = await self.get_space_used_bytes() - storage_limit = self.config.blob_storage_limit*1024*1024 if self.config.blob_storage_limit else None + await self._clean(False) + await self._clean(True) + + async def _clean(self, from_network_storage=False): + space_used_bytes = await self.get_space_used_bytes(from_network_storage) + if from_network_storage: + storage_limit = self.config.network_storage_limit*1024*1024 if self.config.network_storage_limit else None + else: + storage_limit = self.config.blob_storage_limit*1024*1024 if self.config.blob_storage_limit else None if self.analytics: + # todo: add metrics for network case asyncio.create_task(self.analytics.send_disk_space_used(space_used_bytes, storage_limit)) if not storage_limit: return 0 @@ -32,7 +40,7 @@ async def clean(self): available = storage_limit - space_used_bytes if available > 0: return 0 - for blob_hash, file_size, _ in await self.db.get_stored_blobs(is_mine=False): + for blob_hash, file_size, _ in await self.db.get_stored_blobs(is_mine=False, orphans=from_network_storage): delete.append(blob_hash) available += file_size if available > 0: diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index ba8a351219..2b07c4a769 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -383,7 +383,7 @@ async def stop(self): class BackgroundDownloader(Component): component_name = BACKGROUND_DOWNLOADER_COMPONENT - depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT] + depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT, DISK_SPACE_COMPONENT] def __init__(self, component_manager): super().__init__(component_manager) @@ -413,6 +413,12 @@ async def loop(self): await asyncio.sleep(self.download_loop_delay_seconds) async def download_blobs(self, sd_hash): + if self.conf.network_storage_limit <= 0: + return + space_manager: DiskSpaceManager = self.component_manager.get_component(DISK_SPACE_COMPONENT) + if (await space_manager.get_space_used_mb(True)) >= self.conf.network_storage_limit: + log.info("Allocated space for proactive downloader is full. 
Background download aborted.") + return blob_manager = self.component_manager.get_component(BLOB_COMPONENT) downloader = StreamDownloader(asyncio.get_running_loop(), self.conf, blob_manager, sd_hash) node = None diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index 43ea9459b7..ea8cf5fccc 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -439,6 +439,14 @@ def get_all_blob_hashes(self): async def get_stored_blobs(self, is_mine: bool, orphans=False): is_mine = 1 if is_mine else 0 + if orphans: + return await self.db.execute_fetchall( + "select blob.blob_hash, blob.blob_length, blob.added_on " + "from blob left join stream_blob using (blob_hash) " + "where stream_blob.stream_hash is null and blob.is_mine=? order by blob.added_on asc", + (is_mine,) + ) + sd_blobs = await self.db.execute_fetchall( "select blob.blob_hash, blob.blob_length, blob.added_on " "from blob join stream on blob.blob_hash=stream.sd_hash join file using (stream_hash) " diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 2f06254ac0..4ff9560472 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -609,6 +609,7 @@ async def test_ensure_download(self): self.assertEqual('48', (await self.status())['disk_space']['space_used']) proactive_downloader = self.daemon.component_manager.get_component(BACKGROUND_DOWNLOADER_COMPONENT) + self.daemon.conf.network_storage_limit = 100 await self.clear() self.assertEqual('0', (await self.status())['disk_space']['space_used']) self.assertEqual('0', (await self.status())['disk_space']['network_seeding_space_used']) @@ -652,3 +653,14 @@ async def test_ensure_download(self): self.daemon.conf.blob_storage_limit = 1 await self.blob_clean() await self.assertBlobs(content1, no_files=False) + + # downloading above limit triggers cleanup + self.daemon.conf.network_storage_limit = 6 + with self.assertLogs() as log: + await proactive_downloader.download_blobs(content2) + self.assertIn('Allocated space for proactive downloader is full.', log.output[0]) + await self.assertBlobs(content1, no_files=False) + self.assertEqual('32', (await self.status())['disk_space']['network_seeding_space_used']) + await self.blob_clean() + self.assertLessEqual(int((await self.status())['disk_space']['network_seeding_space_used']), + self.daemon.conf.network_storage_limit) diff --git a/tests/unit/components/test_component_manager.py b/tests/unit/components/test_component_manager.py index 7f901fbbb7..7e03f59083 100644 --- a/tests/unit/components/test_component_manager.py +++ b/tests/unit/components/test_component_manager.py @@ -26,12 +26,14 @@ def setUp(self): components.WalletComponent ], [ - components.BackgroundDownloader, components.DiskSpaceComponent, components.FileManagerComponent, components.HashAnnouncerComponent, components.PeerProtocolServerComponent, components.WalletServerPaymentsComponent + ], + [ + components.BackgroundDownloader, ] ] self.component_manager = ComponentManager(Config()) From 769c138d48d0ae323c39549d078dd5f297412647 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 22 Oct 2021 02:55:59 -0300 Subject: [PATCH 18/30] announce orphan blobs manually, as that was done when save stream --- lbry/extras/daemon/components.py | 2 ++ lbry/extras/daemon/storage.py | 5 +++++ tests/integration/datanetwork/test_file_commands.py | 3 +++ 3 files changed, 10 insertions(+) diff --git 
a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py
index 2b07c4a769..e919e85195 100644
--- a/lbry/extras/daemon/components.py
+++ b/lbry/extras/daemon/components.py
@@ -421,6 +421,7 @@ async def download_blobs(self, sd_hash):
             return
         blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
         downloader = StreamDownloader(asyncio.get_running_loop(), self.conf, blob_manager, sd_hash)
+        storage = blob_manager.storage
         node = None
         if self.component_manager.has_component(DHT_COMPONENT):
             node = self.component_manager.get_component(DHT_COMPONENT)
@@ -430,6 +431,7 @@ async def download_blobs(self, sd_hash):
             return
         for blob_info in downloader.descriptor.blobs[:-1]:
             await downloader.download_stream_blob(blob_info)
+        await storage.set_announce(sd_hash, downloader.descriptor.blobs[0].blob_hash)
 
     async def start(self):
         self.task = asyncio.create_task(self.loop())
diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py
index ea8cf5fccc..a0aa29adc3 100644
--- a/lbry/extras/daemon/storage.py
+++ b/lbry/extras/daemon/storage.py
@@ -380,6 +380,11 @@ def get_blob_status(self, blob_hash: str):
             "select status from blob where blob_hash=?", blob_hash
         )
 
+    def set_announce(self, *blob_hashes):
+        return self.db.execute_fetchall(
+            "update blob set should_announce=1 where blob_hash in (?, ?)", blob_hashes
+        )
+
     def update_last_announced_blobs(self, blob_hashes: typing.List[str]):
         def _update_last_announced_blobs(transaction: sqlite3.Connection):
             last_announced = self.time_getter()
diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py
index 4ff9560472..b7d4a05adf 100644
--- a/tests/integration/datanetwork/test_file_commands.py
+++ b/tests/integration/datanetwork/test_file_commands.py
@@ -584,10 +584,13 @@ async def get_blobs_from_sd_blob(self, sd_blob):
     async def assertBlobs(self, *sd_hashes, no_files=True):
         # checks that we have only the finished blobs needed for the referenced streams
         seen = set(sd_hashes)
+        to_announce = await self.daemon.storage.get_blobs_to_announce()
         for sd_hash in sd_hashes:
+            self.assertIn(sd_hash, to_announce)
             sd_blob = self.daemon.blob_manager.get_blob(sd_hash)
             self.assertTrue(sd_blob.get_is_verified())
             blobs = await self.get_blobs_from_sd_blob(sd_blob)
+            self.assertIn(blobs[0].blob_hash, to_announce)
             for blob in blobs[:-1]:
                 self.assertTrue(self.daemon.blob_manager.get_blob(blob.blob_hash).get_is_verified())
             seen.update(blob.blob_hash for blob in blobs if blob.blob_hash)

From e9a25e32861d2c96ba654ad7b1743ccc5230e5f2 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 22 Oct 2021 03:00:14 -0300
Subject: [PATCH 19/30] add analytics event for network disk space

---
 lbry/blob/disk_space_manager.py | 5 +++--
 lbry/extras/daemon/analytics.py | 3 ++-
 lbry/wallet/__init__.pyc        | Bin 1268 -> 0 bytes
 3 files changed, 5 insertions(+), 3 deletions(-)
 delete mode 100644 lbry/wallet/__init__.pyc

diff --git a/lbry/blob/disk_space_manager.py b/lbry/blob/disk_space_manager.py
index 3ce813064b..1156b3e169 100644
--- a/lbry/blob/disk_space_manager.py
+++ b/lbry/blob/disk_space_manager.py
@@ -32,8 +32,9 @@ async def _clean(self, from_network_storage=False):
         else:
             storage_limit = self.config.blob_storage_limit*1024*1024 if self.config.blob_storage_limit else None
         if self.analytics:
-            # todo: add metrics for network case
-            asyncio.create_task(self.analytics.send_disk_space_used(space_used_bytes, storage_limit))
+            asyncio.create_task(
+                self.analytics.send_disk_space_used(space_used_bytes,
storage_limit, from_network_storage)
+            )
         if not storage_limit:
             return 0
         delete = []
diff --git a/lbry/extras/daemon/analytics.py b/lbry/extras/daemon/analytics.py
index 868b14789c..3db911bb7a 100644
--- a/lbry/extras/daemon/analytics.py
+++ b/lbry/extras/daemon/analytics.py
@@ -170,11 +170,12 @@ async def send_upnp_setup_success_fail(self, success, status):
             })
         )
 
-    async def send_disk_space_used(self, storage_used, storage_limit):
+    async def send_disk_space_used(self, storage_used, storage_limit, is_from_network_quota):
         await self.track(
             self._event(DISK_SPACE, {
                 'used': storage_used,
                 'limit': storage_limit,
+                'from_network_quota': is_from_network_quota
             })
         )
 
diff --git a/lbry/wallet/__init__.pyc b/lbry/wallet/__init__.pyc
deleted file mode 100644
index d4854844fe6b60a10a9b0972d81a6424f412ff41..0000000000000000000000000000000000000000
GIT binary patch
[base85-encoded binary delta omitted]

From: Victor Shyba
Date: Sun, 24 Oct 2021 17:25:27 -0300
Subject: [PATCH 20/30] extract background downloader to its own class

---
 lbry/stream/background_downloader.py | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)
 create mode 100644 lbry/stream/background_downloader.py

diff --git a/lbry/stream/background_downloader.py b/lbry/stream/background_downloader.py
new file mode 100644
index 0000000000..d2182831f1
--- /dev/null
+++ b/lbry/stream/background_downloader.py
@@ -0,0 +1,21 @@
+import asyncio
+
+from lbry.stream.downloader import StreamDownloader
+
+
+class BackgroundDownloader:
+    def __init__(self, conf, storage, blob_manager, dht_node):
+        self.storage = storage
+        self.blob_manager = blob_manager
+        self.node = dht_node
+        self.conf = conf
+
+    async def download_blobs(self, sd_hash):
+        downloader = StreamDownloader(asyncio.get_running_loop(), self.conf, self.blob_manager, sd_hash)
+        try:
+            await downloader.start(self.node, save_stream=False)
+        except ValueError:
+            return
+        for blob_info in downloader.descriptor.blobs[:-1]:
+            await downloader.download_stream_blob(blob_info)
+        await self.storage.set_announce(sd_hash, downloader.descriptor.blobs[0].blob_hash)
\ No newline at end of file

From 2c8cc2dec70af1e2c6062b21bfc0bac4c1ae8c16 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Sun, 24 Oct 2021 18:19:06 -0300
Subject: [PATCH 21/30] move more logic out of the downloader component

---
 lbry/blob/disk_space_manager.py               | 20 ++++---
 lbry/dht/node.py                              |  5 ++
 lbry/extras/daemon/components.py              | 60 ++++++++-----------
 lbry/extras/daemon/storage.py                 | 10 ++--
 lbry/stream/background_downloader.py          |  4 +-
 .../datanetwork/test_file_commands.py         | 26 +-------
 .../unit/components/test_component_manager.py |  2 +-
 7 files changed, 51 insertions(+), 76 deletions(-)

diff --git a/lbry/blob/disk_space_manager.py b/lbry/blob/disk_space_manager.py
index 1156b3e169..7c48e5e75c 100644
--- a/lbry/blob/disk_space_manager.py
+++ b/lbry/blob/disk_space_manager.py
@@ -15,8 +15,12 @@ def __init__(self, config, db,
blob_manager, cleaning_interval=30 * 60, analytic self.task = None self.analytics = analytics + async def get_free_space_bytes(self, is_network_blob=False): + limit_mb = self.config.network_storage_limit if is_network_blob else self.config.blob_storage_limit + return max(0, limit_mb*1024*1024 - (await self.get_space_used_mb(is_network_blob))) + async def get_space_used_bytes(self, is_network_blob=False): - return await self.db.get_stored_blob_disk_usage(is_orphan_blob=is_network_blob) + return await self.db.get_stored_blob_disk_usage(is_network_blob=is_network_blob) async def get_space_used_mb(self, is_network_blob=False): return int(await self.get_space_used_bytes(is_network_blob)/1024.0/1024.0) @@ -25,15 +29,13 @@ async def clean(self): await self._clean(False) await self._clean(True) - async def _clean(self, from_network_storage=False): - space_used_bytes = await self.get_space_used_bytes(from_network_storage) - if from_network_storage: - storage_limit = self.config.network_storage_limit*1024*1024 if self.config.network_storage_limit else None - else: - storage_limit = self.config.blob_storage_limit*1024*1024 if self.config.blob_storage_limit else None + async def _clean(self, is_network_blob=False): + space_used_bytes = await self.get_space_used_bytes(is_network_blob) + storage_limit_mb = self.config.network_storage_limit if is_network_blob else self.config.blob_storage_limit + storage_limit = storage_limit_mb*1024*1024 if storage_limit_mb else None if self.analytics: asyncio.create_task( - self.analytics.send_disk_space_used(space_used_bytes, storage_limit, from_network_storage) + self.analytics.send_disk_space_used(space_used_bytes, storage_limit, is_network_blob) ) if not storage_limit: return 0 @@ -41,7 +43,7 @@ async def _clean(self, from_network_storage=False): available = storage_limit - space_used_bytes if available > 0: return 0 - for blob_hash, file_size, _ in await self.db.get_stored_blobs(is_mine=False, orphans=from_network_storage): + for blob_hash, file_size, _ in await self.db.get_stored_blobs(is_mine=False, is_network_blob=is_network_blob): delete.append(blob_hash) available += file_size if available > 0: diff --git a/lbry/dht/node.py b/lbry/dht/node.py index de278a5f57..4bfb3d478f 100644 --- a/lbry/dht/node.py +++ b/lbry/dht/node.py @@ -31,6 +31,11 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', self._refresh_task: asyncio.Task = None self._storage = storage + @property + def last_requested_blob_hash(self): + if len(self.protocol.data_store.requested_blobs) > 0: + return self.protocol.data_store.requested_blobs[-1] + async def refresh_node(self, force_once=False): while True: # remove peers with expired blob announcements from the datastore diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index e919e85195..17ef80695f 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -4,7 +4,6 @@ import logging import binascii import typing -from collections import deque import base58 @@ -19,7 +18,7 @@ from lbry.blob.blob_manager import BlobManager from lbry.blob.disk_space_manager import DiskSpaceManager from lbry.blob_exchange.server import BlobServer -from lbry.stream.downloader import StreamDownloader +from lbry.stream.background_downloader import BackgroundDownloader from lbry.stream.stream_manager import StreamManager from lbry.file.file_manager import FileManager from lbry.extras.daemon.component import Component @@ -381,7 +380,7 @@ async def stop(self): 
self.file_manager.stop() -class BackgroundDownloader(Component): +class BackgroundDownloaderComponent(Component): component_name = BACKGROUND_DOWNLOADER_COMPONENT depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT, DISK_SPACE_COMPONENT] @@ -389,57 +388,46 @@ def __init__(self, component_manager): super().__init__(component_manager) self.task: typing.Optional[asyncio.Task] = None self.download_loop_delay_seconds = 60 - self.finished_iteration = asyncio.Event() - self.requested_blobs = deque(maxlen=10) self.ongoing_download: typing.Optional[asyncio.Task] = None + self.space_manager: typing.Optional[DiskSpaceManager] = None + self.background_downloader: typing.Optional[BackgroundDownloader] = None + self.dht_node: typing.Optional[Node] = None + + @property + def is_busy(self): + return bool(self.ongoing_download and not self.ongoing_download.done()) @property - def component(self) -> 'BackgroundDownloader': + def component(self) -> 'BackgroundDownloaderComponent': return self async def get_status(self): - return {'running': self.task is not None and not self.task.done(), 'enqueued': len(self.requested_blobs)} + return {'running': self.task is not None and not self.task.done(), + 'ongoing_download': self.is_busy} async def loop(self): while True: - if self.component_manager.has_component(DHT_COMPONENT): - node = self.component_manager.get_component(DHT_COMPONENT) - self.requested_blobs = node.protocol.data_store.requested_blobs - if self.requested_blobs and (not self.ongoing_download or self.ongoing_download.done()): - blob_hash = self.requested_blobs.pop() - self.ongoing_download = asyncio.create_task(self.download_blobs(blob_hash)) - self.ongoing_download.add_done_callback(lambda _: self.finished_iteration.set()) - self.finished_iteration.clear() + if not self.is_busy and await self.space_manager.get_free_space_bytes(True) > 0: + blob_hash = self.dht_node.last_requested_blob_hash + if blob_hash: + self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash)) await asyncio.sleep(self.download_loop_delay_seconds) - async def download_blobs(self, sd_hash): - if self.conf.network_storage_limit <= 0: - return - space_manager: DiskSpaceManager = self.component_manager.get_component(DISK_SPACE_COMPONENT) - if (await space_manager.get_space_used_mb(True)) >= self.conf.network_storage_limit: - log.info("Allocated space for proactive downloader is full. 
Background download aborted.") + async def start(self): + self.space_manager: DiskSpaceManager = self.component_manager.get_component(DISK_SPACE_COMPONENT) + if not self.component_manager.has_component(DHT_COMPONENT): return + self.dht_node = self.component_manager.get_component(DHT_COMPONENT) blob_manager = self.component_manager.get_component(BLOB_COMPONENT) - downloader = StreamDownloader(asyncio.get_running_loop(), self.conf, blob_manager, sd_hash) - storage = blob_manager.storage - node = None - if self.component_manager.has_component(DHT_COMPONENT): - node = self.component_manager.get_component(DHT_COMPONENT) - try: - await downloader.start(node, save_stream=False) - except ValueError: - return - for blob_info in downloader.descriptor.blobs[:-1]: - await downloader.download_stream_blob(blob_info) - await storage.set_announce(sd_hash, downloader.descriptor.blobs[0].blob_hash) - - async def start(self): + storage = self.component_manager.get_component(DATABASE_COMPONENT) + self.background_downloader = BackgroundDownloader(self.conf, storage, blob_manager, self.dht_node) self.task = asyncio.create_task(self.loop()) async def stop(self): if self.ongoing_download and not self.ongoing_download.done(): self.ongoing_download.cancel() - self.task.cancel() + if self.task: + self.task.cancel() class DiskSpaceComponent(Component): diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index a0aa29adc3..054bb109d2 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -442,9 +442,9 @@ def delete_blobs(transaction): def get_all_blob_hashes(self): return self.run_and_return_list("select blob_hash from blob") - async def get_stored_blobs(self, is_mine: bool, orphans=False): + async def get_stored_blobs(self, is_mine: bool, is_network_blob=False): is_mine = 1 if is_mine else 0 - if orphans: + if is_network_blob: return await self.db.execute_fetchall( "select blob.blob_hash, blob.blob_length, blob.added_on " "from blob left join stream_blob using (blob_hash) " @@ -466,14 +466,14 @@ async def get_stored_blobs(self, is_mine: bool, orphans=False): ) return normal_blobs + sd_blobs - async def get_stored_blob_disk_usage(self, is_mine: Optional[bool] = None, is_orphan_blob: bool = False): + async def get_stored_blob_disk_usage(self, is_mine: Optional[bool] = None, is_network_blob: bool = False): sql = "select coalesce(sum(blob_length), 0) " - if is_orphan_blob: + if is_network_blob: sql += "from blob left join stream_blob using (blob_hash) where stream_blob.stream_hash is null" else: sql += "from blob join stream_blob using (blob_hash)" if is_mine is not None: - sql += f'{(" and " if is_orphan_blob else " where ")} is_mine=?' + sql += f'{(" and " if is_network_blob else " where ")} is_mine=?' 
args = (1 if is_mine else 0,) if is_mine is not None else () return (await self.db.execute_fetchone(sql, args))[0] diff --git a/lbry/stream/background_downloader.py b/lbry/stream/background_downloader.py index d2182831f1..d02a7c9c0c 100644 --- a/lbry/stream/background_downloader.py +++ b/lbry/stream/background_downloader.py @@ -4,7 +4,7 @@ class BackgroundDownloader: - def __init__(self, conf, storage, blob_manager, dht_node): + def __init__(self, conf, storage, blob_manager, dht_node=None): self.storage = storage self.blob_manager = blob_manager self.node = dht_node @@ -18,4 +18,4 @@ async def download_blobs(self, sd_hash): return for blob_info in downloader.descriptor.blobs[:-1]: await downloader.download_stream_blob(blob_info) - await self.storage.set_announce(sd_hash, downloader.descriptor.blobs[0].blob_hash) \ No newline at end of file + await self.storage.set_announce(sd_hash, downloader.descriptor.blobs[0].blob_hash) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index b7d4a05adf..03572dcec8 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -4,6 +4,7 @@ from binascii import hexlify from lbry.schema import Claim +from lbry.stream.background_downloader import BackgroundDownloader from lbry.stream.descriptor import StreamDescriptor from lbry.testcase import CommandTestCase from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT @@ -611,8 +612,7 @@ async def test_ensure_download(self): content2 = content2['outputs'][0]['value']['source']['sd_hash'] self.assertEqual('48', (await self.status())['disk_space']['space_used']) - proactive_downloader = self.daemon.component_manager.get_component(BACKGROUND_DOWNLOADER_COMPONENT) - self.daemon.conf.network_storage_limit = 100 + proactive_downloader = BackgroundDownloader(self.daemon.conf, self.daemon.storage, self.daemon.blob_manager) await self.clear() self.assertEqual('0', (await self.status())['disk_space']['space_used']) self.assertEqual('0', (await self.status())['disk_space']['network_seeding_space_used']) @@ -636,34 +636,14 @@ async def test_ensure_download(self): await proactive_downloader.download_blobs(blobs[0].blob_hash) self.assertEqual({blobs[0].blob_hash}, self.daemon.blob_manager.completed_blob_hashes) - # trigger from requested blobs - await self.clear() - await proactive_downloader.stop() - proactive_downloader.requested_blobs.append(content1) - finished = proactive_downloader.finished_iteration.wait() - await proactive_downloader.start() - await finished - await self.assertBlobs(content1) - await self.clear() # test that disk space manager doesn't delete orphan network blobs await proactive_downloader.download_blobs(content1) await self.daemon.storage.db.execute_fetchall("update blob set added_on=0") # so it is preferred for cleaning await self.daemon.jsonrpc_get("content2", save_file=False) - while (await self.file_list())[0]['status'] == 'running': + while (await self.file_list())[0]['status'] != 'stopped': await asyncio.sleep(0.5) await self.assertBlobs(content1, no_files=False) self.daemon.conf.blob_storage_limit = 1 await self.blob_clean() await self.assertBlobs(content1, no_files=False) - - # downloading above limit triggers cleanup - self.daemon.conf.network_storage_limit = 6 - with self.assertLogs() as log: - await proactive_downloader.download_blobs(content2) - self.assertIn('Allocated space for proactive downloader is full.', 
log.output[0]) - await self.assertBlobs(content1, no_files=False) - self.assertEqual('32', (await self.status())['disk_space']['network_seeding_space_used']) - await self.blob_clean() - self.assertLessEqual(int((await self.status())['disk_space']['network_seeding_space_used']), - self.daemon.conf.network_storage_limit) diff --git a/tests/unit/components/test_component_manager.py b/tests/unit/components/test_component_manager.py index 7e03f59083..a237c1ac88 100644 --- a/tests/unit/components/test_component_manager.py +++ b/tests/unit/components/test_component_manager.py @@ -33,7 +33,7 @@ def setUp(self): components.WalletServerPaymentsComponent ], [ - components.BackgroundDownloader, + components.BackgroundDownloaderComponent, ] ] self.component_manager = ComponentManager(Config()) From c063aecc38b592850a24a0c4271031fcba7baa44 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 25 Oct 2021 16:44:19 -0300 Subject: [PATCH 22/30] fix free space calculation, test it and give a margin of 10mb before starting so it doesnt insist when full --- lbry/blob/disk_space_manager.py | 4 ++-- lbry/extras/daemon/components.py | 3 ++- tests/integration/datanetwork/test_file_commands.py | 2 ++ 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/lbry/blob/disk_space_manager.py b/lbry/blob/disk_space_manager.py index 7c48e5e75c..d6af702399 100644 --- a/lbry/blob/disk_space_manager.py +++ b/lbry/blob/disk_space_manager.py @@ -15,9 +15,9 @@ def __init__(self, config, db, blob_manager, cleaning_interval=30 * 60, analytic self.task = None self.analytics = analytics - async def get_free_space_bytes(self, is_network_blob=False): + async def get_free_space_mb(self, is_network_blob=False): limit_mb = self.config.network_storage_limit if is_network_blob else self.config.blob_storage_limit - return max(0, limit_mb*1024*1024 - (await self.get_space_used_mb(is_network_blob))) + return max(0, limit_mb - (await self.get_space_used_mb(is_network_blob))) async def get_space_used_bytes(self, is_network_blob=False): return await self.db.get_stored_blob_disk_usage(is_network_blob=is_network_blob) diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py index 17ef80695f..61aaa9c517 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -403,11 +403,12 @@ def component(self) -> 'BackgroundDownloaderComponent': async def get_status(self): return {'running': self.task is not None and not self.task.done(), + 'available_free_space': await self.space_manager.get_free_space_mb(True), 'ongoing_download': self.is_busy} async def loop(self): while True: - if not self.is_busy and await self.space_manager.get_free_space_bytes(True) > 0: + if not self.is_busy and await self.space_manager.get_free_space_mb(True) > 10: blob_hash = self.dht_node.last_requested_blob_hash if blob_hash: self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash)) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 03572dcec8..9be8da26b3 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -629,6 +629,8 @@ async def test_ensure_download(self): await self.assertBlobs(content2) self.assertEqual('0', (await self.status())['disk_space']['space_used']) self.assertEqual('16', (await self.status())['disk_space']['network_seeding_space_used']) + self.daemon.conf.network_storage_limit = 100 + self.assertEqual(84, (await 
self.status())['background_downloader']['available_free_space']) # tests that an attempt to download something that isn't a sd blob will download the single blob and stop blobs = await self.get_blobs_from_sd_blob(self.reflector.blob_manager.get_blob(content1)) From 9b4f31926fd79db16d9a888f40f7f6c27dffcfc8 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 27 Oct 2021 14:17:06 -0300 Subject: [PATCH 23/30] normal_blobs->stream_blobs, proactive->background --- lbry/extras/daemon/storage.py | 4 ++-- .../integration/datanetwork/test_file_commands.py | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index 054bb109d2..ee4ea2c936 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -458,13 +458,13 @@ async def get_stored_blobs(self, is_mine: bool, is_network_blob=False): "where blob.is_mine=? order by blob.added_on asc", (is_mine,) ) - normal_blobs = await self.db.execute_fetchall( + stream_blobs = await self.db.execute_fetchall( "select blob.blob_hash, blob.blob_length, blob.added_on " "from blob join stream_blob using (blob_hash) cross join stream using (stream_hash)" "cross join file using (stream_hash) where blob.is_mine=? order by blob.added_on asc", (is_mine,) ) - return normal_blobs + sd_blobs + return stream_blobs + sd_blobs async def get_stored_blob_disk_usage(self, is_mine: Optional[bool] = None, is_network_blob: bool = False): sql = "select coalesce(sum(blob_length), 0) " diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 9be8da26b3..716aa61eaf 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -575,7 +575,7 @@ async def test_file_management(self): self.assertTrue(blobs4.issubset(blobs)) -class TestProactiveDownloaderComponent(CommandTestCase): +class TestBackgroundDownloaderComponent(CommandTestCase): async def get_blobs_from_sd_blob(self, sd_blob): descriptor = await StreamDescriptor.from_stream_descriptor_blob( asyncio.get_running_loop(), self.daemon.blob_manager.blob_dir, sd_blob @@ -612,20 +612,20 @@ async def test_ensure_download(self): content2 = content2['outputs'][0]['value']['source']['sd_hash'] self.assertEqual('48', (await self.status())['disk_space']['space_used']) - proactive_downloader = BackgroundDownloader(self.daemon.conf, self.daemon.storage, self.daemon.blob_manager) + background_downloader = BackgroundDownloader(self.daemon.conf, self.daemon.storage, self.daemon.blob_manager) await self.clear() self.assertEqual('0', (await self.status())['disk_space']['space_used']) self.assertEqual('0', (await self.status())['disk_space']['network_seeding_space_used']) - await proactive_downloader.download_blobs(content1) + await background_downloader.download_blobs(content1) await self.assertBlobs(content1) self.assertEqual('0', (await self.status())['disk_space']['space_used']) self.assertEqual('32', (await self.status())['disk_space']['network_seeding_space_used']) - await proactive_downloader.download_blobs(content2) + await background_downloader.download_blobs(content2) await self.assertBlobs(content1, content2) self.assertEqual('0', (await self.status())['disk_space']['space_used']) self.assertEqual('48', (await self.status())['disk_space']['network_seeding_space_used']) await self.clear() - await proactive_downloader.download_blobs(content2) + await background_downloader.download_blobs(content2) await 
self.assertBlobs(content2) self.assertEqual('0', (await self.status())['disk_space']['space_used']) self.assertEqual('16', (await self.status())['disk_space']['network_seeding_space_used']) @@ -635,11 +635,11 @@ async def test_ensure_download(self): # tests that an attempt to download something that isn't a sd blob will download the single blob and stop blobs = await self.get_blobs_from_sd_blob(self.reflector.blob_manager.get_blob(content1)) await self.clear() - await proactive_downloader.download_blobs(blobs[0].blob_hash) + await background_downloader.download_blobs(blobs[0].blob_hash) self.assertEqual({blobs[0].blob_hash}, self.daemon.blob_manager.completed_blob_hashes) # test that disk space manager doesn't delete orphan network blobs - await proactive_downloader.download_blobs(content1) + await background_downloader.download_blobs(content1) await self.daemon.storage.db.execute_fetchall("update blob set added_on=0") # so it is preferred for cleaning await self.daemon.jsonrpc_get("content2", save_file=False) while (await self.file_list())[0]['status'] != 'stopped': From acda1074bf0188032a28a40a4bd4dd8f5f86728b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 2 Nov 2021 22:34:15 -0300 Subject: [PATCH 24/30] improve disk space manager status, include more info and unify space queries --- lbry/blob/disk_space_manager.py | 19 ++++++---- lbry/extras/daemon/components.py | 9 +++-- lbry/extras/daemon/storage.py | 36 ++++++++++++------- .../datanetwork/test_file_commands.py | 32 +++++++++-------- 4 files changed, 60 insertions(+), 36 deletions(-) diff --git a/lbry/blob/disk_space_manager.py b/lbry/blob/disk_space_manager.py index d6af702399..ba1a20a3de 100644 --- a/lbry/blob/disk_space_manager.py +++ b/lbry/blob/disk_space_manager.py @@ -17,20 +17,27 @@ def __init__(self, config, db, blob_manager, cleaning_interval=30 * 60, analytic async def get_free_space_mb(self, is_network_blob=False): limit_mb = self.config.network_storage_limit if is_network_blob else self.config.blob_storage_limit - return max(0, limit_mb - (await self.get_space_used_mb(is_network_blob))) + space_used_mb = await self.get_space_used_mb() + space_used_mb = space_used_mb['network_storage'] if is_network_blob else space_used_mb['content_storage'] + return max(0, limit_mb - space_used_mb) - async def get_space_used_bytes(self, is_network_blob=False): - return await self.db.get_stored_blob_disk_usage(is_network_blob=is_network_blob) + async def get_space_used_bytes(self): + return await self.db.get_stored_blob_disk_usage() - async def get_space_used_mb(self, is_network_blob=False): - return int(await self.get_space_used_bytes(is_network_blob)/1024.0/1024.0) + async def get_space_used_mb(self): + space_used_bytes = await self.get_space_used_bytes() + return {key: int(value/1024.0/1024.0) for key, value in space_used_bytes.items()} async def clean(self): await self._clean(False) await self._clean(True) async def _clean(self, is_network_blob=False): - space_used_bytes = await self.get_space_used_bytes(is_network_blob) + space_used_bytes = await self.get_space_used_bytes() + if is_network_blob: + space_used_bytes = space_used_bytes['network_storage'] + else: + space_used_bytes = space_used_bytes['content_storage'] + space_used_bytes['private_storage'] storage_limit_mb = self.config.network_storage_limit if is_network_blob else self.config.blob_storage_limit storage_limit = storage_limit_mb*1024*1024 if storage_limit_mb else None if self.analytics: diff --git a/lbry/extras/daemon/components.py 
b/lbry/extras/daemon/components.py index 61aaa9c517..9ab9388a92 100644 --- a/lbry/extras/daemon/components.py +++ b/lbry/extras/daemon/components.py @@ -403,7 +403,7 @@ def component(self) -> 'BackgroundDownloaderComponent': async def get_status(self): return {'running': self.task is not None and not self.task.done(), - 'available_free_space': await self.space_manager.get_free_space_mb(True), + 'available_free_space_mb': await self.space_manager.get_free_space_mb(True), 'ongoing_download': self.is_busy} async def loop(self): @@ -445,9 +445,12 @@ def component(self) -> typing.Optional[DiskSpaceManager]: async def get_status(self): if self.disk_space_manager: + space_used = await self.disk_space_manager.get_space_used_mb() return { - 'space_used': str(await self.disk_space_manager.get_space_used_mb()), - 'network_seeding_space_used': str(await self.disk_space_manager.get_space_used_mb(True)), + 'total_used_mb': space_used['total'], + 'published_blobs_storage_used_mb': space_used['private_storage'], + 'content_blobs_storage_used_mb': space_used['content_storage'], + 'seed_blobs_storage_used_mb': space_used['network_storage'], 'running': self.disk_space_manager.running, } return {'space_used': '0', 'network_seeding_space_used': '0', 'running': False} diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index ee4ea2c936..efe791745b 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -458,24 +458,34 @@ async def get_stored_blobs(self, is_mine: bool, is_network_blob=False): "where blob.is_mine=? order by blob.added_on asc", (is_mine,) ) - stream_blobs = await self.db.execute_fetchall( + content_blobs = await self.db.execute_fetchall( "select blob.blob_hash, blob.blob_length, blob.added_on " "from blob join stream_blob using (blob_hash) cross join stream using (stream_hash)" "cross join file using (stream_hash) where blob.is_mine=? order by blob.added_on asc", (is_mine,) ) - return stream_blobs + sd_blobs - - async def get_stored_blob_disk_usage(self, is_mine: Optional[bool] = None, is_network_blob: bool = False): - sql = "select coalesce(sum(blob_length), 0) " - if is_network_blob: - sql += "from blob left join stream_blob using (blob_hash) where stream_blob.stream_hash is null" - else: - sql += "from blob join stream_blob using (blob_hash)" - if is_mine is not None: - sql += f'{(" and " if is_network_blob else " where ")} is_mine=?' 
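# The replacement query introduced just below folds the buckets into a single
# scan with conditional aggregation instead of one query per bucket. A minimal
# standalone sketch of the same technique, on a toy schema with made-up sizes
# (illustration only):
import sqlite3

demo = sqlite3.connect(":memory:")
demo.executescript("""
    create table blob (blob_hash text primary key, blob_length integer, is_mine integer);
    create table stream_blob (stream_hash text, blob_hash text);
    insert into blob values ('network', 4, 0), ('content', 8, 0), ('published', 16, 1);
    insert into stream_blob values ('s', 'content'), ('s', 'published');
""")
total, network, content, private = demo.execute("""
    select coalesce(sum(blob_length), 0),
           coalesce(sum(case when stream_blob.stream_hash is null then blob_length else 0 end), 0),
           coalesce(sum(case when stream_blob.blob_hash is not null and is_mine=0 then blob_length else 0 end), 0),
           coalesce(sum(case when is_mine=1 then blob_length else 0 end), 0)
    from blob left join stream_blob using (blob_hash)
""").fetchone()
print(total, network, content, private)  # -> 28 4 8 16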
- args = (1 if is_mine else 0,) if is_mine is not None else () - return (await self.db.execute_fetchone(sql, args))[0] + return content_blobs + sd_blobs + + async def get_stored_blob_disk_usage(self): + total, network_size, content_size, private_size = await self.db.execute_fetchone(""" + select coalesce(sum(blob_length), 0) as total, + coalesce(sum(case when + stream_blob.stream_hash is null + then blob_length else 0 end), 0) as network_storage, + coalesce(sum(case when + stream_blob.blob_hash is not null and is_mine=0 + then blob_length else 0 end), 0) as content_storage, + coalesce(sum(case when + is_mine=1 + then blob_length else 0 end), 0) as private_storage + from blob left join stream_blob using (blob_hash) + """) + return { + 'network_storage': network_size, + 'content_storage': content_size, + 'private_storage': private_size, + 'total': total + } async def update_blob_ownership(self, sd_hash, is_mine: bool): is_mine = 1 if is_mine else 0 diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 716aa61eaf..23cb8f1efa 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -537,7 +537,7 @@ async def get_referenced_blobs(self, tx): async def test_file_management(self): status = await self.status() self.assertIn('disk_space', status) - self.assertEqual('0', status['disk_space']['space_used']) + self.assertEqual(0, status['disk_space']['total_used_mb']) self.assertEqual(True, status['disk_space']['running']) sd_hash1, blobs1 = await self.get_referenced_blobs( await self.stream_create('foo1', '0.01', data=('0' * 2 * 1024 * 1024).encode()) @@ -556,18 +556,22 @@ async def test_file_management(self): await self.daemon.storage.update_blob_ownership(sd_hash3, False) await self.daemon.storage.update_blob_ownership(sd_hash4, False) - self.assertEqual('10', (await self.status())['disk_space']['space_used']) + self.assertEqual(7, (await self.status())['disk_space']['content_blobs_storage_used_mb']) + self.assertEqual(10, (await self.status())['disk_space']['total_used_mb']) self.assertEqual(blobs1 | blobs2 | blobs3 | blobs4, set(await self.blob_list())) await self.blob_clean() - self.assertEqual('10', (await self.status())['disk_space']['space_used']) + self.assertEqual(10, (await self.status())['disk_space']['total_used_mb']) + self.assertEqual(3, (await self.status())['disk_space']['published_blobs_storage_used_mb']) self.assertEqual(blobs1 | blobs2 | blobs3 | blobs4, set(await self.blob_list())) self.daemon.conf.blob_storage_limit = 6 await self.blob_clean() - self.assertEqual('5', (await self.status())['disk_space']['space_used']) + self.assertEqual(5, (await self.status())['disk_space']['total_used_mb']) + self.assertEqual(2, (await self.status())['disk_space']['content_blobs_storage_used_mb']) + self.assertEqual(3, (await self.status())['disk_space']['published_blobs_storage_used_mb']) blobs = set(await self.blob_list()) self.assertFalse(blobs1.issubset(blobs)) self.assertTrue(blobs2.issubset(blobs)) @@ -610,27 +614,27 @@ async def test_ensure_download(self): content1 = content1['outputs'][0]['value']['source']['sd_hash'] content2 = await self.stream_create('content2', '0.01', data=bytes([0] * 16 * 1024 * 1024)) content2 = content2['outputs'][0]['value']['source']['sd_hash'] - self.assertEqual('48', (await self.status())['disk_space']['space_used']) + self.assertEqual(48, (await self.status())['disk_space']['published_blobs_storage_used_mb']) + 
self.assertEqual(0, (await self.status())['disk_space']['content_blobs_storage_used_mb']) background_downloader = BackgroundDownloader(self.daemon.conf, self.daemon.storage, self.daemon.blob_manager) await self.clear() - self.assertEqual('0', (await self.status())['disk_space']['space_used']) - self.assertEqual('0', (await self.status())['disk_space']['network_seeding_space_used']) + self.assertEqual(0, (await self.status())['disk_space']['total_used_mb']) await background_downloader.download_blobs(content1) await self.assertBlobs(content1) - self.assertEqual('0', (await self.status())['disk_space']['space_used']) - self.assertEqual('32', (await self.status())['disk_space']['network_seeding_space_used']) + self.assertEqual(0, (await self.status())['disk_space']['content_blobs_storage_used_mb']) + self.assertEqual(32, (await self.status())['disk_space']['seed_blobs_storage_used_mb']) await background_downloader.download_blobs(content2) await self.assertBlobs(content1, content2) - self.assertEqual('0', (await self.status())['disk_space']['space_used']) - self.assertEqual('48', (await self.status())['disk_space']['network_seeding_space_used']) + self.assertEqual(0, (await self.status())['disk_space']['content_blobs_storage_used_mb']) + self.assertEqual(48, (await self.status())['disk_space']['seed_blobs_storage_used_mb']) await self.clear() await background_downloader.download_blobs(content2) await self.assertBlobs(content2) - self.assertEqual('0', (await self.status())['disk_space']['space_used']) - self.assertEqual('16', (await self.status())['disk_space']['network_seeding_space_used']) + self.assertEqual(0, (await self.status())['disk_space']['content_blobs_storage_used_mb']) + self.assertEqual(16, (await self.status())['disk_space']['seed_blobs_storage_used_mb']) self.daemon.conf.network_storage_limit = 100 - self.assertEqual(84, (await self.status())['background_downloader']['available_free_space']) + self.assertEqual(84, (await self.status())['background_downloader']['available_free_space_mb']) # tests that an attempt to download something that isn't a sd blob will download the single blob and stop blobs = await self.get_blobs_from_sd_blob(self.reflector.blob_manager.get_blob(content1)) From 6261527fb3fe2cb12bf8c7f63bf5f0ca9b369c24 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 3 Nov 2021 11:43:42 -0300 Subject: [PATCH 25/30] download from stored announcements and dont reannounce --- lbry/dht/node.py | 5 ++--- lbry/dht/protocol/data_store.py | 4 +++- lbry/dht/protocol/protocol.py | 1 - lbry/extras/daemon/components.py | 8 +++++--- lbry/stream/background_downloader.py | 3 ++- tests/integration/datanetwork/test_file_commands.py | 5 +---- 6 files changed, 13 insertions(+), 13 deletions(-) diff --git a/lbry/dht/node.py b/lbry/dht/node.py index 4bfb3d478f..300c1a774f 100644 --- a/lbry/dht/node.py +++ b/lbry/dht/node.py @@ -32,9 +32,8 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager', self._storage = storage @property - def last_requested_blob_hash(self): - if len(self.protocol.data_store.requested_blobs) > 0: - return self.protocol.data_store.requested_blobs[-1] + def stored_blob_hashes(self): + return self.protocol.data_store.keys() async def refresh_node(self, force_once=False): while True: diff --git a/lbry/dht/protocol/data_store.py b/lbry/dht/protocol/data_store.py index b3fa6b946b..a5434aaa2f 100644 --- a/lbry/dht/protocol/data_store.py +++ b/lbry/dht/protocol/data_store.py @@ -16,7 +16,9 @@ def __init__(self, loop: asyncio.AbstractEventLoop, 
peer_manager: 'PeerManager')
         self.loop = loop
         self._peer_manager = peer_manager
         self.completed_blobs: typing.Set[str] = set()
-        self.requested_blobs: typing.Deque = deque(maxlen=10)
+
+    def keys(self):
+        return self._data_store.keys()
 
     def __len__(self):
         return self._data_store.__len__()
diff --git a/lbry/dht/protocol/protocol.py b/lbry/dht/protocol/protocol.py
index 0717e5826c..66165740b4 100644
--- a/lbry/dht/protocol/protocol.py
+++ b/lbry/dht/protocol/protocol.py
@@ -95,7 +95,6 @@ def find_value(self, rpc_contact: 'KademliaPeer', key: bytes, page: int = 0):
             for peer in self.protocol.data_store.get_peers_for_blob(key)
             if not rpc_contact.tcp_port or peer.compact_address_tcp() != rpc_contact.compact_address_tcp()
         ]
-        self.protocol.data_store.requested_blobs.append(key.hex())
         # if we don't have k storing peers to return and we have this hash locally, include our contact information
         if len(peers) < constants.K and key.hex() in self.protocol.data_store.completed_blobs:
             peers.append(self.compact_address())
diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py
index 9ab9388a92..f771cd2f35 100644
--- a/lbry/extras/daemon/components.py
+++ b/lbry/extras/daemon/components.py
@@ -390,6 +390,7 @@ def __init__(self, component_manager):
         self.download_loop_delay_seconds = 60
         self.ongoing_download: typing.Optional[asyncio.Task] = None
         self.space_manager: typing.Optional[DiskSpaceManager] = None
+        self.blob_manager: typing.Optional[BlobManager] = None
         self.background_downloader: typing.Optional[BackgroundDownloader] = None
         self.dht_node: typing.Optional[Node] = None
 
@@ -409,7 +410,8 @@ async def get_status(self):
     async def loop(self):
         while True:
             if not self.is_busy and await self.space_manager.get_free_space_mb(True) > 10:
-                blob_hash = self.dht_node.last_requested_blob_hash
+                blob_hash = next((key.hex() for key in self.dht_node.stored_blob_hashes if
+                                  key.hex() not in self.blob_manager.completed_blob_hashes), None)
                 if blob_hash:
                     self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash))
             await asyncio.sleep(self.download_loop_delay_seconds)
@@ -419,9 +421,9 @@ async def start(self):
         if not self.component_manager.has_component(DHT_COMPONENT):
             return
         self.dht_node = self.component_manager.get_component(DHT_COMPONENT)
-        blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
+        self.blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
         storage = self.component_manager.get_component(DATABASE_COMPONENT)
-        self.background_downloader = BackgroundDownloader(self.conf, storage, blob_manager, self.dht_node)
+        self.background_downloader = BackgroundDownloader(self.conf, storage, self.blob_manager, self.dht_node)
         self.task = asyncio.create_task(self.loop())
 
     async def stop(self):
diff --git a/lbry/stream/background_downloader.py b/lbry/stream/background_downloader.py
index d02a7c9c0c..045a47824e 100644
--- a/lbry/stream/background_downloader.py
+++ b/lbry/stream/background_downloader.py
@@ -18,4 +18,5 @@ async def download_blobs(self, sd_hash):
             return
         for blob_info in downloader.descriptor.blobs[:-1]:
             await downloader.download_stream_blob(blob_info)
-        await self.storage.set_announce(sd_hash, downloader.descriptor.blobs[0].blob_hash)
+        # for now, announcing is unnecessary because the blobs we have were announced to us, so they will be queried
+        # await self.storage.set_announce(sd_hash, downloader.descriptor.blobs[0].blob_hash)
diff --git a/tests/integration/datanetwork/test_file_commands.py
index 23cb8f1efa..9443893f72 100644
--- a/tests/integration/datanetwork/test_file_commands.py
+++ b/tests/integration/datanetwork/test_file_commands.py
@@ -589,13 +589,10 @@ async def get_blobs_from_sd_blob(self, sd_blob):
     async def assertBlobs(self, *sd_hashes, no_files=True):
         # checks that we have only the finished blobs needed for the referenced streams
         seen = set(sd_hashes)
-        to_announce = await self.daemon.storage.get_blobs_to_announce()
         for sd_hash in sd_hashes:
-            self.assertIn(sd_hash, to_announce)
             sd_blob = self.daemon.blob_manager.get_blob(sd_hash)
             self.assertTrue(sd_blob.get_is_verified())
             blobs = await self.get_blobs_from_sd_blob(sd_blob)
-            self.assertIn(blobs[0].blob_hash, to_announce)
             for blob in blobs[:-1]:
                 self.assertTrue(self.daemon.blob_manager.get_blob(blob.blob_hash).get_is_verified())
             seen.update(blob.blob_hash for blob in blobs if blob.blob_hash)
@@ -609,7 +606,7 @@ async def clear(self):
         await self.daemon.blob_manager.delete_blobs(list(self.daemon.blob_manager.completed_blob_hashes), True)
         self.assertEqual(0, len((await self.daemon.jsonrpc_blob_list())['items']))
 
-    async def test_ensure_download(self):
+    async def test_download(self):
         content1 = await self.stream_create('content1', '0.01', data=bytes([0] * 32 * 1024 * 1024))
         content1 = content1['outputs'][0]['value']['source']['sd_hash']
         content2 = await self.stream_create('content2', '0.01', data=bytes([0] * 16 * 1024 * 1024))

From c3882479784c82dddabbfd928803e78b5c2a3675 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Wed, 3 Nov 2021 15:13:37 -0300
Subject: [PATCH 26/30] add index for blob table so size summaries are faster

---
 lbry/extras/daemon/storage.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py
index efe791745b..64805c04a8 100644
--- a/lbry/extras/daemon/storage.py
+++ b/lbry/extras/daemon/storage.py
@@ -337,6 +337,7 @@ class SQLiteStorage(SQLiteMixin):
             tcp_port integer,
             unique (address, udp_port)
         );
+        create index if not exists blob_data on blob(blob_hash, blob_length, is_mine);
     """
 
     def __init__(self, conf: Config, path, loop=None, time_getter: typing.Optional[typing.Callable[[], float]] = None):

From 00b881cc82e99e6caade2c3900fef401c6f6eefe Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Wed, 3 Nov 2021 15:38:32 -0300
Subject: [PATCH 27/30] cache space stats from running components so status is instant

---
 lbry/blob/disk_space_manager.py                     | 9 ++++++---
 lbry/extras/daemon/components.py                    | 8 +++++---
 tests/integration/datanetwork/test_file_commands.py | 2 ++
 3 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/lbry/blob/disk_space_manager.py b/lbry/blob/disk_space_manager.py
index ba1a20a3de..187f6de970 100644
--- a/lbry/blob/disk_space_manager.py
+++ b/lbry/blob/disk_space_manager.py
@@ -14,6 +14,7 @@ def __init__(self, config, db, blob_manager, cleaning_interval=30 * 60, analytic
         self.running = False
         self.task = None
         self.analytics = analytics
+        self._used_space_bytes = None
 
     async def get_free_space_mb(self, is_network_blob=False):
         limit_mb = self.config.network_storage_limit if is_network_blob else self.config.blob_storage_limit
@@ -22,10 +23,12 @@ async def get_free_space_mb(self, is_network_blob=False):
         return max(0, limit_mb - space_used_mb)
 
     async def get_space_used_bytes(self):
-        return await self.db.get_stored_blob_disk_usage()
+        self._used_space_bytes = await self.db.get_stored_blob_disk_usage()
+        return self._used_space_bytes
 
-    async def get_space_used_mb(self):
-        space_used_bytes = await self.get_space_used_bytes()
+    async def get_space_used_mb(self, cached=True):
+        cached = cached and self._used_space_bytes is not None
+        space_used_bytes = self._used_space_bytes if cached else await self.get_space_used_bytes()
         return {key: int(value/1024.0/1024.0) for key, value in space_used_bytes.items()}
 
     async def clean(self):
diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py
index f771cd2f35..6747bb3dbb 100644
--- a/lbry/extras/daemon/components.py
+++ b/lbry/extras/daemon/components.py
@@ -393,6 +393,7 @@ def __init__(self, component_manager):
         self.blob_manager: typing.Optional[BlobManager] = None
         self.background_downloader: typing.Optional[BackgroundDownloader] = None
         self.dht_node: typing.Optional[Node] = None
+        self.space_available: typing.Optional[int] = None
 
     @property
     def is_busy(self):
@@ -404,12 +405,13 @@ def component(self) -> 'BackgroundDownloaderComponent':
         return self
 
     async def get_status(self):
         return {'running': self.task is not None and not self.task.done(),
-                'available_free_space_mb': await self.space_manager.get_free_space_mb(True),
+                'available_free_space_mb': self.space_available,
                 'ongoing_download': self.is_busy}
 
     async def loop(self):
         while True:
-            if not self.is_busy and await self.space_manager.get_free_space_mb(True) > 10:
+            self.space_available = await self.space_manager.get_free_space_mb(True)
+            if not self.is_busy and self.space_available > 10:
                 blob_hash = next((key.hex() for key in self.dht_node.stored_blob_hashes if
                                   key.hex() not in self.blob_manager.completed_blob_hashes), None)
                 if blob_hash:
@@ -447,7 +449,7 @@ def component(self) -> typing.Optional[DiskSpaceManager]:
 
     async def get_status(self):
         if self.disk_space_manager:
-            space_used = await self.disk_space_manager.get_space_used_mb()
+            space_used = await self.disk_space_manager.get_space_used_mb(cached=True)
             return {
                 'total_used_mb': space_used['total'],
                 'published_blobs_storage_used_mb': space_used['private_storage'],
diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py
index 9443893f72..753b89a744 100644
--- a/tests/integration/datanetwork/test_file_commands.py
+++ b/tests/integration/datanetwork/test_file_commands.py
@@ -555,6 +555,7 @@ async def test_file_management(self):
         await self.daemon.storage.update_blob_ownership(sd_hash1, False)
         await self.daemon.storage.update_blob_ownership(sd_hash3, False)
         await self.daemon.storage.update_blob_ownership(sd_hash4, False)
+        await self.blob_clean()  # just to refresh caches, has no effect
         self.assertEqual(7, (await self.status())['disk_space']['content_blobs_storage_used_mb'])
         self.assertEqual(10, (await self.status())['disk_space']['total_used_mb'])
@@ -563,6 +564,7 @@ async def test_file_management(self):
 
         await self.blob_clean()
         self.assertEqual(10, (await self.status())['disk_space']['total_used_mb'])
+        self.assertEqual(7, (await self.status())['disk_space']['content_blobs_storage_used_mb'])
         self.assertEqual(3, (await self.status())['disk_space']['published_blobs_storage_used_mb'])
         self.assertEqual(blobs1 | blobs2 | blobs3 | blobs4, set(await self.blob_list()))

From 5c2cdec1ba55137f35aa8a91adae99c7300beb0c Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Wed, 3 Nov 2021 15:51:51 -0300
Subject: [PATCH 28/30] make sure the downloader always stops gracefully

---
 lbry/stream/background_downloader.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/lbry/stream/background_downloader.py b/lbry/stream/background_downloader.py
index 045a47824e..b7476495c9 100644
--- a/lbry/stream/background_downloader.py
+++ b/lbry/stream/background_downloader.py
@@ -14,9 +14,9 @@ async def download_blobs(self, sd_hash):
         downloader = StreamDownloader(asyncio.get_running_loop(), self.conf, self.blob_manager, sd_hash)
         try:
             await downloader.start(self.node, save_stream=False)
+            for blob_info in downloader.descriptor.blobs[:-1]:
+                await downloader.download_stream_blob(blob_info)
         except ValueError:
             return
-        for blob_info in downloader.descriptor.blobs[:-1]:
-            await downloader.download_stream_blob(blob_info)
-        # for now, announcing is unnecessary because the blobs we have were announced to us, so they will be queried
-        # await self.storage.set_announce(sd_hash, downloader.descriptor.blobs[0].blob_hash)
+        finally:
+            downloader.stop()

From 32d10f75fbbdf6670e25a48f94fb37dd61c834ca Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Fri, 5 Nov 2021 00:31:27 -0300
Subject: [PATCH 29/30] clear cache on test assertions

---
 lbry/dht/protocol/data_store.py                     | 1 -
 tests/integration/datanetwork/test_file_commands.py | 6 ++++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/lbry/dht/protocol/data_store.py b/lbry/dht/protocol/data_store.py
index a5434aaa2f..42db910422 100644
--- a/lbry/dht/protocol/data_store.py
+++ b/lbry/dht/protocol/data_store.py
@@ -1,6 +1,5 @@
 import asyncio
 import typing
-from collections import deque
 from lbry.dht import constants
 
 if typing.TYPE_CHECKING:
diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py
index 753b89a744..541657f98a 100644
--- a/tests/integration/datanetwork/test_file_commands.py
+++ b/tests/integration/datanetwork/test_file_commands.py
@@ -618,22 +618,24 @@ async def test_download(self):
         background_downloader = BackgroundDownloader(self.daemon.conf, self.daemon.storage, self.daemon.blob_manager)
         await self.clear()
+        await self.blob_clean()
         self.assertEqual(0, (await self.status())['disk_space']['total_used_mb'])
         await background_downloader.download_blobs(content1)
         await self.assertBlobs(content1)
+        await self.blob_clean()
         self.assertEqual(0, (await self.status())['disk_space']['content_blobs_storage_used_mb'])
         self.assertEqual(32, (await self.status())['disk_space']['seed_blobs_storage_used_mb'])
 
         await background_downloader.download_blobs(content2)
         await self.assertBlobs(content1, content2)
+        await self.blob_clean()
         self.assertEqual(0, (await self.status())['disk_space']['content_blobs_storage_used_mb'])
         self.assertEqual(48, (await self.status())['disk_space']['seed_blobs_storage_used_mb'])
 
         await self.clear()
         await background_downloader.download_blobs(content2)
         await self.assertBlobs(content2)
+        await self.blob_clean()
         self.assertEqual(0, (await self.status())['disk_space']['content_blobs_storage_used_mb'])
         self.assertEqual(16, (await self.status())['disk_space']['seed_blobs_storage_used_mb'])
 
-        self.daemon.conf.network_storage_limit = 100
-        self.assertEqual(84, (await self.status())['background_downloader']['available_free_space_mb'])
 
         # tests that an attempt to download something that isn't an sd blob will download the single blob and stop
         blobs = await self.get_blobs_from_sd_blob(self.reflector.blob_manager.get_blob(content1))

From b15480576eff3536a51b970ce556606a3e607203 Mon Sep 17 00:00:00 2001
From: Victor Shyba
Date: Tue, 9 Nov 2021 15:16:21 -0300
Subject: [PATCH 30/30] log unexpected errors, rename task/loop

---
 lbry/extras/daemon/components.py     | 12 ++++++------
 lbry/stream/background_downloader.py |  8 ++++++++
 2 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py
index 6747bb3dbb..780b7e5d52 100644
--- a/lbry/extras/daemon/components.py
+++ b/lbry/extras/daemon/components.py
@@ -386,7 +386,7 @@ class BackgroundDownloaderComponent(Component):
 
     def __init__(self, component_manager):
         super().__init__(component_manager)
-        self.task: typing.Optional[asyncio.Task] = None
+        self.background_task: typing.Optional[asyncio.Task] = None
         self.download_loop_delay_seconds = 60
         self.ongoing_download: typing.Optional[asyncio.Task] = None
         self.space_manager: typing.Optional[DiskSpaceManager] = None
@@ -404,11 +404,11 @@ def component(self) -> 'BackgroundDownloaderComponent':
         return self
 
     async def get_status(self):
-        return {'running': self.task is not None and not self.task.done(),
+        return {'running': self.background_task is not None and not self.background_task.done(),
                 'available_free_space_mb': self.space_available,
                 'ongoing_download': self.is_busy}
 
-    async def loop(self):
+    async def download_blobs_in_background(self):
         while True:
             self.space_available = await self.space_manager.get_free_space_mb(True)
             if not self.is_busy and self.space_available > 10:
@@ -426,13 +426,13 @@ async def start(self):
         self.blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
         storage = self.component_manager.get_component(DATABASE_COMPONENT)
         self.background_downloader = BackgroundDownloader(self.conf, storage, self.blob_manager, self.dht_node)
-        self.task = asyncio.create_task(self.loop())
+        self.background_task = asyncio.create_task(self.download_blobs_in_background())
 
     async def stop(self):
         if self.ongoing_download and not self.ongoing_download.done():
             self.ongoing_download.cancel()
-        if self.task:
-            self.task.cancel()
+        if self.background_task:
+            self.background_task.cancel()
 
 
 class DiskSpaceComponent(Component):
diff --git a/lbry/stream/background_downloader.py b/lbry/stream/background_downloader.py
index b7476495c9..64848ac3ac 100644
--- a/lbry/stream/background_downloader.py
+++ b/lbry/stream/background_downloader.py
@@ -1,8 +1,12 @@
 import asyncio
+import logging
 
 from lbry.stream.downloader import StreamDownloader
 
 
+log = logging.getLogger(__name__)
+
+
 class BackgroundDownloader:
     def __init__(self, conf, storage, blob_manager, dht_node=None):
         self.storage = storage
@@ -18,5 +22,9 @@ async def download_blobs(self, sd_hash):
             await downloader.download_stream_blob(blob_info)
         except ValueError:
             return
+        except asyncio.CancelledError:
+            raise
+        except Exception:
+            log.exception("Unexpected download error in background downloader")
         finally:
             downloader.stop()
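
As a companion sketch rather than code from any patch: after patches 25, 28 and 30 the background loop boils down to "pick the first DHT-stored hash we have not completed locally, then download that stream with a downloader that always stops". The sketch below is a minimal, self-contained model of that behavior; pick_undownloaded and download_stream_blobs are illustrative names (the real logic lives in BackgroundDownloaderComponent.download_blobs_in_background and BackgroundDownloader.download_blobs), and downloader/node are duck-typed stand-ins for StreamDownloader and the DHT Node.

import asyncio
import logging
from typing import Iterable, Optional, Set

log = logging.getLogger(__name__)


def pick_undownloaded(stored_blob_hashes: Iterable[bytes], completed: Set[str]) -> Optional[str]:
    # selection rule from patch 25: the first hash the DHT data store holds
    # announcements for that is not already a completed local blob
    return next((key.hex() for key in stored_blob_hashes if key.hex() not in completed), None)


async def download_stream_blobs(downloader, node) -> None:
    # control flow from patches 28 and 30: ValueError (e.g. the hash was not
    # an sd blob) ends the attempt quietly, cancellation propagates, anything
    # else is logged, and stop() always runs
    try:
        await downloader.start(node, save_stream=False)
        for blob_info in downloader.descriptor.blobs[:-1]:
            await downloader.download_stream_blob(blob_info)
    except ValueError:
        return
    except asyncio.CancelledError:
        raise
    except Exception:
        log.exception("Unexpected download error in background downloader")
    finally:
        downloader.stop()


if __name__ == '__main__':
    # hypothetical 48-byte hashes: the first is already complete, so the second is picked
    stored = [bytes.fromhex('aa' * 48), bytes.fromhex('bb' * 48)]
    print(pick_undownloaded(stored, completed={'aa' * 48}))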
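
Similarly, a hedged illustration of why the blob_data index from patch 26 makes size summaries faster: it covers exactly the columns an ownership-grouped disk-usage query needs, so SQLite can answer from the index without reading the wider table rows. The reduced schema and query below are assumptions (status/added_on merely stand in for the blob table's remaining columns, and the daemon's actual query in get_stored_blob_disk_usage may differ):

import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript("""
    create table if not exists blob (
        blob_hash char(96) primary key not null,
        blob_length integer not null,
        is_mine integer not null default 0,
        status text,      -- stand-in for the table's other columns
        added_on integer  -- stand-in for the table's other columns
    );
    create index if not exists blob_data on blob(blob_hash, blob_length, is_mine);
""")
conn.executemany(
    "insert into blob (blob_hash, blob_length, is_mine) values (?, ?, ?)",
    [('aa' * 48, 2 * 1024 * 1024, 1), ('bb' * 48, 2 * 1024 * 1024, 0)]
)
# a per-ownership size summary touches only indexed columns, so SQLite can
# typically serve it with a covering index scan instead of a full table scan
for row in conn.execute(
        "explain query plan select is_mine, sum(blob_length) from blob group by is_mine"):
    print(row)  # the plan should mention: USING COVERING INDEX blob_data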