In [None]:
# !pip install yt-dlp openai-whisper pydub

# System-wide dependency: ffmpeg (openai-whisper uses it to decode audio files)
# sudo apt install ffmpeg   # Linux
# brew install ffmpeg       # macOS

In [8]:
import yt_dlp
import re
import time
import os
import whisper
import json
from tqdm import tqdm

In [3]:
def scrape_channel_meta(channel_url, limits):
    """
    Fetches the metadata of a channel's video listing without downloading media.

    Args:
        channel_url (str): Listing address, e.g. https://www.youtube.com/@creator/videos
            (the "videos" tab is also interpreted as a playlist).
        limits (int): How many videos from the playlist to grab, in descending order.

    Returns:
        The metadata object yt-dlp returns for the listing.

    Raises:
        ValueError: If yt-dlp returns nothing for the given address.
    """
    ydl_opts = dict(
        ignoreerrors=True,
        playlistend=limits,
        sleep_interval=1,
        max_sleep_interval=3,
        cookiesfrombrowser=('chromium',),
    )

    with yt_dlp.YoutubeDL(ydl_opts) as downloader:
        # download=False: only metadata is collected here
        channel_meta = downloader.extract_info(channel_url, download=False)

        if not channel_meta:
            raise ValueError("Failed to retrieve channel metadata.")

    return channel_meta

In [9]:
def get_video_urls(channel_info):
    """
    Collects the page URLs of all videos listed in channel metadata.

    Args:
        channel_info (dict): Channel metadata whose "entries" value is the list of
            per-video dictionaries (each carrying a "webpage_url").

    Returns:
        list of str: One URL per usable entry, in listing order. Entries that are
        None or that carry no "webpage_url" are left out; a missing or empty
        "entries" value yields an empty list instead of raising.
    """
    entries = (channel_info or {}).get("entries") or []

    video_urls = []
    for item in entries:
        # None entries appear when an individual video could not be extracted
        if not item:
            continue
        url = item.get("webpage_url")
        if url:
            video_urls.append(url)
    return video_urls

In [None]:
# import re
# import time
# import os

In [1]:
def process_single_video(video_url, start_count, data_dir="data", media_dir="media"):
    """
    Downloads audio from a YouTube video and returns a structured metadata dictionary.

    Args:
        video_url (str): Full URL to the YouTube video.
        start_count (int): Index to label the entry (video) uniquely.
        data_dir (str): Root data directory.
        media_dir (str): Sub-directory of data_dir that receives the audio files;
            one folder per uploader is created inside it.

    Returns:
        tuple: (result_dict, updated_start_count). result_dict["errors"] is None
        when both steps succeeded, otherwise the error message(s) joined by "; ".
        result_dict["filename"] is the name of the file found on disk after the
        download, or "<base>.webm" when no file was produced.
    """

    options = {
        'format': 'bestaudio/best',
        'sleep_interval': 2,
        'max_sleep_interval': 5,
        'cookiesfrombrowser': ('chromium',),
    }

    error_messages = []

    # Try to extract metadata first
    with yt_dlp.YoutubeDL(options) as ydl:
        try:
            info = ydl.extract_info(video_url, download=False)
            if not isinstance(info, dict):
                raise ValueError("No metadata returned.")
        except Exception as e:
            error_messages.append(str(e))
            info = {}

    # Individual fields may be absent or None even when extraction succeeds,
    # so every field gets the same placeholder used for a failed extraction.
    title = info.get("title") or f"no_title_{int(time.time())}"
    duration = info.get("duration") or 0
    uploader = (info.get("uploader") or "unknown").lower()
    view_count = info.get("view_count") or 0
    upload_date = info.get("upload_date") or "unknown"

    # Normalize file/folder names
    max_file = 200
    max_dir = 7

    normalized_title = re.sub(r'[\\/*?:"<>|!]', "", title.lower())
    normalized_title = re.sub(r"[^\x00-\x7F]", "", normalized_title)
    filename_part = re.sub(r'\s+', "_", normalized_title.strip())[:max_file]

    # Fixed-width folder name: truncated to max_dir characters, padded with "x"
    directory = re.sub(r'[^\w\d]', "", uploader.strip())[:max_dir].ljust(max_dir, "x")

    subfolder_path = os.path.join(data_dir, media_dir, directory)
    os.makedirs(subfolder_path, exist_ok=True)

    base_name = f"{start_count}_{directory}_{filename_part}"

    # The output template is %-formatted by yt-dlp, so a literal "%" coming from
    # the title (or the path) has to be doubled before the %(ext)s field is added.
    options['outtmpl'] = os.path.join(subfolder_path, base_name).replace("%", "%%") + ".%(ext)s"

    # Download actual audio
    try:
        with yt_dlp.YoutubeDL(options) as ydl:
            ydl.download([video_url])
    except Exception as e:
        error_messages.append(str(e))

    # 'bestaudio/best' does not always produce .webm, so record the file that
    # actually exists; fall back to the historical ".webm" name otherwise.
    filename = _find_downloaded_file(subfolder_path, base_name) or f"{base_name}.webm"

    # Final metadata dictionary
    result = {
        "entry_no": start_count,
        "title": title,
        "directory": directory,
        "filename": filename,
        "duration": duration,
        "uploader": uploader,
        "view_count": view_count,
        "upload_date": upload_date,
        "url": video_url,
        "errors": "; ".join(error_messages) if error_messages else None,
    }

    return result, start_count + 1


