# Import

In [None]:
import os
import requests
from typing import List

import chromadb
from chromadb.api.types import Documents, Embeddings
from chromadb.utils.embedding_functions import EmbeddingFunction

import google.generativeai as genai

import gradio as gr
import fitz  # 要安裝PyMuPDF

# Download PDF and Extract text from PDF

In [None]:
def download_pdf(url, save_path, timeout=30):
    """
    Download a PDF from a URL and save it to a local path.

    :param url: URL of the PDF file (string)
    :param save_path: local path the PDF is written to (string)
    :param timeout: seconds to wait for the server before giving up
    :raises requests.HTTPError: if the server answers with a 4xx/5xx status
    :raises requests.RequestException: on connection errors or timeouts
    """
    # A timeout keeps an unresponsive server from hanging the caller forever.
    response = requests.get(url, timeout=timeout)

    # Fail loudly on an error status instead of saving an error page as "the PDF".
    response.raise_for_status()

    # Binary mode: the payload is written exactly as received.
    with open(save_path, 'wb') as f:
        f.write(response.content)


def extract_text_from_pdf_file_obj(file):
    """
    Return the text of every page of a PDF, concatenated in page order.

    The document is opened by path, using the ``name`` attribute of ``file``;
    the object itself is never read.

    :param file: object whose ``name`` attribute is the path of the PDF
    :return: the extracted text, or a message starting with
             "Error while reading PDF: " when anything goes wrong (string)
    """
    try:
        with fitz.open(file.name) as document:
            return "".join(page.get_text() for page in document)
    except Exception as exc:
        # Failures are reported through the return value, not raised.
        return f"Error while reading PDF: {exc!s}"


def extract_text_from_pdf_file_path(file_path):
    """
    Return the text of every page of the PDF at ``file_path``.

    :param file_path: path of the PDF file (string)
    :return: the page texts concatenated in page order, or a message starting
             with "Error while reading PDF: " when anything goes wrong (string)
    """
    try:
        with fitz.open(file_path) as document:
            return "".join(page.get_text() for page in document)
    except Exception as exc:
        # Failures are reported through the return value, not raised.
        return f"Error while reading PDF: {exc!s}"

# ToDo:
- Text splitting
- ChromaDB
- Prompt Construction

## Implement text splitting function

In [None]:
# Split text into small chunks
def split_text(text: str, max_chunk_size: int = 500, overlap: int = 50) -> List[str]:
    """
    Split a long text into chunks, with optional overlap between neighbours.

    Each chunk is stripped of surrounding whitespace; chunks that are empty
    after stripping are dropped. Splitting stops as soon as a chunk reaches
    the end of the text, so no trailing chunk is emitted that is already
    fully contained in the previous one.

    :param text: the text to split (string)
    :param max_chunk_size: maximum size of each chunk before stripping (int, > 0)
    :param overlap: number of characters shared by consecutive chunks
                    (int, 0 <= overlap < max_chunk_size)
    :return: list of chunks (List of strings); empty for empty input
    :raises ValueError: if max_chunk_size or overlap is out of range
    """
    if max_chunk_size <= 0:
        raise ValueError("max_chunk_size must be a positive integer")
    # overlap >= max_chunk_size would give a non-positive step and never advance.
    if not 0 <= overlap < max_chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < max_chunk_size")

    step = max_chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        end = start + max_chunk_size
        piece = text[start:end].strip()
        if piece:
            chunks.append(piece)
        # This chunk already covers the tail; a further one would only repeat it.
        if end >= len(text):
            break
    return chunks

## Custom embedding function using Gemini API

In [None]:
# Custom Gemini embedding function
class GeminiEmbeddingFunction(EmbeddingFunction):
    """Embedding function that embeds each document with ``genai.embed_content``."""

    def __init__(self, api_key: str, model: str = "models/embedding-001", title: str = "Custom query"):
        """
        Store the settings and configure the ``genai`` module with the key.

        :param api_key: key passed to ``genai.configure`` (string)
        :param model: embedding model name sent with every request (string)
        :param title: title sent with every request (string)
        """
        self.api_key = api_key
        self.model = model
        self.title = title
        genai.configure(api_key=self.api_key)

    def __call__(self, input: Documents) -> Embeddings:
        """
        Embed the documents one request at a time, preserving their order.

        :param input: documents to embed
        :return: the "embedding" entry of each response, in input order
        """
        vectors = []
        for doc in input:
            reply = genai.embed_content(
                model=self.model,
                content=doc,
                task_type="retrieval_document",
                title=self.title,
            )
            vectors.append(reply["embedding"])
        return vectors

