<a href="https://colab.research.google.com/github/prueba2001/primera-AI/blob/main/emotion2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# 1️⃣ Montar tu Google Drive
from google.colab import drive
drive.mount('/content/drive')

# 2️⃣ Definir la ruta del archivo ZIP en tu Drive
# ⚠️ Ajusta la ruta según la ubicación exacta del archivo en tu Drive
zip_path = '/content/drive/MyDrive/dataset_mel.zip'

# 3️⃣ Definir la carpeta donde quieres descomprimirlo
extract_path = '/content/dataset'

# 4️⃣ Descomprimir el archivo
import zipfile
import os

# Crear la carpeta destino si no existe
os.makedirs(extract_path, exist_ok=True)

# Extraer el contenido
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_path)

print(f"✅ Archivo descomprimido en: {extract_path}")

In [None]:
# Colab-ready: Fine-tune ResNet-101 on an ImageFolder dataset (no Drive saving)
# -------------------------------------------------------------------------
# Nota: si ves el error relacionado con PIL._typing, este script fuerza
# Pillow==9.5.0 antes de importar torchvision para evitar ese ImportError.
# -------------------------------------------------------------------------

# ------------------ 0) Instalar dependencias (ejecutar solo si es necesario) ------------------
# (Pillow se fija en 9.5.0 para evitar ImportError con PIL._typing)
!pip install -q --upgrade "tqdm" "scikit-learn" "matplotlib" "Pillow==9.5.0"

# Si quieres forzar también torch/torchvision (opcional y pesado):
# !pip install -q --upgrade "torch" "torchvision"

# ------------------ 1) Imports (intenta importar; si falla reinstala Pillow y reintenta) ------------------
import sys, subprocess, importlib

def try_imports():
    try:
        import os, time, copy, json
        from pathlib import Path
        from collections import Counter
        import torch
        import torch.nn as nn
        from torch.utils.data import DataLoader
        from torchvision import transforms, datasets, models
        from tqdm import tqdm
        import numpy as np
        from sklearn.metrics import classification_report, confusion_matrix, precision_recall_fscore_support
        import matplotlib.pyplot as plt
        from PIL import Image
        return True
    except Exception as e:
        print("Import error:", e)
        return False

if not try_imports():
    print("Reinstalando Pillow==9.5.0 y reintentando imports...")
    subprocess.check_call([sys.executable, "-m", "pip", "install", "--quiet", "--force-reinstall", "Pillow==9.5.0"])
    importlib.invalidate_caches()
    if not try_imports():
        raise RuntimeError(
            "Error importando dependencias aún después de reinstalar Pillow. "
            "En Colab reinicia el runtime (Runtime -> Restart runtime) y vuelve a ejecutar la celda."
        )

# Ahora los imports definitivos (si llegaste hasta aquí, deberían funcionar)
import os
import time
import copy
import json
from pathlib import Path
from collections import Counter

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import transforms, datasets, models
from tqdm import tqdm
import numpy as np

from sklearn.metrics import classification_report, confusion_matrix, precision_recall_fscore_support
import matplotlib.pyplot as plt

# ------------------ 2) Config (editar si quieres) ------------------
DATA_ROOT = '/content/dataset'  # mantener como dijiste
TRAIN_DIR = os.path.join(DATA_ROOT, 'train')
VAL_DIR   = os.path.join(DATA_ROOT, 'val')
TEST_DIR  = os.path.join(DATA_ROOT, 'test')

# <-- Salida local en /content para no usar Drive -->
OUTPUT_DIR = '/content/finetune_resnet101_output'
os.makedirs(OUTPUT_DIR, exist_ok=True)

NUM_EPOCHS = 8
BATCH_SIZE = 16            # si OOM, bajar a 8 o 4
IMG_SIZE = 224
LR = 1e-4
WEIGHT_DECAY = 1e-4
NUM_WORKERS = 2            # 2 es más seguro en Colab
PIN_MEMORY = True
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
PRINT_FREQ = 1
SEED = 42