def _find_downloaded_file(folder, base_name):
    """Returns the name of the file in folder named base_name plus a single extension, or None."""
    prefix = f"{base_name}."
    candidates = sorted(
        name for name in os.listdir(folder)
        if name.startswith(prefix)
        and "." not in name[len(prefix):]            # exactly one extension after the base name
        and not name.endswith((".part", ".ytdl"))    # skip unfinished-download leftovers
    )
    return candidates[0] if candidates else None

In [11]:
def batch_process_videos(video_urls, start_count, data_dir="data", media_dir="media"):
    """
    Downloads a batch of YouTube videos and returns their metadata.

    Args:
        video_urls (list of str): List of full YouTube video URLs.
        start_count (int): Starting index for naming and tracking.
        data_dir (str): Root data directory, forwarded to process_single_video.
        media_dir (str): Media sub-directory, forwarded to process_single_video.

    Returns:
        list: List of metadata dictionaries for each downloaded video, in the
        order of video_urls.
    """

    downloaded_videos = []

    for url in video_urls:
        # process_single_video hands back the next free index, which keeps
        # entry numbers consecutive across the batch
        result, start_count = process_single_video(
            url, start_count, data_dir=data_dir, media_dir=media_dir
        )
        downloaded_videos.append(result)

    return downloaded_videos

In [None]:
def batch_transcribe(downloaded_videos_db, data_dir="data", media_dir="media", model=None):
    """
    Transcribes audio files from the downloaded metadata database.

    Args:
        downloaded_videos_db (list): List of metadata dictionaries from video downloads
            (by default received as the output of "batch process videos"). Each needs
            the keys "directory" and "filename".
        data_dir (str): Path to root data directory where audio files are stored.
        media_dir (str): Sub-directory of data_dir that holds the audio files.
        model: Object with a Whisper-style transcribe() method. When None, the
            "medium" Whisper model is loaded the first time a file needs it.

    Returns:
        list: The same list, each dictionary updated in place with a "transcript" key
        (the text, or a message saying the file is missing / transcription failed).
        If the recorded file does not exist but a file with the same base name and a
        different extension does, that file is used and "filename" is corrected.
        Transcripts are also written to <data_dir>/transcripts/<directory>/<base>.txt.
    """

    transcript_path = os.path.join(data_dir, "transcripts")
    os.makedirs(transcript_path, exist_ok=True)

    for item in downloaded_videos_db:
        directory = item["directory"]
        filename = item["filename"]
        media_path = os.path.join(data_dir, media_dir, directory)
        file_path = os.path.join(media_path, filename)

        uploader_transcript_dir = os.path.join(transcript_path, directory)
        os.makedirs(uploader_transcript_dir, exist_ok=True)

        base_filename, _ = os.path.splitext(filename)
        transcript_file = os.path.join(uploader_transcript_dir, base_filename)

        if not os.path.isfile(file_path):
            # The recorded extension may not match the container that was
            # actually downloaded, so look for the same base name first.
            actual_name = _find_audio_file(media_path, base_filename)
            if actual_name is None:
                item["transcript"] = "File missing — could not transcribe."
                continue
            item["filename"] = actual_name
            file_path = os.path.join(media_path, actual_name)

        # Loading is deferred so an empty / all-missing batch never pays for it
        if model is None:
            model = whisper.load_model("medium")

        try:
            result = model.transcribe(
                file_path,
                language="en",
                task="transcribe",
                temperature=0.0,
                best_of=1,
                fp16=False,
                no_speech_threshold=0.6,
                condition_on_previous_text=False
            )
            item["transcript"] = result["text"]

            with open(f"{transcript_file}.txt", "w", encoding="utf-8") as file:
                file.write(result["text"])

        except Exception as e:
            # Best-effort: one failing file must not abort the rest of the batch
            item["transcript"] = f"Transcription error: {str(e)}"

    return downloaded_videos_db


