In [1]:
import os
import gc
from warnings import filterwarnings
from prompts import *
from llama_index.core.schema import ImageNode
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from pytubefix import YouTube
from moviepy.editor import VideoFileClip
import speech_recognition as sr
from pathlib import Path
from llama_index.core.indices.multi_modal.base import MultiModalVectorStoreIndex
from llama_index.core import SimpleDirectoryReader, StorageContext
from llama_index.vector_stores.lancedb import LanceDBVectorStore
import time
import re

# Directory layout: the downloaded video is stored under ./video_data/, while
# extracted frames, the temporary audio file and the transcript share
# ./mixed_data/.
output_video_path, output_folder = "./video_data/", "./mixed_data/"
output_audio_path = output_folder + "output_audio.wav"

# Location of the downloaded video on disk.
filepath = "".join((output_video_path, "input_vid.mp4"))

# Create the mixed-data directory up front (no-op when it already exists).
Path(output_folder).mkdir(exist_ok=True, parents=True)

# Utility function to validate YouTube URL
def is_valid_youtube_url(url):
    """Check whether ``url`` looks like a YouTube link.

    The pattern is anchored at the start of the string only. The scheme and a
    leading ``www.`` are optional; the host must be ``youtube``, ``youtu`` or
    ``youtube-nocookie`` followed by ``.com`` or ``.be``, then a ``/`` and at
    least one more character.

    Args:
        url: Candidate URL string.

    Returns:
        The ``re.Match`` object on success, otherwise ``None`` (so the result
        can be used directly in a truthiness test).
    """
    pattern = r'(https?://)?(www\.)?(youtube|youtu|youtube-nocookie)\.(com|be)/.+'
    return re.match(pattern, url)

# Download video
def download_video(url, output_path, filename="input_vid.mp4"):
    """Download a YouTube video and return basic metadata about it.

    Args:
        url: URL of the YouTube video.
        output_path: Directory the video file is written into.
        filename: Name of the saved file inside ``output_path``. Defaults to
            ``"input_vid.mp4"``.

    Returns:
        dict with the keys ``"Author"``, ``"Title"`` and ``"Views"``.

    Raises:
        ValueError: If no stream is available for the video.
    """
    yt = YouTube(url)
    metadata = {"Author": yt.author, "Title": yt.title, "Views": yt.views}

    # get_highest_resolution() may return None when nothing matches; fail with
    # a clear message instead of an AttributeError on None.
    stream = yt.streams.get_highest_resolution()
    if stream is None:
        raise ValueError(f"No downloadable stream found for {url!r}")

    stream.download(output_path=output_path, filename=filename)
    return metadata

# Extract images from video
def video_to_images(video_path, output_folder, fps=0.2):
    """Dump frames of a video as PNG files.

    Frames are written to ``output_folder`` as ``frame0000.png``,
    ``frame0001.png``, ... sampled at ``fps`` frames per second.

    Args:
        video_path: Path of the video file to read.
        output_folder: Directory that receives the PNG frames.
        fps: Sampling rate in frames per second. The default of 0.2 keeps one
            frame every five seconds.
    """
    # Use the clip as a context manager so it is closed even if writing fails.
    with VideoFileClip(video_path) as clip:
        clip.write_images_sequence(os.path.join(output_folder, "frame%04d.png"), fps=fps)

# Extract audio from video
def video_to_audio(video_path, output_audio_path):
    """Extract the audio track of a video into an audio file.

    Args:
        video_path: Path of the video file to read.
        output_audio_path: Destination path of the audio file.

    Raises:
        ValueError: If the video has no audio track.
    """
    # Use the clip as a context manager so it is closed even if writing fails.
    with VideoFileClip(video_path) as clip:
        audio = clip.audio
        if audio is None:
            raise ValueError(f"Video {video_path!r} has no audio track to extract")
        audio.write_audiofile(output_audio_path)

