<a href="https://colab.research.google.com/github/nanfenggushi/colab-all-in-one-downloader/blob/main/colab_all_in_one_downloader.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 我的专属全能云端下载器 (V2 专业增强版)

**功能:**
- **HTTP/HTTPS 直链下载**: 使用多线程 `aria2c`，已优化参数以达到最大速度。
- **BitTorrent 磁力链接下载**: 使用 `libtorrent`，采用激进的连接和缓存策略，**已升级为异步后台下载**。
- **YouTube 及流媒体视频下载**: 使用 `yt-dlp`，增加并发分片下载以提速。

**V2版新特性:**
- **BT异步下载**: 提交BT任务后可立即返回，同时下载多个BT任务。
- **任务管理器**: 新增独立的任务管理界面，可查看和终止后台HTTP/YouTube下载任务。
- **增强稳定性**: BT元数据获取增加超时机制，防止死链卡死；所有下载任务均保留日志以供排查。

**使用流程:**
1.  **从上到下，依次运行所有代码单元格**。
2.  在【步骤2】完成后，您需要根据提示完成 Google Drive 的授权。
3.  在【步骤3】的图形化界面中，选择下载类型，粘贴链接，确认路径，点击下载。
4.  在【步骤4】的任务管理器中，可以监控和管理后台任务。

In [None]:
# ===================================================================
# 步骤 1: 安装所有必需的工具 (最终修正版 v2)
# ===================================================================
print("🚀 开始安装所有必需的下载工具...")

# 1. 使用 apt-get 在系统层面安装 aria2c 命令行工具
print("正在安装 aria2c 命令行工具...")
!apt-get -qq update
!apt-get -qq install -y aria2

# 2. 使用 pip 安装 libtorrent 和其他Python库
print("正在安装 libtorrent, yt-dlp 等Python库...")
!pip -q install libtorrent yt-dlp

# 3. 安装后自检，确保关键工具都已就位
print("\n--- 安装后自检 ---")
!which aria2c
try:
    import libtorrent
    print(f"libtorrent-ok, version: {libtorrent.version}")
except ImportError as e:
    print(f"❌ libtorrent 导入失败: {e}")
!which yt-dlp
print("--------------------")

print("\n✅ 所有工具安装完毕！")

In [None]:
# ===================================================================
# 步骤 2: 连接Google Drive并准备核心功能 (V2 专业增强版)
# ===================================================================
print("📚 正在导入库并准备核心功能 (V2 专业增强版)...")

# --- 核心库导入 ---
import os
import time
import threading
import libtorrent as lt
from google.colab import drive
import ipywidgets as widgets
from IPython.display import display, clear_output

# --- 全局变量，用于管理BT任务 ---
bt_sessions = {}  # {info_hash: {'handle': handle, 'session': ses}}
METADATA_TIMEOUT = 120.0 # BT元数据获取超时时间(秒)

# --- 连接 Google Drive ---
print("🔗 正在请求连接到您的 Google Drive...")
try:
    drive.mount('/content/drive', force_remount=True)
    print("✅ Google Drive 连接成功！")
except Exception as e:
    print(f"❌ Google Drive 连接失败: {e}")

# --- 定义HTTP下载函数 ---
def start_http_download(links, save_path):
    os.makedirs(save_path, exist_ok=True)
    print(f"⬇️ 开始HTTP下载 (极限速度模式)，保存到: {save_path}")
    link_list = [link.strip() for link in links.strip().split('\n') if link.strip()]
    if not link_list:
        print("⚠️ 链接列表为空。")
        return
    aria2_options = "-c -x 16 -s 16 -k 10M --max-concurrent-downloads=20 --summary-interval=0 --allow-overwrite=true"
    command = f'nohup aria2c {aria2_options} -d "{save_path}"'
    for link in link_list:
        command += f' "{link}"'
    # 日志保存到文件
    command += ' > /content/aria2_log.txt 2>&1 &'
    get_ipython().system(command)
    print(f"\n🎉 {len(link_list)} 个HTTP下载任务已提交到后台。日志文件: /content/aria2_log.txt")
    print("ℹ️ 您可以继续添加其他任务或关闭页面。")

# --- 定义BT下载核心线程函数 (后台运行) ---
def _bt_download_thread(info_hash_str, output_widget):
    if info_hash_str not in bt_sessions:
        return

    handle = bt_sessions[info_hash_str]['handle']
    session = bt_sessions[info_hash_str]['session']
    name = handle.status().name

    while not handle.status().is_seeding:
        time.sleep(2) # 后台线程不需要频繁更新

    with output_widget:
        print(f"✅ BT任务 '{name}' 下载完成！")

    # 任务完成，清理会话
    session.remove_torrent(handle)
    del bt_sessions[info_hash_str]

