In [None]:
# @title Install Dependencies & Run Bulk Tool
!pip install -q gradio pydub openai-whisper rapidfuzz
!apt-get install -y ffmpeg

import gradio as gr
from pydub import AudioSegment
import whisper
import os
import shutil
import string
from rapidfuzz import fuzz
from pathlib import Path

# Load the Whisper "medium" model once at module level; process_single_pair()
# reads this global `model` to transcribe every chunk it exports.
print("Loading Whisper model...")
model = whisper.load_model("medium")
print("Model loaded.")

def get_chunk_letter(index):
    """Return the spreadsheet-style letter label for a zero-based index.

    0 -> "A", 25 -> "Z", 26 -> "AA", 27 -> "AB", ... A negative index
    produces an empty string because the loop body never executes.
    """
    reversed_letters = []
    remaining = index
    while remaining >= 0:
        # Peel off the least-significant letter, then shift down by one so
        # that 26 becomes "AA" (there is no "zero" letter in this numbering).
        quotient, offset = divmod(remaining, 26)
        reversed_letters.append(chr(ord("A") + offset))
        remaining = quotient - 1
    return "".join(reversed(reversed_letters))

def find_best_split_point(whisper_text, source_text_window):
    """
    Finds the best cut-off point in the source text that matches the whisper text.

    Prefixes of ``source_text_window`` whose length lies between 70% and 150%
    of ``len(whisper_text)`` are scored against ``whisper_text`` with
    ``fuzz.ratio``; the best-scoring length (the longest one on ties) is then
    moved to the nearest word boundary within 15 characters so that no word
    is cut in half.

    Args:
        whisper_text: Transcription of the current audio chunk.
        source_text_window: The not-yet-consumed part of the reference text.

    Returns:
        The number of leading characters of ``source_text_window`` that
        belong to this chunk. Returns 0 when ``whisper_text`` is empty or
        whitespace-only, and the (capped) upper search bound when the window
        is too short to scan.
    """
    if not whisper_text or not whisper_text.strip():
        return 0

    target_len = len(whisper_text)
    window_len = len(source_text_window)

    # Heuristic window: 70% to 150% of whisper length, capped at the text end.
    min_search = int(target_len * 0.7)
    max_search = min(int(target_len * 1.5), window_len)

    if max_search <= min_search:
        return max_search

    # Scan in steps of 5, and always score the upper bound as well. Without
    # it, the prefix that ends exactly at the end of the remaining text could
    # never win, so the last chunk would lose its final word(s).
    candidate_lengths = list(range(min_search, max_search, 5))
    candidate_lengths.append(max_search)

    best_ratio = 0
    best_length = min_search
    for length in candidate_lengths:
        score = fuzz.ratio(whisper_text, source_text_window[:length])
        if score >= best_ratio:
            best_ratio = score
            best_length = length

    # Snap to the nearest word boundary to avoid cutting words. The end of
    # the window counts as a boundary, exactly like a space would.
    for offset in range(15):
        right = best_length + offset
        if right == window_len:
            return window_len
        if right < window_len and source_text_window[right] == ' ':
            return right
        left = best_length - offset
        if left > 0 and source_text_window[left] == ' ':
            return left

    return best_length

def process_single_pair(audio_path, text_path, master_output_dir):
    """
    Split one audio file into 30-second MP3 chunks and pair each with lyrics.

    A sub-folder named after the audio file's stem is created inside
    ``master_output_dir``. For every full 30-second chunk the function
    writes ``<stem>-<letter>.mp3``, transcribes it with the global Whisper
    ``model``, and writes the slice of the reference lyrics selected by
    ``find_best_split_point`` to ``<stem>-<letter>.txt``. Audio left over
    after the last full chunk is not exported.

    Returns a list of log lines. If the audio or the text file cannot be
    loaded, the list holds a single error message instead.
    """
    base_name = Path(audio_path).stem

    # Each song gets its own folder under the master output directory.
    song_dir = os.path.join(master_output_dir, base_name)
    os.makedirs(song_dir, exist_ok=True)

    logs = [f"=== Processing: {base_name} ==="]

    # Load Audio
    try:
        audio = AudioSegment.from_file(audio_path)
    except Exception as e:
        return [f"Error loading audio {base_name}: {str(e)}"]

    # Load Text
    try:
        with open(text_path, 'r', encoding='utf-8') as f:
            raw_lyrics = f.read()
    except Exception as e:
        return [f"Error loading text {base_name}: {str(e)}"]

    # Flatten to a single line and collapse runs of spaces into one.
    flattened = raw_lyrics.replace('\r', '').replace('\n', ' ').strip()
    full_lyrics = " ".join(piece for piece in flattened.split(" ") if piece)

    # Only whole chunks are produced (integer division drops the remainder).
    chunk_length_ms = 30 * 1000
    num_chunks = len(audio) // chunk_length_ms

    text_cursor = 0
    chunk_starts = range(0, num_chunks * chunk_length_ms, chunk_length_ms)

    for chunk_index, start_ms in enumerate(chunk_starts):
        chunk = audio[start_ms:start_ms + chunk_length_ms]

        # Naming
        chunk_letter = get_chunk_letter(chunk_index)
        mp3_out = os.path.join(song_dir, f"{base_name}-{chunk_letter}.mp3")
        txt_out = os.path.join(song_dir, f"{base_name}-{chunk_letter}.txt")

        # Export the audio first; Whisper transcribes the exported file.
        chunk.export(mp3_out, format="mp3")
        whisper_text = model.transcribe(mp3_out)["text"].strip()

        # Align the transcription against the lyrics not yet consumed.
        remaining_text = full_lyrics[text_cursor:]
        match_len = find_best_split_point(whisper_text, remaining_text)
        real_lyrics_segment = remaining_text[:match_len].strip()

        # Export Text
        with open(txt_out, "w", encoding="utf-8") as tf:
            tf.write(real_lyrics_segment)

        text_cursor += match_len
        logs.append(f"  [{chunk_letter}] Matches: {real_lyrics_segment[:30]}...")

    return logs

