# Document Loading 

In [1]:
import docx
import PyPDF2
import os

def read_text_file(file_path: str):
    """Read content from a text file"""
    with open(file_path, 'r', encoding='utf-8') as file:
        return file.read()

def read_pdf_file(file_path: str):
    """Read content from a PDF file"""
    text = ""
    with open(file_path, 'rb') as file:
        pdf_reader = PyPDF2.PdfReader(file)
        for page in pdf_reader.pages:
            text += page.extract_text() + "\n"
    return text

def read_docx_file(file_path: str):
    """Read content from a Word document"""
    doc = docx.Document(file_path)
    return "\n".join([paragraph.text for paragraph in doc.paragraphs])


creating a unified interface for document reading

In [2]:
def read_document(file_path: str):
    """Read document content based on file extension"""
    _, file_extension = os.path.splitext(file_path)
    file_extension = file_extension.lower()

    if file_extension == '.txt':
        return read_text_file(file_path)
    elif file_extension == '.pdf':
        return read_pdf_file(file_path)
    elif file_extension == '.docx':
        return read_docx_file(file_path)
    else:
        raise ValueError(f"Unsupported file format: {file_extension}")


# Text Chunking process

In [4]:
def split_text(text: str, chunk_size: int = 500):
    """Split text into chunks while preserving sentence boundaries"""
    sentences = text.replace('\n', ' ').split('. ')
    chunks = []
    current_chunk = []
    current_size = 0

    for sentence in sentences:
        sentence = sentence.strip()
        if not sentence:
            continue

        # Ensure proper sentence ending
        if not sentence.endswith('.'):
            sentence += '.'

        sentence_size = len(sentence)

        # Check if adding this sentence would exceed chunk size
        if current_size + sentence_size > chunk_size and current_chunk:
            chunks.append(' '.join(current_chunk))
            current_chunk = [sentence]
            current_size = sentence_size
        else:
            current_chunk.append(sentence)
            current_size += sentence_size

    # Add the last chunk if it exists
    if current_chunk:
        chunks.append(' '.join(current_chunk))

    return chunks


# improved chunking process

In [3]:
import re
import nltk
from typing import List

def split_text(text: str, chunk_size: int = 500, overlap: int = 50, token_based: bool = False) -> List[str]:
    """
    Split text into chunks while preserving sentence and paragraph boundaries, 
    with optional overlap and token-based chunking.
    """
    # Load nltk sentence tokenizer if needed
    nltk.download('punkt', quiet=True)
    
    # Tokenize by paragraphs first
    paragraphs = text.split("\n\n")
    chunks = []
    current_chunk = []
    current_size = 0

    def get_token_count(text):
        """Helper to get the token count (rough approximation)."""
        return len(re.findall(r'\w+', text))

    # Adjust sentence or token counting function
    count_func = get_token_count if token_based else len

    for paragraph in paragraphs:
        sentences = nltk.sent_tokenize(paragraph.strip())
        
        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue

            sentence_size = count_func(sentence)
            
            # Add sentence to current chunk if it fits
            if current_size + sentence_size <= chunk_size:
                current_chunk.append(sentence)
                current_size += sentence_size
            else:
                # Finalize the current chunk
                chunks.append(' '.join(current_chunk))
                
                # Begin new chunk, optionally with overlap from last few sentences
                if overlap > 0 and len(current_chunk) > 0:
                    overlap_sentences = current_chunk[-overlap:]
                    current_chunk = overlap_sentences + [sentence]
                    current_size = count_func(' '.join(current_chunk))
                else:
                    current_chunk = [sentence]
                    current_size = sentence_size

    # Add the last chunk if it exists
    if current_chunk:
        chunks.append(' '.join(current_chunk))

    return chunks


# setting up ChromaDB 

using sentence transformers embeddings to initialize chromaDB

In [4]:
import chromadb
from chromadb.utils import embedding_functions

