In [None]:
import json
from youtube_transcript_api import YouTubeTranscriptApi
from sentence_transformers import SentenceTransformer # Import SentenceTransformer
import pinecone
import time
import os # Import os module for environment variables
from dotenv import load_dotenv


# Load environment variables from .env file
load_dotenv()

# Retrieve Pinecone API key from environment
api_key = os.getenv("PINECONE_API_KEY")

# Initialize Pinecone with the hardcoded API key
pc = pinecone.Pinecone(api_key=api_key)

# Define index parameters
index_name = "youtube-index"
dimension = 768
metric = "cosine"

# Create Pinecone index if it doesn't exist
if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=dimension,
        metric=metric,
        spec=pinecone.ServerlessSpec(cloud="aws", region="us-east-1")
    )
    print(f"Created Pinecone index: {index_name}")
else:
    print(f"Pinecone index '{index_name}' already exists")

# Wait for the Pinecone index to be ready
while not pc.describe_index(index_name).status['ready']:
    time.sleep(1)

# Connect to the index
index = pc.Index(index_name)
print(f"Successfully connected to Pinecone index: {index_name}")

# Initialize the SentenceTransformer model # Define and initialize the model
model = SentenceTransformer('all-mpnet-base-v2')

# Load metadata from JSON file
with open("videos_metadata.json", "r") as f:
    video_metadata = json.load(f)


# Convert metadata into a dictionary indexed by video_id for easy lookup
video_metadata_dict = {
    item['videoId']: {
        **item,
        "video_id": item['videoId'],
        "video_title": item.get('title', 'Unknown Title'),  # Adding video_title
        "publishedAt": item.get('publishedAt', 'Unknown Date'),  # Adding publishedAt if available
        "thumbnail": item.get('thumbnail', 'No Thumbnail Available')  # Adding thumbnail if available
    }
    for item in video_metadata
}

def get_video_transcript(video_id):
    try:
        transcript = YouTubeTranscriptApi.get_transcript(video_id)
        transcript_text = " ".join([item['text'] for item in transcript if 'text' in item])
        return transcript_text
    except Exception as e:
        print(f"Error retrieving transcript for video {video_id}: {e}")
        return None

def chunk_and_embed_text(text, chunk_size=50):
    chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    embeddings = model.encode(chunks)
    return chunks, embeddings

def upsert_to_pinecone(video_id, metadata, chunks, embeddings):
    batch_size = 50
    for i in range(0, len(chunks), batch_size):
        chunk_ids = [f"{video_id}-{j}" for j in range(i, min(i + batch_size, len(chunks)))]
        
        # Metadata including chunk text and video metadata
        metadata_list = [
            {
                "video_id": video_id,
                "video_title": metadata.get("title", f"Title for {video_id}"),
                "published_at": metadata.get("publishedAt"),
                "thumbnail": metadata.get("thumbnail"),
                "text": chunks[k]
            }
            for k in range(i, min(i + batch_size, len(chunks)))
        ]

        vectors_with_metadata = []
        for j in range(len(chunk_ids)):
            vector = {
                "id": chunk_ids[j],
                "values": embeddings[i + j].tolist(),
                "metadata": metadata_list[j]
            }
            vectors_with_metadata.append(vector)

        index.upsert(vectors=vectors_with_metadata)

def process_video(video_id):
    print(f"Processing video: {video_id}")
    transcript_text = get_video_transcript(video_id)
    if transcript_text:
        print(f"Transcript for video {video_id}:")
        print(transcript_text)
        chunks, embeddings = chunk_and_embed_text(transcript_text)
        
        # Get the metadata for the video
        metadata = video_metadata_dict.get(video_id, {})
        
        # Upsert data to Pinecone
        upsert_to_pinecone(video_id, metadata, chunks, embeddings)

# Process all videos
video_ids = [
    'gvck7ssg9dE',
    'djzKCZHeVjY',
    'puXKFn-nKis',
    'pvRY3r-b0QI',
    'SUMLKweFAYk',
    'UOuxo6SA8Uc',
]

for video_id in video_ids:
    process_video(video_id)