def bulk_process(audio_files, text_files):
    """
    Pair uploaded audio and text files by name, process each pair, zip the result.

    Files are paired when their names without extension are equal. Every
    pair is handed to ``process_single_pair``; unpaired files only produce a
    warning in the log. The output folder ``batch_output`` is rebuilt from
    scratch on each valid request and archived as ``batch_processed.zip``.

    Args:
        audio_files: Paths of the uploaded audio files.
        text_files: Paths of the uploaded lyric text files.

    Returns:
        A ``(zip_path, log_text)`` tuple. When either list is missing or
        empty, ``zip_path`` is ``None`` and ``log_text`` is an error message.
    """
    # Validate before touching the filesystem, so an incomplete request
    # cannot wipe the output of the previous successful run.
    if not audio_files or not text_files:
        return None, "Error: Please upload both Audio and Text files."

    # Setup Master Directory (always start from an empty folder)
    master_dir = "batch_output"
    if os.path.exists(master_dir):
        shutil.rmtree(master_dir)
    os.makedirs(master_dir)

    global_logs = []

    # Map filenames (without extension) to file paths
    audio_map = {Path(f).stem: f for f in audio_files}
    text_map = {Path(f).stem: f for f in text_files}

    # Find matches
    matches = []
    for name, audio_path in audio_map.items():
        if name in text_map:
            matches.append((audio_path, text_map[name]))
        else:
            global_logs.append(f"⚠️ WARNING: No matching text file found for audio: {name}")

    for name in text_map:
        if name not in audio_map:
            global_logs.append(f"⚠️ WARNING: No matching audio file found for text: {name}")

    global_logs.append(f"Found {len(matches)} valid pairs to process.\n")

    # Process Loop
    for audio_path, text_path in matches:
        global_logs.extend(process_single_pair(audio_path, text_path, master_dir))
        global_logs.append("")  # Spacer between songs

    # Zip everything
    shutil.make_archive("batch_processed", 'zip', master_dir)

    return "batch_processed.zip", "\n".join(global_logs)

# Gradio Interface
# Wires bulk_process() to a web form. The two multi-file upload inputs are
# passed to it as audio_files and text_files, in that order. The two outputs
# receive the (zip path, log text) tuple that bulk_process() returns.
iface = gr.Interface(
    fn=bulk_process,
    inputs=[
        gr.File(file_count="multiple", label="Upload MP3 Files"),
        gr.File(file_count="multiple", label="Upload TXT Files")
    ],
    outputs=[
        gr.File(label="Download Master Zip"),
        gr.Textbox(label="Processing Log", lines=20)
    ],
    title="Bulk MP3 Splitter & Lyric Aligner",
    description="Upload multiple MP3s and multiple TXTs. Files with the same name will be paired and processed."
)

# share=True requests a public share URL. debug=True keeps the notebook cell
# running so errors and logs stay visible, as Gradio's launch message states.
iface.launch(share=True, debug=True)

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/803.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m798.7/803.2 kB[0m [31m27.4 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m803.2/803.2 kB[0m [31m19.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m102.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for openai-whisper (pyproject.toml) ... [?25l[?25hdone
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 37 not upgraded.
Loading Whisper model...


100%|█████████████████████████████████████| 1.42G/1.42G [00:19<00:00, 77.7MiB/s]


Model loaded.
Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://29ddbe687af2e5199e.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
