Skip to content

Commit

Permalink
Merge b154805 into 1ddc7dd
Browse files Browse the repository at this point in the history
  • Loading branch information
shyba committed Nov 9, 2021
2 parents 1ddc7dd + b154805 commit fb2ad38
Show file tree
Hide file tree
Showing 14 changed files with 278 additions and 33 deletions.
33 changes: 27 additions & 6 deletions lbry/blob/disk_space_manager.py
Expand Up @@ -14,25 +14,46 @@ def __init__(self, config, db, blob_manager, cleaning_interval=30 * 60, analytic
self.running = False
self.task = None
self.analytics = analytics
self._used_space_bytes = None

async def get_free_space_mb(self, is_network_blob=False):
    """Return how many MB remain under the relevant storage limit (never negative).

    is_network_blob: when True, measure against the network (seeding) quota and
    the 'network_storage' usage bucket; otherwise against the content blob quota
    and the 'content_storage' bucket.
    """
    if is_network_blob:
        limit_mb = self.config.network_storage_limit
        bucket = 'network_storage'
    else:
        limit_mb = self.config.blob_storage_limit
        bucket = 'content_storage'
    used_mb = (await self.get_space_used_mb())[bucket]
    return max(0, limit_mb - used_mb)

async def get_space_used_bytes(self):
    """Query the database for per-category blob disk usage in bytes.

    Caches the result on self._used_space_bytes so get_space_used_mb() can
    serve cached reads without hitting the database.
    Returns the dict produced by storage.get_stored_blob_disk_usage()
    (keys: network_storage, content_storage, private_storage, total).
    """
    # Fix: the flattened diff left the pre-change `return await ...` line in
    # place alongside the new caching body; only the caching version remains.
    self._used_space_bytes = await self.db.get_stored_blob_disk_usage()
    return self._used_space_bytes

async def get_space_used_mb(self, cached=True):
    """Return per-category disk usage converted to whole megabytes.

    cached: reuse the byte counts from the last get_space_used_bytes() call
    when one exists; otherwise (or when cached=False) query the database.
    Returns a dict with the same keys as get_space_used_bytes(), values in MB.
    """
    # Fix: the flattened diff kept the pre-change single-value implementation
    # above the new dict-returning one; only the new version remains.
    cached = cached and self._used_space_bytes is not None
    space_used_bytes = self._used_space_bytes if cached else await self.get_space_used_bytes()
    return {key: int(value / 1024.0 / 1024.0) for key, value in space_used_bytes.items()}

async def clean(self):
    """Run one cleanup pass: content blobs first, then network (seeding) blobs."""
    for is_network_blob in (False, True):
        await self._clean(is_network_blob)

async def _clean(self, is_network_blob=False):
space_used_bytes = await self.get_space_used_bytes()
storage_limit = self.config.blob_storage_limit*1024*1024 if self.config.blob_storage_limit else None
if is_network_blob:
space_used_bytes = space_used_bytes['network_storage']
else:
space_used_bytes = space_used_bytes['content_storage'] + space_used_bytes['private_storage']
storage_limit_mb = self.config.network_storage_limit if is_network_blob else self.config.blob_storage_limit
storage_limit = storage_limit_mb*1024*1024 if storage_limit_mb else None
if self.analytics:
asyncio.create_task(self.analytics.send_disk_space_used(space_used_bytes, storage_limit))
asyncio.create_task(
self.analytics.send_disk_space_used(space_used_bytes, storage_limit, is_network_blob)
)
if not storage_limit:
return 0
delete = []
available = storage_limit - space_used_bytes
if available > 0:
return 0
for blob_hash, file_size, _ in await self.db.get_stored_blobs(is_mine=False):
for blob_hash, file_size, _ in await self.db.get_stored_blobs(is_mine=False, is_network_blob=is_network_blob):
delete.append(blob_hash)
available += file_size
if available > 0:
Expand Down
1 change: 1 addition & 0 deletions lbry/conf.py
Expand Up @@ -634,6 +634,7 @@ class Config(CLIConfig):

# blob announcement and download
save_blobs = Toggle("Save encrypted blob files for hosting, otherwise download blobs to memory only.", True)
network_storage_limit = Integer("Disk space in MB to be allocated for helping the P2P network. 0 = disable", 0)
blob_storage_limit = Integer("Disk space in MB to be allocated for blob storage. 0 = no limit", 0)
blob_lru_cache_size = Integer(
"LRU cache size for decrypted downloaded blobs used to minimize re-downloading the same blobs when "
Expand Down
4 changes: 4 additions & 0 deletions lbry/dht/node.py
Expand Up @@ -31,6 +31,10 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager',
self._refresh_task: asyncio.Task = None
self._storage = storage

@property
def stored_blob_hashes(self):
    """Blob hashes currently held in this node's DHT data store.

    Returns the data store's keys view (see DictDataStore.keys).
    """
    return self.protocol.data_store.keys()

async def refresh_node(self, force_once=False):
while True:
# remove peers with expired blob announcements from the datastore
Expand Down
3 changes: 3 additions & 0 deletions lbry/dht/protocol/data_store.py
Expand Up @@ -16,6 +16,9 @@ def __init__(self, loop: asyncio.AbstractEventLoop, peer_manager: 'PeerManager')
self._peer_manager = peer_manager
self.completed_blobs: typing.Set[str] = set()

def keys(self):
    """Return a view of the blob hashes currently tracked by the store."""
    store = self._data_store
    return store.keys()

def __len__(self):
    """Number of blob hashes currently tracked by the store."""
    return len(self._data_store)

Expand Down
3 changes: 2 additions & 1 deletion lbry/extras/daemon/analytics.py
Expand Up @@ -170,11 +170,12 @@ async def send_upnp_setup_success_fail(self, success, status):
})
)

