Commit
Merge dbe3ace into 6258651
shyba committed Dec 16, 2022
2 parents 6258651 + dbe3ace commit 2f65f99
Showing 16 changed files with 410 additions and 173 deletions.
4 changes: 2 additions & 2 deletions lbry/error/README.md
@@ -81,8 +81,8 @@ Code | Name | Message
511 | CorruptBlob | Blobs is corrupted.
520 | BlobFailedEncryption | Failed to encrypt blob.
531 | DownloadCancelled | Download was canceled.
532 | DownloadSDTimeout | Failed to download sd blob {download} within timeout.
533 | DownloadDataTimeout | Failed to download data blobs for sd hash {download} within timeout.
532 | DownloadMetadataTimeout | Failed to download metadata for {download} within timeout.
533 | DownloadDataTimeout | Failed to download data blobs for {download} within timeout.
534 | InvalidStreamDescriptor | {message}
535 | InvalidData | {message}
536 | InvalidBlobHash | {message}
6 changes: 3 additions & 3 deletions lbry/error/__init__.py
@@ -411,18 +411,18 @@ def __init__(self):
super().__init__("Download was canceled.")


class DownloadSDTimeoutError(BlobError):
class DownloadMetadataTimeoutError(BlobError):

def __init__(self, download):
self.download = download
super().__init__(f"Failed to download sd blob {download} within timeout.")
super().__init__(f"Failed to download metadata for {download} within timeout.")


class DownloadDataTimeoutError(BlobError):

def __init__(self, download):
self.download = download
super().__init__(f"Failed to download data blobs for sd hash {download} within timeout.")
super().__init__(f"Failed to download data blobs for {download} within timeout.")


class InvalidStreamDescriptorError(BlobError):
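
A minimal usage sketch of the renamed error (assumed caller code, not part of this diff): sd-blob and torrent-metadata timeouts now surface as the same DownloadMetadataTimeoutError, with the identifier that timed out available on the .download attribute.

import logging

from lbry.error import DownloadMetadataTimeoutError, DownloadDataTimeoutError

log = logging.getLogger(__name__)

async def try_download(file_manager, uri, exchange_rate_manager):
    # hypothetical helper around FileManager.download_from_uri (shown further down in this diff)
    try:
        return await file_manager.download_from_uri(uri, exchange_rate_manager)
    except DownloadMetadataTimeoutError as err:
        log.warning("metadata timed out for %s", err.download)  # uri, sd_hash or infohash
    except DownloadDataTimeoutError as err:
        log.warning("data blobs timed out for %s", err.download)
    return None
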
13 changes: 7 additions & 6 deletions lbry/extras/daemon/daemon.py
@@ -36,7 +36,7 @@
from lbry.blob_exchange.downloader import download_blob
from lbry.dht.peer import make_kademlia_peer
from lbry.error import (
DownloadSDTimeoutError, ComponentsNotStartedError, ComponentStartConditionNotMetError,
DownloadMetadataTimeoutError, ComponentsNotStartedError, ComponentStartConditionNotMetError,
CommandDoesNotExistError, BaseError, WalletNotFoundError, WalletAlreadyLoadedError, WalletAlreadyExistsError,
ConflictingInputValueError, AlreadyPurchasedError, PrivateKeyNotFoundError, InputStringIsBlankError,
InputValueError
@@ -639,7 +639,7 @@ async def handle_stream_get_request(self, request: web.Request):
stream = await self.jsonrpc_get(uri)
if isinstance(stream, dict):
raise web.HTTPServerError(text=stream['error'])
raise web.HTTPFound(f"/stream/{stream.sd_hash}")
raise web.HTTPFound(f"/stream/{stream.identifier}")

async def handle_stream_range_request(self, request: web.Request):
try:
@@ -658,12 +658,13 @@ async def handle_stream_range_request(self, request: web.Request):
log.debug("finished handling /stream range request")

async def _handle_stream_range_request(self, request: web.Request):
sd_hash = request.path.split("/stream/")[1]
identifier = request.path.split("/stream/")[1]
if not self.file_manager.started.is_set():
await self.file_manager.started.wait()
if sd_hash not in self.file_manager.streams:
stream = self.file_manager.get_filtered(identifier=identifier)
if not stream:
return web.HTTPNotFound()
return await self.file_manager.stream_partial_content(request, sd_hash)
return await self.file_manager.stream_partial_content(request, identifier)

async def _process_rpc_call(self, data):
args = data.get('params', {})
@@ -1139,7 +1140,7 @@ async def jsonrpc_get(
save_file=save_file, wallet=wallet
)
if not stream:
raise DownloadSDTimeoutError(uri)
raise DownloadMetadataTimeoutError(uri)
except Exception as e:
# TODO: use error from lbry.error
log.warning("Error downloading %s: %s", uri, str(e))
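
The built-in streaming endpoint now resolves files by a generic identifier (sd_hash for streams, infohash for torrents) instead of requiring an sd_hash. A rough client-side sketch, assuming the streaming server listens on its usual localhost:5280 address; adjust to your configuration:

import asyncio
import aiohttp

async def probe(identifier: str):
    # identifier: 96-char sd_hash for a stream, or 40-char infohash for a torrent
    url = f"http://localhost:5280/stream/{identifier}"  # assumed streaming-server address
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers={"Range": "bytes=0-1023"}) as resp:
            print(resp.status, resp.headers.get("Content-Range"))

asyncio.run(probe("ff" * 48))
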
16 changes: 4 additions & 12 deletions lbry/extras/daemon/json_response_encoder.py
@@ -285,18 +285,18 @@ def encode_file(self, managed_stream):
else:
total_bytes_lower_bound = total_bytes = managed_stream.torrent_length
result = {
'streaming_url': None,
'streaming_url': managed_stream.stream_url,
'completed': managed_stream.completed,
'file_name': None,
'download_directory': None,
'download_path': None,
'points_paid': 0.0,
'stopped': not managed_stream.running,
'stream_hash': None,
'stream_name': None,
'suggested_file_name': None,
'stream_name': managed_stream.stream_name,
'suggested_file_name': managed_stream.suggested_file_name,
'sd_hash': None,
'mime_type': None,
'mime_type': managed_stream.mime_type,
'key': None,
'total_bytes_lower_bound': total_bytes_lower_bound,
'total_bytes': total_bytes,
@@ -326,12 +326,8 @@ def encode_file(self, managed_stream):
}
if is_stream:
result.update({
'streaming_url': managed_stream.stream_url,
'stream_hash': managed_stream.stream_hash,
'stream_name': managed_stream.stream_name,
'suggested_file_name': managed_stream.suggested_file_name,
'sd_hash': managed_stream.descriptor.sd_hash,
'mime_type': managed_stream.mime_type,
'key': managed_stream.descriptor.key,
'blobs_completed': managed_stream.blobs_completed,
'blobs_in_stream': managed_stream.blobs_in_stream,
'reflector_progress': managed_stream.reflector_progress,
'uploading_to_reflector': managed_stream.uploading_to_reflector
})
else:
result.update({
'streaming_url': f'file://{managed_stream.full_path}',
})
if output_exists:
result.update({
'file_name': managed_stream.file_name,
32 changes: 24 additions & 8 deletions lbry/extras/daemon/storage.py
@@ -5,6 +5,7 @@
import asyncio
import binascii
import time
from operator import itemgetter
from typing import Optional
from lbry.wallet import SQLiteMixin
from lbry.conf import Config
@@ -211,23 +212,26 @@ def delete_torrent(transaction: sqlite3.Connection, bt_infohash: str):
transaction.execute("delete from torrent where bt_infohash=?", (bt_infohash,)).fetchall()


def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typing.Optional[str],
def store_file(transaction: sqlite3.Connection, identifier_value: str, file_name: typing.Optional[str],
download_directory: typing.Optional[str], data_payment_rate: float, status: str,
content_fee: typing.Optional[Transaction], added_on: typing.Optional[int] = None) -> int:
if not file_name and not download_directory:
encoded_file_name, encoded_download_dir = None, None
else:
encoded_file_name = binascii.hexlify(file_name.encode()).decode()
encoded_download_dir = binascii.hexlify(download_directory.encode()).decode()
is_torrent = len(identifier_value) == 40
time_added = added_on or int(time.time())
transaction.execute(
"insert or replace into file values (?, NULL, ?, ?, ?, ?, ?, ?, ?)",
(stream_hash, encoded_file_name, encoded_download_dir, data_payment_rate, status,
f"insert or replace into file values ({'NULL, ?' if is_torrent else '?, NULL'}, ?, ?, ?, ?, ?, ?, ?)",
(identifier_value, encoded_file_name, encoded_download_dir, data_payment_rate, status,
1 if (file_name and download_directory and os.path.isfile(os.path.join(download_directory, file_name))) else 0,
None if not content_fee else binascii.hexlify(content_fee.raw).decode(), time_added)
).fetchall()

return transaction.execute("select rowid from file where stream_hash=?", (stream_hash, )).fetchone()[0]
return transaction.execute(
f"select rowid from file where {'bt_infohash' if is_torrent else 'stream_hash'}=?",
(identifier_value, )).fetchone()[0]


class SQLiteStorage(SQLiteMixin):
@@ -632,6 +636,13 @@ def update_db_removed(transaction: sqlite3.Connection, removed):
def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]:
return self.db.run(get_all_lbry_files)

async def get_all_torrent_files(self) -> typing.List[typing.Dict]:
def _get_all_torrent_files(transaction):
cursor = transaction.execute(
"select file.ROWID as rowid, * from file join torrent on file.bt_infohash=torrent.bt_infohash")
return map(lambda row: dict(zip(list(map(itemgetter(0), cursor.description)), row)), cursor.fetchall())
return list(await self.db.run(_get_all_torrent_files))

def change_file_status(self, stream_hash: str, new_status: str):
log.debug("update file status %s -> %s", stream_hash, new_status)
return self.db.execute_fetchall("update file set status=? where stream_hash=?", (new_status, stream_hash))
@@ -872,15 +883,20 @@ async def save_content_claim(self, stream_hash, claim_outpoint):
if stream_hash in self.content_claim_callbacks:
await self.content_claim_callbacks[stream_hash]()

async def save_torrent_content_claim(self, bt_infohash, claim_outpoint, length, name):
def _save_torrent(transaction):
async def add_torrent(self, bt_infohash, length, name):
def _save_torrent(transaction, bt_infohash, length, name):
transaction.execute(
"insert or replace into torrent values (?, NULL, ?, ?)", (bt_infohash, length, name)
).fetchall()
return await self.db.run(_save_torrent, bt_infohash, length, name)

async def save_torrent_content_claim(self, bt_infohash, claim_outpoint, length, name):
def _save_torrent_claim(transaction):
transaction.execute(
"insert or replace into content_claim values (NULL, ?, ?)", (bt_infohash, claim_outpoint)
).fetchall()
await self.db.run(_save_torrent)
await self.add_torrent(bt_infohash, length, name)
await self.db.run(_save_torrent_claim)
# update corresponding ManagedEncryptedFileDownloader object
if bt_infohash in self.content_claim_callbacks:
await self.content_claim_callbacks[bt_infohash]()
@@ -898,7 +914,7 @@ async def get_content_claim(self, stream_hash: str, include_supports: typing.Opt

async def get_content_claim_for_torrent(self, bt_infohash):
claims = await self.db.run(get_claims_from_torrent_info_hashes, [bt_infohash])
return claims[bt_infohash].as_dict() if claims else None
return claims[bt_infohash] if claims else None

# # # # # # # # # reflector functions # # # # # # # # #

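
store_file() now fills either the stream_hash or the bt_infohash column of the file table depending on the identifier it receives. A minimal sketch of that heuristic, assuming hex-encoded identifiers (a BitTorrent v1 infohash is 40 characters, an LBRY stream hash is 96):

def file_table_insert_sql(identifier_value: str) -> str:
    # the file table starts with the column pair (stream_hash, bt_infohash);
    # exactly one of the two is populated, the other stays NULL
    is_torrent = len(identifier_value) == 40
    columns = 'NULL, ?' if is_torrent else '?, NULL'
    return f"insert or replace into file values ({columns}, ?, ?, ?, ?, ?, ?, ?)"

print(file_table_insert_sql("aa" * 20))  # torrent row: stream_hash left NULL
print(file_table_insert_sql("bb" * 48))  # stream row: bt_infohash left NULL
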
33 changes: 19 additions & 14 deletions lbry/file/file_manager.py
@@ -3,7 +3,7 @@
import typing
from typing import Optional
from aiohttp.web import Request
from lbry.error import ResolveError, DownloadSDTimeoutError, InsufficientFundsError
from lbry.error import ResolveError, DownloadMetadataTimeoutError, InsufficientFundsError
from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError
from lbry.error import InvalidStreamURLError
from lbry.stream.managed_stream import ManagedStream
@@ -139,7 +139,7 @@ async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManag
existing[0].identifier, outpoint, existing[0].torrent_length, existing[0].torrent_name
)
claim_info = await self.storage.get_content_claim_for_torrent(existing[0].identifier)
existing[0].set_claim(claim_info, claim)
existing[0].set_claim(claim_info.as_dict() if claim_info else None, claim)
else:
await self.storage.save_content_claim(
existing[0].stream_hash, outpoint
@@ -242,15 +242,15 @@ async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManag
stream.identifier, outpoint, stream.torrent_length, stream.torrent_name
)
claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier)
stream.set_claim(claim_info, claim)
stream.set_claim(claim_info.as_dict() if claim_info else None, claim)
if save_file:
await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download))
return stream
except asyncio.TimeoutError:
error = DownloadDataTimeoutError(stream.sd_hash)
error = DownloadDataTimeoutError(stream.identifier)
raise error
except Exception as err: # forgive data timeout, don't delete stream
expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
expected = (DownloadMetadataTimeoutError, DownloadDataTimeoutError, InsufficientFundsError,
KeyFeeAboveMaxAllowedError, ResolveError, InvalidStreamURLError)
if isinstance(err, expected):
log.warning("Failed to download %s: %s", uri, str(err))
@@ -290,19 +290,24 @@ async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManag
)
)

async def stream_partial_content(self, request: Request, sd_hash: str):
return await self.source_managers['stream'].stream_partial_content(request, sd_hash)
async def stream_partial_content(self, request: Request, identifier: str):
for source_manager in self.source_managers.values():
if source_manager.get_filtered(identifier=identifier):
return await source_manager.stream_partial_content(request, identifier)

def get_filtered(self, *args, **kwargs) -> typing.List[ManagedDownloadSource]:
"""
Get a list of filtered and sorted ManagedStream objects
:param sort_by: field to sort by
:param reverse: reverse sorting
:param comparison: comparison operator used for filtering
:param search_by: fields and values to filter by
Get a list of filtered and sorted ManagedDownloadSource objects from all available source managers
"""
return sum((manager.get_filtered(*args, **kwargs) for manager in self.source_managers.values()), [])
result = last_error = None
for manager in self.source_managers.values():
try:
result = (result or []) + manager.get_filtered(*args, **kwargs)
except ValueError as error:
last_error = error
if result is not None:
return result
raise last_error

async def delete(self, source: ManagedDownloadSource, delete_file=False):
for manager in self.source_managers.values():
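
FileManager.get_filtered() now merges results from every registered source manager and only re-raises a ValueError when no manager accepted the filter. A hedged usage sketch (file_manager is an assumed FileManager instance; SourceManager.get_filtered is assumed to reject unknown fields with ValueError):

# identifier is understood by both managers (sd_hash for streams, infohash for torrents)
files = file_manager.get_filtered(identifier="cc" * 20)

# sd_hash is stream-only: the torrent manager raises ValueError, the stream manager
# answers, so the aggregate call still returns a (possibly empty) list
streams = file_manager.get_filtered(sd_hash="dd" * 48)

# a field no manager knows about propagates the last ValueError
try:
    file_manager.get_filtered(bogus_field=1)
except ValueError as err:
    print("unsupported filter:", err)
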
17 changes: 13 additions & 4 deletions lbry/file/source.py
@@ -1,5 +1,6 @@
import os
import asyncio
import time
import typing
import logging
import binascii
@@ -43,7 +44,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: '
self.rowid = rowid
self.content_fee = content_fee
self.purchase_receipt = None
self._added_on = added_on
self._added_on = added_on or int(time.time())
self.analytics_manager = analytics_manager
self.downloader = None

@@ -91,6 +92,14 @@ def file_name(self) -> Optional[str]:
def added_on(self) -> Optional[int]:
return self._added_on

@property
def suggested_file_name(self):
return self._file_name

@property
def stream_name(self):
return self.suggested_file_name

@property
def status(self) -> str:
return self._status
@@ -99,9 +108,9 @@ def status(self) -> str:
def completed(self):
raise NotImplementedError()

# @property
# def stream_url(self):
# return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}
@property
def stream_url(self):
return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.identifier}"

@property
def finished(self) -> bool:
2 changes: 2 additions & 0 deletions lbry/file/source_manager.py
@@ -23,6 +23,7 @@

class SourceManager:
filter_fields = {
'identifier',
'rowid',
'status',
'file_name',
@@ -83,6 +84,7 @@ async def create(self, file_path: str, key: Optional[bytes] = None,
raise NotImplementedError()

async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
await self.storage.delete_torrent(source.identifier)
self.remove(source)
if delete_file and source.output_file_exists:
os.remove(source.full_path)
4 changes: 2 additions & 2 deletions lbry/stream/downloader.py
@@ -4,7 +4,7 @@
import binascii

from lbry.dht.node import get_kademlia_peers_from_hosts
from lbry.error import DownloadSDTimeoutError
from lbry.error import DownloadMetadataTimeoutError
from lbry.utils import lru_cache_concurrent
from lbry.stream.descriptor import StreamDescriptor
from lbry.blob_exchange.downloader import BlobDownloader
@@ -77,7 +77,7 @@ async def load_descriptor(self, connection_id: int = 0):
log.info("downloaded sd blob %s", self.sd_hash)
self.time_to_descriptor = self.loop.time() - now
except asyncio.TimeoutError:
raise DownloadSDTimeoutError(self.sd_hash)
raise DownloadMetadataTimeoutError(self.sd_hash)

# parse the descriptor
self.descriptor = await StreamDescriptor.from_stream_descriptor_blob(
8 changes: 2 additions & 6 deletions lbry/stream/managed_stream.py
@@ -5,7 +5,7 @@
import logging
from typing import Optional
from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
from lbry.error import DownloadSDTimeoutError
from lbry.error import DownloadMetadataTimeoutError
from lbry.schema.mime_types import guess_media_type
from lbry.stream.downloader import StreamDownloader
from lbry.stream.descriptor import StreamDescriptor, sanitize_file_name
@@ -104,10 +104,6 @@ def written_bytes(self) -> int:
def completed(self):
return self.written_bytes >= self.descriptor.lower_bound_decrypted_length()

@property
def stream_url(self):
return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}"

async def update_status(self, status: str):
assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED]
self._status = status
@@ -164,7 +160,7 @@ async def start(self, timeout: Optional[float] = None,
await asyncio.wait_for(self.downloader.start(), timeout)
except asyncio.TimeoutError:
self._running.clear()
raise DownloadSDTimeoutError(self.sd_hash)
raise DownloadMetadataTimeoutError(self.identifier)

if self.delayed_stop_task and not self.delayed_stop_task.done():
self.delayed_stop_task.cancel()
2 changes: 1 addition & 1 deletion lbry/stream/stream_manager.py
@@ -32,7 +32,7 @@ def path_or_none(encoded_path) -> Optional[str]:
class StreamManager(SourceManager):
_sources: typing.Dict[str, ManagedStream]

filter_fields = SourceManager.filter_fields
filter_fields = set(SourceManager.filter_fields)
filter_fields.update({
'sd_hash',
'stream_hash',
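
The switch from filter_fields = SourceManager.filter_fields to set(SourceManager.filter_fields) matters because the update() call that follows would otherwise mutate the set object shared with the base class and every other source manager. A minimal illustration of that aliasing pitfall, using simplified stand-in classes rather than the real ones:

class Base:
    filter_fields = {'identifier', 'status'}

class Aliased(Base):
    filter_fields = Base.filter_fields        # same set object as Base
    filter_fields.update({'sd_hash'})

assert 'sd_hash' in Base.filter_fields        # the base class set was mutated too

class Copied(Base):
    filter_fields = set(Base.filter_fields)   # independent copy, as StreamManager now does
    filter_fields.update({'stream_hash'})

assert 'stream_hash' not in Base.filter_fields  # the base class set is left alone
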
