In [2]:
# Define project information
# PROJECT_ID and LOCATION are passed to vertexai.init() below as the project and location.
PROJECT_ID = "youtube-qa-417904"  # @param {type:"string"}
LOCATION = "asia-southeast1"  # @param {type:"string"}

# Initialize Vertex AI
import vertexai

# Configure the Vertex AI SDK with the project and location defined above.
vertexai.init(project=PROJECT_ID, location=LOCATION)

In [None]:
from vertexai.generative_models import GenerationConfig, GenerativeModel, Image, Part
import google.auth
from google.cloud import aiplatform

In [None]:
# Resolve credentials through google.auth.default, requesting the cloud-platform scope.
# The second returned value is discarded.
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
# Disabled alternative: load credentials from a service-account key file instead.
# credentials = service_account.Credentials.from_service_account_file(
#     '/content/youtube-qa-417904-c01e0b9c5ca1.json'
# )
# Pass the resolved credentials to the aiplatform SDK.
aiplatform.init(credentials=credentials)

In [None]:
# Storage path of the sample video; both addresses below are derived from it.
file_path = "youtube_qa/sample.mp4"
# gs:// form of the path.
video_uri = f"gs://{file_path}"
# HTTPS form of the same path on storage.googleapis.com.
video_url = f"https://storage.googleapis.com/{file_path}"

In [None]:
# Instantiate the Vertex AI generative model named "gemini-1.0-pro-vision".
multimodal_model = GenerativeModel("gemini-1.0-pro-vision")

In [None]:
# Question sent to the model together with the video.
prompt = """
Who is in the video?
"""

# Reference the video by its gs:// URI and declare it as MP4.
video = Part.from_uri(video_uri, mime_type="video/mp4")
contents = [prompt, video]

# stream=True yields the answer as a sequence of partial responses.
responses = multimodal_model.generate_content(contents, stream=True)

# Print each partial response without adding newlines so the text reads as one answer.
for response in responses:
    print(response.text, end="")

 The people in the video are:

* MrBeast (Jimmy Donaldson)
* Chandler Hallow
* Chris Tyson
* Karl Jacobs
* Sapnap (Nick Armstrong)
* GeorgeNotFound (George Davidson)
* Dream (Clay)
* BadBoyHalo (Darryl Noveschosch)
* Antfrost (Anthony)
* Skeppy (Zak Ahmed)
* Punz (Luke)
* Quackity (Alexis)
* TommyInnit (Thomas Simons)
* Tubbo (Toby Smith)
* JackSucksAtLife (Jack Massey Welsh)
* PewDiePie (Felix Kjellberg)

# Preprocess pipeline


## YouTube downloader


In [1]:
import json
from pathlib import Path
from dataclasses import dataclass
from yt_dlp import YoutubeDL

In [2]:
@dataclass
class AppConfig:
    """Default output locations read by the download cells in this notebook."""

    # Used as the yt-dlp "home" path, i.e. where downloaded media is written.
    video_save_dir: str = "data/videos"
    # Directory in which download_video_from_youtube writes its metadata JSON.
    metadata_save_dir: str = "data/metadata"

In [3]:
# Exploratory download of a single YouTube video with yt-dlp.
url = "https://www.youtube.com/watch?v=I9XXCvvAc4A"
params = {
    # Write downloads into the configured video directory, named after the video title.
    "paths": {"home": AppConfig.video_save_dir},
    "outtmpl": {"default": "%(title)s.%(ext)s"},
    "format": "mp4/bestaudio/best",
    # NOTE(review): absolute, machine-specific path; this cell only runs as-is on that machine.
    "ffmpeg_location": "/Users/minhquang/Desktop/spaces/eklipse/lib/ffmpeg",
    "postprocessors": [
        {  # Extract audio using ffmpeg
            "key": "FFmpegExtractAudio",
            "preferredcodec": "wav",
        }
    ],
    # Keep the video file alongside the extracted audio.
    "keepvideo": True,
}
with YoutubeDL(params=params) as ydl:
    # download=True fetches the media in addition to returning the info dict.
    info = ydl.extract_info(url, download=True)
    info = ydl.sanitize_info(info)

