<a href="https://colab.research.google.com/github/noobie105/10MS_RAG_Application/blob/main/10MS_TA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Converting the PDF to DOCX using Gemini

In [None]:
!pip install pdf2image python-docx
!apt-get update
!apt-get install -y poppler-utils

In [None]:
import os
import google.generativeai as genai
from docx import Document
import re
from google.colab import files
from google.colab import userdata
import time
from pdf2image import convert_from_path
import requests

# Configure the Gemini API
def configure_gemini():
    """Build a Gemini model client from the key stored in Colab Secrets.

    Reads the ``GOOGLE_AOI_KEY_2`` secret, configures ``genai`` with it,
    creates a ``models/gemini-2.0-flash`` model and sends one throwaway
    prompt so an invalid key fails here rather than mid-run.

    Returns:
        The ``genai.GenerativeModel`` instance, or ``None`` if any step
        raised (the error is printed, not propagated).
    """
    try:
        secret = userdata.get("GOOGLE_AOI_KEY_2")
        if not secret:
            raise ValueError("API key not found in Colab Secrets as 'GOOGLE_AOI_KEY_2'.")
        genai.configure(api_key=secret)
        gemini = genai.GenerativeModel('models/gemini-2.0-flash')  # Use Gemini 2.0 Flash
        # Smoke-test the key with a minimal request.
        gemini.generate_content("Test")
    except Exception as err:
        print(f"Error configuring Gemini API: {err}")
        print("Ensure the 'GOOGLE_AOI_KEY_2' secret is set in Colab Secrets: https://colab.research.google.com/drive/1...")
        return None
    else:
        return gemini

# Extract text from a single PDF page
def extract_text_from_page(pdf_path, page_num, model, max_retries=3, retry_delay=5):
    """OCR one PDF page by rendering it to PNG and sending it to Gemini.

    Args:
        pdf_path: Path of the PDF to read.
        page_num: 1-based page number to render.
        model: Object exposing ``generate_content`` (the configured Gemini model).
        max_retries: Maximum number of attempts for this page.
        retry_delay: Seconds to sleep after a failed attempt before retrying.

    Returns:
        The extracted text, or ``None`` if every attempt failed or returned
        no text.
    """
    # Bound before the loop so the ``finally`` clean-up can always reference
    # it, even when rendering fails before any image is written.
    temp_image_path = f"/content/temp_page_{page_num}.png"

    for attempt in range(1, max_retries + 1):
        sample_file = None  # set only once the upload has succeeded
        try:
            images = convert_from_path(pdf_path, first_page=page_num, last_page=page_num, dpi=300)
            if not images:
                print(f"Error: No image generated for page {page_num} on attempt {attempt}.")
                continue
            images[0].save(temp_image_path, 'PNG')

            sample_file = genai.upload_file(path=temp_image_path, display_name=f"Page_{page_num}")
            print(f"Uploaded page {page_num} as: {sample_file.uri} on attempt {attempt}")

            prompt = """
            Extract all text from the provided PDF page image, preserving the original Bengali script, sentence structure, and formatting as much as possible.
            Include all questions, answers, passages, and vocabulary notes.
            Ensure no content is omitted from the page.
            Output the text in a clean, readable format without summarizing or modifying content.
            """
            response = model.generate_content([sample_file, prompt])

            text = response.text if response.text else None
            if text:
                return text
            print(f"Warning: No text extracted from page {page_num} on attempt {attempt}.")
        except Exception as e:
            print(f"Error extracting text from page {page_num} on attempt {attempt}: {e}")
            if attempt < max_retries:
                print(f"Retrying page {page_num} in {retry_delay} seconds...")
                time.sleep(retry_delay)
        finally:
            # Always release the remote upload and the local PNG, including
            # when generate_content() raised or the function is returning.
            if sample_file is not None:
                try:
                    genai.delete_file(sample_file.name)
                    print(f"Deleted temporary file: {sample_file.name}")
                except Exception as cleanup_error:
                    print(f"Warning: could not delete uploaded file {sample_file.name}: {cleanup_error}")
            if os.path.exists(temp_image_path):
                os.remove(temp_image_path)

    print(f"Failed to extract text from page {page_num} after {max_retries} attempts.")
    return None

