<a href="https://colab.research.google.com/github/officialcyber88/Apps/blob/main/Cyber_88_Apps_Collection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:

# @title Mount Google Colab
# Mount the user's Google Drive into the Colab filesystem at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# @title Hugging Face to GitHub Transfer UI

import sys
import os
import shutil
import subprocess
from pathlib import Path
import gradio as gr
import requests
from dotenv import load_dotenv

def install(package, import_name=None):
    """Ensure a third-party package is importable, installing it via pip if not.

    Args:
        package: Distribution name handed to ``pip install``.
        import_name: Module name used for the import probe. When omitted, a
            small table of known pip-name/module-name mismatches is consulted
            and ``package`` itself is used as the fallback.

    Raises:
        subprocess.CalledProcessError: If the pip invocation fails.
    """
    import importlib

    # The pip distribution name is not always the importable module name
    # (e.g. "python-dotenv" provides the "dotenv" module). Probing the pip
    # name would fail every time and trigger a needless reinstall.
    known_modules = {"python-dotenv": "dotenv", "pyyaml": "yaml"}
    module_name = import_name or known_modules.get(package.lower(), package)
    try:
        importlib.import_module(module_name)
    except ImportError:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])

# Make sure the third-party dependencies of this cell are available.
# NOTE(review): gradio, requests and dotenv are already imported at the top of
# this cell, so a missing package raises ImportError before these calls run.
install("gradio")
install("python-dotenv")
install("requests")

def run(cmd):
    """Execute ``cmd`` through the shell and return a transcript of the call.

    The transcript is the echoed command line followed by whatever the command
    wrote to stdout and stderr. A non-zero exit status raises ``RuntimeError``
    carrying that same transcript.
    """
    proc = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    pieces = [f"$ {cmd}\n"]
    pieces.extend(stream for stream in (proc.stdout, proc.stderr) if stream)
    transcript = "".join(pieces)
    if proc.returncode == 0:
        return transcript
    raise RuntimeError(transcript)

def create_github_repo(token, username, repo_name, private):
    """Create a GitHub repository for the owner of ``token``.

    Args:
        token: GitHub token sent as a Bearer credential.
        username: Not used by the request (the endpoint creates the repo under
            the token's owner); kept so existing callers keep working.
        repo_name: Name of the repository to create.
        private: Visibility string; the repo is private only when this equals
            ``"private"``.

    Returns:
        A one-line status message: created, or already exists.

    Raises:
        RuntimeError: On a network failure or any other API error.
    """
    url = "https://api.github.com/user/repos"
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/vnd.github+json"
    }
    data = {
        "name": repo_name,
        "private": private == "private",
        "auto_init": False
    }
    try:
        # Without a timeout a stalled connection would hang the UI forever.
        response = requests.post(url, headers=headers, json=data, timeout=10)
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"❌ Network error during GitHub repo creation: {e}") from e

    if response.status_code == 201:
        return f"✅ GitHub repository '{repo_name}' created.\n"
    if response.status_code == 422 and "already exists" in response.text:
        return f"⚠️ Repo '{repo_name}' already exists. Continuing...\n"

    # Error bodies are normally JSON, but an outage or proxy may return
    # HTML or nothing at all; do not let the parser mask the real failure.
    try:
        payload = response.json()
    except ValueError:
        payload = {}
    if not isinstance(payload, dict):
        payload = {}
    error_msg = payload.get("message", f"HTTP error {response.status_code}")
    docs_url = payload.get("documentation_url", "")
    raise RuntimeError(f"❌ GitHub repo creation failed: {error_msg}\nDocumentation: {docs_url}")

def validate_github_token(token):
    """Validate a GitHub token against the ``/user`` endpoint.

    Args:
        token: GitHub token sent as a Bearer credential.

    Returns:
        ``(login, warning)``: the account login reported by GitHub, and either
        ``None`` or a warning string when the ``repo`` scope is not listed in
        the ``X-OAuth-Scopes`` response header.

    Raises:
        RuntimeError: On a network failure or any non-200 response; the
            message includes troubleshooting tips.
    """
    url = "https://api.github.com/user"
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/vnd.github+json"
    }

    try:
        response = requests.get(url, headers=headers, timeout=10)
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"❌ Network error during token validation: {str(e)}")

    if response.status_code == 200:
        login = response.json()["login"]
        scopes = response.headers.get('X-OAuth-Scopes', '')
        # Compare whole scope names: a substring test would accept
        # "public_repo" as if it were the full "repo" scope.
        granted = {scope.strip() for scope in scopes.split(',') if scope.strip()}
        if 'repo' not in granted:
            return login, f"⚠️ Token missing 'repo' scope. Your scopes: {scopes}"
        return login, None

    rate_remaining = response.headers.get('X-RateLimit-Remaining', '?')
    if response.status_code == 401:
        error_msg = "Token is invalid or expired"
    elif response.status_code == 403 and rate_remaining == '0':
        # Only an exhausted quota is reported as rate limiting; other 403s
        # (e.g. forbidden resources) fall through to GitHub's own message.
        rate_limit = response.headers.get('X-RateLimit-Limit', '?')
        reset_time = response.headers.get('X-RateLimit-Reset', '?')
        error_msg = (f"API rate limit exceeded (Remaining: {rate_remaining}/{rate_limit}, "
                     f"Resets at: {reset_time})")
    else:
        error_msg = f"HTTP error {response.status_code}"
        try:
            error_json = response.json()
        except ValueError:
            # Body was not JSON; keep the generic HTTP status message.
            error_json = None
        if isinstance(error_json, dict):
            error_msg = error_json.get('message', error_msg)
            if 'documentation_url' in error_json:
                error_msg += f"\nDocumentation: {error_json['documentation_url']}"

    # Provide troubleshooting tips
    tips = (
        "\n\n🔧 Troubleshooting Tips:\n"
        "1. Verify token has 'repo' scope: https://github.com/settings/tokens\n"
        "2. Check token expiration: https://github.com/settings/tokens\n"
        "3. For classic tokens, ensure 'repo' scope is selected\n"
        "4. Try regenerating your token"
    )

    raise RuntimeError(f"❌ Invalid GitHub token: {error_msg}{tips}")

def transfer_repo(
    hf_repo_url, github_username, github_repo_name, github_token,
    git_user_name, git_user_email, branch, token_file, visibility):
    """Clone a Hugging Face repo and push it to a GitHub repo.

    Values found in an uploaded ``token.env`` override the corresponding form
    fields. The token is validated, ``token.env`` is (re)written in the
    current directory, the GitHub repo is created if needed, and the clone is
    pushed to ``branch``.

    Args:
        hf_repo_url: Hugging Face repository URL (a leading ``git clone`` is
            tolerated).
        github_username: Expected GitHub login; must match the token's owner
            when non-empty.
        github_repo_name: Target repo name (lower-cased, spaces become ``-``).
        github_token: GitHub token.
        git_user_name: Value for ``git config user.name`` in the clone.
        git_user_email: Value for ``git config user.email`` in the clone.
        branch: Branch to pull from / push to on GitHub.
        token_file: Optional uploaded file object exposing ``.name``.
        visibility: ``"public"`` or ``"private"``.

    Returns:
        ``(message, token_env_path)``; the path is ``None`` on any failure.
        Errors are reported through the message rather than raised.
    """
    import shlex

    def _redact(text):
        # The token is embedded in the push URL, so command transcripts and
        # git errors can contain it; never echo it back to the UI.
        return text.replace(github_token, "***") if github_token else text

    start_dir = os.getcwd()
    try:
        # Load from token.env if uploaded
        if token_file:
            env_path = Path("token.env")
            shutil.copy(token_file.name, env_path)
            load_dotenv(dotenv_path=env_path)
            github_token = os.getenv("GITHUB_TOKEN", github_token)
            github_username = os.getenv("GITHUB_USERNAME", github_username)
            git_user_name = os.getenv("GIT_USER_NAME", git_user_name)
            git_user_email = os.getenv("GIT_USER_EMAIL", git_user_email)
            visibility = os.getenv("REPO_VISIBILITY", visibility)
            branch = os.getenv("GIT_BRANCH", branch)

        # Strip any accidental whitespace from token
        github_token = github_token.strip()

        # Validate GitHub token before proceeding
        try:
            validated_username, scope_warning = validate_github_token(github_token)
            if github_username and github_username != validated_username:
                return (f"❌ Token username mismatch! Token belongs to '{validated_username}' "
                        f"but you entered '{github_username}'", None)
            github_username = validated_username
        except Exception as e:
            return f"❌ Token validation failed: {str(e)}", None

        # Save token.env - WITHOUT COMMENTS
        token_env_path = "token.env"
        with open(token_env_path, "w") as f:
            f.write(f"GITHUB_TOKEN={github_token}\n")
            f.write(f"GITHUB_USERNAME={github_username}\n")
            f.write(f"GIT_USER_NAME={git_user_name}\n")
            f.write(f"GIT_USER_EMAIL={git_user_email}\n")
            f.write(f"REPO_VISIBILITY={visibility}\n")
            f.write(f"GIT_BRANCH={branch}\n")

        hf_repo_url = hf_repo_url.strip().replace("git clone", "").strip().rstrip("/")
        repo_name = hf_repo_url.split("/")[-1]
        # Strip only a trailing ".git"; a blanket replace would also mangle
        # names that merely contain ".git" (e.g. "my.github-model").
        if repo_name.endswith(".git"):
            repo_name = repo_name[:-len(".git")]
        # The clone directory is deleted below if it exists, so refuse names
        # that would point at the current or parent directory.
        if repo_name in ("", ".", ".."):
            return f"❌ Could not determine a repository name from URL: {hf_repo_url}", None
        github_repo_name = github_repo_name.strip().lower().replace(" ", "-")
        github_repo_url = f"https://{github_token}@github.com/{github_username}/{github_repo_name}.git"

        log = ""
        log += scope_warning + "\n" if scope_warning else ""
        log += create_github_repo(github_token, github_username, github_repo_name, visibility)

        if os.path.exists(repo_name):
            shutil.rmtree(repo_name)

        # Every user-supplied value is shell-quoted because run() executes
        # its argument through the shell (POSIX quoting, as on Colab).
        log += run(f"git clone {shlex.quote(hf_repo_url)}")
        os.chdir(repo_name)
        log += run(f"git config user.name {shlex.quote(str(git_user_name))}")
        log += run(f"git config user.email {shlex.quote(str(git_user_email))}")
        log += run("git remote rename origin hf-origin")
        log += run(f"git remote add origin {shlex.quote(github_repo_url)}")

        try:
            log += run(f"git pull --rebase origin {shlex.quote(str(branch))}")
        except RuntimeError as e:
            if "couldn't find remote ref" in str(e):
                log += f"⚠️ Remote branch '{branch}' not found. Skipping pull.\n"
            else:
                raise

        log += run(f"git push -u origin {shlex.quote(str(branch))}")

        # Success summary with minimal details
        summary = (
            "✅ Transfer complete!\n\n"
            f"**Hugging Face URL:** {hf_repo_url}\n"
            f"**New GitHub Repo:** {github_repo_name}\n"
            f"**GitHub URL:** https://github.com/{github_username}/{github_repo_name}\n"
            f"**Branch:** {branch}\n"
            f"**Visibility:** {visibility}\n\n"
            "⬇️ Download your token.env file below for future use."
        )

        return summary, token_env_path

    except Exception as e:
        return f"❌ Error: {_redact(str(e))}", None
    finally:
        # Restore the working directory on every path; a failed git command
        # previously left the process inside the cloned repo.
        os.chdir(start_dir)