async def send_disk_space_used(self, storage_used, storage_limit, is_from_network_quota):
    """Emit a DISK_SPACE analytics event.

    storage_used / storage_limit: byte counts for the relevant quota.
    is_from_network_quota: True when the figures refer to the network
    (seeding) quota rather than the content blob quota.
    """
    # Fix: the flattened diff retained both the old two-argument def line and
    # the new three-argument one; only the new signature remains.
    await self.track(
        self._event(DISK_SPACE, {
            'used': storage_used,
            'limit': storage_limit,
            'from_network_quota': is_from_network_quota
        })
    )

Expand Down
66 changes: 64 additions & 2 deletions lbry/extras/daemon/components.py
Expand Up @@ -4,6 +4,7 @@
import logging
import binascii
import typing

import base58

from aioupnp import __version__ as aioupnp_version
Expand All @@ -17,6 +18,7 @@
from lbry.blob.blob_manager import BlobManager
from lbry.blob.disk_space_manager import DiskSpaceManager
from lbry.blob_exchange.server import BlobServer
from lbry.stream.background_downloader import BackgroundDownloader
from lbry.stream.stream_manager import StreamManager
from lbry.file.file_manager import FileManager
from lbry.extras.daemon.component import Component
Expand All @@ -42,6 +44,7 @@
HASH_ANNOUNCER_COMPONENT = "hash_announcer"
FILE_MANAGER_COMPONENT = "file_manager"
DISK_SPACE_COMPONENT = "disk_space"
BACKGROUND_DOWNLOADER_COMPONENT = "background_downloader"
PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server"
UPNP_COMPONENT = "upnp"
EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager"
Expand Down Expand Up @@ -377,6 +380,61 @@ async def stop(self):
self.file_manager.stop()


class BackgroundDownloaderComponent(Component):
    """Daemon component that downloads DHT-announced blobs in the background.

    Uses the disk-space component's network quota to decide whether there is
    room to fetch more; stays dormant when the DHT component is unavailable
    (see start()).
    """
    component_name = BACKGROUND_DOWNLOADER_COMPONENT
    depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT, DISK_SPACE_COMPONENT]

    def __init__(self, component_manager):
        super().__init__(component_manager)
        # main polling loop task; created by start(), None until then
        self.background_task: typing.Optional[asyncio.Task] = None
        self.download_loop_delay_seconds = 60
        # task for the download currently in flight, if any
        self.ongoing_download: typing.Optional[asyncio.Task] = None
        self.space_manager: typing.Optional[DiskSpaceManager] = None
        self.blob_manager: typing.Optional[BlobManager] = None
        self.background_downloader: typing.Optional[BackgroundDownloader] = None
        self.dht_node: typing.Optional[Node] = None
        # last free-space reading in MB, refreshed on every loop iteration
        self.space_available: typing.Optional[int] = None

    @property
    def is_busy(self):
        """True while a previously started download task has not completed."""
        return bool(self.ongoing_download and not self.ongoing_download.done())

    @property
    def component(self) -> 'BackgroundDownloaderComponent':
        return self

    async def get_status(self):
        """Report loop liveness, last free-space reading (MB), and busy flag."""
        return {'running': self.background_task is not None and not self.background_task.done(),
                'available_free_space_mb': self.space_available,
                'ongoing_download': self.is_busy}

    async def download_blobs_in_background(self):
        # Poll loop: every download_loop_delay_seconds, start at most one
        # download of a DHT-stored hash we have not already completed, as long
        # as more than 10 MB of the network quota remains free.
        while True:
            self.space_available = await self.space_manager.get_free_space_mb(True)
            if not self.is_busy and self.space_available > 10:
                # first DHT-announced hash not already in the completed set
                blob_hash = next((key.hex() for key in self.dht_node.stored_blob_hashes if
                                  key.hex() not in self.blob_manager.completed_blob_hashes), None)
                if blob_hash:
                    self.ongoing_download = asyncio.create_task(self.background_downloader.download_blobs(blob_hash))
            await asyncio.sleep(self.download_loop_delay_seconds)

    async def start(self):
        self.space_manager: DiskSpaceManager = self.component_manager.get_component(DISK_SPACE_COMPONENT)
        # without a DHT node there is nothing to discover; stay dormant
        if not self.component_manager.has_component(DHT_COMPONENT):
            return
        self.dht_node = self.component_manager.get_component(DHT_COMPONENT)
        self.blob_manager = self.component_manager.get_component(BLOB_COMPONENT)
        storage = self.component_manager.get_component(DATABASE_COMPONENT)
        self.background_downloader = BackgroundDownloader(self.conf, storage, self.blob_manager, self.dht_node)
        self.background_task = asyncio.create_task(self.download_blobs_in_background())

    async def stop(self):
        # cancel the in-flight download first, then the polling loop;
        # NOTE(review): tasks are cancelled but not awaited here — confirm the
        # component manager tolerates pending cancellation on shutdown
        if self.ongoing_download and not self.ongoing_download.done():
            self.ongoing_download.cancel()
        if self.background_task:
            self.background_task.cancel()


