In [1]:
import csv
import logging
import os
import queue
import signal
import socket
import sys
import threading
import time
import tkinter as tk
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from tkinter import ttk, filedialog, messagebox

import pandas as pd
import yt_dlp
from googletrans import Translator
from selenium import webdriver
from selenium.webdriver.edge.service import Service
from webdriver_manager.microsoft import EdgeChromiumDriverManager


In [2]:

# Configure logging.
# The handler is created explicitly so the log file is always written as
# UTF-8: the messages in this file are Chinese and embed video titles, which
# the platform default encoding cannot always represent. delay=True postpones
# opening the file until the first record, so re-running this cell (where
# basicConfig ignores the new handler because the root logger is already
# configured) does not leave an extra open file handle behind.
logging.basicConfig(
    handlers=[logging.FileHandler('download.log', encoding='utf-8', delay=True)],
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Module-level googletrans client used for title translation.
translator = Translator()

# Check network connectivity
def check_network():
    """Report whether an outbound TCP connection can be opened.

    Tries to connect to 8.8.8.8 on port 53 with a 5 second timeout.

    Returns:
        bool: True when the connection was established, False when the
        attempt raised OSError.
    """
    try:
        # The socket is only a reachability probe; the ``with`` block closes
        # it right away so repeated checks do not leak descriptors.
        with socket.create_connection(("8.8.8.8", 53), timeout=5):
            return True
    except OSError:
        return False

# Collect the video links behind a YouTube URL
def get_playlist_videos(url):
    """Return ``(url, title)`` pairs for the videos behind ``url``.

    The URL is resolved with yt-dlp in flat-extraction mode, limited to
    playlist items 1-1000, without downloading anything.

    Args:
        url: A playlist or single-video URL.

    Returns:
        list[tuple]: One ``(video_url, title)`` pair per usable playlist
        entry; ``title`` is None when the entry carries none. When the result
        has no ``entries`` (or is empty), ``[(url, None)]`` is returned so the
        caller treats ``url`` itself as a single video.
    """
    ydl_opts = {'quiet': True, 'extract_flat': True, 'playlist_items': '1-1000'}
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        result = ydl.extract_info(url, download=False)
        if not result or 'entries' not in result:
            return [(url, None)]

        videos = []
        for entry in result['entries']:
            # Entries may be missing (None) or incomplete; skip what cannot
            # be downloaded instead of failing the whole playlist.
            if not entry:
                continue
            entry_url = entry.get('url') or entry.get('webpage_url')
            if not entry_url:
                continue
            videos.append((entry_url, entry.get('title')))
        return videos

# Export YouTube cookies from a headless Edge session
def _format_netscape_cookie(cookie):
    """Render one Selenium cookie dict as a Netscape cookies.txt line."""
    domain = cookie.get('domain', '')
    # A leading dot means the cookie also applies to subdomains.
    include_subdomains = 'TRUE' if domain.startswith('.') else 'FALSE'
    if cookie.get('httpOnly'):
        # Marker prefix used in cookies.txt files for HttpOnly cookies.
        domain = '#HttpOnly_' + domain
    secure = 'TRUE' if cookie.get('secure') else 'FALSE'
    # Session cookies carry no expiry; they are written with 0.
    expiry = int(cookie.get('expiry') or 0)
    fields = [
        domain,
        include_subdomains,
        cookie.get('path', '/'),
        secure,
        str(expiry),
        cookie['name'],
        cookie['value'],
    ]
    return '\t'.join(fields) + '\n'


def get_youtube_cookies(output_dir):
    """Open youtube.com in headless Edge and save its cookies to a file.

    The cookies are written to ``<output_dir>/cookies.txt`` in the Netscape
    cookies.txt layout (header line followed by one tab-separated line per
    cookie), which is the format yt-dlp documents for its cookie file.

    Args:
        output_dir: Directory that receives ``cookies.txt``; created if it
            does not exist.

    Returns:
        str: Path of the written cookies file.
    """
    # exist_ok avoids a race when several callers create the directory.
    os.makedirs(output_dir, exist_ok=True)

    options = webdriver.EdgeOptions()
    options.add_argument('--headless')
    driver = webdriver.Edge(service=Service(EdgeChromiumDriverManager().install()), options=options)
    try:
        driver.get("https://www.youtube.com")
        # Fixed pause before reading cookies, giving the page time to set them.
        time.sleep(5)
        cookies = driver.get_cookies()
    finally:
        # Always shut the browser down, even if loading the page failed.
        driver.quit()

    cookies_file = os.path.join(output_dir, "cookies.txt")
    with open(cookies_file, "w", encoding="utf-8") as f:
        f.write("# Netscape HTTP Cookie File\n")
        for cookie in cookies:
            f.write(_format_netscape_cookie(cookie))
    return cookies_file

# Check whether a video has already been downloaded
def video_exists(video_title, output_dir):
    """Best-effort check for a file in ``output_dir`` that matches a title.

    Both the title and each file name (without its extension) are reduced to
    alphanumerics, spaces, ``-`` and ``_`` before the substring comparison, so
    punctuation in a title does not prevent a match.

    Args:
        video_title: Title to look for; None or a title with no usable
            characters never matches.
        output_dir: Directory to scan.

    Returns:
        bool: True if a matching file is found, False otherwise (including
        when ``output_dir`` does not exist).
    """
    def _normalize(text):
        return ''.join(c for c in text if c.isalnum() or c in ' -_')

    wanted = _normalize(video_title or '')
    # An empty needle is a substring of every name; treat it as "no match"
    # instead of reporting every video as already present.
    if not wanted.strip():
        return False

    try:
        names = os.listdir(output_dir)
    except FileNotFoundError:
        return False

    for name in names:
        # Skip yt-dlp's in-progress artefacts so an interrupted download is
        # not mistaken for a finished one.
        if name.endswith(('.part', '.ytdl')):
            continue
        stem = os.path.splitext(name)[0]
        if wanted in _normalize(stem):
            return True
    return False

# Download one video and collect its metadata
def _translate_title(title):
    """Translate ``title`` to Simplified Chinese; fall back to ``title`` itself.

    Up to three attempts are made with a 2 second pause between them.
    """
    for trans_attempt in range(3):
        try:
            return translator.translate(title, dest='zh-cn').text
        except Exception as e:
            if trans_attempt < 2:
                logging.warning(f"翻译失败: {e}, 重试 {trans_attempt + 1}/3...")
                time.sleep(2)
            else:
                logging.error(f"翻译失败: {e}, 使用原标题")
    return title


def _format_upload_date(raw_date):
    """Convert ``YYYYMMDD`` to ``YYYY-MM-DD``.

    Returns 'N/A' for a missing value and the value unchanged (as text) when
    it is not in the expected format.
    """
    if not raw_date:
        return 'N/A'
    try:
        return datetime.strptime(raw_date, '%Y%m%d').strftime('%Y-%m-%d')
    except (TypeError, ValueError):
        return str(raw_date)


def _build_video_info(info, original_title):
    """Assemble the CSV row dict for one downloaded video."""
    language = info.get('language')
    if not language or language == 'N/A':
        language = 'en'
    height = info.get('height')
    # Prefer the reported resolution; otherwise derive it from the height.
    resolution = info.get('resolution') or (f"{height}p" if height else 'N/A')
    return {
        '原标题': original_title,
        '中文标题': _translate_title(original_title),
        '语言': language,
        '分辨率': resolution,
        '格式': 'mp4',
        '发布日期': _format_upload_date(info.get('upload_date')),
        '来源': info.get('uploader') or 'N/A',
    }


def _advance_progress(progress_var, total_videos):
    """Add one video's share (100 / total_videos) to ``progress_var``.

    The update is applied only on the main thread. NOTE(review): this assumes
    ``progress_var`` is a Tk variable whose event loop runs on the main
    thread, as in this file's ``__main__`` block. Setting such a variable
    from a worker thread while the main thread is blocked waiting for that
    worker can block indefinitely, so worker-thread calls are skipped and
    progress reporting is left to the caller.
    """
    if not total_videos:
        return
    if threading.current_thread() is not threading.main_thread():
        return
    progress_var.set(progress_var.get() + (100 / total_videos))


def download_video_and_get_info(video_url, video_title, index, cookies_file, output_dir, progress_var, total_videos):
    """Download one video as MP4 and return a metadata row for the CSV.

    Args:
        video_url: URL of the video.
        video_title: Title known in advance (e.g. from the playlist), or a
            falsy value to use the title reported by yt-dlp.
        index: Position number, used as a zero-padded file name prefix.
        cookies_file: Path of the cookies file; regenerated through
            ``get_youtube_cookies`` when it does not exist.
        output_dir: Directory for the downloaded file; created if missing.
        progress_var: Object with ``get()``/``set()`` that tracks progress in
            percent. Only updated when this function runs on the main thread.
        total_videos: Number of videos in the whole job.

    Returns:
        dict | None: Row with the keys 原标题, 中文标题, 语言, 分辨率, 格式,
        发布日期 and 来源, or None when the video already exists or could not
        be downloaded within three attempts.
    """
    # exist_ok avoids a race between worker threads creating the directory.
    os.makedirs(output_dir, exist_ok=True)
    if not os.path.exists(cookies_file):
        logging.warning(f"未找到 {cookies_file}，尝试重新生成...")
        cookies_file = get_youtube_cookies(output_dir)

    filename_template = os.path.join(output_dir, f"{index:03d}|%(title)s.%(ext)s")
    ydl_opts = {
        # Fall back to the best single-file format when separate video and
        # audio streams are not offered.
        'format': 'bestvideo+bestaudio/best',
        'merge_output_format': 'mp4',
        'outtmpl': filename_template,
        # yt-dlp reads the cookie file path from the 'cookiefile' option.
        'cookiefile': cookies_file,
        'quiet': True,
        'retries': 10,
        'fragment_retries': 10,
        'postprocessors': [{'key': 'FFmpegVideoConvertor', 'preferedformat': 'mp4'}],
        'postprocessor_args': ['-c:v', 'copy', '-c:a', 'aac', '-b:a', '192k'],
    }

    try:
        info = {}
        original_title = None
        # The existence check runs once, before the first download attempt,
        # so files left behind by a failed attempt do not make the retry
        # look like "already downloaded".
        existence_checked = False

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            for attempt in range(3):
                if not check_network():
                    logging.warning(f"网络不可用，等待 10 秒后重试 (尝试 {attempt + 1}/3)...")
                    time.sleep(10)
                    continue
                try:
                    logging.info(f"开始下载视频: {video_url} (尝试 {attempt + 1}/3)")
                    info = ydl.extract_info(video_url, download=False) or {}
                    original_title = video_title if video_title else info.get('title', 'N/A')
                    if not existence_checked:
                        if video_exists(original_title, output_dir):
                            logging.info(f"视频 '{original_title}' 已存在，跳过下载")
                            return None
                        existence_checked = True

                    ydl.download([video_url])
                    break
                except Exception as e:
                    logging.error(f"下载 {video_url} 失败: {str(e)}")
                    if attempt < 2:
                        time.sleep(5)
            else:
                # No attempt reached a successful download.
                return None

        # Metadata is assembled outside the retry loop so that a problem
        # with a metadata field can never trigger a second download.
        return _build_video_info(info, original_title)
    finally:
        # Runs on every exit path (success, skip, failure).
        _advance_progress(progress_var, total_videos)

# Append one row to the CSV report
def append_to_csv(video_info, csv_output):
    """Append ``video_info`` as one row to the CSV file at ``csv_output``.

    The parent directory is created when the path has one. The header row is
    written first when the file is new or empty. The file is opened with the
    ``utf-8-sig`` encoding.

    Args:
        video_info: Mapping with the keys 原标题, 中文标题, 语言, 分辨率, 格式,
            发布日期 and 来源.
        csv_output: Path of the CSV file; may be a bare file name.
    """
    output_dir = os.path.dirname(csv_output)
    # dirname is '' for a bare file name; there is nothing to create then
    # (and os.makedirs('') would raise).
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    needs_header = not os.path.exists(csv_output) or os.path.getsize(csv_output) == 0
    with open(csv_output, 'a', newline='', encoding='utf-8-sig') as csvfile:
        fieldnames = ['原标题', '中文标题', '语言', '分辨率', '格式', '发布日期', '来源']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        if needs_header:
            writer.writeheader()
        writer.writerow(video_info)
    logging.info(f"已更新 CSV 文件: {csv_output}")

# GUI and background download orchestration
class YouTubeDownloaderApp:
    """Tkinter window that downloads a video or playlist and logs it to CSV.

    The download job runs on a background thread so the Tk event loop stays
    responsive. The background thread never touches widgets: it posts
    ``(kind, payload)`` events to a queue, and the Tk thread drains that queue
    on a timer to update the progress bar and show the final dialog.
    """

    # Delay in milliseconds between two checks of the worker event queue.
    _POLL_INTERVAL_MS = 100

    class _ProgressSink:
        """Inert ``get``/``set`` object handed to download workers.

        Workers run off the Tk thread and must not modify Tk variables, so
        they receive this stand-in instead of ``self.progress``; the real
        progress value is computed from completed futures.
        """

        def get(self):
            return 0.0

        def set(self, value):
            pass

    def __init__(self, root):
        """Build the form widgets inside ``root``."""
        self.root = root
        self.root.title("YouTube Video Downloader")

        # Events posted by the background job, consumed by _poll_events.
        self._events = queue.Queue()

        # URL input
        tk.Label(root, text="视频链接:").grid(row=0, column=0, padx=5, pady=5)
        self.url_entry = tk.Entry(root, width=50)
        self.url_entry.grid(row=0, column=1, padx=5, pady=5)

        # Video output directory
        tk.Label(root, text="视频输出目录:").grid(row=1, column=0, padx=5, pady=5)
        self.video_dir_entry = tk.Entry(root, width=50)
        self.video_dir_entry.grid(row=1, column=1, padx=5, pady=5)
        tk.Button(root, text="浏览", command=self.browse_video_dir).grid(row=1, column=2, padx=5, pady=5)

        # CSV output path
        tk.Label(root, text="CSV 输出路径:").grid(row=2, column=0, padx=5, pady=5)
        self.csv_entry = tk.Entry(root, width=50)
        self.csv_entry.grid(row=2, column=1, padx=5, pady=5)
        tk.Button(root, text="浏览", command=self.browse_csv).grid(row=2, column=2, padx=5, pady=5)

        # Download button
        self.download_button = tk.Button(root, text="开始下载", command=self.start_download)
        self.download_button.grid(row=3, column=1, pady=10)

        # Progress bar (0-100)
        self.progress = tk.DoubleVar()
        self.progress_bar = ttk.Progressbar(root, variable=self.progress, maximum=100)
        self.progress_bar.grid(row=4, column=0, columnspan=3, padx=5, pady=5, sticky="ew")

    def browse_video_dir(self):
        """Let the user pick the video output directory."""
        dir_path = filedialog.askdirectory()
        if dir_path:
            self.video_dir_entry.delete(0, tk.END)
            self.video_dir_entry.insert(0, dir_path)

    def browse_csv(self):
        """Let the user pick the CSV output file."""
        file_path = filedialog.asksaveasfilename(defaultextension=".csv", filetypes=[("CSV files", "*.csv")])
        if file_path:
            self.csv_entry.delete(0, tk.END)
            self.csv_entry.insert(0, file_path)

    def start_download(self):
        """Validate the form and start the download job in the background.

        Shows an error dialog and returns when a field is empty (after
        trimming whitespace) or the network check fails. Otherwise disables
        the button, resets the progress bar, starts the worker thread and
        begins polling its event queue.
        """
        playlist_url = self.url_entry.get().strip()
        video_output_dir = self.video_dir_entry.get().strip()
        csv_output = self.csv_entry.get().strip()

        if not all([playlist_url, video_output_dir, csv_output]):
            messagebox.showerror("错误", "请填写所有字段！")
            return

        if not check_network():
            messagebox.showerror("错误", "网络不可用，请检查连接！")
            return

        self.download_button.config(state="disabled")
        self.progress.set(0)
        logging.info("开始下载任务")

        threading.Thread(
            target=self._run_downloads,
            args=(playlist_url, video_output_dir, csv_output),
            daemon=True,
        ).start()
        self.root.after(self._POLL_INTERVAL_MS, self._poll_events)

    def _run_downloads(self, playlist_url, video_output_dir, csv_output):
        """Run the whole job; intended for a background thread.

        Posts ``("progress", percent)`` after each finished video and ends
        with exactly one ``("done", None)`` or ``("error", message)`` event.
        Does not touch any Tk object.
        """
        try:
            cookies_file = get_youtube_cookies(video_output_dir)
            video_urls = get_playlist_videos(playlist_url)
            total_videos = len(video_urls)
            sink = self._ProgressSink()
            completed = 0

            with ThreadPoolExecutor(max_workers=3) as executor:  # Adjust max_workers as needed
                futures = [
                    executor.submit(download_video_and_get_info, url, title, i + 1,
                                    cookies_file, video_output_dir, sink, total_videos)
                    for i, (url, title) in enumerate(video_urls)
                ]
                for future in as_completed(futures):
                    try:
                        video_info = future.result()
                        if video_info:
                            append_to_csv(video_info, csv_output)
                    except Exception as e:
                        # One failing video must not abort the remaining ones.
                        logging.error(f"处理视频失败: {e}")
                    completed += 1
                    self._events.put(("progress", 100 * completed / total_videos))
        except Exception as e:
            logging.error(f"下载任务失败: {e}")
            self._events.put(("error", str(e)))
        else:
            self._events.put(("done", None))

    def _poll_events(self):
        """Apply queued worker events on the Tk thread.

        Reschedules itself until a ``done`` or ``error`` event arrives, at
        which point the button is re-enabled and a dialog is shown.
        """
        while True:
            try:
                kind, payload = self._events.get_nowait()
            except queue.Empty:
                break
            if kind == "progress":
                self.progress.set(payload)
                continue

            # "done" or "error": the job is over either way.
            self.download_button.config(state="normal")
            if kind == "done":
                messagebox.showinfo("完成", "下载完成！")
                logging.info("下载任务完成")
            else:
                messagebox.showerror("错误", f"下载失败: {payload}")
            return
        self.root.after(self._POLL_INTERVAL_MS, self._poll_events)



In [5]:

# Script entry point: create the Tk root window, attach the downloader UI to
# it, and hand control to the Tk event loop.
if __name__ == "__main__":
    root = tk.Tk()
    app = YouTubeDownloaderApp(root)
    root.mainloop()