Zohaib Khan - 3740572

In [None]:
!pip install faiss-cpu langchain langchain-community sentence-transformers rank_bm25 pypdf

In [None]:
from langchain_community.document_loaders import PyPDFLoader, PyMuPDFLoader
from langchain.schema import Document
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import TextLoader
from rank_bm25 import BM25Okapi
import numpy as np

from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers import pipeline

import textwrap

Chunking

In [None]:
# Fetch the source PDF and load it (the page count is printed below)
doc_path = "https://homel.vsb.cz/~fai0013/Kniha_Algoritmy.pdf"
loader = PyPDFLoader(doc_path)
pages = loader.load()
print(len(pages))

# Break the pages into overlapping ~500-character chunks
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
chunks = text_splitter.split_documents(pages)

# Keep chunk texts and their metadata in two parallel lists
texts, metadata = [], []
for chunk in chunks:
    texts.append(chunk.page_content)
    metadata.append(chunk.metadata)
print(len(texts))
print(chunks[10])

In [None]:
# Initialize embedding model
embedding_model = HuggingFaceEmbeddings(model_name="BAAI/bge-small-en")

# Create FAISS vector database.
# Index the *chunks* (not whole pages) so the dense retriever works on the same
# units as the BM25 index built from `texts`. Fusion keys on page_content, so
# the two retrievers can only agree on a hit if they index identical text, and
# chunk-sized inputs also stay within the embedding model's input window.
vectordb = FAISS.from_documents(chunks, embedding_model)

# Save FAISS index to disk for later use
vectordb.save_local("faiss_index")

# Check the number of stored documents (should equal the number of chunks)
print(f"Number of documents in the vector store: {vectordb.index.ntotal}")

In [None]:
# Build the sparse BM25 index over whitespace-tokenized chunk texts
tokenized_texts = list(map(str.split, texts))
bm25 = BM25Okapi(tokenized_texts)

def reciprocal_rank_fusion(results_bm25, results_embedding, k=2):
    """Fuse two ranked result lists with Reciprocal Rank Fusion (RRF).

    Each list must already be ordered best-first. A document at 0-based
    position ``rank`` in a list contributes ``1 / (k + rank + 1)`` to its
    fused score; contributions from both lists are summed. Documents are
    identified by their ``page_content``.

    Args:
        results_bm25: Iterable of ``(doc, score)`` pairs from the sparse
            retriever, best-first. Only the order is used, not the score.
        results_embedding: Iterable of ``(doc, score)`` pairs from the dense
            retriever, best-first. Only the order is used, not the score.
        k: Smoothing constant added to every rank. Larger values flatten the
            difference between top and lower positions.

    Returns:
        List of ``(page_content, fused_score)`` tuples sorted by fused score,
        highest first. Ties keep first-seen order (BM25 list first).
    """
    scores = {}

    for ranked_results in (results_bm25, results_embedding):
        for rank, (doc, _score) in enumerate(ranked_results):
            doc_id = doc.page_content
            scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank + 1)

    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


def format_response(doc):
    """Render a document as ``"Page <page>: <stripped text>"``.

    The page label comes from ``doc.metadata["page"]`` and falls back to
    ``"Unknown"`` when that key is missing.
    """
    page_label = doc.metadata.get("page", "Unknown")
    body = doc.page_content.strip()
    return "Page {}: {}".format(page_label, body)

