In [33]:
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.formatters import WebVTTFormatter
import yt_dlp
import os
from pathlib import Path
import webvtt
import json
import cv2

In [58]:
def download_video(video_url, path="/tmp/"):
    """Download a single video with yt-dlp and report where it was saved.

    Args:
        video_url: URL of the video to fetch.
        path: Directory the file is written into; the file itself is named
            after the video title and container extension.

    Returns:
        The filename yt-dlp computes for the download, or None when any
        exception is raised along the way (the error is printed, not
        re-raised).
    """
    output_template = os.path.join(path, "%(title)s.%(ext)s")
    options = {
        # Ask for an MP4 video stream plus an M4A audio stream, falling back
        # to a single MP4 file.
        "format": "bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]",
        "outtmpl": output_template,
        # Fetch only the one video even if the URL points into a playlist.
        "noplaylist": True,
    }

    try:
        with yt_dlp.YoutubeDL(options) as downloader:
            # download=True both fetches the media and returns its metadata.
            video_info = downloader.extract_info(video_url, download=True)
            saved_path = downloader.prepare_filename(video_info)

        print(f"Download completed successfully: {saved_path}")
        return saved_path
    except Exception as e:
        print(f"Error downloading video: {e}")
        return None


# def download_video(video_url, path='/tmp/'):
#     ydl_opts = {
#         'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]',
#         'outtmpl': f"{path}/%(title)s.%(ext)s",
#     }

#     try:
#         with yt_dlp.YoutubeDL(ydl_opts) as ydl:
#             ydl.download([video_url])
#         print("Download completed successfully!")
#     except Exception as e:
#         print(f"Error downloading video: {e}")


def get_video_id_from_url(video_url):
    """Extract the YouTube video ID from a URL.

    Examples of recognised URLs:
    - http://youtu.be/SA2iWivDJiE
    - http://www.youtube.com/watch?v=_oPAwA_Udwc&feature=feedu
    - http://www.youtube.com/embed/SA2iWivDJiE
    - http://www.youtube.com/v/SA2iWivDJiE?version=3&amp;hl=en_US
    - https://www.youtube.com/shorts/SA2iWivDJiE
    - https://m.youtube.com/watch?v=SA2iWivDJiE

    Args:
        video_url: A YouTube URL, or a bare video ID.

    Returns:
        The video ID when one can be located in the URL. Otherwise
        ``video_url`` is returned unchanged, so a bare ID passes straight
        through.
    """
    import urllib.parse

    url = urllib.parse.urlparse(video_url)
    hostname = url.hostname or ""

    if hostname == "youtu.be":
        # Keep only the first path segment so a trailing slash or extra
        # segments do not end up inside the ID.
        short_id = url.path.strip("/").split("/")[0]
        if short_id:
            return short_id

    if hostname in (
        "www.youtube.com",
        "youtube.com",
        "m.youtube.com",
        "music.youtube.com",
    ):
        if url.path == "/watch":
            # .get() instead of ["v"]: a watch URL without a usable "v"
            # parameter falls through to the common fallback below rather
            # than raising KeyError.
            ids = urllib.parse.parse_qs(url.query).get("v")
            if ids:
                return ids[0]
        else:
            # "/embed/<id>", "/v/<id>", "/shorts/<id>", "/live/<id>"
            parts = url.path.split("/")
            if (
                len(parts) > 2
                and parts[1] in ("embed", "v", "shorts", "live")
                and parts[2]
            ):
                return parts[2]

    return video_url


