# Download positive-class audio clips

In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed

import pandas as pd
import yt_dlp
from pydub import AudioSegment

In [None]:
# Human-readable names of the positive classes (kept for reference)
positive_names = [
    "Emergency vehicle",
    "Ambulance (siren)",
    "Police car (siren)",
    "Fire engine, fire truck (siren)",
]

# Label identifiers of the positive (target) classes
positive_labels = ["/m/03j1ly", "/m/012n7d", "/m/04qvtq", "/m/012ndj"]
csv_path = "../../data/csv_files"

# Class label table
class_labels = pd.read_csv(f"{csv_path}/class_labels_indices.csv")

# Segment tables, all read the same way: the multi-character ", " separator
# is why the python parsing engine is requested.
b_train, b_val, u_train = (
    pd.read_csv(f"{csv_path}/{segments_file}", sep=", ", engine="python")
    for segments_file in (
        "balanced_train_segments.csv",
        "eval_segments.csv",
        "unbalanced_train_segments.csv",
    )
)

datasets = [b_train, b_val, u_train]

# Output directory for positive audio files
OUTPUT_PATH = "../../data/audios/positive"


In [None]:
def process_row(dataset, i, row):
    """Download and crop the audio clip of one row if it has a positive label.

    The row's ``positive_labels`` field is stripped of double quotes, split
    on commas and compared against the module-level ``positive_labels`` in
    order. For the first label that matches, the audio of the corresponding
    YouTube video is downloaded with yt_dlp and converted to WAV (ffmpeg is
    asked for 16000 Hz, 1 channel), then cropped with pydub to the
    ``start_seconds``..``end_seconds`` window and written back to the same
    path under ``OUTPUT_PATH``. Rows without a matching label are skipped,
    and a row never produces more than one file.

    Args:
        dataset: DataFrame the row belongs to; its ``YTID``,
            ``start_seconds`` and ``end_seconds`` columns are read at
            index label ``i``.
        i: Index label of the row within ``dataset``.
        row: The row itself; only its ``positive_labels`` entry is read.

    Returns:
        None. Progress and errors are reported with ``print``.

    Raises:
        IndexError: If the matched label has no entry in ``class_labels``.
            Download and cropping errors are caught and printed instead of
            being raised.
    """
    # Neither value depends on the label being tested, so compute them once.
    row_labels = row["positive_labels"].replace('"', "").split(",")
    video_code = dataset["YTID"][i]

    # Only the first matching target label is used, so a row yields at most
    # one audio file even when it carries several positive labels.
    positive_label = next(
        (label for label in positive_labels if label in row_labels), None
    )
    if positive_label is None:
        return

    class_match = class_labels[class_labels["mid"] == positive_label][
        "display_name"
    ]
    class_name = class_match.values[0].replace(" ", "_")
    output_filename = f"0-{class_name}-{video_code}"
    output_file = f"{OUTPUT_PATH}/{output_filename}.wav"
    url = f"https://www.youtube.com/watch?v={video_code}"

    ydl_opts = {
        "format": "bestaudio/best",
        "geo_bypass": True,
        "http_headers": {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
        },
        # No extension here: the audio-extraction postprocessor writes the
        # final file as "<outtmpl>.wav".
        "outtmpl": f"{OUTPUT_PATH}/{output_filename}",
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": "wav",
                "preferredquality": "192",
            }
        ],
        "postprocessor_args": ["-ar", "16000", "-ac", "1"],
        "quiet": True,  # Suppress all messages except errors
        "no_warnings": True,  # Suppress warnings
    }

    # Deliberately no ``finally: break``/``return`` here: leaving a
    # ``finally`` block that way silently discards any exception that is
    # still propagating. Failures are best-effort: report and move on.
    try:
        # Calculate start and end times in milliseconds (pydub works with ms)
        start = dataset["start_seconds"][i] * 1000
        end = dataset["end_seconds"][i] * 1000

        # Download the audio using yt_dlp
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        # Load the downloaded audio, keep only the requested segment and
        # overwrite the full-length file with it
        audio = AudioSegment.from_file(output_file, format="wav")
        cropped_audio = audio[start:end]
        cropped_audio.export(output_file, format="wav")

        print(f"Downloaded and processed: {output_filename}")
    except Exception as e:
        print(f"Error: {e}")


In [None]:
print("Downloading positive audios...")

# Fan the rows of every dataset out to a pool of 8 worker threads; each task
# handles a single row via process_row.
with ThreadPoolExecutor(max_workers=8) as pool:
    pending = [
        pool.submit(process_row, frame, idx, record)
        for frame in datasets
        for idx, record in frame.iterrows()
    ]

    # Collect results as tasks finish so that an exception raised inside a
    # worker is reported here instead of being lost.
    for finished in as_completed(pending):
        try:
            finished.result()
        except Exception as e:
            print(f"Error during processing: {e}")

print("Download completed")
