In [303]:
import os
from langchain.document_loaders import PyPDFLoader

from pathlib import Path

def load_file(file_name, current_dir=True):
    """Load a PDF with ``PyPDFLoader`` and return the loader's documents.

    Args:
        file_name: PDF name or path. When ``current_dir`` is true it is joined
            onto the *parent* of the current working directory (despite the
            flag's name); otherwise it is used verbatim as the path.
        current_dir: Selects how ``file_name`` is resolved (see above).

    Returns:
        Whatever ``PyPDFLoader.load()`` returns for the file (one document per
        page, judging by the ``page``/``total_pages`` metadata shown below).
    """
    if not current_dir:
        file_path = file_name
    else:
        # NOTE(review): this is the parent of the CWD, not the CWD itself.
        parent_dir = os.path.dirname(os.getcwd())
        file_path = os.path.join(parent_dir, file_name)

    return PyPDFLoader(file_path).load()

In [304]:
# The PDF is resolved relative to the parent of the working directory (see load_file).
file_name = "Analysis_of_Vote_Share_and_Margin_of_Victory_of_Winners_Andhra_Pradesh_Assembly_2024_Finalver_English.pdf"
docs = load_file(file_name, current_dir=True)

In [328]:
# Inspect one page to see how PyPDFLoader laid out the table text.
docs[20]

Document(metadata={'producer': 'Microsoft: Print To PDF', 'creator': 'PyPDF', 'creationdate': '2024-07-15T10:14:47+05:30', 'author': 'Navin Moni', 'moddate': '2024-07-15T10:14:47+05:30', 'title': 'Microsoft Word - Analysis of Vote Share and Margin of Victory of Winners Andhra Pradesh Assembly 2024.Finalver.English', 'source': 'C:\\Users\\Asus\\PycharmProjects\\GenAIPractise\\Analysis_of_Vote_Share_and_Margin_of_Victory_of_Winners_Andhra_Pradesh_Assembly_2024_Finalver_English.pdf', 'total_pages': 65, 'page': 20, 'page_label': '21'}, page_content="Page 21 of 65 \n \nVote Share and Representativeness of Winners in Andhra Pradesh Assembly Elections \nS.No. Winner Party Constituency \nTotal \nRegistered \nVoters  \nTotal Valid \nVotes in The \nConstituency \nTotal Votes \nPolled for \nWinner \n% of Votes \nShare  \n% of \nrepresentativeness \n% of Voters ' \nTurnout  \n1 Nara Lokesh Telugu Desam Mangalagiri 286552 253830 167710 66.07% 58.53% 88.58% \n2 Dr. Nimmala Ramanaidu Telugu Desam Pal

In [None]:
from langchain.text_splitter import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
from langchain.schema import Document

def get_chunks(documents, chunk_size=500, chunk_overlap=50):
    """Split loaded documents into character-bounded chunks for embedding.

    Each document is first passed through a ``MarkdownHeaderTextSplitter``, and
    every resulting section is re-split with a ``RecursiveCharacterTextSplitter``.
    If the markdown splitter yields nothing, or raises, the whole document is
    split with the character splitter instead. A failure on one document no
    longer discards the chunks of the others.

    Args:
        documents: Iterable of LangChain ``Document`` objects (e.g. PDF pages).
        chunk_size: Maximum chunk length, measured with ``len``.
        chunk_overlap: Characters shared between consecutive chunks.

    Returns:
        List of chunk ``Document`` objects. Each chunk carries its source
        document's metadata plus any markdown header keys found for it.
    """
    # Markdown headers to split on, and the metadata key each one maps to.
    headers_to_split_on = [
        ("#", "Header 1"),
        ("##", "Header 2"),
        ("###", "Header 3"),
        ("####", "Header 4"),
    ]

    # Break preferences, coarsest first: runs of " \n" line endings as they
    # appear in the PyPDFLoader text, then single line breaks, then sentences.
    separators = [" \n \n \n ", " \n \n ", " \n ", ".  \n", ". \n", ". "]

    markdown_splitter = MarkdownHeaderTextSplitter(
        headers_to_split_on=headers_to_split_on,
        strip_headers=False,
    )
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=separators,
    )

    all_chunks = []
    for doc in documents:
        try:
            md_chunks = markdown_splitter.split_text(doc.page_content)
        except Exception as e:
            # Best effort: previously this returned [] and threw away every chunk
            # collected so far. Fall back to character splitting for this
            # document only.
            print(f"⚠️ Markdown splitting failed for a document: {e}")
            md_chunks = []

        if not md_chunks:
            all_chunks.extend(text_splitter.split_documents([doc]))
            continue

        for section in md_chunks:
            # Merge instead of overwrite. This keeps the page metadata (source,
            # page, ...) AND the header keys added by the markdown splitter, and
            # gives each section its own dict instead of aliasing doc.metadata.
            section.metadata = {**doc.metadata, **section.metadata}
            all_chunks.extend(text_splitter.split_documents([section]))

    return all_chunks

