<a target="_blank" href="https://colab.research.google.com/github/sonder-art/automl_o24/blob/main/codigo/rag/rag_vector_db.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

# Enhanced RAG System with Vector Databases

This notebook implements the retrieval side of a RAG (Retrieval-Augmented Generation) system with the following features:
- Configurable embedding models with optional GPU support
- Support for multiple document formats (txt, pdf, html, xml)
- Document processing with hash-based caching of chunks and embeddings
- Word-based chunking with configurable size and overlap
- Vector storage and search using LanceDB

## Setup and Installation

In [None]:
!pip install -q sentence-transformers lancedb pandas numpy beautifulsoup4 lxml scikit-learn PyPDF2 tqdm


In [None]:
import os
import numpy as np
import pandas as pd
import torch
from sentence_transformers import SentenceTransformer
import lancedb
import textwrap
from pathlib import Path
from typing import List, Optional, Dict, Union
import json
import hashlib
from bs4 import BeautifulSoup
import PyPDF2
from tqdm.auto import tqdm
import logging
from dataclasses import dataclass, asdict




In [None]:
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Check if running in Colab (also works outside IPython, where get_ipython is undefined)
import sys
IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
    from google.colab import drive

## Configuration

All tunable parameters live in one dataclass, so the model, storage location, chunking, and retrieval settings can be changed in a single place.

In [None]:
@dataclass
class RAGConfig:
    # Model Configuration
    embedding_model: str = "BAAI/bge-small-en-v1.5"  # Better default model
    use_gpu: bool = torch.cuda.is_available()
    device: Optional[str] = None  # Resolved from use_gpu in __post_init__
    
    # Storage Configuration
    use_drive: bool = False  # For Google Colab
    base_path: Optional[str] = None  # Will be set in post_init
    
    # Chunking Configuration
    chunk_size: int = 500  # Words per chunk
    chunk_overlap: int = 50  # Words shared by consecutive chunks
    
    # Document Processing
    supported_formats: Optional[List[str]] = None  # Will be set in post_init
    overwrite_existing: bool = False
    
    # Retrieval Configuration
    top_k: int = 3
    distance_metric: str = "cosine"
    
    def __post_init__(self):
        # Resolve the device per instance so that use_gpu=False is honored
        if self.device is None:
            self.device = "cuda" if self.use_gpu else "cpu"

        # Set base paths
        if self.use_drive and IN_COLAB:
            print("Mounting Google Drive...")
            drive.mount('/content/drive')
            self.base_path = Path('/content/drive/MyDrive/vector_db')
        else:
            self.base_path = Path('vector_store')
            
        self.docs_path = self.base_path / 'documents'
        self.db_path = self.base_path / 'db'
        
        # Create directories
        self.docs_path.mkdir(parents=True, exist_ok=True)
        self.db_path.mkdir(parents=True, exist_ok=True)
        
        # Set supported formats
        if self.supported_formats is None:
            self.supported_formats = ["txt", "pdf", "html", "xml"]
            
        print(f"Using device: {self.device}")
        print(f"Documents path: {self.docs_path}")
        print(f"Database path: {self.db_path}")
    
    def save(self):
        """Save configuration to file"""
        config_data = asdict(self)
        config_data['base_path'] = str(self.base_path)
        config_data['docs_path'] = str(self.docs_path)
        config_data['db_path'] = str(self.db_path)
        
        config_path = self.base_path / 'config.json'
        with open(config_path, 'w') as f:
            json.dump(config_data, f, indent=2)
            
    @classmethod
    def load(cls, base_path: Optional[str] = None) -> 'RAGConfig':
        """Load configuration from file"""
        if base_path is None:
            if IN_COLAB:
                base_path = '/content/drive/MyDrive/vector_db'
            else:
                base_path = 'vector_store'
                
        config_path = Path(base_path) / 'config.json'
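        if config_path.exists():
            with open(config_path) as f:
                config_data = json.load(f)
            # save() also writes derived keys (docs_path, db_path) that the
            # constructor does not accept; keep only the declared fields
            declared = cls.__dataclass_fields__
            return cls(**{k: v for k, v in config_data.items() if k in declared})
        return cls()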


Using device: cuda
Documents path: vector_store/documents
Database path: vector_store/db


In [None]:
# Initialize configuration
config = RAGConfig()
config.save()
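
`save()` writes derived paths (`docs_path`, `db_path`) next to the declared fields, so a loader has to drop keys the dataclass does not declare before calling the constructor. The following is a minimal sketch of that round trip. `MiniConfig`, `save_config`, and `load_config` are hypothetical stand-ins used for illustration, not part of the notebook's `RAGConfig`.

```python
import json
import tempfile
from dataclasses import dataclass, asdict, fields
from pathlib import Path


@dataclass
class MiniConfig:
    # Hypothetical stand-in for RAGConfig with two declared fields
    chunk_size: int = 500
    chunk_overlap: int = 50

    def __post_init__(self):
        # Derived attribute: not a dataclass field, so asdict() skips it
        self.step = self.chunk_size - self.chunk_overlap


def save_config(cfg: MiniConfig, path: Path) -> None:
    data = asdict(cfg)
    data["step"] = cfg.step  # extra key, like docs_path/db_path in save()
    path.write_text(json.dumps(data, indent=2))


def load_config(path: Path) -> MiniConfig:
    data = json.loads(path.read_text())
    declared = {f.name for f in fields(MiniConfig)}
    # Keep only declared fields; derived values are rebuilt in __post_init__
    return MiniConfig(**{k: v for k, v in data.items() if k in declared})


with tempfile.TemporaryDirectory() as tmp:
    cfg_path = Path(tmp) / "config.json"
    save_config(MiniConfig(chunk_size=300, chunk_overlap=30), cfg_path)
    restored = load_config(cfg_path)

print(restored, restored.step)
```

Rebuilding derived values in `__post_init__` instead of reading them back keeps the file and the object from drifting apart if the directory is moved.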

## Enhanced Document Processing

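`DocumentProcessor` reads txt, pdf, html, and xml files, splits the text into overlapping word chunks, embeds them, and caches each file's result as JSON keyed by a SHA-256 hash.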
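
The caching idea can be sketched in isolation: hash the file's bytes, use the digest as the cache file name, and skip the expensive step when that file already exists. This is a simplified illustration under stated assumptions. `cached_process` and `count_words` are hypothetical helpers, and the word count stands in for chunking and embedding.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def file_sha256(path: Path, block_size: int = 4096) -> str:
    """Hash a file in fixed-size blocks so large files are never fully in memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for block in iter(lambda: f.read(block_size), b""):
            digest.update(block)
    return digest.hexdigest()


def cached_process(path: Path, cache_dir: Path, process) -> dict:
    """Return the cached result for this file's content, computing it on a miss."""
    cache_path = cache_dir / f"{file_sha256(path)}.json"
    if cache_path.exists():
        return json.loads(cache_path.read_text())
    result = process(path)
    cache_path.write_text(json.dumps(result))
    return result


calls = []


def count_words(path: Path) -> dict:
    # Hypothetical "expensive" step standing in for chunking + embedding
    calls.append(path.name)
    return {"n_words": len(path.read_text().split())}


with tempfile.TemporaryDirectory() as tmp:
    doc = Path(tmp) / "doc.txt"
    doc.write_text("one two three")
    first = cached_process(doc, Path(tmp), count_words)
    second = cached_process(doc, Path(tmp), count_words)  # served from cache

print(first, second, len(calls))
```

In this sketch the key depends only on the file's bytes, so renaming a file still hits the cache, while changing how the file is processed does not invalidate it.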

In [None]:
class DocumentProcessor:
    def __init__(self, config: RAGConfig):
        self.config = config
        self.model = SentenceTransformer(config.embedding_model, device=config.device)
        
    def _compute_file_hash(self, file_path: str) -> str:
        """Compute SHA-256 hash of a file."""
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()
    
    def _read_txt(self, file_path: str) -> str:
        """Read text from a txt file."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
    
    def _read_pdf(self, file_path: str) -> str:
        """Read text from a PDF file."""
        text = ""
        with open(file_path, 'rb') as f:
            pdf_reader = PyPDF2.PdfReader(f)
            for page in pdf_reader.pages:
                # Fall back to an empty string if a page yields no text
                text += (page.extract_text() or "") + "\n"
        return text
    
    def _read_html(self, file_path: str) -> str:
        """Read text from an HTML file."""
        with open(file_path, 'r', encoding='utf-8') as f:
            soup = BeautifulSoup(f.read(), 'html.parser')
            return soup.get_text()
    
    def _read_xml(self, file_path: str) -> str:
        """Read text from an XML file."""
        with open(file_path, 'r', encoding='utf-8') as f:
            soup = BeautifulSoup(f.read(), 'xml')
            return soup.get_text()
            
    def process_text(self, text: str) -> List[str]:
        """Split text into overlapping chunks."""
        words = text.split()
        chunks = []
        
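        # Each chunk starts (chunk_size - chunk_overlap) words after the previous one
        step = self.config.chunk_size - self.config.chunk_overlap
        if step <= 0:
            raise ValueError("chunk_overlap must be smaller than chunk_size")

        for i in range(0, len(words), step):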
            chunk = ' '.join(words[i:i + self.config.chunk_size])
            chunks.append(chunk)
        
        return chunks
    
    def process_file(self, file_path: str) -> Dict:
        """Process a single file with caching support."""
        file_ext = Path(file_path).suffix.lstrip('.').lower()
        if file_ext not in self.config.supported_formats:
            raise ValueError(f"Unsupported file format: {file_ext}")
            
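        # Check cache. The key combines the file contents with the settings that
        # shape the result, so a changed model or chunking is not served stale data.
        file_hash = self._compute_file_hash(file_path)
        settings = f"{self.config.embedding_model}|{self.config.chunk_size}|{self.config.chunk_overlap}"
        cache_key = hashlib.sha256(f"{file_hash}|{settings}".encode()).hexdigest()
        cache_path = self.config.docs_path / f"{cache_key}.json"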
        
        if cache_path.exists() and not self.config.overwrite_existing:
            logger.info(f"Loading cached processing for {file_path}")
            with open(cache_path) as f:
                return json.load(f)
        
        # Read content based on file type
        readers = {
            'txt': self._read_txt,
            'pdf': self._read_pdf,
            'html': self._read_html,
            'xml': self._read_xml
        }
        
        text = readers[file_ext](file_path)
        chunks = self.process_text(text)
        
        # Use GPU for batch processing if available
        embeddings = self.model.encode(chunks, show_progress_bar=True)
        
        result = {
            'chunks': chunks,
            'embeddings': embeddings.tolist(),
            'source': str(file_path),
            'file_hash': file_hash
        }
        
        # Cache result
        with open(cache_path, 'w') as f:
            json.dump(result, f)
            
        return result
    
    def process_directory(self, dir_path: Optional[str] = None, table_name: str = 'documents') -> None:
        """Process all supported files in a directory."""
        dir_path = Path(dir_path) if dir_path else self.config.docs_path
        db = lancedb.connect(self.config.db_path)
        
        all_data = []
        for ext in self.config.supported_formats:
            for file_path in dir_path.glob(f"**/*.{ext}"):
                try:
                    logger.info(f"Processing {file_path}...")
                    result = self.process_file(str(file_path))
                    
                    for chunk, embedding in zip(result['chunks'], result['embeddings']):
                        all_data.append({
                            'text': chunk,
                            'vector': embedding,
                            'source': result['source'],
                            'file_hash': result['file_hash']
                        })
                except Exception as e:
                    logger.error(f"Error processing {file_path}: {str(e)}")
        
        if all_data:
            df = pd.DataFrame(all_data)
            if table_name in db.table_names():
                table = db.open_table(table_name)
                # Convert PyArrow Table to Pandas DataFrame correctly
                existing_data = table.to_arrow().to_pandas()
                if not existing_data.empty:
                    existing_hashes = set(existing_data['file_hash'].unique())
                    new_data = df[~df['file_hash'].isin(existing_hashes)]
                    if not new_data.empty:
                        table.add(new_data)
                        logger.info(f"Added {len(new_data)} new chunks to the database")
                    else:
                        logger.info("No new documents to add")
                else:
                    table.add(df)
                    logger.info(f"Added {len(df)} chunks to empty table")
            else:
                db.create_table(table_name, df)
                logger.info(f"Created new table with {len(df)} chunks")
        else:
            logger.info("No documents found to process")
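
To make the chunking arithmetic in `process_text` concrete, here is a small standalone sketch of the same word-window idea. `chunk_words` is a hypothetical helper written for this illustration, and the input guard is an added assumption rather than part of the class above.

```python
from typing import List


def chunk_words(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Split text into word windows of chunk_size that share chunk_overlap words."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    words = text.split()
    step = chunk_size - chunk_overlap  # distance between consecutive window starts
    return [" ".join(words[i:i + chunk_size]) for i in range(0, len(words), step)]


text = "w0 w1 w2 w3 w4 w5 w6 w7 w8 w9"
chunks = chunk_words(text, chunk_size=4, chunk_overlap=1)
for chunk in chunks:
    print(chunk)
```

With ten words, a size of 4, and an overlap of 1, windows start at words 0, 3, 6, and 9. The last window holds only `w9`, which the previous window already covers, so short trailing chunks like this are expected with this scheme.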


## Enhanced Vector Database Operations

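`VectorDB` wraps a LanceDB connection and adds table inspection, semantic search with an optional similarity threshold and source filter, batch search, and table deletion.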

In [None]:
# Only cosine_similarity is new here; the other names were imported in the setup cell
from sklearn.metrics.pairwise import cosine_similarity



In [None]:

class VectorDB:
    def __init__(self, config: RAGConfig):
        self.config = config
        self.db = lancedb.connect(config.db_path)
        self.model = SentenceTransformer(config.embedding_model, device=config.device)
    
    def list_tables(self) -> List[str]:
        """List all tables in the database."""
        return self.db.table_names()
    
    def get_table_info(self, table_name: str) -> Dict:
        """Get detailed information about a table."""
        table = self.db.open_table(table_name)
        df = table.to_arrow().to_pandas()
        
        if df.empty:
            return {
                'total_chunks': 0,
                'unique_documents': 0,
                'sources': [],
                'chunks_per_source': {},
                'avg_chunk_length': 0
            }
        
        sources = df['source'].unique()
        source_counts = df['source'].value_counts().to_dict()
        
        return {
            'total_chunks': len(df),
            'unique_documents': len(sources),
            'sources': sources.tolist(),
            'chunks_per_source': source_counts,
            'avg_chunk_length': df['text'].str.len().mean()
        }
    
    def semantic_search(self, 
                       query: str, 
                       table_name: str = 'documents',
                       k: Optional[int] = None,
                       threshold: Optional[float] = None,
                       source_filter: Optional[List[str]] = None) -> pd.DataFrame:
        """Enhanced semantic search with filtering options."""
        k = k or self.config.top_k
        query_embedding = self.model.encode([query])[0]
        table = self.db.open_table(table_name)
        
        # Build search query
        search_query = table.search(query_embedding)
        
        # Apply source filter if provided
        if source_filter:
            # Quote each source for the SQL-like filter, doubling embedded single quotes
            escaped = [s.replace("'", "''") for s in source_filter]
            formatted_sources = "(" + ", ".join(f"'{s}'" for s in escaped) + ")"
            search_query = search_query.where(f"source IN {formatted_sources}")
        
        # Execute the search and convert to Pandas DataFrame
        results_arrow = search_query.limit(k).to_arrow()
        
        # Convert to Pandas DataFrame
        results = results_arrow.to_pandas()
        
        # Debug: Print available columns
        print("Available columns in search results:", results.columns.tolist())
        
        if len(results) == 0:
            logger.warning("No results found matching the criteria")
            return pd.DataFrame()
        
        # Compute cosine similarity manually
        # Ensure 'vector' column exists and contains embeddings
        if 'vector' not in results.columns:
            logger.error("No 'vector' column found in the search results.")
            return pd.DataFrame()
        
        db_embeddings = np.array(results['vector'].tolist())
        query_embedding_np = np.array(query_embedding).reshape(1, -1)
        similarities = cosine_similarity(db_embeddings, query_embedding_np).flatten()
        
        # Add similarity scores to the DataFrame
        results['similarity'] = similarities
        
        # Apply similarity threshold if provided
        if threshold is not None:
            results = results[results['similarity'] >= threshold]
            if results.empty:
                logger.warning("No results meet the similarity threshold.")
                return pd.DataFrame()
        
        # Sort results by similarity in descending order
        results = results.sort_values('similarity', ascending=False)
        
        # Return only the relevant columns
        return results[['text', 'source', 'similarity']]
    
    def batch_search(self, 
                    queries: List[str], 
                    table_name: str = 'documents',
                    k: Optional[int] = None) -> List[pd.DataFrame]:
        """Perform batch semantic search for multiple queries."""
        k = k or self.config.top_k
        
        # Batch encode queries
        query_embeddings = self.model.encode(queries, show_progress_bar=True)
        
        results = []
        table = self.db.open_table(table_name)
        
        for query, embedding in zip(queries, query_embeddings):
            search_query = table.search(embedding).limit(k)
            results_arrow = search_query.to_arrow()
            df = results_arrow.to_pandas()
            
            if df.empty:
                logger.warning(f"No results found for query: {query}")
                results.append(pd.DataFrame())
                continue
            
            # Compute cosine similarity manually
            if 'vector' not in df.columns:
                logger.error(f"No 'vector' column found in search results for query: {query}")
                results.append(pd.DataFrame())
                continue
            
            db_embeddings = np.array(df['vector'].tolist())
            embedding_np = np.array(embedding).reshape(1, -1)
            similarities = cosine_similarity(db_embeddings, embedding_np).flatten()
            
            # Add similarity scores to the DataFrame
            df['similarity'] = similarities
            
            # Sort by similarity in descending order
            df = df.sort_values('similarity', ascending=False)
            
            # Add the query to the results
            df['query'] = query
            results.append(df[['query', 'text', 'source', 'similarity']])
        
        return results
    
    def delete_table(self, table_name: str) -> None:
        """Delete a table from the database."""
        if table_name in self.list_tables():
            self.db.drop_table(table_name)
            logger.info(f"Table '{table_name}' deleted")
        else:
            logger.warning(f"Table '{table_name}' not found")


## Example Usage

The cells below configure the system, index the documents directory, and run single and batch searches.

In [None]:
# config = RAGConfig(
#     embedding_model="BAAI/bge-small-en-v1.5",  # Change model if desired
#     chunk_size=300,  # Adjust chunk size
#     chunk_overlap=30,  # Adjust overlap
#     top_k=5  # Adjust number of results
# )
# # Initialize vector database
# db = VectorDB(config)

# # Delete the existing table
# db.delete_table('documents')


In [None]:
# Initialize system with custom configuration if needed
config = RAGConfig(
    embedding_model="BAAI/bge-small-en-v1.5",  # Change model if desired
    chunk_size=300,  # Adjust chunk size
    chunk_overlap=30,  # Adjust overlap
    top_k=5  # Adjust number of results
)

# Initialize processor and process documents
processor = DocumentProcessor(config)
processor.process_directory()

# Initialize vector database
db = VectorDB(config)



INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: BAAI/bge-small-en-v1.5


Using device: cuda
Documents path: vector_store/documents
Database path: vector_store/db


INFO:__main__:Processing vector_store/documents/ssrn-3480294.pdf...
INFO:__main__:Loading cached processing for vector_store/documents/ssrn-3480294.pdf
INFO:__main__:Created new table with 50 chunks
INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: BAAI/bge-small-en-v1.5


In [None]:
# Get database information
print("\nAvailable tables:")
for table_name in db.list_tables():
    info = db.get_table_info(table_name)
    print(f"\nTable: {table_name}")
    print(f"Total chunks: {info['total_chunks']}")
    print(f"Unique documents: {info['unique_documents']}")
    print(f"Average chunk length: {info['avg_chunk_length']:.0f} characters")
    


Available tables:

Table: documents
Total chunks: 50
Unique documents: 1
Average chunk length: 1922 characters


In [None]:
# Example semantic search
query = '''the policy question
of interest depends on only one, or a very few, price eﬀects.'''
results = db.semantic_search(
    query=query,
    k=3,  # Number of results
    threshold=0.2,  # Optional similarity threshold
    source_filter=None  # Optional source filtering
)

print(f"\nQuery: {query}\n")
for _, row in results.iterrows():
    print(f"Score: {row['similarity']:.4f}")
    print(f"Source: {row['source']}")
    print(f"Text: {textwrap.fill(row['text'], width=80)}\n")



Available columns in search results: ['text', 'vector', 'source', 'file_hash', '_distance']

Query: the policy question
of interest depends on only one, or a very few, price eﬀects.

Score: 0.2247
Source: vector_store/documents/ssrn-3480294.pdf
Text: econometric approach. To elaborate on the economics suppose only one price is
varying. For simplicity we consider quantity rather than share. Let ( )b
et h ed e m a n df u n c t i o nf o rt h eg o o dw i t hp r i c ev a r y i n g
(holding all other prices constant) for one type of individual preferences. Let
()be the equivalent variation EV for a price change from to 1for
typeHausman and Newey (2016) showed that if the income e ﬀect for every is
bounded below and above by andrespectively then Z1 ( )e x p
(−[−])≤()≤Z1 ( )e x p (−[−]) If( )is zero over
[1]then upper and lower bounds coincide at zero. Itegrating over the
distribution of gives Z1 ¯()e x p (−[−])≤¯()≤Z

In [None]:
# Example batch search
queries = ["query 1", "query 2", "query 3"]
batch_results = db.batch_search(queries)

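# Pair each query with its results; an empty DataFrame has no 'query' column
for query, df in zip(queries, batch_results):
    print(f"\nResults for query: {query}")
    if df.empty:
        print("No results")
        continue
    for _, row in df.iterrows():
        print(f"Score: {row['similarity']:.4f}")
        print(f"Text: {textwrap.fill(row['text'], width=80)}\n")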



Results for query: query 1
Score: 0.1608
Text: the use of grocery store scanner data, allowance for nonparametric, general
heterogeneity in the cross-section, including zeros in regressions, and in the
comparison of cross-section and panelresults. 2 Demand and Weighted Average
Surplus We consider a demand model where the form of heterogeneity is
unrestricted. To describe themodel let denote the quantity of a vector of
goods, the quantity of a numeraire good, the price vector for relative to ,a
n dthe individual income level relative to the numeraire price. The unobserved
heterogeneity will be represented by a vector of unobserved disturbances of
unknown dimension. We think of each value of as corresponding to a consumer but
do allow to be continuously distributed. For each consumer the demand function
( )will be obtained by maximizing a utility function( )that is
monotonic increasing in andsubject to the budget constraint, with ( ) =
arg max ≥0≥0(