# Convert audio to text
def audio_to_text(audio_path):
    """Transcribe an audio file with the recognizer's Whisper backend.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        The recognized text, or an empty string when recognition fails (the
        failure reason is printed).
    """
    recognizer = sr.Recognizer()
    with sr.AudioFile(audio_path) as source:
        audio_data = recognizer.record(source)

    # Start from an empty transcript so the function still returns a string
    # when one of the handled recognition errors occurs.
    text = ""
    try:
        text = recognizer.recognize_whisper(audio_data)
    except sr.UnknownValueError:
        print("Speech recognition could not understand the audio.")
    except sr.RequestError as e:
        print(f"Could not request results from service; {e}")
    return text

# Set up LlamaIndex for retrieval
def setup_index(url):
    """Download a YouTube video and build a multimodal index over it.

    The video is downloaded, sampled into PNG frames and transcribed; the
    frames and the transcript in ``output_folder`` are then loaded into a
    ``MultiModalVectorStoreIndex`` backed by two LanceDB tables.

    Args:
        url: URL of the YouTube video.

    Returns:
        ``(index, metadata)`` on success, or ``(None, None)`` when ``url`` is
        rejected by ``is_valid_youtube_url`` (a message is printed).
    """
    if not is_valid_youtube_url(url):
        print("Invalid URL")
        return None, None

    metadata_vid = download_video(url, output_video_path)

    # Frames left over from an earlier (possibly longer) video would otherwise
    # be indexed together with the new ones, so clear them first.
    for stale_frame in Path(output_folder).glob("frame*.png"):
        stale_frame.unlink()

    video_to_images(filepath, output_folder)

    try:
        video_to_audio(filepath, output_audio_path)
        text_data = audio_to_text(output_audio_path)
    finally:
        # The temporary audio file lives inside output_folder; remove it even
        # on failure so the directory reader never picks it up.
        if os.path.exists(output_audio_path):
            os.remove(output_audio_path)

    # Explicit UTF-8 so transcripts with non-ASCII characters are written the
    # same way regardless of the platform's default encoding.
    with open(os.path.join(output_folder, "output_text.txt"), "w", encoding="utf-8") as file:
        file.write(text_data or "")

    # Initialize vector stores and LlamaIndex
    text_store = LanceDBVectorStore(uri="lancedb", table_name="text_collection")
    image_store = LanceDBVectorStore(uri="lancedb", table_name="image_collection")
    storage_context = StorageContext.from_defaults(vector_store=text_store, image_store=image_store)

    documents = SimpleDirectoryReader(output_folder).load_data()
    index = MultiModalVectorStoreIndex.from_documents(documents, storage_context=storage_context)
    return index, metadata_vid

# Retrieve and display context
def retrieve_context(index, query):
    """Retrieve the image paths and text snippets most relevant to ``query``.

    A retriever is created from ``index`` with a top-k of 5 for both text and
    images. Each result whose node is an ``ImageNode`` contributes the
    ``"file_path"`` entry of its metadata; every other result contributes its
    text.

    Args:
        index: Index object exposing ``as_retriever``.
        query: Query passed to the retriever.

    Returns:
        ``(image_paths, texts)`` - two lists, each in retrieval order.
    """
    retriever = index.as_retriever(similarity_top_k=5, image_similarity_top_k=5)

    image_paths, text_chunks = [], []
    for hit in retriever.retrieve(query):
        if not isinstance(hit.node, ImageNode):
            text_chunks.append(hit.text)
            continue
        image_paths.append(hit.node.metadata["file_path"])
    return image_paths, text_chunks

# Response generator
def response_generator(response):
    """Yield the ``["message"]["content"]`` payload of each streamed chunk.

    Iteration stops at the first chunk that lacks one of the expected keys;
    the missing key is reported on stdout and the generator ends.

    Args:
        response: Iterable of chunk mappings.

    Yields:
        The content value of each chunk, in order.
    """
    try:
        yield from (piece["message"]["content"] for piece in response)
    except KeyError as err:
        print(f"Error in response format: {err}")

# Path configuration (output_video_path, output_folder, output_audio_path,
# filepath) and the creation of the output folder are handled once near the
# top of this cell; the identical re-declaration that used to sit here was
# redundant and has been dropped.

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
from models import *
from llama_index.core.program import MultiModalLLMCompletionProgram
from llama_index.core.prompts import PromptTemplate
from llama_index.core.query_pipeline import QueryPipeline
from llama_index.core import Settings