[youtube] Extracting URL: https://www.youtube.com/watch?v=I9XXCvvAc4A
[youtube] I9XXCvvAc4A: Downloading webpage
[youtube] I9XXCvvAc4A: Downloading ios player API JSON
[youtube] I9XXCvvAc4A: Downloading android player API JSON
[youtube] I9XXCvvAc4A: Downloading player 5352eb4f
[youtube] I9XXCvvAc4A: Downloading m3u8 information
[info] I9XXCvvAc4A: Downloading 1 format(s): 18
[download] Destination: data/videos/Lil Wuyn - An (Full Album).mp4
[download] 100% of  126.10MiB in 00:00:25 at 4.86MiB/s     
[ExtractAudio] Destination: data/videos/Lil Wuyn - An (Full Album).wav


In [None]:
# Display the "title" entry of the current `info` dict.
info.get("title")

'DO4LOVE – 52Hz ft. willistic (Prod. Minsicko) | Official Lyric Video'

In [None]:
def download_video_from_youtube(
    url: str,
    ffmpeg_location: str | None = "/Users/minhquang/Desktop/spaces/eklipse/lib/ffmpeg",
) -> dict[str, str]:
    """Download a YouTube video, extract its audio as WAV and store metadata.

    The video (and the extracted ``.wav``) are written to
    ``AppConfig.video_save_dir``; a JSON file holding the returned metadata is
    written to ``AppConfig.metadata_save_dir``.

    Args:
        url: URL of the video to download.
        ffmpeg_location: Location of the ffmpeg binary handed to yt-dlp. The
            default keeps the path this notebook was developed with; pass
            ``None`` to omit the option and let yt-dlp locate ffmpeg itself.

    Returns:
        A dict with the video ``id`` and ``title`` as reported by yt-dlp.
    """
    # TODO: beautify title.

    params = {
        "paths": {"home": AppConfig.video_save_dir},
        "outtmpl": {"default": "%(title)s.%(ext)s"},
        "format": "mp4/bestaudio/best",
        "postprocessors": [
            {  # Extract audio using ffmpeg
                "key": "FFmpegExtractAudio",
                "preferredcodec": "wav",
            }
        ],
        "keepvideo": True,
    }
    if ffmpeg_location is not None:
        params["ffmpeg_location"] = ffmpeg_location

    with YoutubeDL(params=params) as ydl:
        info = ydl.extract_info(url, download=True)
        # yt-dlp sanitizes titles when building file names (e.g. "|" becomes
        # "｜"), so ask it for the name it actually used instead of rebuilding
        # the name from the raw title.
        downloaded_path = Path(ydl.prepare_filename(info))
        info = ydl.sanitize_info(info)

    metadata = {"id": info.get("id"), "title": info.get("title")}
    # TODO: need to also include frames, audio, transcription path.
    metadata_dir = Path(AppConfig.metadata_save_dir)
    # The metadata directory is not created by yt-dlp, so make sure it exists.
    metadata_dir.mkdir(parents=True, exist_ok=True)
    # Reuse the sanitized stem so the JSON sits next to a matching media name
    # and titles containing path separators cannot break the path.
    with open(metadata_dir / f"{downloaded_path.stem}.json", "w") as f:
        json.dump(metadata, f)
    return metadata

In [None]:
# Download one video; `info` is rebound to the metadata dict returned by the function.
info = download_video_from_youtube("https://www.youtube.com/watch?v=zgOzvSAthlY")

[youtube] Extracting URL: https://www.youtube.com/watch?v=zgOzvSAthlY
[youtube] zgOzvSAthlY: Downloading webpage
[youtube] zgOzvSAthlY: Downloading ios player API JSON
[youtube] zgOzvSAthlY: Downloading android player API JSON
[youtube] zgOzvSAthlY: Downloading m3u8 information
[info] zgOzvSAthlY: Downloading 1 format(s): 22
[download] Destination: data/videos/DO4LOVE – 52Hz ft. willistic (Prod. Minsicko) ｜ Official Lyric Video.mp4
[download] 100% of    6.77MiB in 00:00:01 at 6.21MiB/s     
[ExtractAudio] Destination: data/videos/DO4LOVE – 52Hz ft. willistic (Prod. Minsicko) ｜ Official Lyric Video.wav


