In [1]:
from __future__ import annotations

import io
import math
import time
from functools import lru_cache
from pathlib import Path

import gradio as gr
import matplotlib
import numpy as np
import pyroomacoustics as pra
import soundfile as sf
from PIL import Image
from scipy import signal

# Usar backend 'Agg' para evitar que las figuras se muestren en el notebook directamente
# ya que las convertiremos a imágenes para Gradio.
matplotlib.use("Agg")
import matplotlib.pyplot as plt

In [2]:
# -----------------------------------------------------------------------------
# Ubicaciones y constantes globales
# -----------------------------------------------------------------------------
# In a notebook, use the current working directory as the project root.
BASE_DIR = Path.cwd()
DATA_DIR = BASE_DIR / "data"
INPUTS_DIR = DATA_DIR / "inputs"
OUTPUTS_DIR = DATA_DIR / "outputs"  # prepare_download() writes the rendered WAV files here

# Create the directories if they do not exist yet (side effect at cell execution time).
INPUTS_DIR.mkdir(parents=True, exist_ok=True)
OUTPUTS_DIR.mkdir(parents=True, exist_ok=True)

# Choices offered by the sample-rate dropdown; the first entry is the UI default.
SAMPLE_RATE_OPTIONS = [48_000, 44_100]
# Choices offered by the input-signal dropdown. get_input_signal() dispatches on the
# leading word of the selected label ("Aplauso" / "Barrido"); anything else is
# treated as an uploaded file.
INPUT_OPTIONS = [
    "Aplauso sint√©tico",
    "Barrido senoidal (20 Hz ‚Äì 12 kHz)",
    "Subir un audio WAV/FLAC/MP3",
]

# Maps the UI label of the post-filter to the mode string understood by butter_filter().
FILTER_MAP = {
    "Ninguno": "none",
    "Pasa-bajos": "lowpass",
    "Pasa-altos": "highpass",
    "Pasa-banda": "bandpass",
}

# Impulse-response presets, keyed by the label shown in the preset dropdown.
# Each entry holds:
#   "type":        "room" -> get_ir() calls simulate_room_ir(); any other value
#                  falls through to stepped_structure_ir().
#   "color":       hex color used by process() for the output waveform plot.
#   "description": human-readable summary of the preset.
#   "params":      keyword arguments forwarded verbatim (together with `sr`) to the
#                  generator function, so the keys must match its parameter names.
IR_PRESETS = {
    "Sala peque√±a": {
        "type": "room",
        "color": "#A23B72",
        "description": "Sala 4√ó3√ó2.5 m, absorci√≥n media-alta.",
        "params": {
            "room_dim": (4.0, 3.0, 2.5),
            "absorption": 0.45,
            "max_order": 8,
            "src_pos": (1.2, 1.0, 1.2),
            "mic_pos": (3.2, 1.8, 1.2),
            "ir_length_s": 1.0,
        },
    },
    "Sala mediana": {
        "type": "room",
        "color": "#F18F01",
        "description": "Sala 8√ó5.5√ó3.2 m, absorci√≥n intermedia.",
        "params": {
            "room_dim": (8.0, 5.5, 3.2),
            "absorption": 0.25,
            "max_order": 12,
            "src_pos": (2.0, 2.3, 1.5),
            "mic_pos": (6.0, 3.1, 1.5),
            "ir_length_s": 1.2,
        },
    },
    "Sala grande / Hall": {
        "type": "room",
        "color": "#C73E1D",
        "description": "Hall 16√ó11√ó6 m, cola larga y brillante.",
        "params": {
            "room_dim": (16.0, 11.0, 6.0),
            "absorption": 0.12,
            "max_order": 15,
            "src_pos": (4.0, 3.0, 2.0),
            "mic_pos": (12.0, 7.0, 2.5),
            "ir_length_s": 1.6,
        },
    },
    "Pir√°mide (Quetzal) üå™Ô∏è": {
        "type": "stepped",
        "color": "#2E86AB",
        "description": "Modelo geom√©trico de escalinata (Kukulk√°n) afinado para que el eco suene como el quetzal (~0.8‚Äì1 kHz).",
        "params": {
            "step_height_m": 0.26,
            "step_depth_m": 0.18,
            "num_steps": 91,
            "listener_distance_m": 8.5,
            "step_reflection": 0.985,
            "distance_rolloff": 1.05,
            "extra_decay_db_per_step": 0.02,
            "jitter_ms": 0.25,
            "air_lowpass_hz": 6_500.0,
            "ir_length_s": 0.50,
        },
    },
}