In [None]:
import os
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Pinecone
from langchain.llms import OpenAI
from langchain.chains.question_answering import load_qa_chain
# Import HuggingFaceEmbeddings
from langchain.embeddings import HuggingFaceEmbeddings
# Import PromptTemplate from langchain.prompts
from langchain.prompts import PromptTemplate
# Import LLMChain
from langchain.chains import LLMChain
from youtube_transcript_api import YouTubeTranscriptApi
# Import RecursiveCharacterTextSplitter
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Import tiktoken for token counting
import tiktoken
import ipywidgets as widgets
from IPython.display import display, clear_output
#import openai
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
import pinecone

# 'index' is already defined from the previous code

# Get the API key from the environment variable
api_key = os.getenv("OPENAI_API_KEY")

# Initialize HuggingFace embeddings with the same model used to create the index
embeddings = HuggingFaceEmbeddings(model_name="all-mpnet-base-v2")

# Use the HuggingFaceEmbeddings for the Pinecone vectorstore
vectorstore = Pinecone(index, embeddings.embed_query, "text")

# Initialize language model and QA chain
llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)


# Define a prompt template for summarizing the video transcript
summary_template = """
Please provide a concise summary of the following YouTube video transcript:

Transcript:
{transcript}

Summary:
"""

SUMMARY_PROMPT = PromptTemplate(
    input_variables=["transcript"],
    template=summary_template
)

# Define a prompt template for answering questions about the video
qa_template = """
Use the following summary to answer the question about the YouTube video:

Summary:
{summary}

Question: {question}

Answer:
"""

QA_PROMPT = PromptTemplate(
    input_variables=["summary", "question"],
    template=qa_template
)

def get_video_summary(video_id):
    transcript = YouTubeTranscriptApi.get_transcript(video_id)
    transcript_text = " ".join([item['text'] for item in transcript if 'text' in item])

    # Initialize RecursiveCharacterTextSplitter with a suitable chunk size and overlap
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,  # Adjust chunk size as needed
        chunk_overlap=100  # Adjust overlap as needed
    )

    # Split the transcript into smaller chunks
    chunks = text_splitter.split_text(transcript_text)

    # Generate summaries for each chunk and combine them
    summary = ""
    for chunk in chunks:
        # Generate summary using LLMChain
        summary_chain = LLMChain(llm=llm, prompt=SUMMARY_PROMPT)
        chunk_summary = summary_chain.run(transcript=chunk)
        summary += chunk_summary + " "  # Combine summaries with a space

    return summary

def ask_question(question, video_id, show_context=False):
    tokenizer = tiktoken.get_encoding("cl100k_base")
    max_context_tokens = 10000

    # 1. Get the summary for the given video_id
    summary = get_video_summary(video_id)

    # 2. Build a retriever
    retriever = RunnableLambda(vectorstore.similarity_search).bind(k=3)

    # 3. Invoke the retriever to fetch documents before using them
    docs = retriever.invoke(question) # Invoke to get the documents

    # 4. Concatenate the retrieved documents to form the context
    context = ""
    total_tokens = 0
    for doc in docs:
        doc_tokens = len(tokenizer.encode(doc.page_content))
        if total_tokens + doc_tokens <= max_context_tokens:
            context += doc.page_content + " "
            total_tokens += doc_tokens
        else:
            break

    # 5. Format the context to be included in the prompt
    formatted_context = f"Context: {context}"  # Add "Context:" for clarity

    # 6. Create a QA chain using the retrieved context, summary, and question
    # Modified: Directly pass summary and question to QA_PROMPT
    qa_chain = QA_PROMPT | llm

    # 6. Create a QA chain using the retrieved context, summary, and question
    # Note: the 'context' key in the rag_chain should be 'retriever', not the actual docs
    # rag_chain={}
    # Modified to pass 'question' as input and summary and retriever in the prompt
    #rag_chain={"context": retriever,"summary": RunnablePassthrough()} | QA_PROMPT | llm
    #rag_chain = {"context": retriever, "question": RunnablePassthrough()} | QA_PROMPT | llm
    #rag_chain = {"question": RunnablePassthrough()} | QA_PROMPT.bind(question=question, context=retriever) | llm
    #rag_chain = {"question": RunnablePassthrough()} | QA_PROMPT.bind(summary=summary, context=retriever) | llm

    # 7. Pass the summary, question to the chain and access the content
    answer = qa_chain.invoke({"summary": summary, "question": question})

    # Return only the content of the answer
    return answer.content  # Access the content attribut

    # 8. Optionally display the relevant context
    if show_context:
        print("Relevant Context:")
        print(context)

    return answer

