In [None]:
import requests
import json
import re
from tqdm import tqdm
from urllib.parse import unquote # decode percent-encoded (e.g. '%26' -> '&')
import subprocess
import os

In [None]:
# Create the Qt application object and the JavaScript engine once per kernel.
# The globals() guard makes this cell safe to re-run: when `qtapp` already
# exists nothing here is executed again (so `jsengine` is not recreated either).
# `jsengine` is used further down to evaluate a function cut out of the
# player script.
if 'qtapp' not in globals():
    from PyQt6.QtCore import QCoreApplication
    import PyQt6.QtQml as qml
    import sys
    # The application object is constructed before the QJSEngine.
    qtapp = QCoreApplication(sys.argv)
    jsengine = qml.QJSEngine()

In [None]:
# --- User configuration ---
# URL of the watch page to download; the commented lines are alternative inputs.
content_url = 'https://www.youtube.com/watch?v=Opp9nqiN5m0'
# content_url = 'https://www.youtube.com/watch?v=8xg3vE8Ie_E'
# content_url = 'https://www.youtube.com/watch?v=HCtu_YaFcJc'
# Directory that receives the intermediate streams and the merged file.
save_dir = r'./'
save_filename = None # Output filename without extension; when left as None, the content title is used instead.

In [None]:
# Browser-like header values sent with every request made through `session`.
http_ua = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36'
http_referer = 'https://www.youtube.com/'

# A single shared session, so the headers below apply to all later requests.
session = requests.Session()
session.headers['User-Agent'] = http_ua
session.headers['Referer'] = http_referer

def legalized_filename(filename):
    """Return ``filename`` with characters that are unsafe in file names replaced.

    Balanced pairs of ASCII double quotes are turned into curly quotes first.
    Line breaks (LF and CR) are removed. The remaining reserved characters
    (slash, backslash, ``?``, ``|``, ``:``, ``"``, ``*``, ``<``, ``>``) are
    replaced by full-width look-alikes so the name stays readable.

    Args:
        filename: Arbitrary text, e.g. a video title.

    Returns:
        The sanitized text. No extension is added and the length is not limited.
    """
    # Pair up straight quotes so "x" becomes “x”; an unpaired leftover quote
    # is handled by the translation table below.
    filename = re.sub(r'"([^"]*)"', lambda m: f"“{m.group(1)}”", filename)

    # Single pass over the string; a value of None deletes the character.
    replacements = {
        '\n': None, '\r': None,
        '/': '／', '\\': '＼', '?': '？',
        '|': '｜', ':': '：',
        '"': '“', '*': '＊',
        # '<' and '>' are reserved on Windows as well.
        '<': '＜', '>': '＞',
    }
    return filename.translate(str.maketrans(replacements))

In [None]:
# Fetch the watch page; the stream metadata is embedded in it as JSON.
r = session.get(content_url)
r.raise_for_status()
content_html = r.text

# The metadata is an object literal assigned to `ytInitialPlayerResponse`.
m = re.search(r'var ytInitialPlayerResponse\s*=\s*{', content_html)
if m is None:
    raise RuntimeError('ytInitialPlayerResponse not found in the watch page')
info_start_pos = m.span()[1] - 1  # index of the opening brace
# raw_decode parses exactly one JSON value starting at the brace and reports
# where it stopped, so a "};" occurring inside a string cannot truncate it.
info, info_end_pos = json.JSONDecoder().raw_decode(content_html, info_start_pos)
info_last_pos = info_end_pos - 1  # index of the closing brace, as before

if save_filename is None:
    # NOTE(security): the title used to be passed through eval(). It is
    # remote-controlled text, so that allowed code injection and also mangled
    # titles containing backslashes or triple quotes. json has already decoded
    # every escape sequence, so the value is used as-is.
    content_title = info['videoDetails']['title']
    save_filename = legalized_filename(content_title)
    print(save_filename)

In [None]:
# Split the adaptive formats by media kind, judged from each MIME type string.
streams = info['streamingData']['adaptiveFormats']
video_streams = list(filter(lambda st: 'video' in st['mimeType'], streams))
audio_streams = list(filter(lambda st: 'audio' in st['mimeType'], streams))
# Sanity check: together the two lists must account for every format (by count).
assert len(streams) == len(video_streams) + len(audio_streams)

def video_cmp_key(v):
    """Sort key for video streams: pixel count, then fps, then a VP9 tie-break.

    Args:
        v: Stream dict with 'width', 'height', 'fps' and 'mimeType'.

    Returns:
        A tuple ``(width*height, fps, is_vp9)``; larger compares as better.
    """
    pixel_count = v['width'] * v['height']
    is_vp9 = 'vp9' in v['mimeType']
    return (pixel_count, v['fps'], is_vp9)

