In [None]:
# @title 1. 环境初始化与恢复
import os
import sys

# One-time dependency bootstrap: runs only while the "installed_marker" file
# is absent from the current working directory.
if not os.path.exists("installed_marker"):
    # IPython shell escape that installs the third-party packages.
    !pip install -q "torchaudio<2.9" yt-dlp stable-ts librosa
    # The marker is written without checking pip's exit status, so a failed
    # install is not retried until the marker file is deleted by hand.
    with open("installed_marker", "w") as f:
        f.write("done")
    # Send signal 9 to this very process. Presumably this forces the notebook
    # runtime to restart so the fresh packages are picked up -- TODO confirm.
    os.kill(os.getpid(), 9)

import torch
import torchaudio
import numpy as np
import librosa
import stable_whisper
from google.colab import drive, files
from datetime import timedelta

# Prefer the GPU whenever PyTorch reports CUDA as available; otherwise use CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f"Environment ready. Device: {device}")

In [None]:
# @title 2. 配置编辑
import os
import re
import subprocess

# Form inputs: `source_type` selects which branch further down resolves the
# audio file, and `source_value` is the matching URL or file path.
source_type = "Youtube 链接" # @param ["\u672C\u5730\u8DEF\u5F84", "Google Drive", "Youtube \u94FE\u63A5"]
source_value = "https://www.youtube.com/watch?v=WCDLyXJgbIo" # @param {type:"string"}

# Both are filled in by the source-handling code that follows.
audio_file_path = ""
base_name = ""  # base file name used later when generating the lrc output

def sanitize_filename(name):
    """Replace characters that are illegal in file names with underscores.

    Every occurrence of a backslash, ``/``, ``*``, ``?``, ``:``, ``"``, ``<``,
    ``>`` or ``|`` becomes ``_``; the result is then stripped of leading and
    trailing whitespace.
    """
    illegal_to_underscore = str.maketrans({ch: "_" for ch in '\\/*?:"<>|'})
    return name.translate(illegal_to_underscore).strip()

