In [1]:
!pip install PyMuPDF
!pip install parrotai
!pip install python_dotenv
!pip install faiss-cpu



In [2]:
import fitz
import numpy as np
from parrotai import ParrotAPI
import time
import json
import os
from dotenv import load_dotenv
import faiss

In [3]:
# Credentials come from the environment (optionally populated from a local
# .env file) so real secrets never have to be saved inside the notebook.
# Set PARROT_USERNAME / PARROT_PASSWORD before running this cell.
load_dotenv()

parrot = ParrotAPI()
# Falls back to the "xxx" placeholders when the variables are not set.
username = os.getenv("PARROT_USERNAME", "xxx")
password = os.getenv("PARROT_PASSWORD", "xxx")

login_resp = parrot.login(username=username, password=password)

In [4]:
def read_pdf_text(pdf_path, chunk=100, overlap_percentage=0.3):
    """Extract the text of a PDF and split it into overlapping word chunks.

    The text of every page is concatenated, split on whitespace, and cut into
    windows of ``chunk`` words. Consecutive windows start
    ``chunk - int(chunk * overlap_percentage)`` words apart, so neighbouring
    chunks share ``int(chunk * overlap_percentage)`` words.

    Args:
        pdf_path: Path of the PDF file, passed to ``fitz.open``.
        chunk: Maximum number of words per chunk.
        overlap_percentage: Fraction of ``chunk`` shared by consecutive chunks.

    Returns:
        A list of strings, each the words of one window joined by single
        spaces. The final chunk may be shorter than ``chunk``; every word of
        the document appears in at least one chunk. An empty document yields
        an empty list.

    Raises:
        ValueError: If the resulting step between windows is not positive.
    """
    overlap = int(chunk * overlap_percentage)
    step = chunk - overlap
    if step <= 0:
        raise ValueError(
            "chunk and overlap_percentage must give a positive step "
            f"(chunk={chunk}, overlap_percentage={overlap_percentage})"
        )

    # The context manager closes the document even if text extraction fails.
    with fitz.open(pdf_path) as doc:
        words = "".join(page.get_text() for page in doc).split()

    chunks = []
    for start in range(0, len(words), step):
        chunks.append(' '.join(words[start:start + chunk]))
        # Once a window reaches the end of the text every word is covered;
        # further windows would only repeat the tail.
        if start + chunk >= len(words):
            break

    return chunks

def create_embeddings(texts, model="gte-large", timeout=60, poll_interval=1):
    """Request an embedding for each text and poll until it is ready.

    For every text a task is created with ``parrot.create_text_embedding`` and
    then polled with ``parrot.result_text_embedding`` until its status is
    ``'COMPLETED'`` or ``timeout`` seconds have elapsed.

    Args:
        texts: Iterable of strings to embed.
        model: Embedding model name forwarded to the API.
        timeout: Maximum number of seconds to wait for each text.
        poll_interval: Seconds to sleep before each poll.

    Returns:
        A list of ``np.float32`` arrays, one per successfully embedded text,
        in input order. Texts that time out or come back with an empty
        response are reported with ``print`` and skipped, so the list can be
        shorter than ``texts``.
    """
    embeddings = []
    for text in texts:
        resp = parrot.create_text_embedding(text=text, model=model)
        task_id = resp['data']['task_id']

        # A monotonic clock keeps the deadline immune to system clock changes.
        deadline = time.monotonic() + timeout
        result = None
        completed = False
        while time.monotonic() <= deadline:
            time.sleep(poll_interval)
            result = parrot.result_text_embedding(task_id=task_id)
            if result['data']['data']['status'] == 'COMPLETED':
                completed = True
                break

        if not completed:
            # Never parse the payload of a task that did not finish.
            print(f"Timed out after {timeout}s waiting for the embedding of text: {text[:30]}...")
            continue

        response = result['data']['data']['response']
        if response == '' or response is None:
            print(f"The empty string from text: {text[:30]}...")
            continue

        embeddings.append(np.array(json.loads(response), dtype=np.float32))
    return embeddings

def create_index(vectors, use_gpu=False):
    """Build a FAISS ``IndexFlatL2`` holding ``vectors``.

    The index dimension is taken from ``vectors.shape[1]``. With
    ``use_gpu=True`` the empty CPU index is cloned to all GPUs with sharding
    enabled and the vectors are added to that clone, which is returned;
    otherwise the vectors are added to the CPU index and it is returned.
    """
    flat_index = faiss.IndexFlatL2(vectors.shape[1])

    if not use_gpu:
        flat_index.add(vectors)
        return flat_index

    cloner_options = faiss.GpuMultipleClonerOptions()
    cloner_options.shard = True
    sharded_index = faiss.index_cpu_to_all_gpus(flat_index, co=cloner_options)
    sharded_index.add(vectors)
    return sharded_index