In [None]:
# Retrieve function
def retrieve(query, k=3):
    """Hybrid retrieval: dense (FAISS) + sparse (BM25), fused by rank.

    Args:
        query: Natural-language query string.
        k: Number of top hits taken from each retriever before fusion.

    Returns:
        List of strings produced by ``format_response`` (``"Page N: text"``),
        ordered by fused score, best first. Contains between ``k`` and
        ``2 * k`` entries depending on how many hits the retrievers share.
    """
    # --- Dense retrieval ---------------------------------------------------
    query_embedding = embedding_model.embed_query(query)
    results_embedding = vectordb.similarity_search_with_score_by_vector(query_embedding, k=k)
    # The store is built with FAISS's default metric, so the score is an L2
    # distance: LOWER means more similar. Rank ascending so that the closest
    # document is first, as reciprocal_rank_fusion expects best-first input.
    results_embedding = sorted(results_embedding, key=lambda pair: pair[1])

    print("============Dense Embeddings=============")
    for doc, score in results_embedding:
        print(f"page {doc.metadata.get('page','Unknown')} - Score: {score:.4f} - {doc.page_content[:100]}...")

    # --- Sparse retrieval --------------------------------------------------
    # Score the whole corpus once (previously recomputed for every document),
    # then keep the k highest-scoring chunk indices.
    bm25_scores = bm25.get_scores(query.split())
    top_indices = sorted(range(len(texts)), key=lambda idx: bm25_scores[idx], reverse=True)[:k]
    # Convert BM25 results to (Document, score) format
    results_bm25_docs = [
        (Document(page_content=texts[idx], metadata=metadata[idx]), bm25_scores[idx])
        for idx in top_indices
    ]

    print("************BM25 Results*************")
    for doc, score in results_bm25_docs:
        print(f"page {doc.metadata.get('page','Unknown')} - Score: {score:.4f} - {doc.page_content[:100]}...")

    # --- Fusion ------------------------------------------------------------
    # Fusion returns (page_content, score) pairs; this lookup maps each
    # page_content back to a Document so page metadata can be shown.
    doc_lookup = {doc.page_content: doc for doc, _ in results_bm25_docs}
    doc_lookup.update({doc.page_content: doc for doc, _ in results_embedding})

    fused_results = reciprocal_rank_fusion(results_bm25_docs, results_embedding)

    return [format_response(doc_lookup[doc_id]) for doc_id, _ in fused_results if doc_id in doc_lookup]


In [None]:
def _numbered_context(responses, separator):
    """Render passages as a ``1. ...`` numbered list joined by ``separator``."""
    if not responses:
        return "(no passages were retrieved)"
    return separator.join(f"{i}. {text}" for i, text in enumerate(responses, start=1))


def rag_pipeline(model_name, question, ground_truth):
    """Answer ``question`` with retrieval-augmented generation and self-evaluate.

    Steps: load the causal LM ``model_name``, retrieve passages with
    ``retrieve``, generate an answer from the top passages, then ask the same
    model to grade retrieval relevance, faithfulness and answer quality
    against ``ground_truth``. Everything is printed along the way.

    Note: the model is loaded from scratch on every call.

    Args:
        model_name: Hugging Face model id passed to ``from_pretrained``.
        question: The user question.
        ground_truth: Reference answer; included in the evaluation prompt so
            correctness and completeness can actually be judged.

    Returns:
        Tuple ``(semantic_response, llm_response)``: all retrieved passages
        concatenated with ``-------`` separators, and the generated answer
        wrapped to 80 columns.
    """
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        device_map="auto",  # device_map='cuda'
        torch_dtype="auto",
        trust_remote_code=True,
    )
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    print("Model Information")
    print(model.dtype)
    # Use each parameter's real element size instead of assuming 2 bytes, so
    # the estimate is also right for fp32 / bf16 / mixed-dtype checkpoints.
    total_params = 0
    total_bytes = 0
    for param in model.parameters():
        total_params += param.numel()
        total_bytes += param.numel() * param.element_size()
    print(f"Total Parameters: {total_params / 1e6} million")
    memory_footprint = total_bytes / (1024 ** 2)  # Convert to MB
    print(f"Estimated Memory Footprint: {memory_footprint:.2f} MB\n")

    # Create a pipeline
    generator = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        return_full_text=False,
        max_new_tokens=5000,
        # do_sample=False
    )

    # Base models (e.g. phi-2) ship without a chat template; sending chat
    # messages to them fails, so fall back to the raw prompt string.
    has_chat_template = getattr(tokenizer, "chat_template", None) is not None

    def generate(prompt_text):
        request = [{"role": "user", "content": prompt_text}] if has_chat_template else prompt_text
        return generator(request)[0]["generated_text"]

    retrieved_responses = retrieve(question, k=3)
    semantic_response = "".join(response + "-------" for response in retrieved_responses)
    # Only the best three fused passages go into the prompts; slicing (rather
    # than indexing [0], [1], [2]) avoids an IndexError on short result lists.
    top_responses = retrieved_responses[:3]
    answer_context = _numbered_context(top_responses, "\n\n  ")
    eval_context = _numbered_context(top_responses, "\n  ")
    chunk_format = "\n  ".join(
        f"- Chunk {i}: Relevant: <Yes/No> - <reason>"
        for i in range(1, len(top_responses) + 1)
    )

    print("Now performing LLM Search")

    # Construct the RAG prompt
    prompt = f"""
  You are an AI assistant tasked with answering questions based on retrieved knowledge.

  ### **Retrieved Information**:
  {answer_context}

  ### **Question**:
  {question}

  ### **Instructions**:
  - Integrate the key points from all retrieved responses into a **cohesive, well-structured answer**.
  - If the responses are **contradictory**, mention the different perspectives.
  - If none of the retrieved responses contain relevant information, reply:
  **"I couldn't find a good response to your query in the database."**
  """

    # Generate response using LLM
    llm_response = textwrap.fill(generate(prompt), width=80)
    print("Responses with semantic search:\n{}\nResponses with LLM use:\n{}".format(semantic_response, llm_response))

    # Evaluation
    print("\n\nEVALUATION\n")
    # Retrieval / answer evaluation prompt (LLM-as-judge, same model)
    eval_prompt = f"""
  You are an expert evaluator.

  ### Task:
  Assess the quality of the retrieved information used to answer the question.

  ### Question:
  {question}

  ### Retrieved Context:
  {eval_context}

  ### AI-Generated Answer:
  {llm_response}

  ### Ground Truth Answer:
  {ground_truth}

  ---

  PART 1: Relevance of Retrieval
  - Assess whether each chunk is relevant to the question.
  - For each chunk, state:
  - Relevance (Yes/No)
  - Reason

  PART 2: Faithfulness of Retrieval
  - Break the generated answer into **distinct factual claims**.
  - For each claim:
  - Claim text
  - Is it supported by retrieved content? (Yes/No)
  - Which chunk(s) support it (if any)

  - Then calculate:
  Faithfulness Score = (Number of Supported Claims) / (Total Claims)

  PART 3: LLM Response
  Please rate the generated answer on a scale of 1 to 5 for each of the following:

  - Correctness: Is it factually accurate compared to the ground truth?
  - Relevance: Does it focus on the core points?
  - Coherence: Is it logically and clearly written?
  - Completeness: Does it match the full scope of the ground truth?
  - Faithfulness: Does the answer stay grounded in the retrieved context and avoid hallucinations?

  ### Format:
  PART 1: Relevance of Chunks
  {chunk_format}

  PART 2: Faithfulness
  - Claim 1: "...", Supported: Yes, Source: Chunk 2
  - Claim 2: "...", Supported: No
  ...
  Faithfulness Score: X/Y = Z.ZZ

  PART 3: LLM Response
  -Correctness: <score>/5 - <comment>
  -Relevance: <score>/5 - <comment>
  -Coherence: <score>/5 - <comment>
  -Completeness: <score>/5 - <comment>
  -Faithfulness: <score>/5 - <comment>

  """

    # Named `evaluation_text` rather than `eval` to avoid shadowing the builtin.
    evaluation_text = generate(eval_prompt)
    print("\n Retrieval Evaluation:")
    print("=" * 35)
    print(evaluation_text)
    print("=" * 35)

    return semantic_response, llm_response