def audio_cmp_key(a):
    """Sort key for audio streams; larger compares as better.

    Ordering: sample rate, then quality label, then WebM container preference,
    then bitrate.

    Args:
        a: Stream dict with 'audioSampleRate', 'audioQuality', 'mimeType'
            and 'bitrate'.

    Returns:
        A tuple ``(sample_rate_hz, quality_rank, is_webm, bitrate)``.

    Raises:
        ValueError: If 'audioSampleRate' is not an integer value.
    """
    # Unknown labels rank lowest instead of raising KeyError.
    quality_map = {
        'AUDIO_QUALITY_ULTRALOW': 0,
        'AUDIO_QUALITY_LOW': 1,
        'AUDIO_QUALITY_MEDIUM': 4,
        'AUDIO_QUALITY_HIGH': 8,
    }
    # The sample rate may arrive as a decimal string (assumption about the
    # payload); int() makes the comparison numeric either way, so "8000" no
    # longer outranks "44100" lexicographically.
    sample_rate = int(a['audioSampleRate'])
    quality_rank = quality_map.get(a['audioQuality'], 0)
    return sample_rate, quality_rank, 'webm' in a['mimeType'], a['bitrate']

def video_desc(v):
    """Return a one-line description of a video stream: label, size, fps, MIME type."""
    label = v['qualityLabel']
    size = f"{v['width']}x{v['height']}"
    return f"{label} {size}@{v['fps']}fps  {v['mimeType']}"

def audio_desc(a):
    """Return a one-line description of an audio stream: sample rate, bitrate, MIME type."""
    # Bitrate is shown in thousands with one decimal place.
    kbps = a['bitrate'] / 1e3
    return f"{a['audioSampleRate']}Hz {kbps:.1f}Kbps  {a['mimeType']}"

# Pick the highest-ranked stream of each kind according to the key functions
# above (max() raises ValueError if a list is empty).
video = max(video_streams, key=video_cmp_key)
audio = max(audio_streams, key=audio_cmp_key)
# Debug aid: uncomment to list every candidate before the selection.
# for v in video_streams:
#     print('video:', video_desc(v))
# for a in audio_streams:
#     print('audio:', audio_desc(a))
# Show what was chosen so the selection can be checked by eye.
print('select video:', video_desc(video))
print('select audio:', audio_desc(audio))

In [None]:
# Locate the player script (a quoted path ending in /base.js) inside the watch
# page and download it; the decoders below are cut out of its source text.
m = re.search(r'"([^"]+/base\.js)"', content_html)
if m is None:
    raise RuntimeError('player script (base.js) path not found in the watch page')
r = session.get('https://www.youtube.com' + m.group(1))
# Fail here rather than later trying to parse an HTTP error page as JavaScript.
r.raise_for_status()
base_js = r.text

def find_end_brace(s, pos):
    """Scan forward from ``pos`` for the ``}`` that closes the current block.

    The scan starts at ``pos + 1`` and assumes exactly one brace is already
    open. Braces inside single- or double-quoted strings are ignored, and a
    backslash causes the character after it to be skipped.

    Args:
        s: Source text to scan.
        pos: Index just before the first character to examine.

    Returns:
        Index of the matching ``}``, or None when the text ends first.
    """
    depth = 1
    open_quote = None
    skip_next = False
    for idx in range(pos + 1, len(s)):
        ch = s[idx]
        if skip_next:
            # Character following a backslash: never interpreted.
            skip_next = False
        elif ch == '\\':
            skip_next = True
        elif ch == '"' or ch == "'":
            if open_quote is None:
                open_quote = ch
            elif open_quote == ch:
                open_quote = None
            # A different quote character inside a string is ordinary text.
        elif open_quote is None:
            if ch == '{':
                depth += 1
            elif ch == '}':
                depth -= 1
                if depth == 0:
                    return idx
    return None

def find_begin_brace(s, pos):
    """Scan backward from ``pos`` for the ``{`` that opens the current block.

    The scan starts at ``pos - 1`` and assumes exactly one brace level is
    pending. Braces inside single- or double-quoted strings are ignored. When
    a quote is met that would end the string (in scan direction), it is
    ignored if the character before it is a backslash.

    Args:
        s: Source text to scan.
        pos: Index just after the first character to examine.

    Returns:
        Index of the matching ``{``, or None when the start of the text is
        reached first.
    """
    pending_brace_cnt = 1
    quote = None
    # Never start beyond the end of the text.
    pos = min(pos, len(s))
    # Stop at index 0. The previous bound (`pos + 1 < len(s)`) was the
    # forward-scan condition: it let `pos` go negative, so indexing wrapped
    # around to the end of the string and yielded a bogus negative result or
    # an IndexError.
    while pos > 0:
        pos -= 1
        ch = s[pos]
        if ch in ['"', "'"]:
            if quote is None:
                quote = ch
            elif ch == quote:
                is_escaped = (pos > 0 and s[pos-1] == '\\')
                if not is_escaped:
                    quote = None
        elif quote is not None:
            continue
        if ch == '{':
            pending_brace_cnt -= 1
            if pending_brace_cnt == 0:
                return pos
        elif ch == '}':
            pending_brace_cnt += 1
    return None