# Register the HuggingFace embedding model on the global LlamaIndex Settings.
# `selected_embed_model` is presumably provided by `from models import *` - verify.
embed_model = HuggingFaceEmbedding(model_name=selected_embed_model)
Settings.embed_model = embed_model

# Strip surrounding whitespace: the URL check is anchored at the start of the
# string, so a pasted URL with a leading space would be rejected as invalid.
video_url = input("Insert YouTube video URL: ").strip()

# Setup index and metadata; both are None when the URL is rejected.
index, metadata_vid = setup_index(video_url)

Moviepy - Writing frames ./mixed_data/frame%04d.png.


                                                              

Moviepy - Done writing frames ./mixed_data/frame%04d.png.
MoviePy - Writing audio in ./mixed_data/output_audio.wav


                                                                        

MoviePy - Done.


[2024-10-02T07:48:02Z WARN  lance::dataset] No existing dataset at /home/reaper/code/LLMs/MMLLMRAG/lancedb/text_collection.lance, it will be created
[2024-10-02T07:48:05Z WARN  lance::dataset] No existing dataset at /home/reaper/code/LLMs/MMLLMRAG/lancedb/image_collection.lance, it will be created


### Tools

In [15]:
from tools import DocumentToolsGenerator

file_path = './mixed_data'
tools = []

# `file_path` is a directory, so testing it with `.endswith(".txt")` can never
# succeed and would leave `tools` empty. Build the tools from every text file
# inside the directory instead (sorted for a reproducible tool order).
for txt_path in sorted(Path(file_path).glob("*.txt")):
    docs_tools = DocumentToolsGenerator(file_path=str(txt_path))
    nodes = docs_tools.data_ingestion()
    tools.extend(docs_tools.tool_generator(nodes=nodes))

### Setup Agent

In [22]:
from llama_index.core.agent.react_multimodal.step import MultimodalReActAgentWorker
from llama_index.core.agent import Task
from llama_index.multi_modal_llms.ollama import OllamaMultiModal

# Multimodal LLM served through Ollama.
# `selected_model` is presumably provided by `from models import *` - verify.
llm = OllamaMultiModal(model=selected_model)

# Build a multimodal ReAct worker over the collected tools and wrap it in an
# agent that can be stepped task by task.
react_step_engine = MultimodalReActAgentWorker.from_tools(
    tools, multi_modal_llm=llm, verbose=True
)
agent = react_step_engine.as_agent()

In [23]:
from llama_index.core.schema import ImageDocument

query_str = (
    "The images are snapshots of the video. "
    "Can you try to answer the user's questions on specific details?"
)

# glob() yields paths in arbitrary order; sort them so the frames are handed
# to the agent in a stable order (frame0000.png, frame0001.png, ...), which
# for these zero-padded names is also chronological.
frame_paths = sorted(Path('./mixed_data').glob('*.png'))
image_documents = [ImageDocument(image_path=str(png)) for png in frame_paths]

task = agent.create_task(
    query_str,
    extra_state={"image_docs": image_documents},
)

In [24]:
from llama_index.core.agent import AgentRunner


def execute_step(agent: AgentRunner, task: Task):
    """Run one agent step for ``task``.

    Args:
        agent: Agent that owns the task.
        task: Task whose ``task_id`` identifies the run to advance.

    Returns:
        The finalized response when the executed step was the last one (it is
        also printed), otherwise ``None``.
    """
    step_output = agent.run_step(task.task_id)
    if not step_output.is_last:
        return None

    final_response = agent.finalize_response(task.task_id)
    print(f"> Agent finished: {str(final_response)}")
    return final_response


def execute_steps(agent: AgentRunner, task: Task):
    """Keep stepping the agent until ``execute_step`` yields a response.

    Args:
        agent: Agent that owns the task.
        task: Task to run to completion.

    Returns:
        The first non-``None`` value returned by ``execute_step``.
    """
    while True:
        outcome = execute_step(agent, task)
        if outcome is not None:
            return outcome

In [25]:
# Advance the agent by exactly one step. `response` is None unless that step
# was the last one (see `execute_step`); `execute_steps` loops until a
# response is produced.
response = execute_step(agent, task)

ResponseError: json: cannot unmarshal array into Go struct field ChatRequest.messages of type string