In [306]:
# Split every loaded PDF page into embedding-sized chunks.
chunks = get_chunks(documents=docs)

In [307]:
# Fail fast if chunking produced nothing (e.g. the PDF did not load).
assert chunks, "get_chunks returned no chunks"
len(chunks)

193

In [311]:
from langchain.vectorstores.pgvector import PGVector
from langchain.embeddings import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema.document import Document
from sqlalchemy import create_engine
import json
import re
import os
import psycopg2
import numpy as np
from typing import List, Dict, Any
import logging
from contextlib import contextmanager
from psycopg2.extras import execute_batch

# Module-wide logging setup plus tuning knobs for the embedding pipeline.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

MAX_RETRIES = 3   # attempts per embedding request before giving up
BATCH_SIZE = 100  # chunks embedded and inserted per round-trip

class VectorEmbeddingsProcessor:
    """Embed LangChain document chunks with OpenAI and upsert them into a pgvector table.

    Rows are unique on ``(file_name, chunk)``. Re-processing the same chunk
    overwrites its embedding and metadata instead of inserting a duplicate.
    """

    def __init__(self, connection_string: str, table_name, embedding_model: str = "text-embedding-3-small",
                 embedding_dim: int = 1536):
        """Store the connection settings and create the OpenAI embeddings client.

        Args:
            connection_string: libpq DSN handed to ``psycopg2.connect``.
            table_name: Target table, optionally schema-qualified
                (e.g. ``"public.t_document_embeddings"``). It is interpolated
                into SQL verbatim, so it must be a trusted identifier.
            embedding_model: OpenAI embedding model name.
            embedding_dim: Width of the ``vector`` column. It must equal the
                model's output size; the default matches the default model.
        """
        self.connection_string = connection_string
        self.embeddings = OpenAIEmbeddings(model=embedding_model)
        self.table_name = table_name
        self.embedding_dim = embedding_dim

    @contextmanager
    def get_db_connection(self):
        """Yield a psycopg2 connection; roll back on error and always close it."""
        conn = None
        try:
            conn = psycopg2.connect(self.connection_string)
            yield conn
        except Exception as e:
            if conn:
                conn.rollback()
            # Also reached for errors raised inside the caller's `with` body,
            # not only for connection failures.
            logger.error(f"Database connection error: {e}")
            raise
        finally:
            if conn:
                conn.close()

    def sanitize_filename(self, filename: str) -> str:
        """Return the path's base name with every char outside [A-Za-z0-9_.-] replaced by '_'."""
        base_name = os.path.basename(filename)
        return re.sub(r'[^a-zA-Z0-9_.-]', '_', base_name)

    def generate_embeddings_batch(self, texts: List[str]) -> List[List[float]]:
        """Embed ``texts`` in a single API call, retrying on failure.

        The call is retried up to ``MAX_RETRIES`` attempts in total (at least
        one), sleeping 1s, 2s, 4s, ... between attempts so transient errors can
        clear. The last error is re-raised once the attempts are exhausted.
        """
        import time

        # max(1, ...) guarantees one attempt, so this never falls through returning None.
        attempts = max(1, MAX_RETRIES)
        for attempt in range(attempts):
            try:
                # embed_documents sends the whole batch in one request.
                return self.embeddings.embed_documents(texts)
            except Exception as e:
                logger.warning(f"Embedding generation attempt {attempt + 1} failed: {e}")
                if attempt == attempts - 1:
                    raise
                time.sleep(2 ** attempt)

    def prepare_batch_data(self, chunks: List[Document]) -> List[tuple]:
        """Turn chunks into ``(file_name, content, metadata_json)`` rows for insertion."""
        batch_data = []
        for chunk in chunks:
            file_name = self.sanitize_filename(chunk.metadata.get("source", "unknown"))
            # default=str keeps a non-JSON-native metadata value from aborting the batch.
            metadata_json = json.dumps(chunk.metadata, default=str)
            batch_data.append((file_name, chunk.page_content, metadata_json))
        return batch_data

    def _index_name(self, suffix: str) -> str:
        """Build an index name for this table, e.g. ``idx_document_embeddings_file_name``.

        The name uses the unqualified table name with a leading ``t_`` dropped.
        That way ``public.t_document_embeddings`` keeps the index names it was
        first created with, and re-runs do not build duplicate indexes.
        """
        base = self.table_name.split(".")[-1]
        if base.startswith("t_"):
            base = base[2:]
        base = re.sub(r"[^a-zA-Z0-9_]", "_", base)
        return f"idx_{base}_{suffix}"

    def create_table_if_not_exists(self, cursor):
        """Create the embeddings table and its indexes if they do not already exist.

        Assumes the pgvector extension is already installed in the database
        (the ``vector`` column type comes from it).
        """
        # Indexes now target self.table_name. They used to be hard-coded to
        # public.t_document_embeddings, which broke any other table name.
        file_name_index = self._index_name("file_name")
        embedding_index = self._index_name("embedding")
        # NOTE: pgvector's docs recommend creating ivfflat indexes after the table
        # holds data; an index built on an empty table clusters poorly.
        create_table_query = f"""
        CREATE TABLE IF NOT EXISTS {self.table_name} (
            id SERIAL PRIMARY KEY,
            file_name VARCHAR(255) NOT NULL,
            chunk TEXT NOT NULL,
            embedding vector({self.embedding_dim}),
            metadata JSONB,
            UNIQUE(file_name, chunk)  -- Prevent duplicates
        );

        -- Create indexes for better query performance
        CREATE INDEX IF NOT EXISTS {file_name_index}
            ON {self.table_name}(file_name);
        CREATE INDEX IF NOT EXISTS {embedding_index}
            ON {self.table_name} USING ivfflat (embedding vector_cosine_ops);
        """
        cursor.execute(create_table_query)

    def insert_embeddings_batch(self, cursor, batch_data: List[tuple], embeddings: List[List[float]]):
        """Upsert rows pairing each ``(file_name, chunk, metadata)`` with its embedding.

        Raises:
            ValueError: if the number of embeddings differs from the number of
                rows (zip would otherwise silently drop the extras).
        """
        if len(batch_data) != len(embeddings):
            raise ValueError(
                f"Got {len(embeddings)} embeddings for {len(batch_data)} chunks"
            )

        insert_query = f"""
        INSERT INTO {self.table_name} (file_name, chunk, embedding, metadata)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT (file_name, chunk) 
        DO UPDATE SET 
            embedding = EXCLUDED.embedding,
            metadata = EXCLUDED.metadata
        """

        full_batch_data = [
            (file_name, chunk, embedding, metadata)
            for (file_name, chunk, metadata), embedding in zip(batch_data, embeddings)
        ]

        # Use executemany instead of execute_batch for better compatibility
        cursor.executemany(insert_query, full_batch_data)

    def process_documents(self, all_chunks: List[Document]) -> None:
        """Embed all chunks in BATCH_SIZE batches and upsert them, committing per batch.

        On a failing batch, that batch is rolled back and the error re-raised.
        Batches committed earlier stay in the table.
        """
        if not all_chunks:
            logger.warning("No chunks provided for processing")
            return

        logger.info(f"Processing {len(all_chunks)} document chunks")

        with self.get_db_connection() as conn:
            # Closing the cursor explicitly; it was previously left open.
            with conn.cursor() as cursor:
                self.create_table_if_not_exists(cursor)
                conn.commit()

                for i in range(0, len(all_chunks), BATCH_SIZE):
                    batch_chunks = all_chunks[i:i + BATCH_SIZE]
                    logger.info(f"Processing batch {i//BATCH_SIZE + 1}/{(len(all_chunks) + BATCH_SIZE - 1)//BATCH_SIZE}")

                    try:
                        texts = [chunk.page_content for chunk in batch_chunks]
                        batch_embeddings = self.generate_embeddings_batch(texts)
                        batch_data = self.prepare_batch_data(batch_chunks)
                        self.insert_embeddings_batch(cursor, batch_data, batch_embeddings)
                        conn.commit()

                        logger.info(f"Successfully processed batch {i//BATCH_SIZE + 1}")

                    except Exception as e:
                        logger.error(f"Error processing batch {i//BATCH_SIZE + 1}: {e}")
                        conn.rollback()
                        raise

        logger.info("All document chunks processed successfully")