def toggle_inputs(token_file):
    """Return visibility updates for the eight form inputs.

    With an uploaded token file the four credential/identity fields are
    hidden; without one every field is shown. The tuple order is:
    hf_repo_url, github_repo_name, github_username, github_token,
    git_user_name, git_user_email, branch, visibility.
    """
    if not token_file:
        return tuple([gr.update(visible=True)] * 8)
    shown = (True, True, False, False, False, False, True, True)
    return tuple(gr.update(visible=flag) for flag in shown)

def launch_app():
    """Build the Hugging Face -> GitHub transfer UI with Gradio and launch it.

    The form collects the repo URL, GitHub credentials, git identity, branch
    and visibility; the "Start Transfer" button calls ``transfer_repo`` and
    shows its message plus a downloadable ``token.env``.
    """
    with gr.Blocks(title="Hugging Face to GitHub Transfer", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# Hugging Face → GitHub Transfer Tool")

        # Static help text: token.env layout and token requirements.
        gr.Markdown(
            "Transfer a Hugging Face model repo to GitHub.\n\n"
            "**Your credentials are only stored locally in a `.env` file.**\n\n"
            "### 🔐 Create a `token.env` file manually (optional)\n"
            "```ini\n"
            "GITHUB_TOKEN=your_github_token\n"
            "GITHUB_USERNAME=your_username\n"
            "GIT_USER_NAME=Your Name\n"
            "GIT_USER_EMAIL=you@example.com\n"
            "REPO_VISIBILITY=public\n"
            "GIT_BRANCH=main\n"
            "```\n\n"
            "### 🔑 GitHub Token Requirements\n"
            "- Create at: https://github.com/settings/tokens\n"
            "- Must be a **classic token** with **repo scope**\n"
            "- Token must start with `ghp_` (40 characters)\n"
            "- Required permissions: `repo` (full control)\n"
            "- Never share your token with anyone!\n"
            "- Token expires after 30 days by default (set expiration to 'No expiration')"
        )

        with gr.Row():
            token_file = gr.File(label="Upload token.env (optional)", file_types=[".env"])

        with gr.Column():
            hf_repo_url = gr.Textbox(label="Hugging Face Repo URL (.git)",
                                     placeholder="https://huggingface.co/username/repo_name")
            github_repo_name = gr.Textbox(label="New GitHub Repo Name",
                                         placeholder="my-new-repo")

        with gr.Row():
            github_username = gr.Textbox(label="GitHub Username",
                                        placeholder="your-github-username")
            github_token = gr.Textbox(label="GitHub Token", type="password",
                                     placeholder="ghp_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")

        with gr.Row():
            git_user_name = gr.Textbox(label="Git Commit Author Name", value="Your Name")
            git_user_email = gr.Textbox(label="Git Commit Author Email", value="you@example.com")

        with gr.Row():
            branch = gr.Dropdown(["main", "dev", "test", "custom"], label="GitHub Branch", value="main")
            visibility = gr.Dropdown(["public", "private"], label="Visibility", value="public")

        run_btn = gr.Button("Start Transfer", variant="primary")

        with gr.Column():
            output = gr.Textbox(label="Transfer Log", lines=10, interactive=False, show_copy_button=True)
            token_download = gr.File(label="Download token.env", visible=False, interactive=True)

        # Show or hide inputs whenever a token.env is uploaded or cleared.
        # The outputs list must stay in the order toggle_inputs returns.
        token_file.change(
            fn=toggle_inputs,
            inputs=[token_file],
            outputs=[
                hf_repo_url, github_repo_name,
                github_username, github_token,
                git_user_name, git_user_email,
                branch, visibility
            ]
        )

        # The inputs list must match transfer_repo's positional parameters.
        run_btn.click(
            fn=transfer_repo,
            inputs=[
                hf_repo_url, github_username, github_repo_name,
                github_token, git_user_name, git_user_email,
                branch, token_file, visibility
            ],
            outputs=[output, token_download]
        )

        # Display the token.env download link immediately after transfer
        token_download.change(
            lambda x: gr.update(visible=x is not None),
            inputs=[token_download],
            outputs=[token_download]
        )

        demo.launch()

# Launch the transfer UI when this cell/script is executed directly.
if __name__ == "__main__":
    launch_app()

In [None]:
# @title File and Directory inspector UI

import sys
import tempfile
import json
import csv
import traceback
import subprocess
import os
import zipfile
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import textwrap

# Auto-install PyYAML if needed
try:
    import yaml
except ImportError:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "pyyaml"])
    import yaml

# Ensure Gradio is present
try:
    import gradio as gr
except ImportError:
    print("Please install gradio: pip install gradio", file=sys.stderr)
    sys.exit(1)

# No exclusions—accept all files
# NOTE(review): _EXCLUDE_EXTS is not referenced by any code visible in this
# cell; confirm whether it is still needed.
_EXCLUDE_EXTS = set()

# Syntax highlighting mapping for Markdown
# Keys are lower-case file suffixes (including the dot); values are the fence
# language tags. Suffixes missing here map to '' in build_files.
_SYNTAX_MAP = {
    '.py': 'python', '.js': 'javascript', '.mjs': 'javascript',
    '.html': 'html', '.css': 'css', '.json': 'json',
    '.yml': 'yaml', '.yaml': 'yaml', '.md': 'markdown',
    '.sh': 'bash', '.java': 'java', '.c': 'c', '.cpp': 'cpp',
    '.h': 'cpp', '.cs': 'csharp', '.php': 'php', '.rb': 'ruby',
    '.go': 'go', '.rs': 'rust', '.swift': 'swift', '.kt': 'kotlin',
    '.ts': 'typescript', '.sql': 'sql', '.xml': 'xml',
    '.svg': 'xml', '.ini': 'ini', '.cfg': 'ini',
    '.toml': 'toml', '.lock': 'json',
}

def human_readable_size(num, suffix='B'):
    """Format a byte count with a binary (1024-based) unit prefix.

    The value is divided by 1024 until its magnitude drops below 1024 and is
    rendered with one decimal place, e.g. ``1536`` -> ``"1.5KB"``. Anything
    still too large after the ``Z`` prefix is reported with ``Y``.
    """
    prefixes = ('', 'K', 'M', 'G', 'T', 'P', 'E', 'Z')
    idx = 0
    while idx < len(prefixes):
        if abs(num) < 1024.0:
            return f"{num:3.1f}{prefixes[idx]}{suffix}"
        num /= 1024.0
        idx += 1
    return f"{num:.1f}Y{suffix}"

def is_text_file(path: Path, blocksize: int = 1024) -> bool:
    """Heuristically decide whether ``path`` holds text.

    Reads the first ``blocksize`` bytes and reports ``False`` when they
    contain a NUL byte. Any failure to open or read the file also yields
    ``False``.
    """
    try:
        with path.open('rb') as handle:
            head = handle.read(blocksize)
    except Exception:
        return False
    return b'\0' not in head

def build_tree(root: Path):
    """Describe ``root`` and everything beneath it as a flat list of dicts.

    The first entry represents the root itself (path ``"."``, size 0). Each
    further entry carries the path relative to ``root``, whether it is a
    directory, its name, raw and human-readable size, and modification time.

    Entries that cannot be inspected are skipped with a printed notice; a
    failure of the walk itself is printed and whatever was collected so far is
    returned.
    """
    tree = []
    try:
        root_name = root.name or "root_directory"
        tree.append({
            "path": ".",
            "is_dir": True,
            "name": root_name,
            "size": 0,
            "hr_size": human_readable_size(0),
            "modified": root.stat().st_mtime
        })
        # rglob yields entries in arbitrary filesystem order. Sorting by path
        # components keeps every directory directly ahead of its contents, so
        # the indented listings built from this tree are stable and readable.
        for p in sorted(root.rglob('*'), key=lambda entry: entry.parts):
            try:
                rel = p.relative_to(root)
                stat = p.stat()
                tree.append({
                    "path": str(rel),
                    "is_dir": p.is_dir(),
                    "name": p.name,
                    "size": stat.st_size,
                    "hr_size": human_readable_size(stat.st_size),
                    "modified": stat.st_mtime
                })
            except Exception as e:
                print(f"Skipping {p}: {e}")
    except Exception as e:
        print(f"Error building tree: {e}")
        traceback.print_exc()
    return tree