torch.manual_seed(SEED)
np.random.seed(SEED)

# ------------------ 3) Transforms ------------------
train_transforms = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

val_transforms = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# ------------------ 4) Datasets & Loaders ------------------
# Verifica que las rutas existan
for d in (TRAIN_DIR, VAL_DIR, TEST_DIR):
    if not os.path.isdir(d):
        raise FileNotFoundError(f"Directorio no encontrado: {d}. Asegúrate de tener /content/dataset/{{train,val,test}} con subcarpetas por clase.")

train_dataset = datasets.ImageFolder(TRAIN_DIR, transform=train_transforms)
val_dataset   = datasets.ImageFolder(VAL_DIR, transform=val_transforms)
test_dataset  = datasets.ImageFolder(TEST_DIR, transform=val_transforms)

class_names = train_dataset.classes
num_classes = len(class_names)
print(f"Detected classes ({num_classes}): {class_names}")

# Compute class weights (para imbalance)
counts = Counter([y for _, y in train_dataset])
class_counts = [counts[i] for i in range(num_classes)]
print('Train class counts:', dict(zip(class_names, class_counts)))

# Evitar división por cero: si una clase tiene 0 muestras, le damos peso 0.
if sum(class_counts) == 0:
    raise ValueError("No hay imágenes en el dataset de entrenamiento.")
class_weights = []
total = float(sum(class_counts))
for c in class_counts:
    if c > 0:
        class_weights.append(total / c)
    else:
        class_weights.append(0.0)
class_weights = torch.tensor(class_weights, dtype=torch.float32)
# normalizar (opcional)
if class_weights.sum().item() > 0:
    class_weights = class_weights / class_weights.sum() * num_classes
class_weights = class_weights.to(DEVICE)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True,
                          num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY if torch.cuda.is_available() else False)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False,
                        num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY if torch.cuda.is_available() else False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False,
                         num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY if torch.cuda.is_available() else False)

# ------------------ 5) Modelo ------------------
# Usar la API de pesos recomendada
try:
    from torchvision.models import ResNet101_Weights
    weights = ResNet101_Weights.IMAGENET1K_V1
    model = models.resnet101(weights=weights)
except Exception:
    # fallback si la versión de torchvision no expone ResNet101_Weights
    model = models.resnet101(pretrained=True)

# Reemplazar la capa final
in_features = model.fc.in_features
model.fc = nn.Linear(in_features, num_classes)
model = model.to(DEVICE)

# ------------------ 6) Loss, optimizador, scheduler ------------------
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = torch.optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=LR, weight_decay=WEIGHT_DECAY)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=2)

# Mixed precision si hay CUDA
use_amp = torch.cuda.is_available()
scaler = torch.cuda.amp.GradScaler(enabled=use_amp)

# ------------------ 7) Utilidades ------------------
def save_checkpoint(state, filename):
    torch.save(state, filename)