# If the video has a transcript available on YouTube, download it as a WebVTT file
def get_transcript_vtt(video_url, path="/tmp"):
    """Fetch a video's English transcript and store it as a WebVTT file.

    Args:
        video_url: YouTube URL (or bare video ID) of the video.
        path: Directory in which ``captions.vtt`` is stored. It is created
            if it does not exist yet.

    Returns:
        Path to ``captions.vtt`` inside ``path``.

    Any exception raised by the transcript API (for example when no
    "en-GB"/"en" transcript can be fetched) propagates to the caller.
    """
    filepath = os.path.join(path, "captions.vtt")

    # Reuse a transcript saved by an earlier run instead of fetching again.
    # NOTE: the cache file name does not include the video ID, so a
    # captions.vtt left in `path` by a *different* video is returned as-is.
    # Use one directory per video.
    if os.path.exists(filepath):
        return filepath

    video_id = get_video_id_from_url(video_url)
    # NOTE(review): newer youtube-transcript-api releases appear to replace
    # this static call with an instance-based fetch API - confirm against the
    # installed version.
    transcript = YouTubeTranscriptApi.get_transcript(
        video_id, languages=["en-GB", "en"]
    )
    webvtt_formatted = WebVTTFormatter().format_transcript(transcript)

    # Make sure the target directory exists before writing into it.
    if path:
        os.makedirs(path, exist_ok=True)
    # The with-block closes the file; no explicit close() is needed.
    with open(filepath, "w", encoding="utf-8") as webvtt_file:
        webvtt_file.write(webvtt_formatted)

    return filepath


def str2time(strtime):
    """Convert a timestamp string into milliseconds.

    Accepts ``HH:MM:SS(.mmm)`` as well as the shorter ``MM:SS(.mmm)`` and
    ``SS(.mmm)`` forms; missing leading components count as zero. Surrounding
    double quotes are ignored.

    Args:
        strtime: Timestamp string, e.g. ``"00:01:02.500"``.

    Returns:
        The timestamp as a float number of milliseconds.

    Raises:
        ValueError: If a component is not a number or there are more than
            three colon-separated components.
    """
    # strip character " if exists
    strtime = strtime.strip('"')
    parts = [float(c) for c in strtime.split(":")]
    if len(parts) > 3:
        raise ValueError(f"Unrecognised timestamp: {strtime!r}")

    # Left-pad with zeros so "MM:SS" and "SS" use the same formula as
    # "HH:MM:SS".
    hrs, mins, seconds = [0.0] * (3 - len(parts)) + parts

    total_seconds = hrs * 60**2 + mins * 60 + seconds
    return total_seconds * 1000


# Resizes an image while maintaining its aspect ratio
def maintain_aspect_ratio_resize(image, width=None, height=None, inter=cv2.INTER_AREA):
    """Scale an image to a target width or height, keeping its aspect ratio.

    Args:
        image: Array whose first two ``shape`` entries are (height, width).
        width: Target width in pixels. Takes precedence when both ``width``
            and ``height`` are given.
        height: Target height in pixels; used only when ``width`` is None.
        inter: Interpolation flag forwarded to ``cv2.resize``.

    Returns:
        ``image`` itself when neither dimension is requested, otherwise the
        result of ``cv2.resize``.
    """
    src_h, src_w = image.shape[:2]

    # Nothing requested: hand back the input untouched.
    if width is None and height is None:
        return image

    if width is not None:
        # Width drives the scale; the height follows proportionally.
        scale = width / float(src_w)
        target_size = (width, int(src_h * scale))
    else:
        # Only a height was given; the width follows proportionally.
        scale = height / float(src_h)
        target_size = (int(src_w * scale), height)

    # cv2.resize expects the size as (width, height).
    return cv2.resize(image, target_size, interpolation=inter)