def build_files(root: Path, max_workers: int = None):
    """Read every text file under ``root`` and return their wrapped contents.

    Args:
        root: Directory to scan recursively.
        max_workers: Thread count for reading; defaults to the CPU count
            (or 4 when that is unknown).

    Returns:
        A list of dicts with ``path`` (relative to ``root``), ``content`` and
        ``syntax`` (from ``_SYNTAX_MAP``, ``''`` when unknown), ordered by
        path. Files failing ``is_text_file`` are omitted; a file that cannot
        be read contributes an error message as its content.
    """
    max_workers = max_workers or (os.cpu_count() or 4)
    # Sort so the report order does not depend on filesystem enumeration.
    all_files = sorted(
        (p for p in root.rglob('*') if p.is_file()),
        key=lambda entry: entry.parts,
    )

    def read_and_clean(p):
        if not is_text_file(p):
            return None
        try:
            text = p.read_text(encoding='utf-8', errors='replace')
            lines = []
            for line in text.splitlines():
                # Lines longer than 120 characters are re-wrapped on
                # whitespace; long words are left unbroken.
                if len(line) > 120:
                    lines.extend(textwrap.wrap(line, width=120, break_long_words=False))
                else:
                    lines.append(line)
            return "\n".join(lines)
        except Exception as e:
            return f"Error reading {p}: {e}"

    files = []
    with ThreadPoolExecutor(max_workers=max_workers) as exe:
        # Executor.map returns results in submission order, unlike
        # as_completed, which made the output order vary between runs.
        for p, content in zip(all_files, exe.map(read_and_clean, all_files)):
            if content is not None:
                files.append({
                    "path": str(p.relative_to(root)),
                    "content": content,
                    "syntax": _SYNTAX_MAP.get(p.suffix.lower(), '')
                })
    return files

def export_txt(tree, files):
    """Render the tree and file contents as a plain-text report.

    Tree entries are indented four spaces per path separator found in their
    relative path. Returns ``"No content found"`` when there is nothing to
    render.
    """
    def _tree_line(entry):
        depth = entry['path'].count(os.sep)
        return f"{'    ' * depth}{entry['name']} ({entry['hr_size']}): {entry['path']}"

    lines = []
    if tree:
        lines.append("Directory Structure:")
        lines.extend(_tree_line(entry) for entry in tree)
    if files:
        lines.append("\nFile Contents:")
        lines.extend(f"\nFile: {item['path']}\n{item['content']}" for item in files)
    report = "\n".join(lines)
    return report if report else "No content found"

def export_json(tree, files):
    """Serialize the tree and files to an indented JSON document.

    The document has three top-level keys: ``metadata`` (root name, file
    count, directory count), ``structure`` (the tree) and ``files``.
    """
    root_name = tree[0]['name'] if tree else ""
    directory_total = len([node for node in tree if node['is_dir']])
    report = {
        "metadata": {
            "directory": {
                "name": root_name,
                "file_count": len(files),
                "directory_count": directory_total,
            },
        },
        "structure": tree,
        "files": files,
    }
    return json.dumps(report, indent=2)

def export_jsonl(tree, files):
    """Serialize the tree and files as JSON Lines.

    Each tree entry becomes a ``{"type": "structure", ...}`` record and each
    file a ``{"type": "content", ...}`` record, one JSON object per line,
    structure records first.
    """
    records = [{"type": "structure", "data": node} for node in (tree or [])]
    records += [{"type": "content", "data": item} for item in (files or [])]
    return "\n".join(json.dumps(record) for record in records)

def export_yaml(tree, files):
    """Serialize the tree and files to a YAML document.

    Uses the same ``metadata`` / ``structure`` / ``files`` layout as the JSON
    export, keeps key order as written and emits non-ASCII text unescaped.
    """
    directory_info = {
        "name": tree[0]['name'] if tree else "",
        "file_count": len(files),
        "directory_count": sum(1 for node in tree if node['is_dir']),
    }
    document = {
        "metadata": {"directory": directory_info},
        "structure": tree,
        "files": files,
    }
    return yaml.dump(document, sort_keys=False, allow_unicode=True)

def export_markdown(tree, files):
    """Render the tree as a nested bullet list and files as fenced code blocks.

    Tree entries are indented two spaces per path separator in their relative
    path. Each file gets a ``##`` heading and a code fence tagged with its
    ``syntax`` (``text`` when empty). Returns ``"No content found"`` when
    there is nothing to render.
    """
    import re

    md = []
    if tree:
        md.append("# Directory Structure")
        for e in tree:
            indent = '  ' * e['path'].count(os.sep)
            md.append(f"{indent}- **{e['name']}** ({e['hr_size']}): `{e['path']}`")
    if files:
        md.append("\n# File Contents")
        for f in files:
            lang = f['syntax'] or 'text'
            # A fence is only closed by a run of backticks at least as long
            # as the opener, so make the fence longer than any run inside the
            # content; otherwise embedded ``` would end the block early.
            longest_run = max(
                (len(ticks) for ticks in re.findall(r"`+", f['content'])),
                default=0,
            )
            fence = "`" * max(3, longest_run + 1)
            md.append(f"\n## {f['path']}\n{fence}{lang}\n{f['content']}\n{fence}")
    return "\n".join(md) or "No content found"

def export_csv(tree, files):
    """Render the tree and file excerpts as comma-separated text.

    The first table lists every tree entry (type, path, size, modified). When
    files are present, a blank row and a second table follow with each file's
    path and at most the first 200 characters of its content (suffixed with
    ``...`` when truncated).
    """
    import io

    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(["Type", "Path", "Size", "Modified"])
    writer.writerows(
        ["DIR" if node['is_dir'] else "FILE", node['path'], node['size'], node['modified']]
        for node in tree
    )
    if files:
        writer.writerow([])
        writer.writerow(["File Path", "Content Excerpt"])
        for item in files:
            body = item['content']
            preview = body if len(body) <= 200 else body[:200] + '...'
            writer.writerow([item['path'], preview])
    return buffer.getvalue()

def export_tsv(tree, files):
    """Render the tree and file excerpts as tab-separated text.

    Same two-table layout as the CSV export: tree entries first, then (when
    files are present) a blank row followed by each file's path and at most
    the first 200 characters of its content (suffixed with ``...`` when
    truncated).
    """
    import io

    buffer = io.StringIO()
    writer = csv.writer(buffer, delimiter='\t')
    writer.writerow(["Type", "Path", "Size", "Modified"])
    writer.writerows(
        ["DIR" if node['is_dir'] else "FILE", node['path'], node['size'], node['modified']]
        for node in tree
    )
    if files:
        writer.writerow([])
        writer.writerow(["File Path", "Content Excerpt"])
        for item in files:
            body = item['content']
            preview = body if len(body) <= 200 else body[:200] + '...'
            writer.writerow([item['path'], preview])
    return buffer.getvalue()

def export_html(tree, files):
    """Render the tree and file contents as a standalone HTML report.

    The structure is listed inside an open ``<details>`` block and file
    contents inside a collapsed one. All names, paths and contents come from
    the inspected directory, so they are HTML-escaped before insertion.
    """
    from html import escape

    lines = ['<html><head><meta charset="utf-8"><title>File Inspector Report</title></head><body>']
    lines.append(f"<h1>Directory: {escape(tree[0]['name']) if tree else ''}</h1>")
    if tree:
        lines.append("<details open><summary>Structure</summary><pre>")
        for e in tree:
            lines.append(f"{escape(e['path'])} ({e['hr_size']})")
        lines.append("</pre></details>")
    if files:
        lines.append("<details><summary>File Contents</summary>")
        for f in files:
            # Unescaped source text (e.g. an HTML or JS file) would otherwise
            # be interpreted by the browser instead of being displayed.
            lines.append(f"<h2>{escape(f['path'])}</h2><pre>{escape(f['content'])}</pre>")
        lines.append("</details>")
    lines.append("</body></html>")
    return "\n".join(lines)

# Registry of export formats offered in the UI. Each entry provides the output
# file extension ("ext"), the exporter callable taking (tree, files) and
# returning a string ("exporter"), and a short description ("desc").
_FORMATS = {
    "TXT":      {"ext":"txt",  "exporter":export_txt,      "desc":"Plain text. Quick & simple."},
    "JSON":     {"ext":"json", "exporter":export_json,     "desc":"Structured JSON for APIs."},
    "JSONL":    {"ext":"jsonl","exporter":export_jsonl,    "desc":"JSON Lines for streaming/NDJSON."},
    "YAML":     {"ext":"yaml", "exporter":export_yaml,     "desc":"Human-friendly YAML."},
    "Markdown": {"ext":"md",   "exporter":export_markdown, "desc":"Markdown with syntax coloring."},
    "CSV":      {"ext":"csv",  "exporter":export_csv,      "desc":"CSV for spreadsheets."},
    "TSV":      {"ext":"tsv",  "exporter":export_tsv,      "desc":"TSV for tab-delimited data."},
    "HTML":     {"ext":"html","exporter":export_html,      "desc":"Interactive HTML report."}
}

def process_and_save(source: str, local_path: str, zip_file, action: str, fmt: str):
    """Inspect a directory or uploaded ZIP and write the report to a temp file.

    Args:
        source: ``"Upload ZIP"`` to use ``zip_file``; anything else uses
            ``local_path``.
        local_path: Directory to inspect (``~`` is expanded).
        zip_file: Uploaded archive object exposing ``.name``, or ``None``.
        action: The tree is built when this contains ``"Tree"``; file
            contents are read when it contains ``"Extract Code"``.
        fmt: Key into ``_FORMATS`` selecting the exporter.

    Returns:
        ``(output_path, content)`` on success, or ``(None, error_message)``.
        Exceptions are reported through the message rather than raised.
    """
    extracted = None  # TemporaryDirectory holding an unpacked ZIP, if any
    try:
        if source == "Upload ZIP" and zip_file is None:
            return None, "Error: no ZIP uploaded"
        # Reject an unknown format before extracting or scanning anything.
        if fmt not in _FORMATS:
            return None, f"Error: Unknown format {fmt}"

        if source == "Upload ZIP":
            extracted = tempfile.TemporaryDirectory()
            root = Path(extracted.name)
            with zipfile.ZipFile(zip_file.name, 'r') as z:
                z.extractall(root)
        else:
            root = Path(local_path).expanduser().resolve()

        if not root.is_dir():
            return None, "Error: Invalid directory"

        tree  = build_tree(root)  if "Tree" in action else []
        files = build_files(root) if "Extract Code" in action else []

        exporter = _FORMATS[fmt]["exporter"]
        content  = exporter(tree, files)

        out_name = f"{root.name or 'output'}.{_FORMATS[fmt]['ext']}"
        out_path = Path(tempfile.gettempdir()) / out_name
        out_path.write_text(content, encoding='utf-8')
        return str(out_path), content

    except Exception as e:
        return None, f"Processing error: {e}\n{traceback.format_exc()}"
    finally:
        # The report lives outside the extraction directory, so the unpacked
        # archive can be removed; previously every run leaked a temp dir.
        if extracted is not None:
            extracted.cleanup()