In [3]:
# -----------------------------------------------------------------------------
# Utilidades de audio y DSP
# -----------------------------------------------------------------------------
def normalize_audio(x: np.ndarray, peak: float = 0.98) -> np.ndarray:
    """Scale a signal so that its largest absolute sample equals ``peak``.

    Args:
        x: Input samples (any array-like); converted to a float array.
        peak: Target absolute peak value of the result.

    Returns:
        A new float array with the same shape as ``x``. An empty input is
        returned as an empty float array, and an all-zero input stays all-zero.
    """
    x = np.asarray(x, dtype=float)
    if x.size == 0:
        # np.max() raises ValueError on empty arrays; there is nothing to scale.
        return x
    # The tiny offset avoids a division by zero for completely silent input.
    max_val = np.max(np.abs(x)) + 1e-12
    return (x / max_val) * peak


def resample_audio(x: np.ndarray, orig_sr: int, target_sr: int) -> np.ndarray:
    """Convert a signal from ``orig_sr`` to ``target_sr`` with polyphase filtering.

    When both rates are equal the samples are only converted to a float array.
    Otherwise the rational up/down factors are reduced by their greatest common
    divisor before calling ``scipy.signal.resample_poly``.
    """
    if orig_sr != target_sr:
        divisor = math.gcd(int(orig_sr), int(target_sr))
        factor_up = target_sr // divisor
        factor_down = orig_sr // divisor
        converted = signal.resample_poly(x, factor_up, factor_down)
        return converted.astype(float)
    return np.asarray(x, dtype=float)


def make_synthetic_clap(sr: int, duration: float = 0.14, decay: float = 3.8) -> np.ndarray:
    """Build a clap-like burst: white noise shaped by a decaying exponential.

    Args:
        sr: Sample rate in Hz.
        duration: Length of the burst in seconds.
        decay: Exponent reached by the envelope at the end of the burst.

    Returns:
        The peak-normalized burst (see ``normalize_audio``).
    """
    num_samples = int(sr * duration)
    envelope = np.exp(-np.linspace(0, decay, num_samples))
    burst = np.random.randn(num_samples) * envelope
    # Add a clear initial impulse on the very first sample.
    burst[0] += 1.0
    return normalize_audio(burst)


def make_sine_sweep(sr: int, duration: float = 2.0, f0: float = 20.0, f1: float = 12_000.0) -> np.ndarray:
    """Generate a logarithmic sine sweep from ``f0`` to ``f1`` Hz.

    The sweep is faded in and out with a Tukey window (alpha = 0.1) and then
    peak-normalized with ``normalize_audio``.
    """
    num_samples = int(sr * duration)
    time_axis = np.linspace(0, duration, num_samples, endpoint=False)
    tone = signal.chirp(time_axis, f0=f0, f1=f1, t1=duration, method="logarithmic")
    fade = signal.windows.tukey(len(tone), alpha=0.1)
    return normalize_audio(tone * fade)


def load_mono_audio(path: Path, target_sr: int) -> np.ndarray:
    """Read an audio file, down-mix it to mono and resample it to ``target_sr``.

    Multi-channel files are averaged across axis 1 before resampling.
    """
    data, file_sr = sf.read(path, always_2d=False)
    mono = data.mean(axis=1) if data.ndim > 1 else data
    return resample_audio(mono, file_sr, target_sr)


