<a href="https://colab.research.google.com/github/speedthunder/PythonxGPT/blob/main/Youtube_RAG_Ver_003_graido.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [15]:
# Install the required packages (Colab shell commands)
!pip install langchain langchain_openai rich youtube_search pytube yt-dlp langchain_community faiss-cpu
!apt install ffmpeg
# NOTE(review): faiss-cpu is already installed by the pip line above — confirm faiss-gpu is also intended
!pip install faiss-gpu
# 匯入所需模組
from rich import print as pprint
import os
import re
from openai import OpenAI
from langchain_community.tools import YouTubeSearchTool
from pytube import YouTube
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain.vectorstores import FAISS
from langchain.embeddings import OpenAIEmbeddings
from google.colab import drive
from google.colab import userdata
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain.chains import LLMChain
from langchain import OpenAI
import gradio as gr
# Set the OpenAI API key in the environment from the Colab userdata entry named OPENAI_API_KEY
os.environ['OPENAI_API_KEY'] = userdata.get('OPENAI_API_KEY')

def Crawl_key_urls(keyword):
    """Search YouTube for *keyword* and return a short link to the first match.

    Returns:
        ``https://youtu.be/<video id>`` for the first video ID found in the
        search output, ``"Error: No video found."`` when no ID is present, or
        an ``"Error in Crawl_key_urls: ..."`` string when an exception occurs.
    """
    try:
        # Run the search tool and keep its raw string output.
        search_output = YouTubeSearchTool().run(keyword)

        # Collect every 11-character video ID that follows "watch?v=".
        found_ids = re.findall(r'watch\?v=([a-zA-Z0-9_-]{11})', search_output)

        if found_ids:
            # Only the first hit is used.
            return f"https://youtu.be/{found_ids[0]}"
        return "Error: No video found."
    except Exception as exc:
        return f"Error in Crawl_key_urls: {str(exc)}"


def url_download(url):
    """Download the audio track of *url* and write a 32k-bitrate copy.

    Runs ``yt-dlp`` to extract the audio as ``audio.mp3`` and then ``ffmpeg``
    to re-encode it to ``compressed_audio.mp3`` (both in the current working
    directory).

    The commands are executed with ``subprocess.run`` and an argument list
    instead of notebook ``!`` shell magics, so that:
      * a failing command actually raises and is reported to the caller, and
      * *url* is never interpreted by a shell.

    Args:
        url: Video URL to download.

    Returns:
        ``None`` on success, or an ``"Error in url_download: ..."`` string
        when the URL is empty or either command fails.
    """
    import subprocess

    if not url:
        return "Error in url_download: empty URL"

    commands = (
        # "--" stops option parsing so the URL cannot be read as a flag.
        ["yt-dlp", "--force-overwrites", "-x", "--audio-format", "mp3",
         "-o", "audio.mp3", "--", url],
        ["ffmpeg", "-y", "-i", "audio.mp3", "-ab", "32k", "compressed_audio.mp3"],
    )
    try:
        for command in commands:
            # check=True turns a non-zero exit status into CalledProcessError.
            subprocess.run(command, check=True)
    except Exception as e:
        return f"Error in url_download: {str(e)}"
    return None

def audio_to_text(file_path):
    """Transcribe the audio file at *file_path* with the ``whisper-1`` model.

    Args:
        file_path: Path of the audio file to upload for transcription.

    Returns:
        The transcript text on success, or an ``"Error in audio_to_text: ..."``
        string when anything fails (import, client setup, file access, or the
        API call).
    """
    try:
        # Imported locally: the module-level name ``OpenAI`` is rebound by a
        # later ``from langchain import OpenAI`` in this file.
        from openai import OpenAI
        client = OpenAI()
        # The context manager closes the file even if the API call raises.
        with open(file_path, "rb") as audio_file:
            transcript = client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file
            )
        return transcript.text
    except Exception as e:
        return f"Error in audio_to_text: {str(e)}"