def explain_format(fmt: str):
    """Return a Markdown blurb describing ``fmt``, or a prompt if it is unknown."""
    entry = _FORMATS.get(fmt)
    if not entry:
        return "Select a valid format"
    return f"**{fmt} Format**\n{entry['desc']}"

def launch_app():
    """Build the File Inspector UI with Gradio and launch it.

    The left column holds the inputs (source, path or ZIP, action, format) and
    the download widget; the right column shows the generated report. The Run
    button calls ``process_and_save``.
    """
    with gr.Blocks(theme=gr.themes.Soft(), title="File Inspector") as demo:
        gr.Markdown("# File Inspector Tool")
        gr.Markdown("Choose a local folder or upload a ZIP archive, then view/export its contents.")

        with gr.Row():
            with gr.Column(scale=2):
                source_input = gr.Dropdown(
                    label="Input Source",
                    choices=["Local Path", "Upload ZIP"],
                    value="Local Path"
                )
                dir_input = gr.Textbox(
                    label="Directory Path",
                    placeholder="/path/to/project",
                    value=str(Path.home())
                )
                # Hidden until "Upload ZIP" is selected (see source_input.change).
                zip_input = gr.File(
                    label="Upload ZIP Archive",
                    file_types=[".zip"],
                    visible=False
                )
                # process_and_save matches these labels by substring
                # ("Tree" / "Extract Code").
                action_input = gr.Dropdown(
                    label="Action",
                    choices=["Display Tree", "Extract Code", "Display Tree + Extract Code"],
                    value="Display Tree + Extract Code"
                )
                format_input = gr.Dropdown(
                    label="Export Format",
                    choices=list(_FORMATS.keys()),
                    value="TXT"
                )
                fmt_explain = gr.Markdown(explain_format("TXT"))
                run_btn = gr.Button("Run", variant="primary")
                download = gr.File(label="Download Output", interactive=False)

            with gr.Column(scale=3):
                content_box = gr.TextArea(
                    label="Output",
                    lines=20,
                    interactive=False,
                    elem_id="output_txt",
                    elem_classes=["output-box"],
                    show_copy_button=True  # <-- use built-in copy
                )

        # Show exactly one of the path textbox / ZIP uploader.
        source_input.change(
            fn=lambda src: (
                gr.update(visible=(src=="Local Path")),
                gr.update(visible=(src=="Upload ZIP"))
            ),
            inputs=source_input,
            outputs=[dir_input, zip_input]
        )

        format_input.change(
            fn=explain_format,
            inputs=format_input,
            outputs=fmt_explain
        )

        # The inputs list must match process_and_save's positional parameters.
        run_btn.click(
            fn=process_and_save,
            inputs=[source_input, dir_input, zip_input, action_input, format_input],
            outputs=[download, content_box]
        )

        # NOTE(review): the stylesheet is assigned after the layout is built
        # rather than passed to gr.Blocks(css=...); confirm it is applied.
        demo.css = """
        .output-box textarea {
            font-family: 'Courier New', monospace !important;
            white-space: pre !important;
            user-select: text !important;
            -webkit-user-select: text !important;
        }
        """
        demo.launch()

# Launch the File Inspector UI when this cell/script is executed directly.
if __name__ == "__main__":
    launch_app()

In [None]:
# @title Audio Preprocessor UI

import sys
import shutil
import os
import re
import numpy as np
import librosa
import matplotlib.pyplot as plt
import soundfile as sf
import torch
import tempfile
import warnings
import pyloudnorm as pyln
import subprocess
from dataclasses import dataclass
import gradio as gr
import traceback
from uuid import uuid4
import zipfile
import concurrent.futures

# === Pre-flight Check: Install missing dependencies if needed ===
try:
    import resampy
except ImportError:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "resampy"])
    import resampy

try:
    import gdown
except ImportError:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "gdown"])
    import gdown

# === Dependency Check ===
# Abort immediately when no ffmpeg executable can be found on PATH.
if shutil.which("ffmpeg") is None:
    sys.stderr.write("Missing dependency: ffmpeg\n")
    sys.exit(1)
# Suppress every UserWarning for the rest of the session.
warnings.filterwarnings("ignore", category=UserWarning)

# === Global Setup ===
use_cuda = torch.cuda.is_available()
device = torch.device('cuda' if use_cuda else 'cpu')
# A fresh temporary directory is created each time this cell runs.
OUTPUT_DIR = tempfile.mkdtemp(prefix="audio_preprocessor_")
print(f"Audio output will be saved in: {OUTPUT_DIR}")

# === Constants ===
# auto_slice_audio keeps frames whose RMS level, in dB relative to the loudest
# frame, is above this value.
DB_THRESH = -45
# process_silence treats samples whose absolute amplitude exceeds this value
# as audible when trimming the edges.
EDGE_SILENCE_THRESHOLD = 3e-3
# Lower-case filename suffixes accepted by get_all_audio_files.
VALID_FORMATS = ('.wav', '.mp3', '.flac', '.aiff', '.ogg', '.m4a')
# calculate_adaptive_hop_length uses length // MIN_FRAMES_FOR_RMS as the hop,
# clamped to the range [32, DEFAULT_HOP_LENGTH].
MIN_FRAMES_FOR_RMS = 50
DEFAULT_HOP_LENGTH = 512

# === Config Dataclass ===
@dataclass(frozen=True)
class Config:
    """Immutable bundle of audio-preprocessing settings.

    NOTE(review): the code that builds and consumes this config lies outside
    the visible part of the file, so field meanings are inferred from names
    and types only and should be confirmed against the callers.
    """
    sample_rate: int
    bit_depth: str
    channels: str
    target_lufs: float  # presumably the loudness target passed to normalize_loudness — confirm
    target_peak: float  # presumably the peak ceiling (dB) passed to normalize_loudness — confirm
    use_cuda: bool
    device: torch.device
    visualize: bool
    segmentation: bool
    duration: float
    panning: bool
    mp3_bitrate: str
    silence_trimming: bool  # Added silence trimming flag

# === Helpers ===

def pan_percent(l, r):
    """Return the left/right energy split of two channels as percentages.

    Energy is the sum of squared samples. The result is
    ``(left_pct, right_pct, deviation)`` where ``deviation`` is how far the
    left share is from 50. Near-silent input (total energy below 1e-10)
    reports a centred ``(50.0, 50.0, 0.0)``.
    """
    energy_left = np.sum(l**2)
    energy_right = np.sum(r**2)
    total = energy_left + energy_right
    if total < 1e-10:
        return 50.0, 50.0, 0.0
    left_pct = energy_left / total * 100
    right_pct = energy_right / total * 100
    return left_pct, right_pct, abs(left_pct - 50)

def calculate_adaptive_hop_length(L):
    """Pick an RMS hop length for a signal of ``L`` samples.

    The hop is ``L // MIN_FRAMES_FOR_RMS``, raised to at least 32 and capped
    at ``DEFAULT_HOP_LENGTH``.
    """
    candidate = L // MIN_FRAMES_FOR_RMS
    floored = candidate if candidate > 32 else 32
    return floored if floored < DEFAULT_HOP_LENGTH else DEFAULT_HOP_LENGTH

def measure_loudness(y, sr):
    """Measure integrated loudness and peak level of a signal.

    Args:
        y: Audio samples; input with more than one dimension is averaged over
            axis 0 before the loudness measurement.
        sr: Sample rate in Hz.

    Returns:
        ``{'lufs': ..., 'peak': ...}``. ``lufs`` is ``None`` when the meter
        fails or the input is empty; ``peak`` is in dB and ``-inf`` for empty
        or all-zero input.
    """
    BLOCK = 0.4
    if y.size == 0:
        return {'lufs': None, 'peak': -np.inf}
    y_m = np.mean(y, axis=0) if y.ndim > 1 else y
    # Zero-pad so the signal spans at least one meter block.
    min_len = int(BLOCK * sr)
    if len(y_m) < min_len:
        y_m = np.pad(y_m, (0, min_len - len(y_m)))
    meter = pyln.Meter(sr, block_size=BLOCK)
    try:
        lufs = meter.integrated_loudness(y_m)
    except Exception:
        # Best-effort measurement: report "unknown" rather than fail, but do
        # not swallow KeyboardInterrupt/SystemExit as a bare except would.
        lufs = None
    pk = np.max(np.abs(y))
    pdb = 20 * np.log10(pk) if pk > 0 else -np.inf
    return {'lufs': lufs, 'peak': pdb}