# Convenience entry point used by the ingestion cell below.
def process_vector_embeddings(connection_string,table_name,chunks):
    """Embed ``chunks`` and upsert them into ``table_name`` via VectorEmbeddingsProcessor.

    Any processing failure is logged here once more and then re-raised.
    """
    embeddings_processor = VectorEmbeddingsProcessor(connection_string, table_name)

    try:
        embeddings_processor.process_documents(chunks)
        print("Embeddings processing completed successfully!")
    except Exception as exc:
        logger.error(f"Failed to process embeddings: {exc}")
        raise



In [312]:
# Re-check the chunk count right before the chunks are embedded and inserted.
len(chunks)

193

In [313]:
# Read the DSN from the environment so real credentials need not live in the notebook.
# The fallback is the localhost database used so far.
# TODO: drop the hard-coded fallback once VECTOR_DB_DSN is set everywhere.
CONNECTION_STRING = os.environ.get(
    "VECTOR_DB_DSN",
    "host=localhost port=5432 dbname=vector_db user=vector password=vector",
)
table_name = "public.t_document_embeddings"
process_vector_embeddings(CONNECTION_STRING, table_name, chunks)

INFO:__main__:Processing 193 document chunks
INFO:__main__:Processing batch 1/2
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO:__main__:Successfully processed batch 1
INFO:__main__:Processing batch 2/2
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO:__main__:Successfully processed batch 2
INFO:__main__:All document chunks processed successfully


