In [2]:
# from posix import stat
# import logfire
from openai import AsyncOpenAI
from typing import List,Tuple
import numpy as np
import faiss
import os
import logfire
# from app.config import settings

class RAGEngine:
    """Small retrieval-augmented-generation engine backed by an in-memory FAISS index.

    Documents are split into overlapping character chunks, each chunk is
    embedded with the OpenAI embeddings API and stored in a
    ``faiss.IndexFlatIP`` (inner-product) index. A question is embedded the
    same way, the nearest chunks are retrieved, and a chat model answers using
    only those chunks as context.

    Invariant: ``self.documents[i]`` is the metadata for the vector stored at
    position ``i`` of ``self.index``; both are always extended together.
    """

    SYSTEM_PROMPT = (
        "You are a helpful assistant that answers questions based on the provided context.\n"
        "Rules:\n"
        "1. Only use information from the provided context.\n"
        "2. If the context doesn't contain the answer, say so.\n"
        "3. Be concise and accurate.\n"
        "4. Cite which document the information came from."
    )

    def __init__(
        self,
        *,
        embedding_model: str = "text-embedding-3-small",
        chat_model: str = "gpt-4o-mini",
        embedding_dim: int = 1536,
        chunk_size: int = 500,
        chunk_overlap: int = 50,
    ):
        """Create the OpenAI client and an empty FAISS index.

        Args:
            embedding_model: OpenAI embedding model name.
            chat_model: OpenAI chat model used to generate answers.
            embedding_dim: Vector size of ``embedding_model``; must match it,
                since FAISS rejects vectors of a different dimension.
            chunk_size: Maximum characters per chunk.
            chunk_overlap: Characters shared by consecutive chunks.

        Raises:
            ValueError: if ``chunk_size`` or ``chunk_overlap`` are out of range.
            KeyError: if the ``OPENAI_API_KEY`` environment variable is unset.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        if not 0 <= chunk_overlap < chunk_size:
            raise ValueError("chunk_overlap must satisfy 0 <= chunk_overlap < chunk_size")
        self.client = AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY"])
        self.embedding_model = embedding_model
        self.chat_model = chat_model
        self.documents: List[dict] = []
        self.embedding_dim = embedding_dim
        self.index = faiss.IndexFlatIP(self.embedding_dim)
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        logfire.info("RAG Engine Initialised with FAISS")

    # Step 1: split, embed and index documents.

    def _split_text(self, text: str) -> List[str]:
        """Split *text* into chunks of at most ``chunk_size`` characters.

        Consecutive chunks overlap by ``chunk_overlap`` characters. A chunk
        that is not the last one is cut back to end at its last ``"."`` when
        that period lies past the chunk's midpoint, so chunks tend to end on
        sentence boundaries. Chunks are whitespace-stripped and empty ones are
        dropped.
        """
        chunks = []
        length = len(text)
        start = 0
        while start < length:
            end = start + self.chunk_size
            chunk = text[start:end]
            if end >= length:
                # This chunk already reaches the end of the text; stepping back
                # by the overlap here would only emit a duplicate tail chunk.
                chunks.append(chunk.strip())
                break
            last_period = chunk.rfind(".")
            if last_period > self.chunk_size // 2:
                chunk = chunk[: last_period + 1]
                end = start + last_period + 1
            chunks.append(chunk.strip())
            # Guarantee forward progress even if overlap >= the emitted chunk length.
            start = max(end - self.chunk_overlap, start + 1)
        return [c for c in chunks if c]

    async def _get_embeddings(self, text: str) -> np.ndarray:
        """Embed *text* with ``self.embedding_model`` and return a 1-D float32 vector."""
        with logfire.span("get_embeddings"):
            response = await self.client.embeddings.create(
                input=text,
                model=self.embedding_model,
            )
            return np.asarray(response.data[0].embedding, dtype=np.float32)

    async def add_documents(self, text: str, filename: str) -> int:
        """Chunk, embed and index *text*, tagging every chunk with *filename*.

        Args:
            text: The full document text as a single string.
            filename: Source name stored with each chunk and reported by ``ask``.

        Returns:
            The number of chunks added (0 for empty or whitespace-only text).

        Raises:
            TypeError: if *text* is not a ``str``.
        """
        if not isinstance(text, str):
            raise TypeError(
                "add_documents() expects the document text as a str, got "
                f"{type(text).__name__}; join the pieces into one string first"
            )
        with logfire.span("add_document", filename=filename):
            chunks = self._split_text(text)
            logfire.info(f"Split the document into {len(chunks)} chunks")
            if not chunks:
                # np.vstack([]) would raise; there is nothing to index.
                return 0
            # Embed every chunk before touching the index or metadata, so a
            # failed API call cannot leave self.documents and self.index out of sync.
            embeddings_list = []
            for chunk in chunks:
                embeddings_list.append(await self._get_embeddings(chunk))
            self.index.add(np.vstack(embeddings_list).astype(np.float32))
            self.documents.extend(
                {"text": chunk, "filename": filename, "chunk_index": i}
                for i, chunk in enumerate(chunks)
            )
            logfire.info(f"Added {len(chunks)} chunks to FAISS VB")
            return len(chunks)

    # Step 2: retrieve relevant chunks and answer.

    async def _retrive(self, question: str, top_k: int = 3) -> List[dict]:
        """Return up to *top_k* indexed chunks most similar to *question*.

        Each result is a copy of the stored chunk metadata plus a
        ``similarity_score`` key holding the raw FAISS inner-product score.
        Returns ``[]`` when the index is empty or ``top_k <= 0``.
        """
        if self.index.ntotal == 0 or top_k <= 0:
            return []
        with logfire.span("retrival of the chunks ", question=question):
            question_embedding = await self._get_embeddings(question)
            # FAISS expects a (n_queries, dim) matrix: one row for one query.
            query_vector = question_embedding.reshape(1, -1)
            k = min(top_k, self.index.ntotal)
            distances, indices = self.index.search(query_vector, k)
            logfire.info("FAISS Search completed")
            top_chunks = []
            for score, idx in zip(distances[0], indices[0]):
                # Skip FAISS's -1 padding and any id without stored metadata.
                if 0 <= idx < len(self.documents):
                    chunk = dict(self.documents[idx])
                    chunk["similarity_score"] = float(score)
                    top_chunks.append(chunk)
            logfire.info("retivesd")
            return top_chunks

    @staticmethod
    def _format_context(chunks: List[dict]) -> str:
        """Render retrieved chunks as one context string, each labelled with its source file."""
        return "\n\n".join(
            f"[Source: {chunk['filename']}]\n{chunk['text']}" for chunk in chunks
        )

    async def ask(self, question: str) -> Tuple[str, List[str]]:
        """Answer *question* from the indexed documents.

        Returns:
            ``(answer, sources)`` where ``sources`` lists the distinct filenames
            of the retrieved chunks in retrieval order. When nothing is indexed
            a fixed fallback message and ``[]`` are returned.
        """
        with logfire.span("rag_ask", question=question):
            relevant_chunks = await self._retrive(question)
            if not relevant_chunks:
                return (
                    "dont have relevant chunks. "
                    "please upload another document",
                    [],
                )
            # Filenames are included so the model can follow the citation rule.
            context = self._format_context(relevant_chunks)
            user_prompt = (
                f"Context:\n{context}\n\n"
                f"Question: {question}\n\n"
                "Please answer the question based on the context above."
            )
            logfire.info("Generating the answer with openai")
            response = await self.client.chat.completions.create(
                model=self.chat_model,
                messages=[
                    {"role": "system", "content": self.SYSTEM_PROMPT},
                    {"role": "user", "content": user_prompt},
                ],
            )
            # message.content may be None (e.g. a refusal); keep the str contract.
            answer = response.choices[0].message.content or ""
            # dict.fromkeys de-duplicates while keeping retrieval order (a set would not).
            sources = list(dict.fromkeys(chunk["filename"] for chunk in relevant_chunks))
            logfire.info("Answer generated")
            return answer, sources

    def list_documents(self) -> List[dict]:
        """Return one ``{"chunk_count", "file_name"}`` entry per distinct indexed filename.

        Entries appear in the order each filename was first indexed.
        """
        counts: dict = {}
        for doc in self.documents:
            counts[doc["filename"]] = counts.get(doc["filename"], 0) + 1
        return [
            {"chunk_count": count, "file_name": filename}
            for filename, count in counts.items()
        ]


In [8]:
from langchain.document_loaders import PDFPlumberLoader

ragengine = RAGEngine()

# Load the sample PDF with LangChain's PDFPlumberLoader.
# To try the other sample, point the loader at "../docs/Multimodal Retrieval.pdf".
loader = PDFPlumberLoader(
    "../docs/basic-text.pdf",
)
documents = loader.load()

In [14]:
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Pre-split the loaded pages into pieces of up to 2000 characters, 100 shared between neighbours.
splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=100)
chunks = splitter.split_documents(documents)
print("Total Chunks:", len(chunks))


Total Chunks: 1


In [18]:
ragengine.add_documents(chunks,"basic-text.pdf")
ans, _ =ragengine.ask("What is document about?")

  ragengine.add_documents(chunks,"basic-text.pdf")
  ans, _ =ragengine.ask("What is document about?")


TypeError: cannot unpack non-iterable coroutine object