def normalize_loudness(y, sr, log, tgt_lufs, tgt_peak):
    """Normalize a signal to a target loudness, limited by a peak ceiling.

    Args:
        y: Audio samples; input with more than one dimension is averaged over
            axis 0 for the loudness measurement only.
        sr: Sample rate in Hz.
        log: Callable receiving progress messages.
        tgt_lufs: Target integrated loudness.
        tgt_peak: Peak ceiling in dB.

    Returns:
        ``(normalized, info)``. ``info['method']`` is ``'empty'`` for empty
        input, ``'peak_only'`` when loudness could not be measured (the signal
        is then scaled to the peak ceiling), or ``'true_lufs'`` otherwise.
    """
    if y.size == 0:
        return y, {'method': 'empty'}

    def _peak_db(signal):
        peak = np.max(np.abs(signal))
        return 20 * np.log10(peak) if peak > 0 else -np.inf

    peak_ceiling = 10 ** (tgt_peak / 20)
    orig_pk = np.max(np.abs(y))
    orig_db = _peak_db(y)
    y_m = np.mean(y, axis=0) if y.ndim > 1 else y
    BLOCK = 0.4
    # Zero-pad so the signal spans at least one meter block.
    min_len = int(BLOCK * sr)
    if len(y_m) < min_len:
        log(f"[Normalize] Padding {min_len - len(y_m)} samples")
        y_m = np.pad(y_m, (0, min_len - len(y_m)))
    meter = pyln.Meter(sr)

    failure = None
    orig_lufs = None
    try:
        orig_lufs = meter.integrated_loudness(y_m)
    except Exception as e:
        failure = e
    else:
        # Silent input measures as -inf; the resulting gain would be infinite
        # and turn the signal into inf/NaN, so treat it as a failed reading.
        if not np.isfinite(orig_lufs):
            failure = f"non-finite loudness {orig_lufs}"

    if failure is not None:
        log(f"[Normalize] LUFS failed ({failure}), peak-only")
        scale = peak_ceiling / orig_pk if orig_pk > 0 else 1
        y_n = y * scale
        return y_n, {
            'original_lufs': None,
            'original_peak': orig_db,
            'normalized_lufs': None,
            'normalized_peak': _peak_db(y_n),
            'method': 'peak_only'
        }

    log(f"[Normalize] Orig LUFS {orig_lufs:.2f}, Peak {orig_db:.2f}")
    gain = 10 ** ((tgt_lufs - orig_lufs) / 20)
    y_l = y * gain
    pk_after = np.max(np.abs(y_l))
    if pk_after > peak_ceiling:
        log("[Normalize] Limiting peak")
        y_n = y_l * (peak_ceiling / pk_after)
    else:
        y_n = y_l
    fl = measure_loudness(y_n, sr)['lufs']
    fp = _peak_db(y_n)
    # The re-measurement may legitimately return None; formatting None with
    # ":.2f" would raise after the audio had already been normalized.
    fl_text = f"{fl:.2f}" if fl is not None else "n/a"
    log(f"[Normalize] Final LUFS {fl_text}, Peak {fp:.2f}")
    return y_n, {
        'original_lufs': orig_lufs,
        'original_peak': orig_db,
        'normalized_lufs': fl,
        'normalized_peak': fp,
        'method': 'true_lufs'
    }

def normalize_panning(a, log):
    """Balance a two-channel signal by matching the second channel's RMS to the first.

    Input that is not a 2-D array with exactly two rows is returned as-is, as
    is input where either channel's RMS is below 1e-7. The energy split is
    logged before and after the correction.
    """
    is_two_channel = a.ndim == 2 and a.shape[0] == 2
    if not is_two_channel:
        return a
    first, second = a[0], a[1]
    left_pct, right_pct, _ = pan_percent(first, second)
    log(f"[Panning] Orig L{left_pct:.1f}% R{right_pct:.1f}%")
    rms_first = np.sqrt(np.mean(first**2))
    rms_second = np.sqrt(np.mean(second**2))
    if rms_first < 1e-7 or rms_second < 1e-7:
        return a
    balanced = np.vstack((first, second * (rms_first / rms_second)))
    left_after, right_after, _ = pan_percent(balanced[0], balanced[1])
    log(f"[Panning] Corr L{left_after:.1f}% R{right_after:.1f}%")
    return balanced

def detect_clipping(y):
    """Report whether a signal is clipped and what fraction of samples clip.

    A sample counts as clipped when its magnitude is at least 0.999. Returns
    ``(is_clipped, ratio)`` where ``is_clipped`` is true once more than 0.1%
    of the samples clip. Empty input yields ``(False, 0.0)``.
    """
    if y.size == 0:
        return False, 0.0
    clipped_mask = np.abs(y) >= 0.999
    ratio = clipped_mask.sum() / y.size
    return ratio > 0.001, ratio

def attenuate_clipped_audio(y, log):
    """Scale a clipped signal so its peak sits at -1 dB.

    Signals that ``detect_clipping`` does not flag, or whose peak is zero, are
    returned unchanged.
    """
    clipped, fraction = detect_clipping(y)
    if not clipped:
        return y
    log(f"[Clipping] {fraction:.1%} clipped; attenuating")
    peak = np.max(np.abs(y))
    ceiling = 10 ** (-1 / 20)
    if peak > 0:
        return y * (ceiling / peak)
    return y

def auto_slice_audio(y, sr):
    """Locate the sample range that is louder than the silence threshold.

    Frame-wise RMS is converted to dB relative to the loudest frame; the range
    runs from the first to the last frame above ``DB_THRESH``. Returns
    ``(start, end)`` sample indices. Empty input gives ``(0, 0)``; input
    shorter than 128 samples, or with no frame above the threshold, gives the
    full range.
    """
    if y.size == 0:
        return 0, 0
    n_samples = y.shape[-1]
    if n_samples < 128:
        return 0, n_samples
    hop = calculate_adaptive_hop_length(n_samples)
    frame_len = min(2048, n_samples)
    frame_rms = librosa.feature.rms(y=y, frame_length=frame_len, hop_length=hop)[0]
    frame_db = librosa.amplitude_to_db(frame_rms, ref=np.max)
    loud_frames = np.where(frame_db > DB_THRESH)[0]
    if loud_frames.size == 0:
        return 0, n_samples
    first_frame, last_frame = loud_frames[0], loud_frames[-1]
    return first_frame * hop, min(n_samples, (last_frame + 1) * hop)

def process_silence(y, sr, log):
    """Trim leading and trailing silence in two passes.

    A coarse pass (``auto_slice_audio`` on the mono mix) finds the loud
    region; a fine pass then trims samples at its edges whose magnitude does
    not exceed ``EDGE_SILENCE_THRESHOLD``.

    Returns:
        ``(trimmed, (lead_s, trail_s, removed_s), (start_idx, end_idx))`` with
        durations in seconds and indices into the original signal. When
        everything is silent, ``trimmed`` is an empty array and the whole
        duration is reported as removed.
    """
    if y.size == 0:
        return y, (0,0,0), (0,0)
    total = y.shape[-1]

    def _to_mono(signal):
        return np.mean(signal, axis=0) if signal.ndim > 1 else signal

    def _all_removed():
        return np.array([]), (total/sr, 0, total/sr), (0, 0)

    coarse_start, coarse_end = auto_slice_audio(_to_mono(y), sr)
    if coarse_end <= coarse_start:
        log("[Silence] All silent")
        return _all_removed()

    coarse = y[..., coarse_start:coarse_end]
    audible = np.where(np.abs(_to_mono(coarse)) > EDGE_SILENCE_THRESHOLD)[0]
    if audible.size == 0:
        log("[Silence] All trimmed")
        return _all_removed()

    fine_start = audible[0]
    fine_end = audible[-1] + 1
    trimmed = coarse[..., fine_start:fine_end]
    removed = total - trimmed.shape[-1]
    start_idx = coarse_start + fine_start
    end_idx = coarse_start + fine_end
    return trimmed, (start_idx/sr, (total - end_idx)/sr, removed/sr), (start_idx, end_idx)

def format_duration(s):
    """Format a duration in seconds with millisecond precision, e.g. ``"1.235s"``."""
    return "{:.3f}s".format(s)

def plot_zoomed_silence(y, sr, s0, e0, zoom=0.05):
    """Plot the audio just before ``s0`` and just after ``e0``.

    Each panel covers up to ``zoom`` seconds of samples next to the trim
    point; a panel with no samples shows a placeholder text instead. Returns
    the matplotlib figure.
    """
    window = int(sr*zoom)
    fig, (ax_pre, ax_post) = plt.subplots(2,1,figsize=(6,4))

    # Top panel: samples immediately preceding the trim start.
    before = y[..., max(0, s0 - window):s0]
    before_t = np.linspace(-zoom, 0, before.shape[-1])
    if before.size > 0:
        ax_pre.plot(before_t, before.T if before.ndim > 1 else before)
        ax_pre.set_xlim(before_t[0], 0)
    else:
        ax_pre.text(0.5,0.5,"No pre-silence",ha='center')
        ax_pre.set_xlim(-zoom, 0)
    ax_pre.set_title("Zoomed Silence Pre-trim")

    # Bottom panel: samples immediately following the trim end.
    after = y[..., e0:e0 + window]
    after_t = np.linspace(0, zoom, after.shape[-1])
    if after.size > 0:
        ax_post.plot(after_t, after.T if after.ndim > 1 else after)
        ax_post.set_xlim(0, after_t[-1])
    else:
        ax_post.text(0.5,0.5,"No post-silence",ha='center')
        ax_post.set_xlim(0, zoom)
    ax_post.set_title("Zoomed Silence Post-trim")

    plt.tight_layout()
    return fig

def plot_waveform(y, sr, title, time_unit_str="s", s0=None, e0=None, segments=None):
    """Draw a waveform with optional trim and segment boundary markers.

    Args:
        y: Audio samples with time on the last axis; for arrays with more than
            one dimension each row along axis 0 is drawn as a labelled channel.
        sr: Sample rate in Hz; the x-axis is ``sample_index / sr``.
        title: Axes title.
        time_unit_str: Text placed inside the x-axis label.
        s0, e0: Sample indices marked with dashed vertical lines when both are given.
        segments: Iterable of ``(start, end)`` sample indices marked with solid lines.

    Returns:
        The matplotlib figure.
    """
    fig, ax = plt.subplots(figsize=(6, 2.5))
    seconds = np.arange(y.shape[-1]) / sr

    if y.ndim > 1:
        for number, channel in enumerate(y, start=1):
            ax.plot(seconds, channel, alpha=0.7, label=f'Ch{number}')
        ax.legend(fontsize="small")
    else:
        ax.plot(seconds, y)

    # Trim boundaries are only drawn when both ends are known.
    if not (s0 is None or e0 is None):
        for boundary in (s0, e0):
            ax.axvline(boundary / sr, linestyle='--')

    if segments:
        for seg_start, seg_end in segments:
            for boundary in (seg_start, seg_end):
                ax.axvline(boundary / sr, linestyle='-', alpha=0.6)

    ax.set_title(title)
    ax.set_xlabel(f"Time ({time_unit_str})")
    ax.set_ylabel("Amplitude")
    ax.grid(alpha=0.3)
    plt.tight_layout()
    return fig

