# Dazbo's YouTube and Video Demos - with Google Gemini

## Overview

Welcome to notebook #2 in this tutorial guide. This notebook follows on from [YouTube and Video Demos #1](youtube-demos.ipynb). In the previous notebook I demonstrated:

- Multiple methods for downloading videos and extracting audio
- How to transcribe audio to text using a free speech-to-text API
- How to extract existing transcripts and translate to different languages

In this part we'll strip out parts of the first notebook we don't need, and add some smarts using Google technology.

## How to Launch and Run this Notebook

- The source for this notebook lives in my GitHub repo, <a href="https://github.com/derailed-dash/youtube-and-video" target="_blank">Youtube-and-Video</a>.
- Check out further guidance - including tips on how to run the notebook - in the project's `README.md`.
- For example, you could...
  - Run the notebook locally, in your own Jupyter environment.
  - Run the notebook in a cloud-based Jupyter environment, with no setup required on your part! For example, with **Google Colab**: <br><br><a href="https://colab.research.google.com/github/derailed-dash/youtube-and-video/blob/main/src/notebooks/youtube-demos-with-google-gemini.ipynb" target="_blank"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Google Colab"/></a><br><br>It looks like this:<br><br><img src="static/images/collab-view.png" width="640px"></img>
- For more ways to run Jupyter Notebooks, check out [my guide](https://medium.com/python-in-plain-english/five-ways-to-run-jupyter-labs-and-notebooks-23209f71e5c0).

**When running this notebook, first execute the cells in the [Setup](#Setup) section, as described below.** Then you can experiment with any of the subsequent cells.


## Setup

### Packages

First, let's install any dependent packages:

In [None]:
%pip install --upgrade --no-cache-dir python-dotenv \
                                      dazbo-commons \
                                      pytubefix

In [None]:
import IPython
from IPython.display import display, Markdown

import io
import logging
import os
import re
import sys
from pathlib import Path
from dataclasses import dataclass
from dotenv import load_dotenv
import dazbo_commons as dc


In [None]:
# Colab requires an older version of ipykernel, so only upgrade when running elsewhere
if "google.colab" not in sys.modules:
    %pip install --upgrade --no-cache-dir ipykernel

### Logging

Now we'll set up logging. Here I'm using coloured logging from my [dazbo-commons](https://pypi.org/project/dazbo-commons/) package. Feel free to change the logging level.

In [None]:
# Setup logging
APP_NAME="dazbo-yt-demos"
logger = dc.retrieve_console_logger(APP_NAME)
logger.setLevel(logging.DEBUG)
logger.info("Logger initialised.")
logger.debug("DEBUG level logging enabled.")

### File Locations

Here we initialise some file path locations, e.g. an output folder.

In [None]:
locations = dc.get_locations(APP_NAME)
for attribute, value in vars(locations).items():
    logger.debug(f"{attribute}: {value}")

### Utility Functions

In [None]:
def clean_filename(filename):
    """ Create a clean filename by removing unallowed characters. """
    pattern = r'[^a-zA-Z0-9._\s-]'
    return re.sub(pattern, '_', filename)
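
To see what the pattern does in practice, here is a small self-contained check. The function body matches the cell above; the sample title is made up for illustration.

```python
import re

def clean_filename(filename: str) -> str:
    """Replace every character outside letters, digits, '.', '_', '-' and whitespace with '_'."""
    pattern = r'[^a-zA-Z0-9._\s-]'
    return re.sub(pattern, '_', filename)

# Hypothetical title containing characters that are unsafe in file names
sample_title = "Live @ Home: Part 1/2?"
print(clean_filename(sample_title))  # Live _ Home_ Part 1_2_
```

Note that whitespace is kept, so the cleaned name can still contain spaces.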

### Videos to Work With

We start by defining a list of videos to test our application with, along with a function that takes a full YouTube URL and returns just the id portion.

I’ve used these videos because…

- The first is the fantastic [Burning Bridges](https://www.youtube.com/watch?v=udRAIF6MOm8) by Sigrid. The video has no embedded transcript.
- The second is the beautiful song [I Believe](https://www.youtube.com/watch?v=CiTn4j7gVvY) by Melissa Hollick. It’s one of my favourite songs of all time. When I get a migraine, I turn off the lights, and listen to this to feel better! And for those who enjoy gaming, this song is the end titles to the amazing Wolfenstein: New Order game. This video has an embedded transcript.
- Then we have a short [Jim Carrey speech](https://www.youtube.com/watch?v=nLgHNu2N3JU), which gives us dialog without music or other ambient noise. It has an embedded transcript.
- And finally, a [Ukrainian song](https://www.youtube.com/watch?v=d4N82wPpdg8) from Eurovision 2024, by Jerry Heil and Alyona Alyona. This gives us an opportunity to test translation. It also has an embedded transcript.

In [None]:
# Videos to download
urls = [
    "https://www.youtube.com/watch?v=udRAIF6MOm8",  # Sigrid - Burning Bridges (English)
    "https://www.youtube.com/watch?v=CiTn4j7gVvY",  # Melissa Hollick - I Believe (English)
    "https://www.youtube.com/watch?v=nLgHNu2N3JU",  # Jim Carrey - Motivational speech (English)
    "https://www.youtube.com/watch?v=d4N82wPpdg8",  # Jerry Heil & Alyona Alyona - Teresa & Maria (Ukrainian)
]

def get_video_id(url: str) -> str:
    """ Return the video ID, which is the part after 'v=' """
    return url.split("v=")[-1]

output_locn = f"{locations.output_dir}/pytubefix"
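
Splitting on `v=` works for the URLs listed here, but it would also return any trailing query parameters (such as `&t=42s`) as part of the ID. As a hedged alternative, the sketch below parses the URL with the standard library. `extract_video_id` is a hypothetical name and is not used elsewhere in this notebook.

```python
from urllib.parse import urlparse, parse_qs

def extract_video_id(url: str) -> str:
    """Return the YouTube video ID from a watch URL or a youtu.be short link.

    Hypothetical helper: a stricter alternative to splitting on 'v='.
    """
    parsed = urlparse(url)
    if parsed.hostname == "youtu.be":
        # Short links carry the ID as the path, e.g. https://youtu.be/<id>
        return parsed.path.lstrip("/")
    # Standard links carry the ID in the 'v' query parameter
    values = parse_qs(parsed.query).get("v")
    if not values:
        raise ValueError(f"No video ID found in '{url}'")
    return values[0]

print(extract_video_id("https://www.youtube.com/watch?v=udRAIF6MOm8&t=42s"))  # udRAIF6MOm8
```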

## Downloading Videos and Extracting Audio

Let's use the [pytubefix](https://github.com/JuanBindez/pytubefix) library to download YouTube videos, and then to download mp3 audio-only streams as files.

This library is a community-maintained fork of `pytube`. It was created to provide quick fixes for issues in the original `pytube` library, particularly breakages caused by YouTube updates.

In [None]:

from pytubefix import YouTube
from pytubefix.cli import on_progress

def process_yt_videos():
    for i, url in enumerate(urls):
        logger.info(f"Downloads progress: {i+1}/{len(urls)}")

        try:
            yt = YouTube(url, on_progress_callback=on_progress)
            logger.info(f"Getting: {yt.title}")
            video_stream = yt.streams.get_highest_resolution()
            if not video_stream:
                raise Exception("Stream not available.")
            
            # YouTube resource titles may contain special characters which 
            # can't be used when saving the file. So we need to clean the filename.
            cleaned = clean_filename(yt.title)
            
            video_file = f"{cleaned}.mp4"
            logger.info(f"Downloading video {video_file} ...")
            # Include the extension: `filename` is used as the full file name
            video_stream.download(output_path=output_locn, filename=video_file)
        
            logger.info("Creating audio...")
            audio_stream = yt.streams.get_audio_only()
            audio_stream.download(output_path=output_locn, filename=cleaned, mp3=True)
            
            logger.info("Done")
            
        except Exception as e:        
            logger.error(f"Error processing URL '{url}'.")
            logger.debug(f"The cause was: {e}") 
            
    logger.info(f"Downloads finished. See files in {output_locn}.")
    
process_yt_videos()


## Extract Existing Transcripts from Videos

Now I'm going to use the [youtube-transcript-api](https://github.com/jdepoix/youtube-transcript-api) to extract existing transcripts from YouTube videos. Not only will it return the transcript, but it can also be used to translate those transcripts into other languages. So now I can download my Ukrainian song, and see both the Ukrainian transcript and the English translation. This is pretty awesome!

However, some videos do not contain transcripts.

In [None]:
%pip install --upgrade --no-cache-dir youtube_transcript_api

In [None]:
import youtube_transcript_api as yt_api
from pytubefix import YouTube
from pytubefix.cli import on_progress

def get_transcripts():
    """ Extract existing transcript data from videos """
    for url in urls:
        try: # Just so we can get the video title
            yt = YouTube(url, on_progress_callback=on_progress)
        except Exception as e:        
            logger.error(f"Error processing URL '{url}'.")
            logger.debug(f"The cause was: {e}") 
            continue
        
        logger.info(f"Processing '{yt.title}'...")
        video_id = get_video_id(url)
        
        try:
            # list_transcripts() returns all the transcripts available for this video
            transcript_list = yt_api.YouTubeTranscriptApi.list_transcripts(video_id)
        except Exception as e:
            logger.error(f"Unable to extract transcript for '{yt.title}'.")
            logger.debug(e)
            continue
        
        # iterate over all available transcripts
        for transcript in transcript_list:
            # The Transcript object provides metadata properties. Here are some...
            properties = {
                "video_id": transcript.video_id,
                "language": transcript.language,
                "language_code": transcript.language_code,
                "is_generated": transcript.is_generated,  # Whether it has been manually created or generated by YouTube
                "is_translatable": transcript.is_translatable,  # Whether this transcript can be translated or not
                "translation_languages": transcript.translation_languages,
            }
            
            for prop, value in properties.items():
                logger.info(f"{prop}: {value}")

            # Fetch the actual transcript data
            transcript_data = transcript.fetch() # returns a list of dicts
            logger.info(f"Raw transcript:\n{transcript_data}") 
            
            processed_transcript = process_transcript(transcript_data)
            logger.info(f"Processed transcript:\n{processed_transcript}")
            
            # Translate to en if we can
            if (transcript.language_code != "en" and 
                    transcript.is_translatable and 
                    any(lang['language_code'] == 'en' for lang in transcript.translation_languages)):
                transcript_data = transcript.translate('en').fetch() # translate to en
                processed_transcript = process_transcript(transcript_data)
                logger.info(f"Processed translated transcript:\n{processed_transcript}")

def process_transcript(transcript_data):
    """ Join the text of all entries, skipping empty ones and those starting with [ """
    return "\n".join([entry['text'] for entry in transcript_data 
                                     if entry['text'] and not entry['text'].startswith("[")])
                
get_transcripts()

How cool is this!?
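
To make the filtering step concrete, here is a minimal sketch that runs on made-up data in the same list-of-dicts shape the cell above works with. `join_transcript_text` is a hypothetical name, and the sample entries are invented. It also skips empty entries, which would otherwise trip up an index-based check on the first character.

```python
def join_transcript_text(entries: list[dict]) -> str:
    """Join transcript entries into lines, skipping empty text and bracketed cues."""
    lines = []
    for entry in entries:
        text = entry.get("text", "").strip()
        if text and not text.startswith("["):
            lines.append(text)
    return "\n".join(lines)

# Made-up sample entries; real ones come from transcript.fetch()
sample = [
    {"text": "[Music]", "start": 0.0, "duration": 4.0},
    {"text": "Hello there", "start": 4.0, "duration": 2.5},
    {"text": "", "start": 6.5, "duration": 1.0},
    {"text": "General greeting", "start": 7.5, "duration": 2.0},
]
print(join_transcript_text(sample))
```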

## Adding Google Cloud Smarts

Now we're going to leverage Google Cloud APIs. To use these services, you'll first need to have created a Google Cloud project.

### How to Consume Google Cloud Services from your Notebook

Then, in order to give your notebook access to the Google Cloud APIs, you broadly have three options:

1. You can build and run your notebook locally.
1. You can build and run your notebook in Google Colab.
1. You can build and run your notebook in the Google Vertex AI Workbench environment.

Let's look at the options...

#### Local Notebook

For local development - e.g. a Jupyter notebook running on your own machine - you will need to:

1. Have the Google Cloud `gcloud CLI` installed. See instructions [here](https://cloud.google.com/sdk/docs/install).
2. Authenticate to `gcloud`, so we can externally run `gcloud` commands from the notebook.
3. Set your quota project, and set your Application Default Credentials (ADC) by authenticating to your gcloud environment.

```bash
# From your terminal...
export PROJECT_ID=<your project>
gcloud auth login # authenticate to gcloud
gcloud auth application-default login # set up ADC
gcloud auth application-default set-quota-project $PROJECT_ID
gcloud config set project $PROJECT_ID
```

4. Use [Application Default Credentials](https://cloud.google.com/docs/authentication/application-default-credentials) from within the notebook.

```python
from google.auth import default
credentials, _ = default()

PROJECT = !gcloud config get-value project
PROJECT_ID = PROJECT[0]
REGION = "europe-west2"

# Now use whatever Google services...
import vertexai
vertexai.init(project=PROJECT_ID, location=REGION)
```

#### Google Colab

This is a great way to create and run Jupyter notebooks in the Cloud, and it makes them super-easy to share.

The great thing about this approach is that Colab provides native integration to authenticate your user account and provide your Google project details to the Colab environment.

Notes:

- You don't need to install the Google `gcloud CLI` locally. It is pre-installed in the environment.
- You can share notebooks using Google Drive, with Drive-based access control.
- There are limitations for notebook size, and for notebook runtime instance size.

Check out [this guide](https://github.com/GoogleCloudPlatform/devrel-demos/blob/main/other/colab/Using%20Google%20Cloud%20from%20Colab.ipynb).

For example, in your notebook:

```python
import sys
import google.auth
import vertexai

PROJECT = !gcloud config get-value project
PROJECT_ID = PROJECT[0]
REGION = "europe-west2"

# Check if we're running in the Colab environment, and if so
# Use Colab native authentication to Google Cloud
if "google.colab" in sys.modules:
    from google.colab import auth
    auth.authenticate_user()

credentials, _ = google.auth.default()

# Now use your Google services...
vertexai.init(
    project=PROJECT_ID,
    location=REGION,
    credentials=credentials
)
```


#### Vertex AI Workbench

[Vertex AI Workbench](https://cloud.google.com/vertex-ai/docs/workbench/introduction) is Google's most powerful managed enterprise Jupyter notebook hosting service. It is fully integrated with the Google Cloud and Vertex AI ecosystem.

Notes:

- The gcloud CLI is pre-installed in the environment.
- The JupyterLab environment is pre-installed.
- Access control and sharing is managed by Google Cloud IAM, rather than Google Drive.
- Because it is natively integrated with the Google Cloud environment, you don't need to provide any credentials or authenticate. You just need to provide Google project ID and region to any services that require this information. E.g.

```python
import vertexai

PROJECT = !gcloud config get-value project
PROJECT_ID = PROJECT[0]
REGION = "europe-west2"

# Go ahead and use your Google services...
vertexai.init(project=PROJECT_ID, location=REGION)
```

### Retrieve Environment Variables

I put my environment variables in a `.env` file at the top level of my project. It looks like this...

```
PYTHONPATH=src;src/notebooks

PROJECT_ID=my-project-id
REGION=my-region
```

In [None]:
import sys
from getpass import getpass

# Retrieve PROJECT_ID and other variables from any .env we can find
try:
    dc.get_envs_from_file()
except ValueError as e:
    logger.error(f"Problem reading env file:\n{e}")

env_vars = ["PROJECT_ID", "REGION"] # The vars we want to retrieve
for env_var in env_vars:
    if not os.getenv(env_var):
        # If not retrieved from .env we'll need to input the value
        os.environ[env_var] = getpass(f"Enter {env_var}: ")

    # Set Python variable of the same name as the env var, e.g. 
    # set PROJECT_ID and REGION
    globals()[env_var] = os.environ[env_var]
    val = globals()[env_var]
    logger.info(f"{env_var} retrieved: {val}")
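
The cell above writes the values into `globals()`. If you would rather keep the names explicit, the same fall-back-to-prompt logic could be wrapped in a small helper. This is only a sketch: `require_env` and its `prompt` parameter are hypothetical names, and the default prompt is plain `input` rather than `getpass`.

```python
import os
from typing import Callable

def require_env(name: str, prompt: Callable[[str], str] = input) -> str:
    """Return the value of env var `name`, asking for it once if it is unset or empty."""
    value = os.environ.get(name)
    if not value:
        value = prompt(f"Enter {name}: ")
        os.environ[name] = value  # Cache so later cells see the same value
    return value

# Usage sketch: collect settings into a dict rather than module-level names
# settings = {name: require_env(name) for name in ("PROJECT_ID", "REGION")}
```

Passing the prompt function as a parameter makes the helper easy to exercise without a live keyboard prompt.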
    

### Clear Environment Variables

**Only run the next cell if you want to manually clear the environment variables** and then input new values. In this scenario, you'll also want to comment out any variables in your .env file.

In [None]:
# Only run this if we want to clear env vars
for env_var in env_vars:
    if env_var in os.environ:
        del os.environ[env_var]
        logger.info(f"Cleared environment variable: {env_var}")

### A Notebook we can Run in ANY Environment

Let's engineer the notebook to be agnostic of where it is hosted.

In [None]:
# Don't forget to enable Cloud Storage API (storage.googleapis.com) 
%pip install google-auth google-auth-oauthlib google-auth-httplib2 google-cloud-storage 

In [None]:
from google.auth import default
from google.auth.exceptions import DefaultCredentialsError
from google.cloud import storage # Requires the Cloud Storage API to be enabled

# If we're running Google Colab, authenticate
if "google.colab" in sys.modules:
    from google.colab import auth
    auth.authenticate_user()
else:
    # Be sure to set ADC if not running in Vertex AI Workbench
    # E.g. gcloud auth application-default login
    pass

try:
    credentials, _ = default()
    !gcloud config set project $PROJECT_ID
except DefaultCredentialsError as e:
    logger.error(e)


### Video Transcription Using the Video Intelligence API

Recall that in the previous notebook, I tried to perform audio transcription using the Python `speech_recognition` package, and the built-in [Google Web Speech API](https://wicg.github.io/speech-api/) `Recognizer`. It wasn't great!

So now let's use Google's [Video Intelligence API](https://cloud.google.com/video-intelligence/docs) to perform transcription...

In [None]:
%pip install google-cloud-videointelligence

In [None]:
from google.cloud import videointelligence

MIN_CONFIDENCE = 0.65 # Set a threshold for minimum acceptable API confidence in the transcription

video_client = videointelligence.VideoIntelligenceServiceClient()

# This API can do loads of things. Here I'll tell it to do speech transcription.
features = [videointelligence.Feature.SPEECH_TRANSCRIPTION]
config = videointelligence.SpeechTranscriptionConfig(
    language_code="en-US", enable_automatic_punctuation=True
)
video_context = videointelligence.VideoContext(speech_transcription_config=config)

# Fortunately, this API natively supports mp4 without any conversion
logger.debug(f"Looking for videos in {output_locn}...")
for video in Path(output_locn).glob(f'*.mp4'):
    logger.info(f"Processing {video.name}...")

    try:
        with io.open(video, "rb") as file:
            input_content = file.read()
            
        operation = video_client.annotate_video(
            request={
                "features": features,
                "input_content": input_content, # for local files
                # "input_uri": path, # for objects in GCS
                "video_context": video_context,
            }
        )

        result = operation.result(timeout=600)

        # There is only one annotation_result per video.
        annotation_results = result.annotation_results[0]
        complete_transcript = ""
        for speech_transcription in annotation_results.speech_transcriptions:
            # Each SpeechTranscription can contain multiple alternatives.
            # Each alternative is a different possible transcription and has its own confidence score.
            # They are ordered in terms of accuracy. So we really only need the first.
            part = speech_transcription.alternatives[0]
            if part.confidence < MIN_CONFIDENCE:
                logger.debug(f"Ignoring transcript alternative with confidence of {part.confidence}.")
                continue
                
            logger.debug("Part transcript: {}".format(part.transcript))
            logger.debug("Part confidence: {}\n".format(part.confidence))
            complete_transcript += part.transcript.strip() + "\n"
                
        complete_transcript = complete_transcript.strip()
        if complete_transcript:
            logger.info(f"Complete transcript:\n{complete_transcript}")
        else:
            logger.warning("Could not retrieve high-confidence transcript.")

    except Exception as e:
        logger.error(e)
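
The confidence filter inside the loop can be isolated as a small pure function, which makes it easy to experiment with different thresholds. In this sketch, `Alternative` is a hypothetical stand-in for the API's result objects and the sample values are invented.

```python
from dataclasses import dataclass

@dataclass
class Alternative:
    """Stand-in for a transcription alternative (hypothetical, for illustration only)."""
    transcript: str
    confidence: float

def build_transcript(best_alternatives: list[Alternative], min_confidence: float = 0.65) -> str:
    """Join the parts whose confidence meets the threshold, one per line."""
    kept = [alt.transcript.strip() for alt in best_alternatives
            if alt.confidence >= min_confidence]
    return "\n".join(kept)

# Invented sample: the middle part falls below the threshold and is dropped
parts = [
    Alternative(" first part ", 0.91),
    Alternative("unclear part", 0.40),
    Alternative("last part", 0.65),
]
print(build_transcript(parts))
```

As in the cell above, a part is kept when its confidence is equal to or above the threshold.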

### Conclusions

It's pretty amazing!

- It's reliable and doesn't give random _pipe_ errors.
- It transcribes with much higher accuracy than the Python `speech_recognition` package using the Google Web Speech API `Recognizer`.
- We don't need to split the video into chunks.
- The API provides an estimate of transcription accuracy. And we can use this to filter out transcriptions that we don't want to keep.

Some minor issues:

1. It takes a long time to process each video.
2. It doesn't automatically detect the source language. So it fails with the Ukrainian music video.

What to do?

### Speech-to-Text from Audio

We can use Google [Speech-to-Text](https://cloud.google.com/speech-to-text/docs/sync-recognize) to detect the language, and transcribe the audio to text.

Benefits of this approach:

- We can work from the audio files, so the data is much smaller.
- It is faster.
- If you end up calling the API a lot and cost becomes a factor, this will be cheaper.
- This API automatically detects the source language.

Let's give it a go!

First, enable the [Speech-to-Text API](https://console.developers.google.com/apis/api/speech.googleapis.com/overview?project=video-smarts-442000) (`speech.googleapis.com`) in your project.

Next: to process audio files longer than 1 minute in length, this API requires us to store our audio files in Google Cloud Storage (GCS).  So let's create a GCS bucket for this purpose.

#### Create the Bucket

In [None]:
BUCKET_NAME = f"{PROJECT_ID}-bucket"
BUCKET_URI = f"gs://{BUCKET_NAME}"

bucket_check = !gcloud storage ls $BUCKET_URI
bucket_exists = True
for line in bucket_check:
    if "404" in line:
        bucket_exists = False
        break
        
if not bucket_exists:
    logger.info(f"Creating bucket {BUCKET_URI}")
    ! gcloud storage buckets create {BUCKET_URI} --location={REGION}
else:
    logger.info(f"{BUCKET_URI} already exists.")
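
Checking CLI output for the string "404" works, but it depends on the exact wording of the error message. As a hedged alternative, the existence check could go through the `google-cloud-storage` client installed earlier. In the sketch below, `ensure_bucket` is a hypothetical name and `client` is assumed to behave like `google.cloud.storage.Client`, whose `lookup_bucket()` returns `None` for a missing bucket.

```python
def ensure_bucket(client, bucket_name: str, location: str):
    """Return the named bucket, creating it in `location` if it does not exist.

    `client` is expected to behave like google.cloud.storage.Client.
    """
    bucket = client.lookup_bucket(bucket_name)  # None when the bucket is missing
    if bucket is None:
        bucket = client.create_bucket(bucket_name, location=location)
    return bucket

# Usage sketch, assuming ADC is configured and PROJECT_ID / REGION are set:
# from google.cloud import storage
# bucket = ensure_bucket(storage.Client(project=PROJECT_ID), f"{PROJECT_ID}-bucket", REGION)
```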


#### Use the Speech-to-Text API

In [None]:
%pip install google-cloud-speech

In [None]:

from google.cloud import speech

AUDIO_FILES_FOLDER = "audio"

def transcribe_local_file(audio_file: str) -> speech.RecognizeResponse:
    """Transcribe the given audio file. Only works with files up to 60s.
    Args:
        audio_file (str): Path to the local audio file to be transcribed.
            Example: "resources/audio.wav"
    Returns:
        speech.RecognizeResponse: The response containing the transcription results
    """
    client = speech.SpeechClient()

    FIRST_LANG = "en-US"
    SECOND_LANG = "uk-ua"

    with open(audio_file, "rb") as f:
        audio_content = f.read()

    audio = speech.RecognitionAudio(content=audio_content)
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=16000,
        language_code=FIRST_LANG,
        alternative_language_codes=[SECOND_LANG],
    )

    response = client.recognize(config=config, audio=audio)

    # Each result is for a consecutive portion of the audio. Iterate through
    # them to get the transcripts for the entire audio file.
    for result in response.results:
        # The first alternative is the most likely one for this portion.
        print(f"Transcript: {result.alternatives[0].transcript}")

    return response

def transcribe_long_audio(gcs_uri: str) -> str:
    """
    Asynchronously transcribes the audio file from Cloud Storage.
    For long audio (>60s) we have to store the file in GCS.
    
    Args:
        gcs_uri: The Google Cloud Storage path to an audio file.
                 E.g., "gs://storage-bucket/file.flac".
    Returns:
        The generated transcript from the audio file provided.
    """
    client = speech.SpeechClient()

    FIRST_LANG = "en-US"
    SECOND_LANG = "uk-ua"

    # Long audio is referenced by its GCS URI rather than sent inline
    audio = speech.RecognitionAudio(uri=gcs_uri)
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=16000,
        language_code=FIRST_LANG,
        alternative_language_codes=[SECOND_LANG],
    )
    
    operation = client.long_running_recognize(config=config, audio=audio)
    logger.debug("Waiting for operation to complete...")
    response = operation.result(timeout=90)

    transcript_builder = []
    # Each result is for a consecutive portion of the audio. Iterate through
    # them to get the transcripts for the entire audio file.
    for result in response.results:
        # The first alternative is the most likely one for this portion.
        transcript_builder.append(f"\nTranscript: {result.alternatives[0].transcript}")
        transcript_builder.append(f"\nConfidence: {result.alternatives[0].confidence}")

    transcript = "".join(transcript_builder)
    print(transcript)

    return transcript

def upload_to_gcs(bucket_name: str, src_file, dest_name):
    """Uploads a file to GCS and returns its gs:// URI, or None on failure."""
    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        
        # Destination blob name
        blob_name = dest_name 
        blob = bucket.blob(blob_name)
        gcs_uri = f"gs://{bucket_name}/{blob_name}"
        logger.info(f"Uploading {src_file} to {gcs_uri}")
        blob.upload_from_filename(str(src_file)) 
        
        return gcs_uri # Return the full GCS URI
    except Exception as e:
        logger.exception(f"Error uploading {src_file} to GCS: {e}")
        return None # Or raise the exception if you want to stop processing

for audio_file in Path(output_locn).glob('*.m4a'):
    gcs_uri = upload_to_gcs(BUCKET_NAME, 
                            src_file=audio_file, 
                            dest_name=f"{AUDIO_FILES_FOLDER}/{audio_file.name}")
    # transcript = transcribe_long_audio(gcs_uri)
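
The URI passed to the transcription step is just the bucket name plus the blob name. A minimal sketch of that mapping, with no cloud calls involved, is shown below. `gcs_uri_for` is a hypothetical helper, and the bucket and file names are placeholders.

```python
from pathlib import Path

def gcs_uri_for(bucket_name: str, folder: str, local_file: Path) -> str:
    """Build the gs:// URI a local file would have after upload into `folder`."""
    blob_name = f"{folder.strip('/')}/{local_file.name}"
    return f"gs://{bucket_name}/{blob_name}"

# Placeholder names, for illustration only
print(gcs_uri_for("my-project-id-bucket", "audio", Path("output/pytubefix/song.m4a")))
```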
    

## Vertex AI

Let's integrate some Google Cloud Vertex AI smarts. Start by installing the **Google Cloud Vertex AI SDK for Python**. 

From [Introduction to the Vertex AI SDK for Python](https://cloud.google.com/vertex-ai/docs/python-sdk/use-vertex-ai-python-sdk#sdk-vs-client-library):

When you install the Vertex AI SDK for Python (`google.cloud.aiplatform`), the Vertex AI Python client library (`google.cloud.aiplatform.gapic`) is also installed. The Vertex AI SDK and the Vertex AI Python client library provide similar functionality with different levels of granularity. The Vertex AI SDK operates at a higher level of abstraction than the client library and is suitable for most common data science workflows. If you need lower-level functionality, then use the Vertex AI Python client library.

In [None]:
# Install the Vertex AI SDK for Python and the Google Gemini API SDK for Python
%pip install --upgrade google-cloud-aiplatform \
                       google-generativeai

In [None]:
from google.cloud import aiplatform # Google Cloud Vertex AI SDK for Python
# import vertexai   # Google Cloud Vertex Generative AI SDK for Python
import google.generativeai as genai  # Google Gemini API (GenAI)
from vertexai.generative_models import GenerativeModel