# Initialize ChromaDB client with persistence
client = chromadb.PersistentClient(path="chroma_db")

# Configure sentence transformer embeddings
sentence_transformer_ef = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name="all-MiniLM-L6-v2"
)

# Create or get existing collection
collection = client.get_or_create_collection(
    name="documents_collection",
    embedding_function=sentence_transformer_ef
)


  from tqdm.autonotebook import tqdm, trange


# inserting data into ChromaDB 

preparing a pipeline that processes documents and prepares them for insertion in chromaDB

In [5]:
def process_document(file_path: str):
    """Process a single document and prepare it for ChromaDB"""
    try:
        # Read the document
        content = read_document(file_path)

        # Split into chunks
        chunks = split_text(content)

        # Prepare metadata
        file_name = os.path.basename(file_path)
        metadatas = [{"source": file_name, "chunk": i} for i in range(len(chunks))]
        ids = [f"{file_name}_chunk_{i}" for i in range(len(chunks))]

        return ids, chunks, metadatas
    except Exception as e:
        print(f"Error processing {file_path}: {str(e)}")
        return [], [], []


# batch processing for multiple documents

In [6]:
def add_to_collection(collection, ids, texts, metadatas):
    """Add documents to collection in batches"""
    if not texts:
        return

    batch_size = 100
    for i in range(0, len(texts), batch_size):
        end_idx = min(i + batch_size, len(texts))
        collection.add(
            documents=texts[i:end_idx],
            metadatas=metadatas[i:end_idx],
            ids=ids[i:end_idx]
        )

def process_and_add_documents(collection, folder_path: str):
    """Process all documents in a folder and add to collection"""
    files = [os.path.join(folder_path, file) 
             for file in os.listdir(folder_path) 
             if os.path.isfile(os.path.join(folder_path, file))]

    for file_path in files:
        print(f"Processing {os.path.basename(file_path)}...")
        ids, texts, metadatas = process_document(file_path)
        add_to_collection(collection, ids, texts, metadatas)
        print(f"Added {len(texts)} chunks to collection")


In [8]:
import nltk
nltk.download('punkt_tab')