def _find_audio_file(folder, base_name):
    """Returns the name of a file in folder named base_name plus a single extension, or None."""
    if not os.path.isdir(folder):
        return None
    prefix = f"{base_name}."
    candidates = sorted(
        name for name in os.listdir(folder)
        if name.startswith(prefix)
        and "." not in name[len(prefix):]            # exactly one extension after the base name
        and not name.endswith((".part", ".ytdl"))    # skip unfinished-download leftovers
    )
    return candidates[0] if candidates else None

In [13]:
def export_to_json(db, name_of_file):
    """
    Writes a list of dictionaries to "<name_of_file>.json".

    Nothing is raised: a wrong input type or a failed write is reported with
    print() and the function simply returns.

    Args:
        db (list): The data to export.
        name_of_file (str): File path or name (without .json extension).
    """
    try:
        # Guard clause: anything but a list is rejected before touching the disk
        if not isinstance(db, list):
            print(f"Invalid DB type: {type(db)}. Expected a list.")
            return

        target = "{}.json".format(name_of_file)
        with open(target, "w", encoding="utf-8") as handle:
            # ensure_ascii=False keeps non-ASCII characters readable in the file
            json.dump(db, handle, indent=4, ensure_ascii=False)
        print(f"Exported to {target}")
    except Exception as e:
        print(f"JSON export failed for {name_of_file}: {str(e)}")

In [None]:
# Channel listing URLs to process, separated by whitespace (one per line here).
channels = """
https://www.youtube.com/@creator_1/videos
https://www.youtube.com/@creator_2/videos
"""

# split() with no separator splits on any run of whitespace and drops the
# empty leading/trailing pieces.
channel_urls = channels.split()
print(f"Total channels: {len(channel_urls)}")

In [None]:
# --- Run configuration -------------------------------------------------------
data_dir = "data"            # root folder for media, transcripts and the corpus
databases_dir = "corpus"     # sub-folder of data_dir that receives the JSON export
output_dir = os.path.join(data_dir, databases_dir)
videos_per_channel = 2       # passed to scrape_channel_meta as the playlist limit

# --- Pipeline state ----------------------------------------------------------
full_db = []                 # metadata + transcript of every processed video
start_count = 0              # running entry number, shared across all channels
model = whisper.load_model("medium")  # loaded once and reused for every channel

for url in channel_urls:
    try:
        channel_info = scrape_channel_meta(url, videos_per_channel)
        video_urls = get_video_urls(channel_info)

        print(f"\n{url} — Found {len(video_urls)} videos")

        downloaded = []
        for video_url in video_urls:
            result, start_count = process_single_video(video_url, start_count, data_dir=data_dir)
            downloaded.append(result)

        transcribed = batch_transcribe(downloaded, data_dir=data_dir, model=model)
        full_db.extend(transcribed)

    except Exception as e:
        # A failing channel is reported and skipped so the remaining channels still run
        print(f"Error processing {url}: {str(e)}")

os.makedirs(output_dir, exist_ok=True)
export_to_json(full_db, os.path.join(output_dir, "channels_full_db"))