def clean_text(text):
    """Reduce OCR output to Bengali text with single-space separation.

    Steps, in order: trim and collapse whitespace runs to one space; drop
    every character that is not in U+0980-U+09FF, whitespace, or the danda;
    rejoin three space-split vowel-sign sequences; remove the recurring
    page-banner phrase.

    Args:
        text: Raw page text; a falsy value yields ``""``.

    Returns:
        The cleaned string.
    """
    if not text:
        return ""

    collapsed = re.sub(r'\s+', ' ', text.strip())
    bengali_only = re.sub(r'[^\u0980-\u09FF\s।]', '', collapsed)

    # Applied sequentially, in this order, exactly like a chained .replace().
    vowel_sign_repairs = (('া ু', 'ৌ'), ('ি ী', 'ী'), ('ু ু', 'ূ'))
    for broken, repaired in vowel_sign_repairs:
        bengali_only = bengali_only.replace(broken, repaired)

    return re.sub(r'অনলাইন ব্যাচ বাংলা ইংরেজি আইসিটি\s*', '', bengali_only)

def save_to_word(text, output_path="/content/preprocessed_text.docx"):
    """Write ``text`` as one paragraph of a new DOCX file and download it.

    Args:
        text: Content placed in the document's only paragraph.
        output_path: Destination path of the ``.docx`` file.
    """
    document = Document()
    document.add_paragraph(text)
    document.save(output_path)

    print(f"Preprocessed text saved to {output_path}")
    # Hand the saved file to the Colab download helper.
    files.download(output_path)

def preprocess_pdf(pdf_path="/content/HSC26-Bangla1st-Paper.pdf"):
    """Extract the text of every page of a PDF with Gemini and save it as DOCX.

    Each page goes through ``extract_text_from_page`` and ``clean_text``;
    the results are concatenated under ``--- Page N ---`` markers and passed
    to ``save_to_word``. Failures are printed and end the run early; nothing
    is raised to the caller.

    Args:
        pdf_path: Path of the PDF to process.
    """
    model = configure_gemini()
    if not model:
        print("Error: Failed to initialize Gemini model. Check your API key in Colab Secrets.")
        return

    try:
        # Ask poppler for the page count instead of rasterising every page
        # just to call len() on the resulting image list.
        from pdf2image import pdfinfo_from_path

        total_pages = int(pdfinfo_from_path(pdf_path)["Pages"])
        print(f"Total pages in PDF: {total_pages}")
    except Exception as e:
        print(f"Error determining page count: {e}")
        return

    page_sections = []
    for page_num in range(1, total_pages + 1):
        print(f"Extracting page {page_num}...")
        page_text = extract_text_from_page(pdf_path, page_num, model)
        if page_text:
            cleaned_text = clean_text(page_text)
            page_sections.append(f"\n\n--- Page {page_num} ---\n{cleaned_text}")
        else:
            print(f"Warning: No text extracted from page {page_num} after retries.")
        # Pause between pages (presumably API rate limiting); there is
        # nothing to wait for after the final page.
        if page_num < total_pages:
            time.sleep(5)

    all_text = "".join(page_sections)
    if not all_text.strip():
        print("Error: No text extracted from any page. Ensure the PDF is valid.")
        return
    save_to_word(all_text)

    print("\nPreprocessed Text Preview:")
    print(all_text[:1000] + "..." if len(all_text) > 1000 else all_text)

# Entry point for the conversion stage: process the default PDF path.
if __name__ == "__main__":
    preprocess_pdf()

Complete code for the RAG application

In [None]:
!pip install python-docx docx2txt nltk indic-nlp-library sentence-transformers pinecone openai flask pyngrok -q
!pip install git+https://github.com/csebuetnlp/normalizer -q

import os
import re
import nltk
import numpy as np
import csv
from docx2txt import docx2txt
from normalizer import normalize
from indicnlp.tokenize import sentence_tokenize
from sentence_transformers import SentenceTransformer
from pinecone import Pinecone, ServerlessSpec
from openai import OpenAI
from collections import deque
from flask import Flask, request, jsonify
from pyngrok import ngrok
import threading
import json
import requests
from google.colab import userdata
import socket
import subprocess
import time

# Fetch the NLTK "punkt" data; chunk_text() calls nltk.sent_tokenize for
# non-Bengali input.
# NOTE(review): newer NLTK releases may require "punkt_tab" instead — confirm
# against the installed version.
nltk.download('punkt')
app = Flask(__name__)

