In [None]:
import os
from dataclasses import dataclass

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import TextLoader, PyPDFLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from unsloth import FastLanguageModel

In [None]:
from typing import Union, List
from langchain_core.documents import Document

def load_documents(documents: Union[List[str], str]) -> "List[Document]":
    """Load one document path, or a list of paths, into LangChain documents.

    Args:
        documents: Path to a ``.txt`` or ``.pdf`` file, or a list of such
            paths. The extension is matched case-insensitively.

    Returns:
        Whatever the matching loader's ``load()`` returns for a single path;
        for a list, the per-path results concatenated in input order.

    Raises:
        ValueError: If a path has an unsupported (or no) extension, or if
            ``documents`` is neither a ``str`` nor a ``list``.
        AssertionError: If an entry of a list input is not a ``str``.
    """
    if isinstance(documents, str):
        ext = {"txt": TextLoader, "pdf": PyPDFLoader}

        # Only the file name may contribute an extension: a dot in a parent
        # directory ("runs.v2/notes") must not be mistaken for one.
        _, dot, suffix = os.path.basename(documents).rpartition(".")
        extension = suffix.lower() if dot else ""

        if extension not in ext:
            # Report the extension that was actually checked, plus the path,
            # so the offending entry of a long list is easy to find.
            raise ValueError(
                f"Unsupported extension: .{extension} (path: {documents!r}). "
                f"Supported document extensions are {list(ext.keys())}"
            )

        loader = ext[extension](documents)
        return loader.load()

    if isinstance(documents, list):
        loaded = list()
        for item in documents:
            assert isinstance(item, str), "Expected document paths to be of type str"
            loaded.extend(load_documents(item))
        return loaded

    raise ValueError(f"Unknown type of documents: {type(documents)}. Supported types are str and List[str]")

In [None]:
def download_architecture_book():
    """Make sure the architecture textbook PDF is available at ``data/arch.pdf``.

    Nothing happens when ``data/arch.pdf`` already exists. Otherwise the PDF is
    fetched with ``wget`` into the current working directory (unless a copy
    with the expected download name is already there) and then moved to
    ``data/arch.pdf``; the ``data`` directory is created when missing.

    Raises:
        subprocess.CalledProcessError: If ``wget`` exits with a non-zero status.
        FileNotFoundError: If ``wget`` cannot be started, or if the expected
            download file is absent when it is about to be moved.
    """
    filename = os.path.join("data", "arch.pdf")

    # Download nothing if file already exists
    if os.path.exists(filename):
        print(f"File {filename} already exists. Downloading nothing!")
        return

    # Download file because it does not exist
    import subprocess

    # Name the download is expected to have in the working directory
    # (the URL's last path segment with %20 decoded to spaces).
    downloaded_name = "Computer Architecture A Quantitative Approach (5th edition).pdf"
    if not os.path.exists(downloaded_name):
        url = "https://acs.pub.ro/~cpop/SMPA/Computer%20Architecture%20A%20Quantitative%20Approach%20(5th%20edition).pdf"
        # NOTE(review): --no-check-certificate turns off TLS verification for
        # this download; kept as-is, but confirm it is still required.
        # check=True makes a failed download raise here instead of surfacing
        # later as a confusing rename error.
        subprocess.run(["wget", "--no-check-certificate", url], check=True)

    # The target directory may not exist on a fresh checkout; os.rename does
    # not create it.
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    os.rename(downloaded_name, filename)

# Fetch the reference PDF to data/arch.pdf; the function returns early when
# that file is already present.
download_architecture_book()

In [None]:
# Load model
load_path = os.path.join(os.getcwd(), "outputs", "llama-3.2-3b-instruct")
model, tokenizer = FastLanguageModel.from_pretrained(load_path)
print(f"Model loaded from {load_path}")

In [None]:
@dataclass
class RaggenQAConfig:
    """Settings read by RaggenQA."""
    # Number of chunks requested from the vector store for each question.
    k: int = 5
    # Model name handed to HuggingFaceEmbeddings when the index is built.
    embedding_model_name: str = "sentence-transformers/all-MiniLM-L6-v2"