def evaluate(model, loader):
    model.eval()
    all_preds = []
    all_labels = []
    running_loss = 0.0
    with torch.no_grad():
        for images, labels in tqdm(loader, desc='Eval', leave=False):
            images = images.to(DEVICE)
            labels = labels.to(DEVICE)
            outputs = model(images)
            loss = criterion(outputs, labels)
            running_loss += loss.item() * images.size(0)
            preds = outputs.argmax(dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
    epoch_loss = running_loss / len(loader.dataset) if len(loader.dataset) > 0 else 0.0
    return np.array(all_labels), np.array(all_preds), epoch_loss

# ------------------ 8) Loop de entrenamiento ------------------
best_model_wts = copy.deepcopy(model.state_dict())
best_f1 = 0.0
history = {'train_loss': [], 'val_loss': [], 'val_f1': [], 'val_acc': []}

for epoch in range(1, NUM_EPOCHS+1):
    model.train()
    running_loss = 0.0
    pbar = tqdm(train_loader, desc=f'Epoch {epoch}/{NUM_EPOCHS}')
    for images, labels in pbar:
        images = images.to(DEVICE)
        labels = labels.to(DEVICE)
        optimizer.zero_grad()
        with torch.cuda.amp.autocast(enabled=use_amp):
            outputs = model(images)
            loss = criterion(outputs, labels)
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        running_loss += loss.item() * images.size(0)
        processed = min((pbar.n+1) * train_loader.batch_size, max(1, len(train_loader.dataset)))
        pbar.set_postfix({'loss': f'{running_loss / processed:.4f}'})

    epoch_train_loss = running_loss / len(train_loader.dataset) if len(train_loader.dataset) > 0 else 0.0
    history['train_loss'].append(epoch_train_loss)

    # Validación
    y_true, y_pred, val_loss = evaluate(model, val_loader)
    precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average='macro', zero_division=0)
    acc = (y_true == y_pred).mean() if y_true.size > 0 else 0.0
    history['val_loss'].append(val_loss)
    history['val_f1'].append(f1)
    history['val_acc'].append(acc)

    print(f"Epoch {epoch} Train loss: {epoch_train_loss:.4f} | Val loss: {val_loss:.4f} | Val acc: {acc:.4f} | Val macro-F1: {f1:.4f}")

    # Scheduler con la métrica f1
    scheduler.step(f1)

    # Guardar mejor
    if f1 > best_f1:
        best_f1 = f1
        best_model_wts = copy.deepcopy(model.state_dict())
        ckpt_path = os.path.join(OUTPUT_DIR, f'resnet101_best_f1_{best_f1:.4f}.pth')
        save_checkpoint({'epoch': epoch, 'model_state': best_model_wts, 'optimizer_state': optimizer.state_dict(), 'f1': best_f1}, ckpt_path)
        print('Saved best model to', ckpt_path)

# Cargar mejores pesos
model.load_state_dict(best_model_wts)

# Guardar modelo final y el historial (local)
torch.save(model.state_dict(), os.path.join(OUTPUT_DIR, 'resnet101_final.pth'))
with open(os.path.join(OUTPUT_DIR, 'training_history.json'), 'w') as f:
    json.dump(history, f)

# ------------------ 9) Evaluación en test ------------------
print('\nEvaluating on test set...')
y_true, y_pred, test_loss = evaluate(model, test_loader)
test_acc = (y_true == y_pred).mean() if y_true.size>0 else 0.0
print(f'Test loss: {test_loss:.4f} | Test acc: {test_acc:.4f}')

# Reporte por clase
report = classification_report(y_true, y_pred, target_names=class_names, zero_division=0)
print('\nClassification report:\n')
print(report)

cm = confusion_matrix(y_true, y_pred) if y_true.size>0 else np.zeros((num_classes, num_classes), dtype=int)
print('\nConfusion matrix:\n')
print(cm)

# Guardar report y cm (local)
with open(os.path.join(OUTPUT_DIR, 'classification_report.txt'), 'w') as f:
    f.write(report)
np.savetxt(os.path.join(OUTPUT_DIR, 'confusion_matrix.csv'), cm, fmt='%d', delimiter=',')

# Plot confusion matrix
plt.figure(figsize=(8,6))
plt.imshow(cm, interpolation='nearest')
plt.title('Confusion matrix')
plt.colorbar()
plt.xticks(range(num_classes), class_names, rotation=45, ha='right')
plt.yticks(range(num_classes), class_names)
plt.tight_layout()
plt.savefig(os.path.join(OUTPUT_DIR, 'confusion_matrix.png'))
plt.show()

print('\nAll outputs saved to (local):', OUTPUT_DIR)

# ------------- FIN -------------
# Consejos si tienes OOM: reducir BATCH_SIZE a 8 o 4, o congelar capas iniciales:
# for name, param in model.named_parameters():
#     if 'layer4' not in name and 'fc' not in name:
#         param.requires_grad = False

In [None]:
# Colab-ready: Inference single image(s) with ResNet-101 trained model
# Pegar y ejecutar EN UNA SOLA CELDA en Google Colab.
# Sube las imágenes cuando te lo pida la interfaz.

