In [None]:
! pip install datasets[audio] yt-dlp

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
import subprocess
from collections import defaultdict

# ============ Only edit this section ============
BASE_DIR = '/content/drive/MyDrive/yt2'     # single root directory for everything this script reads/writes
# ====================================

# Directory layout (all paths live under BASE_DIR)
VIDEO_DIR = os.path.join(BASE_DIR, 'video')
AUDIO_DIR = os.path.join(BASE_DIR, 'audio')
LOG_DIR   = os.path.join(BASE_DIR, 'logs')
TXT_FILE  = os.path.join(BASE_DIR, 'url.txt')

# Cookies (optional)
COOKIES_TXT = os.path.join(BASE_DIR, 'cookies.txt')  # picked up automatically when the file exists
COOKIES_FROM_BROWSER = None  # e.g. 'chrome'/'edge'/'firefox' (on Colab, cookies.txt is the usual choice)

# Make sure the directories exist (runs as a side effect when this cell/module is executed)
for d in (BASE_DIR, VIDEO_DIR, AUDIO_DIR, LOG_DIR):
    os.makedirs(d, exist_ok=True)

# Log files
ERRORS_FILE = os.path.join(LOG_DIR, 'errors.txt')
DOWNLOAD_LOG = os.path.join(LOG_DIR, 'ytdownload_log.txt')


# ================= 通用工具 =================

def build_cookies_flag() -> str:
    """Return the cookies fragment to splice into a yt-dlp command line.

    Precedence:
      1. ``COOKIES_TXT`` when it is set and the file exists on disk.
      2. ``COOKIES_FROM_BROWSER`` when it is set.
      3. Otherwise a single space.

    The returned text is always padded with a space on both sides so it can
    be inserted directly between two command-line tokens.
    """
    cookie_file_available = bool(COOKIES_TXT) and os.path.exists(COOKIES_TXT)
    if cookie_file_available:
        flag = f' --cookies "{COOKIES_TXT}" '
    elif COOKIES_FROM_BROWSER:
        flag = f' --cookies-from-browser {COOKIES_FROM_BROWSER} '
    else:
        flag = ' '
    return flag