def butter_filter(
    x: np.ndarray,
    sr: int,
    mode: str,
    f_low: float,
    f_high: float,
    order: int = 4,
) -> np.ndarray:
    """Apply a Butterworth low-pass, high-pass or band-pass filter.

    Args:
        x: Input samples.
        sr: Sample rate in Hz.
        mode: ``"none"``, ``"lowpass"``, ``"highpass"`` or ``"bandpass"``
            (case-insensitive). Any other value leaves the signal untouched.
        f_low: Lower edge in Hz. Used as the cutoff in high-pass mode and as
            the lower band edge in band-pass mode; ignored in low-pass mode.
        f_high: Upper edge in Hz. Used as the cutoff in low-pass mode and as
            the upper band edge in band-pass mode; ignored in high-pass mode.
        order: Butterworth filter order.

    Returns:
        The filtered signal as a float array, or ``x`` itself when no
        filtering is requested.
    """
    mode = mode.lower()
    if mode not in ("lowpass", "highpass", "bandpass"):
        # "none" and unknown modes are a deliberate pass-through.
        return x

    nyq = 0.5 * sr
    if mode == "lowpass":
        # Only f_high matters here: it must not be pushed above f_low, otherwise
        # an unrelated slider silently changes the cutoff.
        cutoff = max(10.0, min(f_high, nyq * 0.999))
        wn = min(cutoff / nyq, 0.999)
        btype = "low"
    elif mode == "highpass":
        cutoff = max(10.0, min(f_low, nyq * 0.99))
        wn = max(cutoff / nyq, 1e-4)
        btype = "high"
    else:
        # Band-pass is the only mode where the two edges must be ordered.
        f_low = max(10.0, min(f_low, nyq * 0.99))
        f_high = max(f_low + 10.0, min(f_high, nyq * 0.999))
        lo = max(1e-4, min(f_low / nyq, 0.99))
        hi = max(lo * 1.05, min(f_high / nyq, 0.999))
        # lo * 1.05 may exceed 1.0 near Nyquist; keep the edge strictly valid.
        hi = min(hi, 0.999)
        wn = [lo, hi]
        btype = "band"

    # Second-order sections stay numerically stable for low cutoffs and
    # band-pass designs, where the (b, a) polynomial form can lose precision.
    sos = signal.butter(order, wn, btype=btype, output="sos")
    return signal.sosfilt(sos, x).astype(float)


def apply_ir(x: np.ndarray, ir: np.ndarray, normalize: bool = True) -> np.ndarray:
    """Convolve a signal with an impulse response (full-length FFT convolution).

    When ``normalize`` is true the result is peak-normalized through
    ``normalize_audio``; otherwise it is returned as a plain float array.
    """
    wet = signal.fftconvolve(x, ir, mode="full")
    if normalize:
        return normalize_audio(wet)
    return wet.astype(float)

In [4]:
# -----------------------------------------------------------------------------
# Respuestas al impulso
# -----------------------------------------------------------------------------
def _pad_to_length(x: np.ndarray, target: int) -> np.ndarray:
    if len(x) >= target:
        return x[:target]
    out = np.zeros(target, dtype=float)
    out[: len(x)] = x
    return out


def simulate_room_ir(
    sr: int,
    room_dim: tuple[float, float, float],
    absorption: float,
    max_order: int,
    src_pos: tuple[float, float, float],
    mic_pos: tuple[float, float, float],
    ir_length_s: float,
) -> np.ndarray:
    """Simulate the impulse response of a shoebox room with pyroomacoustics.

    A single source and a single microphone are placed in a ``pra.ShoeBox``
    whose walls all share the material built from ``absorption``. The
    resulting RIR is cut or zero-padded to ``ir_length_s`` seconds and
    peak-normalized.
    """
    wall_material = pra.Material(absorption)
    shoebox = pra.ShoeBox(room_dim, fs=sr, materials=wall_material, max_order=max_order)
    shoebox.add_source(src_pos)

    # The microphone array expects one column per microphone.
    mic_column = np.array(mic_pos).reshape(3, 1)
    shoebox.add_microphone_array(pra.MicrophoneArray(mic_column, shoebox.fs))
    shoebox.compute_rir()

    # rir[0][0] is read as the response for the first microphone / first source.
    raw_ir = np.asarray(shoebox.rir[0][0], dtype=float)
    wanted_len = int(sr * ir_length_s)
    return normalize_audio(_pad_to_length(raw_ir, wanted_len))