def extract_and_save_frames_and_metadata(
    path_to_video,
    path_to_transcript,
    path_to_save_extracted_frames,
    path_to_save_metadatas,
):
    """Extract one frame per transcript segment and record its metadata.

    For every caption in the WebVTT transcript, the frame at the midpoint of
    the caption's time span is read from the video, resized to a height of
    350 pixels (aspect ratio preserved) and saved as ``frame_<idx>.jpg``.
    Metadata for all saved frames is written to ``metadata.json``.

    Args:
        path_to_video: Path of the video file to read frames from.
        path_to_transcript: Path of the WebVTT transcript file.
        path_to_save_extracted_frames: Directory receiving the JPEG frames.
        path_to_save_metadatas: Directory receiving ``metadata.json``.

    Returns:
        A list with one dict per successfully read frame, holding the keys
        ``extracted_frame_path``, ``transcript``, ``video_segment_id``,
        ``video_path`` and ``mid_time_ms``. Segments whose frame cannot be
        read are reported on stdout and skipped.
    """
    metadata = []
    # load video and transcript
    video = cv2.VideoCapture(path_to_video)
    try:
        trans = webvtt.read(path_to_transcript)

        # for each video segment specified in the transcript file
        for idx, transcript in enumerate(trans):
            start_time_ms = str2time(transcript.start)
            end_time_ms = str2time(transcript.end)
            mid_time_ms = (end_time_ms + start_time_ms) / 2
            # get the transcript, remove the next-line symbol
            text = transcript.text.replace("\n", " ")
            # seek to the middle of the segment and grab the frame there
            video.set(cv2.CAP_PROP_POS_MSEC, mid_time_ms)
            success, frame = video.read()

            if not success:
                print(f"ERROR! Cannot extract frame: idx = {idx}")
                continue

            # the frame was read successfully: resize and save it as JPEG
            image = maintain_aspect_ratio_resize(frame, height=350)
            img_fname = f"frame_{idx}.jpg"
            img_fpath = os.path.join(path_to_save_extracted_frames, img_fname)
            cv2.imwrite(img_fpath, image)

            # prepare the metadata
            metadata.append(
                {
                    "extracted_frame_path": img_fpath,
                    "transcript": text,
                    "video_segment_id": idx,
                    "video_path": path_to_video,
                    "mid_time_ms": mid_time_ms,
                }
            )
    finally:
        # Always free the capture handle, even if reading the transcript or
        # processing a frame raises.
        video.release()

    # save metadata of all extracted frames
    fn = os.path.join(path_to_save_metadatas, "metadata.json")
    with open(fn, "w") as outfile:
        json.dump(metadata, outfile)
    return metadata

In [59]:
# first video's url
vid1_url = "https://www.youtube.com/watch?v=OKJbaoIy9vk"

# download the YouTube video into ../data/videos
vid1_dir = "../data/videos"
vid1_filepath = download_video(vid1_url, vid1_dir)
# download_video returns None on failure; stop here instead of failing later
# during frame extraction with a less obvious error
assert vid1_filepath is not None, f"Could not download {vid1_url}"

# save the video's subtitles (WebVTT) alongside it in ../data/videos
vid1_transcript_filepath = get_transcript_vtt(vid1_url, vid1_dir)

[youtube] Extracting URL: https://www.youtube.com/watch?v=OKJbaoIy9vk
[youtube] OKJbaoIy9vk: Downloading webpage
[youtube] OKJbaoIy9vk: Downloading tv client config
[youtube] OKJbaoIy9vk: Downloading player 9c6dfc4a
[youtube] OKJbaoIy9vk: Downloading tv player API JSON
[youtube] OKJbaoIy9vk: Downloading ios player API JSON
[youtube] OKJbaoIy9vk: Downloading m3u8 information
[info] OKJbaoIy9vk: Downloading 1 format(s): 617+140
[download] ../data/videos/One Communicator - Any Communications.mp4 has already been downloaded
Download completed successfully: ../data/videos/One Communicator - Any Communications.mp4


In [60]:
# metadata.json goes next to the video; frames go into a sub-folder of it
metadatas_path = vid1_dir
extracted_frames_path = os.path.join(vid1_dir, "extracted_frame")

# make sure both output folders exist (no error if they already do)
os.makedirs(extracted_frames_path, exist_ok=True)
os.makedirs(metadatas_path, exist_ok=True)

# extract one frame per transcript segment and collect the metadata
metadatas = extract_and_save_frames_and_metadata(
    path_to_video=vid1_filepath,
    path_to_transcript=vid1_transcript_filepath,
    path_to_save_extracted_frames=extracted_frames_path,
    path_to_save_metadatas=metadatas_path,
)

## Case 2: No available transcripts

Use a speech-to-text (STT) model such as Whisper to transcribe the video's audio.

## Case 3: No spoken language in the video

Use a large vision-language model (LVLM) to run inference on the extracted frames and describe their meaning.