# Import Dependencies

In [1]:
# Import Main Dependencies
import os, re, ffmpeg, whisper
from pytubefix import YouTube, Stream
from pytubefix.cli import on_progress
from pytubefix.innertube import _default_clients

# Import Other Dependencies
import torch
from tqdm.auto import tqdm

# Define Utilities

In [2]:
def sanitize_filename(filename: str) -> str:
    r"""Return ``filename`` with characters that are unsafe in file names replaced.

    Each of ``< > : / \ | ? *`` is replaced by a single ``_``. A double quote
    is replaced by two underscores (``__``): it is treated as the two-character
    sequence backslash + quote, and both of those characters are unsafe.

    Args:
        filename: The raw file name (for example a video title with extension).

    Returns:
        The sanitized file name; all other characters are left untouched.
    """
    # Single-character replacements for the plainly invalid characters.
    replacements = {char: "_" for char in '<>:/\\|?*'}

    # A quote counts as an escaped quote (backslash + quote), hence two underscores.
    replacements['"'] = "__"

    return filename.translate(str.maketrans(replacements))
    
def read_unique_items_from_file(file: str) -> list:
    """Read the non-blank lines of a text file, without duplicates.

    Surrounding whitespace is stripped from every line and blank lines are
    skipped. Duplicates are dropped while keeping the first occurrence, so the
    result follows the order of the file and is identical on every run.

    Args:
        file: Path of the text file to read (one item, e.g. a URL, per line).

    Returns:
        The unique, stripped, non-empty lines in file order.
    """
    # "utf-8-sig" reads plain UTF-8/ASCII and also drops a leading BOM, which
    # would otherwise end up glued to the first item.
    with open(file, "r", encoding="utf-8-sig") as f:
        stripped_lines = (line.strip() for line in f)
        # dict keys keep insertion order, unlike a set.
        return list(dict.fromkeys(item for item in stripped_lines if item))

# Set Configurations

In [3]:
# File Names
# Text file holding one YouTube URL per line; read by the data-gathering cell.
yt_video_links_filename = "YouTube Video Links.txt"
# NOTE(review): not referenced by the visible cells — presumably consumed by a later step; confirm.
transcript_sentences_filename = "transcript_sentences.csv"

# Folder Names
# Output folders; each one is created on demand by the helper that writes into it.
video_output_path = "Video"
audio_output_path = "Audio"
transcription_output_path = "Transcription"

# Boolean Flags
# When True, the intermediate video/audio file is deleted once the next stage has consumed it.
remove_video = True
remove_audio = True

# Additional Dependency Configurations
# Makes pytubefix's "ANDROID_MUSIC" client entry point at the "ANDROID_CREATOR" entry.
# NOTE(review): presumably a workaround for a client that stopped working — confirm it is still needed
# with the installed pytubefix version (this touches a private table).
_default_clients["ANDROID_MUSIC"] = _default_clients["ANDROID_CREATOR"]

# Collect Data (YouTube Videos)

In [4]:
def download_youtube_video(video_filename: str, stream: Stream) -> tuple[str, str]:
    """Download ``stream`` into the video folder under the name ``video_filename``.

    The video folder (``video_output_path``) is created if it is missing, and
    any file already present at the target path is removed first so that a
    partial or corrupted download from an earlier run is never reused.

    Args:
        video_filename: File name (with extension) to save the download as.
        stream: The pytubefix stream to download.

    Returns:
        A ``(video_file, video_filename)`` tuple: the path of the downloaded
        file and the file name that was passed in.
    """
    os.makedirs(video_output_path, exist_ok=True)
    target_path = os.path.join(video_output_path, video_filename)

    # Start from a clean slate: an existing file may be an incomplete download.
    if os.path.exists(target_path):
        os.remove(target_path)

    # Blank lines around the message keep it apart from the progress output.
    print(f'\nDownloading (Video): {video_filename}\n')
    stream.download(output_path=video_output_path, filename=video_filename)
    print("\n")

    return target_path, video_filename

# Audio Extraction (Video to Audio)