def get_all_audio_files(path):
    """Collect audio file paths from a file, a directory tree, or a ZIP archive.

    Args:
        path: A single audio file, a directory (searched recursively), or a
            ``.zip`` archive (extracted to a fresh temporary directory that is
            then searched recursively).

    Returns:
        List of paths whose lower-cased name ends with one of ``VALID_FORMATS``;
        empty when ``path`` matches none of the supported cases.
    """
    def _collect(top):
        # Recursive search for files with a supported extension.
        return [
            os.path.join(folder, fname)
            for folder, _, fnames in os.walk(top)
            for fname in fnames
            if fname.lower().endswith(VALID_FORMATS)
        ]

    if os.path.isfile(path):
        lowered = path.lower()
        if lowered.endswith(VALID_FORMATS):
            return [path]
        if lowered.endswith('.zip'):
            # The extraction directory is kept: the returned paths point into it.
            extract_dir = tempfile.mkdtemp(prefix="zip_extract_")
            with zipfile.ZipFile(path, 'r') as archive:
                archive.extractall(extract_dir)
            return _collect(extract_dir)
        return []

    if os.path.isdir(path):
        return _collect(path)
    return []

def download_from_gdrive_folder(url, log):
    """Download a shared Google Drive folder into a new temporary directory.

    Args:
        url: Share link that contains ``/folders/<id>``.
        log: Callable taking one message string.

    Returns:
        ``(download_dir, name)``. ``name`` is the single sub-directory found in
        the download directory when there is exactly one, otherwise the folder
        id. ``(None, None)`` when the URL has no folder id.
    """
    match = re.search(r'/folders/([^/?]+)', url)
    if match is None:
        log("❌ URL must be a shared FOLDER link")
        return None, None

    folder_id = match.group(1)
    target = os.path.join(tempfile.mkdtemp(prefix="gdrive_dl_"), folder_id)
    os.makedirs(target, exist_ok=True)
    log(f"[GDrive] Downloading folder ID {folder_id} to {target}")
    gdown.download_folder(url=url, output=target, quiet=True, use_cookies=False)

    subfolders = [
        entry for entry in os.listdir(target)
        if os.path.isdir(os.path.join(target, entry))
    ]
    if len(subfolders) != 1:
        return target, folder_id

    detected = subfolders[0]
    log(f"[GDrive] Detected folder name: {detected}")
    return target, detected

def export_audio(y, sr, orig, fmt, cfg, exp_dir, idx=None):
    """Write audio to ``exp_dir`` in the requested format and return the output path.

    Args:
        y: Audio samples; arrays with more than one dimension are transposed
            before writing. An empty array is replaced by one sample of silence.
        sr: Sample rate of ``y``.
        orig: Path of the source file; its base name (without extension) names the output.
        fmt: Output extension/format. ``"mp3"`` is encoded with ffmpeg via a
            temporary float WAV; everything else is written with soundfile.
        cfg: Settings object; ``channels``, ``bit_depth`` and ``mp3_bitrate`` are read.
        exp_dir: Destination directory.
        idx: Zero-based segment index; when given, ``_segment_<idx+1>`` is appended to the name.

    Returns:
        Path of the written file.
    """
    stem, _ = os.path.splitext(os.path.basename(orig))
    if idx is None:
        filename = f"{stem}.{fmt}"
    else:
        filename = f"{stem}_segment_{idx+1}.{fmt}"
    out = os.path.join(exp_dir, filename)

    if y.size == 0:
        # Placeholder: a single zero sample shaped for the configured channel layout.
        y = np.zeros((2, 1)) if cfg.channels == 'stereo' else np.zeros(1)
    samples = y.T if y.ndim > 1 else y

    subtype = f"PCM_{cfg.bit_depth}"
    if fmt != "mp3":
        if fmt == "flac" and cfg.bit_depth == "32":
            # 32-bit is written as 24-bit for FLAC.
            subtype = "PCM_24"
        sf.write(out, samples, sr, subtype=subtype)
        return out

    with tempfile.NamedTemporaryFile(suffix=".wav", delete=True) as tmp:
        sf.write(tmp.name, samples, sr, subtype="FLOAT")
        ffmpeg_cmd = [
            "ffmpeg", "-y", "-i", tmp.name,
            "-c:a", "libmp3lame", "-b:a", cfg.mp3_bitrate,
            "-ar", str(sr), out,
        ]
        subprocess.run(ffmpeg_cmd, check=True, capture_output=True)
    return out