In [None]:
# Models compared in the experiment (Hugging Face model ids)
model_names = [
    "Qwen/Qwen2.5-3B-Instruct",
    "microsoft/phi-2",
    "meta-llama/Llama-3.2-3B-Instruct",
]

# Evaluation questions; questions[i] is answered by ground_truths[i]
questions = [
    "What is the Master Theorem?",
    "Explain what is meant by Divide and Conquer algorithms.",
    "What is the Knapsack Problem?",
]

# Reference answers. The mathematical symbols were previously stored as
# mis-decoded UTF-8 (mojibake) and are restored to the intended characters.
ground_truths = [
    "Master Theorem: If f(n) ∈ Θ(n^d) where d ≥ 0 in recurrence (5.1), then\n"
    "T(n) ∈\n"
    "    Θ(n^d)         if a < b^d,\n"
    "    Θ(n^d log n)   if a = b^d,\n"
    "    Θ(n^log_b a)   if a > b^d.\n"
    "Analogous results hold for the O and Ω notations, too.",

    "Divide-and-conquer is a general algorithm design technique that solves a problem by dividing it into several smaller subproblems of the same type (ideally, of about equal size), solving each of them recursively, and then combining their solutions to get a solution to the original problem. Many efficient algorithms are based on this technique, although it can be both inapplicable and inferior to simpler algorithmic solutions.",

    "The knapsack problem can be posed as follows. Given a knapsack of capacity W and n items of weights w1,...,wn and values v1,...,vn, find the most valuable subset of the items that fits into the knapsack.",
]

In [None]:
# Run every model on every question (same order as the former nine
# copy-pasted cells: all questions for model 0, then model 1, then model 2).
for model_name in model_names:
    for question, ground_truth in zip(questions, ground_truths):
        rag_pipeline(model_name, question, ground_truth)