In [18]:
import bn
import re
import pytesseract
from pdf2image import convert_from_path
from PIL import Image
import os 
import numpy as np
from PIL import ImageFilter, ImageEnhance
from google import genai
from typing import List, Optional, Tuple
from dotenv import load_dotenv
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_postgres import PGVector
import psycopg2

load_dotenv()

client = genai.Client()


In [None]:
#chack data store in database
# api_usage_examples.py
import requests
import json
import uuid


In [20]:

#extract data pdf to text file
custom_config = r'--oem 3'

def enhance_image_for_ocr(image):
    gray = image.convert('L')
    contrast_enhancer = ImageEnhance.Contrast(gray)
    contrast = contrast_enhancer.enhance(2.0)
    sharp = contrast.filter(ImageFilter.SHARPEN)
    return sharp


def pdf_to_images(pdf_path, dpi=300):
    pages = convert_from_path(pdf_path, dpi=dpi)
    enhanced_pages = []
    for i in range(len(pages)):
        if i>=4 and i<=18:
            crop = pages[i].crop((0, 300, pages[i].width, pages[i].height-400))
            enhanced_page = enhance_image_for_ocr(crop)
            enhanced_pages.append(enhanced_page)
    return enhanced_pages

def extract_text_from_images(images):
    full_text = ""
    for i, img in enumerate(images):
        if i == 0:
            pass
        else:
            print(f"Processing page {i+1}...")
            text = pytesseract.image_to_string(img, lang='ben', config=custom_config)  # 'ben' = Bengali
            clean_text = text.replace('\xa0', ' ')  # Remove non-breaking spaces
            clean_text = "\n".join([line.strip() for line in clean_text.splitlines() if line.strip()])
            #clean_text = clean_text.replace('\n', ' ')
            #full_text += f"\n\n--- Page {i+1} ---\n\n{text}"
            full_text += clean_text
    return full_text


In [21]:
pdf_file = "HSC.pdf"  

In [22]:
text_file = "extract_data2.txt"
if os.path.exists(text_file) == False:
    images = pdf_to_images(pdf_file)
    bangla_text = extract_text_from_images(images)
    with open(text_file, "w", encoding="utf-8") as f:
        f.write(bangla_text)

    print("✅ Done! Bangla text saved to 'extract_data2.txt'")
else:
    print("file exist please continue..........")

file exist please continue..........


In [23]:
def output_get(content,question):
    response = client.models.generate_content(
        model="gemini-2.5-flash",
        contents=f'{content} read this content and give {question} this question answer. only give answer',
    )
    return response.text

In [None]:

def chunk_and_vectorize_text(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            text_content = file.read()

        # --- Chunking ---
        # Tokenize the text into individual sentences
        text_content = text_content.replace("\n"," ")
        sentences = text_content.split("।")
        print(len(sentences))
        sum_sentence = []
        for i in range(0,len(sentences),8):
            chunk = sentences[i:i+8]
            sum_sentence.append("".join(chunk))
        return sum_sentence
    except:
        print("error")

def search_chunks_multiple_keywords(chunks, keywords):
    # Normalize keywords to lowercase
    keywords = [k.lower() for k in keywords]
    
    # Search for any of the keywords in each chunk
    result = []
    for chunk in chunks:
        chunk_lower = chunk.lower()
        if any(k in chunk_lower for k in keywords):
            result.append(chunk)
    return result


In [25]:
file_path = "extract_data2.txt"
chunks = chunk_and_vectorize_text(file_path)

371


Method 1: Chucking and Vectorize using Langchain and Gemini LLM

In [28]:
# Environment
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
DATABASE_URL = os.getenv("NEON_DATABASE_URL")

class TextToEmbeddingsPipeline:
    def __init__(self, db_url: str, collection_name: str = "documents"):
        self.db_url = db_url
        self.collection_name = collection_name
        self.embeddings = GoogleGenerativeAIEmbeddings(
            google_api_key=GOOGLE_API_KEY,
            model="models/embedding-001"
        )
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000, chunk_overlap=200, length_function=len
        )
        self.vector_store = None
        self._setup_pgvector()

    def _setup_pgvector(self):
        try:
            conn = psycopg2.connect(self.db_url)
            with conn:
                with conn.cursor() as cur:
                    cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
            print("pgvector extension ensured.")
        except Exception as e:
            print(f"Database setup failed: {e}")

    def _init_vector_store(self, documents: List[Document]):
        try:
            return PGVector.from_documents(
                documents=documents,
                embedding=self.embeddings,
                connection=self.db_url,
                collection_name=self.collection_name,
            )
        except TypeError:
            return PGVector.from_documents(
                documents=documents,
                embedding=self.embeddings,
                connection_string=self.db_url,
                collection_name=self.collection_name,
            )

    def process_texts(self, texts: List[str], metadatas: Optional[List[dict]] = None):
        documents = [Document(page_content=text, metadata=(metadatas[i] if metadatas else {})) for i, text in enumerate(texts)]
        chunks = self.text_splitter.split_documents(documents)

        if self.vector_store is None:
            self.vector_store = self._init_vector_store(chunks)
        else:
            self.vector_store.add_documents(chunks)

        print(f"✅ Stored {len(chunks)} chunks.")
        return self.vector_store

    def process_file(self, file_path: str, metadata: Optional[dict] = None):
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                text = f.read()
            return self.process_texts([text], [metadata or {"source": file_path}])
        except Exception as e:
            print(f"Error reading file {file_path}: {e}")
    def search(self, query: str, k: int = 3) -> List[Document]:
        return self.vector_store.similarity_search(query, k=k) if self.vector_store else []

    def search_with_score(self, query: str, k: int = 3) -> List[Tuple[Document, float]]:
        return self.vector_store.similarity_search_with_score(query, k=k) if self.vector_store else []