def split_text(text):
    """Break *text* into chunks with ``RecursiveCharacterTextSplitter``.

    The splitter is configured to split on single spaces with a chunk size of
    300 and an overlap of 10.

    Returns:
        The list of chunks, or an ``"Error in split_text: ..."`` string when
        an exception occurs.
    """
    try:
        splitter_options = {
            "separators": [' '],
            "chunk_size": 300,
            "chunk_overlap": 10,
        }
        return RecursiveCharacterTextSplitter(**splitter_options).split_text(text)
    except Exception as exc:
        return f"Error in split_text: {str(exc)}"

def rag_embeddinge_save(splits, embeddings):
    """Build a FAISS store from *splits* and save it to Google Drive.

    Args:
        splits: Text chunks to index.
        embeddings: Embedding object handed to ``FAISS.from_texts``.

    Returns:
        ``None`` on success, or an ``"Error in rag_embeddinge_save: ..."``
        string when an exception occurs.
    """
    try:
        FAISS.from_texts(splits, embeddings).save_local("/content/drive/MyDrive/youtube_db")
    except Exception as exc:
        return f"Error in rag_embeddinge_save: {str(exc)}"
    return None

def rag_embeddinge_load(embeddings):
    """Load the FAISS store previously saved under Google Drive.

    Args:
        embeddings: Embedding object handed to ``FAISS.load_local``.

    Returns:
        The loaded store, or an ``"Error in rag_embeddinge_load: ..."`` string
        when an exception occurs.
    """
    try:
        # SECURITY: allow_dangerous_deserialization opts in to loading a
        # pickled index; only use it on files this notebook wrote itself.
        # The keyword was previously misspelled
        # ("allow_dangerous_serialization"), so the opt-in was never applied.
        new_db = FAISS.load_local(
            folder_path="/content/drive/MyDrive/youtube_db",
            embeddings=embeddings,
            allow_dangerous_deserialization=True
        )
        return new_db
    except Exception as e:
        return f"Error in rag_embeddinge_load: {str(e)}"

def create_chat_chain_with_embedding(embeddings, model_name="gpt-4o-mini", temperature=0.7, folder_path="/content/drive/MyDrive/youtube_db"):
    """Build a retrieval chat chain on top of the saved FAISS store.

    Args:
        embeddings: Embedding object handed to ``FAISS.load_local``.
        model_name: Chat model name passed to ``ChatOpenAI``.
        temperature: Sampling temperature passed to ``ChatOpenAI``.
        folder_path: Directory the FAISS store is loaded from. This argument
            is now honoured; it was previously ignored in favour of a
            hard-coded path equal to the default.

    Returns:
        A runnable chain (retriever + prompt + chat model + string parser),
        or an ``"Error in create_chat_chain_with_embedding: ..."`` string when
        an exception occurs.
    """
    try:
        # SECURITY: allow_dangerous_deserialization opts in to loading a
        # pickled index; only use it on files this notebook wrote itself.
        new_db = FAISS.load_local(
            folder_path=folder_path,
            embeddings=embeddings,
            allow_dangerous_deserialization=True)

        # Debug preview of the top match for a fixed probe query; skipped when
        # the search returns nothing so an empty result cannot abort setup.
        docs = new_db.similarity_search('對這部電影的感受')
        if docs:
            pprint(docs[0])

        chat_model = ChatOpenAI(model_name=model_name, temperature=temperature)

        str_parser = StrOutputParser()
        template = (
            "所有回應都以條列式方式，請根據以下內容加上內容中的專業方面來判斷並回答問題：\n"
            "{context}\n"
            "問題：{question}"
        )
        prompt = ChatPromptTemplate.from_template(template)

        retriever = new_db.as_retriever()

        # The incoming question goes both to the retriever (for context) and,
        # unchanged, into the prompt's {question} slot.
        chain = (
            {"context": retriever, "question": RunnablePassthrough()}
            | prompt
            | chat_model
            | str_parser
        )

        return chain
    except Exception as e:
        return f"Error in create_chat_chain_with_embedding: {str(e)}"

