# Corpus Downloader 
------
Autor: Ruy Cabello \
Fecha (última revisión): 20/12/2023  


La finalidad de este código es automatizar la descarga de los videos, como audios en formato mp3, encontrados en el archivo _corpus yt 2023.csv_. Este documento contiene una extensa recopilación de hablantes de varios estados de México. Por cada estado, se trata de recopilar 6 hablantes: 3 mujeres y 3 hombres distintos. De cada uno de ellos se recopilan 3 videos. La recopilación aún sigue en proceso. 

In [1]:
import pandas as pd
import os
import time 
from tinydb import TinyDB, Query
from pytubefix import YouTube
from pytubefix.exceptions import VideoUnavailable
from pytubefix.cli import on_progress
import whisper
from whisper.utils import get_writer
from IPython.display import clear_output

## Constantes y variables auxiliares

In [2]:
ESTADOS = {1:"Aguascalientes", 2:"Baja California", 3:"Baja California Sur", 4:"Campeche", 
            5:"Chiapas", 6:"Chihuahua", 7:"Coahuila", 8:"Colima", 9:"Durango", 10:"Guanajuato",
            11:"Guerrero", 12:"Hidalgo", 13:"Jalisco", 14:"Ciudad de México", 15:"Estado de México", 
            16:"Michoacán", 17:"Morelos", 18:"Nayarit", 19:"Nuevo León", 20:"Oaxaca", 21:"Puebla", 22:"Querétaro", 
            23:"Quintana Roo", 24:"San Luis Potosí", 25:"Sinaloa", 26:"Sonora", 27:"Tabasco", 28:"Tamaulipas",
            29:"Tlaxcala", 30:"Veracruz", 31:"Yucatán", 32:"Zacatecas"}

# Modidificar si deseas guardar los audios o captions en otra ubicación
AUDIOS_DIR = os.path.join(os.getcwd(), "audios")
CAPTIONS_DIR = os.path.join(os.getcwd(), "captions")
YT_CAPTIONS_DIR = os.path.join(CAPTIONS_DIR, "yt_captions")
WHISPER_CAPTIONS_DIR = os.path.join(CAPTIONS_DIR, "whisper_captions")

## Creando un dataframe del corpus actual

Actualizado el: 11/11/2023

In [3]:
df = pd.read_csv("corpus_youtube.csv")
df.rename(str.strip, axis="columns", inplace = True)
df.loc[:, "Nombre"].ffill(limit=2, inplace = True)
df.loc[:, "Estado"].ffill(inplace=True)
df.loc[:, "Edad"].ffill(limit=2, inplace = True)
df.loc[:, "Género"].ffill(limit=2, inplace = True)

df.head(5)

Unnamed: 0,Estado,Nombre,Edad,Género,Título,Fecha de creación,Link,Fecha de recopilación,Aportación
0,Aguascalientes,Alejandra García,26,Femenino,REGALO COMÍDA en la CALLE | lo logramos 🙏🏻🥳 Al...,31 agosto 2023,https://youtu.be/hWKy7Ns1pNo?si=CtYGM3G0f_TtUXUz,18 septiembre 2023,Heili
1,Aguascalientes,Alejandra García,26,Femenino,"habitación principal | REMODELACIÓN, CON POQUI...",30 noviembre 2022,https://youtu.be/bm2xz0_HrGc?si=anegbcnciRTpv535,18 septiembre 2023,Heili
2,Aguascalientes,Alejandra García,26,Femenino,Nuestro Árbol de Navidad Casero | Ale García ✨,21 noviembre 2021,https://youtu.be/JT3WoDrrl1A?si=U7yvebArA8Qx6wkv,18 septiembre 2023,Heili
3,Aguascalientes,Paulina Espinoza Ávila,37,Femenino,inflamación abdominal,1 agosto 2023,https://youtu.be/6DRWV94GF_M?si=-bhaBjaK_jonBLNz,26 de octubre 2023,Heili
4,Aguascalientes,Paulina Espinoza Ávila,37,Femenino,6 de enero de 2023,6 enero 2023,https://youtu.be/8rAqZKE6ZRk?si=gH_-kllispldo99U,26 de octubre 2023,Heili


## Database con TinyDB

In [4]:
db_file = "audio_database.json"

# Tratamos de abrir el archivo json, en caso de que no este, lo creamos.
try:
    with open(db_file, 'r'):
        pass
    db = TinyDB(db_file)
except FileNotFoundError:
    db = TinyDB(db_file)