def stepped_structure_ir(
    sr: int,
    step_height_m: float = 0.26,
    step_depth_m: float = 0.30,
    num_steps: int = 91,
    listener_distance_m: float = 10.0,
    step_reflection: float = 0.94,
    distance_rolloff: float = 1.15,
    jitter_ms: float = 0.06,
    extra_decay_db_per_step: float = 0.0,
    air_lowpass_hz: float | None = None,
    ir_length_s: float = 0.45,
) -> np.ndarray:
    """
    Synthetic IR modelled on the Kukulkan staircase (the "quetzal echo").

    Every step contributes one reflection whose round-trip delay follows from
    its distance to the listener and whose gain combines a per-step reflection
    factor, an optional extra dB tilt per step and a distance roll-off. The
    delays can be perturbed with a fixed-seed Gaussian jitter, and the final
    response can be low-pass filtered. The result is cut or padded to
    ``ir_length_s`` seconds and peak-normalized.
    """
    speed_of_sound = 343.0  # [m/s]
    step_idx = np.arange(num_steps, dtype=float)

    # Geometry: each step is further away (run) and higher (rise) than the last.
    rise = step_idx * step_height_m
    run = listener_distance_m + step_idx * step_depth_m
    path_len = np.sqrt(run**2 + rise**2)
    arrival_s = 2.0 * path_len / speed_of_sound

    if jitter_ms > 0:
        # Fixed seed keeps the jittered response reproducible between calls.
        jitter = np.random.default_rng(2024).normal(scale=jitter_ms / 1000.0, size=arrival_s.shape)
        arrival_s = np.maximum(arrival_s + jitter, 0.0)

    wanted_len = int(sr * ir_length_s)
    latest = arrival_s.max() if len(arrival_s) else 0.0
    # The working buffer must hold the latest reflection even if it lies past wanted_len.
    buffer_len = max(wanted_len, int(math.ceil(latest * sr)) + 1)
    ir = np.zeros(buffer_len, dtype=float)

    decay_tilt = 10 ** (-(extra_decay_db_per_step * step_idx) / 20.0)
    spreading = np.maximum(path_len, 1e-3) ** distance_rolloff
    gains = (step_reflection**step_idx) * decay_tilt / spreading

    # Accumulate every reflection on its nearest sample; np.add.at sums
    # reflections that land on the same index instead of overwriting them.
    sample_idx = np.rint(arrival_s * sr).astype(int)
    in_range = sample_idx < buffer_len
    np.add.at(ir, sample_idx[in_range], gains[in_range])

    ir[0] += 0.02  # soft direct sound
    if air_lowpass_hz:
        nyquist = 0.5 * sr
        corner = min(air_lowpass_hz, nyquist * 0.95)
        num, den = signal.butter(4, corner / nyquist, btype="low", output="ba")
        ir = signal.lfilter(num, den, ir)

    return normalize_audio(_pad_to_length(ir, wanted_len))


@lru_cache(maxsize=32)
def get_ir(preset: str, sr: int) -> np.ndarray:
    """Build (and memoize) the impulse response of a preset at a sample rate.

    The preset's ``"type"`` selects the generator: ``"room"`` uses
    ``simulate_room_ir``; everything else uses ``stepped_structure_ir``. The
    preset's ``"params"`` are forwarded as keyword arguments. Because of the
    cache, repeated calls with the same arguments return the same array object.
    """
    entry = IR_PRESETS[preset]
    builder = simulate_room_ir if entry["type"] == "room" else stepped_structure_ir
    return builder(sr=sr, **dict(entry["params"]))


def estimate_rt60(ir: np.ndarray, sr: int) -> float:
    """Estimate the reverberation time (RT60) of an impulse response.

    The energy decay curve is obtained by Schroeder backward integration of
    the squared IR. The time the curve needs to fall from -5 dB to -35 dB
    (T30) is doubled to extrapolate a 60 dB decay.

    Thresholding the instantaneous energy instead would fail on any IR with
    leading silence before the direct sound: those samples already sit far
    below -35 dB, so both crossings would land on sample 0.

    Args:
        ir: Impulse response samples.
        sr: Sample rate in Hz.

    Returns:
        The RT60 estimate in seconds, or ``0.0`` when the IR is empty, silent,
        or its decay curve never reaches -35 dB.
    """
    energy = np.square(np.asarray(ir, dtype=float))
    if energy.size == 0:
        return 0.0

    # Schroeder integration: energy remaining from each sample to the end.
    decay_curve = np.cumsum(energy[::-1])[::-1]
    total_energy = decay_curve[0]
    if total_energy <= 0.0:
        return 0.0

    # The small offset keeps log10 finite where the remaining energy is zero.
    decay_db = 10 * np.log10(decay_curve / total_energy + 1e-12)

    below_5 = np.flatnonzero(decay_db <= -5.0)
    below_35 = np.flatnonzero(decay_db <= -35.0)
    if below_5.size == 0 or below_35.size == 0:
        return 0.0

    rt30 = (below_35[0] - below_5[0]) / sr
    return float(max(rt30 * 2, 0.0))

