Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
executable file 931 lines (804 sloc) 35.5 KB
#!/usr/bin/env python3
import itertools as it, operator as op, functools as ft
import os, sys, argparse, logging, secrets, re, json, shutil
import contextlib, inspect, tempfile, pathlib, collections, enum
import asyncio, asyncio.subprocess, socket, signal, heapq, struct
import aiohttp
class TVFConfig:
slice_start = 0
slice_len = None
slice_scatter_len = None
slice_scatter_interval = None
aria2c_opts = None # overrides opts in aria2_conf_func
ytdl_opts = None
ytdl_output_format = None
aiohttp_opts = dict(read_timeout=30, conn_timeout=60)
verbose = False
use_temp_dirs = True
keep_tempfiles = False
file_part_suffix = False
file_append_batch = 10
file_append_max = 100 # should not be needed normally
compat_windows = sys.platform == 'win32'
aria2_cmd = 'aria2c'
aria2_conf_func = lambda self, opts: '\n'.join( [
'allow-overwrite=true', # always-resume=true can force-reuse these
f'user-agent={}' ]
+ (self.aria2c_opts or list()) + [''] )
aria2_term_wait = 10, 20 # (clean, term), clean should be >3s
aria2_startup_checks = 8, 10.0
aria2_queue_batch = 50
aria2_ws_heartbeat = 20.0
aria2_ws_debug = False # very noisy
aria2_ws_debug_signals = False
class LogMessage(object):
def __init__(self, fmt, a, k): self.fmt, self.a, self.k = fmt, a, k
def __str__(self): return self.fmt.format(*self.a, **self.k) if self.a or self.k else self.fmt
class LogStyleAdapter(logging.LoggerAdapter):
def __init__(self, logger, extra=None):
super(LogStyleAdapter, self).__init__(logger, extra or {})
def log(self, level, msg, *args, **kws):
if not self.isEnabledFor(level): return
log_kws = {} if 'exc_info' not in kws else dict(exc_info=kws.pop('exc_info'))
msg, kws = self.process(msg, kws)
self.logger._log(level, LogMessage(msg, args, kws), (), log_kws)
get_logger = lambda name: LogStyleAdapter(logging.getLogger(name))
class AsyncExitStack:
# Might be merged to 3.7, see
# Implementation from
def __init__(self):
self._exit_callbacks = collections.deque()
def pop_all(self):
new_stack = type(self)()
new_stack._exit_callbacks = self._exit_callbacks
self._exit_callbacks = collections.deque()
return new_stack
def push(self, exit_obj):
_cb_type = type(exit_obj)
exit_method = getattr(_cb_type, '__aexit__', None)
if exit_method is None: exit_method = _cb_type.__exit__
except AttributeError: self._exit_callbacks.append(exit_obj)
else: self._push_cm_exit(exit_obj, exit_method)
return exit_obj
def _create_exit_wrapper(cm, cm_exit):
if inspect.iscoroutinefunction(cm_exit):
async def _exit_wrapper(exc_type, exc, tb):
return await cm_exit(cm, exc_type, exc, tb)
def _exit_wrapper(exc_type, exc, tb):
return cm_exit(cm, exc_type, exc, tb)
return _exit_wrapper
def _push_cm_exit(self, cm, cm_exit):
_exit_wrapper = self._create_exit_wrapper(cm, cm_exit)
_exit_wrapper.__self__ = cm
def _create_cb_wrapper(callback, *args, **kwds):
if inspect.iscoroutinefunction(callback):
async def _exit_wrapper(exc_type, exc, tb): await callback(*args, **kwds)
def _exit_wrapper(exc_type, exc, tb): callback(*args, **kwds)
return _exit_wrapper
def callback(self, callback, *args, **kwds):
_exit_wrapper = self._create_cb_wrapper(callback, *args, **kwds)
_exit_wrapper.__wrapped__ = callback
return callback
def _shutdown_loop(self, *exc_details):
received_exc = exc_details[0] is not None
frame_exc = sys.exc_info()[1]
def _fix_exception_context(new_exc, old_exc):
while True:
exc_context = new_exc.__context__
if exc_context is old_exc: return
if exc_context is None or exc_context is frame_exc: break
new_exc = exc_context
new_exc.__context__ = old_exc
suppressed_exc = pending_raise = False
while self._exit_callbacks:
cb = self._exit_callbacks.pop()
cb_result = yield cb(*exc_details)
if cb_result:
suppressed_exc, pending_raise = True, False
exc_details = (None, None, None)
new_exc_details = sys.exc_info()
_fix_exception_context(new_exc_details[1], exc_details[1])
pending_raise, exc_details = True, new_exc_details
if pending_raise:
fixed_ctx = exc_details[1].__context__
raise exc_details[1]
except BaseException:
exc_details[1].__context__ = fixed_ctx
return received_exc and suppressed_exc
async def enter(self, cm):
_cm_type = type(cm)
_exit = getattr(_cm_type, '__aexit__', None)
if _exit is not None: result = await _cm_type.__aenter__(cm)
_exit = _cm_type.__exit__
result = _cm_type.__enter__(cm)
self._push_cm_exit(cm, _exit)
return result
async def close(self):
await self.__aexit__(None, None, None)
async def __aenter__(self): return self
async def __aexit__(self, *exc_details):
gen = self._shutdown_loop(*exc_details)
result = next(gen)
while True:
if inspect.isawaitable(result): result = await result
result = gen.send(result)
except StopIteration: raise
except BaseException as e: result = gen.throw(e)
except StopIteration as e: return e.value
def add_stack_wrappers(cls):
def _make_wrapper(func):
async def _wrapper(self, *args, **kws):
async with AsyncExitStack() as ctx:
return await func(self, ctx, *args, **kws)
return ft.wraps(func)(_wrapper)
for name, func in inspect.getmembers(cls, inspect.isroutine):
if name.startswith('__'): continue
sig = inspect.signature(func)
if ( len(sig.parameters) <= 1 or
list(sig.parameters.values())[1].annotation is not AsyncExitStack ): continue
setattr(cls, name, _make_wrapper(func))
return cls
it_adjacent = lambda seq, n, fill=None: it.zip_longest(fillvalue=fill, *([iter(seq)] * n))
it_adjacent_nofill = lambda seq, n:\
( tuple(filter(lambda v: v is not it_adjacent, chunk))
for chunk in it_adjacent(seq, n, fill=it_adjacent) )
class adict(dict):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__dict__ = self
def log_lines(log_func, lines, log_func_last=False):
if isinstance(lines, str):
lines = list(line.rstrip() for line in lines.rstrip().split('\n'))
uid = secrets.token_urlsafe(3)
for n, line in enumerate(lines, 1):
if isinstance(line, str): line = '[{}] {}', uid, line
else: line = ['[{}] {}'.format(uid, line[0])] + list(line[1:])
if log_func_last and n == len(lines): log_func_last(*line)
else: log_func(*line)
def retries_within_timeout( tries, timeout,
backoff_func=lambda e,n: ((e**n-1)/e), slack=1e-2 ):
'Return list of delays to make exactly n tires within timeout, with backoff_func.'
a, b = 0, timeout
while True:
m = (a + b) / 2
delays = list(backoff_func(m, n) for n in range(tries))
error = sum(delays) - timeout
if abs(error) < slack: return delays
elif error > 0: b = m
else: a = m
def parse_pos_spec(pos):
if not pos: return
try: mins, secs = pos.rsplit(':', 1)
except ValueError: hrs, mins, secs = 0, 0, pos
try: hrs, mins = mins.rsplit(':', 1)
except ValueError: hrs = 0
return sum( a*b for a, b in
zip([3600, 60, 1], map(float, [hrs, mins, secs])) )
except ValueError as err:
raise argparse.ArgumentTypeError(
f'Failed to parse {pos!r} as [[hours:]minutes:]seconds value: {err}' )
async def maybe_coro(res):
if asyncio.iscoroutine(res): res = await res
return res
class TVFError(Exception): pass
class TVFWSClosed(TVFError): pass
class TVFReqError(TVFError): pass
async def vod_fetch(loop, conf, vod_queue, list_formats_only=False):
exit_code, log = 1, get_logger('tvf.fetcher')
task = asyncio.Task.current_task(loop)
def sig_handler(sig, code):'Exiting on {} signal with code: {}', sig, code)
nonlocal exit_code
exit_code = code
if not conf.compat_windows:
for sig, code in ('INT', 1), ('TERM', 0):
loop.add_signal_handler(getattr(signal, f'SIG{sig}'), ft.partial(sig_handler, sig, code))
async with TVF(loop, conf, log) as fetcher:
fetch = fetcher.get if not list_formats_only else fetcher.ytdl_probe_formats
for n, (url, prefix) in enumerate(vod_queue, 1):
info_suffix = None if len(vod_queue) == 1 else f' [{n} / {len(vod_queue)}]'
try: await fetch(url, prefix, info_suffix=info_suffix)
except asyncio.CancelledError as err: break
except TVFError as err:
if err.args: log.exception('BUG: {}', err)
log.error('Failed to fetch VoD #{} {!r} (url={!r}), exiting', n, prefix, url)
else: exit_code = 0
return exit_code
class TVFFileCache:
update_lock = True
def __init__(self, prefix, ext, text=True):
self.b, self.path = '' if text else 'b', f'{prefix}.{ext}'
def __enter__(self):
assert self.update_lock
self.update_lock = False
return self
def __exit__(self, *err):
self.update_lock = True
def cached(self):
if not os.path.exists(self.path): return None
with open(self.path, 'r' + self.b) as src: return
def update(self, data):
assert not self.update_lock
with open(self.path, 'w' + self.b) as dst: dst.write(data)
return data
class TVFAria2Proc:
def __init__(self, loop, conf, http, cmd_conf, key, port, path_func):
self.loop, self.conf, self.proc, self.http = loop, conf, None, http = self.ws_ctx = self.ws_poll_task = self.ws_handlers = self.chunks = None
self.ws_closed = asyncio.Event()
self.cmd_conf, self.ws_key, self.ws_url, self.path_func = (
cmd_conf, f'token:{key}', f'ws://localhost:{port}/jsonrpc', path_func )
self.log = get_logger('tvf.aria2')
async def __aenter__(self):
cmd = [self.conf.aria2_cmd, '--conf-path', self.cmd_conf]
self.log.debug('Starting aria2c daemon: {}', ' '.join(cmd))
self.proc = await asyncio.create_subprocess_exec(*cmd)
self.ws_ctx = AsyncExitStack()
return self
async def __aexit__(self, *err):
proc_term_delay = 0
if self.ws_ctx:
if not self.ws_closed.is_set():
await self.req('shutdown', sync=False)
proc_term_delay = self.conf.aria2_term_wait[0]
await self.ws_ctx.close()
if self.ws_handlers:
for func in self.ws_handlers.values(): await maybe_coro(func(StopIteration))
if self.ws_poll_task: await self.ws_poll_task = self.ws_ctx = self.ws_poll_task = self.ws_handlers = None
if self.proc:
if proc_term_delay > 0:
await asyncio.wait_for(self.proc.wait(), proc_term_delay)
self.log.debug( 'Sending SIGTERM to aria2c pid'
' (timeout={:.2f}s): {}', self.conf.aria2_term_wait[1], )
try: await asyncio.wait_for(self.proc.wait(), timeout=self.conf.aria2_term_wait[1])
except asyncio.TimeoutError:
self.log.debug('Sending SIGKILL to aria2c pid: {}',
except OSError: pass
exit_code = await self.proc.wait()
if exit_code: self.log.warning('aria2c has exited with error code: {}', exit_code)
self.proc = None
async def connect(self):
ts, delays = self.loop.time(), retries_within_timeout(*self.conf.aria2_startup_checks)
ts_next, ts_max, delay_iter = ts, ts + sum(delays), iter(delays)
while True:
ts = self.loop.time()
if ts >= ts_max: break
timeout = ts_max - ts
self.log.debug('Connecting to aria2c ws url (timeout={:.2f}s): {}', timeout, self.ws_url)
try: = await self.ws_ctx.enter(self.http.ws_connect(
self.ws_url, heartbeat=self.conf.aria2_ws_heartbeat, timeout=timeout ))
except aiohttp.ClientError as err: self.log.debug('aria2c conn attempt error: {}', err)
self.log.debug('Connected to aria2c json-rpc ws')
ts = self.loop.time()
if ts_next > ts:
await asyncio.sleep(ts_next - ts)
ts = self.loop.time()
while ts_next < ts:
try: ts_next += next(delay_iter)
except StopIteration: break
if not
raise TVFError( 'aria2c connection failed'
' (max_tries={}, timeout={})'.format(*self.conf.aria2_startup_checks) )
jrpc_uid_ns = secrets.token_urlsafe(3)
self.ws_jrpc_uid_iter = (f'{jrpc_uid_ns}.{n}' for n in range(1, 2**30))
self.ws_handlers, self.ws_poll_task = dict(), self.loop.create_task(self._ws_poller())
def ws_close_wrap(self):
'Raises TVFWSClosed in current task on ws_closed event.'
async def _ev_wait():
nonlocal triggered
await self.ws_closed.wait()
triggered = True
triggered, task = False, asyncio.Task.current_task(self.loop)
ev_wait_task = self.loop.create_task(_ev_wait())
try: yield
except asyncio.CancelledError:
if not triggered: raise
raise TVFWSClosed() from None
finally: ev_wait_task.cancel()
async def _ws_poller(self):
async for msg in
if msg.type == aiohttp.WSMsgType.TEXT:
if self.conf.aria2_ws_debug: self.log.debug('rpc-msg: {}',
msg_data, hs_discard = json.loads(, set()
if self.conf.aria2_ws_debug_signals and msg_data.get('method'):
self.log.debug('rpc-signal: {}',
for k, func in self.ws_handlers.items():
oneshot = await maybe_coro(func(msg_data))
if oneshot: hs_discard.add(k)
for k in hs_discard: del self.ws_handlers[k]
elif msg.type == aiohttp.WSMsgType.closed: break
elif msg.type == aiohttp.WSMsgType.error:
self.log.error('ws protocol error, aborting: {}', msg)
else: self.log.warning('Unhandled ws msg type {}, ignoring: {}', msg.type, msg)
except Exception as err:
self.log.exception('Unhandled ws handler failure, aborting: {}', err)
await self._ws_close()
async def _ws_close(self):
def _add_handler(self, func): self.ws_handlers[id(func)] = func
async def _expect_handler(self, uid_or_func, fut_or_cb, oneshot, msg):
if msg is StopIteration or self.ws_closed.is_set():
if not callable(fut_or_cb): fut_or_cb.cancel()
if callable(uid_or_func) and not uid_or_func(msg): return
uid = msg.get('id') or msg.get('method')
if not uid: return # seem to be non-critical/random, bug in aria2?
# # Some request got ignored and there's not idea to tell which one.
# # No way to handle this error sanely, likely a bug in json or ws protocol.
# # How it looks: dict( id=None, jsonrpc='2.0',
# # error=dict(code=-32600, message='Invalid Request.') )
# self.log.error( 'Fatal protocol error from aria2'
# ' - probably a bug in this scipt, exiting: {!r}', msg )
# return await self._ws_close()
if uid.startswith('aria2.'): uid = uid[6:]
if uid != uid_or_func: return
if callable(fut_or_cb): fut_or_cb(msg)
else: fut_or_cb.set_result(msg)
return oneshot
def expect(self, uid_or_func, fut_or_cb=None, oneshot=False):
if not fut_or_cb: fut_or_cb = asyncio.Future()
self._expect_handler, uid_or_func, fut_or_cb, oneshot ))
return fut_or_cb
async def req(self, method, *params, sync=True):
if method != 'system.multicall':
method, params = f'aria2.{method}', [self.ws_key] + list(params)
res = req_uid = next(self.ws_jrpc_uid_iter)
data_req = dict(
jsonrpc='2.0', id=req_uid,
method=method, params=params )
if self.conf.aria2_ws_debug:
self.log.debug( 'rpc-req{} [{}]: {}',
'-sync' if sync else '', req_uid, json.dumps(data_req) )
if sync:
with self.ws_close_wrap(): res = await self.expect(req_uid)
if self.conf.aria2_ws_debug:
self.log.debug('rpc-res-sync [{}]: {}', req_uid, json.dumps(res))
if 'result' not in res:
raise TVFReqError(f'Request failed (method={method}, params={params}): {res}')
res = res['result']
return res
async def req_ok(self, method, *params):
res = await self.req(method, *params)
if res != 'OK': raise TVFError(f'aria2c command failed: {method}{params}')
def _queue_params(self, gid, url, pos=None):
return (
[[url], dict(gid=gid, out=str(self.path_func(gid)))]
+ ([] if pos is None else [pos]) )
async def queue(self, gid, url, pos=None):
res = await self.req('addUri', *self._queue_params(gid, url, pos))
if res != gid:
self.log.error('addUri error (gid={}): {}', gid, res)
raise TVFError(f'Failed to queue chunk URL to aria2c')
async def queue_batch(self, *gid_urls, pos=None):
if pos is not None: pos = iter(range(pos, 2**30))
res = await self.req('system.multicall', list(
dict(methodName='aria2.addUri', params=(
[self.ws_key] + self._queue_params(gid, url, pos and next(pos)) ))
for gid, url in gid_urls ))
res_chk = list([gid] for gid, url in gid_urls)
if res != res_chk:
log_lines(self.log.error, [
'Result gid match check failed for submitted urls.',
(' expected: {}', ', '.join(str(r[0]) for r in res_chk)),
(' returned: {}', ', '.join(( str(r[0])
if isinstance(r, list) else repr(r) ) for r in res)) ])
raise TVFError(f'Failed to queue {len(gid_urls)} chunk URLs to aria2c')
async def queue_chunks(self, chunks):
self.chunks = chunks
count, gid_last = 0, None
with self.ws_close_wrap():
for gid_url_batch in it_adjacent_nofill(chunks, self.conf.aria2_queue_batch):
await self.queue_batch(*gid_url_batch)
count, gid_last = count + len(gid_url_batch), gid_url_batch[-1][0]
return count, gid_last
class TVFChunkIter:
def __init__(self, conf, url_base, pls_text, gids_done=None):
self.conf = conf
self.gid_urls = self.parse_pls(url_base, pls_text)
self.gids_done = set(gids_done or list())
def __iter__(self):
for gid, url in self.gid_urls.items():
if gid in self.gids_done: continue
yield gid, url
def scatter_check_coro(self, *scatter_opts):
(a, b), res = scatter_opts, True
while True:
td = yield res
if a > 0: a -= td
if a <= 0: res = False
b -= td
if b <= 0: (a, b), res = scatter_opts, True
def parse_pls(self, url_base, pls_text):
slice_start, slice_len = self.conf.slice_start, self.conf.slice_len
scatter_check = self.scatter_check_coro(
self.conf.slice_scatter_len, self.conf.slice_scatter_interval )\
if self.conf.slice_scatter_len is not None else None
if scatter_check: next(scatter_check)
# aria2c requires 16-char gid, format used here fits number in
# first 6 chars because it looks nice in (tuncated) aria2c output
gid_iter = iter(map('{:06d}0000000000'.format, range(1, 999999)))
chunks = collections.OrderedDict()
for line in pls_text.splitlines():
m ='^#EXTINF:([\d.]+),', line)
if m:
td = float(
slice_start -= td
if slice_start > 0: continue
if not line or line.startswith('#'): continue
if slice_len and slice_len + slice_start < 0: break
if scatter_check and not scatter_check.send(td): continue
chunks[next(gid_iter)] = f'{url_base}/{line}'
return chunks
def mark_needed(self, gid, check=True):
if check and gid not in self.gids_done:
raise TVFError(f'BUG: gid was not marked as done: {gid}')
def mark_done(self, gid):
def finished(self):
return len(self.gid_urls) == len(self.gids_done)
class TVF:
def __init__(self, loop, conf, log):
self.loop, self.conf = loop, conf
self.log = log or get_logget('tvf.fetcher')
async def __aenter__(self): return self
async def __aexit__(self, *err): pass
async def ytdl_run(self, ytdl_op, *args, check=True, out=False, **popen_kws):
cmd = ['youtube-dl']
if self.conf.verbose: cmd.append('--verbose')
cmd = cmd + [ytdl_op] + (self.conf.ytdl_opts or list()) + list(args)
self.log.debug('Running "youtube-dl {}" command: {}', ytdl_op, ' '.join(cmd))
if out: popen_kws.setdefault('stdout', asyncio.subprocess.PIPE)
proc = await asyncio.create_subprocess_exec(*cmd, **popen_kws)
if out: proc_stdout = (await
exit_code = await proc.wait()
if check and exit_code != 0:
raise TVFError(f'"youtube-dl {ytdl_op}" command exited with error (code={exit_code})')
return exit_code if not out else proc_stdout.decode()
async def ytdl_probe_formats(self, url, file_prefix, info_suffix=None): '--- Listing formats available'
' for VoD {} (url={}){}', file_prefix, url, info_suffix or '' )
return await self.ytdl_run('--list-formats', url, check=False)
async def get(self, ctx: AsyncExitStack, url, file_prefix, info_suffix=None):
file_prefix = pathlib.Path(file_prefix)
file_dst_path = file_prefix.with_suffix('.mp4')
file_dst_done = file_dst_path
if self.conf.use_temp_dirs:
name =
file_prefix_dir = file_prefix.with_suffix('.tmp')
file_prefix = file_prefix_dir / name
vod_cache = ft.partial(TVFFileCache, file_prefix)
if self.conf.ytdl_output_format:
ytdl_fn_vc = await ctx.enter(vod_cache('ytdl_filename'))
ytdl_filename = ytdl_fn_vc.cached
if not ytdl_filename:
ytdl_output_opts = list() \
if self.conf.ytdl_output_format is True \
else ['--output', self.conf.ytdl_output_format]
ytdl_filename = await self.ytdl_run('--get-filename', *ytdl_output_opts, url, out=True)
'Using youtube-dl output filename (template={}): {}',
'[default]' if self.conf.ytdl_output_format
is True else self.conf.ytdl_output_format, ytdl_filename )
file_dst_done = file_dst_path.parent / ytdl_filename
elif self.conf.file_part_suffix:
file_dst_done, file_dst_path = file_dst_path, file_dst_path.with_suffix('.part.mp4')
# Assuming file_dst is fully downloaded if there's no playlist tempfile around
# It's not a problem even with --keep-tempfiles, as file_dst_mark will be kept there too
if file_dst_done.exists() and (
file_dst_path != file_dst_done or not file_prefix.with_suffix('.m3u8').exists() ): '--- Skipping download'
' for existing file: {} (rename/remove it to force)', file_dst_done )
if self.conf.use_temp_dirs: file_prefix_dir.mkdir(exist_ok=True)
if self.conf.ytdl_output_format and not ytdl_fn_vc.cached: ytdl_fn_vc.update(ytdl_filename)
http = await ctx.enter(aiohttp.ClientSession(**self.conf.aiohttp_opts)) '--- Downloading VoD {} ({}url={}){}',
file_dst_path, f'final-name={str(file_dst_done)!r}, '
if file_dst_done != file_dst_path else '', url, info_suffix or '' )
with vod_cache('m3u8.url') as vc:
url_pls = vc.cached or vc.update(await self.ytdl_run('--get-url', url, out=True))
assert ' ' not in url_pls, url_pls # can return URLs of multiple flvs for *really* old VoDs
url_base = url_pls.rsplit('/', 1)[0]
with vod_cache('') as vc:
ua = vc.cached or vc.update(await self.ytdl_run('--dump-user-agent', out=True))
with vod_cache('m3u8') as vc:
pls = vc.cached
if not pls:
self.log.debug('Fetching playlist from URL: {}', url_pls)
async with http.get(url_pls, headers={'User-Agent': ua}) as res:
pls = vc.update(await res.text())
# Config is always updated between aria2c runs, to account for any cli opts changes
with vod_cache('aria2c_conf') as vc:
key = secrets.token_urlsafe(18)
with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
s.bind(('localhost', 0))
addr, port = s.getsockname()
key=key, port=port, ua=ua,
log_level='notice' if self.conf.verbose else 'warn' )))
aria2c_conf_path = vc.path
# file_dst_mark stores (pos, seq) tuples, appending each update to the end
# It is assumed here that such tiny writes will be atomic
file_dst_pos = file_dst_seq = 0 # position in dst file, sequential gid int
file_dst_mark_fmt = '>QI'
file_dst_mark = await ctx.enter(file_prefix.with_suffix('.pos').open('a+b'))
if file_dst_mark.tell() != 0:, os.SEEK_END)
file_dst_pos, file_dst_seq = struct.unpack(file_dst_mark_fmt,
if not file_dst_path.exists() and file_dst_pos > 0:
raise TVFError( f'Missing partial-dst-file'
f' (expecting bytes={file_dst_pos:,} gid-seq={file_dst_seq}): {file_dst_path}' )
file_dst ='a+b')
if file_dst.tell() < file_dst_pos:
raise TVFError( f'Missing chunk of partial-dst-file'
f' (expecting size>={file_dst_pos:,} actual-size={file_dst.tell():,}): {file_dst_path}' )
# seq - gid converted to a sequential int, to sort appended chunks by
seq_heap, seq_chunks = list(), list()
seq_chunk = collections.namedtuple('seq_chunk', 'seq path')
gid_to_seq = lambda gid: int(gid[:6])
seq_to_gid = lambda seq: f'{seq:06d}0000000000'
gid_pretty = lambda gid: gid[:6]
chunks = TVFChunkIter(self.conf, url_base, pls)
for gid, url in chunks:
if gid_to_seq(gid) > file_dst_seq: break
async with TVFAria2Proc( self.loop,
self.conf, http, aria2c_conf_path, key, port,
lambda gid: file_prefix.with_suffix(f'.{gid}.mp4.chunk') ) as aria2c:
await aria2c.connect()
# All chunks from iter(chunks) get queued for download here
info = await aria2c.req('getVersion')
self.log.debug( 'Starting downloads'
' (rpc-port={}, aria2-version={})...', port, info['version'] )
count, gid_last = await aria2c.queue_chunks(chunks)
if gid_last: '\n\n ------ Started {} downloads,'
' last gid: {} ------ \n', count, gid_pretty(gid_last) )
else:'No extra downloads were necessary')
### Main download-control loop ahead
# Download control strategy:
# - listen for aria2.onDownloadComplete event for each gid
# - append sequential-gid chunk to partial file, update mark-file, remove chunk
# - non-sequential gids - just remember and check/append next time
# - if >file_append_max non-sequential-gid chunks downloaded - abort
# - for all error/paused/stopped events: prepend to download queue
# - stop on chunks.finished (all chunks marked as "done")
ev_t = enum.Enum('ev_type', 'complete stop pause error')
evq, ev_tuple = asyncio.Queue(), collections.namedtuple('ev', 't data')
for t in ev_t:
aria2c.expect( f'onDownload{}',
lambda e,t=t: evq.put_nowait(ev_tuple(t, e)) )
while not chunks.finished:
with aria2c.ws_close_wrap():
ev = await evq.get()
for e in['params']:
if ev.t == ev_t.complete: # completed chunk goes to seq_heap
files = list(map(op.itemgetter('path'), await aria2c.req('getFiles', e['gid'])))
if len(files) != 1:
raise TVFError( 'Completed aria2 download'
f' should produce single chunk-file, instead has: {files}' )
heapq.heappush( seq_heap,
seq_chunk(gid_to_seq(e['gid']), pathlib.Path(files[0])) )
await aria2c.req('removeDownloadResult', e['gid'], sync=False)
else: # stop/pause/error - retry immediately
# Limit on forced retries (on top of what aria2 does) for same gid can be added here
if self.log.isEnabledFor(logging.DEBUG):
status = await aria2c.req( 'tellStatus', e['gid'],
['status', 'errorCode', 'errorMessage', 'completedLength', 'totalLength'] )
self.log.debug( 'Chunk {} download'
' stopped or failed, retrying: {}', e['gid'], status )
await aria2c.req_ok('removeDownloadResult', e['gid'])
await aria2c.queue(e['gid'], chunks.gid_urls[e['gid']], 0)
if chunks.finished or len(seq_heap) + len(seq_chunks) >= self.conf.file_append_batch:
while seq_heap and seq_heap[0].seq == file_dst_seq + 1:
sc = heapq.heappop(seq_heap)
file_dst_seq = sc.seq
if len(seq_chunks) >= self.conf.file_append_batch:
for p in seq_chunks:
with'rb') as chunk: shutil.copyfileobj(chunk, file_dst)
file_dst_pos = file_dst.tell()
file_dst_mark.write(struct.pack(file_dst_mark_fmt, file_dst_pos, file_dst_seq))
for p in seq_chunks: os.unlink(p)
# self.log.debug('\nExtended partial dst file by {} chunk(s)', len(seq_chunks))
if len(seq_heap) > self.conf.file_append_max:
err_msg = ( 'Chunk-append sequence is broken'
f' (pending={len(seq_heap)} max={self.conf.file_append_max})' )
log_lines(self.log.error, [ ('{}:', err_msg),
(' expecting gid: {}', seq_to_gid(file_dst_seq)),
(' first gid among pending: {}', seq_to_gid(seq_heap[0].seq)) ])
raise TVFError(err_msg)
### done!
if file_dst_done != file_dst_path:
shutil.move(file_dst_path, file_dst_done, copy_function=shutil.copy)
file_dst_path = file_dst_done
if not self.conf.keep_tempfiles:
self.log.debug('Cleanup for prefix (dir={}): {}', self.conf.use_temp_dirs, file_prefix)
if self.conf.use_temp_dirs: shutil.rmtree(file_prefix_dir)
for p in file_prefix.parent.glob(f'{}.*'):
if p != file_dst_path: os.unlink(p)'--- Downloaded VoD file: {}{}', file_dst_path, info_suffix or '')
def main(args=None, conf=None):
if not conf: conf = TVFConfig()
import argparse
parser = argparse.ArgumentParser(
usage='%(prog)s [options] { url-1 [url-2 ...] | url file_prefix [url-2 file_prefix-2 ...] }',
description='Download VoD (or a specified slice of it) from')
help='URL for a VoD to fetch.'
' Can be either followed by file_prefix argument'
' (and any number of such url/prefix pairs after that),'
' or any number of urls (detected by "http(s):" prefix)'
' to use simple sequential prefixes like "vod-001".'
' See also -o/--ytdl-output-format option wrt destination filenames for urls.')
parser.add_argument('file_prefix', nargs='?',
help='File prefix for destination file and temp dir.'
' Used even with -o/--ytdl-output-format while download is in progress.')
parser.add_argument('more_url_and_prefix_pairs', nargs='*',
help='Any number of extra urls or url/file_prefix pairs can be specified.')
group = parser.add_argument_group('Download slice options')
group.add_argument('-s', '--start-pos',
type=parse_pos_spec, metavar='[[hours:]minutes:]seconds',
help='Only download video chunks after specified start position.'
' If multiple url/prefix args are specified, this option will be applied to all of them.')
group.add_argument('-l', '--length',
type=parse_pos_spec, metavar='[[hours:]minutes:]seconds',
help='Only download specified length of the video (from specified start or beginning).'
' If multiple url/prefix args are specified, this option will be applied to all of them.')
group.add_argument('-x', '--scatter',
help='Out of whole video (or a chunk specified by --start and --length),'
' download only every N seconds (or mins/hours) out of M.'
' E.g. "1:00/10:00" spec here will download 1 first min of video out of every 10.'
' Idea here is to produce something like preview of the video to allow'
' to easily narrow down which part of it is interesting and worth downloading in full.')
group = parser.add_argument_group('youtube-dl/aria2c options')
group.add_argument('-F', '--ytdl-list-formats',
action='store_true', help='Do not download anything,'
' just list formats available for each specified URL and exit.')
group.add_argument('-y', '--ytdl-opts',
action='append', metavar='opts',
help='Extra opts for youtube-dl --get-url/--get-filename commands.'
' Will be split on spaces, unless option is used multiple times.')
group.add_argument('-o', '--ytdl-output-format',
metavar='ytdl-output-template', const=True, nargs='?',
help='Use youtube-dl output filename template.'
' Passed to youtube-dl --get-filename as the'
' --output argument to get the final file name for the video.'
' If option is used, but no format is specified, youtube-dl\'s default format will be used.')
group.add_argument('-a', '--aria2c-opts',
action='append', metavar='opts',
help='Extra options to store in aria2c configuration file.'
' These are same as long-form command-line options, but without double-dash prefix.'
' Will be split on spaces, unless option is used multiple times.'
' Example: --aria2c-opts "lowest-speed-limit=20K max-overall-download-limit=1M"')
group = parser.add_argument_group('Misc/debug options')
group.add_argument('-p', '--use-part-suffix',
action='store_true', help='Use .part suffix for'
' partial destination file until it is fully downloaded.')
group.add_argument('-n', '--no-temp-dirs',
action='store_true', help='Do not create temporary'
' directories for download parts, keep everything in the destination one.')
group.add_argument('-k', '--keep-tempfiles',
action='store_true', help='Do not remove all the'
' temporary files after successfully assembling resulting mp4.'
' Chunks in particular might be useful to download different but overlapping video slices.')
group.add_argument('--debug', action='store_true', help='Verbose operation mode.')
opts = parser.parse_args(sys.argv[1:] if args is None else args)
datefmt='%Y-%m-%d %H:%M:%S',
format='%(asctime)s :: {}%(levelname)s :: %(message)s'\
.format('%(name)s ' if opts.debug else ''),
level=logging.DEBUG if opts.debug else logging.INFO )
log = get_logger('tvf.main')
conf.slice_start, conf.slice_len = opts.start_pos or 0, opts.length
if opts.scatter:
scatter = opts.scatter.split('/', 1)
if len(scatter) != 2:
parser.error( f'Invalid value for -x/--scatter option'
' - {opts.scatter!r} - should be in "len/interval" format.' )
conf.slice_scatter_len, conf.slice_scatter_interval = map(parse_pos_spec, scatter)
conf.ytdl_opts = opts.ytdl_opts or list()
if len(conf.ytdl_opts) == 1: conf.ytdl_opts = conf.ytdl_opts[0].strip().split()
conf.aria2c_opts = opts.aria2c_opts or list()
if len(conf.aria2c_opts) == 1: conf.aria2c_opts = conf.aria2c_opts[0].strip().split()
conf.verbose = opts.debug
conf.keep_tempfiles = opts.keep_tempfiles
conf.use_temp_dirs = not opts.no_temp_dirs
conf.file_part_suffix = opts.use_part_suffix
conf.ytdl_output_format = opts.ytdl_output_format
vod_queue, url_re = list(), re.compile(r'^https?:')
args = list(filter( None,
[opts.url, opts.file_prefix] + (opts.more_url_and_prefix_pairs or list()) ))
if all(map(, args)):
vod_queue.extend((url, f'vod-{n:03d}') for n, url in enumerate(args, 1))
if len(args) % 2:
parser.error( 'Odd number of url/prefix args specified'
f' ({len(args)}), while these should always come in pairs.' )
for url, prefix in it_adjacent(args, 2):
parser.error( 'Both url/file_prefix args seem'
f' to be an URL, only first one should be: {url} {prefix}' )
prefix, url = url, prefix
log.warn( 'Looks like url/prefix args got'
' mixed-up, correcting that to prefix={} url={}', prefix, url )
vod_queue.append((url, prefix))
for url, prefix in vod_queue:
if'^https?://[^/]+/([^/]+/v|videos)/', url): continue
parser.error( 'Provided URL appears to be for unsupported'
f' VoD format (only /user/v/ or /videos/ VoDs are supported): {url}' )
if conf.compat_windows:
# Use the Proactor event loop on Windows for subprocess support
loop = asyncio.ProactorEventLoop()
log.debug('Starting vod_fetch loop...')
with contextlib.closing(asyncio.get_event_loop()) as loop:
exit_code = loop.run_until_complete(vod_fetch(
loop, conf, vod_queue, list_formats_only=opts.ytdl_list_formats ))
return exit_code
if __name__ == '__main__': sys.exit(main())