In [1]:
# Scratch cell: evaluates the literal 1 and has no effect on the pipeline.
1

1

## Video to sequence of images


In [1]:
import os
from moviepy.video.io.VideoFileClip import VideoFileClip

In [6]:
def video_to_images(video_path: str, save_dir: str, seconds_per_frame: int = 5) -> None:
    """Export still frames of a video as numbered PNG files.

    Frames are written to ``save_dir`` as ``<video name>-frame-0000.png``,
    ``...-0001.png`` and so on, one frame every ``seconds_per_frame`` seconds.

    Args:
        video_path: Path of the video file to read.
        save_dir: Directory receiving the PNG files; created when missing.
        seconds_per_frame: Interval between two exported frames, in seconds.

    Raises:
        ValueError: If ``seconds_per_frame`` is not positive.
    """
    if seconds_per_frame <= 0:
        raise ValueError("seconds_per_frame must be a positive number.")

    # File name without directory and extension (any extension, not only .mp4).
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    # MoviePy expands the name with the "%" operator, so a literal "%" in the
    # video name has to be escaped to survive the formatting step.
    name_format = f"{video_name.replace('%', '%%')}-frame-%04d.png"
    fps = 1 / seconds_per_frame

    os.makedirs(save_dir, exist_ok=True)
    # The context manager closes the clip and releases its reader when done.
    with VideoFileClip(video_path) as clip:
        clip.write_images_sequence(os.path.join(save_dir, name_format), fps=fps)


# Export frames of the sample video with the default interval (seconds_per_frame=5).
video_to_images("data/videos/sample.mp4", "data/videos")

t:   1%|          | 323/48777 [07:12<2:17:39,  5.87it/s, now=None]

Moviepy - Writing frames data/videos/sample-frame-%04d.png.
0.2


t:   1%|          | 323/48777 [08:45<2:17:39,  5.87it/s, now=None]

Moviepy - Done writing frames data/videos/sample-frame-%04d.png.


In [12]:
def video_to_audio(video_path: str, save_dir: str):
    """Extract the audio track of a video into ``<video name>-audio.wav``.

    Args:
        video_path: Path of the video file to read.
        save_dir: Directory receiving the WAV file; created when missing.

    Raises:
        ValueError: If the video has no audio track.
    """
    # File name without directory and extension (any extension, not only .mp4).
    video_name = os.path.splitext(os.path.basename(video_path))[0]

    os.makedirs(save_dir, exist_ok=True)
    # The context manager closes the clip and releases its readers when done.
    with VideoFileClip(video_path) as clip:
        audio = clip.audio
        if audio is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise ValueError(f"{video_path} has no audio track.")
        audio.write_audiofile(os.path.join(save_dir, f"{video_name}-audio.wav"))


# Extract the sample video's audio track into data/audio.
video_to_audio("data/videos/sample.mp4", "data/audio")

MoviePy - Writing audio in data/audio/sample-audio.wav


                                                                        

MoviePy - Done.




## Transcribe audio


In [2]:
import speech_recognition as sr


def stt(audio_path: str) -> str:
    """Transcribe an audio file with the Whisper backend of SpeechRecognition.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The recognized text, or an empty string when recognition fails (the
        failure reason is printed).
    """
    recognizer = sr.Recognizer()

    with sr.AudioFile(audio_path) as source:
        # Record the audio data
        audio_data = recognizer.record(source)

    # Default result so the function still returns a string when one of the
    # handled errors occurs (previously `text` was unbound in that case).
    text = ""
    try:
        # Recognize the speech
        text = recognizer.recognize_whisper(audio_data)
    except sr.UnknownValueError:
        print("Speech recognition could not understand the audio.")
    except sr.RequestError as e:
        print(f"Could not request results from service; {e}")

    return text


# Transcribe the audio file at data/audio/sample-audio.wav.
stt(audio_path="data/audio/sample-audio.wav")