# Guardamos: video_id, estado, autor, edad, genero, ruta_audio, yt_trad, whisper_trad, audio_descargado
def create_db_entries(df_entry, r_aud, r_yt, r_whisp="N/A"):
    id = YouTube(df_entry["Link"]).video_id
    db.insert({"Estado":df_entry["Estado"], "Autor":df_entry["Nombre"], "Edad":df_entry["Edad"], 
               "Genero":df_entry["Género"], "VideoID":id, "Ruta":r_aud, "Caption_YT": r_yt,
               "Caption_Whisper": r_whisp})


def is_downloaded(link: str):
    vid_id = YouTube(link).video_id
    yt_vid = Query()

    return db.search(yt_vid.VideoID == vid_id)
    


def errase_db():
    db.truncate()
    db.all()

def errase_downloaded_txt():
    try:
        with open("downloaded.txt", "w") as _:
           pass
    except FileNotFoundError as e:
        print(f'File not found: {e}')

## Funciones para generar las transcripciones. 

Las transcripcciones, o subtítulos o _captions_ es el texto generadoro en cada video. En este caso, utilizamos transcripciones generadas por dos métodos: 

1. Generadas de manera automática por YouTube.
2. Generadas por medio de Whisper.

In [5]:
# Generación de transcripciones
def captions_whisper(ruta_audio: str) -> str:
    model = whisper.load_model("base")
    audio = r'{}'.format(ruta_audio)
    result = model.transcribe(audio=audio,
                            fp16=False, word_timestamps=True,
                            verbose=False)
    srt_writer = get_writer("srt", WHISPER_CAPTIONS_DIR)
    srt_writer(result, audio)  

    n = ruta_audio.find(".mp3") - 11
    name = ruta_audio[n:len(ruta_audio)-4] + ".srt"
    ruta = WHISPER_CAPTIONS_DIR + '\\' + name
    return ruta

def captions_youtube(yt: YouTube) -> str:
    vid_id = yt.video_id
    flag = True
    try: 
        caption = yt.captions["a.es"]
        file_path = os.path.join(YT_CAPTIONS_DIR, f"{vid_id}.srt")
        caption.download(yt.video_id)
        os.rename(f'{yt.video_id +" (a.es).srt"}', file_path)
    except:
        print(f"No se pudo descargar los subtitulos del video: {vid_id}")
        flag = False

    ruta = file_path if flag else "N/A"

    return ruta
    

## Función para descargar videos

In [6]:
errors = {}
def download_audio(link:str, row, i:int):
    try:
        yt = YouTube(link, on_progress_callback=on_progress)
        ys = yt.streams.get_audio_only()
        name = yt.video_id + ".mp3"
        dest_path = ys.download(output_path=AUDIOS_DIR, filename=name)
        # Automatically save the transcription from yt.
        rYT = captions_youtube(yt)
        create_db_entries(row, dest_path, rYT)
    except Exception as e:
        if "Video unavailable" in str(e):
            print(f'Video at {link} is unavailable, skipping...')
            errors[i] = yt
        else:
            print(f'An error occurred while processing the video: {e}')
            errors[i] = yt


## Main

In [7]:
downloaded_counter = 0 

df_available = df.dropna()
available_total = len(df_available)
exp_time = lambda x: x*71.8/5/60
print('**'*30)
print(f'Iniciando descarga de {available_total} audios.\nTiempo esperado de descarga: {round(exp_time(available_total),2)} minutos')

# testing
# stop = 4

start_time = time.time()
for i, row in df_available.iterrows():

    link = row["Link"]

    if downloaded_counter % 5 == 0 and downloaded_counter != 0: 
        # Cada 5 descargas esperamos un minuto para evitar ser bloqueados
        time.sleep(60)

    if is_downloaded(link):
        print("Audio ya descargado, saltando...")
        continue
    else:
        download_audio(link, row, i)

    # testing
    #if i == stop:
    #    break

    downloaded_counter += 1

# Whisper captions
end_time = time.time()
total_time = round((end_time - start_time) / 60, 2)
print("DESCARGA FINALIZADA")
print(f'Tiempo real de descarga: {total_time} minutos')
time.sleep(5)
clear_output(True)

print('Deseas descargar en este momento las transcripciones de Whisper?')
print('(Puede tardar hasta 24 horas)')
res = input('Si, descargar -> s\nNo, en otro momento -> n')


