Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

start returning bytes for a range request at the requested starting position #2179

Merged
merged 1 commit into from May 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 15 additions & 0 deletions lbrynet/extras/daemon/Daemon.py
Expand Up @@ -486,6 +486,7 @@ async def handle_old_jsonrpc(self, request):

async def handle_stream_get_request(self, request: web.Request):
if not self.conf.streaming_get:
log.warning("streaming_get is disabled, rejecting request")
raise web.HTTPForbidden()
name_and_claim_id = request.path.split("/get/")[1]
if "/" not in name_and_claim_id:
Expand All @@ -501,6 +502,20 @@ async def handle_stream_get_request(self, request: web.Request):
raise web.HTTPFound(f"/stream/{stream.sd_hash}")

async def handle_stream_range_request(self, request: web.Request):
try:
return await self._handle_stream_range_request(request)
except web.HTTPException as err:
log.warning("http code during /stream range request: %s", err)
raise err
except asyncio.CancelledError:
log.debug("/stream range request cancelled")
except Exception:
log.exception("error handling /stream range request")
raise
finally:
log.debug("finished handling /stream range request")

async def _handle_stream_range_request(self, request: web.Request):
sd_hash = request.path.split("/stream/")[1]
if not self.stream_manager.started.is_set():
await self.stream_manager.started.wait()
Expand Down
28 changes: 21 additions & 7 deletions lbrynet/stream/managed_stream.py
Expand Up @@ -3,7 +3,7 @@
import typing
import logging
import binascii
from aiohttp.web import Request, StreamResponse
from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable
from lbrynet.utils import generate_id
from lbrynet.error import DownloadSDTimeout
from lbrynet.schema.mime_types import guess_media_type
Expand Down Expand Up @@ -316,8 +316,10 @@ async def _aiter_read_stream(self, start_blob_num: typing.Optional[int] = 0, con
async def stream_file(self, request: Request, node: typing.Optional['Node'] = None) -> StreamResponse:
log.info("stream file to browser for lbry://%s#%s (sd hash %s...)", self.claim_name, self.claim_id,
self.sd_hash[:6])
headers, size, skip_blobs, first_blob_start_offset = self._prepare_range_response_headers(
request.headers.get('range', 'bytes=0-')
)
await self.start(node)
headers, size, skip_blobs = self._prepare_range_response_headers(request.headers.get('range', 'bytes=0-'))
response = StreamResponse(
status=206,
headers=headers
Expand All @@ -327,11 +329,16 @@ async def stream_file(self, request: Request, node: typing.Optional['Node'] = No
self.streaming.set()
try:
wrote = 0
async for blob_info, decrypted in self._aiter_read_stream(skip_blobs, connection_id=2):
async for blob_info, decrypted in self._aiter_read_stream(skip_blobs, connection_id=self.STREAMING_ID):
if not wrote:
decrypted = decrypted[first_blob_start_offset:]
if (blob_info.blob_num == len(self.descriptor.blobs) - 2) or (len(decrypted) + wrote >= size):
decrypted += (b'\x00' * (size - len(decrypted) - wrote - (skip_blobs * 2097151)))
log.debug("sending browser final blob (%i/%i)", blob_info.blob_num + 1,
len(self.descriptor.blobs) - 1)
await response.write_eof(decrypted)
else:
log.debug("sending browser blob (%i/%i)", blob_info.blob_num + 1, len(self.descriptor.blobs) - 1)
await response.write(decrypted)
wrote += len(decrypted)
log.info("sent browser %sblob %i/%i", "(final) " if response._eof_sent else "",
Expand Down Expand Up @@ -491,7 +498,7 @@ async def _delayed_stop(self):
return
await asyncio.sleep(1, loop=self.loop)

def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]:
def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int, int]:
if '=' in get_range:
get_range = get_range.split('=')[1]
start, end = get_range.split('-')
Expand All @@ -509,16 +516,23 @@ def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing
log.debug("estimating stream size")

start = int(start)
if not 0 <= start < size:
raise HTTPRequestRangeNotSatisfiable()

end = int(end) if end else size - 1

if end >= size:
raise HTTPRequestRangeNotSatisfiable()

skip_blobs = start // 2097150
skip = skip_blobs * 2097151
start = skip
skip_first_blob = start - skip
start = skip_first_blob + skip
final_size = end - start + 1

headers = {
'Accept-Ranges': 'bytes',
'Content-Range': f'bytes {start}-{end}/{size}',
'Content-Length': str(final_size),
'Content-Type': self.mime_type
}
return headers, size, skip_blobs
return headers, size, skip_blobs, skip_first_blob