class RaggenQA:
    """Answer questions with a language model grounded in retrieved document chunks.

    On construction the documents are loaded, split into overlapping chunks
    (``chunk_size=1000``, ``chunk_overlap=200``), embedded and stored in a
    FAISS vector store. ``ask`` retrieves the ``config.k`` chunks most similar
    to the question, appends them to the system prompt and samples one answer.
    """

    # The config annotation is quoted so that defining this class does not
    # require the config class to exist yet (cells can be run in any order).
    def __init__(self, model, tokenizer, documents, config: "RaggenQAConfig"):
        """Build the vector store and keep the model/tokenizer for ``ask``.

        Args:
            model: Model handed to ``FastLanguageModel.for_inference`` in ``ask``.
            tokenizer: Tokenizer providing ``apply_chat_template`` and ``decode``.
            documents: A path or list of paths accepted by ``load_documents``.
            config: Settings object exposing ``k`` and ``embedding_model_name``.
        """
        self.config = config
        self.vector_store = self._create_vector_store(documents)

        self.model = model
        self.tokenizer = tokenizer

    def _create_vector_store(self, documents):
        """Load, chunk and embed ``documents``; return the resulting FAISS store."""
        documents = load_documents(documents)

        # Split documents into chunks
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
        )
        texts = text_splitter.split_documents(documents)

        # Create embeddings
        embeddings = HuggingFaceEmbeddings(model_name=self.config.embedding_model_name)

        # Create vector store
        vector_store = FAISS.from_documents(texts, embeddings)
        return vector_store

    def _skip_input_prompt(self, input_tokens, output_tokens):
        """Return the part of ``output_tokens`` that follows the prompt tokens.

        Both arguments are squeezed before their lengths are compared, so this
        assumes a single sequence per call — TODO confirm batch size is always 1.

        Raises:
            AssertionError: If the output is not longer than the input.
        """
        input_squeezed = input_tokens.squeeze()
        output_squeezed = output_tokens.squeeze()
        assert len(input_squeezed) < len(output_squeezed)
        return output_squeezed[len(input_squeezed) :]

    def ask(self, question):
        """Retrieve context for ``question`` and return the model's decoded answer.

        Args:
            question: The user question; also used as the retrieval query.

        Returns:
            The newly generated tokens decoded to text, special tokens skipped.
        """
        model = FastLanguageModel.for_inference(self.model)

        # The whitespace inside this literal is part of the prompt text sent to
        # the model; it is intentionally left exactly as it was.
        system_prompt = """
            You are a helpful QA assistant, answering user questions based on pieces of context from documents.
            Use the following pieces of context to answer the question at the end.
            If you don't know the answer, just say that you don't know, don't try to make up an answer.
        """

        hits = self.vector_store.similarity_search(question, k=self.config.k)
        system_prompt += "".join(f"\n{hit.page_content}\n" for hit in hits)

        messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": question}]

        # Use the tokenizer this instance was constructed with, not whatever
        # module-level `tokenizer` happens to exist in the notebook.
        # NOTE(review): inputs are moved to "cuda"; assumes the model lives there.
        inputs = self.tokenizer.apply_chat_template(
            messages,
            tokenize=True,
            add_generation_prompt=True,  # Must add for generation
            return_tensors="pt",
        ).to("cuda")

        out = model.generate(
            input_ids=inputs, max_new_tokens=1024, use_cache=True, temperature=0.3, min_p=0.1, do_sample=True
        )

        # Decode only what was generated after the prompt.
        new_tokens = self._skip_input_prompt(inputs, out)
        return self.tokenizer.decode(new_tokens.cpu().numpy(), skip_special_tokens=True)

In [None]:
# Build the QA pipeline over data/raggen_1000.txt with the default
# RaggenQAConfig; constructing it builds the vector store immediately.
raggen_qa = RaggenQA(model, tokenizer, os.path.join("data", "raggen_1000.txt"), RaggenQAConfig())

In [None]:
# Ask one question and print the answer. Generation samples (do_sample=True),
# so the text can differ between runs.
question = "What features can be used for a good cache eviction policy?"
answer = raggen_qa.ask(question)
print(answer)

In [None]:
@dataclass
class RaggenQAConfigSC:
    """Settings read by RaggenQASC (retrieval plus self-consistency sampling)."""
    # Number of chunks requested from the vector store per retrieval.
    k: int = 3
    # Model name handed to HuggingFaceEmbeddings when the index is built.
    embedding_model_name: str = "sentence-transformers/all-MiniLM-L6-v2"
    # How many candidate answers are sampled before the final answer is generated.
    self_consistency_k: int = 3
    # Sampling temperature for the candidate answers; the final answer in
    # RaggenQASC.ask uses its own fixed temperature.
    self_consistency_temperature: float = 0.3

