# Dazbo's YouTube and Video Demos

## Overview

Examples of how to work with YouTube videos using Python. Here I'll demonstrate:

- How to [download videos and extract audio](#downloading-videos-and-extracting-audio)
- How to [transcribe audio to text using a speech-to-text API](#extracting-audio-using-python-speech-recognition)
- How to [extract existing transcripts and translate](#extract-existing-transcripts-from-videos)

**To run this notebook, first execute the cells in the [Setup](#Setup) section, as described below.** Then you can experiment with any of the subsequent cells.

A few useful notes:

- The source for this notebook lives in my GitHub repo, <a href="https://github.com/derailed-dash/youtube-and-video" target="_blank">Youtube-and-Video</a>.
- Check out further guidance, including tips on how to run the notebook, in the project's `README.md`.
- For example, you could...
  - Run the notebook locally, in your own Jupyter environment.
  - Run the notebook in a cloud-based Jupyter environment, with no setup required on your part! For example, with **Google Colab**: <br><br><a href="https://colab.research.google.com/github/derailed-dash/youtube-and-video/blob/main/src/notebooks/youtube-demos.ipynb" target="_blank"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Google Colab"/></a><br><br>It looks like this:<br><br><img src="https://github.com/derailed-dash/youtube-and-video/blob/main/src/notebooks/static/images/collab-view.png?raw=1" width="640px"></img>
- For more ways to run Jupyter Notebooks, check out [my guide](https://medium.com/python-in-plain-english/five-ways-to-run-jupyter-labs-and-notebooks-23209f71e5c0).


## Setup

### Packages

First, let's install any dependent packages:

In [None]:
%pip install --upgrade --no-cache-dir python-dotenv dazbo-commons pytubefix moviepy yt_dlp

In [None]:
import IPython
from IPython.display import display
from IPython.core.display import Markdown

import logging
import re
import io
import sys
from pathlib import Path
from dataclasses import dataclass
import dazbo_commons as dc
from dotenv import load_dotenv

In [None]:
# Colab requires an older version of ipykernel, so only upgrade it elsewhere
if "google.colab" not in sys.modules:
    %pip install --upgrade --no-cache-dir ipykernel


### Logging

Now we'll set up logging. Here I'm using coloured logging from my [dazbo-commons](https://pypi.org/project/dazbo-commons/) package. Feel free to change the logging level.

In [None]:
# Setup logging
APP_NAME="dazbo-yt-demos"
logger = dc.retrieve_console_logger(APP_NAME)
logger.setLevel(logging.DEBUG)
logger.info("Logger initialised.")
logger.debug("DEBUG level logging enabled.")

### File Locations

Here we initialise some file path locations, e.g. an output folder.

In [None]:
locations = dc.get_locations(APP_NAME)
for attribute, value in vars(locations).items():
    logger.debug(f"{attribute}: {value}")

### Utility Functions

In [None]:
def clean_filename(filename):
    """ Create a clean filename by removing unallowed characters. """
    pattern = r'[^a-zA-Z0-9._\s-]'
    return re.sub(pattern, '_', filename)
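
To see what the cleaning does, here's a small self-contained sketch. It repeats the function so it can run on its own, and the title is a made-up example rather than one of the videos used later. Anything that isn't a letter, digit, dot, underscore, whitespace or hyphen becomes an underscore.

```python
import re

def clean_filename(filename):
    """ Create a clean filename by replacing disallowed characters with underscores. """
    pattern = r'[^a-zA-Z0-9._\s-]'
    return re.sub(pattern, '_', filename)

# Hypothetical title, chosen only to show which characters get replaced
example = clean_filename("AC/DC: Back in Black? (Live).mp4")
print(example)  # AC_DC_ Back in Black_ _Live_.mp4
```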

### Install Additional Packages You May Need

You can run the cell below, but it may not work on your OS.

So you might need to install packages manually, e.g.

<table>
  <col style="width:10%">
  <col style="width:45%">
  <col style="width:45%"> <!-- Adjust as needed or remove for auto-sizing -->
  <tr>
    <th>Package</th>
    <th>Purpose</th>
    <th>Install Command</th>
  </tr>
  <tr>
    <td><a href="https://ffmpeg.org/">ffmpeg</a></td>
    <td>A useful utility for video and audio format conversion. Many Python libraries use it. It will not generally be used by this notebook, but if you run into errors requiring ffmpeg, you will want to run this section.</td>
    <td>Linux: <code>sudo apt install ffmpeg</code><br>Windows: <code>winget install ffmpeg</code></td>
  </tr>
  <tr>
    <td><a href="https://xiph.org/flac/download.html">FLAC</a></td>
    <td>The Python <code>speech_recognition</code> library uses the FLAC utility to convert audio files into a format that can be processed for speech recognition.</td>
    <td>Linux: <code>sudo apt install flac</code><br>Windows: Download the latest</td>
  </tr>
</table>




In [None]:
import os
import platform
import subprocess

def run_command(command):
    """Run a shell command and print its output in real-time."""
    process = subprocess.Popen(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT  # Merge stderr into stdout so errors are logged and an unread pipe can't block
    )

    # Read and print the output line by line
    if process.stdout is not None:
        for line in iter(process.stdout.readline, b''):
            logger.info(line.decode().strip())
        process.stdout.close()

    process.wait()

def install_software(appname: str):
    os_name = platform.system()
    logger.info(f"Installing {appname} on {os_name}...")

    # Mapping operating systems to their respective installation commands
    command_map = {
        "Windows": f"winget install {appname} --silent --no-upgrade",
        "Linux": f"apt -qq -y install {appname}",
        "Darwin": f"brew install {appname}"
    }
    command = command_map.get(os_name)
    if command:
        run_command(command)
        logger.info("Done.")
    else:
        logger.error(f"Unsupported operating system: {os_name}")

def check_installed(app_exec: str) -> bool:
    args = app_exec.split()  # e.g. ["ffmpeg", "-version"]
    appname = args[0]
    logger.debug(f"Checking if {appname} is installed")

    try:
        output = subprocess.check_output(args, stderr=subprocess.STDOUT)
        logger.debug(f"{appname} version: {output.decode().strip()}")
        logger.debug(f"{appname} is already installed.")
        return True
    except (subprocess.CalledProcessError, FileNotFoundError):
        logger.debug(f"{appname} is not installed or absent from path.")

    return False

apps = [ ("ffmpeg", "ffmpeg -version"),
         ("flac", "flac --version") ]

for app_install, app_exec in apps:
    if not check_installed(app_exec):
        install_software(app_install)


Now we'll check that `ffmpeg` has been installed.

On Windows, this may not have been added to your path. If so, you can check your default install location using `winget --info`, and then add it to your path.
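
If you just want to know whether a tool is visible on your path, without running it, here's a minimal sketch using the standard library's `shutil.which`. The helper name `is_on_path` is my own for illustration; it isn't used elsewhere in this notebook.

```python
import shutil

def is_on_path(executable: str) -> bool:
    """ Return True if the executable can be found on the current PATH. """
    return shutil.which(executable) is not None

for name in ("ffmpeg", "flac"):
    status = "found" if is_on_path(name) else "not on PATH"
    print(f"{name}: {status}")
```

This only tells you whether the executable can be located; it doesn't confirm that it runs correctly.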

In [None]:
logger.info("Note that installed applications may not be immediately available after first installing.\n"
            "It may be necessary to relaunch the notebook environment.")

!ffmpeg -version

### Videos to Work With

We start by defining a list of videos to test our application with, along with a function that takes a full YouTube URL and returns just the id portion.

I’ve used these videos because…

- The first is the fantastic [Burning Bridges](https://www.youtube.com/watch?v=udRAIF6MOm8) by Sigrid. The video has no embedded transcript.
- The second is the beautiful song [I Believe](https://www.youtube.com/watch?v=CiTn4j7gVvY) by Melissa Hollick. It’s one of my favourite songs of all time. When I get a migraine, I turn off the lights, and listen to this to feel better! And for those who enjoy gaming, this song plays over the end titles of the amazing Wolfenstein: The New Order game. This video has an embedded transcript.
- Then we have a short [Jim Carrey speech](https://www.youtube.com/watch?v=nLgHNu2N3JU), which gives us dialogue without music or other ambient noise. It has an embedded transcript.
- And finally, a [Ukrainian song](https://www.youtube.com/watch?v=d4N82wPpdg8) from Eurovision 2024, by Jerry Heil and Alyona Alyona. This gives us an opportunity to test translation. It also has an embedded transcript.

In [None]:
# Videos to download
urls = [
    "https://www.youtube.com/watch?v=udRAIF6MOm8",  # Sigrid - Burning Bridges (English)
    "https://www.youtube.com/watch?v=CiTn4j7gVvY",  # Melissa Hollick - I Believe (English)
    "https://www.youtube.com/watch?v=nLgHNu2N3JU",  # Jim Carrey - Motivational speech (English)
    "https://www.youtube.com/watch?v=d4N82wPpdg8",  # Jerry Heil & Alyona Alyona - Teresa & Maria (Ukrainian)
]

def get_video_id(url: str) -> str:
    """ Return the video ID, which is the part after 'v=' """
    return url.split("v=")[-1]
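
The simple split above is fine for the URLs in my list, but it would return extra text if the URL carried further parameters, such as a timestamp. Here's a sketch of a more forgiving variant built on `urllib.parse`. The name `get_video_id_robust` is hypothetical, and I'm assuming only two URL shapes: the standard `watch?v=` form and the short `youtu.be/` form.

```python
from urllib.parse import urlparse, parse_qs

def get_video_id_robust(url: str) -> str:
    """ Hypothetical variant: extract the video ID from common YouTube URL shapes. """
    parsed = urlparse(url)

    # Short links carry the ID in the path, e.g. https://youtu.be/<id>
    if parsed.hostname == "youtu.be":
        return parsed.path.lstrip("/")

    # Standard links carry the ID in the 'v' query parameter
    ids = parse_qs(parsed.query).get("v")
    if not ids:
        raise ValueError(f"No video ID found in '{url}'")
    return ids[0]

print(get_video_id_robust("https://www.youtube.com/watch?v=udRAIF6MOm8&t=42s"))  # udRAIF6MOm8
```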

## Downloading Videos and Extracting Audio

Here I'll demonstrate a few different Python libraries for working with YouTube videos.

### Option 1 - With PyTubeFix

Here I'll use the [pytubefix](https://github.com/JuanBindez/pytubefix) library to download YouTube videos, and then to download mp3 audio-only streams as files.

This library is a community-maintained fork of `pytube`. It was created to provide quick fixes for issues that the official pytube library faced, particularly when YouTube's updates break `pytube`.

Pros:

- The library is very easy to use.
- We can work with video, audio, channels, playlists, and even search and filter.
- It is [well documented](https://pytubefix.readthedocs.io/en/latest/).
- It can be used from the command line, with its simple CLI.
- It is VERY FAST!

Cons:

- Does not offer some of the more sophisticated capabilities that are offered by `yt_dlp`.
- It does not appear to set mp3 headers correctly. The mp3s are actually encoded as mp4a. I don't think this is a problem, but it's worth bearing in mind!
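
If you'd like to check a downloaded file for yourself, here's a rough sketch that looks at the first few bytes of the file. It relies on two well-known signatures: MP4-family files have `ftyp` at byte offset 4, while MP3 files start with either an `ID3` tag or a frame sync pattern. The function names are my own, and this is only a heuristic; for example, a raw AAC (ADTS) stream would also match the frame sync test.

```python
def sniff_audio_container(header: bytes) -> str:
    """ Heuristic guess at the container, based on the leading bytes of a file. """
    # MP4-family files (including .m4a) carry an 'ftyp' box at offset 4
    if len(header) >= 8 and header[4:8] == b"ftyp":
        return "mp4"
    # MP3 files often begin with an ID3v2 tag...
    if header[:3] == b"ID3":
        return "mp3"
    # ...or directly with a frame sync: 11 set bits (0xFF, then top 3 bits set)
    if len(header) >= 2 and header[0] == 0xFF and (header[1] & 0xE0) == 0xE0:
        return "mp3"
    return "unknown"

def sniff_file(path) -> str:
    """ Read the first 12 bytes of a file and guess its container. """
    with open(path, "rb") as f:
        return sniff_audio_container(f.read(12))
```

For example, calling `sniff_file()` on one of the `.mp3` files produced below should tell you whether it is really an MP4 container underneath.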

In [None]:

from pytubefix import YouTube
from pytubefix.cli import on_progress

output_locn = f"{locations.output_dir}/pytubefix"

def process_yt_videos():
    for i, url in enumerate(urls):
        logger.info(f"Downloads progress: {i+1}/{len(urls)}")

        try:
            yt = YouTube(url, on_progress_callback=on_progress)
            logger.info(f"Getting: {yt.title}")
            video_stream = yt.streams.get_highest_resolution()
            if not video_stream:
                raise Exception("Stream not available.")

            # YouTube resource titles may contain special characters which
            # can't be used when saving the file. So we need to clean the filename.
            cleaned = clean_filename(yt.title)

            logger.info(f"Downloading video {cleaned}.mp4 ...")
            video_stream.download(output_path=output_locn, filename=f"{cleaned}.mp4")

            logger.info("Creating audio...")
            audio_stream = yt.streams.get_audio_only()
            audio_stream.download(output_path=output_locn, filename=cleaned, mp3=True)

            logger.info("Done")

        except Exception as e:
            logger.error(f"Error processing URL '{url}'.")
            logger.debug(f"The cause was: {e}")

    logger.info(f"Downloads finished. See files in {output_locn}.")

process_yt_videos()


### Option 2 - PyTubeFix and MoviePy

Here I'm doing the same as before, but I'm extracting the audio using the Python [MoviePy](https://github.com/Zulko/moviepy) library. This is a powerful video and audio editing library.

Pros:

- We can extract audio as mp3 with correct headers.
- It is [well documented](https://zulko.github.io/moviepy/).
- It is powerful.

Cons:

- It is slower to extract the audio than using `pytubefix` alone.

In [None]:

from pytubefix import YouTube
from pytubefix.cli import on_progress
try:
    from moviepy import VideoFileClip  # MoviePy 2.x
except ImportError:
    from moviepy.editor import VideoFileClip  # MoviePy 1.x

output_locn = f"{locations.output_dir}/pytubefix_with_moviepy"

def process_yt_videos():
    for i, url in enumerate(urls):
        logger.info(f"Downloads progress: {i+1}/{len(urls)}")

        try:
            yt = YouTube(url, on_progress_callback=on_progress)
            logger.info(f"Getting: {yt.title}")
            video_stream = yt.streams.get_highest_resolution()
            if not video_stream:
                raise Exception("Stream not available.")

            # YouTube resource titles may contain special characters which
            # can't be used when saving the file. So we need to clean the filename.
            cleaned = clean_filename(yt.title)

            video_output = f"{output_locn}/{cleaned}.mp4"
            logger.info(f"Downloading video {cleaned}.mp4 ...")
            # Save with the .mp4 extension, so the file matches video_output below
            video_stream.download(output_path=output_locn, filename=f"{cleaned}.mp4")

            logger.info("Creating audio...")
            video_clip = VideoFileClip(video_output) # purely to give us access to methods
            assert video_clip.audio is not None
            video_clip.audio.write_audiofile(f"{output_locn}/{cleaned}.mp3")
            video_clip.close()

            logger.info("Done")

        except Exception as e:
            logger.error(f"Error processing URL '{url}'.")
            logger.debug(f"The cause was: {e}")

    logger.info(f"Downloads finished. See files in {output_locn}.")

process_yt_videos()

### Option 3 - With YT_DLP

I wanted to try the other popular YouTube package: [yt-dlp](https://pypi.org/project/yt-dlp/). The [repo](https://github.com/yt-dlp/yt-dlp) is a fork of the now unmaintained `youtube-dl`.

Pros:

- It is very powerful, with far more options and features than `pytubefix`.
- It can be installed as a standalone command-line executable, or as a pip-installable Python package.
- Sets mp3 headers properly!
- It has some powerful network and proxy settings. This can be useful if, for example, you are trying to download videos that are geo-restricted.

Cons:

- It is more complicated to use.
- The documentation is complex and somewhat hard to understand. And there's no real Python-specific documentation.
- It depends on having ffmpeg installed for some use cases.
- It is significantly slower than `pytubefix` for performing video download and audio extraction.
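
As a sketch of the proxy capability mentioned above: `yt_dlp.YoutubeDL` accepts a `proxy` entry in its options dictionary. The helper below just builds such a dictionary, so it runs without `yt_dlp` installed. The function name and the proxy address are placeholders of my own, not values from this project.

```python
from typing import Optional

def build_ydl_opts(output_dir: str, proxy: Optional[str] = None) -> dict:
    """ Hypothetical helper: build a yt-dlp options dict, optionally routed via a proxy. """
    opts = {
        "format": "best",                              # Best quality single-file format
        "outtmpl": f"{output_dir}/%(title)s.%(ext)s",  # Output filename template
    }
    if proxy:
        opts["proxy"] = proxy  # URL of the proxy server to use
    return opts

# Placeholder proxy address, for illustration only
print(build_ydl_opts("output/yt_dlp", proxy="socks5://127.0.0.1:1080"))
```

The resulting dictionary would then be passed as `yt_dlp.YoutubeDL(opts)`, in the same way as the options in the cell below.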


In [None]:
import yt_dlp

output_locn = f"{locations.output_dir}/yt_dlp"

def process_yt_videos():
    for i, url in enumerate(urls):
        logger.info(f"Downloads progress: {i+1}/{len(urls)}")

        try:
            # Options for downloading the video
            video_opts = {
                'format': 'best',  # Download the best quality video
                'outtmpl': f'{output_locn}/%(title)s.%(ext)s',  # Save video in output directory
            }

            # Download the video
            with yt_dlp.YoutubeDL(video_opts) as ydl:
                logger.info("Downloading video...")
                ydl.download([url])

            # Options for extracting audio and saving as MP3
            audio_opts = {
                'format': 'bestaudio',  # Download the best quality audio
                'outtmpl': f'{output_locn}/%(title)s.%(ext)s',  # Save audio in output directory
                'postprocessors': [{
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': 'mp3',
                }],
            }

            # Download and extract audio
            with yt_dlp.YoutubeDL(audio_opts) as ydl:
                logger.info("Extracting and saving audio as MP3...")
                ydl.download([url])

        except Exception as e:
            logger.error(f"Error processing URL '{url}'.")
            logger.debug(f"The cause was: {e}")

    logger.info(f"Downloads finished. Check out files at {output_locn}.")

process_yt_videos()

### Conclusion

If you:

- Want to just download the videos and/or audio in the simplest and fastest way possible, then go with [Option 1](#option-1---with-pytubefix).
- Want to download the videos and/or audio and then carry out some sort of manipulation or conversion of the media, go with [Option 2](#option-2---pytubefix-and-moviepy).
- Want out-of-the-box proxy configuration, e.g. to bypass geo-restrictions, then go with [Option 3](#option-3---with-yt_dlp).

## Transcribing Audio to Text

### Extracting Audio Using Python Speech Recognition

The Python `speech_recognition` package has a number of built-in `Recognizer` implementations. Here I'm using the [Google Web Speech API](https://wicg.github.io/speech-api/) `Recognizer`, which has its default API key hard-coded into the Python `speech_recognition` library. It is free, but has some limitations. For example, it only accepts audio segments of up to 60 seconds.
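
Because of the 60s limit, longer audio has to be cut into segments before it is submitted. Here's a minimal sketch of that arithmetic on its own, working in milliseconds. The function name `split_into_segments` is hypothetical, and it only computes the boundaries rather than touching any audio.

```python
def split_into_segments(total_ms: int, segment_size_secs: int = 60):
    """ Yield (start_ms, end_ms) boundaries covering total_ms of audio. """
    segment_size_ms = segment_size_secs * 1000
    for start in range(0, total_ms, segment_size_ms):
        # The final segment is shorter if the audio doesn't divide evenly
        yield start, min(start + segment_size_ms, total_ms)

# A 2 minute 30 second track gives two full segments and one 30s remainder
print(list(split_into_segments(150_000)))
# [(0, 60000), (60000, 120000), (120000, 150000)]
```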

In [None]:
%pip install --upgrade --no-cache-dir pydub SpeechRecognition ffmpeg-python

In [None]:
import speech_recognition as sr
from pydub import AudioSegment
import ffmpeg

In [None]:
def divide_chunks(sound, segment_size_secs=60):
    """ Split audio into chunks of segment_size_secs seconds (60s by default) """

    segment_size_ms = segment_size_secs*1000
    for start_idx in range(0, len(sound), segment_size_ms):
        # Yield a chunk of audio data from start_idx to start_idx + segment_size_ms
        yield sound[start_idx:start_idx + segment_size_ms]

def transcribe_audio():
    """ Use Speech Recognition API with Google Web Speech API
    to convert audio dialog to text """
    recogniser = sr.Recognizer()
    for mp3_file in Path(output_locn).glob('*.mp3'):
        transcribe_audio_file(recogniser, mp3_file)

def transcribe_audio_file(recogniser, mp3_file, language="en-US"):
    logger.info(f"Converting {mp3_file}...")
    try:
        audio = AudioSegment.from_file(mp3_file)
        # If AudioSegment is not working - e.g. due to broken mp3 headers - we
        # can use ffmpeg as a workaround. However, it's a lot slower.
        # ffmpeg.input(mp3_file).output(wav_file).run() # Convert with ffmpeg
        # logger.info(f"Successfully converted {mp3_file} to {wav_file}.")
        # audio = AudioSegment.from_wav(wav_file) # Read the audio

        segments = list(divide_chunks(audio, segment_size_secs=60)) # split the wav into 60s segments
        transcription_extracts = {}
        for index, chunk in enumerate(segments):
            with io.BytesIO() as wav_io:
                chunk.export(wav_io, format='wav')
                wav_io.seek(0)  # Move to the start of the BytesIO object before reading from it

                with sr.AudioFile(wav_io) as source:
                    audio_data = recogniser.record(source)

                try:
                    extracted = recogniser.recognize_google(audio_data, language=language)
                    logger.debug(f"Chunk {index} extracted.")
                    transcription_extracts[index] = extracted
                except sr.UnknownValueError:
                    # Log the unknown value error and continue
                    logger.warning(f"Chunk {index}: Could not understand the audio. Maybe it was empty.")

        logger.info("Extract:")
        for idx, extract in transcription_extracts.items():
            logger.info(f"{idx}: {extract}")

    except ffmpeg.Error as e:
        logger.error(f"FFmpeg failed to convert {mp3_file}: {str(e)}")
    except Exception as e:
        logger.error("Unexpected error.", exc_info=True)

transcribe_audio()
logger.info("Done")

### Results

It's a bit flaky! Sometimes it runs, but sometimes the API returns errors and fails to run.

When the API does run...

- It fails to transcribe the Ukrainian song. Not too surprising, since this API does not detect language automatically, and defaults to recognising English.
- It does an amazing job with the Jim Carrey speech.
- It is partially successful when transcribing songs.

### Conclusions

It's not great!  It's pretty good if there's no background sound or ambient noise.  But it's pretty poor when working with songs. And it seems unreliable.

### Transcribing Ukrainian

Let's try to transcribe the Ukrainian song, this time telling the API which language to expect:

In [None]:
def transcribe_ua():
    recogniser = sr.Recognizer()
    for mp3_file in Path(output_locn).glob('alyona*.mp3'):
        transcribe_audio_file(recogniser, mp3_file, language="uk-UA")

transcribe_ua()

### Results

Partial success.  But overall... Not great!

## Extract Existing Transcripts from Videos

Now I'm going to use the [youtube-transcript-api](https://github.com/jdepoix/youtube-transcript-api) to extract existing transcripts from YouTube videos. Not only will it return the transcript, but it can also be used to translate those transcripts into other languages. So now I can take my Ukrainian song, and see both the Ukrainian transcript and the English translation. This is pretty awesome!

However, some videos do not contain transcripts.

In [None]:
%pip install --upgrade --no-cache-dir youtube_transcript_api

In [None]:
import youtube_transcript_api as yt_api
from pytubefix import YouTube
from pytubefix.cli import on_progress

def get_transcripts():
    """ Extract existing transcript data from videos """
    for url in urls:
        try: # Just so we can get the video title
            yt = YouTube(url, on_progress_callback=on_progress)
        except Exception as e:
            logger.error(f"Error processing URL '{url}'.")
            logger.debug(f"The cause was: {e}")
            continue

        logger.info(f"Processing '{yt.title}'...")
        video_id = get_video_id(url)

        try:
            # List the transcripts that are available for this video
            transcript_list = yt_api.YouTubeTranscriptApi.list_transcripts(video_id)
        except Exception as e:
            logger.error(f"Unable to extract transcript for '{yt.title}'.")
            logger.debug(e)
            continue

        # iterate over all available transcripts
        for transcript in transcript_list:
            # The Transcript object provides metadata properties. Here are some...
            properties = {
                "video_id": transcript.video_id,
                "language": transcript.language,
                "language_code": transcript.language_code,
                "is_generated": transcript.is_generated,  # Whether it has been manually created or generated by YouTube
                "is_translatable": transcript.is_translatable,  # Whether this transcript can be translated or not
                "translation_languages": transcript.translation_languages,
            }

            for prop, value in properties.items():
                logger.info(f"{prop}: {value}")

            # Fetch the actual transcript data
            transcript_data = transcript.fetch() # returns a list of dicts
            logger.info(f"Raw transcript:\n{transcript_data}")

            processed_transcript = process_transcript(transcript_data)
            logger.info(f"Processed transcript:\n{processed_transcript}")

            # Translate to en if we can
            if (transcript.language_code != "en" and
                    transcript.is_translatable and
                    any(lang['language_code'] == 'en' for lang in transcript.translation_languages)):
                transcript_data = transcript.translate('en').fetch() # translate to en
                processed_transcript = process_transcript(transcript_data)
                logger.info(f"Processed translated transcript:\n{processed_transcript}")

def process_transcript(transcript_data):
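    """ Join the text of all entries, skipping non-speech cues that start with '[' (e.g. [Music]) """
    # startswith() is safe for empty text, whereas indexing with [0] would raise an IndexError
    return "\n".join([entry['text'] for entry in transcript_data
                                     if not entry['text'].startswith("[")])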

get_transcripts()

How cool is this!?
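
To make the clean-up step concrete, here's a small sketch that runs the same kind of filter on made-up data. I'm assuming the transcript arrives as a list of dicts with `text`, `start` and `duration` keys, as the code above expects. The sample entries and the name `filter_transcript` are hypothetical.

```python
def filter_transcript(entries):
    """ Join the text of each entry, skipping cues such as [Music] that start with '['. """
    return "\n".join(entry["text"] for entry in entries
                     if not entry["text"].startswith("["))

# Made-up sample data in the assumed list-of-dicts shape
sample = [
    {"text": "[Music]", "start": 0.0, "duration": 2.5},
    {"text": "Hello there", "start": 2.5, "duration": 1.8},
    {"text": "[Applause]", "start": 4.3, "duration": 3.0},
    {"text": "Thanks for coming", "start": 7.3, "duration": 2.1},
]

print(filter_transcript(sample))
# Hello there
# Thanks for coming
```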

## Adding Google Cloud Smarts

Let's integrate some Google Cloud Vertex AI smarts. Start by installing the **Google Cloud Vertex AI SDK for Python**.

From [Introduction to the Vertex AI SDK for Python](https://cloud.google.com/vertex-ai/docs/python-sdk/use-vertex-ai-python-sdk#sdk-vs-client-library):

> When you install the Vertex AI SDK for Python (`google.cloud.aiplatform`), the Vertex AI Python client library (`google.cloud.aiplatform.gapic`) is also installed. The Vertex AI SDK and the Vertex AI Python client library provide similar functionality with different levels of granularity. The Vertex AI SDK operates at a higher level of abstraction than the client library and is suitable for most common data science workflows. If you need lower-level functionality, then use the Vertex AI Python client library.

In [None]:
# Install the Vertex AI SDK for Python and the Google Gemini API SDK for Python
%pip install --upgrade google-cloud-aiplatform google-generativeai

In [None]:
from google.cloud import aiplatform # Google Cloud Vertex AI SDK for Python
# import vertexai   # Google Cloud Vertex Generative AI SDK for Python
import google.generativeai as genai  # Google Gemini API (GenAI)
from vertexai.generative_models import GenerativeModel

import os  # Needed below; the earlier cell that imports os is optional
import sys
from getpass import getpass

# If we're running Google Colab, authenticate
if "google.colab" in sys.modules:
    from google.colab import auth
    auth.authenticate_user()

# Retrieve PROJECT_ID and other variables from any .env we can find
try:
    dc.get_envs_from_file()
except ValueError as e:
    logger.error(f"Problem reading env file:\n{e}")

env_vars = ["PROJECT_ID", "REGION"] # The vars we want to retrieve
for env_var in env_vars:
    if not os.getenv(env_var):
        # If not retrieved from .env we'll need to input the value
        os.environ[env_var] = getpass(f"Enter {env_var}: ")

    # Set Python variable of the same name as the env var, e.g. PROJECT_ID
    globals()[env_var] = os.environ[env_var]
    val = globals()[env_var]
    if env_var == "PROJECT_ID":
        logger.info(f"{env_var} retrieved: {val[-3:]}")
    else:
        logger.info(f"{env_var} retrieved: {val}")

Only run the next cell if you want to manually clear the environment variables and then input new values. In this scenario, you'll also want to comment out any variables in your .env file.

In [None]:
# Optionally run this if we want to clear env vars
for env_var in env_vars:
    if env_var in os.environ:
        del os.environ[env_var]
        logger.info(f"Cleared environment variable: {env_var}")