class DiskSpaceComponent(Component):
component_name = DISK_SPACE_COMPONENT
depends_on = [DATABASE_COMPONENT, BLOB_COMPONENT]
Expand All @@ -391,11 +449,15 @@ def component(self) -> typing.Optional[DiskSpaceManager]:

async def get_status(self):
    """Report disk usage per storage category (MB) plus the manager's running flag.

    Falls back to a zeroed legacy-shaped dict when the disk space manager has
    not been created yet.
    """
    # Fix: the flattened diff kept the old single 'space_used' entry and the
    # old fallback return interleaved with the new per-category dict; only the
    # new version remains.
    if self.disk_space_manager:
        space_used = await self.disk_space_manager.get_space_used_mb(cached=True)
        return {
            'total_used_mb': space_used['total'],
            'published_blobs_storage_used_mb': space_used['private_storage'],
            'content_blobs_storage_used_mb': space_used['content_storage'],
            'seed_blobs_storage_used_mb': space_used['network_storage'],
            'running': self.disk_space_manager.running,
        }
    return {'space_used': '0', 'network_seeding_space_used': '0', 'running': False}

async def start(self):
db = self.component_manager.get_component(DATABASE_COMPONENT)
Expand Down
58 changes: 47 additions & 11 deletions lbry/extras/daemon/storage.py
Expand Up @@ -337,6 +337,7 @@ class SQLiteStorage(SQLiteMixin):
tcp_port integer,
unique (address, udp_port)
);
create index if not exists blob_data on blob(blob_hash, blob_length, is_mine);
"""

def __init__(self, conf: Config, path, loop=None, time_getter: typing.Optional[typing.Callable[[], float]] = None):
Expand Down Expand Up @@ -380,6 +381,11 @@ def get_blob_status(self, blob_hash: str):
"select status from blob where blob_hash=?", blob_hash
)

def set_announce(self, *blob_hashes):
    """Mark the given blobs as needing announcement (should_announce=1).

    Bug fix: the SQL hardcoded exactly two placeholders `(?, ?)`, so any call
    with a hash count other than two raised a parameter-count error despite
    the variadic signature. One placeholder per hash is now generated;
    behavior for exactly two hashes is unchanged.
    """
    placeholders = ", ".join("?" * len(blob_hashes))
    return self.db.execute_fetchall(
        f"update blob set should_announce=1 where blob_hash in ({placeholders})", blob_hashes
    )

def update_last_announced_blobs(self, blob_hashes: typing.List[str]):
def _update_last_announced_blobs(transaction: sqlite3.Connection):
last_announced = self.time_getter()
Expand Down Expand Up @@ -437,20 +443,50 @@ def delete_blobs(transaction):
def get_all_blob_hashes(self):
    """Return every blob_hash in the blob table (no ordering specified)."""
    return self.run_and_return_list("select blob_hash from blob")

async def get_stored_blobs(self, is_mine: bool, is_network_blob=False):
    """List stored blobs as (blob_hash, blob_length, added_on) rows, oldest first.

    is_mine: ownership filter, stored as 0/1 in the blob table.
    is_network_blob: when True, return only blobs not attached to any stream
    (network/seeding blobs); when False, return stream content blobs followed
    by sd (descriptor) blobs, each group ordered by added_on ascending.
    """
    # Fix: the flattened diff kept the pre-change 3-line implementation and an
    # unrelated removed method interleaved with the new body; only the new
    # version remains.
    is_mine = 1 if is_mine else 0
    if is_network_blob:
        return await self.db.execute_fetchall(
            "select blob.blob_hash, blob.blob_length, blob.added_on "
            "from blob left join stream_blob using (blob_hash) "
            "where stream_blob.stream_hash is null and blob.is_mine=? order by blob.added_on asc",
            (is_mine,)
        )
    sd_blobs = await self.db.execute_fetchall(
        "select blob.blob_hash, blob.blob_length, blob.added_on "
        "from blob join stream on blob.blob_hash=stream.sd_hash join file using (stream_hash) "
        "where blob.is_mine=? order by blob.added_on asc",
        (is_mine,)
    )
    content_blobs = await self.db.execute_fetchall(
        "select blob.blob_hash, blob.blob_length, blob.added_on "
        "from blob join stream_blob using (blob_hash) cross join stream using (stream_hash)"
        "cross join file using (stream_hash) where blob.is_mine=? order by blob.added_on asc",
        (is_mine,)
    )
    return content_blobs + sd_blobs

async def get_stored_blob_disk_usage(self):
    """Aggregate blob disk usage in bytes with a single query.

    Returns a dict of byte totals:
      - network_storage: blobs not joined to any stream (seeding-only blobs)
      - content_storage: stream blobs with is_mine=0 (downloaded content)
      - private_storage: blobs with is_mine=1 (presumably locally published —
        confirm against update_blob_ownership semantics)
      - total: every row in the blob table
    """
    total, network_size, content_size, private_size = await self.db.execute_fetchone("""
        select coalesce(sum(blob_length), 0) as total,
            coalesce(sum(case when
                stream_blob.stream_hash is null
            then blob_length else 0 end), 0) as network_storage,
            coalesce(sum(case when
                stream_blob.blob_hash is not null and is_mine=0
            then blob_length else 0 end), 0) as content_storage,
            coalesce(sum(case when
                is_mine=1
            then blob_length else 0 end), 0) as private_storage
        from blob left join stream_blob using (blob_hash)
    """)
    return {
        'network_storage': network_size,
        'content_storage': content_size,
        'private_storage': private_size,
        'total': total
    }

async def update_blob_ownership(self, sd_hash, is_mine: bool):
is_mine = 1 if is_mine else 0
Expand Down
30 changes: 30 additions & 0 deletions lbry/stream/background_downloader.py
@@ -0,0 +1,30 @@
import asyncio
import logging

from lbry.stream.downloader import StreamDownloader


log = logging.getLogger(__name__)


class BackgroundDownloader:
    """Downloads all blobs of a stream by sd hash without saving a file entry."""

    def __init__(self, conf, storage, blob_manager, dht_node=None):
        self.storage = storage
        self.blob_manager = blob_manager
        self.node = dht_node
        self.conf = conf

    async def download_blobs(self, sd_hash):
        """Fetch the stream descriptor for `sd_hash` and then each data blob.

        Best effort: a ValueError from the downloader aborts silently,
        cancellation propagates, and any other error is logged with traceback.
        The downloader is always stopped on exit.
        """
        downloader = StreamDownloader(asyncio.get_running_loop(), self.conf, self.blob_manager, sd_hash)
        try:
            await downloader.start(self.node, save_stream=False)
            # skip the final entry — presumably the stream terminator blob; confirm
            for blob_info in downloader.descriptor.blobs[:-1]:
                await downloader.download_stream_blob(blob_info)
        except ValueError:
            return
        except asyncio.CancelledError:
            raise
        except Exception:
            # Fix: log.exception records the active traceback, which the
            # original bare log.error discarded, making failures undiagnosable.
            log.exception("Unexpected download error on background downloader")
        finally:
            downloader.stop()
4 changes: 2 additions & 2 deletions lbry/stream/downloader.py
Expand Up @@ -83,7 +83,7 @@ async def load_descriptor(self, connection_id: int = 0):
)
log.info("loaded stream manifest %s", self.sd_hash)

async def start(self, node: typing.Optional['Node'] = None, connection_id: int = 0):
async def start(self, node: typing.Optional['Node'] = None, connection_id: int = 0, save_stream=True):
# set up peer accumulation
self.node = node or self.node # fixme: this shouldnt be set here!
if self.node:
Expand All @@ -102,7 +102,7 @@ async def start(self, node: typing.Optional['Node'] = None, connection_id: int =
self.search_queue.put_nowait(self.descriptor.blobs[0].blob_hash)
log.info("added head blob to peer search for stream %s", self.sd_hash)

if not await self.blob_manager.storage.stream_exists(self.sd_hash):
if not await self.blob_manager.storage.stream_exists(self.sd_hash) and save_stream:
await self.blob_manager.storage.store_stream(
self.blob_manager.get_blob(self.sd_hash, length=self.descriptor.length), self.descriptor
)
Expand Down
Binary file removed lbry/wallet/__init__.pyc
Binary file not shown.

0 comments on commit fb2ad38

Please sign in to comment.