# Interactive loop for asking questions
while True:
    question = input("Enter your question (or 'q' to quit): ")
    if question.lower() == 'q':
        print("Exiting the program. Goodbye!")
        break
    elif not question.strip():
        print("Please enter a valid question.")
        continue

    video_id = input("Enter the YouTube video ID (or 'q' to quit): ")
    if video_id.lower() == 'q':
        print("Exiting the program. Goodbye!")
        break
    elif not video_id.strip():
        print("Please enter a valid YouTube video ID.")
        continue

    answer = ask_question(question, video_id)
    print(answer)

# Video IDs for the dropdown
video_ids = [
    'gvck7ssg9dE',
    'djzKCZHeVjY',
    'puXKFn-nKis',
    'pvRY3r-b0QI',
    'SUMLKweFAYk',
    'UOuxo6SA8Uc',
]

In [None]:
import gradio as gr
import json
import os
import requests
from PIL import Image as PILImage
import io
import base64
import socket


# Load video metadata
json_file_path = 'videos_metadata.json'  # Replace with the actual path
with open(json_file_path, 'r') as json_file:
    video_metadata = json.load(json_file)

# Extract video IDs and titles for the dropdown
video_choices = {video_data["title"]: video_data["videoId"] for video_data in video_metadata}

# Define the Gradio interface with event listeners
with gr.Blocks() as iface:
    gr.Markdown("## ClipVerse YT, just do IT !?")

    # Dropdown for video selection (no initial value)
    video_dropdown = gr.Dropdown(
        choices=list(video_choices.keys()),
        label="Pick Your Adventure",
        value=None  # Start with no selection
    )

    # Components to display metadata (initially hidden)
    with gr.Row(visible=False) as metadata_row:
        title_textbox = gr.Textbox(label="Title")
        thumbnail_image = gr.Image(label="Thumbnail")  # Specify type as "url"
        published_at_textbox = gr.Textbox(label="Published At")

    # Question textbox
    question_textbox = gr.Textbox(lines=2, placeholder="Enter your question here...", label="Dare to Discover?")

    # Answer textbox
    answer_textbox = gr.Textbox(label="Take a Leap!")

    # Button to ask questions with custom CSS
    ask_button = gr.Button("Show me the Magic", variant="primary") # Use variant for styling

    # Restart button with custom CSS
    restart_button = gr.Button("Give more Magic", variant="secondary") # Use variant for styling

    # Function to update metadata and make them visible
    def update_metadata_fields(selected_title):
        selected_video_id = video_choices.get(selected_title)
        if selected_video_id:
            selected_video_data = next(
                (item for item in video_metadata if item["videoId"] == selected_video_id),
                None
            )
            if selected_video_data:
                # Load image from URL and encode as base64
                image_url = selected_video_data.get("thumbnail")
                if image_url:
                    print(f"Fetching thumbnail from: {image_url}")
                    try:
                        # If it's a local file path, use open() instead of requests.get()
                        if os.path.isfile(image_url):
                            with open(image_url, "rb") as image_file:
                                image_content = image_file.read()
                        else:  # If it's a URL, use requests.get()
                            response = requests.get(image_url, stream=True)
                            response.raise_for_status()
                            image_content = response.content

                        # Use PIL to load and resize the image
                        image = PILImage.open(io.BytesIO(image_content))
                        # Calculate new dimensions while preserving aspect ratio
                        width, height = image.size
                        target_size = (200, 200)  # Desired size

                        if width > height:
                            new_width = target_size[0]
                            new_height = int(height * (target_size[0] / width))
                        else:
                            new_height = target_size[1]
                            new_width = int(width * (target_size[1] / height))

                        # Resize using LANCZOS for better quality
                        image = image.resize((new_width, new_height), PILImage.LANCZOS)

                        # Update: Directly return the PIL Image object
                        return {
                            metadata_row: gr.update(visible=True),
                            title_textbox: gr.update(value=selected_video_data["title"]),
                            thumbnail_image: gr.update(value=image),  # Return PIL Image
                            published_at_textbox: gr.update(value=selected_video_data["publishedAt"]),
                        }

                    except Exception as e:
                        print(f"Error loading thumbnail: {e}")
                        # Handle the case where image loading fails (e.g., invalid URL)
                        return {
                            metadata_row: gr.update(visible=True),  # Still show metadata
                            title_textbox: gr.update(value=selected_video_data["title"]),
                            thumbnail_image: gr.update(value=None),  # Set thumbnail to None
                            published_at_textbox: gr.update(value=selected_video_data["publishedAt"]),
                        }
                else:
                    print(f"Thumbnail URL not found for video ID: {selected_video_id}")
            else:
                print(f"Video data not found for video ID: {selected_video_id}")
        return {
            metadata_row: gr.update(visible=False),
            title_textbox: gr.update(value=""),
            thumbnail_image: gr.update(value=None),
            published_at_textbox: gr.update(value=""),
        }


    # Event listener for video dropdown change
    video_dropdown.change(
        fn=update_metadata_fields,
        inputs=video_dropdown,
        outputs=[metadata_row, title_textbox, thumbnail_image, published_at_textbox],
    )

    # Function to handle question asking
    def ask_question_gradio(question, selected_title):
        video_id = video_choices.get(selected_title)
        if not video_id:
            return "Please select a video first."
        try:
            answer = ask_question(question, video_id)  # Assuming 'ask_question' is defined elsewhere
        except Exception as e:
            return f"Error: Could not answer the question. {e}"
        return answer

    # Event trigger for the ask button
    ask_button.click(
        fn=ask_question_gradio,
        inputs=[question_textbox, video_dropdown],
        outputs=answer_textbox,
    )

    # Function to reset the interface
    def restart_interface():
        return {
            video_dropdown: gr.update(value=None),
            metadata_row: gr.update(visible=False),
            title_textbox: gr.update(value=""),
            thumbnail_image: gr.update(value=None),
            published_at_textbox: gr.update(value=""),
            question_textbox: gr.update(value=""),
            answer_textbox: gr.update(value=""),
        }

    # Event listener for video dropdown change
    video_dropdown.change(
        fn=update_metadata_fields,
        inputs=video_dropdown,
        outputs=[metadata_row, title_textbox, thumbnail_image, published_at_textbox],
    )

    # Event trigger for the ask button
    ask_button.click(
        fn=lambda question, selected_title: ask_question_gradio(
            question, video_choices.get(selected_title)
        ),
        inputs=[question_textbox, video_dropdown],
        outputs=answer_textbox,
    )

    # Function to reset the interface
    def restart_interface():
        return {
            video_dropdown: gr.update(value=None),
            metadata_row: gr.update(visible=False),
            title_textbox: gr.update(value=""),
            thumbnail_image: gr.update(value=None),
            published_at_textbox: gr.update(value=""),
            question_textbox: gr.update(value=""),
            answer_textbox: gr.update(value=""),
        }

    # Insert ask_question_gradio here
    def ask_question_gradio(question, video_id):
        # 1. Check if video_id is valid
        if not video_id:
            return "Please select a video first."

        # 2. Call your existing ask_question function
        try:
            answer = ask_question(question, video_id)
        except Exception as e:
            return f"Error: Could not answer the question. {e}"

        return answer


    # Event trigger for the restart button
    restart_button.click(
        fn=restart_interface,
        outputs=[
            video_dropdown,
            metadata_row,
            title_textbox,
            thumbnail_image,
            published_at_textbox,
            question_textbox,
            answer_textbox,
        ],
    )

# Launch the interface
iface.launch(share=True, server_port=1234)