# main.py

In [40]:
from pydantic import BaseModel, Field

In [41]:
%pwd

'C:\\Users\\Lenovo\\learning\\RAGwithLangchain\\src\\rag'

In [48]:
from file_loader import Loader
from vectorstore import VectorDB
from offline_rag import Offline_RAG

  embedding: HuggingFaceEmbeddings = HuggingFaceEmbeddings(),
  embedding: HuggingFaceEmbeddings = HuggingFaceEmbeddings(),
  from .autonotebook import tqdm as notebook_tqdm


In [49]:
class InputQA(BaseModel):
    question: str = Field(..., title = 'Question to ask the model')

In [50]:
class OutputQA(BaseModel):
    answer: str = Field(..., title = 'Answer from the model')

In [5]:
def build_rag_chain(llm, data_dir, data_type):
    doc_loaded = Loader(file_type = data_type).load_dir(data_dir, workers=2)
    retriever = VectorDB(documents = doc_loaded).get_retriever()
    rag_chain = Offline_RAG(llm).get_chain(retriever)

    return rag_chain

# offline_rag.py

In [34]:
import re

In [35]:
from langchain import hub
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

In [36]:
class Str_OutputParser(StrOutputParser):
    def __inint(self) -> None:
        super().__init__()

    def parse(self, text: str) -> str:
        return self.extract_answer(text)

    def extract_answer(self, text_response: str, pattern: str = r'Answer:\s*(.*)') -> str:
        match = re.search(pattern, text_response, re.DOTALL)

        if match:
            answer_text = match.group(1).strip()
            return answer_text
        else:
            return text_response

In [37]:
# Assuming the typos are fixed in the implementation:
parser = Str_OutputParser()

# Input text with an "Answer:" label
input_text = "Question: What is the capital of France?\nAnswer: Paris"

# Call the parser
parsed_output = parser.parse(input_text)
print(parsed_output)  # Expected output: "Paris"

Paris


In [38]:
class Offline_RAG:
    def __init__(self, llm):
        self.llm = llm
        self.prompt = hub.pull("rlm/rag-prompt")
        self.str_parser = Str_OutputParser()

    def get_chain(self, retriever):
        input_data = {
            "context": retriever | self.format_docs,
            "question": RunnablePassthrough()
        }
        rag_chain = (
            input_data
            | self.prompt
            | self.llm
            | self.str_parser
        )
        return rag_chain

    def format_docs(self, docs):
        return "\n\n".join(doc.page_content for doc in docs)

# vectorstore.py

In [5]:
from typing import Union

from langchain_chroma import Chroma
from langchain_community.vectorstores import FAISS

from langchain_community.embeddings import HuggingFaceEmbeddings

In [7]:
class VectorDB:
    def __init__(self,
                 documents: Union[None, list] = None,
                 vector_db: Union[Chroma, FAISS] = Chroma,
                 embedding: HuggingFaceEmbeddings = HuggingFaceEmbeddings(),
                 ) -> None:
        self.vector_db = vector_db
        self.embedding = embedding
        self.db = self._build_db(documents)

    def _build_db(self, documents):
        db = self.vector_db.from_documents(documents=documents,
                                           embedding=self.embedding)
        return db

    def get_retriever(self,
                      search_type: str = "similarity",
                      search_kwargs: dict = {"k": 10}
                     ):
        retriever = self.db.as_retriever(search_type=search_type,
                                        search_kwargs=search_kwargs)
        return retriever

  embedding: HuggingFaceEmbeddings = HuggingFaceEmbeddings(),
  from .autonotebook import tqdm as notebook_tqdm
To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


# utils.py

In [22]:
import re

def extract_answer(text_response: str, pattern: str = r'Answer:\s*(.*)') -> str:
    match = re.search(pattern, text_response)

    if match:
        answer_text = match.group(1).strip()
    else:
        return "Answer not found"

In [23]:
test_cases = [
    "This is some text. Answer: This is the answer.",
    "Another text without an answer field.",
    "Multiple lines here.\nAnswer: Found this on another line.",
    "Answer:    Extra spaces around the answer   ",
    "Answer:42",  # No spaces after 'Answer:'
]

# Run the test
for i, test_text in enumerate(test_cases, 1):
    print(f"Test Case {i}:")
    print(f"Input: {test_text}")
    print(f"Output: {extract_answer(test_text)}")
    print("-" * 40)

Test Case 1:
Input: This is some text. Answer: This is the answer.
Output: None
----------------------------------------
Test Case 2:
Input: Another text without an answer field.
Output: Answer not found
----------------------------------------
Test Case 3:
Input: Multiple lines here.
Answer: Found this on another line.
Output: None
----------------------------------------
Test Case 4:
Input: Answer:    Extra spaces around the answer   
Output: None
----------------------------------------
Test Case 5:
Input: Answer:42
Output: None
----------------------------------------


# file_loader.py