# --- 定义启动BT下载的函数 (异步) ---
def start_bt_download(magnet_link, save_path, output_widget):
    os.makedirs(save_path, exist_ok=True)
    if not magnet_link.startswith("magnet:?"):
        print("⚠️ 无效的磁力链接。")
        return

    settings = {'user_agent': 'qBittorrent/4.4.2', 'listen_interfaces': '0.0.0.0:6881', 'connections_limit': 1000, 'cache_size': 2048}
    ses = lt.session(settings)
    params = {'save_path': save_path}

    try:
        handle = lt.add_magnet_uri(ses, magnet_link, params)
    except Exception as e:
        print(f"❌ 添加磁力链接失败: {e}")
        return

    print("正在获取种子元数据 (超时时间: 120秒)...")
    start_time = time.time()
    while not handle.has_metadata():
        if time.time() - start_time > METADATA_TIMEOUT:
            print("❌ 获取元数据超时。这可能是一个无效或死链接。")
            ses.remove_torrent(handle) # 清理无效句柄
            return
        time.sleep(1)

    info_hash_str = str(handle.info_hash())
    if info_hash_str in bt_sessions:
        print("⚠️ 该BT任务已在下载队列中。")
        return

    print(f"🧲 元数据获取成功！种子名称: {handle.status().name}")
    bt_sessions[info_hash_str] = {'handle': handle, 'session': ses}

    # 启动后台下载线程
    thread = threading.Thread(target=_bt_download_thread, args=(info_hash_str, output_widget))
    thread.start()
    print(f"\n🎉 BT任务 '{handle.status().name}' 已提交到后台处理。")
    print("ℹ️ 您可以使用下方的按钮检查所有BT任务的状态。")

# --- 定义检查BT任务状态的函数 ---
def check_bt_status(output_widget):
    with output_widget:
        clear_output(wait=True)
        if not bt_sessions:
            print("当前没有正在进行的BT下载任务。")
            return

        print(f"--- {len(bt_sessions)}个BT任务状态 ---")
        for info_hash, data in list(bt_sessions.items()):
            s = data['handle'].status()
            state_str = ['排队中', '检查中', '下载元数据', '下载中', '已完成', '做种中', '分配空间'][s.state]
            print(f"\n▶️ {s.name}\n"
                  f"  状态: {state_str} | 进度: {s.progress * 100:.2f}%\n"
                  f"  速度: ↓ {s.download_rate / 1000 / 1000:.2f} MB/s | ↑ {s.upload_rate / 1000 / 1000:.2f} MB/s\n"
                  f"  Peers: {s.num_peers} (总计: {s.list_peers}) | 已下载: {s.total_done / 1000 / 1000:.2f} MB")
        print("----------------------")

# --- 定义YouTube下载函数 (带日志记录) ---
def start_youtube_download(url, save_path):
    os.makedirs(save_path, exist_ok=True)
    print(f"📹 开始YouTube/视频下载 (极限速度模式)，保存到: {save_path}")
    if not url.strip():
        print("⚠️ 请提供有效的视频链接。")
        return

    yt_dlp_options = "-N 8 --concurrent-fragments 10"
    # 日志保存到文件
    command = f'nohup yt-dlp {yt_dlp_options} -P "{save_path}" -f "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best" --merge-output-format mp4 "{url}" > /content/yt_dlp_log.txt 2>&1 &'
    get_ipython().system(command)
    print("\n🎉 视频下载任务已提交到后台。日志文件: /content/yt_dlp_log.txt")
    print("ℹ️ 您可以继续添加其他任务或关闭页面。")

print("✅ 核心功能已准备就绪！")

In [None]:
# ===================================================================
# 步骤 3: 启动下载器图形界面 (GUI)
# ===================================================================

# --- 默认保存路径 ---
default_save_path = '/content/drive/MyDrive/Downloads'

# --- 创建HTTP下载界面组件 ---
http_links_input = widgets.Textarea(placeholder='在此处输入一个或多个HTTP/HTTPS直链，每行一个。', layout={'height': '100px', 'width': '98%'})
http_save_path_input = widgets.Text(value=default_save_path, description='保存路径:', layout={'width': '98%'})
http_button = widgets.Button(description="开始HTTP下载", button_style='success', icon='download')
http_output = widgets.Output()
def on_http_button_clicked(b):
    with http_output:
        clear_output(wait=True)
        start_http_download(http_links_input.value, http_save_path_input.value)