In [5]:
# -----------------------------------------------------------------------------
# Gráficos
# -----------------------------------------------------------------------------
def _figure_to_image(fig: plt.Figure) -> np.ndarray:
    """Render a Matplotlib figure to a PNG in memory and return it as an array.

    The figure is closed after saving so it does not accumulate in pyplot's
    figure registry.
    """
    png_buffer = io.BytesIO()
    fig.savefig(png_buffer, format="png", dpi=150, bbox_inches="tight")
    plt.close(fig)
    # Rewind so PIL reads the PNG from its first byte.
    png_buffer.seek(0)
    rendered = Image.open(png_buffer)
    return np.array(rendered)


def waveform_image(x: np.ndarray, sr: int, color: str) -> np.ndarray:
    """Plot amplitude versus time for ``x`` and return the plot as an image array.

    Args:
        x: Samples to plot.
        sr: Sample rate in Hz, used to build the time axis in seconds.
        color: Matplotlib color of the trace.
    """
    fig, ax = plt.subplots(figsize=(10, 3))
    time_axis = np.arange(len(x)) / sr
    ax.plot(time_axis, x, lw=0.9, color=color)
    ax.set(xlabel="Tiempo [s]", ylabel="Amplitud", title="Forma de onda ‚Äî salida")
    ax.grid(True, alpha=0.3)
    fig.tight_layout()
    return _figure_to_image(fig)


def spectrogram_image(x: np.ndarray, sr: int) -> np.ndarray:
    """Plot a dB spectrogram of ``x`` and return the plot as an image array.

    Segments are 1024 samples long with 50 % overlap. For signals shorter than
    one segment the segment length is reduced to the signal length and the
    overlap is scaled with it; a fixed overlap of 512 would otherwise make
    ``scipy.signal.spectrogram`` raise "noverlap must be less than nperseg".

    Args:
        x: Samples to analyse.
        sr: Sample rate in Hz.

    Returns:
        The rendered figure as an image array. The frequency axis is limited
        to the smaller of Nyquist and 12 kHz.
    """
    seg_len = min(1024, max(len(x), 1))
    overlap = seg_len // 2
    f, t, Sxx = signal.spectrogram(x, fs=sr, nperseg=seg_len, noverlap=overlap, scaling="spectrum")
    # The small offset keeps log10 finite for bins with zero power.
    Sxx_db = 10 * np.log10(Sxx + 1e-12)
    fig, ax = plt.subplots(figsize=(10, 3.5))
    mesh = ax.pcolormesh(t, f, Sxx_db, shading="auto", cmap="magma")
    ax.set_ylabel("Frecuencia [Hz]")
    ax.set_xlabel("Tiempo [s]")
    ax.set_ylim(0, min(sr / 2, 12_000))
    ax.set_title("Espectrograma ‚Äî salida")
    fig.colorbar(mesh, ax=ax, label="Magnitud [dB]")
    fig.tight_layout()
    return _figure_to_image(fig)

In [6]:
# -----------------------------------------------------------------------------
# Pipeline principal
# -----------------------------------------------------------------------------
def _slugify(text: str) -> str:
    return "".join(ch if ch.isalnum() else "_" for ch in text.lower()).strip("_")


def prepare_download(y: np.ndarray, sr: int, label: str) -> str:
    """Write ``y`` as a WAV file under ``OUTPUTS_DIR`` and return its path.

    The file name combines the slugified ``label`` (or "salida" when the label
    is empty) with a timestamp of one-second resolution.
    """
    stamp = time.strftime("%Y%m%d_%H%M%S")
    tag = _slugify(label or "salida")
    target = OUTPUTS_DIR / f"auralizacion_{tag}_{stamp}.wav"
    sf.write(target, y, sr)
    return str(target)