In [6]:
from typing import Union, List, Literal
import glob
from tqdm import tqdm
import multiprocessing
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

In [7]:
def remove_non_utf8_characters(text):
    return ''.join(char for char in text if ord(char) < 128)

def load_pdf(pdf_file):
    docs = PyPDFLoader(pdf_file, extract_images=True).load()
    for doc in docs:
        doc.page_content = remove_non_utf8_characters(doc.page_content)
    return docs

def get_num_cpu():
    return multiprocessing.cpu_count()

In [8]:
get_num_cpu()

16

In [14]:
%pwd

'C:\\Users\\Lenovo\\learning\\RAGwithLangchain\\src\\rag'

In [15]:
glob.glob(f"../../data_source/generative_ai/*.pdf")

['../../data_source/generative_ai\\Attention Is All You Need.pdf',
 '../../data_source/generative_ai\\BERT_ Pre-training of Deep Bidirectional Transformers for Language Understanding.pdf',
 '../../data_source/generative_ai\\Chain-of-Thought Prompting Elicits Reasoning in Large Language Models.pdf',
 '../../data_source/generative_ai\\Denoising Diffusion Probabilistic Models.pdf',
 '../../data_source/generative_ai\\Instruction Tuning for Large Language Models_ A Survey.pdf',
 '../../data_source/generative_ai\\Llama 2_ Open Foundation and Fine-Tuned Chat Models.pdf']

In [18]:
load_pdf('../../data_source/generative_ai/Attention Is All You Need.pdf')

[Document(metadata={'source': '../../data_source/generative_ai/Attention Is All You Need.pdf', 'page': 0}, page_content='Provided proper attribution is provided, Google hereby grants permission to\nreproduce the tables and figures in this paper solely for use in journalistic or\nscholarly works.\nAttention Is All You Need\nAshish Vaswani\nGoogle Brain\navaswani@google.com\nNoam Shazeer\nGoogle Brain\nnoam@google.com\nNiki Parmar\nGoogle Research\nnikip@google.com\nJakob Uszkoreit\nGoogle Research\nusz@google.com\nLlion Jones\nGoogle Research\nllion@google.com\nAidan N. Gomez \nUniversity of Toronto\naidan@cs.toronto.edu\nukasz Kaiser\nGoogle Brain\nlukaszkaiser@google.com\nIllia Polosukhin \nillia.polosukhin@gmail.com\nAbstract\nThe dominant sequence transduction models are based on complex recurrent or\nconvolutional neural networks that include an encoder and a decoder. The best\nperforming models also connect the encoder and decoder through an attention\nmechanism. We propose a new 

In [11]:
class BaseLoader:
    def __init__(self) -> None:
        self.num_processes = get_num_cpu()

    def __call__(self, files: List[str], **kwargs):
        pass

class PDFLoader(BaseLoader):
    def __init__(self) -> None:
        super().__init__()

    def __call__(self, pdf_files: List[str], **kwargs):
        num_processes = min(self.num_processes, kwargs["workers"])
        with multiprocessing.Pool(processes=num_processes) as pool:
            doc_loaded = []
            total_files = len(pdf_files)
            with tqdm(total=total_files, desc="Loading PDFs", unit="file") as pbar:
                for result in pool.imap_unordered(load_pdf, pdf_files):
                    doc_loaded.extend(result)
                    pbar.update(1)
        return doc_loaded

In [12]:
class TextSplitter:
    def __init__(self,
                 separators: List[str] = ['\n\n', '\n', '', '.'],
                 chunk_size: int = 300,
                 chunk_overlap: int = 0
                 ) -> None:
        self.splitter = RecursiveCharacterTextSplitter(
            separators=separators,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )

    def __call__(self, documents):
        return self.splitter.split_documents(documents)

In [13]:
class Loader:
    def __init__(self,
                 file_type: str = Literal["pdf"],
                 split_kwargs: dict = {
                     "chunk_size": 300,
                     "chunk_overlap": 0
                 }
                 ) -> None:
        assert file_type in ["pdf"], "file_type must be pdf"
        self.file_type = file_type
        if self.file_type == "pdf":
            self.doc_loader = PDFLoader()
        else:
            raise ValueError("file_type must be pdf")
        self.doc_splitter = TextSplitter(**split_kwargs)

    def load(self, pdf_files: Union[str, List[str]], workers: int = 1):
        if isinstance(pdf_files, str):
            pdf_files = [pdf_files]
        doc_loaded = self.doc_loader(pdf_files, workers=workers)
        doc_split = self.doc_splitter(doc_loaded)
        return doc_split

    def load_dir(self, dir_path: str, workers: int = 1):
        if self.file_type == "pdf":
            files = glob.glob(f"{dir_path}/*.pdf")
            assert len(files) > 0, f"No {self.file_type} files found in {dir_path}"
        else:
            raise ValueError("file_type must be pdf")
        return self.load(files, workers=workers)