## Implement ChromaDB creation and querying

In [None]:
# Add documents to a ChromaDB collection.
def update_chroma_db(client, collection_name: str, new_documents: List[str]):
    """
    Add documents to a ChromaDB collection, creating the collection if needed.

    Every document gets a freshly generated unique ID, so repeated calls
    (for example one call per uploaded PDF) never reuse the IDs of documents
    that are already stored.

    :param client: ChromaDB client that owns the collection
    :param collection_name: name of the collection to update (string)
    :param new_documents: documents to add (List of strings)
    """
    import uuid  # local import: only needed for ID generation

    # Get the collection by name, creating it on first use
    collection = client.get_or_create_collection(collection_name)

    documents = list(new_documents)

    # Add in moderately sized batches rather than one call per document.
    batch_size = 100
    for offset in range(0, len(documents), batch_size):
        batch = documents[offset:offset + batch_size]
        collection.add(
            ids=[f"new_doc_{uuid.uuid4().hex}" for _ in batch],  # unique across calls
            documents=batch,
        )

    print(f"Added {len(documents)} new documents to the collection '{collection_name}'.")

In [None]:
# Query relevant passages
def get_relevant_passage(query: str, db, name: str, n_results: int = 3) -> List[str]:
    """
    Look up the passages of a ChromaDB collection that best match a query.

    :param query: the user's query (string)
    :param db: ChromaDB client object the collection is fetched from
    :param name: name of the collection to query (string)
    :param n_results: number of results to request (int, default 3)
    :return: the documents returned for the query (List of strings)
    """
    matches = db.get_collection(name).query(query_texts=[query], n_results=n_results)
    # Exactly one query text was sent, so its hits are the first entry.
    return matches["documents"][0]

In [None]:
# Build the prompt
def make_rag_prompt(query: str, relevant_passages: List[str]) -> str:
    """
    Build the prompt that asks the model to answer a question from context.

    :param query: the user's question (string)
    :param relevant_passages: passages to use as context (List of strings).
        A single string is also accepted and used as the context as-is.
    :return: the complete prompt text (string)
    """
    if isinstance(relevant_passages, str):
        # Joining a str would iterate it character by character and put a
        # blank line between every character, so use it unchanged.
        context = relevant_passages
    else:
        # Separate the passages with a blank line.
        context = "\n\n".join(relevant_passages)
    return f"""
    You are an intelligent assistant. Use the following context to answer the question:

    Context:
    {context}

    Question:
    {query}

    Provide a concise and accurate response.
    """

# LLM Response Generation

In [None]:
# Check Gemini API key
from dotenv import load_dotenv
import os

# Load every variable defined in key.env
load_dotenv("key.env")

# Read the key from the environment
api_key = os.getenv('Geminiapikey')


# Confirm the variable was loaded without echoing the secret itself:
# anything printed here is stored in the notebook output.
print(f"Gemini api key: {'loaded' if api_key else 'NOT FOUND'}")

In [None]:
# Generate answer using Gemini Pro API
def generate_answer(prompt: str):
    """
    Send a prompt to the 'gemini-pro' model and return the generated text.

    The API key is read from the environment variable ``Geminiapikey``.

    :param prompt: the full prompt to send (string)
    :return: the text of the model's response (string)
    :raises ValueError: if ``Geminiapikey`` is not set
    """
    # key.env is the file the key-check cell loads; the bare call is kept so a
    # default .env keeps working. Variables that are already set are not overridden.
    load_dotenv("key.env")
    load_dotenv()
    gemini_api_key = os.getenv('Geminiapikey')
    if not gemini_api_key:
        raise ValueError(
            "Gemini API Key not provided. Please provide Geminiapikey as an environment variable")
    genai.configure(api_key=gemini_api_key)
    model = genai.GenerativeModel('gemini-pro')
    result = model.generate_content(prompt)
    return result.text

# Testing

In [None]:
# Set up configurations
pdf_url = "https://services.google.com/fh/files/misc/ai_adoption_framework_whitepaper.pdf"
pdf_path = "ai_adoption_framework_whitepaper.pdf"

db_folder = "chroma_db"
db_path = os.path.join(os.getcwd(), db_folder)

# Create the database directory. exist_ok makes this a no-op when it already
# exists, and the same absolute path is used here and for the client below.
os.makedirs(db_path, exist_ok=True)


client = chromadb.PersistentClient(path=db_path)

# a database unit in Chroma is called collection, so db here means collection
db_name = "rag_experiment"
client.get_or_create_collection(db_name)
print(f"{db_name} is created")

