In [1]:
from typing import List
from typing import List, Tuple
from typing import Optional

import torch
from langchain_core.documents import Document as LangchainDocument
from langchain_community.document_loaders import UnstructuredMarkdownLoader
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_community.vectorstores.utils import DistanceStrategy
from ragatouille import RAGPretrainedModel
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from langchain.schema import Document as LangchainDocument
from langchain.text_splitter import RecursiveCharacterTextSplitter
from transformers import AutoTokenizer
from langchain.document_loaders import UnstructuredMarkdownLoader

from utils import *  # type: ignore

  from .autonotebook import tqdm as notebook_tqdm


# Specify models

In [2]:
# Model identifiers used throughout the notebook.
EMBEDDING_MODEL = "BAAI/bge-large-en-v1.5"  # embeddings (and the tokenizer used for chunking)
RERANKER_MODEL = "colbert-ir/colbertv2.0"  # reranker
LLM_NAME = "meta-llama/Meta-Llama-3.1-8B-Instruct"  # answer-generating LLM

# Load documents

In [3]:
def load_and_process_docs(data_file_path: str, chunk_size: int = 512) -> List[LangchainDocument]:
    """
    Load a markdown file and split it into de-duplicated chunks.

    The splitter is built with `from_huggingface_tokenizer` on the tokenizer of
    `EMBEDDING_MODEL`, so `chunk_size` is a budget in tokens, not characters.

    Args:
        data_file_path (str): The file path of the markdown data file.
        chunk_size (int): The target maximum size of each chunk, in tokens of
            the embedding model's tokenizer. Defaults to 512.

    Returns:
        List[LangchainDocument]: The unique chunks, in the order the splitter
            produced them.

    Raises:
        RuntimeError: If loading or splitting fails; the original exception is
            chained as the cause.
    """

    def split_documents(
            knowledge_base: List[LangchainDocument],
            chunk_size: int,
        ) -> List[LangchainDocument]:
        """
        Split documents into chunks of at most roughly `chunk_size` tokens.

        Args:
            knowledge_base (List[LangchainDocument]): The list of documents to be split.
            chunk_size (int): The target maximum size of each chunk in tokens.

        Returns:
            List[LangchainDocument]: The chunks with exact-duplicate texts removed
                (the first occurrence is kept).
        """
        # NOTE(review): a recorded run of this notebook reports a largest chunk
        # of 519 tokens with the default budget of 512, while the embedding
        # model reports max_seq_length 512 -- confirm whether over-long chunks
        # are truncated when embedded.
        text_splitter = RecursiveCharacterTextSplitter.from_huggingface_tokenizer(
            AutoTokenizer.from_pretrained(EMBEDDING_MODEL),
            chunk_size=chunk_size,
            # About 10% overlap plus a small constant, so neighbouring chunks share context.
            chunk_overlap=int(chunk_size / 10) + 5,
            add_start_index=True,
            strip_whitespace=True,
        )

        chunks = text_splitter.split_documents(knowledge_base)

        # Dicts preserve insertion order; setdefault keeps the first chunk seen
        # for each distinct text and ignores later duplicates.
        first_chunk_by_text = {}
        for chunk in chunks:
            first_chunk_by_text.setdefault(chunk.page_content, chunk)

        return list(first_chunk_by_text.values())

    try:
        loader = UnstructuredMarkdownLoader(file_path=data_file_path)
        raw_knowledge_base = loader.load()
        return split_documents(raw_knowledge_base, chunk_size)
    except Exception as e:
        # Surface any loader/splitter failure under one exception type while
        # keeping the original error attached for debugging.
        raise RuntimeError(f"Error processing documents: {e}") from e

In [4]:
# Build the chunked knowledge base from the markdown source file
docs_processed = load_and_process_docs(data_file_path="../data/pcse.md")

In [5]:
# Preview only the first few chunks; echoing the whole list floods the cell output
docs_processed[:3]