def describe_session(
    signal_label: str,
    ir_label: str,
    sr: int,
    y: np.ndarray,
    ir: np.ndarray,
    filter_label: str,
) -> str:
    """Build the Markdown summary shown next to the results.

    The four lines report the input label and sample rate, the IR label with
    its RT60 estimate, the post-filter label, and the output duration, peak
    and RMS level.
    """
    seconds = len(y) / sr
    peak_level = np.abs(y).max()
    rms_level = np.sqrt(np.mean(np.square(y)))
    decay_time = estimate_rt60(ir, sr)
    lines = [
        f"**Entrada:** {signal_label} ‚Äî {sr:,} Hz",
        f"**IR:** {ir_label} ¬∑ RT60 ‚âà {decay_time:.2f} s",
        f"**Filtro posterior:** {filter_label}",
        f"**Salida:** {seconds:.2f} s ¬∑ Pico {peak_level:.2f} ¬∑ RMS {rms_level:.3f}",
    ]
    return "\n".join(lines)


def get_input_signal(
    source: str,
    upload: tuple[int, np.ndarray] | None,
    sr: int,
    clap_duration: float,
    sweep_duration: float,
) -> tuple[np.ndarray, str]:
    """Produce the dry input signal selected in the UI, plus a display label.

    Args:
        source: Selected input option; labels starting with "Aplauso" or
            "Barrido" pick the synthetic generators, anything else uses
            ``upload``.
        upload: ``(sample_rate, samples)`` pair from the audio component, or
            ``None`` when nothing was uploaded.
        sr: Target sample rate in Hz.
        clap_duration: Length of the synthetic clap in seconds.
        sweep_duration: Length of the sine sweep in seconds.

    Returns:
        ``(samples, label)`` for the chosen input.

    Raises:
        gr.Error: If an upload is required but missing, or the uploaded audio
            contains only zeros after resampling.
    """
    if source.startswith("Aplauso"):
        clap = make_synthetic_clap(sr, duration=clap_duration)
        return clap, "Aplauso sint√©tico"
    elif source.startswith("Barrido"):
        sweep = make_sine_sweep(sr, duration=sweep_duration)
        return sweep, "Barrido senoidal log"

    if not upload:
        raise gr.Error("Debes subir un archivo de audio para esta opci√≥n.")

    file_sr, data = upload
    mono = np.asarray(data, dtype=float)
    if mono.ndim > 1:
        # Down-mix multi-channel audio by averaging across axis 1.
        mono = mono.mean(axis=1)
    audio = resample_audio(mono, int(file_sr), sr)
    if np.any(audio):
        return normalize_audio(audio), "Audio subido"
    raise gr.Error("El archivo de audio parece estar vac√≠o.")


