In [None]:
YTDLP_DOWNLOAD_ARCHIVE: Optional[Path] = AUDIO_CACHE_DIR.parent / "youtube_download_archive.txt"
YTDLP_COOKIES_FROM_BROWSER: Optional[str] = "chrome"
YTDLP_COOKIES_FILE: Optional[Path] = None
YTDLP_SLEEP_INTERVAL: Optional[float] = 5.0
YTDLP_MAX_SLEEP_INTERVAL: Optional[float] = 10.0
YTDLP_MAX_RETRIES: int = 10
YTDLP_FRAGMENT_RETRIES: int = 10
YTDLP_MAX_WORKERS: int = 4

In [None]:
@dataclass
class MusicCapsSample:
    ytid: str
    start_s: float
    text: str
    audio_path: Path

In [None]:
def load_musiccaps_metadata(sample_limit: Optional[int] = None) -> List[Dict[str, str]]:
    dataset = load_dataset(MUSICCAPS_DATASET, split=MUSICCAPS_SPLIT)
    if sample_limit is None or sample_limit >= len(dataset):
        return list(dataset)
    return [dataset[i] for i in range(sample_limit)]

def download_audio(ytid: str, start_s: float) -> Optional[Path]:
    AUDIO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
    clip_path = AUDIO_CACHE_DIR / f"{ytid}_{int(start_s)}.wav"
    if clip_path.exists():
        return clip_path
    temp_template = str(AUDIO_CACHE_DIR / f"{ytid}.%(ext)s")
    ydl_opts = {
        "outtmpl": temp_template,
        "format": "bestaudio/best",
        "quiet": True,
        "no_warnings": True,
        "ignoreerrors": False,
        "retries": YTDLP_MAX_RETRIES,
        "fragment_retries": YTDLP_FRAGMENT_RETRIES,
        "retry_sleep_functions": {"http": lambda n: 2 ** (n - 1)},  # экспоненциальное ожидание
    }
    if YTDLP_SLEEP_INTERVAL is not None:
        ydl_opts["sleep_interval"] = YTDLP_SLEEP_INTERVAL
        if YTDLP_MAX_SLEEP_INTERVAL is not None:
            ydl_opts["max_sleep_interval"] = max(YTDLP_SLEEP_INTERVAL, YTDLP_MAX_SLEEP_INTERVAL)
    if YTDLP_DOWNLOAD_ARCHIVE is not None:
        YTDLP_DOWNLOAD_ARCHIVE.parent.mkdir(parents=True, exist_ok=True)
        ydl_opts["download_archive"] = str(YTDLP_DOWNLOAD_ARCHIVE)
    if YTDLP_COOKIES_FROM_BROWSER:
        if browser_cookie3 is None:
            raise RuntimeError(
                "browser-cookie3 is required for YTDLP_COOKIES_FROM_BROWSER; install it via "
                "'python -m pip install browser-cookie3' or disable browser cookies."
            )
        ydl_opts["cookiesfrombrowser"] = (YTDLP_COOKIES_FROM_BROWSER, None, None, None)
    elif YTDLP_COOKIES_FILE:
        YTDLP_COOKIES_FILE.parent.mkdir(parents=True, exist_ok=True)
        ydl_opts["cookiefile"] = str(YTDLP_COOKIES_FILE)
    url = f"https://www.youtube.com/watch?v={ytid}"
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            result = ydl.extract_info(url, download=True)
            downloaded_path = Path(ydl.prepare_filename(result))
    except DownloadError as exc:
        print(f"[yt-dlp] skip {ytid}: {exc}")
        return None
    except Exception as exc:  # pragma: no cover - resilience for unexpected extractor errors
        print(f"[yt-dlp] unexpected error for {ytid}: {exc}")
        return None
    try:
        subprocess.run(
            [
                "ffmpeg",
                "-hide_banner",
                "-loglevel",
                "error",
                "-y",
                "-ss",
                str(start_s),
                "-t",
                str(CLIP_SECONDS),
                "-i",
                str(downloaded_path),
                "-ar",
                str(AUDIO_SAMPLING_RATE),
                "-ac",
                "1",
                str(clip_path),
            ],
            check=True,
        )
    except subprocess.CalledProcessError as exc:
        print(f"[ffmpeg] failed to trim {ytid}: {exc}")
        return None
    finally:
        if 'downloaded_path' in locals() and downloaded_path.exists():
            downloaded_path.unlink(missing_ok=True)
    return clip_path


In [None]:
def prepare_musiccaps_samples(sample_limit: Optional[int] = None) -> List[MusicCapsSample]:
    metadata = load_musiccaps_metadata(sample_limit)
    ordered_rows: List[Dict[str, str]] = list(metadata)
    order_index = {
        (row["ytid"], float(row["start_s"])): idx for idx, row in enumerate(ordered_rows)
    }

    existing_samples: List[MusicCapsSample] = []
    for row in ordered_rows:
        clip_path = AUDIO_CACHE_DIR / f"{row['ytid']}_{int(row['start_s'])}.wav"
        if clip_path.exists():
            existing_samples.append(
                MusicCapsSample(
                    ytid=row["ytid"],
                    start_s=row["start_s"],
                    text=row["caption"],
                    audio_path=clip_path,
                )
            )
        else:
            rows_to_fetch.append(row)

    def fetch(row: Dict[str, str]) -> Optional[MusicCapsSample]:
        path = download_audio(row["ytid"], row["start_s"])
        if path is None:
            return None
        return MusicCapsSample(
            ytid=row["ytid"],
            start_s=row["start_s"],
            text=row["caption"],
            audio_path=path,
        )

    fetched_samples: List[MusicCapsSample] = []
    if rows_to_fetch:
        worker_count = max(1, YTDLP_MAX_WORKERS)
        with ThreadPoolExecutor(max_workers=worker_count) as executor:
            future_map = {executor.submit(fetch, row): row for row in rows_to_fetch}
            for future in as_completed(future_map):
                result = future.result()
                if result is not None:
                    fetched_samples.append(result)

    all_samples = existing_samples + fetched_samples
    all_samples.sort(key=lambda sample: order_index[(sample.ytid, float(sample.start_s))])
    return all_samples