# ------------------ 0) Asegurar dependencias ------------------
# Forzamos una versión estable de Pillow para evitar el error "cannot import name '_Ink'".
# Instalamos matplotlib también. Si torch/torchvision ya están instalados en tu runtime de Colab,
# dejaremos comentada la línea para instalarlos (descomentar si necesitas reinstalarlos).
import sys
print("Comprobando / instalando dependencias (puede tardar algunos segundos)...")

# Instalar/forzar Pillow estable
!pip install -q --upgrade "Pillow==9.5.0" matplotlib

# Si necesitas reinstalar torch/torchvision en Colab (ejecuta solo si hace falta):
# En muchos runtimes de Colab, torch y torchvision ya vienen instalados y optimizados.
# !pip install -q --upgrade torch torchvision

print("Dependencias instaladas/actualizadas. Si el runtime se reinicia, re-ejecuta la celda completa.")

# Si tras actualizar Pillow, la importación sigue fallando, reiniciaremos el runtime automáticamente.
# (Reinicio forzado en Colab: os.kill -> el runtime se reiniciará; vuelve a ejecutar la celda completa después).
try:
    from PIL import Image
except Exception as e:
    import os, time
    print("ImportError al importar PIL tras instalar Pillow. Forzando reinicio del runtime para aplicar cambios...")
    print("Error:", e)
    # Forzar reinicio del runtime (Colab). El usuario debe volver a ejecutar la celda después del reinicio.
    os.kill(os.getpid(), 9)

# ------------------ 1) Imports ------------------
import os
import json
import torch
import torch.nn as nn
import numpy as np
from pathlib import Path
from PIL import Image
import matplotlib.pyplot as plt
from torchvision import transforms, models
from google.colab import files
from collections import OrderedDict
import warnings
warnings.filterwarnings("ignore")

# ------------------ 2) Config (editar si quieres) ------------------
MODEL_PATH = '/content/resnet101_final.pth'  # ruta por defecto al modelo que guardaste localmente
DATASET_TRAIN_DIR = '/content/dataset/train'  # se usa como fallback para obtener class names
IMG_SIZE = 224
TOPK = 5
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print("Device:", DEVICE)
print("Model path:", MODEL_PATH)

# ------------------ 3) Helper: cargar state_dict robustamente ------------------
def load_checkpoint_state_dict(path, map_location='cpu'):
    ckpt = torch.load(path, map_location=map_location)
    # si guardaste un dict con 'model_state' o 'model_state_dict' o 'state_dict'
    if isinstance(ckpt, dict):
        for key in ('model_state', 'model_state_dict', 'state_dict', 'model'):
            if key in ckpt:
                sd = ckpt[key]
                return sd
    # si es directamente state_dict
    if isinstance(ckpt, OrderedDict) or isinstance(ckpt, dict):
        return ckpt
    # si fue guardado como modelo completo (rare), intentar extraer state_dict
    if hasattr(ckpt, 'state_dict'):
        return ckpt.state_dict()
    raise RuntimeError("Formato de checkpoint no reconocido.")

def clean_state_dict(sd):
    # remover prefijo 'module.' si existe (para modelos entrenados con DataParallel)
    new_sd = OrderedDict()
    for k, v in sd.items():
        nk = k
        if k.startswith('module.'):
            nk = k[len('module.'):]
        new_sd[nk] = v
    return new_sd

# ------------------ 4) Asegurar que el checkpoint existe (si no, pedir subida) ------------------
if not os.path.isfile(MODEL_PATH):
    print(f"No se encuentra el modelo en {MODEL_PATH}. Por favor, sube el archivo .pth desde tu ordenador.")
    print("Selecciona el archivo .pth (por ejemplo resnet101_final.pth) en la ventana de upload.")
    uploaded_model = files.upload()
    if len(uploaded_model) == 0:
        raise FileNotFoundError("No se subió ningún modelo. Deteniendo ejecución.")
    else:
        # Tomar el primer archivo subido como modelo
        first_model = list(uploaded_model.keys())[0]
        MODEL_PATH = os.path.join('/content', first_model)
        print("Modelo subido y ubicado en:", MODEL_PATH)

