Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Support multiple blob dirs and (optional) hierarchical structure within each blob dir #3725

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
51 changes: 35 additions & 16 deletions lbry/blob/blob_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
from lbry.blob.blob_info import BlobInfo
from lbry.blob.writer import HashBlobWriter

if typing.TYPE_CHECKING:
from lbry.blob.blob_manager import BlobManager

log = logging.getLogger(__name__)


Expand Down Expand Up @@ -79,24 +82,35 @@ class AbstractBlob:
def __init__(
        self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
        blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
        blob_manager: typing.Optional['BlobManager'] = None,
        added_on: typing.Optional[int] = None, is_mine: bool = False,
        error_fmt: str = "invalid blob directory '%s'",
):
    """Base blob object.

    When *blob_manager* is given, the manager chooses this blob's storage
    directory (longest-prefix match over the configured blob dirs) and the
    directory must already exist; *error_fmt* customizes the OSError message
    raised when it does not.
    """
    if not is_valid_blobhash(blob_hash):
        raise InvalidBlobHashError(blob_hash)
    # Imported locally to avoid a circular import at module load time.
    from lbry.blob.blob_manager import BlobManager  # pylint: disable=import-outside-toplevel
    if not isinstance(blob_manager, (BlobManager, type(None))):
        raise TypeError(f"{type(blob_manager)} not instance of BlobManager")

    self.loop = loop
    self.blob_hash = blob_hash
    self.length = length
    self.blob_completed_callback = blob_completed_callback
    # Ask the manager for this blob's directory; None when no manager given.
    self.blob_directory, _ = blob_manager._blob_dir(blob_hash) if blob_manager is not None else (None, None)
    self.writers: typing.Dict[typing.Tuple[typing.Optional[str], typing.Optional[int]], HashBlobWriter] = {}
    self.verified: asyncio.Event = asyncio.Event()
    self.writing: asyncio.Event = asyncio.Event()
    self.readers: typing.List[typing.BinaryIO] = []
    self.added_on = added_on or time.time()
    self.is_mine = is_mine

    # A manager implies disk storage, so the resolved directory must exist.
    if blob_manager and (not self.blob_directory or not os.path.isdir(self.blob_directory)):
        raise OSError(error_fmt % self.blob_directory)

def __del__(self):
    """Warn (and close) if the blob is garbage collected with open handles."""
    # If __init__ raised partway through, one or both attributes may be
    # missing; `or` (not `and`) guards the partially-initialized case so we
    # never touch an attribute that was never assigned.
    if not hasattr(self, 'writers') or not hasattr(self, 'readers'):
        # object initialization failed
        return
    if self.writers or self.readers:
        log.warning("%s not closed before being garbage collected", self.blob_hash)
    self.close()
Expand Down Expand Up @@ -187,7 +201,7 @@ def decrypt(self, key: bytes, iv: bytes) -> bytes:

@classmethod
async def create_from_unencrypted(
cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
cls, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', key: bytes, iv: bytes,
unencrypted: bytes, blob_num: int, added_on: int, is_mine: bool,
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None,
) -> BlobInfo:
Expand All @@ -197,7 +211,10 @@ async def create_from_unencrypted(

blob_bytes, blob_hash = encrypt_blob_bytes(key, iv, unencrypted)
length = len(blob_bytes)
blob = cls(loop, blob_hash, length, blob_completed_callback, blob_dir, added_on, is_mine)
blob = cls(
loop, blob_hash, length, blob_completed_callback, blob_manager, added_on, is_mine,
error_fmt="cannot create blob in directory: '%s'",
)
writer = blob.get_blob_writer()
writer.write(blob_bytes)
await blob.verified.wait()
Expand Down Expand Up @@ -259,10 +276,11 @@ class BlobBuffer(AbstractBlob):
def __init__(
        self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
        blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
        blob_manager: typing.Optional['BlobManager'] = None,
        added_on: typing.Optional[int] = None, is_mine: bool = False
):
    """In-memory blob: verified bytes are held in a BytesIO, never on disk."""
    # Populated once the blob's contents have been verified.
    self._verified_bytes: typing.Optional[BytesIO] = None
    super().__init__(loop, blob_hash, length, blob_completed_callback, blob_manager, added_on, is_mine)

@contextlib.contextmanager
def _reader_context(self) -> typing.ContextManager[typing.BinaryIO]:
Expand Down Expand Up @@ -302,11 +320,14 @@ class BlobFile(AbstractBlob):
def __init__(
self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
blob_directory: typing.Optional[str] = None, added_on: typing.Optional[int] = None, is_mine: bool = False
blob_manager: typing.Optional['BlobManager'] = None,
added_on: typing.Optional[int] = None, is_mine: bool = False,
error_fmt: str = "invalid blob directory '%s'",
):
super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory, added_on, is_mine)
if not blob_directory or not os.path.isdir(blob_directory):
raise OSError(f"invalid blob directory '{blob_directory}'")
super().__init__(
loop, blob_hash, length, blob_completed_callback, blob_manager, added_on, is_mine,
error_fmt,
)
self.file_path = os.path.join(self.blob_directory, self.blob_hash)
if self.file_exists:
file_size = int(os.stat(self.file_path).st_size)
Expand Down Expand Up @@ -355,12 +376,10 @@ def delete(self):