def process(
    sample_rate: int,
    signal_source: str,
    clap_duration: float,
    sweep_duration: float,
    uploaded_audio: tuple[int, np.ndarray] | None,
    ir_source_type: str,
    ir_choice: str,
    uploaded_ir: tuple[int, np.ndarray] | None,
    normalize_output: bool,
    filter_choice: str,
    freq_low: float,
    freq_high: float,
) -> tuple[
    tuple[int, np.ndarray],
    tuple[int, np.ndarray],
    tuple[int, np.ndarray],
    np.ndarray,
    np.ndarray,
    str,
    str,
]:
    """Run the full auralization pipeline for one click of the UI button.

    Steps: build the input signal, obtain the impulse response (preset or
    uploaded), convolve, optionally post-filter, then render the plots, the
    text summary and the downloadable WAV file.

    Args:
        sample_rate: Working sample rate in Hz.
        signal_source: Selected input option (see ``get_input_signal``).
        clap_duration: Synthetic clap length in seconds.
        sweep_duration: Sine sweep length in seconds.
        uploaded_audio: ``(sample_rate, samples)`` of the uploaded input, or ``None``.
        ir_source_type: ``"Preset"`` to use ``ir_choice``; any other value uses
            ``uploaded_ir``.
        ir_choice: Key into ``IR_PRESETS``.
        uploaded_ir: ``(sample_rate, samples)`` of the uploaded IR, or ``None``.
        normalize_output: Whether to peak-normalize the output (after the
            convolution and again after filtering).
        filter_choice: UI label of the post-filter (key of ``FILTER_MAP``).
        freq_low: Lower filter frequency in Hz.
        freq_high: Upper filter frequency in Hz.

    Returns:
        ``((sr, input), (sr, ir), (sr, output), waveform_image,
        spectrogram_image, summary_markdown, wav_path)``.

    Raises:
        gr.Error: If the selected preset is unknown, or an uploaded IR is
            required but missing or contains no audio. Errors raised by
            ``get_input_signal`` propagate as well.
    """
    sr = int(sample_rate)
    x, signal_label = get_input_signal(signal_source, uploaded_audio, sr, clap_duration, sweep_duration)

    # IR selection
    if ir_source_type == "Preset":
        if ir_choice not in IR_PRESETS:
            # A cleared dropdown would otherwise surface as a raw KeyError.
            raise gr.Error("Selecciona un preset de IR de la lista.")
        ir = get_ir(ir_choice, sr)
        ir_label = ir_choice
        color = IR_PRESETS[ir_choice]["color"]
    else:
        # IR uploaded by the user
        if not uploaded_ir:
            raise gr.Error("Por favor, sube un archivo de IR o selecciona 'Preset'.")
        up_sr, samples = uploaded_ir
        samples = np.asarray(samples, dtype=float)
        if samples.ndim > 1:
            samples = samples.mean(axis=1)

        # Resample when the file rate differs from the working rate.
        ir = resample_audio(samples, int(up_sr), sr)
        if not np.any(ir):
            # Same validation as for uploaded input audio: an empty or silent
            # IR cannot be normalized and would only produce silence.
            raise gr.Error("La IR subida no contiene audio.")
        # Normalize to avoid excessive gain from arbitrarily scaled files.
        ir = normalize_audio(ir)

        ir_label = "IR Personalizada"
        color = "#444444"  # dark grey for custom IRs

    y = apply_ir(x, ir, normalize=normalize_output)

    filter_mode = FILTER_MAP.get(filter_choice, "none")
    if filter_mode != "none":
        y = butter_filter(y, sr, filter_mode, freq_low, freq_high)
        if normalize_output:
            # Filtering changes the peak level, so normalize once more.
            y = normalize_audio(y)

    wave_img = waveform_image(y, sr, color)
    spec_img = spectrogram_image(y, sr)
    summary = describe_session(signal_label, ir_label, sr, y, ir, filter_choice)
    download_path = prepare_download(y, sr, f"{signal_label}_{_slugify(ir_label)}")

    return (
        (sr, x),
        (sr, ir),
        (sr, y),
        wave_img,
        spec_img,
        summary,
        download_path,
    )

In [7]:
# -----------------------------------------------------------------------------
# Interfaz Gradio
# -----------------------------------------------------------------------------
# Markdown shown at the top of the app.
DESCRIPTION_MD = """
# Hito 3 ¬∑ Prototipo de Convoluci√≥n Avanzado

Explora el fen√≥meno del ‚Äúeco del quetzal‚Äù y presets de salas, o **sube tus propias Respuestas al Impulso (IR)**.
Configura la se√±al de entrada (duraci√≥n del aplauso o barrido), selecciona la IR y aplica filtros sencillos.
"""