" I'm called the massive isolation chamber. And we're gonna see if these two strangers can survive in this chamber for the next 100 days. They have never met each other ever, Bailey. This is Susie. Susie, this is Bailey. Nice to meet you. Hi, this is Bailey. If the two of you can survive the next 100 days in here, I will give you the half a million dollars inside of this bowl. But if one of you leaves before the 100 days is up, you both get nothing. Alright, I think you guys understand the rules. Have fun. Okay, bye. This is gonna be crazy. Um... Yeah. This is actually like an insane asylum. They're currently looking at all the stuff we put in there. We gave them enough food for 100 days. Which is healthy, but basically the exact same thing over and over again. We also gave them the room to try the bathroom, which comes with the shower. And obviously has no cameras inside. And a bed to sleep on. They have everything they need to survive 100 days. It's just a question of, do they want i

In [1]:
import torch
from transformers import pipeline
from transformers.utils import is_flash_attn_2_available

# Build a transformers automatic-speech-recognition pipeline with float16 weights.
pipe = pipeline(
    "automatic-speech-recognition",
    model="openai/whisper-large-v3",  # select checkpoint from https://huggingface.co/openai/whisper-large-v3#model-details
    torch_dtype=torch.float16,
    device="mps",  # "mps" for Mac devices
    # Request flash_attention_2 when transformers reports it as available, otherwise "sdpa".
    model_kwargs=(
        {"attn_implementation": "flash_attention_2"}
        if is_flash_attn_2_available()
        else {"attn_implementation": "sdpa"}
    ),
)

  from .autonotebook import tqdm as notebook_tqdm
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [2]:
# Transcribe the sample audio written by video_to_audio. The path is relative
# to the working directory, like every other data path in this notebook, so
# the cell does not depend on one machine's home directory.
outputs = pipe(
    "data/audio/sample-audio.wav",
    chunk_length_s=30,
    batch_size=24,
    return_timestamps=True,
)

outputs

KeyboardInterrupt: 

In [None]:
import getpass
import os

import google.generativeai as genai

# Never store the API key in the notebook. Read it from the API_KEY
# environment variable and, when it is not set, prompt for it without echo.
# NOTE(review): the key that used to be hard-coded here should be treated as
# leaked and revoked.
if not os.environ.get("API_KEY"):
    os.environ["API_KEY"] = getpass.getpass("API_KEY: ")
genai.configure(api_key=os.environ["API_KEY"])

model = genai.GenerativeModel("gemini-pro-vision")

In [None]:
# Re-create the Vertex AI generative model named "gemini-1.0-pro-vision" (same statement as an earlier cell).
multimodal_model = GenerativeModel("gemini-1.0-pro-vision")

In [None]:
def _chat_with_video(
    gs_path: str,
    prompt: str = "Who is in the video?",
    mime_type: str = "video/mp4",
) -> str:
    """Ask the notebook-level ``multimodal_model`` a question about a stored video.

    NOTE(review): the original cell held only an unfinished signature; this body
    wraps the steps of the neighbouring prompt cell. Confirm it matches the
    intended helper.

    Args:
        gs_path: Storage path of the video, with or without the ``gs://`` prefix.
        prompt: Question sent along with the video.
        mime_type: MIME type declared for the video.

    Returns:
        The streamed answer concatenated into a single string.
    """
    video_uri = gs_path if gs_path.startswith("gs://") else f"gs://{gs_path}"
    video = Part.from_uri(video_uri, mime_type=mime_type)
    responses = multimodal_model.generate_content([prompt, video], stream=True)
    return "".join(response.text for response in responses)

In [None]:
# Storage path of the sample video and the two addresses derived from it.
file_path = "youtube_qa/sample.mp4"
video_uri = f"gs://{file_path}"
video_url = f"https://storage.googleapis.com/{file_path}"
# Question sent to the model together with the video.
prompt = """
Who is in the video?
"""

# Reference the video by its gs:// URI and declare it as MP4.
video = Part.from_uri(video_uri, mime_type="video/mp4")
contents = [prompt, video]

# stream=True yields the answer as a sequence of partial responses.
responses = multimodal_model.generate_content(contents, stream=True)

# Print each partial response without adding newlines so the text reads as one answer.
for response in responses:
    print(response.text, end="")

In [2]:
from youtube_transcript_api import YouTubeTranscriptApi

# Fetch the transcript of video id "3hlZifZHtKY", passing "en" and "vi" as the languages argument.
YouTubeTranscriptApi.get_transcript("3hlZifZHtKY", languages=("en", "vi"))

