In [None]:
import os
from pathlib import Path
import time
from sklearn.svm import SVC

from sklearn.preprocessing import StandardScaler
from transformers import Wav2Vec2FeatureExtractor, Wav2Vec2Model
import numpy as np
import torch
import pandas as pd
import librosa
import soundfile as sf

from tqdm.auto import tqdm
tqdm.pandas()


In [None]:
from pathlib import Path

BASE_DIR = Path(r"ruta") ## aquí hay que escribir la ruta a la carpeta "LA" descargable en  https://www.kaggle.com/datasets/awsaf49/asvpoof-2019-dataset
PROTOCOLS_DIR = BASE_DIR / "ASVspoof2019_LA_cm_protocols"

dev_protocol_file = PROTOCOLS_DIR / "ASVspoof2019.LA.cm.dev.trl.txt"

cols = ["speaker_id", "audio_id", "unused", "unused2", "key"]

df_dev = pd.read_csv(
    dev_protocol_file,
    sep=" ",
    header=None,
    names=cols
)

df_dev.head()


In [None]:
DEV_AUDIO_DIR = BASE_DIR / "ASVspoof2019_LA_dev" / "flac"

df_dev["path"] = df_dev["audio_id"].apply(lambda x: DEV_AUDIO_DIR / f"{x}.flac")
df_dev.head()


In [None]:

example = df_dev.iloc[-60]

audio, sr = librosa.load(example["path"], sr=16000)
print(f"Tamaño de df_dev es {len(df_dev)} por {len(list(df_dev.columns))}")
print("Etiqueta:", example["key"])
print("Forma:", audio.shape)
print("Sampling rate:", sr)


# Wave2Vec XLSR

Es un modelo neuronal auto-supervisado para audio, desarrollado por facebook AI, ahora meta.
Su funcion principal es convertir audio "crudo" en representaciones vectoriales que capturan contenido fonético, esctructura rítmica, timbre características del hablante y artefactos acústicos.

## ¿Por qué es útil?
Un audio generado por IA puede contener:
* artifactos espectrales sutiles
* irregularidades en el pitch
* formantes no naturales
* ruidos no humanos
* inconsistencias temporales

Wave2Vec ha sido entrenado para diferenciar patrones naturales del habla humana, sus embeings permiten distingir entre humano (bonafide) o spoof (deepfake) sin tener que entrenar desde cero

# Cómo funciona?

1. Entra audio crudo $x(t)$, donde $x\in\mathbb{R}^T$ con $T$ muestras de audio
2. Feature Encoder (CNN temporal):
    * Primero el audio pasa por 7 capas convolucionales: $$ z =f_{CNN}(x) $$
       * Donde stride reduce la resolución temporal y kernels grandes capturan patrones locales, por lo que el resultado es $$z\in\mathbb{R}^{L\times d}$$. Cada "frame" de $z$ contiene información acústica, $z$ NO ES EL EMBEDING FINAL
# Masking (auto-supervisado)
La magia del preentrenamiento es que se ocultan (se enmascaran) bloques aleatorios de frames y el modelo debe predecir qué había ahí. Este comportamiento es parecido a BERT en NLM pero a audio.

# Transformer Encoder
La salida enmascarada pasa por un gran transformer c:
$$c=Transformer(z_{masked})$$

El transformer es el que aprende contexto global:
* fonemas
* ritmo
* prosodia
* estructura temporal
* acento
* características de la voz

El resultado final entonces es $x\in\mathbb{R}^{L\times H}$ donde $H\simeq 768-1024$.

Este $c$ es el que ya podemos usar como embedding. No necesitamos predecir los tokens contrastivos como en el pre-entrenamiento. Solo usamos el encoder como extractor de características


# Qué representa un embedding de este modelo

Cada vector de 1024 dimensiones contiene:
* timbre de voz
* patrón espectral global
* transición fonética
* ritmo
* calidad de la señal
* artefactos de vocoder
* distorsiones TTS
* ruido y reverberación
Todos estos factores son importantes para distinguir deepfakes.

Aplicamos, entonces,
$$e = \frac{1}{L}\sum_{i=1}^{L}c_i$$

