From eb2bbca1009d2d40d94c9f7413023ba9df61d080 Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Thu, 5 Jan 2023 15:03:41 -0600
Subject: [PATCH 1/7] Add support for hierarchical structure within blob files
 dir.

Add --blob-dirs argument to allow extra blob dirs outside of the main
"data dir".
---
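Note for reviewers: the heart of this change is the longest-prefix search in
BlobManager._blob_dir(). A minimal standalone sketch of that lookup follows;
the directory layout and paths in the dict are illustrative, not taken from
the patch:

    import os
    from collections import defaultdict

    # Hypothetical layout: '' maps to the flat legacy dirs, longer hex
    # prefixes map to nested subdirectories set up by this patch.
    blob_dirs = defaultdict(list)
    blob_dirs.update({
        '': ['/tmp/blobfiles'],
        'a': ['/tmp/blobfiles/a'],
        'ab': ['/tmp/blobfiles/a/b'],
    })
    max_prefix_len = 2

    def locate(blob_hash):
        # Walk prefixes from longest to shortest; prefer a directory that
        # already holds the blob, else fall back to the longest match.
        best_dir = None
        for i in range(min(len(blob_hash), max_prefix_len), -1, -1):
            prefix = blob_hash[:i]
            if prefix in blob_dirs:
                if best_dir is None:
                    best_dir = blob_dirs[prefix][0]
                for path in blob_dirs[prefix]:
                    if os.path.isfile(os.path.join(path, blob_hash)):
                        return path, True
        return best_dir, False

    print(locate('abcd' * 24))  # ('/tmp/blobfiles/a/b', False) until the file exists

Preferring a directory that already holds the blob keeps lookups correct while
old flat directories and newly nested ones coexist.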
 lbry/blob/blob_file.py                         | 40 ++++----
 lbry/blob/blob_manager.py                      | 93 +++++++++++++++----
 lbry/conf.py                                   | 17 +++-
 lbry/extras/daemon/components.py               | 26 +++++-
 lbry/stream/descriptor.py                      | 32 ++++---
 lbry/stream/downloader.py                      |  2 +-
 lbry/stream/reflector/server.py                |  4 +-
 lbry/stream/stream_manager.py                  |  4 +-
 lbry/wallet/orchstr8/node.py                   |  3 +-
 .../datanetwork/test_file_commands.py          |  2 +-
 .../integration/datanetwork/test_streaming.py  | 16 ++--
 11 files changed, 170 insertions(+), 69 deletions(-)

diff --git a/lbry/blob/blob_file.py b/lbry/blob/blob_file.py
index 9145cd30b2..838a79d2d4 100644
--- a/lbry/blob/blob_file.py
+++ b/lbry/blob/blob_file.py
@@ -19,6 +19,9 @@
 from lbry.blob.blob_info import BlobInfo
 from lbry.blob.writer import HashBlobWriter
 
+if typing.TYPE_CHECKING:
+    from lbry.blob.blob_manager import BlobManager
+
 log = logging.getLogger(__name__)
@@ -79,13 +82,20 @@ class AbstractBlob:
     def __init__(
             self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
             blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
-            blob_directory: typing.Optional[str] = None, added_on: typing.Optional[int] = None, is_mine: bool = False,
+            blob_manager: typing.Optional['BlobManager'] = None,
+            added_on: typing.Optional[int] = None, is_mine: bool = False,
     ):
+        if not is_valid_blobhash(blob_hash):
+            raise InvalidBlobHashError(blob_hash)
+        from lbry.blob.blob_manager import BlobManager  # pylint: disable=import-outside-toplevel
+        if not isinstance(blob_manager, BlobManager):
+            raise TypeError(f"{type(blob_manager)} not instance of BlobManager")
+
         self.loop = loop
         self.blob_hash = blob_hash
         self.length = length
         self.blob_completed_callback = blob_completed_callback
-        self.blob_directory = blob_directory
+        self.blob_directory, _ = blob_manager._blob_dir(blob_hash)
         self.writers: typing.Dict[typing.Tuple[typing.Optional[str], typing.Optional[int]], HashBlobWriter] = {}
         self.verified: asyncio.Event = asyncio.Event()
         self.writing: asyncio.Event = asyncio.Event()
@@ -93,8 +103,8 @@ def __init__(
         self.added_on = added_on or time.time()
         self.is_mine = is_mine
 
-        if not is_valid_blobhash(blob_hash):
-            raise InvalidBlobHashError(blob_hash)
+        if not self.blob_directory or not os.path.isdir(self.blob_directory):
+            raise OSError(f"cannot create blob in directory: '{self.blob_directory}'")
 
     def __del__(self):
         if self.writers or self.readers:
@@ -187,7 +197,7 @@ def decrypt(self, key: bytes, iv: bytes) -> bytes:
 
     @classmethod
     async def create_from_unencrypted(
-        cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
+        cls, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', key: bytes, iv: bytes,
         unencrypted: bytes, blob_num: int, added_on: int, is_mine: bool,
         blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], None]] = None,
     ) -> BlobInfo:
@@ -197,7 +207,7 @@
 
         blob_bytes, blob_hash = encrypt_blob_bytes(key, iv, unencrypted)
         length = len(blob_bytes)
-        blob = cls(loop, blob_hash, length, blob_completed_callback, blob_dir, added_on, is_mine)
+        blob = cls(loop, blob_hash, length, blob_completed_callback, blob_manager, added_on, is_mine)
         writer = blob.get_blob_writer()
         writer.write(blob_bytes)
         await blob.verified.wait()
@@ -259,10 +269,11 @@ class BlobBuffer(AbstractBlob):
     def __init__(
             self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
             blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
-            blob_directory: typing.Optional[str] = None, added_on: typing.Optional[int] = None, is_mine: bool = False
+            blob_manager: typing.Optional['BlobManager'] = None,
+            added_on: typing.Optional[int] = None, is_mine: bool = False
     ):
         self._verified_bytes: typing.Optional[BytesIO] = None
-        super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory, added_on, is_mine)
+        super().__init__(loop, blob_hash, length, blob_completed_callback, blob_manager, added_on, is_mine)
 
     @contextlib.contextmanager
     def _reader_context(self) -> typing.ContextManager[typing.BinaryIO]:
@@ -302,11 +313,10 @@ class BlobFile(AbstractBlob):
     def __init__(
             self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
             blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
-            blob_directory: typing.Optional[str] = None, added_on: typing.Optional[int] = None, is_mine: bool = False
+            blob_manager: typing.Optional['BlobManager'] = None,
+            added_on: typing.Optional[int] = None, is_mine: bool = False
     ):
-        super().__init__(loop, blob_hash, length, blob_completed_callback, blob_directory, added_on, is_mine)
-        if not blob_directory or not os.path.isdir(blob_directory):
-            raise OSError(f"invalid blob directory '{blob_directory}'")
+        super().__init__(loop, blob_hash, length, blob_completed_callback, blob_manager, added_on, is_mine)
         self.file_path = os.path.join(self.blob_directory, self.blob_hash)
         if self.file_exists:
             file_size = int(os.stat(self.file_path).st_size)
@@ -355,12 +365,10 @@ def delete(self):
 
     @classmethod
     async def create_from_unencrypted(
-        cls, loop: asyncio.AbstractEventLoop, blob_dir: typing.Optional[str], key: bytes, iv: bytes,
+        cls, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', key: bytes, iv: bytes,
         unencrypted: bytes, blob_num: int, added_on: float, is_mine: bool,
         blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None
     ) -> BlobInfo:
-        if not blob_dir or not os.path.isdir(blob_dir):
-            raise OSError(f"cannot create blob in directory: '{blob_dir}'")
         return await super().create_from_unencrypted(
-            loop, blob_dir, key, iv, unencrypted, blob_num, added_on, is_mine, blob_completed_callback
+            loop, blob_manager, key, iv, unencrypted, blob_num, added_on, is_mine, blob_completed_callback
         )
diff --git a/lbry/blob/blob_manager.py b/lbry/blob/blob_manager.py
index 5663069452..1febc5684f 100644
--- a/lbry/blob/blob_manager.py
+++ b/lbry/blob/blob_manager.py
@@ -2,8 +2,16 @@
 import typing
 import asyncio
 import logging
+from collections import defaultdict
 from lbry.utils import LRUCacheWithMetrics
-from lbry.blob.blob_file import is_valid_blobhash, BlobFile, BlobBuffer, AbstractBlob
+from lbry.blob import BLOBHASH_LENGTH
+from lbry.blob.blob_file import (
+    HEXMATCH,
+    is_valid_blobhash,
+    BlobFile,
+    BlobBuffer,
+    AbstractBlob,
+)
 from lbry.stream.descriptor import StreamDescriptor
 from lbry.connection_manager import ConnectionManager
@@ -16,16 +24,19 @@
 
 class BlobManager:
-    def __init__(self, loop: asyncio.AbstractEventLoop, blob_dir: str, storage: 'SQLiteStorage', config: 'Config',
+    def __init__(self, loop: asyncio.AbstractEventLoop, blob_dirs: typing.List[str],
+                 storage: 'SQLiteStorage', config: 'Config',
                  node_data_store: typing.Optional['DictDataStore'] = None):
         """
         This class stores blobs on the hard disk
 
-        blob_dir - directory where blobs are stored
+        blob_dirs - directories where blobs are stored
         storage - SQLiteStorage object
         """
         self.loop = loop
-        self.blob_dir = blob_dir
+        self.blob_dirs = defaultdict(list)
+        self.blob_dirs.update({'': blob_dirs if isinstance(blob_dirs, list) else [blob_dirs]})
+        self.blob_dirs_max_prefix_len = 0  # Maximum key length in "blob_dirs" dictionary.
         self.storage = storage
         self._node_data_store = node_data_store
         self.completed_blob_hashes: typing.Set[str] = set() if not self._node_data_store\
@@ -36,14 +47,37 @@ def __init__(self, loop: asyncio.AbstractEventLoop, blob_dir: str, storage: 'SQL
             self.config.blob_lru_cache_size)
         self.connection_manager = ConnectionManager(loop)
 
+    def _blob_dir(self, blob_hash: str) -> typing.Tuple[str, bool]:
+        """
+        Locate blob directory matching longest prefix of blob hash.
+        An existing blob is preferred, even if it doesn't reside in
+        the directory with longest prefix.
+        """
+        best_dir = None
+        for prefix in [blob_hash[:i] for i in range(min(len(blob_hash), self.blob_dirs_max_prefix_len), -1, -1)]:
+            if prefix in self.blob_dirs:
+                if not best_dir:
+                    best_dir = self.blob_dirs[prefix][0]
+                for path in self.blob_dirs[prefix]:
+                    if os.path.isfile(os.path.join(path, blob_hash)):
+                        #print(f'blob {blob_hash} FOUND at location: {path}')
+                        return path, True
+        #print(f'blob {blob_hash} has BEST location: {best_dir}')
+        return best_dir, False
+
     def _get_blob(self, blob_hash: str, length: typing.Optional[int] = None, is_mine: bool = False):
-        if self.config.save_blobs or (
-                is_valid_blobhash(blob_hash) and os.path.isfile(os.path.join(self.blob_dir, blob_hash))):
+        if self.config.save_blobs:
             return BlobFile(
-                self.loop, blob_hash, length, self.blob_completed, self.blob_dir, is_mine=is_mine
+                self.loop, blob_hash, length, self.blob_completed, self, is_mine=is_mine
+            )
+        _, blob_found = self._blob_dir(blob_hash)
+        if blob_found:
+            return BlobFile(
+                self.loop, blob_hash, length, self.blob_completed, self, is_mine=is_mine
             )
         return BlobBuffer(
-            self.loop, blob_hash, length, self.blob_completed, self.blob_dir, is_mine=is_mine
+            self.loop, blob_hash, length, self.blob_completed, self, is_mine=is_mine
         )
 
     def get_blob(self, blob_hash, length: typing.Optional[int] = None, is_mine: bool = False):
@@ -65,21 +99,39 @@ def get_blob(self, blob_hash, length: typing.Optional[int] = None, is_mine: bool
     def is_blob_verified(self, blob_hash: str, length: typing.Optional[int] = None) -> bool:
         if not is_valid_blobhash(blob_hash):
             raise ValueError(blob_hash)
-        if not os.path.isfile(os.path.join(self.blob_dir, blob_hash)):
+        _, blob_found = self._blob_dir(blob_hash)
+        if not blob_found:
             return False
         if blob_hash in self.blobs:
             return self.blobs[blob_hash].get_is_verified()
         return self._get_blob(blob_hash, length).get_is_verified()
 
+    def list_blobs(self, paths=None, prefix='', setup=False):
+        """
+        Recursively search for blob files within path(s) and subdirectories.
+        When setup=True, subdirectories which are candidates for blob storage
+        are added to the "self.blob_dirs" dictionary.
+        """
+        blobfiles = set()
+        subdirs = defaultdict(list)
+        for path in paths if paths is not None else self.blob_dirs[prefix]:
+            with os.scandir(path) as entries:
+                for item in entries:
+                    if item.is_file() and is_valid_blobhash(item.name):
+                        blobfiles.add(item.name)
+                    elif item.is_dir() and len(prefix+item.name) < BLOBHASH_LENGTH and HEXMATCH.match(item.name):
+                        subdirs[item.name].append(item.path)
+        # Recursively process subdirectories which may also contain blobs.
+        for name, subdir_paths in subdirs.items():
+            if setup:
+                self.blob_dirs[prefix+name] = subdir_paths
+                self.blob_dirs_max_prefix_len = max(self.blob_dirs_max_prefix_len, len(prefix+name))
+            blobfiles.update(self.list_blobs(paths=subdir_paths, prefix=prefix+name, setup=setup))
+        return blobfiles
+
     async def setup(self) -> bool:
-        def get_files_in_blob_dir() -> typing.Set[str]:
-            if not self.blob_dir:
-                return set()
-            return {
-                item.name for item in os.scandir(self.blob_dir) if is_valid_blobhash(item.name)
-            }
-
-        in_blobfiles_dir = await self.loop.run_in_executor(None, get_files_in_blob_dir)
+        in_blobfiles_dir = await self.loop.run_in_executor(None, lambda: self.list_blobs(setup=True))
+        #print(f'blob dirs: {self.blob_dirs}')
         to_add = await self.storage.sync_missing_blobs(in_blobfiles_dir)
         if to_add:
             self.completed_blob_hashes.update(to_add)
@@ -97,7 +149,7 @@ def stop(self):
         self.completed_blob_hashes.clear()
 
     def get_stream_descriptor(self, sd_hash):
-        return StreamDescriptor.from_stream_descriptor_blob(self.loop, self.blob_dir, self.get_blob(sd_hash))
+        return StreamDescriptor.from_stream_descriptor_blob(self.loop, self, self.get_blob(sd_hash))
 
     def blob_completed(self, blob: AbstractBlob) -> asyncio.Task:
         if blob.blob_hash is None:
@@ -133,8 +185,9 @@ def delete_blob(self, blob_hash: str):
             raise Exception("invalid blob hash to delete")
 
         if blob_hash not in self.blobs:
-            if self.blob_dir and os.path.isfile(os.path.join(self.blob_dir, blob_hash)):
-                os.remove(os.path.join(self.blob_dir, blob_hash))
+            blob_dir, blob_found = self._blob_dir(blob_hash)
+            if blob_dir and blob_found:
+                os.remove(os.path.join(blob_dir, blob_hash))
         else:
             self.blobs.pop(blob_hash).delete()
             if blob_hash in self.completed_blob_hashes:
diff --git a/lbry/conf.py b/lbry/conf.py
index d2dc8bb3ae..af356fe706 100644
--- a/lbry/conf.py
+++ b/lbry/conf.py
@@ -284,6 +284,20 @@ def validate(self, value):
                 f"Value of '{string}' at index {idx} in setting " \
                 f"'{self.name}' must be a string."
 
+
+class Paths(Strings):
+
+    def validate(self, value):
+        super().validate(value)
+        for idx, path in enumerate(value):
+            assert os.path.isdir(path), \
+                f"Path '{path}' at index {idx} in setting " \
+                f"'{self.name}' must be a path to a directory."
+
+    def __get__(self, obj, owner) -> List[str]:
+        values = super().__get__(obj, owner)
+        if isinstance(values, list):
+            return [os.path.expanduser(os.path.expandvars(path)) for path in values]
+        return values
+
 
 class KnownHubsList:
@@ -593,7 +607,7 @@ class Config(CLIConfig):
     jurisdiction = String("Limit interactions to wallet server in this jurisdiction.")
 
     # directories
-    data_dir = Path("Directory path to store blobs.", metavar='DIR')
+    data_dir = Path("Directory path for daemon settings, blobs, logs, etc.", metavar='DIR')
     download_dir = Path(
         "Directory path to place assembled files downloaded from LBRY.",
         previous_names=['download_directory'], metavar='DIR'
     )
@@ -638,6 +652,7 @@ class Config(CLIConfig):
 
     # blob announcement and download
     save_blobs = Toggle("Save encrypted blob files for hosting, otherwise download blobs to memory only.", True)
+    blob_dirs = Paths("Additional directory path(s) for storing blobs.", [], metavar='DIR')
     network_storage_limit = Integer("Disk space in MB to be allocated for helping the P2P network. 0 = disable", 0)
     blob_storage_limit = Integer("Disk space in MB to be allocated for blob storage. 0 = no limit", 0)
     blob_lru_cache_size = Integer(
diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py
index 1e5e6a4459..abed8d3739 100644
--- a/lbry/extras/daemon/components.py
+++ b/lbry/extras/daemon/components.py
@@ -222,10 +222,28 @@ async def start(self):
         dht_node: Node = self.component_manager.get_component(DHT_COMPONENT)
         if dht_node:
             data_store = dht_node.protocol.data_store
-        blob_dir = os.path.join(self.conf.data_dir, 'blobfiles')
-        if not os.path.isdir(blob_dir):
-            os.mkdir(blob_dir)
-        self.blob_manager = BlobManager(self.component_manager.loop, blob_dir, storage, self.conf, data_store)
+
+        # Each blob dir should have 3 levels of subdirs corresponding to hash prefixes.
+        def setup_subdirs(path, depth):
+            if depth <= 0:
+                return
+            for prefix in '0123456789abcdef':
+                subdir = os.path.join(path, prefix)
+                if not os.path.isdir(subdir):
+                    os.mkdir(subdir)
+                    #print(f'created blob subdir: {subdir}')
+                setup_subdirs(subdir, depth-1)
+
+        # Set up any explicit blob dirs plus a default data_dir/blobfiles.
+        blob_dirs = self.conf.blob_dirs + [os.path.join(self.conf.data_dir, 'blobfiles')]
+        #print(f'blob dirs: {blob_dirs}')
+        for blob_dir in blob_dirs:
+            if not os.path.isdir(blob_dir):
+                os.mkdir(blob_dir)
+                #print(f'created blob dir: {blob_dir}')
+            setup_subdirs(blob_dir, 3)
+
+        self.blob_manager = BlobManager(self.component_manager.loop, blob_dirs, storage, self.conf, data_store)
         return await self.blob_manager.setup()
 
     async def stop(self):
diff --git a/lbry/stream/descriptor.py b/lbry/stream/descriptor.py
index 45397e4cb1..deb27b0ce0 100644
--- a/lbry/stream/descriptor.py
+++ b/lbry/stream/descriptor.py
@@ -14,6 +14,9 @@
 from lbry.utils import get_lbry_hash_obj
 from lbry.error import InvalidStreamDescriptorError
 
+if typing.TYPE_CHECKING:
+    from lbry.blob.blob_manager import BlobManager
+
 log = logging.getLogger(__name__)
 
 RE_ILLEGAL_FILENAME_CHARS = re.compile(
@@ -83,7 +86,7 @@ def sanitize_file_name(dirty_name: str, default_file_name: str = 'lbry_download'
 class StreamDescriptor:
     __slots__ = [
         'loop',
-        'blob_dir',
+        'blob_manager',
         'stream_name',
         'key',
         'suggested_file_name',
@@ -92,11 +95,11 @@ class StreamDescriptor:
         'sd_hash'
     ]
 
-    def __init__(self, loop: asyncio.AbstractEventLoop, blob_dir: str, stream_name: str, key: str,
+    def __init__(self, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager', stream_name: str, key: str,
                  suggested_file_name: str, blobs: typing.List[BlobInfo],
                  stream_hash: typing.Optional[str] = None, sd_hash: typing.Optional[str] = None):
         self.loop = loop
-        self.blob_dir = blob_dir
+        self.blob_manager = blob_manager
         self.stream_name = stream_name
         self.key = key
         self.suggested_file_name = suggested_file_name
@@ -164,7 +167,7 @@ async def make_sd_blob(
         else:
             sd_data = self.old_sort_json()
         sd_blob = blob_file_obj or BlobFile(
-            self.loop, sd_hash, len(sd_data), blob_completed_callback, self.blob_dir, added_on, is_mine
+            self.loop, sd_hash, len(sd_data), blob_completed_callback, self.blob_manager, added_on, is_mine
         )
         if blob_file_obj:
             blob_file_obj.set_length(len(sd_data))
@@ -177,7 +180,7 @@ async def make_sd_blob(
         return sd_blob
 
     @classmethod
-    def _from_stream_descriptor_blob(cls, loop: asyncio.AbstractEventLoop, blob_dir: str,
+    def _from_stream_descriptor_blob(cls, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager',
                                      blob: AbstractBlob) -> 'StreamDescriptor':
         with blob.reader_context() as blob_reader:
             json_bytes = blob_reader.read()
@@ -196,7 +199,7 @@ def _from_stream_descriptor_blob(cls, loop: asyncio.AbstractEventLoop, blob_dir:
             raise InvalidStreamDescriptorError("Stream contains out of order or skipped blobs")
         added_on = time.time()
         descriptor = cls(
-            loop, blob_dir,
+            loop, blob_manager,
             binascii.unhexlify(decoded['stream_name']).decode(),
             decoded['key'],
             binascii.unhexlify(decoded['suggested_file_name']).decode(),
@@ -210,11 +213,11 @@ def _from_stream_descriptor_blob(cls, loop: asyncio.AbstractEventLoop, blob_dir:
         return descriptor
 
     @classmethod
-    async def from_stream_descriptor_blob(cls, loop: asyncio.AbstractEventLoop, blob_dir: str,
+    async def from_stream_descriptor_blob(cls, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager',
                                           blob: AbstractBlob) -> 'StreamDescriptor':
         if not blob.is_readable():
             raise InvalidStreamDescriptorError(f"unreadable/missing blob: {blob.blob_hash}")
-        return await loop.run_in_executor(None, cls._from_stream_descriptor_blob, loop, blob_dir, blob)
+        return await loop.run_in_executor(None, cls._from_stream_descriptor_blob, loop, blob_manager, blob)
 
     @staticmethod
     def get_blob_hashsum(blob_dict: typing.Dict):
@@ -248,7 +251,8 @@ def calculate_stream_hash(hex_stream_name: bytes, key: bytes, hex_suggested_file
 
     @classmethod
     async def create_stream(
-        cls, loop: asyncio.AbstractEventLoop, blob_dir: str, file_path: str, key: typing.Optional[bytes] = None,
+        cls, loop: asyncio.AbstractEventLoop, blob_manager: 'BlobManager',
+        file_path: str, key: typing.Optional[bytes] = None,
         iv_generator: typing.Optional[typing.Generator[bytes, None, None]] = None,
         old_sort: bool = False,
         blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'],
@@ -262,7 +266,8 @@ async def create_stream(
         async for blob_bytes in file_reader(file_path):
             blob_num += 1
             blob_info = await BlobFile.create_from_unencrypted(
-                loop, blob_dir, key, next(iv_generator), blob_bytes, blob_num, added_on, True, blob_completed_callback
+                loop, blob_manager, key, next(iv_generator), blob_bytes, blob_num,
+                added_on, True, blob_completed_callback
             )
             blobs.append(blob_info)
         blobs.append(
@@ -272,7 +277,7 @@ async def create_stream(
         file_name = os.path.basename(file_path)
         suggested_file_name = sanitize_file_name(file_name)
         descriptor = cls(
-            loop, blob_dir, file_name, binascii.hexlify(key).decode(), suggested_file_name, blobs
+            loop, blob_manager, file_name, binascii.hexlify(key).decode(), suggested_file_name, blobs
         )
         sd_blob = await descriptor.make_sd_blob(
             old_sort=old_sort, blob_completed_callback=blob_completed_callback, added_on=added_on, is_mine=True
@@ -288,10 +293,11 @@ def upper_bound_decrypted_length(self) -> int:
         return self.lower_bound_decrypted_length() + (AES.block_size // 8)
 
     @classmethod
-    async def recover(cls, blob_dir: str, sd_blob: 'AbstractBlob', stream_hash: str, stream_name: str,
+    async def recover(cls, blob_manager: 'BlobManager', sd_blob: 'AbstractBlob',
+                      stream_hash: str, stream_name: str,
                       suggested_file_name: str, key: str,
                       blobs: typing.List['BlobInfo']) -> typing.Optional['StreamDescriptor']:
-        descriptor = cls(asyncio.get_event_loop(), blob_dir, stream_name, key, suggested_file_name,
+        descriptor = cls(asyncio.get_event_loop(), blob_manager, stream_name, key, suggested_file_name,
                          blobs, stream_hash, sd_blob.blob_hash)
         if descriptor.calculate_sd_hash() == sd_blob.blob_hash:  # first check for a normal valid sd
diff --git a/lbry/stream/downloader.py b/lbry/stream/downloader.py
index 39e24b37e9..7da90d8f8c 100644
--- a/lbry/stream/downloader.py
+++ b/lbry/stream/downloader.py
@@ -81,7 +81,7 @@ async def load_descriptor(self, connection_id: int = 0):
 
         # parse the descriptor
         self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(
-            self.loop, self.blob_manager.blob_dir, sd_blob
+            self.loop, self.blob_manager, sd_blob
         )
         log.info("loaded stream manifest %s", self.sd_hash)
 
diff --git a/lbry/stream/reflector/server.py b/lbry/stream/reflector/server.py
index cc221b8857..cbf77f39bd 100644
--- a/lbry/stream/reflector/server.py
+++ b/lbry/stream/reflector/server.py
@@ -96,7 +96,7 @@ async def handle_request(self, request: typing.Dict):  # pylint: disable=too-man
             try:
                 await asyncio.wait_for(self.sd_blob.verified.wait(), 30)
                 self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(
-                    self.loop, self.blob_manager.blob_dir, self.sd_blob
+                    self.loop, self.blob_manager, self.sd_blob
                 )
                 self.send_response({"received_sd_blob": True})
             except asyncio.TimeoutError:
@@ -109,7 +109,7 @@ async def handle_request(self, request: typing.Dict):  # pylint: disable=too-man
                     self.writer = None
             else:
                 self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(
-                    self.loop, self.blob_manager.blob_dir, self.sd_blob
+                    self.loop, self.blob_manager, self.sd_blob
                 )
                 self.incoming.clear()
                 self.not_incoming.set()
diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py
index 7ecf7e442d..4c3f07fd06 100644
--- a/lbry/stream/stream_manager.py
+++ b/lbry/stream/stream_manager.py
@@ -78,7 +78,7 @@ async def recover_stream(sd_hash: str, stream_hash: str, stream_name: str,
             sd_blob = self.blob_manager.get_blob(sd_hash)
             blobs = await self.storage.get_blobs_for_stream(stream_hash)
             descriptor = await StreamDescriptor.recover(
-                self.blob_manager.blob_dir, sd_blob, stream_hash, stream_name, suggested_file_name, key, blobs
+                self.blob_manager, sd_blob, stream_hash, stream_name, suggested_file_name, key, blobs
             )
             if not descriptor:
                 return
@@ -236,7 +236,7 @@ async def _retriable_reflect_stream(self, stream, host, port):
     async def create(self, file_path: str, key: Optional[bytes] = None,
                      iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedStream:
         descriptor = await StreamDescriptor.create_stream(
-            self.loop, self.blob_manager.blob_dir, file_path, key=key, iv_generator=iv_generator,
+            self.loop, self.blob_manager, file_path, key=key, iv_generator=iv_generator,
             blob_completed_callback=self.blob_manager.blob_completed
         )
         await self.storage.store_stream(
diff --git a/lbry/wallet/orchstr8/node.py b/lbry/wallet/orchstr8/node.py
index ea041802ad..b6c479e61b 100644
--- a/lbry/wallet/orchstr8/node.py
+++ b/lbry/wallet/orchstr8/node.py
@@ -199,7 +199,8 @@ async def stop(self, cleanup=True):
             cleanup and self.cleanup()
 
     def cleanup(self):
-        shutil.rmtree(self.data_path, ignore_errors=True)
+        log.warning("skipping cleanup of data_path: %s", self.data_path)
+        #shutil.rmtree(self.data_path, ignore_errors=True)
 
 
 class SPVNode:
diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py
index 95e92ce1e9..e32179a330 100644
--- a/tests/integration/datanetwork/test_file_commands.py
+++ b/tests/integration/datanetwork/test_file_commands.py
@@ -636,7 +636,7 @@ async def test_file_management(self):
 class TestBackgroundDownloaderComponent(CommandTestCase):
     async def get_blobs_from_sd_blob(self, sd_blob):
         descriptor = await StreamDescriptor.from_stream_descriptor_blob(
-            asyncio.get_running_loop(), self.daemon.blob_manager.blob_dir, sd_blob
+            asyncio.get_running_loop(), self.daemon.blob_manager, sd_blob
         )
         return descriptor.blobs
 
diff --git a/tests/integration/datanetwork/test_streaming.py b/tests/integration/datanetwork/test_streaming.py
index 4a61f08ccd..15bf6de1bf 100644
--- a/tests/integration/datanetwork/test_streaming.py
+++ b/tests/integration/datanetwork/test_streaming.py
@@ -31,10 +31,10 @@ async def _setup_stream(self, data: bytes, save_blobs: bool = True, save_files:
         self.data = data
         await self.stream_create('foo', '0.01', data=self.data, file_size=file_size)
         if save_blobs:
-            self.assertGreater(len(os.listdir(self.daemon.blob_manager.blob_dir)), 1)
+            self.assertGreater(len(self.daemon.blob_manager.list_blobs()), 1)
         await (await self.daemon.jsonrpc_file_list())['items'][0].fully_reflected.wait()
         await self.daemon.jsonrpc_file_delete(delete_from_download_dir=True, claim_name='foo')
-        self.assertEqual(0, len(os.listdir(self.daemon.blob_manager.blob_dir)))
+        self.assertEqual(0, len(self.daemon.blob_manager.list_blobs()))
         # await self._restart_stream_manager()
         await self.daemon.streaming_runner.setup()
         site = aiohttp.web.TCPSite(self.daemon.streaming_runner, self.daemon.conf.streaming_host,
@@ -313,20 +313,20 @@ async def test_switch_save_blobs_while_running(self):
         self.daemon.conf.save_blobs = True
         blobs_in_stream = (await self.daemon.jsonrpc_file_list())['items'][0].blobs_in_stream
         sd_hash = (await self.daemon.jsonrpc_file_list())['items'][0].sd_hash
-        start_file_count = len(os.listdir(self.daemon.blob_manager.blob_dir))
+        start_file_count = len(self.daemon.blob_manager.list_blobs())
         await self._test_range_requests()
-        self.assertEqual(start_file_count + blobs_in_stream, len(os.listdir(self.daemon.blob_manager.blob_dir)))
+        self.assertEqual(start_file_count + blobs_in_stream, len(self.daemon.blob_manager.list_blobs()))
         self.assertEqual(0, (await self.daemon.jsonrpc_file_list())['items'][0].blobs_remaining)
 
         # switch back
         self.daemon.conf.save_blobs = False
         await self._test_range_requests()
-        self.assertEqual(start_file_count + blobs_in_stream, len(os.listdir(self.daemon.blob_manager.blob_dir)))
+        self.assertEqual(start_file_count + blobs_in_stream, len(self.daemon.blob_manager.list_blobs()))
         self.assertEqual(0, (await self.daemon.jsonrpc_file_list())['items'][0].blobs_remaining)
         await self.daemon.jsonrpc_file_delete(delete_from_download_dir=True, sd_hash=sd_hash)
-        self.assertEqual(start_file_count, len(os.listdir(self.daemon.blob_manager.blob_dir)))
+        self.assertEqual(start_file_count, len(self.daemon.blob_manager.list_blobs()))
         await self._test_range_requests()
-        self.assertEqual(start_file_count, len(os.listdir(self.daemon.blob_manager.blob_dir)))
+        self.assertEqual(start_file_count, len(self.daemon.blob_manager.list_blobs()))
         self.assertEqual(blobs_in_stream, (await self.daemon.jsonrpc_file_list())['items'][0].blobs_remaining)
 
     async def test_file_save_streaming_only_save_blobs(self):
@@ -400,7 +400,7 @@ async def test_range_requests_with_blob_lru_cache(self):
         await self.stream_create('foo', '0.01', data=self.data, file_size=0)
         await (await self.daemon.jsonrpc_file_list())['items'][0].fully_reflected.wait()
         await self.daemon.jsonrpc_file_delete(delete_from_download_dir=True, claim_name='foo')
-        self.assertEqual(0, len(os.listdir(self.daemon.blob_manager.blob_dir)))
+        self.assertEqual(0, len(self.daemon.blob_manager.list_blobs()))
 
         await self.daemon.streaming_runner.setup()
         site = aiohttp.web.TCPSite(self.daemon.streaming_runner, self.daemon.conf.streaming_host,

From 62db078080961f1c1c728fcf9b998c9f69871777 Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Sun, 8 Jan 2023 11:25:08 -0600
Subject: [PATCH 2/7] Fix unit tests.

---
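Note: besides the test updates, this patch threads an error_fmt argument
through the blob constructors so that BlobFile.create_from_unencrypted() can
keep its old "cannot create blob" wording while plain construction reports
"invalid blob directory". The pattern in miniature (the function name and
paths here are illustrative, not the real API):

    import os

    def require_blob_dir(path, error_fmt="invalid blob directory '%s'"):
        # One check; the caller picks the message that fits its context.
        if not path or not os.path.isdir(path):
            raise OSError(error_fmt % path)

    require_blob_dir(os.getcwd())  # passes
    try:
        require_blob_dir('/no/such/dir', "cannot create blob in directory: '%s'")
    except OSError as e:
        print(e)  # cannot create blob in directory: '/no/such/dir'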
 lbry/blob/blob_file.py                      | 16 ++++++++++++----
 tests/unit/stream/test_managed_stream.py    | 12 ++++++------
 tests/unit/stream/test_stream_descriptor.py | 11 ++++++++---
 tests/unit/stream/test_stream_manager.py    |  8 ++++----
 4 files changed, 30 insertions(+), 17 deletions(-)

diff --git a/lbry/blob/blob_file.py b/lbry/blob/blob_file.py
index 838a79d2d4..b4aa055ffa 100644
--- a/lbry/blob/blob_file.py
+++ b/lbry/blob/blob_file.py
@@ -84,6 +84,7 @@ def __init__(
             blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
             blob_manager: typing.Optional['BlobManager'] = None,
             added_on: typing.Optional[int] = None, is_mine: bool = False,
+            error_fmt: str = "invalid blob directory '%s'",
     ):
         if not is_valid_blobhash(blob_hash):
             raise InvalidBlobHashError(blob_hash)
@@ -104,7 +105,7 @@ def __init__(
         self.is_mine = is_mine
 
         if not self.blob_directory or not os.path.isdir(self.blob_directory):
-            raise OSError(f"cannot create blob in directory: '{self.blob_directory}'")
+            raise OSError(error_fmt % self.blob_directory)
 
     def __del__(self):
         if self.writers or self.readers:
@@ -207,7 +208,10 @@ async def create_from_unencrypted(
 
         blob_bytes, blob_hash = encrypt_blob_bytes(key, iv, unencrypted)
         length = len(blob_bytes)
-        blob = cls(loop, blob_hash, length, blob_completed_callback, blob_manager, added_on, is_mine)
+        blob = cls(
+            loop, blob_hash, length, blob_completed_callback, blob_manager, added_on, is_mine,
+            error_fmt="cannot create blob in directory: '%s'",
+        )
         writer = blob.get_blob_writer()
         writer.write(blob_bytes)
         await blob.verified.wait()
@@ -314,9 +318,13 @@ def __init__(
             self, loop: asyncio.AbstractEventLoop, blob_hash: str, length: typing.Optional[int] = None,
             blob_completed_callback: typing.Optional[typing.Callable[['AbstractBlob'], asyncio.Task]] = None,
             blob_manager: typing.Optional['BlobManager'] = None,
-            added_on: typing.Optional[int] = None, is_mine: bool = False
+            added_on: typing.Optional[int] = None, is_mine: bool = False,
+            error_fmt: str = "invalid blob directory '%s'",
     ):
-        super().__init__(loop, blob_hash, length, blob_completed_callback, blob_manager, added_on, is_mine)
+        super().__init__(
+            loop, blob_hash, length, blob_completed_callback, blob_manager, added_on, is_mine,
+            error_fmt,
+        )
         self.file_path = os.path.join(self.blob_directory, self.blob_hash)
         if self.file_exists:
             file_size = int(os.stat(self.file_path).st_size)
diff --git a/tests/unit/stream/test_managed_stream.py b/tests/unit/stream/test_managed_stream.py
index 387fbef77d..b6af5ee88d 100644
--- a/tests/unit/stream/test_managed_stream.py
+++ b/tests/unit/stream/test_managed_stream.py
@@ -24,7 +24,7 @@ async def create_stream(self, blob_count: int = 10, file_name='test_file'):
         file_path = os.path.join(self.server_dir, file_name)
         with open(file_path, 'wb') as f:
             f.write(self.stream_bytes)
-        descriptor = await StreamDescriptor.create_stream(self.loop, self.server_blob_manager.blob_dir, file_path)
+        descriptor = await StreamDescriptor.create_stream(self.loop, self.server_blob_manager, file_path)
         descriptor.suggested_file_name = file_name
         descriptor.stream_hash = descriptor.get_stream_hash()
         self.sd_hash = descriptor.sd_hash = descriptor.calculate_sd_hash()
@@ -166,16 +166,16 @@ async def test_create_and_decrypt_one_blob_stream(self, blobs=1, corrupt=False):
         descriptor = await self.create_stream(blobs)
 
         # copy blob files
-        shutil.copy(os.path.join(self.server_blob_manager.blob_dir, self.sd_hash),
-                    os.path.join(self.client_blob_manager.blob_dir, self.sd_hash))
+        shutil.copy(os.path.join(self.server_blob_manager._blob_dir(self.sd_hash)[0], self.sd_hash),
+                    os.path.join(self.client_blob_manager._blob_dir(self.sd_hash)[0], self.sd_hash))
         self.stream = ManagedStream(self.loop, self.client_config, self.client_blob_manager, self.sd_hash,
                                     self.client_dir)
 
         for blob_info in descriptor.blobs[:-1]:
-            shutil.copy(os.path.join(self.server_blob_manager.blob_dir, blob_info.blob_hash),
-                        os.path.join(self.client_blob_manager.blob_dir, blob_info.blob_hash))
+            shutil.copy(os.path.join(self.server_blob_manager._blob_dir(blob_info.blob_hash)[0], blob_info.blob_hash),
+                        os.path.join(self.client_blob_manager._blob_dir(blob_info.blob_hash)[0], blob_info.blob_hash))
             if corrupt and blob_info.length == MAX_BLOB_SIZE:
-                with open(os.path.join(self.client_blob_manager.blob_dir, blob_info.blob_hash), "rb+") as handle:
+                with open(os.path.join(self.client_blob_manager._blob_dir(blob_info.blob_hash)[0], blob_info.blob_hash), "rb+") as handle:
                     handle.truncate()
                     handle.flush()
         await self.stream.save_file()
diff --git a/tests/unit/stream/test_stream_descriptor.py b/tests/unit/stream/test_stream_descriptor.py
index b46012711d..ca80088da7 100644
--- a/tests/unit/stream/test_stream_descriptor.py
+++ b/tests/unit/stream/test_stream_descriptor.py
@@ -29,7 +29,7 @@ async def asyncSetUp(self):
         with open(self.file_path, 'wb') as f:
             f.write(self.cleartext)
 
-        self.descriptor = await StreamDescriptor.create_stream(self.loop, self.tmp_dir, self.file_path, key=self.key)
+        self.descriptor = await StreamDescriptor.create_stream(self.loop, self.blob_manager, self.file_path, key=self.key)
         self.sd_hash = self.descriptor.calculate_sd_hash()
         self.sd_dict = json.loads(self.descriptor.as_json())
 
@@ -114,7 +114,7 @@ async def test_old_key_sort_sd_blob(self):
         writer.write(sd_bytes)
         await blob.verified.wait()
         descriptor = await StreamDescriptor.from_stream_descriptor_blob(
-            loop, blob_manager.blob_dir, blob
+            loop, blob_manager, blob
         )
         self.assertEqual(stream_hash, descriptor.get_stream_hash())
         self.assertEqual(sd_hash, descriptor.calculate_old_sort_sd_hash())
@@ -124,10 +124,15 @@ async def test_decode_corrupt_blob_raises_proper_exception_and_deletes_corrupt_f
         loop = asyncio.get_event_loop()
         tmp_dir = tempfile.mkdtemp()
         self.addCleanup(lambda: shutil.rmtree(tmp_dir))
+        self.conf = Config()
+        storage = SQLiteStorage(self.conf, ":memory:")
+        await storage.open()
+        blob_manager = BlobManager(loop, tmp_dir, storage, self.conf)
+
         sd_hash = '9313d1807551186126acc3662e74d9de29cede78d4f133349ace846273ef116b9bb86be86c54509eb84840e4b032f6b2'
         with open(os.path.join(tmp_dir, sd_hash), 'wb') as handle:
             handle.write(b'doesnt work')
-        blob = BlobFile(loop, sd_hash, blob_directory=tmp_dir)
+        blob = BlobFile(loop, sd_hash, blob_manager=blob_manager)
         self.assertTrue(blob.file_exists)
         self.assertIsNotNone(blob.length)
         with self.assertRaises(InvalidStreamDescriptorError):
diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py
index ba6d8dbc82..43c8312ed1 100644
--- a/tests/unit/stream/test_stream_manager.py
+++ b/tests/unit/stream/test_stream_manager.py
@@ -136,7 +136,7 @@ async def setup_stream_manager(self, balance=10.0, fee=None, old_sort=False):
         with open(file_path, 'wb') as f:
             f.write(os.urandom(20000000))
         descriptor = await StreamDescriptor.create_stream(
-            self.loop, self.server_blob_manager.blob_dir, file_path, old_sort=old_sort
+            self.loop, self.server_blob_manager, file_path, old_sort=old_sort
         )
         self.sd_hash = descriptor.sd_hash
         self.mock_wallet, self.uri = await get_mock_wallet(self.sd_hash, self.client_storage, self.client_wallet_dir,
@@ -453,7 +453,7 @@ async def check_post(event):
         self.client_blob_manager.stop()
         # partial removal, only sd blob is missing.
         # in this case, we recover the sd blob while the other blobs are kept untouched as 'finished'
-        os.remove(os.path.join(self.client_blob_manager.blob_dir, stream.sd_hash))
+        os.remove(os.path.join(self.client_blob_manager._blob_dir(stream.sd_hash)[0], stream.sd_hash))
         await self.client_blob_manager.setup()
         await self.stream_manager.start()
         self.assertEqual(1, len(self.stream_manager.streams))
@@ -470,9 +470,9 @@ async def check_post(event):
 
         # full removal, check that status is preserved (except sd blob, which was written)
         self.client_blob_manager.stop()
-        os.remove(os.path.join(self.client_blob_manager.blob_dir, stream.sd_hash))
+        os.remove(os.path.join(self.client_blob_manager._blob_dir(stream.sd_hash)[0], stream.sd_hash))
         for blob in stream.descriptor.blobs[:-1]:
-            os.remove(os.path.join(self.client_blob_manager.blob_dir, blob.blob_hash))
+            os.remove(os.path.join(self.client_blob_manager._blob_dir(blob.blob_hash)[0], blob.blob_hash))
         await self.client_blob_manager.setup()
         await self.stream_manager.start()
         for blob_hash in [b.blob_hash for b in stream.descriptor.blobs[:-1]]:

From 403f1f81cdf024bbb9b1f06890c70b9699dd73ba Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Sun, 8 Jan 2023 12:04:34 -0600
Subject: [PATCH 3/7] Fix more unit tests.

---
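Note: the substantive change here is that blob_manager may now be None, so the
unit tests can construct blobs without a manager, and __del__ must tolerate a
constructor that raised before any attributes were assigned. Both guards in a
standalone sketch (the class names are stand-ins, not the real API):

    class Manager:
        def locate(self):
            return '/tmp/blobfiles'  # illustrative

    class Blob:
        def __init__(self, manager=None):
            # None is now allowed alongside a real manager instance.
            if not isinstance(manager, (Manager, type(None))):
                raise TypeError(f"{type(manager)} not instance of Manager")
            self.directory = manager.locate() if manager is not None else None
            self.writers = {}

        def __del__(self):
            # __del__ runs even when __init__ raised above, before any
            # attributes were set, so probe with hasattr() first.
            if not hasattr(self, 'writers'):
                return
            if self.writers:
                print('not closed before being garbage collected')

    Blob()             # fine: no manager, no directory
    Blob(Manager())    # fine
    try:
        Blob(manager=42)
    except TypeError as e:
        print(e)       # <class 'int'> not instance of Manager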
 lbry/blob/blob_file.py               |  9 +++--
 tests/unit/blob/test_blob_file.py    | 59 +++++++++++-----------------
 tests/unit/blob/test_blob_manager.py |  4 +-
 3 files changed, 32 insertions(+), 40 deletions(-)

diff --git a/lbry/blob/blob_file.py b/lbry/blob/blob_file.py
index b4aa055ffa..c72bb5294f 100644
--- a/lbry/blob/blob_file.py
+++ b/lbry/blob/blob_file.py
@@ -89,14 +89,14 @@ def __init__(
         if not is_valid_blobhash(blob_hash):
             raise InvalidBlobHashError(blob_hash)
         from lbry.blob.blob_manager import BlobManager  # pylint: disable=import-outside-toplevel
-        if not isinstance(blob_manager, BlobManager):
+        if not isinstance(blob_manager, (BlobManager, type(None))):
             raise TypeError(f"{type(blob_manager)} not instance of BlobManager")
 
         self.loop = loop
         self.blob_hash = blob_hash
         self.length = length
         self.blob_completed_callback = blob_completed_callback
-        self.blob_directory, _ = blob_manager._blob_dir(blob_hash)
+        self.blob_directory, _ = blob_manager._blob_dir(blob_hash) if blob_manager is not None else (None, None)
         self.writers: typing.Dict[typing.Tuple[typing.Optional[str], typing.Optional[int]], HashBlobWriter] = {}
         self.verified: asyncio.Event = asyncio.Event()
         self.writing: asyncio.Event = asyncio.Event()
@@ -104,10 +104,13 @@ def __init__(
         self.added_on = added_on or time.time()
         self.is_mine = is_mine
 
-        if not self.blob_directory or not os.path.isdir(self.blob_directory):
+        if blob_manager and (not self.blob_directory or not os.path.isdir(self.blob_directory)):
             raise OSError(error_fmt % self.blob_directory)
 
     def __del__(self):
+        if not hasattr(self, 'writers') and not hasattr(self, 'readers'):
+            # object initialization failed
+            return
         if self.writers or self.readers:
             log.warning("%s not closed before being garbage collected", self.blob_hash)
             self.close()
diff --git a/tests/unit/blob/test_blob_file.py b/tests/unit/blob/test_blob_file.py
index ff632adb56..d7cb947cfd 100644
--- a/tests/unit/blob/test_blob_file.py
+++ b/tests/unit/blob/test_blob_file.py
@@ -23,15 +23,17 @@ async def asyncSetUp(self):
         self.blob_manager = BlobManager(self.loop, self.tmp_dir, self.storage, self.config)
         await self.storage.open()
 
-    def _get_blob(self, blob_class=AbstractBlob, blob_directory=None):
-        blob = blob_class(self.loop, self.blob_hash, len(self.blob_bytes), self.blob_manager.blob_completed,
-                          blob_directory=blob_directory)
+    def _get_blob(self, blob_class=AbstractBlob, blob_manager=None):
+        blob = blob_class(
+            self.loop, self.blob_hash, len(self.blob_bytes), self.blob_manager.blob_completed,
+            blob_manager=blob_manager
+        )
         self.assertFalse(blob.get_is_verified())
         self.addCleanup(blob.close)
         return blob
 
-    async def _test_create_blob(self, blob_class=AbstractBlob, blob_directory=None):
-        blob = self._get_blob(blob_class, blob_directory)
+    async def _test_create_blob(self, blob_class=AbstractBlob, blob_manager=None):
+        blob = self._get_blob(blob_class, blob_manager)
         writer = blob.get_blob_writer()
         writer.write(self.blob_bytes)
         await blob.verified.wait()
@@ -39,8 +41,8 @@ async def _test_create_blob(self, blob_class=AbstractBlob, blob_directory=None):
         await asyncio.sleep(0)  # wait for the db save task
         return blob
 
-    async def _test_close_writers_on_finished(self, blob_class=AbstractBlob, blob_directory=None):
-        blob = self._get_blob(blob_class, blob_directory=blob_directory)
+    async def _test_close_writers_on_finished(self, blob_class=AbstractBlob, blob_manager=None):
+        blob = self._get_blob(blob_class, blob_manager=blob_manager)
         writers = [blob.get_blob_writer('1.2.3.4', port) for port in range(5)]
         self.assertEqual(5, len(blob.writers))
 
@@ -61,20 +63,20 @@ async def _test_close_writers_on_finished(self, blob_class=AbstractBlob, blob_di
         with self.assertRaises(IOError):
             other.write(self.blob_bytes)
 
-    def _test_ioerror_if_length_not_set(self, blob_class=AbstractBlob, blob_directory=None):
+    def _test_ioerror_if_length_not_set(self, blob_class=AbstractBlob, blob_manager=None):
         blob = blob_class(
             self.loop, self.blob_hash, blob_completed_callback=self.blob_manager.blob_completed,
-            blob_directory=blob_directory
+            blob_manager=blob_manager
         )
         self.addCleanup(blob.close)
         writer = blob.get_blob_writer()
         with self.assertRaises(IOError):
             writer.write(b'')
 
-    async def _test_invalid_blob_bytes(self, blob_class=AbstractBlob, blob_directory=None):
+    async def _test_invalid_blob_bytes(self, blob_class=AbstractBlob, blob_manager=None):
         blob = blob_class(
             self.loop, self.blob_hash, len(self.blob_bytes), blob_completed_callback=self.blob_manager.blob_completed,
-            blob_directory=blob_directory
+            blob_manager=blob_manager
         )
         self.addCleanup(blob.close)
         writer = blob.get_blob_writer()
@@ -88,24 +90,20 @@ async def test_add_blob_buffer_to_db(self):
         self.assertEqual(db_status, 'pending')
 
     async def test_add_blob_file_to_db(self):
-        blob = await self._test_create_blob(BlobFile, self.tmp_dir)
+        blob = await self._test_create_blob(BlobFile, self.blob_manager)
        db_status = await self.storage.get_blob_status(blob.blob_hash)
        self.assertEqual(db_status, 'finished')
 
     async def test_invalid_blob_bytes(self):
         await self._test_invalid_blob_bytes(BlobBuffer)
-        await self._test_invalid_blob_bytes(BlobFile, self.tmp_dir)
+        await self._test_invalid_blob_bytes(BlobFile, self.blob_manager)
 
     def test_ioerror_if_length_not_set(self):
-        tmp_dir = tempfile.mkdtemp()
-        self.addCleanup(lambda: shutil.rmtree(tmp_dir))
         self._test_ioerror_if_length_not_set(BlobBuffer)
-        self._test_ioerror_if_length_not_set(BlobFile, tmp_dir)
+        self._test_ioerror_if_length_not_set(BlobFile, self.blob_manager)
 
     async def test_create_blob_file(self):
-        tmp_dir = tempfile.mkdtemp()
-        self.addCleanup(lambda: shutil.rmtree(tmp_dir))
-        blob = await self._test_create_blob(BlobFile, tmp_dir)
+        blob = await self._test_create_blob(BlobFile, self.blob_manager)
         self.assertIsInstance(blob, BlobFile)
         self.assertTrue(os.path.isfile(blob.file_path))
 
@@ -128,15 +126,11 @@ async def test_create_blob_buffer(self):
         self.assertIsNone(blob._verified_bytes)
 
     async def test_close_writers_on_finished(self):
-        tmp_dir = tempfile.mkdtemp()
-        self.addCleanup(lambda: shutil.rmtree(tmp_dir))
         await self._test_close_writers_on_finished(BlobBuffer)
-        await self._test_close_writers_on_finished(BlobFile, tmp_dir)
+        await self._test_close_writers_on_finished(BlobFile, self.blob_manager)
 
     async def test_concurrency_and_premature_closes(self):
-        blob_directory = tempfile.mkdtemp()
-        self.addCleanup(lambda: shutil.rmtree(blob_directory))
-        blob = self._get_blob(BlobBuffer, blob_directory=blob_directory)
+        blob = self._get_blob(BlobBuffer, blob_manager=self.blob_manager)
         writer = blob.get_blob_writer('1.1.1.1', 1337)
         self.assertEqual(1, len(blob.writers))
         with self.assertRaises(OSError):
@@ -158,10 +152,7 @@ async def test_delete(self):
         self.assertIsNone(blob_buffer._verified_bytes)
         self.assertFalse(blob_buffer.get_is_verified())
 
-        tmp_dir = tempfile.mkdtemp()
-        self.addCleanup(lambda: shutil.rmtree(tmp_dir))
-
-        blob_file = await self._test_create_blob(BlobFile, tmp_dir)
+        blob_file = await self._test_create_blob(BlobFile, self.blob_manager)
         self.assertIsInstance(blob_file, BlobFile)
         self.assertTrue(os.path.isfile(blob_file.file_path))
         self.assertTrue(blob_file.get_is_verified())
@@ -174,7 +165,7 @@ async def test_delete_corrupt(self):
         self.addCleanup(lambda: shutil.rmtree(tmp_dir))
         blob = BlobFile(
             self.loop, self.blob_hash, len(self.blob_bytes), blob_completed_callback=self.blob_manager.blob_completed,
-            blob_directory=tmp_dir
+            blob_manager=self.blob_manager
         )
         writer = blob.get_blob_writer()
         writer.write(self.blob_bytes)
@@ -182,7 +173,7 @@ async def test_delete_corrupt(self):
         blob.close()
         blob = BlobFile(
             self.loop, self.blob_hash, len(self.blob_bytes), blob_completed_callback=self.blob_manager.blob_completed,
-            blob_directory=tmp_dir
+            blob_manager=self.blob_manager
         )
         self.assertTrue(blob.get_is_verified())
 
@@ -190,7 +181,7 @@ async def test_delete_corrupt(self):
             f.write(b'\x00')
         blob = BlobFile(
             self.loop, self.blob_hash, len(self.blob_bytes), blob_completed_callback=self.blob_manager.blob_completed,
-            blob_directory=tmp_dir
+            blob_manager=self.blob_manager
         )
         self.assertFalse(blob.get_is_verified())
         self.assertFalse(os.path.isfile(blob.file_path))
@@ -219,7 +210,5 @@ async def read_blob_buffer():
             self.assertEqual(err.exception, ValueError("I/O operation on closed file"))
 
     async def test_close_reader(self):
-        tmp_dir = tempfile.mkdtemp()
-        self.addCleanup(lambda: shutil.rmtree(tmp_dir))
         await self._test_close_reader(BlobBuffer)
-        await self._test_close_reader(BlobFile, tmp_dir)
+        await self._test_close_reader(BlobFile, self.blob_manager)
diff --git a/tests/unit/blob/test_blob_manager.py b/tests/unit/blob/test_blob_manager.py
index 788ab09537..a8a0f93811 100644
--- a/tests/unit/blob/test_blob_manager.py
+++ b/tests/unit/blob/test_blob_manager.py
@@ -35,7 +35,7 @@ async def test_sync_blob_file_manager_on_startup(self):
         # add a blob file
         blob_hash = "7f5ab2def99f0ddd008da71db3a3772135f4002b19b7605840ed1034c8955431bd7079549e65e6b2a3b9c17c773073ed"
         blob_bytes = b'1' * ((2 * 2 ** 20) - 1)
-        with open(os.path.join(self.blob_manager.blob_dir, blob_hash), 'wb') as f:
+        with open(os.path.join(self.blob_manager._blob_dir(blob_hash)[0], blob_hash), 'wb') as f:
             f.write(blob_bytes)
 
         # it should not have been added automatically on startup
@@ -57,7 +57,7 @@ async def test_sync_blob_file_manager_on_startup(self):
         self.blob_manager.stop()
 
         # manually delete the blob file and restart the blob manager
-        os.remove(os.path.join(self.blob_manager.blob_dir, blob_hash))
+        os.remove(os.path.join(self.blob_manager._blob_dir(blob_hash)[0], blob_hash))
         await self.blob_manager.setup()
         self.assertSetEqual(self.blob_manager.completed_blob_hashes, set())

From cd3701e123a494a9c9c2f0c06fbb2ba8a87ce28a Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Sun, 8 Jan 2023 12:09:43 -0600
Subject: [PATCH 4/7] Fix one more unit test.

---
 tests/unit/database/test_SQLiteStorage.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/unit/database/test_SQLiteStorage.py b/tests/unit/database/test_SQLiteStorage.py
index 0d85e56519..ee2932a47e 100644
--- a/tests/unit/database/test_SQLiteStorage.py
+++ b/tests/unit/database/test_SQLiteStorage.py
@@ -86,7 +86,7 @@ async def store_fake_blob(self, blob_hash, length=100):
     async def store_fake_stream(self, stream_hash, blobs=None, file_name="fake_file", key="DEADBEEF"):
         blobs = blobs or [BlobInfo(1, 100, "DEADBEEF", 0, random_lbry_hash())]
         descriptor = StreamDescriptor(
-            asyncio.get_event_loop(), self.blob_dir, file_name, key, file_name, blobs, stream_hash
+            asyncio.get_event_loop(), self.blob_manager, file_name, key, file_name, blobs, stream_hash
         )
         sd_blob = await descriptor.make_sd_blob()
         await self.storage.store_stream(sd_blob, descriptor)

From 0f5c7feba0cf8215ae504bc4628bffd44e77aa5f Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Sun, 8 Jan 2023 12:20:38 -0600
Subject: [PATCH 5/7] Add comment about subdir setup.

---
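Note: rough arithmetic behind the TODO added below — the eager three-level hex
fan-out creates a fixed number of subdirectories per blob dir, whether or not
any blobs ever land in them:

    # 16 first-level dirs, 16**2 second-level, 16**3 third-level.
    levels = 3
    dirs_created = sum(16 ** d for d in range(1, levels + 1))
    print(dirs_created)  # 16 + 256 + 4096 = 4368 directories per blob dir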
 lbry/extras/daemon/components.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/lbry/extras/daemon/components.py b/lbry/extras/daemon/components.py
index abed8d3739..ee5b8d360c 100644
--- a/lbry/extras/daemon/components.py
+++ b/lbry/extras/daemon/components.py
@@ -241,6 +241,9 @@ def setup_subdirs(path, depth):
             if not os.path.isdir(blob_dir):
                 os.mkdir(blob_dir)
                 #print(f'created blob dir: {blob_dir}')
+            # TODO: Should subdir setup be done for new or empty blob dirs only?
+            # Setting up the subdirs will not relocate existing blobs and
+            # will just slow down lookups until & unless the subdirs fill up.
             setup_subdirs(blob_dir, 3)
 
         self.blob_manager = BlobManager(self.component_manager.loop, blob_dirs, storage, self.conf, data_store)

From df57d091f10f96620de1987d8f2eda0682b57b48 Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Fri, 20 Jan 2023 11:09:07 -0600
Subject: [PATCH 6/7] Add basic test for conf.blob_dirs and use
 ensure_directory_exists().

---
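Note: with validate() dropped from Paths (first hunk below), configured
entries no longer have to exist at parse time; they are expanded on read and
created at startup via ensure_directory_exists(). What __get__ does to each
entry, with illustrative values:

    import os

    configured = [os.path.join('~', 'dir0', 'dir1'),
                  os.path.join('$HOME', 'dir0', 'dir2')]
    expanded = [os.path.expanduser(os.path.expandvars(p)) for p in configured]
    print(expanded)  # e.g. ['/home/user/dir0/dir1', '/home/user/dir0/dir2']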
 lbry/conf.py            |  7 -------
 lbry/extras/cli.py      |  2 ++
 tests/unit/test_conf.py | 39 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 41 insertions(+), 7 deletions(-)

diff --git a/lbry/conf.py b/lbry/conf.py
index af356fe706..02d28873eb 100644
--- a/lbry/conf.py
+++ b/lbry/conf.py
@@ -286,13 +286,6 @@ class Paths(Strings):
 
-    def validate(self, value):
-        super().validate(value)
-        for idx, path in enumerate(value):
-            assert os.path.isdir(path), \
-                f"Path '{path}' at index {idx} in setting " \
-                f"'{self.name}' must be a path to a directory."
-
diff --git a/lbry/extras/cli.py b/lbry/extras/cli.py
index a33543faf0..9d60785822 100644
--- a/lbry/extras/cli.py
+++ b/lbry/extras/cli.py
@@ -296,6 +296,8 @@ def main(argv=None):
     conf = Config.create_from_arguments(args)
     for directory in (conf.data_dir, conf.download_dir, conf.wallet_dir):
         ensure_directory_exists(directory)
+    for directory in conf.blob_dirs:
+        ensure_directory_exists(directory)
 
     if args.cli_version:
         print(f"lbrynet {lbrynet_version}")
diff --git a/tests/unit/test_conf.py b/tests/unit/test_conf.py
index 6454064622..05234f1e47 100644
--- a/tests/unit/test_conf.py
+++ b/tests/unit/test_conf.py
@@ -331,3 +331,42 @@ def test_known_hubs_list(self):
             ('new.hub.io', 99): {'jurisdiction': 'us'},
             ('any.hub.io', 99): {}
         })
+
+    def test_blob_dirs_from_yaml(self):
+        with tempfile.TemporaryDirectory() as temp_dir:
+            config = os.path.join(temp_dir, 'settings.yml')
+            with open(config, 'w') as fd:
+                fd.write(
+                    f"blob_dirs:\n"
+                    f"  - {os.path.join('~', 'dir0', 'dir1')}\n"
+                    f"  - {os.path.join('~', 'dir0', 'dir2')}\n"
+                )
+            c = Config.create_from_arguments(
+                types.SimpleNamespace(config=config)
+            )
+            self.assertEqual(len(c.blob_dirs), 2)
+            self.assertRegex(c.blob_dirs[0], f"^.+{os.path.join('dir0', 'dir1')}$")
+            self.assertRegex(c.blob_dirs[1], f"^.+{os.path.join('dir0', 'dir2')}$")
+            with c.update_config():
+                c.blob_dirs = []
+            with open(config, 'r') as fd:
+                self.assertEqual(fd.read(), 'blob_dirs: []\n')
+
+    def test_blob_dirs_from_args(self):
+        parser = argparse.ArgumentParser()
+        Config.contribute_to_argparse(parser)
+
+        # default
+        args = parser.parse_args([])
+        c = Config.create_from_arguments(args)
+        self.assertEqual(c.blob_dirs, [])
+
+        # two dirs
+        args = parser.parse_args([
+            f"--blob-dirs={os.path.join('~', 'dir0', 'dir1')}",
+            f"--blob-dirs={os.path.join('~', 'dir0', 'dir2')}",
+        ])
+        c = Config.create_from_arguments(args)
+        self.assertEqual(len(c.blob_dirs), 2)
+        self.assertRegex(c.blob_dirs[0], f"^.+{os.path.join('dir0', 'dir1')}$")
+        self.assertRegex(c.blob_dirs[1], f"^.+{os.path.join('dir0', 'dir2')}$")

From 71dd10cdf64ac6c390dd255ba8820a18778f7f27 Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Fri, 20 Jan 2023 11:33:23 -0600
Subject: [PATCH 7/7] Fix matching condition on Windows path.

---
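Note: on Windows, os.path.join() builds paths with backslashes, and a
backslash is a regex metacharacter, so the assertRegex patterns from the
previous patch silently stop matching ('\d' becomes a digit class). A small
demonstration with illustrative paths; either escaping the separator or a
plain suffix check works, and the patch opts for the latter:

    import os
    import re

    tail = os.path.join('dir0', 'dir1')  # 'dir0\\dir1' on Windows
    path = os.path.join('C:', 'users', 'x', tail)

    assert re.search(f"^.+{re.escape(tail)}$", path)  # escaping fixes the regex
    assert path.endswith(tail)                        # simpler, platform-neutral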
 tests/unit/test_conf.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/tests/unit/test_conf.py b/tests/unit/test_conf.py
index 05234f1e47..035686989b 100644
--- a/tests/unit/test_conf.py
+++ b/tests/unit/test_conf.py
@@ -345,8 +345,8 @@ def test_blob_dirs_from_yaml(self):
                 types.SimpleNamespace(config=config)
             )
             self.assertEqual(len(c.blob_dirs), 2)
-            self.assertRegex(c.blob_dirs[0], f"^.+{os.path.join('dir0', 'dir1')}$")
-            self.assertRegex(c.blob_dirs[1], f"^.+{os.path.join('dir0', 'dir2')}$")
+            self.assertTrue(c.blob_dirs[0].endswith(os.path.join('dir0', 'dir1')))
+            self.assertTrue(c.blob_dirs[1].endswith(os.path.join('dir0', 'dir2')))
             with c.update_config():
                 c.blob_dirs = []
             with open(config, 'r') as fd:
@@ -368,5 +368,5 @@ def test_blob_dirs_from_args(self):
         ])
         c = Config.create_from_arguments(args)
         self.assertEqual(len(c.blob_dirs), 2)
-        self.assertRegex(c.blob_dirs[0], f"^.+{os.path.join('dir0', 'dir1')}$")
-        self.assertRegex(c.blob_dirs[1], f"^.+{os.path.join('dir0', 'dir2')}$")
+        self.assertTrue(c.blob_dirs[0].endswith(os.path.join('dir0', 'dir1')))
+        self.assertTrue(c.blob_dirs[1].endswith(os.path.join('dir0', 'dir2')))