if res.lower() == 's':
    audios = Query()
    for item in db:
        vid_id = item['VideoID']
        audio_dir = item['Ruta']
        whisper_dir = item['Caption_Whisper']
        if whisper_dir == 'N/A':
            r = captions_whisper(audio_dir)
            db.update({'Caption_Whisper':r}, audios.VideoID == vid_id)
        else:
            print("Transcripcion ya generada, saltando...")
else:
    print("Para descargar en otro momento las transcripciones, \
           ejecuta el bloque siguiente")

### Creacion de captions


************************************************************
Iniciando descarga de 241 audios.
Tiempo esperado de descarga: 57.68 minutos
Audio ya descargado, saltando...
Audio ya descargado, saltando...
Audio ya descargado, saltando...
Audio ya descargado, saltando...
Audio ya descargado, saltando...
 ↳ |██████████████████████████████████████████████████████████████████████████████████████| 100.0%

KeyboardInterrupt: 

In [None]:
# Ejecutar si deseas descargar las transcripciones de Whisper
# Primero se debieron haber descargado los audios (bloque anterior)
audios = Query()
for item in db:
    vid_id = item['VideoID']
    audio_dir = item['Ruta']
    if audio_dir == 'N/A':
        r = captions_whisper(audio_dir)
        db.update({'Caption_Whisper':r}, audios.VideoID == vid_id)
    else:
        print("Transcripcion ya generada, saltando...")

In [6]:
# Descomentar y ejectuar si deseas borrar la base de datos
# errase_db()

## Ejecutar el siguiente bloque de código para filtrar resultados

In [8]:
#### DIFERENTES QUERIES

def state_equals(estado, return_results=True) :
    Video = Query()
    if return_results:
        return db.search(Video.Estado == estado)
    else:
        return Video.Estado == estado

def age_equals(edad, return_results=True):
    Video = Query()
    if return_results:
        return db.search(Video.Edad == edad)
    else:
        return Video.Edad == edad

def age_is_more(num=18, return_results=True):
    # Busca por edad que sea mayor o igual a un numero 
    Video = Query()
    if return_results:
        return db.search(Video.Edad >= num)
    else:
        return Video.Edad >= num

def age_is_less(num=18, return_results=True):
    Video = Query()
    if return_results:
        return db.search(Video.Edad <= num)
    else:
        Video.Edad <= num

def gender_is(genero, return_results=True):
    # Busca por genero, feminino o masculino
    Video = Query()
    if return_results:
        return db.search(Video.Genero == genero)
    else:
        return Video.Genero == genero

def autor_is(autor, return_results=True):
    ## Agregar regex
    Video = Query()
    if return_results:
        return db.search(Video.Nombre == autor)
    else:
        return Video.Nombre == autor

## ¿Cómo usar TinyDB?

TinyDB es una librería de Python que facilita el uso de una mini base de datos. En nuestro caso, se utiliza para guardar todas las características de los autores de los videos, así como de las ubicaciones de los archivos descargados y generados. 

### Obtén la ruta de los audios según ciertos atributos
-----
Para poder ubicar la ruta de los audios según un criterio de búsqueda, considera los siguientes atributos de búsqueda: 

1. **Estado**: entidad federativa del hablante.
2. **Autor**: nombre del hablante. 
3. **Edad**: edad del hablante.
4. **Género**: género del hablante (masculino o femenino).

### Tipos de búsqueda
#### 1. Búsqueda simple
Se utiliza cuando se desea hacer una búsqueda utilizando un solo atributo, por ejemplo: 
- Encontrar audios del **Estado** de Baja California.
- Buscar todos los audios cuya **edad** del hablante sea mayor o igual que 18. 
- Conseguir todos los audios del **género** masculino.

### ¿Cómo usarlo?
- Selecciona el número del atributo que desees consultar. 
- Unas opciones se presentarán según el criterio a buscar. Brindar la información necesaria. 
- Se muestran los resultados correspondientes
- Al finalizar, señalar si se desean guardas los resultados en un archivo txt

In [11]:
def print_opciones():
    print("Selecciona un criterio de búsqueda: ")
    print("1. Estado\n2. Autor\n3. Edad\n4. Género")

def imprimir_edos():
    for estado in ESTADOS.items():
        print(estado)