Embeddings processing completed successfully!


In [320]:
from langchain.vectorstores.pgvector import PGVector
from langchain.embeddings import OpenAIEmbeddings


def retrieve_context(table_name,file_name,connection_string,question,top_k):
    """Return the ``top_k`` stored chunks of ``file_name`` nearest to ``question``.

    The question is embedded with the same OpenAI model used at ingestion
    (text-embedding-3-small). Rows of ``table_name`` are then ranked by pgvector
    distance to it.

    Args:
        table_name: Table written by VectorEmbeddingsProcessor. It is
            interpolated into the SQL as an identifier, so it must be trusted.
        file_name: Sanitized file name the chunks were stored under.
        connection_string: libpq DSN for ``psycopg2.connect``.
        question: Natural-language query.
        top_k: Maximum number of chunks to return.

    Returns:
        The matching chunk texts joined with newlines, nearest first
        ("" when no row matches).
    """
    embedding_function = OpenAIEmbeddings(model="text-embedding-3-small")
    embedding = embedding_function.embed_query(question)

    # In pgvector, `<->` is Euclidean (L2) distance; `<=>` is cosine and `<#>` is
    # negative inner product. OpenAI embeddings are unit-length, so L2 ranks rows
    # the same way cosine would. The ivfflat index built with vector_cosine_ops
    # cannot serve `<->`, so this ordering is computed exactly.
    query = f"""SELECT chunk, embedding <-> %s::vector AS distance
    FROM {table_name}
    WHERE file_name = %s
    ORDER BY distance
    LIMIT %s;"""
    # Values are bound as parameters instead of being pasted into the SQL text.
    # The embedding's list repr ("[x, y, ...]") is valid pgvector text input.
    params = (str(embedding), file_name, top_k)

    conn = psycopg2.connect(connection_string)
    try:
        with conn.cursor() as cursor:
            cursor.execute(query, params)
            rows = cursor.fetchall()
    finally:
        # The connection used to be left open on every call.
        conn.close()

    return "\n".join(row[0] for row in rows)



In [None]:
# Preview a few chunks rather than dumping all of them into the notebook output.
chunks[:5]

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from operator import itemgetter
from langchain_core.output_parsers import StrOutputParser
import os

question = "who won in sattenapalli constituency?"
TOP_K = 3  # number of nearest chunks passed to the model as context

# Same DSN/table as the ingestion cell; the DSN can be overridden via VECTOR_DB_DSN.
CONNECTION_STRING = os.environ.get(
    "VECTOR_DB_DSN",
    "host=localhost port=5432 dbname=vector_db user=vector password=vector",
)
table_name = "public.t_document_embeddings"

# Must reproduce VectorEmbeddingsProcessor.sanitize_filename, which produced the stored file_name.
base_name = os.path.basename(chunks[0].metadata["source"])
file_name = re.sub(r'[^a-zA-Z0-9_.-]', '_', base_name)

# The context is retrieved from a PDF, so the model is asked to cite the document, not websites.
prompt = ChatPromptTemplate.from_messages([("system","""You are a helpful assistant that answers questions based on the context provided. 
    Use only the given context to answer. If the context does not contain the answer, 
    say "I don't know" and do not make up an answer.
    
Instructions:
- Provide a comprehensive answer based on the context
- If relevant, mention which part of the document the information comes from
- Be specific and cite details from the context
- If the context doesn't contain enough information, clearly state what's missing

"""),
                                           ("human", 'Context: {context}'),
                                           ("human", "Question: {question}")])

context = retrieve_context(table_name, file_name, CONNECTION_STRING, question, TOP_K)
print(context)

# NOTE(review): `llm` is not defined in any cell visible in this notebook. On a
# fresh kernel this line raises NameError; define the chat model explicitly in
# an earlier cell.
# The prompt consumes {context} and {question} straight from the input dict,
# so no itemgetter mapping step is needed.
rag_chain = prompt | llm | StrOutputParser()

llm_response = rag_chain.invoke({"question": question, "context": context})

In [323]:
# print() shows the model's answer as plain text, without the quoted str repr.
print(llm_response)

I don't know.