def init_restore_param_sig():
    """Build a Python function that replays the signature transform in ``base_js``.

    The player script holds an object whose member functions each do one
    primitive edit on a character array (reverse, drop leading items, swap
    with the first item), plus a function that splits the signature into
    characters and calls those members in a fixed order. This routine reads
    both from the global ``base_js`` and records the call sequence.

    Returns:
        ``restore_param_sig(sig)``: takes the percent-encoded signature
        string and returns the transformed signature.

    Raises:
        AttributeError: If one of the expected patterns is not found in
            ``base_js`` (a regex search returned None).
    """
    # Anchor on the body of the swap helper, then widen to the object around it.
    m = re.search(r'{var c\s*=\s*a\[0\];a\[0\]\s*=\s*a\[b%a\.length\];a\[b%a\.length\]\s*=\s*c', base_js)
    cls_begin, cls_end = m.span()
    cls_begin = find_begin_brace(base_js, cls_begin)
    cls_end = find_end_brace(base_js, cls_end) + 1
    # Classify every `name:function(...){body}` member by what its body does.
    m = re.findall(r'(\w+):function[^{]+{([^}]+)}', base_js[cls_begin:cls_end])
    meta_ops = {}
    for fn, s in m:
        if 'reverse' in s:
            meta_ops[fn] = 'reverse'
        elif 'splice(0' in s:
            meta_ops[fn] = 'popfront'
        elif '.length]' in s:
            meta_ops[fn] = 'swap'
    # Walk left from the object's opening brace to pick up the name it is
    # assigned to: skip non-alphanumerics, then take the alphanumeric run.
    pos = cls_begin
    while not base_js[pos].isalnum():
        pos -= 1
    wrapper_var_end = pos + 1
    while base_js[pos].isalnum():
        pos -= 1
    wrapper_var = base_js[pos+1:wrapper_var_end]

    # The driver function: split into characters, then a ';'-separated list of
    # `wrapper.member(a, N)` calls. Record (operation, N) in call order.
    m = re.search(r'function\(a\){a\s*=\s*a\.split\(""\);([^}]+)}', base_js)
    ops = []
    for s in m.group(1).split(';'):
        for fn, op in meta_ops.items():
            if f'{wrapper_var}.{fn}' in s:
                arg = re.search(r'\(a,\s*([0-9]+)\)', s).group(1)
                ops.append((op, int(arg)))
                break

    def restore_param_sig(sig):
        """Apply the recorded operations to ``sig`` and return the result."""
        a = list(unquote(sig))
        for op, arg in ops:
            if op == 'reverse':
                a.reverse()
            elif op == 'swap' and a:
                # The script's helper swaps a[0] with a[b % a.length]; without
                # the modulo an argument >= len(a) raised IndexError here.
                idx = arg % len(a)
                a[0], a[idx] = a[idx], a[0]
            elif op == 'popfront':
                a = a[arg:]
        return ''.join(a)
    return restore_param_sig


def init_restore_param_n():
    """Cut the ``n``-parameter transform out of ``base_js`` and wrap it for Python.

    The function is located by the text pattern at the start of its body, its
    end is found with ``find_end_brace``, and its source is evaluated in
    ``jsengine`` as a parenthesised function expression.

    Returns:
        ``restore_param_n(n)``: calls the evaluated JavaScript function with
        ``n`` and returns the result converted to a string.
    """
    head = re.search(r'function\(a\){var b\s*=\s*a.split\(""\),\s*c', base_js)
    start, after_head = head.span()
    stop = find_end_brace(base_js, after_head) + 1
    # Wrapping in parentheses makes the engine return the function value itself.
    js_source = '(' + base_js[start:stop] + ')'
    restore_n_func = jsengine.evaluate(js_source)

    def restore_param_n(n: str):
        return restore_n_func.call([n]).toString()

    return restore_param_n

# Build both decoders once from the downloaded player script; download_stream
# uses them for the signature parameter and the 'n' parameter respectively.
yt_restore_param_sig = init_restore_param_sig()
yt_restore_param_n = init_restore_param_n()