# ------------------ 5) Cargar state_dict y construir modelo compatible ------------------
print("Cargando checkpoint desde:", MODEL_PATH)
raw_sd = load_checkpoint_state_dict(MODEL_PATH, map_location=DEVICE)
sd = clean_state_dict(raw_sd)

# determinar num_classes a partir de la forma de fc.weight en el state_dict
if 'fc.weight' in sd:
    num_classes = sd['fc.weight'].shape[0]
else:
    # fallback: intentar deducir por otras claves
    fc_keys = [k for k in sd.keys() if k.endswith('fc.weight') or k.endswith('.fc.weight')]
    if len(fc_keys) > 0:
        num_classes = sd[fc_keys[0]].shape[0]
    else:
        raise RuntimeError("No pude deducir num_classes desde el state_dict. Asegúrate de que el checkpoint contiene 'fc.weight'.")

# crear modelo (sin descargar pesos externos)
# Nota: en versiones recientes de torchvision la API acepta weights=None
model = models.resnet101(weights=None)
in_features = model.fc.in_features
model.fc = nn.Linear(in_features, num_classes)
model = model.to(DEVICE)

# Intentar cargar pesos; si falla por discrepancias, intentar con strict=False y avisar.
try:
    model.load_state_dict(sd)
    print("Pesos cargados con strict=True.")
except Exception as e:
    print("Carga con strict=True falló:", e)
    print("Intentando cargar con strict=False (ignorar keys faltantes/excedentes).")
    load_res = model.load_state_dict(sd, strict=False)
    print("Resultado carga strict=False:", load_res)

model.eval()
print(f"Modelo ResNet-101 listo. num_classes = {num_classes}")

# ------------------ 6) Obtener nombres de clase (intentos automáticos) ------------------
CLASS_NAMES = None

# 1) buscar archivo class_names.json en mismo folder del modelo
possible_json = os.path.join(os.path.dirname(MODEL_PATH), 'class_names.json')
if os.path.isfile(possible_json):
    try:
        with open(possible_json, 'r') as f:
            CLASS_NAMES = json.load(f)
        print("Cargadas class names desde", possible_json)
    except Exception as e:
        print("No se pudo leer class_names.json:", e)

# 2) fallback: listar carpetas en /content/dataset/train
if CLASS_NAMES is None and os.path.isdir(DATASET_TRAIN_DIR):
    classes = sorted([d.name for d in Path(DATASET_TRAIN_DIR).iterdir() if d.is_dir()])
    if len(classes) == num_classes:
        CLASS_NAMES = classes
        print("Cargadas class names desde el directorio de train:", DATASET_TRAIN_DIR)
    else:
        print(f"Encontradas {len(classes)} carpetas en {DATASET_TRAIN_DIR} (esperado {num_classes}). No se usarán como class names automáticas.")

# 3) último recurso: nombres genéricos
if CLASS_NAMES is None:
    CLASS_NAMES = [f"class_{i}" for i in range(num_classes)]
    print("No se encontraron nombres de clase en disco. Usando nombres genéricos class_0..")

# ------------------ 7) Transforms de entrada (mismo que validación/entrenamiento) ------------------
preprocess = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# ------------------ 8) Subir imagen(es) desde la UI de Colab ------------------
print("\nSelecciona 1 o más imagenes desde la ventana emergente (Upload).")
uploaded = files.upload()  # abre selector; carga archivos a /content

if len(uploaded) == 0:
    print("No se subió ninguna imagen. Termina la ejecución.")