[Document(page_content='Department of Physics, Computer Science and Engineering\n\nDr. Anton Riedl, Chair, Luter Hall 313, (757) 594-7065, riedl@cnu.edu\n\nMission Statement Our Mission is to: - introduce all Christopher Newport University students to the richness of science and engineering as a human endeavor and to emphasize their importance and utility in our lives, - enhance awareness of the interaction between science and the other disciplines, - prepare our graduates to enter careers as competent scientists, engineers and educators, - give our students the lifelong learning and leadership skills that enable them to grow in their professions and advance to positions of leadership, and - be recognized widely as a group of individuals engaged in and contributing to our various communities\n\nThe Department of Physics, Computer Science and Engineering offers majors in the fields of engineering, science, and high technology. The physics major allows students to specialize in the desig

In [6]:
# Print a structural summary of the chunk list (type, length, first-element attributes).
# The helper is not defined in this notebook; presumably it comes from the `utils` star import.
print_object_info(docs_processed)  # type: ignore

type_of_object: <class 'list'>
--------------------------------------------------
length_of_object: 35
--------------------------------------------------
is_mapping: False
--------------------------------------------------
is_iterable: True
--------------------------------------------------
keys_of_object: None
--------------------------------------------------
values_of_object: None
--------------------------------------------------
items_of_object: None
--------------------------------------------------
attributes_of_object:
--------------------------------------------------
type_of_first_element: <class 'langchain_core.documents.base.Document'>
--------------------------------------------------
length_of_first_element: None
--------------------------------------------------
attributes_of_first_element:
  _abc_impl: <_abc._abc_data object at 0x7fba3134e440>
  lc_attributes: {}
  lc_secrets: {}
  metadata: {'source': '../data/pcse.md', 'start_index': 0}
  page_content: #### TRUNCATED 

In [7]:
# Report the largest per-chunk token count as a sanity check on the chunk size.
# The helper is not defined in this notebook; presumably it comes from the `utils` star import.
get_largest_tokens(docs_processed)  # type: ignore

Largest token count: 519


# Load vector store

In [8]:
def load_vectors(
    documents: Optional[List[LangchainDocument]] = None,
    device: Optional[str] = None,
) -> FAISS:
    """
    Embed documents and load them into a FAISS vector store.

    An embedding model is created with HuggingFaceBgeEmbeddings for
    `EMBEDDING_MODEL`, then used to embed the documents and store them in a
    FAISS vector store configured with the cosine distance strategy.

    Args:
        documents (Optional[List[LangchainDocument]]): The documents to embed.
            When omitted, the notebook-level `docs_processed` list is used, so
            existing `load_vectors()` calls keep working.
        device (Optional[str]): Device string passed to the embedding model
            (for example "cuda" or "cpu"). When omitted, "cuda" is used if
            available, otherwise "cpu".

    Returns:
        FAISS: A FAISS vector store containing the embedded documents.
    """
    if documents is None:
        # Backward-compatible fallback to the notebook-level chunk list.
        documents = docs_processed

    if device is None:
        # Prefer the GPU, but do not crash on CPU-only hosts.
        device = "cuda" if torch.cuda.is_available() else "cpu"

    embedding_model = HuggingFaceBgeEmbeddings(
        model_name=EMBEDDING_MODEL,
        model_kwargs={"device": device},
        encode_kwargs={"normalize_embeddings": True},
    )

    return FAISS.from_documents(
        documents=documents,
        embedding=embedding_model,
        distance_strategy=DistanceStrategy.COSINE,
    )

In [9]:
# Embed the processed chunks and build the FAISS store that `answer_with_rag` searches
knowledge_vector_database = load_vectors()

In [10]:
# Print a structural summary of the vector store (distance strategy, docstore, embedding function).
# The helper is not defined in this notebook; presumably it comes from the `utils` star import.
print_object_info(knowledge_vector_database)  # type: ignore

type_of_object: <class 'langchain_community.vectorstores.faiss.FAISS'>
--------------------------------------------------
length_of_object: None
--------------------------------------------------
is_mapping: False
--------------------------------------------------
is_iterable: False
--------------------------------------------------
keys_of_object: None
--------------------------------------------------
values_of_object: None
--------------------------------------------------
items_of_object: None
--------------------------------------------------
attributes_of_object:
  _abc_impl: <_abc._abc_data object at 0x7fba30a1d440>
  _normalize_L2: False
  distance_strategy: COSINE
  docstore: <langchain_community.docstore.in_memory.InMemoryDocstore object at 0x7fb9e179ae00>
  embedding_function: client=SentenceTransformer(
  (0): Transformer({'max_seq_length': 512, 'do_lower_case': True}) with Transformer model: BertModel 
  (1): Pooling({'word_embedding_dimension': 1024, 'pooling_mode_cls_tok

# Load LLM

In [13]:
def load_llm():
    """
    Build the text-generation pipeline that acts as the RAG reader.

    The checkpoint named by `LLM_NAME` is loaded with 4-bit NF4 quantization
    (double quantization enabled, bfloat16 compute dtype) together with its
    tokenizer, and both are wrapped in a sampling `text-generation` pipeline.

    Returns:
        pipeline: A Hugging Face text-generation pipeline configured to return
            only the newly generated text.
    """
    quantization = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_use_double_quant=True,
        bnb_4bit_compute_dtype=torch.bfloat16,
    )

    # Decoding settings forwarded to the pipeline as keyword arguments.
    generation_settings = dict(
        do_sample=True,
        temperature=0.15,
        top_p=0.95,
        repetition_penalty=1.2,
        return_full_text=False,
        max_new_tokens=750,
    )

    return pipeline(
        task="text-generation",
        model=AutoModelForCausalLM.from_pretrained(
            pretrained_model_name_or_path=LLM_NAME,
            quantization_config=quantization,
        ),
        tokenizer=AutoTokenizer.from_pretrained(
            pretrained_model_name_or_path=LLM_NAME,
        ),
        **generation_settings,
    )

In [14]:
# Build the quantized text-generation pipeline (model + tokenizer) used to generate answers
reader_llm = load_llm()

`low_cpu_mem_usage` was None, now set to True since model is quantized.


Loading checkpoint shards: 100%|██████████| 4/4 [00:04<00:00,  1.25s/it]


# Define prompt

In [16]:
def define_prompt(prompt_file_path: str) -> str:
    """
    Define and format the RAG prompt template.

    The system prompt is read from a file, paired with a user message that
    carries `{context}` and `{question}` placeholders, and rendered to a single
    string with the chat template of the `LLM_NAME` tokenizer (generation
    prompt appended).

    The returned string is filled in later with `str.format`, so any literal
    curly braces in the prompt file would have to be doubled.

    Args:
        prompt_file_path (str): The file path to the system prompt.

    Returns:
        str: The formatted RAG prompt template.

    Raises:
        OSError: If the prompt file cannot be opened.
    """
    # Explicit encoding so the prompt is read the same way on every platform.
    with open(prompt_file_path, "r", encoding="utf-8") as prompt_file:
        system_content = prompt_file.read()

    # Built from adjacent literals so that source indentation does not leak
    # into the prompt text sent to the model.
    user_content = (
        "Context: {context}\n"
        "\n"
        "---\n"
        "\n"
        "Here is the question you need to answer.\n"
        "\n"
        "Question: {question}"
    )

    prompt_in_chat_format = [
        {"role": "system", "content": system_content},
        {"role": "user", "content": user_content},
    ]

    tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path=LLM_NAME)

    # NOTE(review): the rendered template already starts with the
    # begin-of-text token (visible in the printed template); confirm the
    # generation pipeline does not prepend a second one when it tokenizes.
    rag_prompt_template = tokenizer.apply_chat_template(
        prompt_in_chat_format,
        tokenize=False,
        add_generation_prompt=True,
    )

    return rag_prompt_template

In [18]:
# Render the chat-formatted RAG prompt; it keeps `{context}` and `{question}` placeholders
# that `answer_with_rag` fills in for each query
rag_prompt_template = define_prompt("../data/prompt.txt")

In [21]:
import pprint

# pprint breaks the long template string across lines, so the system prompt and the
# chat-template special tokens can be inspected
pprint.pprint(rag_prompt_template)


('<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n'
 '\n'
 'Cutting Knowledge Date: December 2023\n'
 'Today Date: 26 Jul 2024\n'
 '\n'
 '"""You are an academic advisor for Christopher Newport University; you '
 'answer questions ONLY regarding \n'
 'academics in a polite and professional tone. NEVER provide offensive or '
 'controversial responses!\n'
 'NEVER make assumptions about the user; you know nothing about them!\n'
 '\n'
 'Using the information contained in the context, give a comprehensive answer '
 'to the question.\n'
 'Respond only to the question asked. Your response should be relevant to the '
 'question and sufficiently detailed.\n'
 'If the answer cannot be deduced from the context, indicate that the '
 'information is not available in the context.\n'
 '\n'
 'Remember, YOU are the advisor, and ALWAYS use your reasoning to answer the '
 'question.\n'
 'Try your best to synthesize the information from ALL the documents to answer '
 'the question, using your 

# Load reranker

In [25]:
def load_reranker() -> RAGPretrainedModel:
    """
    Instantiate the model used for reranking retrieved documents.

    The checkpoint named by `RERANKER_MODEL` is loaded through
    `RAGPretrainedModel.from_pretrained`.

    Returns:
        RAGPretrainedModel: The loaded pre-trained model, to be used as a reranker.
    """
    return RAGPretrainedModel.from_pretrained(pretrained_model_name_or_path=RERANKER_MODEL)

In [26]:
# Load the reranker that `answer_with_rag` uses when `use_reranker=True`
reranker = load_reranker()

  self.scaler = torch.cuda.amp.GradScaler()


# Instantiate RAG

In [27]:
def answer_with_rag(question: str,
                    use_reranker: bool = False,
                    num_retrieved_docs: int = 30,
                    num_docs_final: int = 10) -> Tuple[str, List[str]]:
    """
    Generate an answer using Retrieval-Augmented Generation (RAG).

    Relevant chunks are retrieved from `knowledge_vector_database`, optionally
    reranked with `reranker`, inserted into `rag_prompt_template`, and passed
    to `reader_llm` to generate the answer.

    Args:
        question (str): The question to be answered.
        use_reranker (bool, optional): Whether to use document reranking. Defaults to False.
        num_retrieved_docs (int, optional): Number of documents to retrieve initially. Defaults to 30.
        num_docs_final (int, optional): Maximum number of documents to use for answer
            generation. Fewer are used when retrieval returns fewer. Defaults to 10.

    Returns:
        Tuple[str, List[str]]: A tuple containing the generated answer and the list of
            document texts that were placed in the prompt.
    """
    retrieved = knowledge_vector_database.similarity_search(query=question, k=num_retrieved_docs)
    relevant_docs = [doc.page_content for doc in retrieved]

    if use_reranker:
        print("Reranking documents...")
        # Never ask the reranker for more results than there are candidates,
        # and skip the call entirely when there is nothing to rerank.
        top_k = min(num_docs_final, len(relevant_docs))
        if top_k > 0:
            reranked = reranker.rerank(question, relevant_docs, k=top_k)
            relevant_docs = [doc["content"] for doc in reranked]
        else:
            relevant_docs = []
    else:
        print("Skipping reranking...")
        relevant_docs = relevant_docs[:num_docs_final]

    # Number each document so the model can tell the sources apart.
    numbered_docs = "\n".join(f"Document {i}:::\n{doc}" for i, doc in enumerate(relevant_docs))
    context = "\nExtracted documents:\n" + numbered_docs
    final_prompt = rag_prompt_template.format(question=question, context=context)

    # The pipeline returns one result dict per prompt; take the generated text of the first.
    answer = reader_llm(final_prompt)[0]["generated_text"]

    return answer, relevant_docs