In [None]:
def params_str_to_dict(p: str):
    """Parse a ``k1=v1&k2=v2`` string into a dict without decoding anything.

    Only the first ``=`` of each item separates key from value, so values may
    themselves contain ``=``. Empty items (empty input, leading/trailing or
    doubled ``&``) are skipped, and an item without ``=`` maps to an empty
    string; previously both cases raised ValueError. When a key repeats, the
    last occurrence wins.

    Args:
        p: The parameter string, without a leading ``?``.

    Returns:
        Dict mapping each key to its raw value.
    """
    params = {}
    for item in p.split('&'):
        if not item:
            continue
        key, _, value = item.partition('=')
        params[key] = value
    return params

def download_stream(stream, path):
    """Download one adaptive stream to ``path`` in ranged blocks.

    Steps:
      1. Build the media URL and its query parameters, either from the
         stream's 'url' or from its 'signatureCipher' (in which case the
         signature is transformed with ``yt_restore_param_sig``).
      2. Transform the 'n' parameter with ``yt_restore_param_n``.
      3. Send a small probe request; if the body starts with ``http`` it is
         taken as the URL to download from instead.
      4. Fetch the content in 4 MiB ranges and append each to the file.

    Args:
        stream: Stream dict; must contain 'contentLength' and either 'url'
            or 'signatureCipher'.
        path: Destination file path (overwritten).

    Raises:
        RuntimeError: If the probe request returns an empty body.
        requests.HTTPError: If a block request returns an error status.
    """
    sig_key = None  # name of the signature parameter, cipher case only
    if 'url' in stream:
        url = stream['url']
        url = unquote(url.replace(r'\u0026', '&'))
        # maxsplit=1: a '?' inside the query must not break the unpacking.
        url, params_str = url.split('?', 1)
        params = params_str_to_dict(unquote(params_str))
    else:
        assert('signatureCipher' in stream)
        sig_params = params_str_to_dict(stream['signatureCipher'].replace(r'\u0026', '&'))
        url = unquote(sig_params.pop('url'))
        url, params_str = url.split('?', 1)
        params = params_str_to_dict(unquote(params_str))
        sig_key = sig_params['sp']
        params[sig_key] = yt_restore_param_sig(sig_params['s'])

    # Transform 'n' only when the URL actually carries it.
    if 'n' in params:
        params['n'] = yt_restore_param_n(params['n'])

    # Probe request for the first 32 KiB.
    params['range'] = "0-32767"
    params['alr'] = 'yes'
    # params['ump'] = '1'
    r = session.post(url, params=params, data=b'x\x00')
    content = r.content
    if len(content) == 0:
        # Look the signature up under the key named by 'sp' rather than a
        # hard-coded 'sig', which raised KeyError for any other name.
        sig = params.get(sig_key, '') if sig_key is not None else ''
        if '=' in sig and not sig.endswith('='):
            print("param sig is wrong")
        raise RuntimeError(f'empty response to probe request (HTTP {r.status_code})')
    if content[:4] == b'http':
        # The body is a URL: download from there instead.
        url = content.decode()
        url, params_str = url.split('?', 1)
        params = params_str_to_dict(unquote(params_str))

    BLOCK_SIZE = 4 << 20  # 4 MiB
    total_size = int(stream['contentLength'])

    with open(path, 'wb') as f, tqdm(total=total_size, unit='B', unit_scale=True) as pbar:
        start = 0
        while start < total_size:
            end = min(start + BLOCK_SIZE, total_size)
            params['range'] = f"{start}-{end-1}"  # inclusive byte range
            # The context manager releases the streamed connection even when
            # an error interrupts the loop.
            with session.post(url, params=params, data=b'x\x00', stream=True) as r:
                # Without this check an error page would be written into the
                # media file and counted as progress.
                r.raise_for_status()
                for chunk in r.iter_content(chunk_size=4096):
                    f.write(chunk)
                    pbar.update(len(chunk))
            start = end

In [None]:
# Intermediate files; both are inputs to the ffmpeg merge that follows.
video_path = os.path.join(save_dir, save_filename + '-video.webm')
audio_path = os.path.join(save_dir, save_filename + '-audio.webm')
# The video download was commented out, which made the merge step fail on a
# fresh run because video_path did not exist.
download_stream(video, video_path)
download_stream(audio, audio_path)

In [None]:
# Combine the two downloads into one file; '-c copy' copies the streams
# without re-encoding and '-y' overwrites an existing output file.
merged_path = os.path.join(save_dir, save_filename + '.webm')
ffmpeg_cmd = ['ffmpeg', '-i', video_path, '-i', audio_path, '-c', 'copy', '-y', merged_path]
ret = subprocess.run(ffmpeg_cmd)

# Remove the intermediates only after ffmpeg reported success.
assert ret.returncode == 0
os.remove(video_path)
os.remove(audio_path)