In [None]:
# Download the sample PDF to pdf_path and extract its text.
# NOTE: extract_text_from_pdf_file_path returns an error message string instead
# of raising, so a failed read would be chunked and stored like normal text.
download_pdf(pdf_url, pdf_path)
pdf_text = extract_text_from_pdf_file_path(pdf_path)

# Split text into chunks (split_text defaults: 500 characters, 50 overlap)
chunked_text = split_text(pdf_text)

# Store the chunks in the collection named db_name
update_chroma_db(client, db_name, chunked_text)

In [None]:
# Process user query
query = 'what is this file talking about?'
relevant_text = get_relevant_passage(query, client, db_name, n_results=3)

# Generate and display answer
if relevant_text:
    # Pass the list of passages itself: make_rag_prompt expects the list and
    # inserts the separators between passages on its own.
    final_prompt = make_rag_prompt(query, relevant_text)
    answer = generate_answer(final_prompt)
    print("\nGenerated Answer:", answer)
else:
    print("No relevant information found for the given query.")

# Combine Functions

In [None]:
# Extract the text of a PDF, split it into chunks, and add them to a ChromaDB collection.
def add_document_to_db(client, db_name, file):
    """
    :param client: ChromaDB client handed to update_chroma_db
    :param db_name: name of the ChromaDB collection to update (string)
    :param file: PDF file object handed to extract_text_from_pdf_file_obj
    """
    # Extract, then chunk with split_text's default settings
    chunks = split_text(extract_text_from_pdf_file_obj(file))

    update_chroma_db(client, db_name, chunks)

    print(f"{db_name} is updated")

In [None]:
# Produce an answer through the RAG (Retrieval-Augmented Generation) flow.
def rag_response(query, client, db_name):
    """
    Retrieve passages related to the query and ask the model to answer from them.

    :param query: the user's question (string)
    :param client: ChromaDB client the collection is queried through
    :param db_name: name of the collection to query (string)
    :return: the generated answer prefixed with a "Generated Answer:" label, or
             a fixed message when no passage was retrieved (string)
    """
    # Process user query
    relevant_text = get_relevant_passage(query, client, db_name, n_results=3)

    # Generate and display answer
    if relevant_text:
        # Pass the list of passages itself: make_rag_prompt expects the list
        # and inserts the separators between passages on its own.
        final_prompt = make_rag_prompt(query, relevant_text)
        answer = generate_answer(final_prompt)
        response = "\nGenerated Answer:"+answer
    else:
        response = "No relevant information found for the given query."

    return response

# Main execution
## ToDo:
 - Chat history
 - Multiple file ingest

# Initialize

In [None]:
# Initialize the ChromaDB database: create its directory and set up the collection.
def initialize_database(db_folder: str, db_name: str) -> chromadb.PersistentClient:
    """
    :param db_folder: name of the database folder, resolved against the
                      current working directory (string)
    :param db_name: name of the collection (string)
    :return: the initialized ChromaDB client (chromadb.PersistentClient)
    """
    # Build the full database path from the current working directory
    db_path = os.path.join(os.getcwd(), db_folder)

    # Create the directory. exist_ok makes this a no-op when it already exists,
    # and the same absolute path is used here and for the client below.
    os.makedirs(db_path, exist_ok=True)

    # Create a PersistentClient for that database path
    client = chromadb.PersistentClient(path=db_path)

    # Create the collection, or fetch it if it already exists
    client.get_or_create_collection(db_name)

    # Confirm that the collection is available
    print(f"Collection '{db_name}' is initialized in {db_folder}.")

    # Return the initialized client
    return client

In [None]:
# Database folder (resolved against the current working directory by
# initialize_database) and the collection name used by the app
db_folder = "chroma_db"
db_name = "rag_experiment"

# Module-level client; respond() and handle_pdf_upload() read it as a global
client = initialize_database(db_folder, db_name)
print(client)

# Web API (Flask)

In [38]:
from flask import Flask, request, jsonify
from flask_cors import CORS
app = Flask(__name__)
# Allow cross-origin requests
CORS(app)
# Initialize the chat history
# NOTE(review): not referenced by the handlers visible in this file - confirm whether it is still needed
chat_history = []  # Stores every exchange between the user and the bot

# Interaction logic for one user message
def respond(input_text, history):
    """
    Answer a user message and record the exchange in the chat history.
    Args:
        input_text (str): The user's message.
        history (list): Chat history; a new list is started when it is None.
    Returns:
        tuple: An empty string (to clear the input box) and the updated history.
    """
    turns = [] if history is None else history

    # Each exchange is stored as [user message, bot response]; the response
    # comes from the RAG pipeline.
    turns.append([input_text, rag_response(input_text, client, db_name)])

    return "", turns