class RaggenQASC:
    """Retrieval-augmented QA with a self-consistency step.

    ``ask`` samples ``config.self_consistency_k`` candidate answers from the
    same retrieved context, then asks the model once more for a final answer
    with the candidates appended to the system prompt.
    """

    # The annotation names the config class whose self_consistency_* fields
    # this class reads; it is quoted so the class can be defined before it.
    def __init__(self, model, tokenizer, document, config: "RaggenQAConfigSC"):
        """Build the vector store and keep the model/tokenizer for ``ask``.

        Args:
            model: Model handed to ``FastLanguageModel.for_inference`` in ``ask``.
            tokenizer: Tokenizer providing ``apply_chat_template`` and ``decode``.
            document: Path of a single file readable by ``TextLoader``.
            config: Settings exposing ``k``, ``embedding_model_name``,
                ``self_consistency_k`` and ``self_consistency_temperature``.
        """
        self.config = config
        self.vector_store = self._create_vector_store(document)

        self.model = model
        self.tokenizer = tokenizer

    def _create_vector_store(self, document):
        """Load ``document`` with ``TextLoader``, chunk and embed it; return the FAISS store."""
        # Load documents
        loader = TextLoader(document)
        documents = loader.load()

        # Split documents into chunks
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
        )
        texts = text_splitter.split_documents(documents)

        # Create embeddings
        embeddings = HuggingFaceEmbeddings(model_name=self.config.embedding_model_name)

        # Create vector store
        vector_store = FAISS.from_documents(texts, embeddings)
        return vector_store

    def _skip_input_prompt(self, input_tokens, output_tokens):
        """Return the part of ``output_tokens`` that follows the prompt tokens.

        Both arguments are squeezed before their lengths are compared, so this
        assumes a single sequence per call — TODO confirm batch size is always 1.

        Raises:
            AssertionError: If the output is not longer than the input.
        """
        input_squeezed = input_tokens.squeeze()
        output_squeezed = output_tokens.squeeze()
        assert len(input_squeezed) < len(output_squeezed)
        return output_squeezed[len(input_squeezed) :]

    def _generate(self, model, system_prompt, user_prompt, temperature):
        """Run one chat turn and return only the newly generated text."""
        messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}]

        # Use the tokenizer this instance was constructed with, not whatever
        # module-level `tokenizer` happens to exist in the notebook.
        # NOTE(review): inputs are moved to "cuda"; assumes the model lives there.
        inputs = self.tokenizer.apply_chat_template(
            messages,
            tokenize=True,
            add_generation_prompt=True,  # Must add for generation
            return_tensors="pt",
        ).to("cuda")

        out = model.generate(
            input_ids=inputs, max_new_tokens=1024, use_cache=True, temperature=temperature, min_p=0.1, do_sample=True
        )

        new_tokens = self._skip_input_prompt(inputs, out)
        return self.tokenizer.decode(new_tokens.cpu().numpy(), skip_special_tokens=True)

    def ask(self, question):
        """Answer ``question`` via self-consistency over retrieved context.

        Args:
            question: The user question; also used as the retrieval query.

        Returns:
            The decoded final answer produced after the candidate answers.
        """
        model = FastLanguageModel.for_inference(self.model)

        # Retrieval depends only on the question, so it is done once and the
        # formatted context is reused for every candidate and the final turn.
        hits = self.vector_store.similarity_search(question, k=self.config.k)
        context = "".join(f"\n{hit.page_content}\n" for hit in hits)

        # Self-consistency
        # The whitespace inside the prompt literals below is part of the text
        # sent to the model; it is intentionally left exactly as it was.
        sample_system_prompt = """
                You are a helpful QA assistant, answering user questions based on pieces of context from documents.
                Use the following pieces of context to answer the question at the end.
                If you don't know the answer, just say that you don't know, don't try to make up an answer.
            """
        sample_system_prompt += context

        answers = [
            self._generate(model, sample_system_prompt, question, self.config.self_consistency_temperature)
            for _ in range(self.config.self_consistency_k)
        ]

        # Majority vote
        final_system_prompt = """
            You are a helpful QA assistant using self-consistency, answering user questions based on pieces of context from documents.
            Use the following pieces of context to answer the question at the end.
            If you don't know the answer, just say that you don't know, don't try to make up an answer.
        """
        final_system_prompt += context
        final_system_prompt += f"Your previous {self.config.self_consistency_k} answers were:\n\n"
        final_system_prompt += "\n\n".join(answers)

        final_user_prompt = f"Based on your previous {self.config.self_consistency_k} answers, what is your final answer to the original question: {question}?"

        # The final turn uses a fixed temperature, independent of the
        # candidate-sampling temperature from the config.
        return self._generate(model, final_system_prompt, final_user_prompt, 0.2)

In [None]:
# NOTE: this rebinds `raggen_qa`; from here on it refers to the
# self-consistency variant (RaggenQASC), and the vector store is built again
# from the same file.
raggen_qa = RaggenQASC(model, tokenizer, os.path.join("data", "raggen_1000.txt"), RaggenQAConfigSC())

In [None]:
# Ask a question through whichever object `raggen_qa` currently refers to and
# print the answer. Generation samples (do_sample=True), so the text can
# differ between runs.
question = "What line did LRU evict when the PC was 0x413a4b?"
answer = raggen_qa.ask(question)
print(answer)