In [5]:
def extract_audio_from_video(video_file: str, video_filename: str) -> tuple[str, str]:
    """Convert a downloaded video file into an MP3 in the audio folder.

    The audio file keeps the video's base name with an ``.mp3`` extension. The
    audio folder (``audio_output_path``) is created if it is missing, and any
    existing file at the target path is removed before the conversion runs.

    Args:
        video_file: Path of the source video file.
        video_filename: File name of the source video (used to derive the audio name).

    Returns:
        A ``(audio_file, audio_filename)`` tuple: the path and the file name of
        the extracted MP3.
    """
    os.makedirs(audio_output_path, exist_ok=True)

    # "<base name>.mp3", stored inside the audio folder.
    base_name, _extension = os.path.splitext(video_filename)
    audio_filename = f'{base_name}.mp3'
    audio_file = os.path.join(audio_output_path, audio_filename)

    # Drop any stale (possibly corrupted) result of an earlier run.
    if os.path.exists(audio_file):
        os.remove(audio_file)

    print(f'Extracting (Audio): {audio_filename}\n')

    # Encode the input with libmp3lame into an MP3 container.
    source = ffmpeg.input(video_file)
    conversion = source.output(audio_file, format="mp3", acodec="libmp3lame", loglevel="info")
    conversion.run(overwrite_output=True)

    return audio_file, audio_filename

# Transcription (Audio to Text)

In [6]:
# Loaded Whisper models keyed by (model name, device). Loading a model is slow,
# so each one is loaded once per kernel session instead of once per audio file.
_whisper_model_cache = {}


def _get_whisper_model(model_name: str, device):
    """Return the Whisper model for ``model_name`` on ``device``, loading it on first use."""
    cache_key = (model_name, str(device))
    if cache_key not in _whisper_model_cache:
        _whisper_model_cache[cache_key] = whisper.load_model(model_name, device=device)
    return _whisper_model_cache[cache_key]


def transcribe_audio_to_text(audio_file: str, audio_filename: str) -> None:
    """Transcribe an audio file with OpenAI Whisper and save the text to disk.

    The transcript is written to ``transcription_output_path`` (created if it
    is missing) as ``<audio base name>.txt``, encoded as UTF-8.

    Args:
        audio_file: Path of the audio file to transcribe.
        audio_filename: File name of the audio file (used to derive the transcript name).

    Raises:
        Any exception raised while writing the transcript is re-raised after the
        partially written file has been removed, so the caller never mistakes a
        failed write for a finished transcription.
    """
    # Create the Transcription Directory
    os.makedirs(transcription_output_path, exist_ok=True)

    # Set Transcription File Name ("[YouTube Video ID] [title].txt")
    transcription_filename = f'{os.path.splitext(audio_filename)[0]}.txt'

    # Set Path for Transcription File
    transcription_file = os.path.join(transcription_output_path, transcription_filename)

    # Get/Download OpenAI Whisper Model
    #
    # Models:
    #     tiny, base, small, medium, large, turbo
    # English-Only:
    #     tiny.en, base.en, small.en, medium.en
    #
    # Required VRAM:              Speed:
    #     1) 1GB - tiny, base         1) 10x - tiny
    #     2) 2GB - small              2) 8x - turbo
    #     3) 5GB - medium             3) 7x - base
    #     4) 6GB - turbo              4) 4x - small
    #     5) 10GB - large             5) 2x - medium
    #                                 6) 1x - large
    #
    # Quote from OpenAI:
    #     - The .en models for English-only applications tend to perform better,
    #       especially for the tiny.en and base.en models. We observed that the
    #       difference becomes less significant for the small.en and medium.en models.
    #
    # Note: only 4GB of VRAM is available on the author's machine, so small.en is used.
    print(f'Transcribing (Text): {transcription_filename}')
    print("") # Just New Line for Better Output
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = _get_whisper_model("small.en", device)

    # Transcribe Audio File (the whole text is held in memory before it is written)
    result = model.transcribe(audio_file, fp16=False, verbose=False)
    try:
        with open(transcription_file, "w", encoding="utf-8") as f:
            f.write(result["text"])
    except BaseException:
        # Remove the partial file so a later run does not treat it as complete,
        # then surface the failure instead of silently pretending success.
        if os.path.exists(transcription_file):
            os.remove(transcription_file)
        raise

# Execute Data Gathering