# Handle an uploaded PDF file
def handle_pdf_upload(file):
    """
    Validate an uploaded PDF and add its content to the database.

    Two kinds of upload objects are supported:
    - objects whose ``name`` attribute is the path of the file on disk;
    - Flask/werkzeug uploads, where ``name`` is only the form field name, the
      client's file name is in ``filename`` and the content has to be written
      to disk with ``save`` before it can be opened by path.

    Args:
        file (File): The uploaded file object, or None.
    Returns:
        str: Status message describing the outcome.
    """
    import tempfile  # local import: only needed for werkzeug uploads

    if file is None:
        return "尚未上傳文件。"

    # Prefer the client's file name when the object carries one.
    display_name = getattr(file, "filename", None) or file.name

    # Only PDF files are accepted (extension check, case-insensitive)
    if not display_name.lower().endswith(".pdf"):
        return "僅支持上傳 PDF 文件！"

    if hasattr(file, "save"):
        # Text extraction opens the document through the ``name`` path of the
        # object it is given, so write the upload to a temporary file first.
        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_path = os.path.join(tmp_dir, "upload.pdf")
            file.save(tmp_path)
            with open(tmp_path, "rb") as saved_pdf:
                add_document_to_db(client, db_name, saved_pdf)
    else:
        add_document_to_db(client, db_name, file)

    return f"已上傳文件：{display_name}"

@app.route('/respond', methods=['POST'])
def respond_api():
    """
    Answer one chat message.

    Expects a JSON object with a non-empty string under "user_message" and
    returns {"bot_message": ...}. Requests without a usable message get
    {"error": ...} with status 400 instead of reaching the RAG pipeline.
    """
    # silent=True yields None for a missing or invalid JSON body instead of raising
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    message = data.get('user_message')
    if not isinstance(message, str) or not message.strip():
        return jsonify({"error": "user_message is required"}), 400

    # Every request starts from an empty history
    history = []

    # Call respond with the message and the history
    _, updated_history = respond(message, history)
    bot_response = updated_history[-1][1] if updated_history else "無回應"

    return jsonify({"bot_message": bot_response})

@app.route('/upload', methods=['POST'])
def upload_file():
    """
    Receive a PDF under the form field "file" and add it to the database.

    Returns the status message of handle_pdf_upload as JSON; a request
    without that field gets the "no file" message with status 400.
    """
    file = request.files.get('file')
    if file is None:
        # Reuse the existing "no file uploaded" message for a missing field.
        return jsonify(handle_pdf_upload(None)), 400
    response = handle_pdf_upload(file)
    return jsonify(response)

In [40]:
# Start the Flask server on port 5000, listening on all addresses (0.0.0.0).
# NOTE(review): this makes the API reachable from other machines on the network - confirm that is intended.
app.run(host="0.0.0.0", port=5000)

 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5000
 * Running on http://172.24.209.41:5000
[33mPress CTRL+C to quit[0m
127.0.0.1 - - [19/Feb/2025 20:39:41] "POST /upload HTTP/1.1" 200 -
127.0.0.1 - - [19/Feb/2025 20:39:53] "OPTIONS /respond HTTP/1.1" 200 -
127.0.0.1 - - [19/Feb/2025 20:39:56] "POST /respond HTTP/1.1" 200 -
127.0.0.1 - - [19/Feb/2025 20:40:15] "OPTIONS /respond HTTP/1.1" 200 -
127.0.0.1 - - [19/Feb/2025 20:40:17] "POST /respond HTTP/1.1" 200 -
127.0.0.1 - - [19/Feb/2025 20:40:25] "OPTIONS /respond HTTP/1.1" 200 -
127.0.0.1 - - [19/Feb/2025 20:40:28] "POST /respond HTTP/1.1" 200 -
127.0.0.1 - - [19/Feb/2025 20:41:18] "OPTIONS /respond HTTP/1.1" 200 -
127.0.0.1 - - [19/Feb/2025 20:41:21] "POST /respond HTTP/1.1" 200 -
127.0.0.1 - - [19/Feb/2025 20:41:39] "OPTIONS /respond HTTP/1.1" 200 -
127.0.0.1 - - [19/Feb/2025 20:41:42] "POST /respond HTTP/1.1" 200 -
127.0.0.1 - - [19/Feb/2025 20:41:53] "OPTIONS /respond HTTP/1.1" 200 -
[2025-02-19 20:41:55,93