# Two-column layout: controls on the left, results on the right.
with gr.Blocks(title="Hito 3 ‚Äî Demo de Convoluci√≥n", theme=gr.themes.Soft()) as demo:
    gr.Markdown(DESCRIPTION_MD)

    with gr.Row():
        # Left column: every input that process() consumes.
        with gr.Column():
            sample_rate = gr.Dropdown(
                label="Frecuencia de muestreo",
                choices=SAMPLE_RATE_OPTIONS,
                value=SAMPLE_RATE_OPTIONS[0],
            )
            signal_source = gr.Dropdown(label="Se√±al de entrada", choices=INPUT_OPTIONS, value=INPUT_OPTIONS[0])
            clap_duration = gr.Slider(
                minimum=0.05,
                maximum=0.50,
                value=0.14,
                step=0.01,
                label="Duraci√≥n aplauso sint√©tico [s]",
            )
            sweep_duration = gr.Slider(
                minimum=0.5,
                maximum=5.0,
                value=2.0,
                step=0.1,
                label="Duraci√≥n barrido senoidal [s]",
            )
            uploaded_audio = gr.Audio(
                label="Audio personalizado (usa esta entrada cuando selecciones 'Subir un audio')",
                type="numpy",
            )

            # --- IR selection ---
            # process() compares this value against "Preset" to pick the IR source.
            ir_source_type = gr.Radio(
                label="Fuente de Respuesta al Impulso",
                choices=["Preset", "Subir archivo"],
                value="Preset"
            )
            
            with gr.Column(visible=True) as preset_col:
                ir_choice = gr.Dropdown(label="Seleccionar Preset", choices=list(IR_PRESETS.keys()), value="Sala mediana")
            
            with gr.Column(visible=False) as upload_col:
                uploaded_ir = gr.Audio(label="Subir archivo de IR", type="numpy")

            def toggle_ir_source(source):
                """Show the preset column or the upload column, matching the radio value."""
                return {
                    preset_col: gr.update(visible=(source == "Preset")),
                    upload_col: gr.update(visible=(source == "Subir archivo"))
                }

            ir_source_type.change(fn=toggle_ir_source, inputs=ir_source_type, outputs=[preset_col, upload_col])
            # -----------------------

            normalize_output = gr.Checkbox(label="Normalizar salida", value=True)

            filter_choice = gr.Dropdown(label="Filtro posterior", choices=list(FILTER_MAP.keys()), value="Ninguno")
            freq_low = gr.Slider(minimum=20.0, maximum=5_000.0, value=200.0, step=10.0, label="Frecuencia baja [Hz]")
            freq_high = gr.Slider(minimum=200.0, maximum=18_000.0, value=3_000.0, step=10.0, label="Frecuencia alta [Hz]")

            process_btn = gr.Button("Procesar", variant="primary")

        # Right column: one component per element of the tuple returned by process().
        with gr.Column():
            input_audio = gr.Audio(label="Se√±al de entrada", type="numpy")
            ir_audio = gr.Audio(label="Respuesta al impulso", type="numpy")
            output_audio = gr.Audio(label="Auralizaci√≥n (salida)", type="numpy")
            waveform_output = gr.Image(label="Forma de onda", height=275)
            spectrogram_output = gr.Image(label="Espectrograma", height=320)
            summary_output = gr.Markdown(label="Resumen")
            download_output = gr.File(label="Descargar WAV")

    # Each example row lists values in the same order as `inputs` below; the
    # preset labels must match keys of IR_PRESETS exactly.
    examples = gr.Examples(
        label="Ejemplos r√°pidos",
        examples=[
            [48_000, "Aplauso sint√©tico", 0.14, 2.0, None, "Preset", "Pir√°mide (Quetzal) üå™Ô∏è", None, True, "Ninguno", 200.0, 3_000.0],
            [44_100, "Barrido senoidal (20 Hz ‚Äì 12 kHz)", 0.14, 2.8, None, "Preset", "Sala grande / Hall", None, True, "Pasa-bajos", 20.0, 2_000.0],
        ],
        inputs=[
            sample_rate,
            signal_source,
            clap_duration,
            sweep_duration,
            uploaded_audio,
            ir_source_type,
            ir_choice,
            uploaded_ir,
            normalize_output,
            filter_choice,
            freq_low,
            freq_high,
        ],
    )

    # The order of `inputs` must match the positional parameters of process(),
    # and the order of `outputs` must match the tuple it returns.
    process_btn.click(
        fn=process,
        inputs=[
            sample_rate,
            signal_source,
            clap_duration,
            sweep_duration,
            uploaded_audio,
            ir_source_type,
            ir_choice,
            uploaded_ir,
            normalize_output,
            filter_choice,
            freq_low,
            freq_high,
        ],
        outputs=[
            input_audio,
            ir_audio,
            output_audio,
            waveform_output,
            spectrogram_output,
            summary_output,
            download_output,
        ],
    )

# Launch the demo
demo.launch()

* Running on local URL:  http://127.0.0.1:7860
* To create a public link, set `share=True` in `launch()`.