In [None]:
# Pipeline driver: for every URL in the links file, download the audio-only
# stream, convert it to MP3, transcribe it, and clean up intermediate files.
# URLs whose transcript already exists are skipped (after removing leftovers).
yt_urls = read_unique_items_from_file(yt_video_links_filename)

with tqdm(total=len(yt_urls), desc="Getting YouTube URLs") as pbar:
    for index, url in enumerate(yt_urls, start=1):
        current = f'{index}/{len(yt_urls)}'
        try:
            # Get Video Information
            yt = YouTube(url, on_progress_callback=on_progress)
            stream = yt.streams.get_audio_only()
            if stream is None:
                # Fail with a clear message instead of an opaque AttributeError below.
                raise ValueError("no audio-only stream available")

            # Sanitize Video File Name and Add YouTube Video ID
            video_filename = f'[{yt.video_id}] {sanitize_filename(stream.default_filename)}'

            # Get File Name Without Extension (e.g., ".mp4")
            filename = os.path.splitext(video_filename)[0]

            # Skip If Transcription Already Exists
            pbar.set_description(f'Checking Existing Transcriptions (Video {current})')
            transcription_file = os.path.join(transcription_output_path, f'{filename}.txt')
            if os.path.isfile(transcription_file):
                # Delete/Keep Video File (leftover from an interrupted run)
                if remove_video:
                    video_file = os.path.join(video_output_path, video_filename)
                    if os.path.exists(video_file):
                        os.remove(video_file)

                # Delete/Keep Audio File (leftover from an interrupted run)
                if remove_audio:
                    audio_file = os.path.join(audio_output_path, f'{filename}.mp3')
                    if os.path.exists(audio_file):
                        os.remove(audio_file)

                continue

            # Log YouTube URL being Processed
            print("") # Just New Line for Better Output
            print(f'Found YouTube Video (URL): {url}')

            # Download YouTube Video
            pbar.set_description(f'Downloading (Video {current}) ')
            video_file, video_filename = download_youtube_video(video_filename, stream)

            # Extract Audio from Video -> Delete/Keep Video File
            pbar.set_description(f'Extracting (Audio {current})')
            audio_file, audio_filename = extract_audio_from_video(video_file, video_filename)
            if remove_video:
                os.remove(video_file)

            # Transcribe Audio to Text -> Delete/Keep Audio File
            pbar.set_description(f'Transcribing (Text {current})')
            transcribe_audio_to_text(audio_file, audio_filename)
            if remove_audio:
                os.remove(audio_file)
        except Exception as e:
            # Best effort: report which URL failed and why, then move on to the next one.
            print(f'Failed to process {url}: {type(e).__name__}: {e}')
        finally:
            # Advance exactly once per URL, whether it was skipped, processed, or failed.
            pbar.update(1)
    pbar.set_description("Finished Data Gathering")

Getting YouTube URLs:   0%|          | 0/269 [00:00<?, ?it/s]


Found YouTube Video (URL): https://www.youtube.com/watch?v=P46C21DTz6A

Downloading (Video): [P46C21DTz6A] Kamala Harris Touts Popular Progressive Policy During Pennsylvania Stop.mp4

 ↳ |█████████████████████████████████████████| 100.0%

Extracting (Audio): [P46C21DTz6A] Kamala Harris Touts Popular Progressive Policy During Pennsylvania Stop.mp3

Transcribing (Text): [P46C21DTz6A] Kamala Harris Touts Popular Progressive Policy During Pennsylvania Stop.txt



  checkpoint = torch.load(fp, map_location=device)

  0%|                                        | 0/48032 [00:00<?, ?frames/s][A
  6%|█▋                          | 2882/48032 [00:08<02:18, 325.63frames/s][A
 11%|███▏                        | 5514/48032 [00:14<01:52, 376.90frames/s][A
 18%|████▉                       | 8434/48032 [00:21<01:35, 414.73frames/s][A
 23%|██████▎                    | 11270/48032 [00:29<01:38, 374.60frames/s][A
 29%|███████▉                   | 14166/48032 [00:37<01:30, 372.31frames/s][A
 36%|█████████▌                 | 17118/48032 [00:45<01:22, 376.64frames/s][A
 42%|███████████▏               | 19990/48032 [00:51<01:07, 413.25frames/s][A
 47%|████████████▋              | 22498/48032 [00:58<01:06, 386.88frames/s][A