# Rolling window of the 10 most recent (query, answer) pairs; appended to by
# handle_query() and fed into the prompt by generate_answer().
conversation_history = deque(maxlen=10)
# Shared state assigned by initialize_rag_system(); None until it has run.
model = None
index = None
chunks = None
openai_api_key = None

def find_free_port():
    """Return a TCP port number that was free at the time of the call.

    Binds a throwaway socket to port 0 so the OS picks the port, reads the
    assigned number, and closes the socket again before returning.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with probe:
        probe.bind(('', 0))
        probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        _host, assigned_port = probe.getsockname()
    return assigned_port

def terminate_port_processes(port):
    """Best-effort: SIGKILL every other process that ``lsof`` reports on ``port``.

    The current process is never killed, so a server thread left over inside
    this same interpreter cannot take the interpreter down with it. Failures
    to run ``lsof`` or ``kill`` are reported as a warning and never raised.

    Args:
        port: TCP/UDP port number to look up with ``lsof -i :<port>``.
    """
    own_pid = str(os.getpid())
    try:
        result = subprocess.run(['lsof', '-i', f':{port}'], capture_output=True, text=True)

        # First output line is the lsof header; the PID is the second column.
        pids = []
        for line in result.stdout.splitlines()[1:]:
            parts = line.split()
            if len(parts) > 1 and parts[1] != own_pid and parts[1] not in pids:
                pids.append(parts[1])

        for pid in pids:
            subprocess.run(['kill', '-9', pid])
    except (OSError, subprocess.SubprocessError) as e:
        # e.g. lsof is not installed; clean-up stays optional.
        print(f"Warning: could not free port {port}: {e}")

def load_and_preprocess_docx(file_path):
    """Read a DOCX file and return its text, cleaned and normalized.

    Strips the ``--- Page N ---`` markers and two recurring boilerplate
    phrases, collapses whitespace to single spaces, then applies
    ``normalize``.

    Args:
        file_path: Path of the ``.docx`` file to read.

    Returns:
        The normalized text.

    Raises:
        Exception: If reading or processing fails; the original error is
            attached as ``__cause__``.
    """
    try:
        text = docx2txt.process(file_path)
        text = re.sub(r'--- Page \d+ ---', '', text)
        text = re.sub(r'অনলাইন ব্যাচ সম্পর্কিত যেকোনো জিজ্ঞাসায় কল করো', '', text)
        text = re.sub(r'বাংলা ১ম পত্র আলোচ্য বিষয় অপরিচিতা', '', text)
        text = re.sub(r'\s+', ' ', text.strip())
        return normalize(text)
    except Exception as e:
        # Chain the cause so the underlying traceback is not lost.
        raise Exception(f"Failed to load document: {str(e)}") from e

def chunk_text(text, language="bn"):
    """Split ``text`` into stripped, non-empty sentence chunks.

    Args:
        text: Text to split.
        language: ``"bn"`` uses the Indic NLP sentence splitter; any other
            value falls back to ``nltk.sent_tokenize``.

    Returns:
        A list of sentences; empty when ``text`` is empty or whitespace only.

    Raises:
        Exception: If tokenization fails; the original error is attached as
            ``__cause__``.
    """
    try:
        # Nothing to split: skip the tokenizers entirely.
        if not text.strip():
            return []
        if language == "bn":
            sentences = sentence_tokenize.sentence_split(text, lang="bn")
        else:
            sentences = nltk.sent_tokenize(text)
        return [sentence.strip() for sentence in sentences if sentence.strip()]
    except Exception as e:
        raise Exception(f"Failed to chunk text: {str(e)}") from e

def save_chunks(chunks, output_file="sentence_chunks.csv"):
    """Write the chunks to a UTF-8 CSV file with ``Index`` and ``Text`` columns.

    Args:
        chunks: Iterable of chunk strings; rows are numbered from 0.
        output_file: Destination path; must end in ``.csv``.

    Raises:
        ValueError: If ``output_file`` does not end in ``.csv`` (nothing is
            written in that case).
    """
    if not output_file.endswith(".csv"):
        raise ValueError("Output file must have .csv extension")

    with open(output_file, "w", encoding="utf-8", newline="") as handle:
        out = csv.writer(handle)
        out.writerow(["Index", "Text"])
        out.writerows([position, sentence] for position, sentence in enumerate(chunks))

def initialize_vector_store(chunks, model_name="intfloat/multilingual-e5-large", api_key=None, index_name="rag-index", batch_size=100):
    """Embed the chunks and upsert them into a Pinecone index.

    Creates the index (1024 dimensions, cosine metric, AWS us-east-1
    serverless) if it does not exist yet, upserts the vectors in batches
    with ids ``doc_<i>``, and checks the index's reported vector count.

    Args:
        chunks: Non-empty list of text chunks to embed.
        model_name: SentenceTransformer model to load.
        api_key: Pinecone API key.
        index_name: Name of the Pinecone index to use or create.
        batch_size: Number of vectors per upsert call.

    Returns:
        A ``(model, index, chunks)`` tuple.

    Raises:
        Exception: On any failure (empty input, dimension mismatch, Pinecone
            errors, count mismatch); the original error is attached as
            ``__cause__``.
    """
    try:
        # Fail early with a clear message; encoding an empty list would
        # otherwise surface as an opaque error on embeddings.shape[1].
        if not chunks:
            raise ValueError("No chunks to index; the document produced no sentences")

        model = SentenceTransformer(model_name)
        embeddings = model.encode(chunks, convert_to_numpy=True, show_progress_bar=True)
        dimension = embeddings.shape[1]
        if dimension != 1024:
            raise ValueError(f"Embedding dimension {dimension} does not match expected 1024 for {index_name}")

        pc = Pinecone(api_key=api_key)
        if index_name not in pc.list_indexes().names():
            pc.create_index(
                name=index_name,
                dimension=1024,
                metric="cosine",
                spec=ServerlessSpec(
                    cloud="aws",
                    region="us-east-1"
                )
            )
        index = pc.Index(index_name)

        vectors = [(f"doc_{i}", embedding, {"text": chunk, "index": i}) for i, (embedding, chunk) in enumerate(zip(embeddings.tolist(), chunks))]
        for i in range(0, len(vectors), batch_size):
            batch = vectors[i:i + batch_size]
            index.upsert(vectors=batch, namespace="")

        # NOTE(review): this assumes the index holds exactly this document's
        # vectors and that stats are current right after upsert — confirm.
        stats = index.describe_index_stats()
        if stats['total_vector_count'] != len(chunks):
            raise Exception(f"Vector count mismatch: {stats['total_vector_count']} in Pinecone, {len(chunks)} expected")

        return model, index, chunks
    except Exception as e:
        raise Exception(f"Failed to initialize vector store: {str(e)}") from e

def retrieve_similar_chunks(query, model, index, top_k=10):
    """Return the ``top_k`` indexed chunks most similar to ``query``.

    The query is passed through ``normalize``, embedded with ``model`` and
    used to query ``index`` in the default namespace.

    Args:
        query: User question.
        model: Embedding model exposing ``encode``.
        index: Vector index exposing ``query``.
        top_k: Number of matches to request.

    Returns:
        A list of ``(text, score, chunk_index)`` tuples, one per match.

    Raises:
        Exception: If normalization, embedding or the index query fails; the
            original error is attached as ``__cause__``.
    """
    try:
        query = normalize(query)
        query_embedding = model.encode([query], convert_to_numpy=True)[0]
        results = index.query(
            vector=query_embedding.tolist(),
            top_k=top_k,
            include_metadata=True,
            namespace=""
        )
        return [(match['metadata']['text'], match['score'], match['metadata']['index']) for match in results['matches']]
    except Exception as e:
        raise Exception(f"Failed to retrieve chunks: {str(e)}") from e

def generate_answer(query, retrieved_chunks, conversation_history, openai_api_key, model_name="gpt-4o"):
    """Ask an OpenAI chat model to answer ``query`` from the retrieved context.

    Args:
        query: Current user question.
        retrieved_chunks: Sequence of ``(text, score, chunk_index)`` tuples.
        conversation_history: Iterable of ``(query, answer)`` pairs, oldest
            first; rendered newest first in the prompt.
        openai_api_key: API key for the OpenAI client.
        model_name: Chat model to call.

    Returns:
        The stripped answer text (``""`` if the model returned no content).

    Raises:
        Exception: If the chat completion call fails; the original error is
            attached as ``__cause__``.
    """
    client = OpenAI(api_key=openai_api_key)
    context = "\n".join(
        f"Chunk {i+1} (Index {chunk[2]}): {chunk[0]}" for i, chunk in enumerate(retrieved_chunks)
    )

    history_context = ""
    if conversation_history:
        # list() takes a snapshot so the deque can be appended to elsewhere
        # while the prompt is being built.
        turns = "".join(
            f"User: {h_query}\nAnswer: {h_answer}\n"
            for h_query, h_answer in reversed(list(conversation_history))
        )
        history_context = "Recent Conversation History (most recent first):\n" + turns

    prompt = f"""You are a helpful teaching assistant that answers questions based on provided document context and the most recent conversation history. Ensure the answer is clear and relevant by focusing on:

1. Context Usage: Resolve pronouns and references by using the most recent relevant entity mentioned.
2. Query Resolution: Answer based on the most recent query-response pair first, followed by the document context if necessary.
3. Relevance: If context is insufficient, refer to your knowledge base while ensuring brevity and clarity. Avoid irrelevant details.

Please answer in the same language as the query. Ensure that responses are concise and directly related to the asked question, grounded in the retrieved document content.

Document Context:
{context}

{history_context}

Current Query: {query}

Answer:"""

    try:
        response = client.chat.completions.create(
            model=model_name,
            messages=[
                {"role": "system", "content": "You are a helpful assistant with accurate context retention."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.7,
            max_tokens=300
        )
        # Guard against a response with no content before calling .strip().
        content = response.choices[0].message.content
        return (content or "").strip()
    except Exception as e:
        # Report the model that was actually requested, not a fixed name.
        raise Exception(f"Failed to generate answer with {model_name}: {str(e)}") from e

# REST API endpoint
@app.route('/api/query', methods=['POST'])
def handle_query():
    """Answer a JSON ``{"query": ...}`` POST using retrieval plus generation.

    Returns:
        ``{"answer": ...}`` on success; ``{"error": ...}`` with status 400
        for a missing, non-string or empty query, and status 500 when the
        RAG system is not initialized or processing fails.
    """
    try:
        if model is None or index is None or not openai_api_key:
            return jsonify({"error": "RAG system not initialized properly"}), 500

        # silent=True yields None for a missing or malformed JSON body, so
        # the client gets the 400 below instead of a generic 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or 'query' not in data:
            return jsonify({"error": "Query is required in JSON payload"}), 400

        query = data['query']
        if not isinstance(query, str):
            return jsonify({"error": "Query must be a string"}), 400

        query = query.strip()
        if not query:
            return jsonify({"error": "Query cannot be empty"}), 400

        retrieved_chunks = retrieve_similar_chunks(query, model, index)
        answer = generate_answer(query, retrieved_chunks, conversation_history, openai_api_key)
        conversation_history.append((query, answer))
        return jsonify({"answer": answer})
    except Exception as e:
        return jsonify({"error": f"Internal server error: {str(e)}"}), 500

# Evaluation of the system
def evaluate_rag(query, expected_answer, retrieved_chunks):
    """Score one retrieval result against an expected answer.

    Args:
        query: The question that was asked (echoed back in the result).
        expected_answer: Reference answer; matched case-insensitively as a
            substring of the chunk texts after trimming whitespace.
        retrieved_chunks: Sequence of ``(text, score, chunk_index)`` tuples.

    Returns:
        A dict with ``query``, ``avg_similarity`` (mean score rounded to 3
        places, 0 when there are no chunks) and ``grounded``
        (``"SUPPORTED"`` or ``"NOT SUPPORTED"``).
    """
    if retrieved_chunks:
        avg_score = sum(score for _, score, _ in retrieved_chunks) / len(retrieved_chunks)
    else:
        avg_score = 0

    # An empty expected answer is a substring of every chunk; treat it as
    # unsupported rather than trivially supported.
    needle = expected_answer.strip().lower()
    grounded = bool(needle) and any(needle in chunk.lower() for chunk, _, _ in retrieved_chunks)

    return {
        "query": query,
        "avg_similarity": round(avg_score, 3),
        "grounded": "SUPPORTED" if grounded else "NOT SUPPORTED"
    }

def test_query(query, expected_answer, api_url, timeout=60):
    """Send a query to the API and print the answer, expected answer, and evaluation.

    Args:
        query: Question to POST to ``{api_url}/api/query``.
        expected_answer: Reference answer passed to ``evaluate_rag``.
        api_url: Base URL of the running API.
        timeout: Seconds to wait for the HTTP response, so a stalled server
            or tunnel cannot hang the evaluation indefinitely.

    All errors are printed rather than raised.
    """
    try:
        response = requests.post(
            f"{api_url}/api/query",
            headers={"Content-Type": "application/json"},
            json={"query": query},
            timeout=timeout
        )
        response.raise_for_status()
        result = response.json()
        if "answer" in result:
            # Retrieval is repeated locally because the API response does not
            # include the retrieved chunks.
            retrieved_chunks = retrieve_similar_chunks(query, model, index)
            eval_result = evaluate_rag(query, expected_answer, retrieved_chunks)
            print(f"Query: {query}")
            print(f"Actual Answer: {result['answer']}")
            print(f"Expected Answer: {expected_answer}")
            print(f"Evaluation: Groundedness={eval_result['grounded']}, Relevance (Avg Cosine Similarity)={eval_result['avg_similarity']}")
            print("-" * 50)
        else:
            print(f"Error for query '{query}': {result.get('error', 'Unknown error')}")
    except Exception as e:
        print(f"Error testing query '{query}': {str(e)}")

def initialize_rag_system():
    """Load the document, build the vector store and fill the module globals.

    Reads the Pinecone, OpenAI and ngrok secrets from Colab, then loads
    ``preprocessed_text.docx``, splits it into Bengali sentences, saves them
    to CSV and indexes them. Assigns the globals ``model``, ``index``,
    ``chunks`` and ``openai_api_key``.

    Raises:
        ValueError: If any of the three secrets is empty.
        FileNotFoundError: If ``preprocessed_text.docx`` is not present.
    """
    global model, index, chunks, openai_api_key

    docx_path = "preprocessed_text.docx"
    pinecone_key = userdata.get("Pinecone_API_KEY")
    openai_api_key = userdata.get("my_GPT_key_2")
    ngrok_token = userdata.get("NGROK_AUTH_TOKEN")

    if not all((pinecone_key, openai_api_key, ngrok_token)):
        raise ValueError("API keys must be set in Colab Secrets")

    if not os.path.exists(docx_path):
        raise FileNotFoundError(f"'{docx_path}' not found. Upload the file.")

    document_text = load_and_preprocess_docx(docx_path)
    chunks = chunk_text(document_text, language="bn")
    save_chunks(chunks)
    model, index, chunks = initialize_vector_store(chunks, api_key=pinecone_key)

def run_flask(port):
    """Run the Flask ``app`` on all interfaces at ``port`` (used as a thread target)."""
    bind_address = '0.0.0.0'
    app.run(host=bind_address, port=port)

# Entry point for the RAG stage: build the index, expose the API through
# ngrok, start Flask in a background thread and run the sample evaluations.
if __name__ == "__main__":
    try:
        # Clear any tunnel or server left over from a previous run.
        ngrok.kill()
        terminate_port_processes(5000)
        port = find_free_port()
        initialize_rag_system()
        ngrok.set_auth_token(userdata.get("NGROK_AUTH_TOKEN"))
        public_url = ngrok.connect(port)
        api_url = public_url.public_url
        print(f"Public URL for API: {api_url}")

        # Start Flask server in a thread
        flask_thread = threading.Thread(target=run_flask, args=(port,), daemon=True)
        flask_thread.start()

        # Poll until the server accepts connections instead of trusting a
        # fixed sleep; give up after about 10 seconds of refused connections.
        server_ready = False
        for _ in range(10):
            try:
                response = requests.get(f"http://localhost:{port}/api/query", timeout=5)
            except requests.exceptions.ConnectionError:
                time.sleep(1)
                continue
            if response.status_code != 405:  # Expect 405 (Method Not Allowed) for GET
                raise Exception("Flask server not responding as expected")
            server_ready = True
            break
        if not server_ready:
            raise Exception(f"Flask server failed to start on port {port}")

        # Sample queries with their expected answers
        sample_evals = [
            {"query": "অনুপমের ভাষায় সুপরুষ কাকে বলা হয়েছে?", "expected": "শম্ভুনাথ"},
            {"query": "কাকে অনুপমের ভাগ্য দেবতা বলে উল্লেখ করা হয়েছে?", "expected": "মামাকে"},
            {"query": "Who is the main character in the story?", "expected": "অনুপম"}
        ]
        for sample in sample_evals:
            test_query(sample["query"], sample["expected"], api_url=api_url)

    except Exception as e:
        print(f"Error: {str(e)}")