# Basic RAG Pipeline Implementation


### Intro

This notebook implements a Retrieval-Augmented Generation (RAG) pipeline using classes from src directory. The implementation is divided into individual pipeline stages:


1. Document Processing: Token-aware chunking with overlap
2. Embedding Generation: Semantic vectorization
3. Vector Storage: PostgreSQL + pgvector indexing
4. Retrieval: Cosine similarit*y search
5. Generation: Context-augmented LLM completion

### Components

**TextProcessor** - Text segmentation and token budget management.

* Token-based chunking (512 tokens, 50 token overlap)
* Uses cl100k_base tokenizer for GPT compatibility
* Adaptive context assembly within token budgets

**HuggingFaceClient** - Embedding generation and LLM inference.

* Embeddings: Local sentence-transformers (all-MiniLM-L6-v2, 384-dim)
* Generation: Remote Hugging Face Inference API (default: Mistral-7B-Instruct)

**PgVectorDB** - PostgreSQL interface with vector similarity search.

* Stores embeddings as VECTOR(384) with chunk metadata
* Uses ivfflat indexing for approximate nearest neighbor search
* Cosine similarity search via <=> operator


In [1]:
import logging
import os
import sys
from typing import List, Dict, Optional
# from pathlib import Path


# import psycopg2
# from psycopg2.extras import execute_values, Json
# from pgvector.psycopg2 import register_vector
# from huggingface_hub import InferenceClient
# from transformers import pipeline


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)



sys.path.insert(0, '../src/')


from dotenv import load_dotenv

# Setup
load_dotenv()


True

In [4]:
from text_processor import TextProcessor
from pgvector_client import PgVectorClient
from hf_client import HuggingFaceClient
# from rag_handler import PgVectorRAG


In [22]:
PG_CONN_STRING = os.getenv("PG_CONNECTION_STRING")
HF_TOKEN= os.getenv("HF_TOKEN")

file_paths = [
    "../documents/policy.txt",
    "../documents/basic_info.md",
    # Add more files
]

batch_size=BATCH_SIZE=32

#  Query the system
questions = [
    "What is mario's email?",
    "How long does shipping take?",
    "Where there any projects with recommendation systems done by Mario?",
    "Does mario like data science?"
]

In [24]:
# Initialize components 
text_processor = TextProcessor(
    chunk_size=512,
    chunk_overlap=50,
    max_context_tokens=2000
)

db_client = PgVectorClient(
    connection_string=PG_CONN_STRING,
    embedding_dim=384  # Match embedding model output
)

hf_client = HuggingFaceClient(
    hf_token=HF_TOKEN,
    embedding_model="sentence-transformers/all-MiniLM-L6-v2",
    llm_model="mistralai/Mistral-7B-Instruct-v0.2",
    # use_remote_llm=True
)


INFO:pgvector_client:Database connected
INFO:pgvector_client:pgvector extension enabled
INFO:pgvector_client:Database schema created


INFO:hf_client:Initialized embedding model: sentence-transformers/all-MiniLM-L6-v2
INFO:hf_client:Initialized LLM: mistralai/Mistral-7B-Instruct-v0.2


## Load data

In [10]:
chunks = []    
for file_path in file_paths:        
    try:            
        file_chunks = text_processor.chunk_file(file_path)            
        chunks.extend(file_chunks)        
    except Exception as e:            
        logger.error(f"Failed to chunk {file_path}: {e}")



INFO:text_processor:Chunked policy.txt: 1 chunks
INFO:text_processor:Chunked basic_info.md: 2 chunks


In [13]:
for ind, chunk in enumerate(chunks):
    logger.info(f"chunk number {ind}, chunk len {len(chunk['content'])}: {chunk}")
    logger.info("\n")

INFO:__main__:chunk number 0, chunk len 107: {'content': 'Our refund policy allows returns within 35 days of purchase. Full refunds are provided for unopened items.\n', 'chunk_id': 0, 'token_count': 21, 'start_token': 0, 'end_token': 21, 'source': 'policy.txt'}
INFO:__main__:

INFO:__main__:chunk number 1, chunk len 2646: {'content': '# Cover letter\n\n## Basic info\n\n\nThis is a document with CV summary\n\n\nName: Mario (Marvin) Theplumber\n\nemail: mario.thplumber@gmail.com\nphone: +72787226083\n\nNorth Holland, Netherlands\nhttps://github.com/razmarrus\nhttps://www.linkedin.com/in/razmarrus/\n\nnpm i jsonresume-theme-caffeine-tweaked\nresume export --theme caffeine-tweaked resume.pdf\n\nadress: Beethovenstraat 22-1 1099 LK Rotterdam\nNetherlands \n\nEducation: Brazil State University of Informatics\n\n\n## About me / Role description\n\nAs a results-oriented Data Scientist, I believe that combining Machine Learning and Statistics provides businesses with solid answers to their ques

In [16]:

#  Generate embeddings in batches (HuggingFaceClient)
texts = [chunk["content"] for chunk in chunks]
embeddings = []

for i in range(0, len(texts), batch_size):
    batch = texts[i:i + batch_size]
    batch_embeddings = hf_client.get_embeddings(batch)
    embeddings.extend(batch_embeddings)   # adds each embedding vector
    logger.info(f"Embedded batch {i // batch_size + 1}/{(len(texts) + batch_size - 1) // batch_size}")

INFO:hf_client:Generated 3 embeddings
INFO:__main__:Embedded batch 1/1


In [None]:
# Insert into database (PgVectorDB)
db_client.insert_chunks(chunks, embeddings)

INFO:pgvector_client:Inserted 3 chunks


In [19]:


# # Log statistics (TextProcessor)
# stats = text_processor.get_chunk_stats(chunks)
# logger.info(
#     f"Loaded {stats['total_chunks']} chunks, "
#     f"avg {stats['avg_tokens']:.0f} tokens/chunk"
# )

## Query the model

In [23]:
def query_rag(question, text_processor, hf_client, db_client, k=5):
    """
    Execute RAG query: embed → search → assemble context → generate answer.
    
    Args:
        question: User query
        text_processor: TextProcessor instance
        hf_client: HuggingFaceClient instance
        db_client: PgVectorDB instance
        k: Number of chunks to retrieve
        
    Returns:
        dict: {"answer": str, "sources": list, "num_chunks": int}
    """
    # 1. Embed question
    query_embedding = hf_client.get_embeddings([question])[0]
    
    # 2. Search database
    chunks = db_client.search(query_embedding, k=k)
    
    if not chunks:
        return {"answer": "No relevant information found.", "sources": [], "num_chunks": 0}
    
    # 3. Assemble context
    context = text_processor.assemble_context(chunks, question=question)
    
    # 4. Generate answer (with fallback)
    try:
        answer = hf_client.generate_answer(question, context)
        if not answer or len(answer) < 10:
            answer = text_processor.create_fallback(chunks)
    except:
        answer = text_processor.create_fallback(chunks)
    
    return {
        "answer": answer,
        "sources": [c.get("text", "") for c in chunks],
        "num_chunks": len(chunks)
    }


In [None]:
result = query_rag(
    question="What is RAG?",
    text_processor=text_processor,
    hf_client=hf_client,
    db_client=db_client,
    k=5
)

print(result["answer"])


In [None]:


for question in questions:
    print(f"\n Question: {question}")
    
    result = query_rag(
        question=question,
        k=5,
        adaptive_context=True
    )
    
    print(f"\nAnswer:\n{result['answer']}")
    print(f"\nSources ({result['num_chunks']} chunks):")
    for src in result['sources']:
        print(
            f"  - {src['source']} (chunk {src['chunk_id']}, "
            f"tokens {src['start_token']}-{src['end_token']}, "
            f"score {src['similarity']})"
        )


 Question: What is mario's email?


NameError: name 'rag' is not defined

In [None]:
# # 1. Generate query embedding (HuggingFaceClient)
# query_embedding = self.hf.get_embeddings([question])[0]

# # 2. Search database (PgVectorDB)
# chunks = self.db.search(
#     query_embedding,
#     k=k,
#     similarity_threshold=similarity_threshold
# )

# if not chunks:
#     return {
#         "answer": "No relevant information found.",
#         "sources": [],
#         "num_chunks": 0,
#         "token_usage": {"context": 0, "question": 0}
#     }

# # 3. Assemble context (TextProcessor)
# if adaptive_context:
#     context = self.text.assemble_context(chunks, question=question)
# else:
#     context = self.text.assemble_context(chunks)

# # 4. Generate answer (HuggingFaceClient) with fallback (TextProcessor)
# try:
#     answer = self.hf.generate_answer(question, context)
#     if not answer or len(answer) < 10:
#         logger.warning("LLM returned empty/short answer, using fallback")
#         answer = self.text.create_fallback(chunks)
# except Exception as e:
#     logger.error(f"Generation failed: {e}, using fallback")
#     answer = self.text.create_fallback(chunks)