http_button.on_click(on_http_button_clicked)
http_tab = widgets.VBox([widgets.Label(value="将HTTP/S直链文件下载到您的Google Drive。"), http_save_path_input, http_links_input, http_button, http_output])

# --- 创建BT下载界面组件 (V2异步版) ---
bt_magnet_input = widgets.Text(placeholder='在此处输入一个磁力链接。', layout={'width': '98%'})
bt_save_path_input = widgets.Text(value=default_save_path, description='保存路径:', layout={'width': '98%'})
bt_button = widgets.Button(description="开始BT下载", button_style='info', icon='magnet')
bt_status_button = widgets.Button(description="检查BT任务状态", icon='info')
bt_output = widgets.Output()
def on_bt_button_clicked(b):
    with bt_output:
        clear_output(wait=True)
        start_bt_download(bt_magnet_input.value, bt_save_path_input.value, bt_output)
def on_bt_status_clicked(b):
    check_bt_status(bt_output)
bt_button.on_click(on_bt_button_clicked)
bt_status_button.on_click(on_bt_status_clicked)
bt_buttons = widgets.HBox([bt_button, bt_status_button])
bt_tab = widgets.VBox([widgets.Label(value="通过磁力链接下载文件到您的Google Drive (异步)。"), bt_save_path_input, bt_magnet_input, bt_buttons, bt_output])

# --- 创建YouTube下载界面组件 ---
yt_url_input = widgets.Text(placeholder='在此处输入一个YouTube或其他支持的视频链接。', layout={'width': '98%'})
yt_save_path_input = widgets.Text(value=default_save_path, description='保存路径:', layout={'width': '98%'})
yt_button = widgets.Button(description="开始视频下载", button_style='danger', icon='youtube')
yt_output = widgets.Output()
def on_yt_button_clicked(b):
    with yt_output:
        clear_output(wait=True)
        start_youtube_download(yt_url_input.value, yt_save_path_input.value)
yt_button.on_click(on_yt_button_clicked)
yt_tab = widgets.VBox([widgets.Label(value="从YouTube等网站下载视频到您的Google Drive。"), yt_save_path_input, yt_url_input, yt_button, yt_output])

# --- 创建并显示选项卡界面 ---
tab = widgets.Tab()
tab.children = [http_tab, bt_tab, yt_tab]
tab.set_title(0, '🔗 HTTP / HTTPS')
tab.set_title(1, '🧲 BitTorrent (异步)')
tab.set_title(2, '📹 YouTube & More')

print("✅ 下载器界面已准备好，请在下方操作。")
display(tab)

In [None]:
# ===================================================================
# 步骤 4: 后台任务管理器 (HTTP & YouTube)
# ===================================================================

task_manager_output = widgets.Output(layout={'border': '1px solid black', 'padding': '5px'})

def refresh_tasks(b):
    with task_manager_output:
        clear_output(wait=True)
        print("正在刷新任务列表...")
        # 使用 grep '[a]ria2c' 技巧避免 grep 进程本身被列出
        !ps -ef | grep '[a]ria2c\|[y]t-dlp'
        print("\n提示: PID是第二列的数字。复制它到下面的输入框以终止任务。")

refresh_button = widgets.Button(description="刷新任务列表", icon='refresh')
refresh_button.on_click(refresh_tasks)

pid_input = widgets.Text(placeholder='在此处输入要终止任务的PID')
kill_button = widgets.Button(description="终止任务", button_style='danger', icon='stop')

def kill_task(b):
    pid = pid_input.value
    with task_manager_output:
        if not pid.isdigit():
            print(f"\n❌ 无效的PID: '{pid}'。请输入一个数字。")
            return
        print(f"\n正在尝试终止PID为 {pid} 的任务...")
        !kill -9 {pid}
        print(f"✅ 已发送终止信号到PID {pid}。请点击刷新按钮确认任务是否已停止。")
        pid_input.value = '' # 清空输入框

kill_button.on_click(kill_task)

kill_box = widgets.HBox([pid_input, kill_button])
manager_ui = widgets.VBox([refresh_button, kill_box, task_manager_output])

print("✅ 任务管理器已准备好。用它来监控和管理由HTTP和YouTube标签页启动的后台任务。")
display(manager_ui)