<a href="https://colab.research.google.com/github/roya90/DocuQuery/blob/main/Building_a_PDF_Question_Answering_System_with_Retrieval_Augmented_Generation_(RAG)_in_Python.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install required libraries
!pip install -q PyMuPDF  # Provides the `fitz` module (spaCy and transformers are assumed to be pre-installed in Colab)
!pip install -q sentence-transformers faiss-cpu google-generativeai tqdm python-dotenv
!pip install -q --upgrade google-cloud-aiplatform

# Imports
import os
import sys
from pathlib import Path  # Used by the PDF and FAISS helpers below
import fitz  # PyMuPDF
from sentence_transformers import SentenceTransformer
import numpy as np
import faiss
from tqdm.notebook import tqdm  # Use notebook version for Colab
import spacy
import re
from transformers import AutoTokenizer
from google import genai
from google.genai import types
from google.colab import files, output  # For file uploads and output control
import vertexai
from dotenv import load_dotenv

print("Libraries installed and imported successfully!")

Libraries installed and imported successfully!


# Download spaCy Model

In [None]:
# Download the spacy model
!python -m spacy download en_core_web_sm

print("spaCy model downloaded successfully!")

Collecting en-core-web-sm==3.7.1
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1-py3-none-any.whl (12.8 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 12.8/12.8 MB 16.6 MB/s eta 0:00:00
✔ Download and installation successful
You can now load the package via spacy.load('en_core_web_sm')
⚠ Restart to reload dependencies
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.
spaCy model downloaded successfully!


# Authenticate to Google Cloud Generative AI

In [None]:
!pip install --upgrade google-genai
!gcloud auth application-default login

Go to the following link in your browser, and complete the sign-in prompts:

    https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com&redirect_uri=https%3A%2F%2Fsdk.cloud.google.com%2Fapplicationdefaultauthcode.html&scope=openid+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fuserinfo.email+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fsqlservice.login&state=WJDvtqThI3qJvbp7HQlpJE6OwdTzFb&prompt=consent&token_usage=remote&access_type=offline&code_challenge=l4vE_ACMI3hUn1-MtVKF2Jtlf8_CBc9yArs3CsaGzqI&code_challenge_method=S256

Once finished, enter the verification code provided in your browser: 4/0AQSTgQExG7Sl_vDZq2b5iWg8QY338TxSyDBIXtQ42FTG-05Yspd6ZcGSu327j-41gQmSdw

Credentials saved to file: [/content/.config/application_default_credentials.json]

These credentials will be used by any library that requests Application Default Credentials (ADC).
Ca

# Define Utility Functions (Text Extraction, Chunking, Embedding)

In [None]:
# --- PDF Text Extraction ---
def extract_text_from_pdf(pdf_path):
    """Return the concatenated text of every page in a PDF, or None on failure."""
    try:
        pdf_path = Path(pdf_path)
        if not pdf_path.is_file() or pdf_path.suffix.lower() != '.pdf':
            raise ValueError("The provided file is not a valid PDF.")

        text = ""
        with fitz.open(pdf_path) as pdf_document:
            for page in pdf_document:
                text += page.get_text()
        return text

    except FileNotFoundError:
        print("The specified PDF file was not found.")
        return None
    except fitz.FileDataError:
        print("The PDF file is corrupted or unreadable.")
        return None
    except Exception as e:
        # print() has no exc_info argument; report the message only
        print(f"An unexpected error occurred: {e}")
        return None


# --- Text Chunking ---

try:
    SPACY_NLP = spacy.load("en_core_web_sm")
    TOKENIZER = AutoTokenizer.from_pretrained("bert-base-uncased")
except Exception as e:
    raise RuntimeError(f"Failed to load models: {e}")

def is_meaningful(sentence, threshold=5):
    sentence = sentence.strip()
    if len(sentence) < threshold:
        return False
    if re.fullmatch(r"[\W\d_]+", sentence):
        return False
    return True

def validate_text_input(text, max_length=1_000_000):
    if not isinstance(text, str):
        raise ValueError("Input text must be a string.")
    if len(text) > max_length:
        raise ValueError("Input text is too large to process.")
    return text.strip()

def smart_chunk_spacy_by_paragraph(text):
    text = validate_text_input(text)
    paragraphs = [para.strip() for para in text.split("\n") if is_meaningful(para)]
    return paragraphs

def smart_chunk_spacy(text):
    text = validate_text_input(text)
    doc = SPACY_NLP(text)
    sentences = [sent.text for sent in doc.sents if is_meaningful(sent.text)]
    return sentences

def smart_chunk_spacy_advanced(text, min_chunk_length=50, max_chunk_length=500):
    text = validate_text_input(text)
    raw_paragraphs = re.sub(r"\n{2,}", "\n\n", text).split("\n\n")
    refined_paragraphs = []
    for paragraph in raw_paragraphs:
        if len(paragraph.strip()) < min_chunk_length:
            continue
        doc = SPACY_NLP(paragraph)
        current_chunk = []
        current_length = 0
        for sent in doc.sents:
            sent_text = sent.text.strip()
            # Flush only a non-empty chunk so an over-long first sentence does not emit ""
            if current_chunk and current_length + len(sent_text) > max_chunk_length:
                refined_paragraphs.append(" ".join(current_chunk).strip())
                current_chunk = []
                current_length = 0
            current_chunk.append(sent_text)
            current_length += len(sent_text)
        if current_chunk:
            refined_paragraphs.append(" ".join(current_chunk).strip())
    return refined_paragraphs

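def smart_chunk_transformers(text, max_tokens=128):
    text = validate_text_input(text)
    # Tokenize without special tokens so each window decodes back to plain text
    token_ids = TOKENIZER(text, add_special_tokens=False, truncation=False)["input_ids"]
    # Slice the token IDs (not the characters) into windows of max_tokens and decode each one.
    # Note: bert-base-uncased lowercases text, so decoded chunks are lowercase.
    chunks = [
        TOKENIZER.decode(token_ids[i:i + max_tokens])
        for i in range(0, len(token_ids), max_tokens)
    ]
    return chunks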

# --- Vector Database (FAISS) Utilities ---
from pathlib import Path
# Load the embedding model globally
try:
    MODEL = SentenceTransformer("all-MiniLM-L6-v2")
except Exception as e:
    raise RuntimeError(f"Failed to load the embedding model: {e}")

def validate_text_chunks(text):
    if isinstance(text, str):
        text = [text]
    if not isinstance(text, list) or not all(isinstance(t, str) for t in text):
        raise ValueError("Input must be a string or a list of strings.")
    return [t.strip() for t in text if t.strip()]

def generate_embeddings(chunks, model_name="all-MiniLM-L6-v2"):
    chunks = validate_text_chunks(chunks)
    embeddings = MODEL.encode(chunks, convert_to_tensor=False, show_progress_bar=True)  # Show a progress bar
    embeddings = np.asarray(embeddings, dtype="float32")  # FAISS expects float32 vectors
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    normalized_embeddings = embeddings / (norms + 1e-10)  # Small epsilon avoids division by zero
    return normalized_embeddings

def store_in_faiss(embeddings, db_file="vector_db_cosine.index"):
    try:
        dimension = embeddings.shape[1]
        index = faiss.IndexFlatIP(dimension)  # Use inner product (for cosine similarity with normalized vectors)
        index.add(embeddings)
        faiss.write_index(index, str(db_file)) #cast to string for cross-platform
        print(f"FAISS index saved to {db_file}")
        return index
    except Exception as e:
        raise RuntimeError(f"Failed to store FAISS index: {e}")

def load_faiss_index(db_file):
    try:
        db_file = Path(db_file).resolve()
        return faiss.read_index(str(db_file))
    except Exception as e:
        raise RuntimeError(f"Failed to load FAISS index: {e}")

def query_faiss_index(query_text, vector_index, chunks, model_name="all-MiniLM-L6-v2", top_k=2):
    try:
        query_text = validate_text_chunks(query_text)
        if not query_text:
            raise ValueError("Query text cannot be empty.")
        query_embedding = MODEL.encode(query_text, convert_to_tensor=False)
        query_embedding = np.array(query_embedding)
        query_embedding = query_embedding / (np.linalg.norm(query_embedding, axis=1, keepdims=True) + 1e-10)
        distances, indices = vector_index.search(query_embedding, top_k)
        # FAISS pads missing results with index -1 when fewer than top_k vectors exist
        results = [
            (chunks[idx], distances[0][i], idx)
            for i, idx in enumerate(indices[0])
            if idx != -1
        ]
        return results
    except Exception as e:
        raise RuntimeError(f"Failed to query FAISS index: {e}")

print("Utility functions defined.")

Utility functions defined.
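
The greedy packing inside `smart_chunk_spacy_advanced` can be hard to follow inside the larger function. The sketch below isolates that step in plain Python. It is only an illustration: `pack_sentences` is a hypothetical helper, and the regex splitter is a crude stand-in for spaCy's `doc.sents`, not what the notebook uses.

```python
import re

def pack_sentences(sentences, max_chunk_length=500):
    """Greedily group sentences into chunks of roughly max_chunk_length characters.

    Joining spaces are not counted, and a single over-long sentence is kept whole.
    """
    chunks, current, current_length = [], [], 0
    for sentence in sentences:
        sentence = sentence.strip()
        if not sentence:
            continue
        # Flush the running chunk only if it already holds something
        if current and current_length + len(sentence) > max_chunk_length:
            chunks.append(" ".join(current))
            current, current_length = [], 0
        current.append(sentence)
        current_length += len(sentence)
    if current:
        chunks.append(" ".join(current))
    return chunks

# Naive regex splitter, used here only as a stand-in for spaCy sentence segmentation
text = "Returns are accepted by mail. Returns are accepted in store. Shipping fees are not refunded."
sentences = re.split(r"(?<=[.!?])\s+", text)
chunks = pack_sentences(sentences, max_chunk_length=60)
for chunk in chunks:
    print(len(chunk), chunk)
```

With the 60-character limit, the first two sentences share a chunk and the third starts a new one. A smaller `max_chunk_length` tends to give more precise retrieval hits at the cost of less context per chunk.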

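`store_in_faiss` builds an `IndexFlatIP`, which ranks by inner product, while `generate_embeddings` L2-normalizes every vector first. The sketch below uses plain Python and made-up 2-D vectors instead of real embeddings to illustrate why that combination behaves like cosine similarity. The helper names `l2_normalize`, `dot`, and `cosine` are illustrative and not part of the notebook.

```python
import math

def l2_normalize(vec, eps=1e-10):
    """Scale a vector to unit length (eps guards against division by zero)."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / (norm + eps) for x in vec]

def dot(a, b):
    """Inner product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))

def cosine(a, b):
    """Cosine similarity computed directly from its definition."""
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))

# Toy vectors standing in for embeddings (hypothetical values)
a = [3.0, 4.0]
b = [4.0, 3.0]

ip_of_normalized = dot(l2_normalize(a), l2_normalize(b))
print(round(ip_of_normalized, 4), round(cosine(a, b), 4))
```

The two printed numbers agree up to the tiny epsilon, which is why the scores returned by `query_faiss_index` can be read as cosine similarities and compared against a threshold.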

# Define Gemini Query Function

In [None]:
from google.colab import userdata
def query_flash(question="What is APR?", context_chunks=None, model_name="gemini-2.0-flash-001", top_k=3):
    """
    Queries a Gemini model on Vertex AI with a question and relevant context chunks.

    Args:
        question (str): The question to be answered.
        context_chunks (list): A list of tuples, each containing (text_chunk, similarity_score, index).
        model_name (str): The name of the Gemini model to use. Defaults to "gemini-2.0-flash-001".
        top_k (int): The number of top context chunks to return for citation. Defaults to 3.

    Returns:
        dict: A dictionary containing the generated answer and the relevant context chunks.
              Returns an error message as a string in case of exceptions.
    """
    try:
        # Avoid sharing one mutable default list across calls
        if context_chunks is None:
            context_chunks = []

        # Input validation
        if not isinstance(question, str) or not question.strip():
            raise ValueError("The question must be a non-empty string.")
        if not isinstance(context_chunks, list) or not all(
            isinstance(chunk, (list, tuple)) for chunk in context_chunks
        ):
            raise ValueError("Context chunks must be a list of tuples or lists.")

        client = genai.Client(
            vertexai=True,
            project=userdata.get('project_id'),
            location=LOCATION,  # Global constant defined in the project setup cell
        )

        # Construct the context
        context = " ".join([context_chunk[0] for context_chunk in context_chunks])

        # Prompt Engineering
        prompt = (
            f"You are a legal assistant specializing in contracts. "
            f"Answer the question based on the following context, and cite the sources explicitly. "
            f"Do not include any information not present in the provided context.\n\n"
            f"Context:\n{context}\n\n"
            f"Question: {question}\nAnswer:"
        )

        model = model_name  # Use the caller-supplied model instead of a hard-coded name
        contents = [prompt]
        generate_content_config = types.GenerateContentConfig(
            temperature=1,
            top_p=0.95,
            max_output_tokens=8192,
            response_modalities=["TEXT"],
            # Turn off all four configurable safety filters
            safety_settings=[
                types.SafetySetting(category=category, threshold="OFF")
                for category in (
                    "HARM_CATEGORY_HATE_SPEECH",
                    "HARM_CATEGORY_DANGEROUS_CONTENT",
                    "HARM_CATEGORY_SEXUALLY_EXPLICIT",
                    "HARM_CATEGORY_HARASSMENT",
                )
            ],
        )



        # Generate content (streaming)
        response_stream = client.models.generate_content_stream(
            model=model,
            contents=contents,
            config=generate_content_config,
        )

        # Collect the streamed response
        generated_answer = ""
        for chunk in response_stream:
            # Some streamed chunks carry no text; skip them instead of concatenating None
            if chunk.text:
                generated_answer += chunk.text

        relevant_chunks = context_chunks[:top_k]  # Top k chunks for citation
        return {"answer": generated_answer.strip(), "relevant_context": relevant_chunks}

    except Exception as e:
        return f"An unexpected error occurred: {e}"


print("Gemini query function defined.")

Gemini query function defined.
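
The prompt in `query_flash` asks the model to cite its sources, but the context is joined into one unlabeled string. One possible refinement, sketched below, labels each chunk with its index before building the prompt. `build_context` and `build_prompt` are hypothetical helpers, the prompt wording is illustrative, and the example scores are made up.

```python
def build_context(context_chunks):
    """Prefix each (text, score, index) tuple with a source label the model can cite."""
    return "\n\n".join(
        f"[Source {idx}] {text.strip()}" for text, _score, idx in context_chunks
    )

def build_prompt(question, context_chunks):
    """Assemble a grounded prompt; the wording here is illustrative only."""
    context = build_context(context_chunks)
    return (
        "Answer the question using only the context below, "
        "and cite the source labels you relied on.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}\nAnswer:"
    )

# Hypothetical retrieval results in the (text, score, index) shape used by query_faiss_index
example_chunks = [
    ("Returns are eligible for a refund if they are made within 30 days of delivery.", 0.68, 2),
    ("We offer price adjustments within 7 days of the purchase date.", 0.52, 7),
]
prompt = build_prompt("How many days to return an item?", example_chunks)
print(prompt)
```

Because the labels reuse the chunk indices, a citation such as "Source 2" in the answer can be matched against the "Cited Context" listing that `main` prints.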


# Main Question Answering Function

In [None]:
def main(pdf_path, query_text, relevance_threshold=0.3):
    """
    Main function to perform question answering on a PDF document.

    Args:
        pdf_path (str): Path to the PDF file.  (Will be a temporary path in Colab)
        query_text (str): The question to ask.
        relevance_threshold (float): Minimum similarity score for a chunk.
    """

    print("\nExtracting text from the PDF...")
    try:
        extracted_text = extract_text_from_pdf(pdf_path)
        if not extracted_text:
            print("Error: Failed to extract text. Document might be empty/unreadable.")
            return  # Exit if extraction fails
        print("Text extracted successfully.")

        print("\nChunking the extracted text...")
        chunks = smart_chunk_spacy_advanced(extracted_text)  # Use advanced chunker
        if not chunks:
            print("Error: Failed to create text chunks.")
            return
        print(f"Text chunked into {len(chunks)} chunks.")

        print("\nGenerating embeddings for the chunks...")
        embeddings = generate_embeddings(chunks)
        print("Embeddings generated.")

        print("\nStoring embeddings in FAISS index...")
        index = store_in_faiss(embeddings)  # Use default filename
        print("FAISS index created and stored.")

        # No need to reload the index immediately, we just created it!

        print(f"\nQuerying FAISS index with: '{query_text}'...")
        results = query_faiss_index(query_text, index, chunks, top_k=5)

        # Filter results by relevance threshold
        context_chunks = [(text, dis, idx) for text, dis, idx in results if dis > relevance_threshold]

        print("\nResults from FAISS index:")
        for idx, (text, score, doc_id) in enumerate(results):
            if score > relevance_threshold:
                print(f"{idx}. Score: {score:.4f}, Document ID: {doc_id}\n{text[:200]}...\n")
        print(f"Found {len(context_chunks)} relevant context chunks.")


        print("\nQuerying the Gemini model for an answer...")
        answer = query_flash(query_text, context_chunks)  # Use Gemini query
        if isinstance(answer, str):  # query_flash returns a plain string only on failure
            print(f"Error querying Gemini: {answer}")
            return

        print("\nGenerated Answer:")
        print(answer["answer"])
        print("\nCited Context:")
        for text, _, idx in answer["relevant_context"]:
            print(f"\tSource {idx}: {text}")

    except Exception as e:
        print(f"An unexpected error occurred in main(): {e}")



print("Main function defined.")

Main function defined.
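
`main` keeps every FAISS hit whose score exceeds `relevance_threshold` and passes all of them to the model. The sketch below shows one way that selection step could be factored out and capped. `select_context` and its `max_chunks` parameter are hypothetical; the first four scores match the sample run shown later in this notebook, the texts are shortened, and the last low-scoring entry is made up to show the filter at work.

```python
def select_context(results, relevance_threshold=0.3, max_chunks=3):
    """Keep results scoring above the threshold, best first, capped at max_chunks."""
    kept = [r for r in results if r[1] > relevance_threshold]
    kept.sort(key=lambda r: r[1], reverse=True)
    return kept[:max_chunks]

# (text, score, index) tuples in the shape returned by query_faiss_index
results = [
    ("WAYS TO MAKE A RETURN ...", 0.6795, 2),
    ("Q: HOW DO I EXCHANGE AN ITEM? ...", 0.6222, 11),
    ("We offer price adjustments ...", 0.5211, 7),
    ("OUR RETURN POLICY ...", 0.5143, 5),
    ("(made-up unrelated chunk)", 0.12, 9),
]

selected = select_context(results)
print([idx for _text, _score, idx in selected])

# An empty selection is a signal to skip the LLM call rather than answer without context
if not select_context(results, relevance_threshold=0.9):
    print("No sufficiently relevant context found.")
```

Checking for an empty selection before calling `query_flash` would avoid sending a prompt with no context, which `main` currently does not guard against.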


# Get API Key, Project ID, and Location, Upload PDF, and Run!

In [None]:
# get project ID
PROJECT_ID = ! gcloud config get project
PROJECT_ID = PROJECT_ID[0]
LOCATION = "us-central1"
if PROJECT_ID == "(unset)":
    print("Please set the project ID manually below")

Please set the project ID manually below


In [None]:
# define project information
if PROJECT_ID == "(unset)":
    PROJECT_ID = "[your-project-id]"  # @param {type:"string"}

# generate a unique ID for this session
from datetime import datetime

UID = datetime.now().strftime("%m%d%H%M")

In [None]:
from google.colab import userdata


api_key = os.getenv("GOOGLE_API_KEY")


# Initialize Vertex AI and genai
vertexai.init(project=PROJECT_ID, location=LOCATION)

# File upload
print("Upload your PDF file:")
uploaded = files.upload()
if not uploaded:
    print("No file uploaded.  Exiting.")
    sys.exit(1)  # Exit if no file

pdf_filename = list(uploaded.keys())[0]  # Get the filename
print(f"Uploaded file: {pdf_filename}")



Upload your PDF file:


Saving example_doc.pdf to example_doc (5).pdf
Uploaded file: example_doc (5).pdf


In [None]:
# Get the question
query = input("Enter your question: ")  #@param {type:"string"}

# Run the main function
main(pdf_filename, query)
print("Done!")

Enter your question: how many days to return an item?

Extracting text from the PDF...
Text extracted successfully.

Chunking the extracted text...
Text chunked into 15 chunks.

Generating embeddings for the chunks...


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

Embeddings generated.

Storing embeddings in FAISS index...
FAISS index saved to vector_db_cosine.index
FAISS index created and stored.

Querying FAISS index with: 'how many days to return an item?'...

Results from FAISS index:
0. Score: 0.6795, Document ID: 2
WAYS TO MAKE A RETURN
We accept returns via mail or in store. Returns are eligible for a refund if they are made within 30 days of delivery. Returned items must be presented in the
same condition as w...

1. Score: 0.6222, Document ID: 11
Q: HOW DO I EXCHANGE AN ITEM?
A: To exchange an item, you can visit your local Saks Fifth Avenue store and ask a Style Advisor to help you. Q: HOW LONG DO I HAVE TO MAKE A RETURN? A: Returns are eligi...

2. Score: 0.5211, Document ID: 7
We offer price adjustments within 7 days of the purchase date for full-price items. If you see an item on sale after paying the original price, you can
submit a price adjustment request online. Non-U....

3. Score: 0.5143, Document ID: 5
OUR RETURN POLICY
Priva




Generated Answer:
Returns are eligible for a full refund if they are made within 30 days of delivery (Return Policy | Saks Fifth Avenue).

Cited Context:
	Source 2: WAYS TO MAKE A RETURN
We accept returns via mail or in store. Returns are eligible for a refund if they are made within 30 days of delivery. Returned items must be presented in the
same condition as when they were received: unworn, undamaged, unaltered, and with the original tags, packaging (if applicable) and proof of purchase. Returns that do not meet these criteria will not be accepted and will be sent back to you with an explanation. Shipping fees cannot be refunded.
	Source 11: Q: HOW DO I EXCHANGE AN ITEM?
A: To exchange an item, you can visit your local Saks Fifth Avenue store and ask a Style Advisor to help you. Q: HOW LONG DO I HAVE TO MAKE A RETURN? A: Returns are eligible for a refund if they are made within 30 days of delivery. We cannot guarantee that your return will be accepted if shipped after your
return w