Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ie/niconico] Directly download live timeshift videos; WebSocket fixes #9411

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
5 changes: 3 additions & 2 deletions yt_dlp/downloader/__init__.py
Expand Up @@ -30,7 +30,7 @@ def get_suitable_downloader(info_dict, params={}, default=NO_DEFAULT, protocol=N
from .http import HttpFD
from .ism import IsmFD
from .mhtml import MhtmlFD
from .niconico import NiconicoDmcFD, NiconicoLiveFD
from .niconico import NiconicoDmcFD, NiconicoLiveFD, NiconicoLiveTimeshiftFD
from .rtmp import RtmpFD
from .rtsp import RtspFD
from .websocket import WebSocketFragmentFD
Expand All @@ -50,7 +50,8 @@ def get_suitable_downloader(info_dict, params={}, default=NO_DEFAULT, protocol=N
'ism': IsmFD,
'mhtml': MhtmlFD,
'niconico_dmc': NiconicoDmcFD,
'niconico_live': NiconicoLiveFD,
'm3u8_niconico_live': NiconicoLiveFD,
'm3u8_niconico_live_timeshift': NiconicoLiveTimeshiftFD,
'fc2_live': FC2LiveFD,
'websocket_frag': WebSocketFragmentFD,
'youtube_live_chat': YoutubeLiveChatFD,
Expand Down
204 changes: 166 additions & 38 deletions yt_dlp/downloader/niconico.py
@@ -1,12 +1,23 @@
import contextlib
import json
import math
import threading
import time

from . import get_suitable_downloader
from .common import FileDownloader
from .external import FFmpegFD
from ..downloader.hls import HlsFD
from ..networking import Request
from ..utils import DownloadError, str_or_none, try_get
from ..networking.exceptions import RequestError
from ..utils import (
DownloadError,
RetryManager,
str_or_none,
traverse_obj,
try_get,
urljoin,
)


class NiconicoDmcFD(FileDownloader):
Expand Down Expand Up @@ -56,34 +67,33 @@ def heartbeat():
return success


class NiconicoLiveFD(FileDownloader):
""" Downloads niconico live without being stopped """
class NiconicoLiveBaseFD(FileDownloader):
_WEBSOCKET_RECONNECT_DELAY = 10

def real_download(self, filename, info_dict):
video_id = info_dict['video_id']
ws_url = info_dict['url']
ws_extractor = info_dict['ws']
ws_origin_host = info_dict['origin']
live_quality = info_dict.get('live_quality', 'high')
live_latency = info_dict.get('live_latency', 'high')
dl = FFmpegFD(self.ydl, self.params or {})

new_info_dict = info_dict.copy()
new_info_dict.update({
'protocol': 'm3u8',
})
@contextlib.contextmanager
def _ws_context(self, info_dict):
""" Hold a WebSocket object and release it when leaving """

video_id = info_dict['id']
live_latency = info_dict['live_latency']
self.ws = info_dict['__ws']

self.m3u8_lock = threading.Event()
self.m3u8_url = info_dict['manifest_url']
self.m3u8_lock.set()

def communicate_ws(reconnect):
if reconnect:
ws = self.ydl.urlopen(Request(ws_url, headers={'Origin': f'https://{ws_origin_host}'}))
self.ws = self.ydl.urlopen(Request(
self.ws.url, headers={'Origin': self.ws.wsw.request.headers['Origin']}))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wsw is internal only (part of the websockets library handler) and not part of the websocket response interface, so this will break when we introduce a new library.

(Sorry, I've been meaning to rename it.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In addition, if it is needed, we can probably add the original Request object to WebSocket responses.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @coletdjnz ! Thanks for your comment.

Agreed: a high-level class should not use internal things. In the original code, the hostname comes from the IE (info extractor), so I can change it back to that.

# Info Extractor
def _real_extract(self, url):
  return {
    "__ws": {
      "ws": ws,
      "origin": f'https://{hostname}',
    },
  }

# Downloader
self.ws.url, headers={'Origin': self.ws['origin']}))

Your opinion?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps in http_headers infodict property?

Otherwise I'd say that is probably fine too, since it's internal

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps in http_headers infodict property?

Done. Please see 41c6125 .

if self.ydl.params.get('verbose', False):
self.to_screen('[debug] Sending startWatching request')
ws.send(json.dumps({
self.ws.send(json.dumps({
'type': 'startWatching',
'data': {
'stream': {
'quality': live_quality,
'protocol': 'hls+fmp4',
'quality': 'abr',
'protocol': 'hls',
'latency': live_latency,
'chasePlay': False
},
Expand All @@ -94,47 +104,165 @@ def communicate_ws(reconnect):
'reconnect': True,
}
}))
else:
ws = ws_extractor
with ws:
with self.ws:
while True:
recv = ws.recv()
recv = self.ws.recv()
if not recv:
continue
data = json.loads(recv)
if not data or not isinstance(data, dict):
continue
if data.get('type') == 'ping':
# pong back
ws.send(r'{"type":"pong"}')
ws.send(r'{"type":"keepSeat"}')
self.ws.send(r'{"type":"pong"}')
self.ws.send(r'{"type":"keepSeat"}')
elif data.get('type') == 'stream':
self.m3u8_url = data['data']['uri']
self.m3u8_lock.set()
elif data.get('type') == 'disconnect':
self.write_debug(data)
return True
return
elif data.get('type') == 'error':
self.write_debug(data)
message = try_get(data, lambda x: x['body']['code'], str) or recv
return DownloadError(message)
raise DownloadError(message)
elif self.ydl.params.get('verbose', False):
if len(recv) > 100:
recv = recv[:100] + '...'
self.to_screen('[debug] Server said: %s' % recv)

stopped = threading.Event()

def ws_main():
reconnect = False
while True:
while not stopped.is_set():
try:
ret = communicate_ws(reconnect)
if ret is True:
return
except BaseException as e:
self.to_screen('[%s] %s: Connection error occured, reconnecting after 10 seconds: %s' % ('niconico:live', video_id, str_or_none(e)))
time.sleep(10)
continue
finally:
communicate_ws(reconnect)
break # Disconnected
except BaseException as e: # Including TransportError
if stopped.is_set():
break

self.m3u8_lock.clear() # m3u8 url may be changed

self.to_screen('[%s] %s: Connection error occured, reconnecting after %d seconds: %s' % ('niconico:live', video_id, self._WEBSOCKET_RECONNECT_DELAY, str_or_none(e)))
time.sleep(self._WEBSOCKET_RECONNECT_DELAY)

reconnect = True

self.m3u8_lock.set() # Release possible locks

thread = threading.Thread(target=ws_main, daemon=True)
thread.start()

return dl.download(filename, new_info_dict)
try:
yield self
finally:
stopped.set()
self.ws.close()
thread.join()

def _master_m3u8_url(self):
""" Get the refreshed manifest url after WebSocket reconnection to prevent HTTP 403 """

self.m3u8_lock.wait()
return self.m3u8_url


class NiconicoLiveFD(NiconicoLiveBaseFD):
""" Downloads niconico live without being stopped """

def real_download(self, filename, info_dict):
with self._ws_context(info_dict):
new_info_dict = info_dict.copy()
new_info_dict.update({
'protocol': 'm3u8',
})

return FFmpegFD(self.ydl, self.params or {}).download(filename, new_info_dict)


class NiconicoLiveTimeshiftFD(NiconicoLiveBaseFD, HlsFD):
""" Downloads niconico live timeshift VOD """

_PER_FRAGMENT_DOWNLOAD_RATIO = 0.1

def real_download(self, filename, info_dict):
with self._ws_context(info_dict) as ws_context:
Copy link
Member

@pukkandan pukkandan Mar 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to avoid adding more "protocols". Can't we keep the single protocol and do something like:

Suggested change
class NiconicoLiveFD(NiconicoLiveBaseFD):
""" Downloads niconico live without being stopped """
def real_download(self, filename, info_dict):
with self._ws_context(info_dict):
new_info_dict = info_dict.copy()
new_info_dict.update({
'protocol': 'm3u8',
})
return FFmpegFD(self.ydl, self.params or {}).download(filename, new_info_dict)
class NiconicoLiveTimeshiftFD(NiconicoLiveBaseFD, HlsFD):
""" Downloads niconico live timeshift VOD """
_PER_FRAGMENT_DOWNLOAD_RATIO = 0.1
def real_download(self, filename, info_dict):
with self._ws_context(info_dict) as ws_context:
class NiconicoLiveFD(NiconicoLiveBaseFD):
"""Downloads niconico live/timeshift VOD"""
_PER_FRAGMENT_DOWNLOAD_RATIO = 0.1
def real_download(self, filename, info_dict):
with self._ws_context(info_dict) as ws_context:
if info_dict.get('is_live'):
info_dict = info_dict.copy()
info_dict['protocol'] = 'm3u8'
return FFmpegFD(self.ydl, self.params or {}).download(filename, info_dict)

from ..extractor.niconico import NiconicoIE
ie = NiconicoIE(self.ydl)

video_id = info_dict['id']

# Get format index
for format_index, fmt in enumerate(info_dict['formats']):
if fmt['format_id'] == info_dict['format_id']:
break

# Get video info
total_duration = 0
fragment_duration = 0
for line in ie._download_webpage(info_dict['url'], video_id, note='Downloading m3u8').splitlines():
if '#STREAM-DURATION' in line:
total_duration = int(float(line.split(':')[1]))
if '#EXT-X-TARGETDURATION' in line:
fragment_duration = int(line.split(':')[1])
if not all({total_duration, fragment_duration}):
raise DownloadError('Unable to get required video info')

ctx = {
'filename': filename,
'total_frags': math.ceil(total_duration / fragment_duration),
}

self._prepare_and_start_frag_download(ctx, info_dict)

downloaded_duration = ctx['fragment_index'] * fragment_duration
while True:
if downloaded_duration > total_duration:
break

retry_manager = RetryManager(self.params.get('fragment_retries'), self.report_retry)
for retry in retry_manager:
try:
# Refresh master m3u8 (if possible) and get the url of the previously-chose format
master_m3u8_url = ws_context._master_m3u8_url()
formats = ie._extract_m3u8_formats(
master_m3u8_url, video_id, query={"start": downloaded_duration}, live=False, note=False, fatal=False)
media_m3u8_url = traverse_obj(formats, (format_index, {dict}, 'url'), get_all=False)
if not media_m3u8_url:
raise DownloadError('Unable to get playlist')

# Get all fragments
media_m3u8 = ie._download_webpage(media_m3u8_url, video_id, note=False)
fragment_urls = traverse_obj(media_m3u8.splitlines(), (
lambda _, v: not v.startswith('#'), {lambda url: urljoin(media_m3u8_url, url)}))

with self.DurationLimiter(len(fragment_urls) * fragment_duration * self._PER_FRAGMENT_DOWNLOAD_RATIO):
for fragment_url in fragment_urls:
success = self._download_fragment(ctx, fragment_url, info_dict)
if not success:
return False
self._append_fragment(ctx, self._read_fragment(ctx))
downloaded_duration += fragment_duration
pukkandan marked this conversation as resolved.
Show resolved Hide resolved

except (DownloadError, RequestError) as err: # Including HTTPError and TransportError
retry.error = err
continue

if retry_manager.error:
return False

return self._finish_frag_download(ctx, info_dict)

class DurationLimiter:
    """ Context manager that makes its body last at least `target` seconds

    The time spent inside the `with` body is measured and, on exit, whatever
    is left of `target` is slept away. Nothing is slept when the body already
    took `target` seconds or longer (or when `target` is zero or negative).
    """

    def __init__(self, target):
        # Minimum duration of the `with` body, in seconds
        self.target = target

    def __enter__(self):
        # A monotonic clock is used so that wall-clock adjustments (NTP sync,
        # manual changes) cannot shorten or lengthen the enforced delay
        self.start = time.monotonic()
        return self

    def __exit__(self, *exc):
        # Runs whether or not the body raised; returning None lets any
        # exception propagate after the remaining time has been slept
        remaining = self.target - (time.monotonic() - self.start)
        if remaining > 0:
            time.sleep(remaining)
Comment on lines +247 to +257
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, this is cleaner inline than as a context manager, but that is just personal preference; I won't force you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I tried to inline the logic, but the result was somewhat more complicated code that needed additional comments, so I gave up.