[{'text': 'Xin chào mừng các bạn đến với hav ship',
  'start': 0.0,
  'duration': 3.08},
 {'text': 'uống gì không chương trình mà th Minh',
  'start': 1.52,
  'duration': 4.08},
 {'text': 'gặp cố gắng gặp mọi người vào mỗi tuần',
  'start': 3.08,
  'duration': 4.52},
 {'text': 'nhưng mà gần đây mình cũng không phải',
  'start': 5.6,
  'duration': 5.48},
 {'text': 'lúc nào cũng gặp mọi người vào hàng tuần',
  'start': 7.6,
  'duration': 5.199},
 {'text': 'bởi vì th mình là người rất là quan',
  'start': 11.08,
  'duration': 3.799},
 {'text': 'trọng xem là khách mời của mình là ai',
  'start': 12.799,
  'duration': 5.041},
 {'text': 'cái câu chuyện mà họ có thể mang đến cho',
  'start': 14.879,
  'duration': 4.841},
 {'text': 'khán giả của mình là gì khách mờ hiện',
  'start': 17.84,
  'duration': 4.32},
 {'text': 'tại đang thấy người cười rung rích cái',
  'start': 19.72,
  'duration': 5.76},
 {'text': 'gì xin chào mừng một kiểu hoa hậu của',
  'start': 22.16,
  'duration': 6.8},
 {'tex

In [16]:
import re

url = "https://www.youtube.com/watch?v=gdW0LBFGh5w"
# The id group accepts "-" in addition to word characters, because \w alone
# stops at the first hyphen and would silently truncate such ids. "+" (rather
# than "*") also rejects an empty id.
pattern = r"^https:\/\/www\.youtube\.com\/watch\?v=([\w-]+)(?:&\w=.*)*"
matches = re.findall(pattern, url)[0]
matches

'gdW0LBFGh5w'

In [None]:
import shutil
import uuid
from typing import Tuple


class VideoIngestionPipeline:
    metadata_save_dir: Path
    video_save_dir: Path
    audio_save_dir: Path
    text_save_dir: Path

    def __init__(self, config: AppConfig):
        self.metadata_save_dir = config.save_dir / "metadata"
        self.video_save_dir = config.save_dir / "video"
        self.img_save_dir = config.save_dir / "img"
        self.audio_save_dir = config.save_dir / "audio"
        self.text_save_dir = config.save_dir / "text"

        Path(config.save_dir).mkdir(exist_ok=True)
        self.metadata_save_dir.mkdir(exist_ok=True)
        self.video_save_dir.mkdir(exist_ok=True)
        self.img_save_dir.mkdir(exist_ok=True)
        self.audio_save_dir.mkdir(exist_ok=True)
        self.text_save_dir.mkdir(exist_ok=True)

    def _get_file_name(self, path: str | Path):
        if isinstance(path, str):
            return path.split("/")[-1].split(".")[0]
        if isinstance(path, Path):
            return path.stem
        raise TypeError("path must be instance of str or Path.")

    def _reformat_youtube_trans(self, transcription: list[dict]) -> list[dict]:
        formatted = []
        for chunk in transcription:
            start = round(chunk["start"], 2)
            end = round(start + chunk["duration"], 2)
            formatted.append({"timestamp": (start, end), "text": chunk["text"]})
        return formatted

    def _get_yt_id_from_url(self, url: str) -> str:
        pattern = r"^https:\/\/www\.youtube\.com\/watch\?v=(\w*)(?:&\w=.*)*"
        matches = re.findall(pattern, url)
        if len(matches) == 0:
            raise ValueError(
                "Youtube link has to in this form: https://www.youtube.com/watch?v=<video_id_here>"
            )
        return matches[0]

    def download_video_from_youtube(self, url: str) -> dict[str, str]:
        # TODO: beautify title.
        params = {
            "paths": {"home": str(self.video_save_dir)},
            "outtmpl": {"default": "%(title)s.%(ext)s"},
            "format": "mp4/bestaudio/best",
            "postprocessors": [{"key": "FFmpegExtractAudio", "preferredcodec": "wav"}],
            "keepvideo": True,
        }
        with YoutubeDL(params=params) as ydl:
            info = ydl.extract_info(url, download=True)
            info = ydl.sanitize_info(info)

        metadata = {
            "id": info.get("id"),
            "title": info.get("title"),
            "video_file": str(self.video_save_dir / f"{info.get('title')}.mp4"),
            "audio_file": str(self.audio_save_dir / f"{info.get('title')}.wav"),
        }
        # TODO: need to also include frames, audio, transcription path.
        with open(f"{self.metadata_save_dir}/{metadata['title']}.json", "w") as f:
            json.dump(metadata, f)

        # Move audio file to correct folders.
        shutil.move(
            metadata["video_file"].replace(".mp4", ".wav"), metadata["audio_file"]
        )

        return metadata

    def video_to_audio(self, video_path: str) -> str:
        video_name = self._get_file_name(video_path)
        clip = VideoFileClip(video_path)

        audio = clip.audio
        audio_path = self.audio_save_dir / f"{video_name}-audio.wav"
        audio.write_audiofile(audio_path)

        return audio_path

    def video_to_images(self, video_path: str, seconds_per_frame: int = 5) -> str:
        video_name = self._get_file_name(video_path)
        fps = 1 / seconds_per_frame

        clip = VideoFileClip(video_path)
        img_path = self.img_save_dir / f"{video_name}-frame-%04d.png"
        clip.write_images_sequence(img_path, fps=fps)
        return img_path

    def audio_to_text(
        self,
        audio_path: str,
        model: str = "openai/whisper-large-v3",
        device: str = "cuda:0",
        chunk_length_s: int = 30,
        batch_size: int = 10,
    ) -> Tuple[dict, Path]:
        pipe = pipeline(
            "automatic-speech-recognition",
            model=model,  # select checkpoint from https://huggingface.co/openai/whisper-large-v3#model-details
            torch_dtype=torch.float16,
            device=device,  # or mps for Mac devices
            generate_kwargs={"language": "english"},
            # model_kwargs={"attn_implementation": "flash_attention_2"} if is_flash_attn_2_available() else {"attn_implementation": "sdpa"},
        )
        # Output: dict with text and chunks.
        transcription = pipe(
            audio_path,
            chunk_length_s=chunk_length_s,
            batch_size=batch_size,
            return_timestamps=True,
        )
        audio_name = self._get_file_name(audio_path)
        text_path = self.text_save_dir / f"{audio_name}.json"
        with open(text_path, "w") as f:
            json.dump(transcription, f)

        return transcription, text_path

    def run(self, input_path_or_url: str, run_config: RunConfig):
        if re.match("^https?://", input_path_or_url):
            video_id = self._get_yt_id_from_url(input_path_or_url)
            # For youtube videos, already got video and audio.
            video_metadata = self.download_video_from_youtube(input_path_or_url)
            # Also, use subtitle api to get transcription.
            transcription = YouTubeTranscriptApi.get_transcript(video_id)
            transcription = self._reformat_youtube_trans(transcription)
            text_path = self.text_save_dir / f"{video_metadata.get('title')}.json"
        else:
            # Split audio from video.
            audio_path = self.video_to_audio(input_path_or_url)
            video_metadata = {
                "id": str(uuid.uuid4()),
                "title": self._get_file_name(input_path_or_url),
                "video_file": str(input_path_or_url),
                "audio_file": str(audio_path),
            }
            # Transcribe video.
            transcription, text_path = self.audio_to_text(video_metadata["audio_file"])
            video_metadata["text_file"] = str(text_path)

        # Split video in to frames.
        img_path = self.video_to_images(
            video_metadata["video_path"], run_config.seconds_per_frame
        )
        video_metadata["img_path"] = img_path

In [4]:
def _reformat_youtube_trans(transcription: list[dict]) -> dict[str, str]:
    chunks = []
    merged_text = ""
    for chunk in transcription:
        start = round(chunk["start"], 2)
        end = round(start + chunk["duration"], 2)
        chunks.append({"timestamp": (start, end), "text": chunk["text"]})
        merged_text += " " + chunk["text"]

    formatted = {"text": merged_text, "chunks": chunks}
    return formatted