In [1]:
from moviepy.editor import VideoFileClip
from pathlib import Path
import speech_recognition as sr
from pytubefix import YouTube
from pprint import pprint
from dotenv import load_dotenv
import rich

import json
import os

from PIL import Image
import matplotlib.pyplot as plt
from youtube_transcript_api import YouTubeTranscriptApi

import re


In [2]:

# %pip install llama-index-multi-modal-llms-gemini
# %pip install llama-index-vector-stores-qdrant
# %pip install llama-index-embeddings-gemini
# %pip install llama-index-llms-gemini

In [3]:
# !pip install llama-index 'google-generativeai>=0.3.0' matplotlib qdrant_client

In [4]:
# Load environment variables (presumably the API keys used further down - confirm) from ../env.
load_dotenv(dotenv_path="../env")

True

In [5]:
# import os

# GOOGLE_API_KEY = "<REDACTED>"  # never commit real keys; load them from the environment / .env file instead
# os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY

In [6]:
# video_url = "https://www.youtube.com/watch?v=Tcqfx4LKKkY"
# output_video_path = "./video_data/"
# output_folder = "./mixed_data/"
# output_audio_path = "./mixed_data/output_audio.wav"

# filepath = output_video_path + "input_vid.mp4"
# Path(output_folder).mkdir(parents=True, exist_ok=True)

In [7]:
# Make sure the shared data directory exists before anything is written into it.
output_folder = "../data/"
Path(output_folder).mkdir(exist_ok=True, parents=True)

In [8]:



def plot_images(image_paths, rows=2, cols=3):
    """Show the first images of ``image_paths`` in a ``rows`` x ``cols`` grid.

    Paths that do not point to an existing file are skipped silently. At most
    ``rows * cols`` images are drawn; any further paths are ignored.

    Parameters:
    image_paths (iterable of str): Paths of the image files to display.
    rows (int): Number of grid rows (default 2).
    cols (int): Number of grid columns (default 3).
    """
    # The grid only has rows * cols slots; asking matplotlib for a higher
    # subplot index raises ValueError, so stop as soon as the grid is full.
    max_images = rows * cols

    images_shown = 0
    plt.figure(figsize=(16, 9))
    for img_path in image_paths:
        if images_shown >= max_images:
            break
        if not os.path.isfile(img_path):
            continue

        image = Image.open(img_path)

        plt.subplot(rows, cols, images_shown + 1)
        plt.imshow(image)
        # Hide the axis ticks: they carry no meaning for a picture.
        plt.xticks([])
        plt.yticks([])

        images_shown += 1

In [15]:
def get_video_metadata(yt: "YouTube"):
    """Collect the descriptive fields of a video into a plain dictionary.

    Parameters:
    yt (YouTube): An already constructed pytubefix ``YouTube`` object.

    Returns:
    dict: The keys ``video_id``, ``title``, ``author``, ``keywords``,
    ``publish_date``, ``length``, ``likes``, ``views``, ``channel_id``,
    ``thumbnail_url`` and ``description``. ``publish_date`` is an ISO-8601
    string, or ``None`` when the video exposes no publish date.
    """
    # The publish date is not guaranteed to be available; calling
    # .isoformat() on None would abort the whole metadata extraction.
    publish_date = yt.publish_date

    return {
        "video_id": yt.video_id,
        "title": yt.title,
        "author": yt.author,
        "keywords": yt.keywords,
        "publish_date": publish_date.isoformat() if publish_date is not None else None,
        "length": yt.length,
        "likes": yt.likes,
        "views": yt.views,
        "channel_id": yt.channel_id,
        "thumbnail_url": yt.thumbnail_url,
        "description": yt.description,
    }

  
    
def get_youtube_id(link:str):
    """Extracts the video ID from a YouTube video link.

    Recognised forms are ``youtube.com/watch?...v=<id>`` (``v`` may be any
    query parameter, not only the first), ``youtube.com/shorts/<id>``,
    ``youtube.com/embed/<id>``, ``youtube.com/live/<id>`` and
    ``youtu.be/<id>``.

    Parameters:
    link (str): The URL to inspect.

    Returns:
    str | None: The video ID, or ``None`` when the link is empty or is not a
    recognised YouTube video URL.
    """
    if not link:
        return None

    patterns = (
        # Lazy optional prefix: prefer a leading "v=", otherwise accept the
        # first "&v=" that follows other query parameters.
        r'youtube\.com/watch\?(?:\S*?&)??v=([a-zA-Z0-9_-]+)',
        r'youtube\.com/(?:shorts|embed|live)/([a-zA-Z0-9_-]+)',
        r'youtu\.be/([a-zA-Z0-9_-]+)',
    )
    for pattern in patterns:
        match = re.search(pattern, link)
        # A non-matching URL must yield None, not an AttributeError on .group().
        if match:
            return match.group(1)
    return None