[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\khush\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping tokenizers\punkt_tab.zip.


True

trying chromaDB

In [9]:
# Initialize ChromaDB collection 
collection = client.get_or_create_collection(
    name="documents_collection",
    embedding_function=sentence_transformer_ef
)

# Process and add documents from a folder
folder_path = "E:\Coding\python/rag\docs"
process_and_add_documents(collection, folder_path)


Processing 2180712_CIS_GTU_Study_Material_e-Notes_All-Units_17062020050424AM.pdf...
Added 1533 chunks to collection
Processing 3140705_OOP---I_GTU_Study_Material_e-Notes_Unit-1-to-5_11062022015400PM (1).pdf...
Added 1536 chunks to collection
Processing e-Notes_PDF_All-Units_24042019090707AM.pdf...
Added 1101 chunks to collection
Processing Services_Proposal Document - Adrta.docx...
Added 106 chunks to collection


# semantic search to retreive relevant documents

In [10]:
def semantic_search(collection, query: str, n_results: int = 2):
    """Perform semantic search on the collection"""
    results = collection.query(
        query_texts=[query],
        n_results=n_results
    )
    return results

def get_context_with_sources(results):
    """Extract context and source information from search results"""
    # Combine document chunks into a single context
    context = "\n\n".join(results['documents'][0])

    # Format sources with metadata
    sources = [
        f"{meta['source']} (chunk {meta['chunk']})" 
        for meta in results['metadatas'][0]
    ]

    return context, sources


In [11]:
# Perform a search
query = "what is java?"
results = semantic_search(collection, query)
# results

def print_search_results(results):
    """Print formatted search results"""
    print("\nSearch Results:\n" + "-" * 100)

    for i in range(len(results['documents'][0])):
        doc = results['documents'][0][i]
        meta = results['metadatas'][0][i]
        distance = results['distances'][0][i]

        print(f"\nResult {i + 1}")
        print(f"Source: {meta['source']}, Chunk {meta['chunk']}")
        print(f"Distance: {distance}")
        print(f"Content: {doc}\n")



print_search_results(results)



Search Results:
----------------------------------------------------------------------------------------------------

Result 1
Source: 3140705_OOP---I_GTU_Study_Material_e-Notes_Unit-1-to-5_11062022015400PM (1).pdf, Chunk 3
Distance: 0.4537600576877594
Content:  On 8 May 2007, Sun finished the process, making all of Java's core code free and open -source,  aside from a small portion of code to which Sun did not hold the copyright. What is Java?    Java is a programming language that:    Is exclusively object oriented    Has full GUI support    Has full network support    Is platform independent    Executes stand -alone or “on -demand” in web browser as applets      1 - Basics of Java      2    Prof. Swati R.


Result 2
Source: 3140705_OOP---I_GTU_Study_Material_e-Notes_Unit-1-to-5_11062022015400PM (1).pdf, Chunk 4
Distance: 0.5449849367141724
Content: Sharma  | 3140705  – Object Oriented Programming - I JDK, JRE , Byte  code  & JVM.  Java Development Kit (JDK)   o JDK contains

configuring gemini model to generate answers 

In [11]:
import google.generativeai as genai
import os
from dotenv import load_dotenv

load_dotenv()

genai.configure(api_key=os.environ["GEMINI_API_KEY"])

prompt engineering

In [12]:
def get_prompt(context: str, conversation_history: str, query: str):
    """Generate a prompt combining context, history, and query"""
    prompt = f"""Based on the following context and conversation history, 
    please provide a relevant and contextual response.Look through every part of the document like tables if they exists and give answers based on that. If the answer cannot 
    be derived from the context, only use the conversation history or say 
    "I cannot answer this based on the provided information."

    Context from documents:
    {context}

    Previous conversation:
    {conversation_history}

    Human: {query}

    Assistant:"""

    return prompt


# configuring model to answer queries/questions 

In [13]:
def generate_response(query: str, context: str, conversation_history: str = ""):
    """Generate a response using Gemini with a dynamic prompt and configurable generation parameters."""
    prompt = get_prompt(context, conversation_history, query)

    try:
        # Initialize the model
        model = genai.GenerativeModel("gemini-1.5-flash")
        
        # Generate response using the prompt with a customized generation config
        response = model.generate_content(
            prompt,  # Use the dynamically generated prompt here
            generation_config=genai.types.GenerationConfig(
                candidate_count=1,  # Generates one response candidate
                # stop_sequences=["\n","End of answer"],  # Adjust stop sequences as needed
                max_output_tokens=800,  # Set your desired max tokens
                temperature=0.2  # Adjust temperature for response variability
            ),
        )

        # Extract the response content
        return response.text if response else "No content generated."

    except Exception as e:
        return f"Error generating response: {str(e)}"





for a single query at a time

In [34]:
def rag_query(collection, query: str, n_chunks: int = 2):
    """Perform RAG query: retrieve relevant chunks and generate answer"""
    # Get relevant chunks
    results = semantic_search(collection, query, n_chunks)
    context, sources = get_context_with_sources(results)

    # Generate response
    response = generate_response(query, context)

    return response, sources


In [None]:
query = "give a breif about history of cloud computing"
response, sources = rag_query(collection, query)

# Print results
print("\nQuery:", query)
print("\nAnswer:", response)
print("\nSources used:")
for source in sources:
    print(f"- {source}") 

for a list/quesstionnaire of questions all at a time

In [14]:
def rag_query(collection, questions: list, n_chunks: int = 2):
    """Perform RAG query for a list of questions: retrieve relevant chunks and generate answers."""
    responses = []
    sources_used = []
    
    for question in questions:
        # Get relevant chunks for each question
        results = semantic_search(collection, question, n_chunks)
        context, sources = get_context_with_sources(results)

        # Generate response for each question
        response = generate_response(question, context)

        # Append results for this question
        responses.append((question, response))
        sources_used.append((question, sources))

    return responses, sources_used

In [15]:
questions = [
    "explain data structures in detail with example",
    "What is a stack data structure?",
    # "How has cloud computing evolved in the past decade?",
    "what is case processing in pharmacovigilance?",
    "find out who is Mr. Ramesh patel from the table in the leadership section",
    "what is the budget strategy of adrta?"
]
responses, sources = rag_query(collection, questions)

# Print results
for question, response in responses:
    print("\nQuestion:", question)
    print("Answer:", response)

print("\n------------------------------------------")
print("\nSources used for each question:")
for question, source_list in sources:
    print(f"\nQuestion: {question}")
    for source in source_list:
        print(f"- {source}") 



Question: explain data structures in detail with example
Answer: Data structures are a way of organizing data in a computer's memory. They define the relationship between individual data elements and how they are stored. Think of it like a filing system for your computer. 

Here's a breakdown:

* **Representation of Data:**  Data structures determine how data is stored in the computer's memory. This could be in the form of numbers, characters, or even complex objects.
* **Accessing Data:** Data structures define how you can access and manipulate the stored data.  You can retrieve specific data elements, modify them, or add new ones.
* **Logical Relationships:** Data structures establish the connections between data elements. For example, a list might store items in a specific order, while a tree structure might organize data hierarchically.

**Example:**

Imagine you're creating a program to manage a library. You need to store information about books, such as their title, author, and 

input questions via a pdf/doc/xlsx

In [None]:
import PyPDF2  
import docx    
import pandas as pd  
from pathlib import Path

def extract_questions_from_pdf(file_path):
    """Extract questions from a PDF document using PyPDF2."""
    questions = []
    with open(file_path, "rb") as pdf_file:
        pdf_reader = PyPDF2.PdfReader(pdf_file)
        for page in pdf_reader.pages:
            text = page.extract_text()
            questions += [line.strip() for line in text.splitlines() if line.strip().endswith('?')]
    return questions
 
def extract_questions_from_docx(file_path):
    """Extract questions from a Word document."""
    questions = []
    doc = docx.Document(file_path)
    for paragraph in doc.paragraphs:
        if paragraph.text.strip().endswith('?'):
            questions.append(paragraph.text.strip())
    return questions

def extract_questions_from_excel(file_path):
    """Extract questions from an Excel document."""
    questions = []
    df = pd.read_excel(file_path)
    for col in df.columns:
        for value in df[col].dropna():
            if isinstance(value, str) and value.strip().endswith('?'):
                questions.append(value.strip())
    return questions

def process_document(file_path):
    """Automatically detect file type, extract questions, and process them with the RAG pipeline."""
    # Detect file type
    file_extension = Path(file_path).suffix.lower()
    
    # Map the file extension to the appropriate extraction function
    if file_extension == ".pdf":
        questions = extract_questions_from_pdf(file_path)
    elif file_extension == ".docx":
        questions = extract_questions_from_docx(file_path)
    elif file_extension == ".xlsx":
        questions = extract_questions_from_excel(file_path)
    else:
        raise ValueError("Unsupported file type")
    
    # Process each question with the RAG app
    responses, sources = rag_query(collection, questions)
    
    # Print results
    for question, response in responses:
        print("\nQuestion:", question)
        print("Answer:", response)
    
    print("\nSources used for each question:")
    for question, source_list in sources:
        print(f"\nQuestion: {question}")
        for source in source_list:
            print(f"- {source}")

# Example usage
file_path = "E:\Coding\python/rag\questions-xl.xlsx"  # Replace with your file path
process_document(file_path)


NameError: name 'rag_query' is not defined