def seleccionar_edad(return_results = True) -> Query:

    edad = int(input("Ingresa la edad de búsqueda: "))
    print("Ingresa el número según tu criterio de búsqueda.")
    print(f"1. Edad igual a {edad}")
    print(f"2. Edad mayor o igual a {edad}")
    print(f"3. Edad menor o igual a {edad}")

    op = int(input())

    if op == 1:
        clear_output()
        print(f"Buscando resultados con edad igual a: {edad}")
        if return_results:
            return age_equals(edad) 
        else:
            return age_equals(edad, False)
    elif op == 2:
        clear_output()
        print(f"Buscando resultados con edad mayor o igual a: {edad}")
        if return_results:
            return age_is_more(edad) 
        else:
            return age_is_more(edad, False)
    elif op == 3:
        clear_output()
        print(f"Buscando resultados con edad menor o igual a: {edad}")
        if return_results:
            return age_is_less(edad) 
        else:
            return age_is_less(edad, False)
    else:
        print("Opción inválida")
    
    return []

def salvar_resultados(query):
    current_time = time.time()
    struct = time.localtime(current_time)
    s = time.strftime("%H-%M-%S", struct)
    print(s)
    with open(f"{s}.txt", "w") as f:
        for item in query:
            f.write(str(item))
            f.write("\n")
    print(f"\nResultados salvados en: {os.getcwd()}\{s}.txt")

def escoger_opciones(opcion: int) -> list:
    
    resultados = ""

    if opcion == 1: 
        imprimir_edos()
        seleccion = int(input("Escribe el número de Estado a buscar: "))
        clear_output()
        print(f"Buscando resultados con estado igual a: {ESTADOS[seleccion]}")
        time.sleep(3)
        print("*")
        resultados = state_equals(ESTADOS[seleccion])
        print(resultados)

    elif opcion == 2: 
        seleccion = input("Escribe el nombre del autor a buscar. Incluye acentos: ")
        print(f"Buscando resultados con autor igual a: {seleccion}")
        time.sleep(3)
        print("*")
        resultados = autor_is(seleccion)
        print(resultados)

    elif opcion == 3: 
        resultados = seleccionar_edad()
        time.sleep(3)
        print("*")
        print(resultados)

    elif opcion == 4:
        print("Escoge el genero a buscar, ingresando el número:\n1. Masculino\n2. Femenino")
        op = int(input(" "))
        seleccion = 'Masculino' if op == 1 else 'Femenino'
        clear_output()
        print(f"Buscando resultados con género igual a: {seleccion}")
        time.sleep(3)
        print("*")
        resultados = gender_is(seleccion)
        print(resultados)
        
    else: 
        print("Seleccion invalida.")

    return resultados


#######################################################

query_finished = False


while not query_finished:
    print_opciones()
    op = int(input("Escribe el número de la opción a escoger: "))
    time.sleep(3)
    clear_output()
    resultados_final = escoger_opciones(op)
    query_finished = True

if query_finished:
    salvar = int(input("Deseas guardar la búsqueda en un archivo txt? 1 - Sí. 2 - No."))
    if salvar == 1:
        salvar_resultados(resultados_final)

Buscando resultados con estado igual a: Aguascalientes
*
[{'Estado': 'Aguascalientes', 'Autor': 'Alejandra García ', 'Edad': '26', 'Genero': 'Femenino', 'VideoID': 'hWKy7Ns1pNo', 'Ruta': 'c:\\Users\\ruyca\\Desktop\\UNAM\\2024-1\\IIMAS-servicio\\codigos\\corpus_downloader\\audios\\hWKy7Ns1pNo.mp3', 'Caption_YT': 'c:\\Users\\ruyca\\Desktop\\UNAM\\2024-1\\IIMAS-servicio\\codigos\\corpus_downloader\\captions\\yt_captions\\hWKy7Ns1pNo.srt', 'Caption_Whisper': 'N/A'}, {'Estado': 'Aguascalientes', 'Autor': 'Alejandra García ', 'Edad': '26', 'Genero': 'Femenino', 'VideoID': 'bm2xz0_HrGc', 'Ruta': 'c:\\Users\\ruyca\\Desktop\\UNAM\\2024-1\\IIMAS-servicio\\codigos\\corpus_downloader\\audios\\bm2xz0_HrGc.mp3', 'Caption_YT': 'c:\\Users\\ruyca\\Desktop\\UNAM\\2024-1\\IIMAS-servicio\\codigos\\corpus_downloader\\captions\\yt_captions\\bm2xz0_HrGc.srt', 'Caption_Whisper': 'N/A'}, {'Estado': 'Aguascalientes', 'Autor': 'Alejandra García ', 'Edad': '26', 'Genero': 'Femenino', 'VideoID': 'JT3WoDrrl1A', 'Ru