def search_index(index, query_vectors, k=5):
    """Run ``index.search(query_vectors, k)`` and return its two results as a tuple.

    The first element is what the index returns as distances, the second what
    it returns as neighbour ids.
    """
    hits = index.search(query_vectors, k)
    distances, neighbor_ids = hits
    return (distances, neighbor_ids)

def save_index(index, file_path):
    """Write ``index`` to ``file_path`` via ``faiss.write_index``; returns nothing."""
    write = faiss.write_index
    write(index, file_path)
    return None

def load_index(file_path):
    """Return the index that ``faiss.read_index`` loads from ``file_path``."""
    return faiss.read_index(file_path)

def rag_logic(question: str, k: int = 4) -> str:
    """Build the retrieval-augmented prompt for ``question``.

    The question is embedded with ``create_embeddings``, the ``k`` nearest
    entries are looked up in the module-level ``index``, and the matching
    entries of the module-level ``texts`` list are inserted into the prompt.

    Args:
        question: The user's question.
        k: Number of neighbours to request from the index.

    Returns:
        The prompt string containing the question and the retrieved texts.

    Raises:
        ValueError: If no embedding could be produced for the question.
    """
    # 1. Embed the question.
    question_embeddings = create_embeddings([question])
    if len(question_embeddings) == 0:
        raise ValueError("Could not create an embedding for the question.")
    question_embeddings = np.array(question_embeddings).reshape(1, -1)

    # 2. Search the index.
    distances, indices = search_index(index, question_embeddings, k=k)

    # 3. Build the prompt.
    # FAISS pads the result with -1 when it finds fewer than k neighbours;
    # texts[-1] would silently inject the last chunk as unrelated context,
    # so only ids that point at a real entry of `texts` are kept.
    found_texts = [texts[idx] for idx in indices[0] if 0 <= idx < len(texts)]
    prompt = f"""As an expert in situational analysis, your task is to provide clear and detailed answers based solely on the information provided. Your response should directly address the question without reiterating the question or the background information provided. Aim for brevity while ensuring your answer is complete and informative.

Example:
Question: What is the capital of Vietnam?
Relevant Information: [Any additional information that might have been provided]
Your response should simply be, not repeat the question and relevalt information:
'Hanoi'

Please, just provide the answer in response to the question given, drawing from the relevant information if mentioned.
Question: {question}
Relevant information: {found_texts}
"""

    return prompt

def chat_logic(question: str, max_tokens=1024, temperature=0.7, top_p=0.9, top_k=50) -> str:
    """Answer ``question`` with the "gemma-7b" model using a RAG prompt.

    The prompt from ``rag_logic(question)`` is sent as a single user message
    through ``parrot.text_generation`` together with the sampling parameters.
    Returns the generated text when the API reports success, otherwise the
    fixed fallback message.
    """
    user_message = {"role": "user", "content": rag_logic(question)}
    reply = parrot.text_generation(
        messages=[user_message],
        model="gemma-7b",
        max_tokens=max_tokens,
        temperature=temperature,
        top_p=top_p,
        top_k=top_k,
    )

    # Guard clause: bail out with the fallback text on an unsuccessful call.
    if not reply["data"]["is_success"]:
        return "Unable to generate response."
    return reply["data"]["data"]["response"]

In [5]:
pdf_path = 'test_rag.pdf'

# One chunk of text per entry; rag_logic maps FAISS row ids back into this list.
texts = read_pdf_text(pdf_path)

# Each embedding is laid out as one row of 1024 float32 values.
text_embeddings = np.array(create_embeddings(texts)).reshape(-1, 1024)

# create_embeddings skips chunks whose embedding came back empty. A skipped
# chunk would shift every later row id relative to `texts`, so retrieval would
# return the wrong passages without any error. Fail loudly instead.
if len(text_embeddings) != len(texts):
    raise RuntimeError(
        f"Got {len(text_embeddings)} embeddings for {len(texts)} text chunks; "
        "index rows would not line up with `texts`. Re-run the embedding step."
    )

In [6]:
# Build the index on CPU (pass use_gpu=True to use GPUs instead), write it to
# disk, and read it back so later cells work with the persisted copy.
index_path = "db.index"
index = create_index(np.array(text_embeddings), use_gpu=False)
save_index(index, index_path)
index = load_index(index_path)

In [7]:
# Ask a sample question (in Vietnamese) about the indexed PDF and show the answer.
answer = chat_logic("Máy bay có độ cao và sải cánh bao nhiêu?")
print(answer)

The WindRunner has a height of 24 m and a wingspan of 80 m.