def main(query):
    pipeline = TextToEmbeddingsPipeline(db_url=DATABASE_URL, collection_name="my_documents")

    with open("extract_data2.txt", "r", encoding="utf-8") as f:
        texts = f.read().split("\n\n")

    metadatas = [{"topic": t.split()[0].lower()} for t in texts]
    print(metadatas)

    pipeline.process_texts(texts, metadatas)

    
    results = pipeline.search(query)

    print(f"\n🔍 Results for: '{query}'")
    ai_input_text = []
    for i, doc in enumerate(results):
        ai_input_text.append(doc.page_content)
        #print(f"{i+1}. {doc.page_content}\n   Metadata: {doc.metadata}")
    join_txt = " ".join(ai_input_text)
    return join_txt
    

query = "বিয়ের সময় কল্যাণীর প্রকৃত বয়স কত ছিল?"
if __name__ == "__main__":
    join_txt=main(query)


pgvector extension ensured.
[{'topic': 'মূল'}]
✅ Stored 36 chunks.

🔍 Results for: 'বিয়ের সময় কল্যাণীর প্রকৃত বয়স কত ছিল?'


In [29]:
ai_result = output_get(join_txt,query)
print(ai_result)

প্রদত্ত অংশে কল্যাণীর প্রকৃত বয়স সম্পর্কে কোনো তথ্য নেই।


Method 2: Chucking text in Sentence and store database in string. search question keyword in database and get text. find answer in this text using ai

In [None]:
# Data store in neon database: db_neon
def create_sample_chunk(id,chunk,word, token_count):
    """Create a sample data chunk"""
    sample_data = {
        "chunk_id": id,
        "source_file": "sample_document.pdf",
        "chunk_text": chunk,
        "token_count": token_count,
        "start_unit": 0,
        "end_unit": 46,
        "embedding_model": "text-embedding-ada-002",
        "embedding": f'{word}'
    }
    try:
        response = requests.post(f"{BASE_URL}/chunks/", json=sample_data)
        if response.status_code == 200:
            print("✅ Chunk created successfully!")
            chunk_data = response.json()
            print(f"Created chunk with ID: {chunk_data['id']}")
            return chunk_data['id']
        else:
            print(f"❌ Error creating chunk: {response.text}")
            return None
    except:
        print("existing data.....")

# Base URL of your FastAPI application
BASE_URL = "http://localhost:8001"

for i, ch in enumerate(chunks):
    st=bn.remove_stopwords(ch)
    words_ch = bn.tokenizer(st) # or bn.tokenizer(text, 'word')
    token_count = len(words_ch)
    chunk_id = create_sample_chunk(i,ch,words_ch,token_count)

existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....
existing data.....


In [31]:
question = "বিয়ের সময় কল্যাণীর প্রকৃত বয়স কত ছিল?"

Run By sending Data

In [32]:
st=bn.remove_stopwords(question)
words = bn.tokenizer(st) # or bn.tokenizer(text, 'word')
found_chunks = search_chunks_multiple_keywords(chunks, words)
result_al = []
for i, chunk in enumerate(found_chunks):
    result_al.append(chunk)
result = output_get(result_al,question)
print(result)

১৫ বছর


Run by Get data from Neon dataset 

In [34]:
#data get from dataset 
import requests
# Get all chunk texts
response = requests.get("http://localhost:8001/chunks/texts")
chunk_texts = response.json()
def generate_bot_response(list_chunk,user_message: str) -> str:
    user_message = user_message.lower().strip()
    print(user_message)
    st=bn.remove_stopwords(user_message)
    words = bn.tokenizer(st) # or bn.tokenizer(text, 'word')
    found_chunks = search_chunks_multiple_keywords(list_chunk, words)
    result_al = []
    for i, chunk in enumerate(found_chunks):
        result_al.append(chunk)
    result = output_get(result_al,user_message)
    print(result)
    return result


list_chunk= []
for chunk in chunk_texts:
    list_chunk.append(chunk['chunk_text'])
#print(list_chunk)

question_ai = "বিয়ের সময় কল্যাণীর প্রকৃত বয়স কত ছিল?"
result = generate_bot_response(list_chunk,question_ai)

বিয়ের সময় কল্যাণীর প্রকৃত বয়স কত ছিল?
পনেরো।