else:
    for filename in uploaded.keys():
        img_path = os.path.join('/content', filename)
        try:
            img = Image.open(img_path).convert('RGB')
        except Exception as e:
            print(f"No pude abrir {filename}: {e}")
            continue

        # Preprocesar
        input_tensor = preprocess(img).unsqueeze(0).to(DEVICE)  # shape (1,C,H,W)

        # Inferencia
        with torch.no_grad():
            logits = model(input_tensor)
            probs = torch.softmax(logits, dim=1).cpu().numpy()[0]

        # Top-K
        topk_idx = probs.argsort()[::-1][:TOPK]
        topk_probs = probs[topk_idx]
        topk_names = [CLASS_NAMES[i] if i < len(CLASS_NAMES) else f"class_{i}" for i in topk_idx]

        # Mostrar imagen + predicción principal
        top_name = topk_names[0]
        top_prob = topk_probs[0]

        plt.figure(figsize=(6,6))
        plt.imshow(img)
        plt.axis('off')
        plt.title(f"Predicción: {top_name} ({top_prob*100:.2f}%)", fontsize=14)
        plt.show()

        # Mostrar Top-K
        print(f"Resultados para {filename}:")
        for i, (n, p) in enumerate(zip(topk_names, topk_probs), 1):
            print(f"  {i}. {n:20s}  {p*100:5.2f}%")
        print("-"*40)

    print("Proceso de inferencia finalizado.")

In [None]:
# Colab: convertir un único WAV subido a Mel-spectrogram 16 kHz y guardar PNG
# Requiere (ejecutar solo una vez si hace falta):
!pip install -q librosa soundfile matplotlib tqdm

import os
from pathlib import Path
import librosa
import numpy as np
import matplotlib.pyplot as plt
import soundfile as sf
from tqdm import tqdm
from google.colab import files
from PIL import Image
import io

# ---------- CONFIG ----------
# Salida por defecto (una sola imagen)
DST_ROOT = Path("/content/single_mel")
SAMPLE_RATE = 16000       # 16 kHz requerido
N_MELS = 128              # número de bandas Mel
N_FFT = 1024              # tamaño FFT
HOP_LENGTH = 256
FMIN = 0
FMAX = None               # None -> sr/2
FIG_DPI = 100             # resolución al guardar png
SAVE_NPY = False          # si True guarda también .npy mel
# ----------------------------

def ensure_dir(path: Path):
    if not path.exists():
        path.mkdir(parents=True, exist_ok=True)

def wav_to_mel_db(y, sr, n_mels=N_MELS, n_fft=N_FFT, hop_length=HOP_LENGTH, fmin=FMIN, fmax=FMAX):
    mel = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=n_fft, hop_length=hop_length,
                                         n_mels=n_mels, fmin=fmin, fmax=fmax)
    mel_db = librosa.power_to_db(mel, ref=np.max)
    return mel_db

def save_mel_png(mel_db, out_path: Path, dpi=FIG_DPI):
    # Guarda el espectrograma mel_db como PNG sin ejes, fondo negro y margen cero.
    # Ajustamos figura para que la resolución sea consistente.
    h, w = mel_db.shape
    plt.figure(figsize=(w / dpi, h / dpi), dpi=dpi)
    plt.axis('off')
    plt.imshow(mel_db, aspect='auto', origin='lower', interpolation='nearest')
    plt.tight_layout(pad=0)
    plt.savefig(out_path, bbox_inches='tight', pad_inches=0)
    plt.close()

def process_single_wav_bytes(wav_bytes: bytes, out_dir: Path, filename_stem: str):
    try:
        # Guardar temporalmente para que librosa pueda abrirlo sin problemas
        tmp_path = out_dir / (filename_stem + "_tmp.wav")
        ensure_dir(out_dir)
        with open(tmp_path, "wb") as f:
            f.write(wav_bytes)

        # Cargar y re-muestrear a SAMPLE_RATE (librosa.load mezcla a mono por defecto)
        y, sr = librosa.load(str(tmp_path), sr=SAMPLE_RATE, mono=True)
        if y.size == 0:
            print(f"Warning: archivo vacío {filename_stem}")
            tmp_path.unlink(missing_ok=True)
            return False

        mel_db = wav_to_mel_db(y, SAMPLE_RATE)
        out_img_path = out_dir / (filename_stem + ".png")
        save_mel_png(mel_db, out_img_path)

        if SAVE_NPY:
            npy_path = out_dir / (filename_stem + ".npy")
            np.save(npy_path, mel_db.astype(np.float32))

        # limpiar temporal
        tmp_path.unlink(missing_ok=True)
        print(f"Guardado: {out_img_path}")
        return out_img_path, mel_db
    except Exception as e:
        print(f"ERROR procesando {filename_stem}: {e}")
        return False