def process_file(fp, cfg, params):
    """Run the preprocessing chain on one audio file and export the result.

    Steps, in order: load, optional panning normalisation (two-channel input
    only), resample to ``cfg.sample_rate``, channel conversion, clip
    attenuation, optional silence trimming, optional fixed-length
    segmentation, optional loudness normalisation, export to ``OUTPUT_DIR``.

    Args:
        fp: Path of the input audio file.
        cfg: Settings object; ``use_cuda``, ``panning``, ``sample_rate``,
            ``channels``, ``silence_trimming``, ``segmentation``, ``duration``,
            ``target_lufs`` and ``target_peak`` are read here (plus whatever
            ``export_audio`` reads).
        params: Dict with keys ``'normalize'`` (profile name) and ``'out_fmt'``.

    Returns:
        Dict with ``'messages'`` (log lines), ``'export_paths'`` (written
        files) and ``'plot_data'`` (arrays and markers for plotting, or
        ``None``). Exceptions are caught and reported as a traceback in
        ``'messages'`` instead of propagating.
    """
    res = {'messages': [], 'export_paths': [], 'plot_data': None}

    def log(m):
        res['messages'].append(m)

    try:
        log(f"--- Processing {os.path.basename(fp)} ---")
        log(f"Device: {'GPU' if cfg.use_cuda else 'CPU'}")
        y, orig_sr = librosa.load(fp, sr=None, mono=False)
        if y.ndim == 1:
            y = y[np.newaxis, :]
        info = sf.info(fp)
        log(f"Input: {info.samplerate}Hz, {info.channels}ch, {format_duration(info.duration)}")
        if cfg.panning and y.shape[0] == 2:
            y = normalize_panning(y, log)
        if orig_sr != cfg.sample_rate:
            log(f"Resampling {orig_sr}→{cfg.sample_rate}")
            y = resampy.resample(y, orig_sr, cfg.sample_rate)
        if y.ndim == 1:
            y = y[np.newaxis, :]
        if cfg.channels == "mono" and y.shape[0] > 1:
            log("Converting to mono")
            y = np.mean(y, axis=0, keepdims=True)
        elif cfg.channels == "stereo" and y.shape[0] == 1:
            y = np.vstack([y, y])
        y = attenuate_clipped_audio(y, log)
        # Snapshot before trimming so the "original" plot can show trim markers.
        y_pre = y.copy()

        if cfg.silence_trimming:
            y_proc, (_pre, _post, total), (s0, e0) = process_silence(y, cfg.sample_rate, log)
            log(f"Silence removed {format_duration(total)} (start {format_duration(_pre)}, end {format_duration(_post)})")
        else:
            y_proc = y
            s0, e0 = 0, y.shape[-1]
            log("Silence trimming skipped")

        if y_proc.size == 0:
            log("Empty after silence; skipping")
            return res

        n_samples = y_proc.shape[-1]
        segments = [(0, n_samples)]
        if cfg.segmentation:
            dur_sec = n_samples / cfg.sample_rate
            if dur_sec < cfg.duration:
                log("⚠ shorter than segment duration; skipping export")
                return res
            # Segment length in whole samples; never zero, so the division
            # below is always defined.
            ss = max(1, int(cfg.sample_rate * cfg.duration))
            # Count segments in samples (ceiling division) rather than in
            # float seconds: when sample_rate * duration is not an integer the
            # seconds-based count falls short and silently drops the tail, and
            # float rounding can also yield an extra, empty segment.
            nseg = -(-n_samples // ss)
            segments = [(i * ss, min((i + 1) * ss, n_samples)) for i in range(nseg)]
            log(f"Segmenting into {format_duration(cfg.duration)}, created {len(segments)} segments")

        res['plot_data'] = {'y_pre': y_pre, 'y_proc': y_proc, 'sr': cfg.sample_rate, 's0': s0, 'e0': e0, 'segments': segments}

        for i, (st, en) in enumerate(segments):
            seg = y_proc[..., st:en]
            if params['normalize'] != "No Normalization":
                log(f"Normalizing {'segment '+str(i+1) if cfg.segmentation else 'file'}")
                seg, _ = normalize_loudness(seg, cfg.sample_rate, log, cfg.target_lufs, cfg.target_peak)
            out_path = export_audio(seg, cfg.sample_rate, fp, params['out_fmt'], cfg, OUTPUT_DIR, idx=(i if cfg.segmentation else None))
            res['export_paths'].append(out_path)
            log(f"✅ Exported {os.path.basename(out_path)}")
        return res

    except Exception:
        # Per-file boundary: one bad file must not abort the whole batch, so
        # the failure is reported through the log instead of raised.
        tb = traceback.format_exc()
        res['messages'].append(f"❌ ERROR:\n{tb}")
        return res

def gradio_process(input_mode, uploads, path_in, gdrive_url,
                  out_fmt, sr, bd, ch, mp3_br,
                  norm, pan, seg, dur, tu, viz,
                  zip_enable, custom_zip_name, silence_trimming):
    """Gradio callback: gather inputs, process every file, and package the results.

    Args:
        input_mode: ``"Upload"``, ``"Path"`` or ``"Google Drive URL"``.
        uploads: Uploaded file objects (each exposes ``.name``), or ``None``.
        path_in: File/directory/ZIP path used in ``"Path"`` mode.
        gdrive_url: Shared folder link used in ``"Google Drive URL"`` mode.
        out_fmt: Output format name (e.g. ``"wav"``, ``"mp3"``, ``"flac"``).
        sr: Sample rate as text with an ``Hz`` suffix (e.g. ``"48000Hz"``).
        bd: Bit depth as text (``"16"``, ``"24"``, ``"32"``).
        ch: ``"mono"`` or ``"stereo"``.
        mp3_br: MP3 bitrate text passed to the encoder.
        norm: Normalization profile name.
        pan: ``"Yes"``/``"No"`` — panning correction.
        seg: Whether to split the output into fixed-length segments.
        dur: Segment duration, expressed in the unit ``tu``.
        tu: ``"Milliseconds"``, ``"Seconds"``, ``"Minutes"`` or ``"Hours"``.
        viz: Whether to render waveform plots.
        zip_enable: Whether to bundle the exports into a single ZIP.
        custom_zip_name: Optional ZIP base name (without extension).
        silence_trimming: ``"Yes"``/``"No"`` — silence trimming.

    Returns:
        ``(log_text, gallery_image_paths, download_paths, dropdown_update,
        default_playback_path)``, matching the five outputs wired to the
        button.
    """
    logs = ["=== Input Method ==="]
    raw_inputs = []
    base_name_for_zip = None

    # Text inputs may arrive as None; normalise once so later code can rely
    # on plain, stripped strings.
    path_in = (path_in or "").strip()
    gdrive_url = (gdrive_url or "").strip()
    custom_zip_name = (custom_zip_name or "").strip()

    if input_mode == "Path" and path_in:
        logs.append(f"Mode: Path ➞ {path_in}")
        raw_inputs = get_all_audio_files(path_in)
        base_name_for_zip = os.path.basename(path_in.rstrip("/"))
    elif input_mode == "Google Drive URL" and gdrive_url:
        downloaded, detected = download_from_gdrive_folder(gdrive_url, logs.append)
        if not downloaded:
            return "\n".join(logs+["❌ Aborting: invalid Google Drive URL."]), [], [], gr.update(choices=[], value=None), None
        logs.append(f"Mode: Google Drive URL ➞ {gdrive_url}")
        base_name_for_zip = detected
        # get_all_audio_files already searches recursively; calling it once
        # per walked directory would list nested files once per ancestor.
        raw_inputs = get_all_audio_files(downloaded)
    else:
        logs.append(f"Mode: Upload ➞ {len(uploads) if uploads else 0} file(s)")
        raw_inputs = [f.name for f in uploads] if uploads else []
        base_name_for_zip = None

    logs.append("=== Original File Details ===")
    for fp in raw_inputs:
        try:
            info = sf.info(fp)
            logs.append(f"{os.path.basename(fp)}: {info.samplerate}Hz, {info.channels}ch, {format_duration(info.duration)}")
        except Exception:
            # Metadata is informational only; keep going with the next file.
            logs.append(f"{os.path.basename(fp)}: <could not read metadata>")
    logs.append("")

    logs.append("=== Output Settings ===")
    logs.append(f"Format: {out_fmt}")
    logs.append(f"Sample Rate: {sr}")
    if out_fmt.lower() == "mp3":
        logs.append(f"MP3 Bitrate: {mp3_br}")
    else:
        logs.append(f"Bit Depth: {bd}")
    logs.append(f"Channels: {ch}")
    logs.append("")

    export_bd = bd
    if out_fmt == "flac" and bd == "32":
        logs.append("⚠ FLAC does not support 32-bit; falling back to 24-bit")
        export_bd = "24"

    logs.append("=== Processing Options ===")
    logs.append(f"Normalization Profile: {norm}")
    logs.append(f"Panning Correction: {pan}")
    logs.append(f"Silence Trimming: {silence_trimming}")
    logs.append(f"Segmentation: {'Yes' if seg else 'No'}" + (f", Duration {dur}{tu}" if seg else ""))
    logs.append(f"Show Visualizations: {'Yes' if viz else 'No'}")
    logs.append(f"Custom ZIP Name: {custom_zip_name or '(none)'}")
    logs.append("")

    if not raw_inputs:
        return "\n".join(logs+["❌ No audio files provided."]), [], [], gr.update(choices=[], value=None), None

    params = {'out_fmt': out_fmt, 'normalize': norm}
    tgt_lufs, tgt_peak = {"Spotify": (-14.0, -1.0), "Apple Music": (-16.0, -1.0)}.get(norm, (None, None))

    # Convert the segment duration to seconds.
    raw = float(dur)
    if tu == "Milliseconds":
        dsec = raw/1000
    elif tu == "Minutes":
        dsec = raw*60
    elif tu == "Hours":
        dsec = raw*3600
    else:
        dsec = raw

    cfg = Config(
        sample_rate=int(sr.replace("Hz", "")),
        bit_depth=export_bd, channels=ch,
        target_lufs=tgt_lufs, target_peak=tgt_peak,
        use_cuda=use_cuda, device=device,
        visualize=viz, segmentation=seg,
        duration=dsec, panning=(pan == "Yes"),
        mp3_bitrate=mp3_br,
        silence_trimming=(silence_trimming == "Yes")
    )

    gallery_images = []
    export_paths = []

    workers = os.cpu_count() or 1
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {executor.submit(process_file, fp, cfg, params): fp for fp in raw_inputs}
        for fut in concurrent.futures.as_completed(futures):
            r = fut.result()
            logs.extend(r['messages'])
            logs.append("")
            export_paths.extend(r['export_paths'])
            if viz and r.get('plot_data'):
                plot = r['plot_data']
                # plot_waveform always lays time out in seconds, so its default
                # "s" axis label is used regardless of the segment-duration unit.
                figs = [
                    (plot_waveform(plot['y_pre'], plot['sr'], "Original w/ Trim",
                                   s0=plot['s0'], e0=plot['e0'], segments=plot['segments']), 'pre'),
                    (plot_waveform(plot['y_proc'], plot['sr'], "Processed Output"), 'post'),
                    (plot_zoomed_silence(plot['y_pre'], plot['sr'], plot['s0'], plot['e0']), 'zoom')
                ]
                for fig, tag in figs:
                    fn = os.path.join(OUTPUT_DIR, f"{uuid4().hex}_{tag}.png")
                    fig.savefig(fn)
                    # Release the figure once it is on disk; otherwise pyplot
                    # keeps every figure alive across files and runs.
                    plt.close(fig)
                    gallery_images.append(fn)

    # FLAC exports get a WAV copy for the audio player.
    play_paths = []
    for p in export_paths:
        if p.lower().endswith('.flac'):
            wav_play = p[:-5] + '_playback.wav'
            y, sr_load = sf.read(p)
            sf.write(wav_play, y, sr_load)
            play_paths.append(wav_play)
        else:
            play_paths.append(p)

    if zip_enable and export_paths:
        # Name priority: explicit custom name, then the input folder/path
        # name, then the first input file's base name.
        if custom_zip_name:
            zip_base = custom_zip_name
        elif base_name_for_zip:
            zip_base = base_name_for_zip
        else:
            zip_base = os.path.splitext(os.path.basename(raw_inputs[0]))[0]
        zip_filename = f"{zip_base}.zip"
        zip_path = os.path.join(OUTPUT_DIR, zip_filename)
        with zipfile.ZipFile(zip_path, 'w') as zf:
            for ex in export_paths:
                arc = os.path.relpath(ex, OUTPUT_DIR)
                zf.write(ex, arcname=arc)
        download_paths = [zip_path]
    else:
        download_paths = export_paths

    logs.append("=== Exported Files ===")
    logs.append(f"Count: {len(download_paths)}")
    logs.append("--- Used Settings ---")
    logs.append(f"Format: {out_fmt}")
    logs.append(f"Sample Rate: {sr}")
    if out_fmt.lower() == "mp3":
        logs.append(f"MP3 Bitrate: {mp3_br}")
    else:
        logs.append(f"Bit Depth: {export_bd}")
    logs.append(f"Channels: {ch}")
    logs.append(f"Normalization Profile: {norm}")
    logs.append(f"Panning Correction: {pan}")
    logs.append(f"Silence Trimming: {silence_trimming}")
    logs.append(f"Segmentation: {'Yes' if seg else 'No'}" + (f", Duration {dur}{tu}" if seg else ""))
    logs.append(f"Visualizations: {'Yes' if viz else 'No'}")
    logs.append("")
    for fn in download_paths:
        logs.append(os.path.basename(fn))

    default_play = play_paths[0] if play_paths else None
    dropdown_update = gr.update(choices=play_paths, value=default_play)

    return (
        "\n".join(logs),
        gallery_images,
        download_paths,
        dropdown_update,
        default_play
    )

# === Gradio UI ===
# Layout of the Audio Preprocessor app. Components are declared in display
# order; the button at the bottom forwards their values to gradio_process.
with gr.Blocks(title="Audio Preprocessor GUI", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# Audio Preprocessor")
    gr.Markdown(f"Outputs saved in `{OUTPUT_DIR}`")

    with gr.Row():
        # Left column: where the audio comes from.
        with gr.Column(scale=2):
            input_mode     = gr.Radio(["Upload","Path","Google Drive URL"], value="Upload", label="Input Method")
            file_uploader  = gr.File(file_count="multiple", file_types=["audio"], label="Upload Audio Files")
            path_text      = gr.Textbox(placeholder="/path/to/dir", label="Or Enter Path", visible=False)
            gdrive_text    = gr.Textbox(placeholder="URL Link", label="Or Enter Google Drive FOLDER URL", visible=False)
            # Show only the input widget that matches the selected input method.
            input_mode.change(
                lambda m: (
                    gr.update(visible=(m=="Upload")),
                    gr.update(visible=(m=="Path")),
                    gr.update(visible=(m=="Google Drive URL"))
                ),
                inputs=[input_mode],
                outputs=[file_uploader, path_text, gdrive_text]
            )

        # Right column: format of the exported files.
        with gr.Column(scale=3):
            gr.Markdown("### Output Settings")
            with gr.Row():
                out_fmt     = gr.Dropdown(["wav","mp3","flac","aiff"],    value="wav",      label="Format")
                sample_rate = gr.Dropdown(["16000Hz","44100Hz","48000Hz"], value="48000Hz", label="Sample Rate")
                bit_depth   = gr.Dropdown(["16","24","32"],               value="24",       label="Bit Depth")
            with gr.Row():
                channels    = gr.Radio(["mono","stereo"],                value="mono",     label="Channels")
                mp3_bitrate = gr.Dropdown(["128k","192k","256k","320k"],  value="192k",     label="MP3 Bitrate")

    with gr.Accordion("Processing Options", open=True):
        with gr.Row():
            norm_profile   = gr.Dropdown(["No Normalization","Spotify","Apple Music"], value="Spotify", label="Normalization Profile")
            panning_option = gr.Dropdown(["Yes","No"],                             value="Yes",     label="Enable Panning Correction")
            silence_trimming = gr.Dropdown(["Yes","No"],                           value="Yes",     label="Enable Silence Trimming")  # "Yes"/"No" text; gradio_process compares it with "Yes"
            visualize      = gr.Checkbox(value=True, label="Show Visualizations")
        with gr.Row():
            # Segment duration is interpreted in the unit chosen in `time_unit`.
            segmentation = gr.Checkbox(value=False, label="Enable Segmentation")
            duration     = gr.Slider(minimum=1, maximum=60, step=1, value=30, label="Segment Duration")
            time_unit    = gr.Dropdown(["Milliseconds","Seconds","Minutes","Hours"], value="Seconds", label="Time Unit")
        with gr.Row():
            zip_enable      = gr.Checkbox(value=True, label="Save outputs as ZIP")
            custom_zip_name = gr.Textbox(placeholder="Enter ZIP name (without .zip)", label="Custom ZIP Name (optional)")

    process_btn = gr.Button("Process Audio", variant="primary")

    # Result tabs: one per value returned by gradio_process (the last tab
    # receives two of them: the dropdown update and the default file).
    with gr.Tabs():
        with gr.TabItem("Logs"):
            logs_out = gr.Textbox(lines=15, label="Processing Logs", interactive=False)
        with gr.TabItem("Visualizations"):
            gr.Markdown("All waveform plots (3 per file)")
            gallery  = gr.Gallery(label="Plots", columns=3, height="auto")
        with gr.TabItem("Output Files"):
            audio_out = gr.File(file_count="multiple", label="Processed Audio Files", interactive=False)
        with gr.TabItem("Audio Player"):
            file_selector = gr.Dropdown(choices=[], label="Select File to Play")
            audio_player  = gr.Audio(label="Play Processed Audio", interactive=True)

    # The order of `inputs` must match the positional parameters of
    # gradio_process; `outputs` must match the order of its return tuple.
    process_btn.click(
        fn=gradio_process,
        inputs=[
            input_mode, file_uploader, path_text, gdrive_text,
            out_fmt, sample_rate, bit_depth, channels, mp3_bitrate,
            norm_profile, panning_option, segmentation, duration,
            time_unit, visualize, zip_enable, custom_zip_name,
            silence_trimming  # last positional parameter of gradio_process
        ],
        outputs=[logs_out, gallery, audio_out, file_selector, audio_player]
    )

    # Picking a file in the dropdown loads that path into the audio player.
    file_selector.change(fn=lambda f: f, inputs=file_selector, outputs=audio_player)

    # Enable Gradio queueing to avoid HTTP timeouts
    demo.queue()

# Start the app only when this file is executed as the main module.
if __name__ == "__main__":
    demo.launch(debug=True)

In [None]:
# @title Beat Identifier UI

# === Auto-install Required Packages ===
import subprocess
import sys

def install(pkg, pip_name=None):
    """Make sure module ``pkg`` is importable, pip-installing it if necessary.

    Args:
        pkg: Name used in ``import`` statements.
        pip_name: Distribution name to hand to pip when it differs from the
            import name; defaults to ``pkg``.

    Raises:
        subprocess.CalledProcessError: If the pip command exits with an error.
    """
    try:
        __import__(pkg)
    except ImportError:
        # Install into the interpreter that is running this script.
        subprocess.check_call([sys.executable, "-m", "pip", "install", pip_name or pkg])

# Import name -> pip distribution name.
required_packages = {
    "gradio": "gradio",
    "librosa": "librosa",
    "numpy": "numpy",
    "torch": "torch",
    "scipy": "scipy"
}

for pkg_name, pip_name in required_packages.items():
    install(pkg_name, pip_name)

# === Main Script ===

import warnings
# Silence FutureWarning for the whole process. The filter is installed before
# the imports that follow, so it is already active while they are imported.
warnings.filterwarnings("ignore", category=FutureWarning)

import os
import tempfile
import socket
import gradio as gr
import librosa
import numpy as np
import torch
from concurrent.futures import ThreadPoolExecutor
import scipy.signal
from scipy.signal.windows import hann as _hann

# Expose the Hann window as scipy.signal.hann so librosa.beat_track can find it
setattr(scipy.signal, "hann", _hann)

# Choose the compute device once, at import time
use_cuda = torch.cuda.is_available()
device = torch.device('cuda') if use_cuda else torch.device('cpu')
print(f"Using device: {device}")

# Pitch-class names; analyze_single indexes this list with the strongest chroma row
notes = "C C# D D# E F F# G G# A A# B".split()

def analyze_single(path):
    """Estimate duration, tempo, key and tuning offset for one audio file.

    Args:
        path: Path of the audio file to analyse.

    Returns:
        ``(summary_markdown, path)`` on success, or ``(error_message, None)``
        when the file is empty, has zero duration, or analysis raises.
    """
    fn = os.path.basename(path)
    try:
        y, sr = librosa.load(path, sr=None)
        if y.size == 0:
            return f"Error: {fn} is empty.", None

        dur = librosa.get_duration(y=y, sr=sr)
        if dur == 0:
            return f"Error: {fn} duration is zero.", None
        m, s = divmod(int(dur), 60)

        onset_env = librosa.onset.onset_strength(y=y, sr=sr)
        if onset_env.sum() == 0:
            # No onsets at all: tempo is undefined, shown as a dash below.
            bpm = 0
        else:
            tempo = librosa.beat.beat_track(onset_envelope=onset_env, sr=sr)[0]
            # Depending on the librosa release, tempo is a plain float or an
            # ndarray; built-in round() rejects ndarrays, so take the first
            # element as a Python float before rounding.
            bpm = int(round(float(np.atleast_1d(tempo).ravel()[0])))

        chroma = librosa.feature.chroma_cqt(y=y, sr=sr)
        # Key is reported as the pitch class with the highest mean chroma energy.
        key = notes[int(np.mean(chroma, axis=1).argmax())]

        tuning = librosa.estimate_tuning(y=y, sr=sr)

        summary = (
            f"**{fn}**\n\n"
            f"- Duration: {m}m {s}s ({dur:.2f}s)\n"
            f"- BPM: {bpm if bpm > 0 else '—'}\n"
            f"- Key: {key}\n"
            f"- Tuning offset: {tuning:.3f} semitones"
        )
        return summary, path

    except Exception as e:
        # Per-file boundary: report the failure so the rest of the batch continues.
        return f"Error analyzing {fn}: {e}", None

def analyze_batch(paths):
    """Analyse several files concurrently and merge the per-file results.

    Args:
        paths: Iterable of audio file paths.

    Returns:
        ``(summary_text, ok_paths, dropdown_options)``: the per-file summaries
        joined with horizontal rules, the paths that were analysed
        successfully, and ``(basename, path)`` pairs for those same paths.
    """
    # os.cpu_count() may return None; fall back to a single worker then.
    workers = min(4, os.cpu_count() or 1)
    with ThreadPoolExecutor(max_workers=workers) as ex:
        results = list(ex.map(analyze_single, paths))

    summaries = [summary for summary, _ in results]
    # A falsy path marks a file whose analysis failed.
    out_paths = [p for _, p in results if p]
    opts = [(os.path.basename(p), p) for p in out_paths]
    return "\n\n---\n\n".join(summaries), out_paths, opts

def find_available_port(start=7860, end=7900):
    """Return the first port in ``[start, end]`` that can be bound.

    Falls back to ``start`` when no port in the range can be bound (including
    when the range is empty).
    """
    candidate = start
    while candidate <= end:
        probe = socket.socket()
        try:
            probe.bind(("", candidate))
        except OSError:
            # Port is taken or not permitted; try the next one.
            candidate += 1
        else:
            return candidate
        finally:
            probe.close()
    return start

def process_upload(files):
    """Gradio callback: analyse the uploaded files and fill the result widgets.

    Args:
        files: Uploaded file objects (each exposes ``.name``), or a falsy value.

    Returns:
        ``(summary_text, file_paths, selector_value)``. With no uploads, a
        prompt message and two empty lists are returned.
    """
    if not files:
        return "⚠ Please upload one or more MP3/WAV files.", [], []

    summary, out_paths, opts = analyze_batch([upload.name for upload in files])
    # Preselect the first successfully analysed file, if there is one.
    first_choice = out_paths[0] if out_paths else None
    return summary, out_paths, gr.update(choices=opts, value=first_choice)

# === Build GUI ===
# Layout of the Beat Identifier app: an upload column, then tabs for the
# summary, the analysed files, and playback.
with gr.Blocks(title="Beat Identifier", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# Beat Identifier")
    gr.Markdown("Drag & drop your MP3/WAV files below")
    with gr.Row():
        with gr.Column(scale=2):
            # No explicit `type`: the value "file" is rejected by Gradio 4+,
            # while the default still yields items exposing `.name`, which is
            # all process_upload reads.
            upload = gr.File(
                label="Upload Audio Files",
                file_count="multiple",
                file_types=[".mp3", ".wav"]
            )
            btn = gr.Button("Analyze Audio", variant="primary")
        with gr.Column(scale=3):
            gr.Markdown("### Results")
    with gr.Tabs():
        with gr.TabItem("Summary"):
            out_md = gr.Markdown("", label="Analysis Summary")
        with gr.TabItem("Files"):
            out_files = gr.File(file_count="multiple", label="Download Files")
        with gr.TabItem("Playback"):
            selector = gr.Dropdown(choices=[], label="Select File to Play")
            audio_player = gr.Audio(label="Preview", interactive=True)

    # Load the selected path into the player; an empty selection clears it.
    selector.change(lambda p: p or None, selector, audio_player)
    # `outputs` must match the order of process_upload's return values.
    btn.click(process_upload, inputs=[upload], outputs=[out_md, out_files, selector])

    demo.queue()  # server-side queue to avoid mobile disconnects

# === Launch ===
if __name__ == "__main__":
    port = find_available_port()

    # Colab is detected by whether its package can be imported.
    try:
        import google.colab  # type: ignore
    except ImportError:
        in_colab = False
    else:
        in_colab = True

    # In Colab the app is shared (required there when using queue()) and
    # run in debug mode so errors show inline.
    demo.launch(
        server_name="0.0.0.0",
        server_port=port,
        share=in_colab,
        debug=in_colab,
    )