que es la definición de media.

Un solo vector de 1024 representa todo el audio.

Básicamente Wav2Vec fue entrenado con decenas de miles de horas de habla humana real, aprendió a reconocer patrones humanos y cuando ve algo "antinatural" el embedding es diferente por lo que un clasificador clásico de ML podría separar bonafide vs spoof

# Pipeline a grosso modo
Audio crudo → CNN Encoder → Transformer → Embeddings → Clasificador


In [None]:
# 1. Cargar Feature Extractor 

feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(
    "facebook/wav2vec2-xls-r-300m"
)

In [None]:
# 2. Cargar modelo Wav2Vec2 XLSR
# Este modelo produce embeddings de dimensión 1024 por frame.

device = "cuda" if torch.cuda.is_available() else "cpu"

feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(
    "facebook/wav2vec2-xls-r-300m"
)

model = Wav2Vec2Model.from_pretrained(
    "facebook/wav2vec2-xls-r-300m"
)

# MUY IMPORTANTE: mover el modelo COMPLETO a GPU AQUÍ
model = model.to(device)

print("Modelo está en:", next(model.parameters()).device)

In [None]:
# 3. Función para cargar audio y convertirlo en embedding

def load_audio(path, target_sr=16000):
    audio, sr = librosa.load(path, sr=target_sr)
    return audio


def extract_embedding(path):
    audio = load_audio(path)

    # preparar tensores
    inputs = feature_extractor(
        audio,
        sampling_rate=16000,
        return_tensors="pt",
        padding=True
    )

    # Mover tensores al GPU
    inputs = {k: v.to(device) for k, v in inputs.items()}

    # Forward pass en GPU
    with torch.no_grad():
        outputs = model(**inputs)

    # Mean pooling
    emb = torch.mean(outputs.last_hidden_state, dim=1)

    # Pasar a CPU para convertir a numpy
    return emb.squeeze().cpu().numpy()

device = "cuda" if torch.cuda.is_available() else "cpu"

print("CUDA disponible:", torch.cuda.is_available())
print("Usando dispositivo:", device)

if device == "cuda":
    print("GPU:", torch.cuda.get_device_name(0))
else:
    print("No se detectó GPU en PyTorch.")

test_path = df_dev["path"].iloc[0]

start = time.time()
_ = extract_embedding(test_path)
end = time.time()

tiempo_gpu = end - start
print(f"Tiempo por audio en GPU: {tiempo_gpu:.4f} segundos")


num_dev = len(df_dev)
tiempo_est_dev = num_dev * tiempo_gpu

print(f"Audios en DEV: {num_dev}")
print(f"Tiempo estimado (DEV): {tiempo_est_dev/60:.2f} minutos")

try:
    num_train = len(df_train)
    tiempo_est_train = num_train * tiempo_gpu

    print(f"Audios en TRAIN: {num_train}")
    print(f"Tiempo estimado (TRAIN): {tiempo_est_train/60:.2f} minutos")

except:
    print("df_train no cargado todavía")



In [None]:
# 4. PRUEBA: extraer embedding de un archivo del dataset DEV
# ------------------------------------------------------------

example_path = df_dev["path"].iloc[0]
example_label = df_dev["key"].iloc[0]

print("Archivo:", example_path)
print("Etiqueta:", example_label)

emb = extract_embedding(example_path)

print("Embedding shape:", emb.shape)
print("Primeros valores:", emb[:10])

# Generar embeddings para todo el dataset (train/dev)

Con esto construiremos:

* X_train: matriz de embeddings

* y_train: etiquetas bonafide/spoof

* X_dev: embeddings del conjunto de validación

* y_dev: etiquetas

Este paso es indispensable para poder entrenar un clasificador de detección de deepfake.

1. Convertir "bonafide" → 0 y "spoof" → 1

2. Crear una función generadora de embeddings por fila del dataframe

3. Aplicarlo con progress_apply (tqdm)

4. Guardar los embeddings en arrays numpy

In [None]:
tqdm.pandas()

In [None]:
# 1. Convertir las etiquetas de texto a etiquetas numéricas