# ====== MAIN: pedir al usuario que suba un WAV ======
print("Arrastra/sube un archivo WAV (o mp3/flac) ahora — solo un archivo será procesado:")
uploaded = files.upload()
if not uploaded:
    raise SystemExit("No se subió ningún archivo. Ejecuta la celda nuevamente y sube un archivo WAV.")

# Tomar el primer archivo de audio subido válido
wav_bytes = None
wav_name = None
for fname, b in uploaded.items():
    if any(fname.lower().endswith(ext) for ext in ('.wav', '.flac', '.mp3', '.m4a', '.aac', '.ogg')):
        wav_bytes = b
        wav_name = fname
        break

if wav_bytes is None:
    raise SystemExit("No se detectó un archivo de audio válido entre los archivos subidos.")

# Procesar
ensure_dir(DST_ROOT)
stem = Path(wav_name).stem
res = process_single_wav_bytes(wav_bytes, DST_ROOT, stem)
if not res:
    raise SystemExit("Fallo en el procesamiento del archivo de audio.")

out_img_path, mel_db = res

# Mostrar imagen resultante en el notebook y algunas estadísticas
from IPython.display import display
img = Image.open(out_img_path)
display(img)

print("\nDetalles:")
print(" - Archivo de audio:", wav_name)
print(" - Output PNG:", out_img_path)
print(" - Forma del mel (bands x frames):", mel_db.shape)
print(" - Sample rate for processing:", SAMPLE_RATE)
if SAVE_NPY:
    print(" - También se guardó .npy con los datos MEL en la misma carpeta.")

print("\nListo — tu espectrograma MEL (16 kHz) está en:", DST_ROOT)


In [None]:
# Colab-ready: convertir todos los WAV en /content/dataset/... a espectrogramas MEL 16kHz
# Salida: /content/dataset_mel/train|val|test/<clase>/<misma_name>.png
# Requiere: librosa, matplotlib, soundfile, tqdm
# Ejecuta solo una vez la instalación:
!pip install -q librosa soundfile matplotlib tqdm

import os
from pathlib import Path
import librosa
import numpy as np
import matplotlib.pyplot as plt
import soundfile as sf
from tqdm import tqdm

# ---------- CONFIG ----------
SRC_ROOT = Path("/content/dataset")            # carpeta origen (ya la tienes)
DST_ROOT = Path("/content/dataset_mel")        # carpeta destino (nueva)
SAMPLE_RATE = 16000                            # 16 kHz requerido
N_MELS = 128                                   # número de bandas Mel
N_FFT = 1024                                   # tamaño FFT
HOP_LENGTH = 256
FMIN = 0
FMAX = None                                    # None -> sr/2
FIG_DPI = 100                                  # resolución al guardar png
SAVE_NPY = False                                # si True guarda también .npy mel (descomentar si quieres)
# ----------------------------

def ensure_dir(path: Path):
    if not path.exists():
        path.mkdir(parents=True, exist_ok=True)

def wav_to_mel_db(y, sr, n_mels=N_MELS, n_fft=N_FFT, hop_length=HOP_LENGTH, fmin=FMIN, fmax=FMAX):
    # Calcula mel spectrogram y lo pasa a escala dB (log)
    mel = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=n_fft, hop_length=hop_length,
                                         n_mels=n_mels, fmin=fmin, fmax=fmax)
    mel_db = librosa.power_to_db(mel, ref=np.max)
    return mel_db