@classmethod
async def create_from_unencrypted(
        cls, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', key: bytes, iv: bytes,
        unencrypted: bytes, blob_num: int, added_on: float, is_mine: bool,
        blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None
) -> BlobInfo:
    """Encrypt *unencrypted* with *key*/*iv* and store the result as a blob file.

    Directory validation happens in AbstractBlob.__init__ (via the blob
    manager), so no explicit directory check is needed here.
    """
    return await super().create_from_unencrypted(
        loop, blob_manager, key, iv, unencrypted, blob_num, added_on, is_mine, blob_completed_callback
    )
93 changes: 73 additions & 20 deletions lbry/blob/blob_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,16 @@
import typing
import asyncio
import logging
from collections import defaultdict
from lbry.utils import LRUCacheWithMetrics
from lbry.blob.blob_file import is_valid_blobhash, BlobFile, BlobBuffer, AbstractBlob
from lbry.blob import BLOBHASH_LENGTH
from lbry.blob.blob_file import (
HEXMATCH,
is_valid_blobhash,
BlobFile,
BlobBuffer,
AbstractBlob,
)
from lbry.stream.descriptor import StreamDescriptor
from lbry.connection_manager import ConnectionManager

Expand All @@ -16,16 +24,19 @@


class BlobManager:
def __init__(self, loop: asyncio.AbstractEventLoop, blob_dir: str, storage: 'SQLiteStorage', config: 'Config',
def __init__(self, loop: asyncio.AbstractEventLoop, blob_dirs: typing.List[str],
storage: 'SQLiteStorage', config: 'Config',
node_data_store: typing.Optional['DictDataStore'] = None):
"""
This class stores blobs on the hard disk

blob_dir - directory where blobs are stored
blob_dirs - directories where blobs are stored
storage - SQLiteStorage object
"""
self.loop = loop
self.blob_dir = blob_dir
self.blob_dirs = defaultdict(list)
self.blob_dirs.update({ '': blob_dirs if isinstance(blob_dirs, list) else [blob_dirs]})
self.blob_dirs_max_prefix_len = 0 # Maximum key length in "blob_dirs" dictionary.
self.storage = storage
self._node_data_store = node_data_store
self.completed_blob_hashes: typing.Set[str] = set() if not self._node_data_store\
Expand All @@ -36,14 +47,37 @@ def __init__(self, loop: asyncio.AbstractEventLoop, blob_dir: str, storage: 'SQL
self.config.blob_lru_cache_size)
self.connection_manager = ConnectionManager(loop)

def _blob_dir(self, blob_hash: str) -> typing.Tuple[str, bool]:
"""
Locate blob directory matching longest prefix of blob hash.
An existing blob is preferred, even if it doesn't reside in
the directory with longest prefix.
"""
best_dir = None
for prefix in [blob_hash[:i] for i in range(min(len(blob_hash), self.blob_dirs_max_prefix_len), -1, -1)]:
if prefix in self.blob_dirs:
if not best_dir:
best_dir = self.blob_dirs[prefix][0]
for path in self.blob_dirs[prefix]:
if os.path.isfile(os.path.join(path, blob_hash)):
#print(f'blob {blob_hash} FOUND at location: {path}')
return path, True
#print(f'blob {blob_hash} has BEST location: {best_dir}')
return best_dir, False


def _get_blob(self, blob_hash: str, length: typing.Optional[int] = None, is_mine: bool = False):
    """Construct a blob object for *blob_hash*.

    File-backed when blobs are being saved or the blob already exists on
    disk; otherwise an in-memory buffer.
    """
    # Short-circuit keeps the filesystem lookup off the save_blobs fast path.
    if self.config.save_blobs or self._blob_dir(blob_hash)[1]:
        return BlobFile(
            self.loop, blob_hash, length, self.blob_completed, self, is_mine=is_mine
        )
    return BlobBuffer(
        self.loop, blob_hash, length, self.blob_completed, self, is_mine=is_mine
    )

def get_blob(self, blob_hash, length: typing.Optional[int] = None, is_mine: bool = False):
Expand All @@ -65,21 +99,39 @@ def get_blob(self, blob_hash, length: typing.Optional[int] = None, is_mine: bool
def is_blob_verified(self, blob_hash: str, length: typing.Optional[int] = None) -> bool:
    """Return True when the blob exists on disk and its contents verify.

    Raises ValueError for a malformed *blob_hash*.
    """
    if not is_valid_blobhash(blob_hash):
        raise ValueError(blob_hash)
    _, blob_found = self._blob_dir(blob_hash)
    if not blob_found:
        return False
    # Prefer the already-tracked blob object when one exists.
    if blob_hash in self.blobs:
        return self.blobs[blob_hash].get_is_verified()
    return self._get_blob(blob_hash, length).get_is_verified()

def list_blobs(self, paths: typing.Optional[typing.List[str]] = None, prefix: str = '',
               setup: bool = False) -> typing.Set[str]:
    """Recursively collect blob file names under *paths* (or the directories
    registered for *prefix*).

    Subdirectories whose hex names extend the current prefix (while staying
    shorter than a full blob hash) are treated as nested blob storage.  With
    ``setup=True`` each such subdirectory is registered in ``self.blob_dirs``
    and ``self.blob_dirs_max_prefix_len`` is updated accordingly.
    """
    blobfiles = set()
    subdirs = defaultdict(list)
    for path in paths if paths is not None else self.blob_dirs[prefix]:
        with os.scandir(path) as entries:
            for item in entries:
                if item.is_file() and is_valid_blobhash(item.name):
                    blobfiles.add(item.name)
                elif item.is_dir() and len(prefix + item.name) < BLOBHASH_LENGTH and HEXMATCH.match(item.name):
                    subdirs[item.name].append(item.path)
    # Recurse into hex-named subdirectories, which may also contain blobs.
    for name, subdir_paths in subdirs.items():
        if setup:
            self.blob_dirs[prefix + name] = subdir_paths
            self.blob_dirs_max_prefix_len = max(self.blob_dirs_max_prefix_len, len(prefix + name))
        blobfiles.update(self.list_blobs(paths=subdir_paths, prefix=prefix + name, setup=setup))
    return blobfiles

async def setup(self) -> bool:
def get_files_in_blob_dir() -> typing.Set[str]:
if not self.blob_dir:
return set()
return {
item.name for item in os.scandir(self.blob_dir) if is_valid_blobhash(item.name)
}

in_blobfiles_dir = await self.loop.run_in_executor(None, get_files_in_blob_dir)
in_blobfiles_dir = await self.loop.run_in_executor(None, lambda: self.list_blobs(setup=True))
#print(f'blob dirs: {self.blob_dirs}')
to_add = await self.storage.sync_missing_blobs(in_blobfiles_dir)
if to_add:
self.completed_blob_hashes.update(to_add)
Expand All @@ -97,7 +149,7 @@ def stop(self):
self.completed_blob_hashes.clear()

def get_stream_descriptor(self, sd_hash):
return StreamDescriptor.from_stream_descriptor_blob(self.loop, self.blob_dir, self.get_blob(sd_hash))
return StreamDescriptor.from_stream_descriptor_blob(self.loop, self, self.get_blob(sd_hash))

def blob_completed(self, blob: AbstractBlob) -> asyncio.Task:
if blob.blob_hash is None:
Expand Down Expand Up @@ -133,8 +185,9 @@ def delete_blob(self, blob_hash: str):
raise Exception("invalid blob hash to delete")

if blob_hash not in self.blobs:
if self.blob_dir and os.path.isfile(os.path.join(self.blob_dir, blob_hash)):
os.remove(os.path.join(self.blob_dir, blob_hash))
blob_dir, blob_found = self._blob_dir(blob_hash)
if blob_dir and blob_found:
os.remove(os.path.join(blob_dir, blob_hash))
else:
self.blobs.pop(blob_hash).delete()
if blob_hash in self.completed_blob_hashes:
Expand Down
10 changes: 9 additions & 1 deletion lbry/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,13 @@ def validate(self, value):
f"Value of '{string}' at index {idx} in setting " \
f"'{self.name}' must be a string."

class Paths(Strings):
    """A list-of-strings setting whose values are filesystem paths.

    Environment variables and a leading ``~`` are expanded on every read.
    """

    def __get__(self, obj, owner) -> List[str]:
        values = super().__get__(obj, owner)
        if isinstance(values, list):
            # Expand "$VAR"/"%VAR%" first, then "~", in each configured path.
            return [os.path.expanduser(os.path.expandvars(path)) for path in values]
        # Non-list values (e.g. descriptor access on the class) pass through.
        return values

class KnownHubsList:

Expand Down Expand Up @@ -593,7 +600,7 @@ class Config(CLIConfig):
jurisdiction = String("Limit interactions to wallet server in this jurisdiction.")

# directories
data_dir = Path("Directory path to store blobs.", metavar='DIR')
data_dir = Path("Directory path for daemon settings, blobs, logs, etc.", metavar='DIR')
download_dir = Path(
"Directory path to place assembled files downloaded from LBRY.",
previous_names=['download_directory'], metavar='DIR'
Expand Down Expand Up @@ -638,6 +645,7 @@ class Config(CLIConfig):

# blob announcement and download
save_blobs = Toggle("Save encrypted blob files for hosting, otherwise download blobs to memory only.", True)
blob_dirs = Paths("Additional directory path(s) for storing blobs.", [], metavar='DIR')
network_storage_limit = Integer("Disk space in MB to be allocated for helping the P2P network. 0 = disable", 0)
blob_storage_limit = Integer("Disk space in MB to be allocated for blob storage. 0 = no limit", 0)
blob_lru_cache_size = Integer(
Expand Down
2 changes: 2 additions & 0 deletions lbry/extras/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ def main(argv=None):
conf = Config.create_from_arguments(args)
for directory in (conf.data_dir, conf.download_dir, conf.wallet_dir):
ensure_directory_exists(directory)
for directory in conf.blob_dirs:
ensure_directory_exists(directory)

if args.cli_version:
print(f"lbrynet {lbrynet_version}")
Expand Down
29 changes: 25 additions & 4 deletions lbry/extras/daemon/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,31 @@ async def start(self):
dht_node: Node = self.component_manager.get_component(DHT_COMPONENT)
if dht_node:
data_store = dht_node.protocol.data_store
blob_dir = os.path.join(self.conf.data_dir, 'blobfiles')
if not os.path.isdir(blob_dir):
os.mkdir(blob_dir)
self.blob_manager = BlobManager(self.component_manager.loop, blob_dir, storage, self.conf, data_store)

# Each blob dir should have 3 levels of subdirs corresponding to hash prefixes.
def setup_subdirs(path, depth):
    """Create a 16-way hex fan-out of subdirectories, *depth* levels deep.

    Idempotent: existing directories are left untouched.
    """
    if depth <= 0:
        return
    for prefix in '0123456789abcdef':
        subdir = os.path.join(path, prefix)
        # makedirs(exist_ok=True) avoids the isdir/mkdir race on concurrent starts.
        os.makedirs(subdir, exist_ok=True)
        setup_subdirs(subdir, depth - 1)

# Set up any explicit blob dirs plus a default <data_dir>/blobfiles.
blob_dirs = self.conf.blob_dirs + [os.path.join(self.conf.data_dir, 'blobfiles')]
#print(f'blob dirs: {blob_dirs}')
for blob_dir in blob_dirs:
if not os.path.isdir(blob_dir):
os.mkdir(blob_dir)
#print(f'created blob dir: {blob_dir}')
# TODO: Should subdir setup be done for new or empty blob dirs only?
# Setting up the subdirs will not relocate existing blobs and
# will just slow down lookups until & unless the subdirs fill up.
setup_subdirs(blob_dir, 3)

self.blob_manager = BlobManager(self.component_manager.loop, blob_dirs, storage, self.conf, data_store)
return await self.blob_manager.setup()

async def stop(self):
Expand Down