def extract_youtube_id(url: str) -> str:
    """Extract the video ID from a YouTube-style URL.

    Recognised shapes, checked in this order:
      * ``.../shorts/<id>``  -> the path segment following ``shorts``
      * ``...?v=<id>``       -> the ``v`` query parameter, wherever it
        appears in the query string
      * anything else        -> the last non-empty path segment
        (covers ``youtu.be/<id>`` and a bare ID)

    Query strings, ``#fragments`` and trailing slashes never end up in the
    result, so the ID is safe to embed in a file name.

    Args:
        url: The link as read from the URL list; surrounding whitespace is
            ignored.

    Returns:
        The extracted ID, or ``''`` when the input is blank, cannot be
        parsed as a URL, or has no path segment to use.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import parse_qs, urlsplit

    url = url.strip()
    if not url:
        return ''

    try:
        parts = urlsplit(url)
    except ValueError:
        # urlsplit rejects some malformed inputs (e.g. a broken IPv6 host).
        return ''

    segments = [seg.strip() for seg in parts.path.split('/') if seg.strip()]

    # Shorts links: the ID is the segment right after "shorts".
    if 'shorts' in segments[:-1]:
        return segments[segments.index('shorts') + 1]

    # Watch links: take the "v" parameter regardless of its position.
    for candidate in parse_qs(parts.query).get('v', []):
        candidate = candidate.strip()
        if candidate:
            return candidate

    # Fallback: last path segment (youtu.be/<id>, bare IDs, other sites).
    return segments[-1] if segments else ''

def read_urls(txt_path: str) -> list:
    """Read the URL list file and return its entries, de-duplicated in order.

    Each line is stripped of surrounding whitespace; blank lines are dropped
    and only the first occurrence of a repeated URL is kept.

    The file is decoded as ``utf-8-sig`` so that a UTF-8 byte-order mark at
    the start of the file (as written by some editors) is discarded instead
    of becoming part of the first URL. Files without a BOM decode exactly as
    plain UTF-8.

    Args:
        txt_path: Path to the text file, one URL per line.

    Returns:
        A list of unique, non-empty URL strings in file order.

    Raises:
        FileNotFoundError: If ``txt_path`` does not exist.
    """
    if not os.path.exists(txt_path):
        raise FileNotFoundError(f'找不到链接文件：{txt_path}')
    with open(txt_path, 'r', encoding='utf-8-sig') as f:
        stripped = (line.strip() for line in f)
        # dict preserves insertion order, giving an order-preserving de-dup.
        return list(dict.fromkeys(entry for entry in stripped if entry))

def ensure_log_files():
    """Create ERRORS_FILE and DOWNLOAD_LOG as empty files if they are missing.

    Files that already exist are left untouched.
    """
    # Each log keeps the open mode it is created with.
    for log_path, open_mode in ((ERRORS_FILE, 'w'), (DOWNLOAD_LOG, 'a')):
        if os.path.exists(log_path):
            continue
        with open(log_path, open_mode, encoding='utf-8'):
            pass


# ================= 下载相关 =================

def download_batch(mode: str):
    """Download every URL listed in TXT_FILE with yt-dlp.

    For each URL the video ID is extracted and the target file is named
    ``youtube_<id>.mp4`` (in VIDEO_DIR) or ``youtube_<id>.m4a`` (in
    AUDIO_DIR). Targets that already exist are skipped. Every outcome is
    appended to DOWNLOAD_LOG; unparseable and failed URLs are also appended
    to ERRORS_FILE.

    yt-dlp is started with an argument list and no shell, so characters in a
    URL (quotes, ``;``, ``$(...)`` and so on) are passed through literally
    rather than being interpreted as shell syntax. The logged exit code is
    the process's real return code.

    Args:
        mode: ``'video'`` downloads best video+audio merged to mp4; any other
            value (normally ``'audio'``) extracts audio to m4a.

    Raises:
        FileNotFoundError: If TXT_FILE does not exist (from ``read_urls``).
    """
    # Function-scope import: only needed to tokenise the cookies fragment.
    import shlex

    ensure_log_files()
    urls = read_urls(TXT_FILE)
    cookies_flag = build_cookies_flag()

    if ' --cookies ' not in cookies_flag and ' --cookies-from-browser ' not in cookies_flag:
        print("⚠️ 未使用 cookies，受限视频可能失败。可将 cookies.txt 放到 BASE_DIR。")

    # build_cookies_flag() returns shell-style text; turn it into argv tokens.
    cookie_args = shlex.split(cookies_flag)

    # Everything that differs between the two modes is decided once, up front.
    if mode == 'video':
        label, out_dir, ext = '视频', VIDEO_DIR, 'mp4'
        # Best video + best audio merged into mp4; otherwise fall back to best.
        format_args = ['-f', 'bv*+ba/b', '--merge-output-format', 'mp4']
    else:
        label, out_dir, ext = '音频', AUDIO_DIR, 'm4a'
        # Extract the audio track as m4a.
        format_args = ['-x', '--audio-format', 'm4a']

    with open(ERRORS_FILE, 'a', encoding='utf-8') as wf_err, open(DOWNLOAD_LOG, 'a', encoding='utf-8') as wf_log:
        for url in urls:
            yt_id = extract_youtube_id(url)
            if not yt_id:
                print(f'跳过无法解析ID的链接：{url}')
                wf_err.write(f'无法解析ID\t{url}\n')
                wf_err.flush()
                continue

            filename = f'youtube_{yt_id}'
            out_path = os.path.join(out_dir, f'{filename}.{ext}')
            if os.path.exists(out_path):
                print(f'已存在（{label}），跳过：{out_path}')
                wf_log.write(f'{filename}\t{yt_id}\t{label}\t已跳过\n')
                wf_log.flush()
                continue

            # "--" ends option parsing so a URL starting with "-" stays a URL.
            cmd = ['yt-dlp', *cookie_args, *format_args, '-o', out_path, '--', url]

            print(f'开始下载：{url} -> {out_path}')
            try:
                ret = subprocess.run(cmd, check=False).returncode
            except OSError as exc:
                # yt-dlp could not be started at all (e.g. not installed);
                # record it as a failure and keep going with the batch.
                print(f'❌ 无法启动 yt-dlp：{exc}')
                ret = 127

            if ret == 0 and os.path.exists(out_path):
                print(f'✅ 成功：{out_path}')
                wf_log.write(f'{filename}\t{yt_id}\t{label}\t下载成功\n')
            else:
                print(f'❌ 失败：{url}（退出码 {ret}）')
                wf_log.write(f'{filename}\t{yt_id}\t{label}\t下载失败(退出码 {ret})\n')
                wf_err.write(f'{url}\t{out_path}\n')
                wf_err.flush()
            wf_log.flush()


# ================= 统计相关 =================

def format_size(size: int) -> str:
    """Render a byte count as a two-decimal string with a binary unit.

    The value is divided by 1024 until it drops below 1024 or the units
    B, KB, MB, GB, TB are exhausted; anything larger is reported in PB.
    """
    labels = ('B', 'KB', 'MB', 'GB', 'TB')
    value = float(size)
    idx = 0
    while idx < len(labels):
        if value < 1024:
            return f'{value:.2f} {labels[idx]}'
        value /= 1024
        idx += 1
    # Ran past TB: the remaining value is already expressed in PB.
    return f'{value:.2f} PB'

def get_folder_stats(folder: str):
    """Walk ``folder`` recursively and summarise the files found.

    Returns:
        A ``(counts, total_bytes)`` pair, where ``counts`` is a
        ``defaultdict(int)`` mapping each lower-cased file extension
        (``''`` for files without one) to the number of such files, and
        ``total_bytes`` is the sum of their sizes. A file whose size cannot
        be read is still counted but contributes 0 bytes.
    """
    ext_tally = defaultdict(int)
    byte_sum = 0
    for root, _subdirs, names in os.walk(folder):
        for name in names:
            _stem, suffix = os.path.splitext(name)
            ext_tally[suffix.lower()] += 1
            try:
                byte_sum += os.path.getsize(os.path.join(root, name))
            except OSError:
                # Unreadable entry: keep it in the count, add nothing to the size.
                continue
    return ext_tally, byte_sum

def print_stats_for(label: str, folder: str):
    """Print a per-extension file count and total size for one folder.

    Extensions are listed by descending count, ties broken alphabetically.
    A folder with no files prints an "empty directory" notice instead.

    Returns:
        The total size in bytes of the files under ``folder``.
    """
    ext_counts, total_bytes = get_folder_stats(folder)
    print(f"\n📂 {label} — 路径: {folder}")
    if ext_counts:
        print("文件类型统计：")
        ranked = sorted(ext_counts.items(), key=lambda item: (-item[1], item[0]))
        for suffix, n_files in ranked:
            shown = suffix or '(无扩展名)'
            print(f"  {shown}: {n_files}")
        print(f"总大小：{format_size(total_bytes)}")
    else:
        print("（空目录）")
    return total_bytes

def stats_all():
    """Print statistics for the video and audio folders, then their combined size."""
    print("==== 下载目录统计 ====")
    combined = 0
    for title, path in (('视频目录', VIDEO_DIR), ('音频目录', AUDIO_DIR)):
        combined += print_stats_for(title, path)
    print("\n==== 合计 ====")
    print(f"视频 + 音频 总大小：{format_size(combined)}")


# ================= 压缩相关 =================

def zip_folder(src_folder: str, out_zip: str, overwrite=True):
    """Compress ``src_folder`` into ``out_zip`` using the external ``zip`` tool.

    Args:
        src_folder: Directory to archive (passed to ``zip -r``).
        out_zip: Path of the archive to create.
        overwrite: When true, an existing ``out_zip`` is deleted first so the
            archive is built from scratch. If the old archive cannot be
            deleted a warning is printed and compression is still attempted.

    Failures are reported on stdout rather than raised: both a non-zero exit
    from ``zip`` and a failure to start ``zip`` at all (for example when it is
    not installed) print the failure message followed by the error details.
    """
    if overwrite:
        try:
            os.remove(out_zip)
        except FileNotFoundError:
            pass  # no previous archive to replace
        except OSError as exc:
            # Surface the problem instead of hiding it: the run below would
            # otherwise act on the old archive without any notice.
            print(f"⚠️ 无法删除旧压缩包：{out_zip}（{exc}）")

    print(f"开始压缩：{src_folder} -> {out_zip}")
    try:
        subprocess.run(
            ["zip", "-r", out_zip, src_folder],
            check=True, text=True, capture_output=True
        )
    except subprocess.CalledProcessError as e:
        print("❌ 压缩失败")
        print(e.stderr)
    except OSError as e:
        # The zip executable itself could not be launched.
        print("❌ 压缩失败")
        print(e)
    else:
        print("✅ 压缩完成")

def compress_menu():
    """Ask which folder to compress and create the matching archive in BASE_DIR.

    Choices:
      1 -> VIDEO_DIR into ``video.zip``
      2 -> AUDIO_DIR into ``audio.zip``
      3 -> all of BASE_DIR into ``downloads_all.zip``

    Any other input prints an "unrecognised option" message and does nothing,
    so a typo or an empty answer does not start archiving the whole of
    BASE_DIR.
    """
    print("\n选择要压缩的目标：")
    print("  1) 仅压缩 视频目录")
    print("  2) 仅压缩 音频目录")
    print("  3) 压缩 整个 BASE_DIR（包含video/audio/logs等）")
    sub = input("请输入 1/2/3：").strip()

    # Menu choice -> (folder to archive, archive file name inside BASE_DIR).
    targets = {
        '1': (VIDEO_DIR, 'video.zip'),
        '2': (AUDIO_DIR, 'audio.zip'),
        '3': (BASE_DIR, 'downloads_all.zip'),
    }
    if sub not in targets:
        print("未识别的选项。")
        return

    src_folder, zip_name = targets[sub]
    zip_folder(src_folder, os.path.join(BASE_DIR, zip_name), overwrite=True)


# ================= 主菜单 =================

def main():
    """Show the top-level menu, read one choice and run the matching action.

    Options: 1 download videos, 2 download audio, 3 print folder statistics,
    4 open the compression sub-menu. Anything else prints an "unrecognised
    option" message.
    """
    banner = (
        "\n====== 批量下载 / 统计 / 压缩 一体化 ======",
        f"工作目录：{BASE_DIR}",
        "确保链接清单：url.txt 位于该目录，每行一个链接。\n",
        "1) 下载 视频（mp4）",
        "2) 下载 音频（m4a）",
        "3) 统计 下载目录（video/audio/合计）",
        "4) 压缩 下载文件夹（可选 video/audio/全部）",
    )
    for text in banner:
        print(text)
    choice = input("请输入选项 1/2/3/4：").strip()

    # Menu key -> zero-argument callable performing that action.
    actions = {
        '1': lambda: download_batch('video'),
        '2': lambda: download_batch('audio'),
        '3': stats_all,
        '4': compress_menu,
    }
    action = actions.get(choice)
    if action is None:
        print("未识别的选项。")
    else:
        action()

if __name__ == '__main__':
    main()