def get_transcript(video_id:str):
    """Fetch the transcript of a YouTube video.

    Parameters:
    video_id (str): The ID of the video.

    Returns:
    tuple: ``(final_transcript, transcript_dict)`` where ``final_transcript``
    is every segment's text joined with single spaces and ``transcript_dict``
    is the list of segments returned by ``YouTubeTranscriptApi``. When the
    transcript cannot be fetched the error is printed and ``("", [])`` is
    returned, so callers can always unpack two values.
    """
    try:
        transcript_dict = YouTubeTranscriptApi.get_transcript(video_id)
        final_transcript = " ".join(i["text"] for i in transcript_dict)
    except Exception as e:
        # Best effort: a missing transcript should not abort the caller, but
        # the return shape has to stay a 2-tuple for tuple-unpacking callers.
        print(e)
        return "", []
    return final_transcript, transcript_dict


def get_transcript_time(link:str):
    """Return a video's transcript with a timestamp tag after every segment.

    Each segment is rendered as ``<text> "time:HH:MM:SS" `` (trailing space
    included), using the segment's start time rounded to whole seconds.

    Parameters:
    link (str): URL of the YouTube video.

    Returns:
    The concatenated, time-tagged transcript. If fetching or formatting fails,
    the error is printed and the extracted video ID is returned instead.
    """
    video_id = get_youtube_id(link)

    try:
        tagged_segments = []
        for segment in YouTubeTranscriptApi.get_transcript(video_id):
            total_seconds = round(float(segment["start"]))
            # Break the second count down into hours / minutes / seconds.
            hours, remainder = divmod(total_seconds, 3600)
            minutes, seconds = divmod(remainder, 60)
            stamp = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
            tagged_segments.append(f'{segment["text"]} "time:{stamp}" ')
        return "".join(tagged_segments)
    except Exception as e:
        print(e)
        return video_id
    
    

def download_video(video_id:str, output_path):
    """
    Download a YouTube video together with its transcript and metadata.

    Three files are written into ``output_path``: ``transcript.txt``,
    ``metadata.json`` and ``video.mp4``. When no transcript can be obtained,
    ``transcript.txt`` is written empty instead of aborting the download.

    Parameters:
    video_id (str): The ID of the YouTube video to download.
    output_path (str): The directory to save the files to (created if missing).

    Returns:
    dict: A dictionary containing the metadata of the video.
    """

    url =  f"https://www.youtube.com/watch?v={video_id}"
    print (url)
    # NOTE(review): the recorded run of this notebook failed here with
    # pytubefix's BotDetection error, which suggests use_po_token=True.
    yt = YouTube(url, use_po_token=False)
    metadata = get_video_metadata(yt)

    os.makedirs(output_path, exist_ok=True)

    # get_transcript may give back nothing when the transcript is unavailable;
    # fall back to an empty transcript rather than failing on tuple unpacking.
    transcript_result = get_transcript(video_id)
    transcript = transcript_result[0] if transcript_result else ""
    if not transcript:
        print(f"No transcript available for {video_id}")

    # Explicit UTF-8: transcripts routinely contain non-ASCII characters and
    # the platform default encoding may not be able to represent them.
    with open(os.path.join(output_path,"transcript.txt"),"w", encoding="utf-8") as f:
        f.write(transcript)

    with open(os.path.join(output_path,"metadata.json"),"w", encoding="utf-8") as f:
        json.dump(metadata,f, indent=4)

    yt.streams.get_highest_resolution().download(
        output_path=output_path, filename="video.mp4"
    )
    return metadata


def video_to_images(video_path, output_folder,fps=0.2):
    """
    Convert a video to a sequence of images and save them to the output folder.

    The frames are written as ``frame0000.png``, ``frame0001.png``, ... into
    an ``images`` sub-folder of ``output_folder`` (created if missing).

    Parameters:
    video_path (str): The path to the video file.
    output_folder (str): The path to the folder to save the images to.
    fps (float): Frames to extract per second of video; the default of 0.2
        corresponds to one frame every five seconds.

    """
    clip = VideoFileClip(video_path)
    try:
        images_dir = os.path.join(output_folder, "images")
        os.makedirs(images_dir, exist_ok=True)
        clip.write_images_sequence(
            os.path.join(images_dir, "frame%04d.png"), fps=fps
        )
    finally:
        # Always release the clip's reader, even when frame extraction fails.
        clip.close()


In [16]:
# IDs of the YouTube videos to download and index.
video_id_1, video_id_2 = "TQQlZhbC5ps", "ODluYyMZzs0"

video_ids = [video_id_1, video_id_2]

In [17]:
# Root directory for downloaded data; the download loop creates one sub-folder per video ID in it.
data_folder ="../data"