if source_type == "Youtube 链接":
    print(f"1. Fetching YouTube title for: {source_value}")

    # 尝试获取视频标题
    try:
        # 使用 subprocess 获取标题，避免直接 ! 命令的输出解析问题
        # 加上 --user-agent 和 client 伪装，防止获取标题时也被 403
        cmd = [
            "yt-dlp", "--get-title",
            "--extractor-args", "youtube:player_client=android",
            "--user-agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
            source_value
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8')
        raw_title = result.stdout.strip()

        if not raw_title:
            print("Warning: Could not fetch title, using default name.")
            raw_title = "youtube_audio"

        base_name = sanitize_filename(raw_title)
        print(f"   Video Title: {raw_title}")
        print(f"   Saved Filename: {base_name}.wav")

        # 指定下载文件名为清洗后的标题
        audio_file_path = f"{base_name}.wav"

        print("2. Downloading audio...")
        !yt-dlp -x --audio-format wav \
                -o "{audio_file_path}" \
                --force-overwrites \
                --extractor-args "youtube:player_client=android" \
                --user-agent "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" \
                "{source_value}"

    except Exception as e:
        print(f"Error processing YouTube: {e}")

elif source_type == "Google Drive":
    if not os.path.exists('/content/drive'):
        drive.mount('/content/drive')
    audio_file_path = source_value
    if not os.path.exists(audio_file_path):
        raise FileNotFoundError(f"File not found in Drive: {audio_file_path}")

    filename_with_ext = os.path.basename(audio_file_path)
    base_name = os.path.splitext(filename_with_ext)[0]

elif source_type == "本地路径":
    audio_file_path = source_value
    if not os.path.exists(audio_file_path):
        print(f"File not found: {audio_file_path}. Please upload it via the files tab.")
        from google.colab import files
        uploaded = files.upload()
        audio_file_path

In [None]:
!pip install -q demucs

# @title 2.5 人声分离 (可选：去除 BGM 仅保留人声)
import shutil
import subprocess

# Split the current audio file into vocals and accompaniment with demucs and,
# on success, point `audio_file_path` at the vocals-only file.
if not audio_file_path or not os.path.exists(audio_file_path):
    print("❌ 未发现有效的音频文件路径，请先执行第 2 步")
else:
    # Stem used to name the separated files. Fall back to the audio file's own
    # name when `base_name` is empty, otherwise the outputs would be called
    # "/content/_vocals.wav" and "/content/_instr.wav".
    output_stem = base_name or os.path.splitext(os.path.basename(audio_file_path))[0]

    # Work on a copy with a plain temporary name so that special characters in
    # the original file name cannot trip up demucs.
    original_ext = os.path.splitext(audio_file_path)[1]
    temp_input = "/content/temp_sep_input" + original_ext
    shutil.copy(audio_file_path, temp_input)

    print("正在提取人声与伴奏 (使用 mdx_extra 模型)...")
    print("注意：首次运行会下载模型（约 4GB），请耐心等待。")

    try:
        # Run demucs to split the input into vocals and no_vocals stems;
        # check=True turns a non-zero exit status into an exception.
        subprocess.run(
            ["python3", "-m", "demucs.separate",
              "-n", "mdx_extra",
              "--two-stems=vocals",
              "--out", "/content/separated",
              temp_input],
            check=True
        )

        # Expected demucs output layout:
        # /content/separated/mdx_extra/temp_sep_input/vocals.wav
        base_output_path = "/content/separated/mdx_extra/temp_sep_input"
        vocal_path = os.path.join(base_output_path, "vocals.wav")
        instr_path = os.path.join(base_output_path, "no_vocals.wav")

        if os.path.exists(vocal_path):
            # Rename the result after the original title / file name.
            final_vocal_name = f"/content/{output_stem}_vocals.wav"
            shutil.move(vocal_path, final_vocal_name)

            # Key step: later cells read `audio_file_path`, so from here on
            # they operate on the vocals-only file.
            audio_file_path = final_vocal_name
            print(f"✅ 人声提取成功: {final_vocal_name}")

        if os.path.exists(instr_path):
            final_instr_name = f"/content/{output_stem}_instr.wav"
            shutil.move(instr_path, final_instr_name)
            print(f"✅ 伴奏提取成功: {final_instr_name}")

    except Exception as e:
        print(f"❌ 分离过程出现异常: {e}")
    finally:
        # Remove the temporary input copy and the demucs output directory.
        if os.path.exists(temp_input):
            os.remove(temp_input)
        if os.path.exists("/content/separated"):
            shutil.rmtree("/content/separated")

In [None]:
# @title 3. 音频转写
import os
import json
import zipfile
import torch
import stable_whisper
from google.colab import files
from datetime import timedelta

# ---------------------------------------------------------
# 1. 辅助函数定义
# ---------------------------------------------------------

def sec_to_srt(seconds):
    """Format a time offset in seconds as an SRT timestamp ``HH:MM:SS,mmm``."""
    td = timedelta(seconds=seconds)
    whole_seconds = int(td.total_seconds())
    hours, remainder = divmod(whole_seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    # Sub-second part, truncated (not rounded) to whole milliseconds.
    millis = td.microseconds // 1000
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"

def sec_to_ass(seconds):
    """Format a time offset in seconds as an ASS timestamp ``H:MM:SS.cc``."""
    td = timedelta(seconds=seconds)
    whole_seconds = int(td.total_seconds())
    hours, remainder = divmod(whole_seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    # ASS uses centiseconds for the fractional part (truncated, not rounded).
    cs = td.microseconds // 10000
    return f"{hours}:{minutes:02d}:{secs:02d}.{cs:02d}"

def sec_to_lrc(seconds):
    """Format a time offset in seconds as an LRC timestamp ``MM:SS.cc``.

    The minutes field is not capped at 59: LRC has no hours field, so longer
    offsets simply yield a larger minute count (e.g. ``62:05.25``).
    """
    td = timedelta(seconds=seconds)
    # Use the full duration rather than ``td.seconds``, which silently drops
    # whole days; this also matches how the SRT/ASS formatters compute it.
    total_seconds = int(td.total_seconds())
    minutes, secs = divmod(total_seconds, 60)
    # Fractional part in centiseconds (truncated, not rounded).
    centis = td.microseconds // 10000
    return f"{minutes:02d}:{secs:02d}.{centis:02d}"

# ---------------------------------------------------------
# 2. 模型加载 (常驻内存)
# ---------------------------------------------------------
# Each model is loaded at most once per runtime: when the names are already
# bound in the global namespace (e.g. the cell is re-run), loading is skipped.
if 'vad_model' in globals() and 'get_speech_timestamps' in globals():
    print("✅ VAD 模型已在内存中。")
else:
    print("正在加载 VAD 模型...")
    vad_model, utils = torch.hub.load(
        repo_or_dir='snakers4/silero-vad',
        model='silero_vad',
        force_reload=False,
        trust_repo=True,
    )
    # The hub entry point returns its helper functions as a 5-tuple.
    get_speech_timestamps, save_audio, read_audio, VADIterator, collect_chunks = utils

if 'whisper_model' in globals():
    print("✅ Whisper 模型已在内存中。")
else:
    print("正在加载 Whisper Large-v3 模型...")
    whisper_model = stable_whisper.load_model('large-v3', device=device)

# ---------------------------------------------------------
# 3. 执行识别逻辑 (收集数据)
# ---------------------------------------------------------
# Fail fast when the earlier steps did not leave a usable audio file behind.
if not os.path.exists(audio_file_path):
    raise FileNotFoundError("找不到音频文件，请先成功运行第 2 步！")

# Default stem for the output files when no name was derived earlier.
base_name = base_name or "transcription_output"

print(f"\n正在处理音频: {audio_file_path} ...")
wav = read_audio(audio_file_path)
# Rate used to convert sample indices to seconds.
# NOTE(review): assumes read_audio yields 16 kHz audio -- confirm.
sr = 16000
total_duration = len(wav) / sr

print("Running VAD...")
speech_timestamps = get_speech_timestamps(
    wav, vad_model,
    threshold=0.5, min_speech_duration_ms=250, min_silence_duration_ms=100,
)

print(f"识别到 {len(speech_timestamps)} 个语音片段，开始转写并收集数据...")

global_segments = []

for seg_no, ts in enumerate(speech_timestamps, start=1):
    # VAD boundaries are sample indices; convert them to absolute seconds.
    start_sample, end_sample = ts['start'], ts['end']
    global_offset_sec = start_sample / sr
    vad_absolute_end = end_sample / sr

    chunk = wav[start_sample:end_sample].numpy()

    # Transcribe this chunk on its own (language=None: presumably
    # auto-detected per chunk -- confirm against the library docs).
    result = whisper_model.transcribe(chunk, language=None)

    sentence_text = result.text.strip()
    if not sentence_text:
        continue

    words_list = result.all_words()
    if not words_list:
        continue

    # Sentence start follows the first word (so leading breath noise is not
    # pulled in); sentence end is pinned to the VAD end so the trailing gap
    # is covered.
    sentence_abs_start = global_offset_sec + words_list[0].start
    sentence_abs_end = vad_absolute_end

    final_idx = len(words_list) - 1
    words_data = [
        {
            "word": word.word.strip(),
            "start": global_offset_sec + word.start,
            # The last word is stretched to the sentence end so that, in the
            # karaoke ASS output, its highlight lasts until the line ends.
            "end": sentence_abs_end if idx == final_idx else global_offset_sec + word.end,
        }
        for idx, word in enumerate(words_list)
    ]

    global_segments.append({
        "text": sentence_text,
        "start": sentence_abs_start,
        "end": sentence_abs_end,
        "words": words_data,
    })

    print(f"[{int(seg_no / len(speech_timestamps) * 100)}%] {sentence_text}")

# ---------------------------------------------------------
# 4. 生成五种格式的文件内容
# ---------------------------------------------------------
print("\n正在生成文件内容...")

# 1. Enhanced LRC: one line per segment, made of a [mm:ss.cc] line tag
#    followed by " <mm:ss.cc>word" for every word of the segment.
lrc_lines = [
    f"[{sec_to_lrc(seg['start'])}]"
    + "".join(f" <{sec_to_lrc(w['start'])}>{w['word']}" for w in seg['words'])
    for seg in global_segments
]
content_lrc = "\n".join(lrc_lines)

# 2. Sentence-level SRT: numbered cues (starting at 1), one per segment, each
#    followed by an empty separator line.
srt_lines = []
for cue_number, seg in enumerate(global_segments, start=1):
    srt_lines.extend([
        str(cue_number),
        f"{sec_to_srt(seg['start'])} --> {sec_to_srt(seg['end'])}",
        seg['text'],
        "",
    ])
content_srt = "\n".join(srt_lines)

# 3. Word-level SRT: one cue per word, numbered continuously across segments.
word_srt_lines = []
every_word = (w for seg in global_segments for w in seg['words'])
for w_idx, w in enumerate(every_word, start=1):
    word_srt_lines += [
        str(w_idx),
        f"{sec_to_srt(w['start'])} --> {sec_to_srt(w['end'])}",
        w['word'],
        "",
    ]
content_word_srt = "\n".join(word_srt_lines)

# 4. Custom JSON: file-level metadata plus the raw segment/word structures.
json_output = dict(
    metadata=dict(
        file=audio_file_path,
        duration=round(total_duration, 2),
        language="auto",
    ),
    segments=global_segments,
)
# ensure_ascii=False keeps non-ASCII text readable instead of \u-escaping it.
content_json = json.dumps(json_output, indent=2, ensure_ascii=False)

# 5. Karaoke ASS: one Dialogue event per segment, with a {\k<centiseconds>}
#    tag in front of every word.
ass_header = """[Script Info]
ScriptType: v4.00+
PlayResX: 1920
PlayResY: 1080
Timer: 100.0000

[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,Arial,50,&H00FFFFFF,&H000000FF,&H00000000,&H00000000,0,0,0,0,100,100,0,0,1,2,0,2,10,10,10,1

[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
ass_events = []
for seg in global_segments:
    start_fmt = sec_to_ass(seg['start'])
    end_fmt = sec_to_ass(seg['end'])

    words = seg['words']
    # \k durations are consumed back to back from the start of the Dialogue
    # line, so each word's duration must run until the NEXT word starts;
    # using only (end - start) drops the pauses between words and makes later
    # words light up too early. Boundaries are centiseconds relative to the
    # segment start: every word's start, then the last word's end.
    boundaries = [round((w['start'] - seg['start']) * 100) for w in words]
    if words:
        boundaries.append(round((words[-1]['end'] - seg['start']) * 100))

    k_parts = []
    # Centiseconds already accounted for. Tracking the running total (instead
    # of converting each duration separately) keeps rounding from drifting.
    elapsed = max(0, boundaries[0]) if boundaries else 0
    if elapsed > 0:
        # Silent lead-in when the first word starts after the line does.
        k_parts.append(f"{{\\k{elapsed}}}")

    for i, w in enumerate(words):
        # Clamp at zero so overlapping or out-of-order word times can never
        # produce a negative duration.
        k_val = max(0, boundaries[i + 1] - elapsed)
        elapsed += k_val

        prefix = " " if i > 0 else ""
        k_parts.append(f"{prefix}{{\\k{k_val}}}{w['word']}")

    k_text = "".join(k_parts)
    event_line = f"Dialogue: 0,{start_fmt},{end_fmt},Default,,0,0,0,,{k_text}"
    ass_events.append(event_line)

content_ass = ass_header + "\n".join(ass_events)

# ---------------------------------------------------------
# 5. 打包与下载
# ---------------------------------------------------------
zip_filename = f"{base_name}_subs.zip"
print(f"\n正在打包文件到 {zip_filename} ...")

# Write every generated document into one archive, named after `base_name`.
with zipfile.ZipFile(zip_filename, 'w') as zipf:
    for suffix, content in (
        ("lrc", content_lrc),
        ("srt", content_srt),
        ("word.srt", content_word_srt),
        ("ass", content_ass),
        ("json", content_json),
    ):
        zipf.writestr(f"{base_name}.{suffix}", content)

print("✅ 处理完成，自动下载压缩包。")
# Hand the archive to the browser for download.
files.download(zip_filename)