# Conversation state shared across process() calls: index 0 holds the chat
# chain once process() has built it; later entries are (msg, answer) tuples.
chat_history = []

def _is_error_message(value):
    """Return True when *value* is an "Error..." string returned by a pipeline step."""
    return isinstance(value, str) and value.startswith("Error")


def process(keyword, msg):
    """Answer *msg* using a chat chain built from the first YouTube hit for *keyword*.

    On the first call (while ``chat_history`` is empty) the full pipeline runs:
    search, audio download, transcription, splitting, embedding, and chain
    creation. The chain is stored at ``chat_history[0]`` and reused by later
    calls; each answered message is appended as a ``(msg, answer)`` tuple.

    Args:
        keyword: Search keyword; only used while no chain has been built yet.
        msg: User message. ``'q'`` (any case) ends the conversation.

    Returns:
        The model's answer with blank lines removed, ``"對話已結束。"`` when
        *msg* is ``'q'``, or an ``"Error..."`` string describing the first
        step that failed.
    """
    try:
        global chat_history

        # Handle "quit" before any expensive work, so that quitting never
        # triggers a Drive mount, a download, or paid API calls.
        if msg.lower() == 'q':
            return "對話已結束。"

        # Mount Google Drive (the vector store is saved/loaded there).
        drive.mount('/content/drive')

        # Build the chain only once, on the first call.
        if not chat_history:
            yt_url = Crawl_key_urls(keyword)
            # Prefix check instead of substring check: text that merely
            # contains the word "Error" must not be treated as a failure.
            if _is_error_message(yt_url):
                return yt_url
            print(f"取得的 URL：{yt_url}")

            # Download the video's audio track.
            download_error = url_download(yt_url)
            if download_error:
                return download_error

            # Speech-to-text.
            text = audio_to_text('/content/compressed_audio.mp3')
            if _is_error_message(text):
                return text

            print("轉換的文字內容：")
            print(text)

            # Split the transcript; on failure split_text returns a string
            # rather than a list of chunks.
            splits = split_text(text)
            if _is_error_message(splits):
                return splits

            # Create the embedding object.
            embeddings = OpenAIEmbeddings()

            # Persist the vector store.
            save_error = rag_embeddinge_save(splits, embeddings)
            if save_error:
                return save_error

            # Build the chat chain.
            chain = create_chat_chain_with_embedding(embeddings)
            if _is_error_message(chain):
                return chain

            # Keep the chain for subsequent calls.
            chat_history.append(chain)

        # Answer with the already-built chain.
        chain = chat_history[0]
        response = chain.invoke(msg)
        answer = '\n'.join(line for line in response.split('\n') if line != '')
        chat_history.append((msg, answer))
        return answer
    except Exception as e:
        return f"Error in process: {str(e)}"

# Gradio UI: two text inputs are passed to process(keyword, msg) and its
# return value is shown in a single text output.
iface = gr.Interface(
    fn=process,
    inputs=[
        gr.Textbox(label="輸入關鍵字", lines=2, placeholder="在這裡輸入關鍵字..."),  # passed to process() as keyword
        # The label's closing quote was missing, which made this line a syntax error.
        gr.Textbox(label="輸入訊息，'q'離開", lines=2, placeholder="在這裡輸入訊息..."),  # passed to process() as msg
    ],
    outputs=gr.Textbox(label="回應", lines=10),  # shows the string returned by process()
    title="YouTube 音訊處理與對話系統",
    description="輸入關鍵字來搜尋 YouTube 影片，並且根據影片內容進行對話。",
)

iface.launch()

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
ffmpeg is already the newest version (7:4.4.2-0ubuntu0.22.04.1).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.
Setting queue=True in a Colab notebook requires sharing enabled. Setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
Running on public URL: https://8b2079f1cf1c671db7.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)