In [18]:
# Create the data root if it is missing; exist_ok makes re-running this cell harmless.
os.makedirs(data_folder, exist_ok=True)

In [19]:
# For every video: create <data_folder>/<video_id>/, download video, transcript and
# metadata into it, then extract still frames from the downloaded video.mp4.
# The names assigned here stay in the notebook namespace after the loop, holding
# the values of the last video processed.
for video_id in video_ids:
    print (video_id)

    output_folder = os.path.join(data_folder, video_id)
    os.makedirs(output_folder, exist_ok=True)

    metadata = download_video(video_id, output_folder)

    video_file_path = os.path.join(output_folder, "video.mp4")
    video_to_images(video_file_path, output_folder)

TQQlZhbC5ps
https://www.youtube.com/watch?v=TQQlZhbC5ps


BotDetection: TQQlZhbC5ps This request was detected as a bot. Use `use_po_token=True` to view. See more details at https://github.com/JuanBindez/pytubefix/pull/209

In [None]:
# Build the YouTube object with OAuth enabled - presumably to get past the
# BotDetection error reported by the anonymous download above (confirm).
yt = YouTube("https://www.youtube.com/watch?v=TQQlZhbC5ps", use_oauth=True)

In [None]:
# Render the video's raw details as a JSON string for inspection.
json.dumps (yt.vid_details )

In [None]:
# Extract the metadata dictionary from the OAuth-backed YouTube object created above.
metadata = get_video_metadata(yt)


In [None]:
# Debug inspection only: _raw_metadata is a private attribute and may change between library versions.
yt.metadata._raw_metadata

In [None]:
??yt

In [None]:
from llama_index.core.indices import MultiModalVectorStoreIndex
from llama_index.core import SimpleDirectoryReader, StorageContext

from llama_index.vector_stores.lancedb import LanceDBVectorStore

from llama_index.core import Settings

from llama_index.core import SimpleDirectoryReader
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from llama_index.multi_modal_llms.openai import OpenAIMultiModal


In [None]:
# Two LanceDB tables under the same "lancedb" URI: one for text nodes, one for image nodes.
text_store = LanceDBVectorStore(
    uri="lancedb",
    table_name="text_collection",
)
image_store = LanceDBVectorStore(
    uri="lancedb",
    table_name="image_collection",
)

In [None]:
# Bundle the text and image stores so the multi-modal index writes each node type to its own table.
storage_context = StorageContext.from_defaults(
    vector_store=text_store, image_store=image_store
)

In [None]:
# Text embedding model, requested with 512-dimensional output vectors.
embed_model = OpenAIEmbedding(model="text-embedding-3-large", dimensions=512)

# Text-only LLM (installed as the global default in the Settings cell).
llm_model = OpenAI(model="gpt-4o-mini")

# Multi-modal LLM used to answer queries from retrieved text plus images.
openai_mm_llm = OpenAIMultiModal(model="gpt-4o-mini")

In [None]:
# Register the models as LlamaIndex's global defaults.
Settings.embed_model = embed_model

Settings.llm = llm_model

In [None]:
def file_metadata(file_path:str):
    """Return the video metadata that belongs to a transcript or frame file.

    The metadata is read from the ``metadata.json`` of the video the file
    belongs to: for ``.png`` files it is looked up one directory above the
    file's folder, for every other file in the file's own folder. The
    ``description`` and ``keywords`` entries are removed from the result when
    present.

    Parameters:
    file_path (str): Path of the file being loaded.

    Returns:
    dict: The video's metadata without ``description`` and ``keywords``.

    Raises:
    FileNotFoundError: If no ``metadata.json`` exists at the expected place.
    """
    parent_dir = os.path.dirname(file_path)

    # Decide by the real extension (case-insensitive) rather than by a
    # substring match, and give every non-image file a defined location.
    if file_path.lower().endswith(".png"):
        path_metadata_folder = os.path.join(parent_dir,"..")
    else:
        path_metadata_folder = parent_dir

    path_metadata = os.path.join(path_metadata_folder, "metadata.json")

    # Context manager so the file handle is closed deterministically.
    with open(path_metadata, encoding="utf-8") as f:
        metadata = json.load(f)

    # pop() with a default: these keys may be absent from the JSON file.
    metadata.pop("description", None)
    metadata.pop("keywords", None)

    return metadata

In [None]:
#?SimpleDirectoryReader

In [None]:
# Load every transcript (.txt) and extracted frame (.png) found under data_folder,
# attaching the owning video's metadata to each document via file_metadata.
documents = SimpleDirectoryReader(
    data_folder,
    required_exts=[".txt", ".png"],
    recursive=True,
    file_metadata=file_metadata,
).load_data(show_progress=True)



In [None]:

# Inspect which metadata keys of the first document are excluded from embedding.
documents[0].excluded_embed_metadata_keys

In [None]:
# Number of documents loaded by the reader.
len(documents)

In [None]:
# Look at the first loaded document as a sanity check.
documents[0]

In [None]:
?MultiModalVectorStoreIndex.from_vector_store

In [None]:
# Build the multi-modal index over the loaded documents, using the LanceDB-backed storage context.
index = MultiModalVectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context,
)

In [None]:
# Retriever configured with top-k = 5 for text similarity and top-k = 5 for image similarity.
retriever_engine = index.as_retriever(
    similarity_top_k=5, image_similarity_top_k=5
)

In [None]:
import json

#metadata_str = json.dumps(metadata_vid)

# Prompt template for the multi-modal LLM. The placeholders {context_str},
# {metadata_str} and {query_str} are filled in with str.format() when the
# completion request is made.
qa_tmpl_str = (
    "Given the provided information, including relevant images and retrieved context from the video, \
 accurately and precisely answer the query without any additional prior knowledge.\n"
    
    "Answer the following Question based on the Context only. Only answer from the Context. If you don't know the answer, say 'I don't know'. \n"
    
    "Please ensure honesty and responsibility, refraining from any racist or sexist remarks.\n"
    "---------------------\n"
    "Context: {context_str}\n"
    "Metadata for video: {metadata_str} \n"
    "---------------------\n"
    "Query: {query_str}\n"
    "Answer: "
)

In [None]:
from llama_index.core.response.notebook_utils import display_source_node
from llama_index.core.schema import ImageNode


def retrieve(retriever_engine, query_str, source_length=400):
    """Run a retrieval query and split the hits into image paths and texts.

    Text hits are also displayed in the notebook via ``display_source_node``.

    Parameters:
    retriever_engine: Object whose ``retrieve(query_str)`` returns the hits.
    query_str (str): The query to retrieve for.
    source_length (int): Passed to ``display_source_node`` for text hits.

    Returns:
    tuple: ``(image_paths, texts)`` - the ``image_path`` of every image hit
    and the ``text`` of every other hit, each in retrieval order.
    """
    image_paths, text_chunks = [], []

    for hit in retriever_engine.retrieve(query_str):
        if not isinstance(hit.node, ImageNode):
            # Text hit: show it in the notebook, then keep its text.
            display_source_node(hit, source_length=source_length)
            text_chunks.append(hit.text)
            continue
        image_paths.append(hit.node.image_path)

    return image_paths, text_chunks

In [None]:
# Example query; the retrieval cell uses whichever query_str assignment was executed last.
query_str = "what are the best food places in NYC"


In [None]:
# Example query; the retrieval cell uses whichever query_str assignment was executed last.
query_str = "what are the best food places in Honolulu"


In [None]:
# Example query; the retrieval cell uses whichever query_str assignment was executed last.
query_str = "Why is attention important for transformers"


In [None]:
# Example query; on a top-to-bottom run this is the last assignment, so it is the one used for retrieval.
query_str = "Why is exercising important"


In [None]:

# Retrieve the image paths and text chunks that match the query.
# source_length: number of characters of each text hit to print.
img, txt = retrieve(retriever_engine=retriever_engine, query_str=query_str, source_length=100)

# image_documents holds the retrieved images to send along with the prompt to the
# multi-modal LLM. Only build the reader when images were actually retrieved:
# with an empty input_files list SimpleDirectoryReader would fall back to
# input_dir and load the whole folder instead of just the retrieved frames.
image_documents = SimpleDirectoryReader(input_files=img).load_data() if img else []
context_str = "".join(txt)
plot_images(img)

In [None]:
# 

# openai_mm_llm = OpenAIMultiModal(
#     model="gpt-4o", api_key=OPENAI_API_KEY, max_new_tokens=1500
# )


# response_1 = openai_mm_llm.complete(
#     prompt=qa_tmpl_str.format(
#         context_str=context_str, query_str=query_str, metadata_str=metadata_str
#     ),
#     image_documents=image_documents,
# )

# pprint(response_1.text)

In [None]:
# Alias for the multi-modal LLM used by the completion cell below.
mm_llm = openai_mm_llm

In [None]:
# Ask the multi-modal LLM to answer the query from the retrieved text and images.
# NOTE(review): metadata_str is filled with an empty dict, so the prompt's
# "Metadata for video" line carries no information - confirm this is intended.
response_1 = mm_llm.complete(
    prompt=qa_tmpl_str.format(
        context_str=context_str, query_str=query_str, metadata_str={}
    ),
    image_documents=image_documents,
)



In [None]:
# Pretty-print the full response object.
rich.print (response_1 )

In [None]:
# Pretty-print only the answer text of the response.
rich.print(response_1.text)