def save_mel_png(mel_db, out_path: Path, dpi=FIG_DPI):
    # Guarda el espectrograma mel_db como PNG sin ejes, fondo negro y margen cero.
    plt.figure(figsize=(mel_db.shape[1] / dpi, mel_db.shape[0] / dpi), dpi=dpi)
    plt.axis('off')
    # mostrar con origin='lower' para que el eje mel crezca hacia arriba
    plt.imshow(mel_db, aspect='auto', origin='lower', interpolation='nearest')
    plt.tight_layout(pad=0)
    plt.savefig(out_path, bbox_inches='tight', pad_inches=0)
    plt.close()

def process_file(src_path: Path, dst_img_path: Path):
    try:
        # cargamos y re-muestreamos a SAMPLE_RATE
        # librosa.load hace mezcla a mono por defecto (sr resamplea)
        y, sr = librosa.load(str(src_path), sr=SAMPLE_RATE, mono=True)
        if y.size == 0:
            print(f"Warning: archivo vacío {src_path}")
            return False

        mel_db = wav_to_mel_db(y, SAMPLE_RATE)
        ensure_dir(dst_img_path.parent)
        save_mel_png(mel_db, dst_img_path)

        if SAVE_NPY:
            npy_path = dst_img_path.with_suffix('.npy')
            np.save(npy_path, mel_db.astype(np.float32))

        return True
    except Exception as e:
        print(f"ERROR procesando {src_path}: {e}")
        return False

# ====== MAIN ======
if not SRC_ROOT.exists():
    raise SystemExit(f"Directorio origen no existe: {SRC_ROOT}")

# Recrear estructura train/val/test y subcarpetas de clases en DST_ROOT
splits = ["train", "val", "test"]
files_to_process = []

for split in splits:
    src_split = SRC_ROOT / split
    if not src_split.exists():
        # si alguna de las carpetas no existiera, lo notificamos pero seguimos
        print(f"Advertencia: {src_split} no existe. Se omitirá.")
        continue
    for class_dir in src_split.iterdir():
        if not class_dir.is_dir():
            continue
        # creamos carpeta destino correspondiente
        dst_class_dir = DST_ROOT / split / class_dir.name
        ensure_dir(dst_class_dir)
        # recogemos archivos de audio dentro de esta carpeta
        # aceptamos wav, wav1, wav2, flac, mp3 (librosa soporta varios)
        for ext in ("*.wav", "*.WAV", "*.flac", "*.FLAC", "*.mp3", "*.MP3", "*.m4a", "*.M4A"):
            for p in class_dir.glob(ext):
                files_to_process.append((p, dst_class_dir / (p.stem + ".png")))

print(f"Archivos a procesar: {len(files_to_process)}")
# Procesar con tqdm
errors = 0
for src_path, dst_img_path in tqdm(files_to_process):
    ok = process_file(src_path, dst_img_path)
    if not ok:
        errors += 1

print("=== FIN ===")
print(f"Total archivos: {len(files_to_process)}  - Errores: {errors}")
print(f"Espectrogramas guardados en: {DST_ROOT}")

# Opcional: listar algunas muestras guardadas
sample_preview = list((DST_ROOT / "train").rglob("*.png"))[:6]
if sample_preview:
    print("Algunas muestras (primeras 6) guardadas:")
    for p in sample_preview:
        print(" -", p)
else:
    print("No se encontraron muestras en destino. Revisa rutas.")


In [None]:
from google.colab import drive
import shutil
import os

# 1️⃣ Montar Google Drive
drive.mount('/content/drive')

# 2️⃣ Rutas
carpeta_origen = '/content/dataset_mel'
ruta_salida = '/content/drive/MyDrive/dataset_mel.zip'  # Cambia el nombre o la ruta si quieres

# 3️⃣ Comprimir la carpeta
shutil.make_archive(base_name=ruta_salida.replace('.zip', ''), format='zip', root_dir=carpeta_origen)

print(f"✅ Carpeta comprimida y guardada en Drive: {ruta_salida}")