# bonafide = 0 (voz humana real)
# spoof = 1 (deepfake / TTS / voice conversion)


label_map = {"bonafide": 0, "spoof": 1}

df_dev["label_num"] = df_dev["key"].map(label_map)

# Verificamos que esté correcto
df_dev[df_dev["label_num"] == 1][["key", "label_num"]].head(2)


In [None]:
df_dev[df_dev["label_num"] == 0][["key", "label_num"]].head(2)

In [None]:
# 2. Función auxiliar para aplicar extract_embedding con tqdm
# Esta función recibe una fila del DataFrame y usa la columna "path"
# para extraer el embedding con la función extract_embedding().

def get_embedding(row):
    try:
        return extract_embedding(row["path"])
    except Exception as e:
        print("Error con archivo:", row["path"], e)
        return np.zeros(1024)  # vector nulo si falla algo


df_dev["embedding"] = df_dev.progress_apply(get_embedding, axis=1)

# Mostrar primeras filas para verificar
df_dev.head()


In [None]:


TRAIN_PROTOCOL  = BASE_DIR /"ASVspoof2019_LA_cm_protocols\ASVspoof2019.LA.cm.train.trn.txt"

df_train = pd.read_csv(
    TRAIN_PROTOCOL,
    sep=" ",
    header=None,
    names=["speaker_id", "audio_id", "unused1", "system_id", "key"]
)

print("Protocol loaded. Shape:", df_train.shape)
df_train.head()

In [None]:
TRAIN_AUDIO_DIR = BASE_DIR / "ASVspoof2019_LA_train/flac/"

def make_path(audio_id):
    return os.path.join(TRAIN_AUDIO_DIR, audio_id + ".flac")

df_train["path"] = df_train["audio_id"].apply(make_path)
df_train[["path"]].head()

In [None]:
df_train["exists"] = df_train["path"].apply(os.path.exists)
df_train["exists"].value_counts()
label_map = {"bonafide": 0, "spoof": 1}
df_train["label_num"] = df_train["key"].map(label_map)
print(df_train["label_num"].value_counts())




In [None]:
# Asegurar GPU
import torchaudio
import torchaudio.transforms as T
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

def extract_embedding(path):
    # Leer audio
    speech, sr = torchaudio.load(path)
    speech = speech.squeeze()

    if sr != 16000:
        speech = torchaudio.functional.resample(speech, sr, 16000)

    # Procesor a GPU
    inputs = feature_extractor(
        speech.numpy(),
        sampling_rate=16000,
        return_tensors="pt",
        padding=True
    )

    # Enviar inputs a GPU
    inputs = {k: v.to(device) for k, v in inputs.items()}

    # Forward pass sin gradiente
    with torch.no_grad():
        outputs = model(**inputs)

    # Mean pooling
    emb = torch.mean(outputs.last_hidden_state, dim=1).squeeze()

    # Regresar emb a CPU como numpy
    return emb.cpu().numpy()

# Extraer embeddings masivamente
df_train["embedding"] = [
    extract_embedding(path) for path in tqdm(df_train["path"])
]

print("Embeddings TRAIN listos.")


In [None]:
X_train = np.vstack(df_train["embedding"].values)
y_train = df_train["label_num"].values
np.save("X_train.npy", X_train)
np.save("y_train.npy", y_train)



Para predecir utilizaré una máquina de soporte vectorial con Kernel de Función de Base Radial
* Los embeddings están en $\mathbb{R}^{1024}$
* La separacion entre bonafide vs spoof no es perfectamente lineal (o eso espero)
* RBF kernel permite fronteras no lineales suaves
* los embedings están centrados y normalizados por diseño dle modelo
* el número de muestras es moderado

In [None]:
# 4. Convertir embeddings y etiquetas a matrices numpy

X_dev = np.vstack(df_dev["embedding"].values)
y_dev = df_dev["label_num"].values

X_dev.shape, y_dev.shape
## debe salir ((N,1024),(N,))

In [None]:
np.save("X_dev.npy", X_dev)
np.save("y_dev.npy", y_dev)
