## Data Ingestion for Deep RAG

In this notebook, we'll load the extracted data into a Qdrant vector database:

- **Markdown**: Page-level chunks with metadata
- **Tables**: Separate documents with context and page numbers
- **Images**: Text descriptions embedded (generated in notebook 06-01b)
- **Hybrid Search**: Dense (semantic) + Sparse (keyword) embeddings

**Prerequisites:**
- Run notebook 06-01 first to extract content from the PDFs
- Run notebook 06-01b to generate image descriptions
- Qdrant server running on localhost:6333
- Google API key set in .env file

**Output:**
- Single Qdrant collection with all content types
- Rich metadata for filtering (company, year, quarter, doc_type, page)
- Deduplication using file hashes

### 1. Setup and Imports

In [None]:
from dotenv import load_dotenv
load_dotenv()

import hashlib
from pathlib import Path

from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_qdrant import QdrantVectorStore, RetrievalMode, FastEmbedSparse
from langchain_core.documents import Document
from qdrant_client import QdrantClient

### 2. Configuration

In [None]:
# Paths: input directories read by the ingestion cells in section 7.
MARKDOWN_DIR = "data/rag-data/markdown"
TABLES_DIR = "data/rag-data/tables"
IMAGES_DESC_DIR = "data/rag-data/images_desc"

# Qdrant Configuration
# Single collection that receives text pages, tables and image descriptions.
COLLECTION_NAME = "financial_docs"
# Model name handed to GoogleGenerativeAIEmbeddings for the dense vectors.
EMBEDDING_MODEL = "models/gemini-embedding-001"

### 3. Initialize Embeddings and Client

In [None]:
# Embeddings
# Dense (semantic) embeddings from the Google model named in EMBEDDING_MODEL.
embeddings = GoogleGenerativeAIEmbeddings(model=EMBEDDING_MODEL)
# Sparse (keyword) embeddings; both are passed to the vector store for hybrid retrieval.
sparse_embeddings = FastEmbedSparse(model_name="Qdrant/bm25")

### 4. Create or Recreate Collection

In [None]:
# Create vector store
# An empty document list is passed here; the actual content is added by the
# ingestion cells in section 7.
# NOTE(review): force_recreate=True presumably drops any existing collection
# with this name, so the file-hash deduplication would only take effect within
# a single run of the notebook — confirm this is intended.
vector_store = QdrantVectorStore.from_documents(
    documents=[],  # Empty initialization
    embedding=embeddings,
    sparse_embedding=sparse_embeddings,
    url="http://localhost:6333",
    collection_name=COLLECTION_NAME,
    retrieval_mode=RetrievalMode.HYBRID,
    force_recreate=True
)

In [None]:
# Display the underlying Qdrant client via the public `client` property,
# the same accessor the other cells in this notebook use, instead of
# reaching into the private `_client` attribute.
vector_store.client

### 5. Helper Functions

In [None]:
# Reuse from 06-01
def extract_metadata_from_filename(filename: str):
    """Extract metadata from a space-separated filename.

    Supported layouts (trailing ``.pdf`` / ``.md`` extensions are ignored)::

        "<company> <doc_type> <fiscal_year>"
        "<company> <doc_type> <fiscal_quarter> <fiscal_year>"

    Args:
        filename: File name (not a full path), with or without extension.

    Returns:
        Dict with ``company_name`` (first part), ``doc_type`` (second part),
        ``fiscal_quarter`` (third part when the name has exactly four parts,
        otherwise ``None``) and ``fiscal_year`` (last part).

    Raises:
        ValueError: If the name has fewer than two space-separated parts.
    """
    stem = filename
    # Strip extensions only at the end of the name (repeatedly, so that
    # "name.pdf.md" is handled too); a blanket str.replace would also remove
    # ".pdf"/".md" occurring in the middle of the name.
    while stem.endswith(('.pdf', '.md')):
        stem = stem[:stem.rfind('.')]

    parts = stem.split()
    if len(parts) < 2:
        raise ValueError(
            f"Cannot extract metadata from filename {filename!r}: "
            "expected '<company> <doc_type> [<quarter>] <year>'"
        )

    return {
        'company_name': parts[0],
        'doc_type': parts[1],
        'fiscal_quarter': parts[2] if len(parts) == 4 else None,
        'fiscal_year': parts[-1]
    }

In [None]:
def compute_file_hash(file_path: Path) -> str:
    """Return the hex SHA-256 digest of a file's bytes (used for deduplication)."""
    digest = hashlib.sha256()
    with open(file_path, 'rb') as stream:
        # Read in 4 KiB chunks so large files are never loaded at once.
        while True:
            chunk = stream.read(4096)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()

In [None]:
# Peek at the file hash stored on the first point of the collection.
# The code below indexes the scroll() result as a tuple whose first element
# is the list of points.
all_points = vector_store.client.scroll(
    collection_name=COLLECTION_NAME,
    limit=10000,
    with_payload=True
)

# The collection has no points until the ingestion cells have run, so guard
# the lookup instead of raising IndexError on an empty result.
all_points[0][0].payload['metadata']['file_hash'] if all_points[0] else None

In [None]:
def get_processed_hashes() -> set:
    """Get file hashes already in Qdrant.

    Scrolls through the whole collection page by page, so collections with
    more points than a single page (10000) are still covered completely.

    Returns:
        Set of the ``file_hash`` values found in the points' ``metadata``
        payload. Points without metadata or without a hash are skipped.
    """
    hashes = set()
    next_offset = None

    while True:
        points, next_offset = vector_store.client.scroll(
            collection_name=COLLECTION_NAME,
            limit=10000,
            offset=next_offset,
            with_payload=True,
        )

        for point in points:
            # Tolerate points that carry no payload / metadata / hash.
            metadata = (point.payload or {}).get('metadata') or {}
            file_hash = metadata.get('file_hash')
            if file_hash is not None:
                hashes.add(file_hash)

        # scroll() reports no next-page offset once the last page was read.
        if next_offset is None:
            break

    return hashes

### 6. Ingestion Functions

In [None]:
def ingest_markdown_file(md_path: Path, processed_hashes: set):
    """Ingest one markdown file as page-level documents.

    The file is split on ``<!-- page break -->``; every non-empty page becomes
    a document whose ``page`` is its 1-based position among all pages
    (empty pages still count towards the numbering).

    Args:
        md_path: Markdown file to ingest.
        processed_hashes: Hashes of files already ingested; the file's hash is
            added to it when at least one document was stored.

    Returns:
        Number of documents added (0 if the file hash was already known).
    """
    digest = compute_file_hash(md_path)
    if digest in processed_hashes:
        return 0

    raw_pages = md_path.read_text(encoding='utf-8').split("<!-- page break -->")
    base_metadata = extract_metadata_from_filename(md_path.name)

    page_docs = [
        Document(
            page_content=text,
            metadata={
                **base_metadata,
                'content_type': 'text',
                'page': number,
                'file_hash': digest,
                'source_file': md_path.name,
            },
        )
        for number, text in enumerate((raw.strip() for raw in raw_pages), start=1)
        if text
    ]

    if not page_docs:
        return 0

    vector_store.add_documents(page_docs)
    processed_hashes.add(digest)
    return len(page_docs)

In [None]:
def ingest_table_file(table_path: Path, doc_name: str, processed_hashes: set):
    """Ingest one table file as a single document.

    Args:
        table_path: Table markdown file, named like ``table_1_page_5.md``.
        doc_name: Name of the source document; its parts provide the
            company / doc type / period metadata.
        processed_hashes: Hashes of files already ingested; updated in place.

    Returns:
        1 if the table was added, 0 if its hash was already known.
    """
    digest = compute_file_hash(table_path)
    if digest in processed_hashes:
        return 0

    content = table_path.read_text(encoding='utf-8')
    base_metadata = extract_metadata_from_filename(doc_name + '.md')

    # table_<n>_page_<p>: index 1 is the table number, index 3 (when the
    # name has at least four parts) is the page.
    name_parts = table_path.stem.split('_')
    table_number = int(name_parts[1])
    page_number = None
    if len(name_parts) >= 4:
        page_number = int(name_parts[3])

    metadata = {
        **base_metadata,
        'content_type': 'table',
        'table_number': table_number,
        'page': page_number,
        'file_hash': digest,
        'source_file': table_path.name,
    }

    vector_store.add_documents([Document(page_content=content, metadata=metadata)])
    processed_hashes.add(digest)

    return 1

In [None]:
def ingest_image_description(desc_path: Path, doc_name: str, processed_hashes: set):
    """Ingest one image description file as a single document.

    Args:
        desc_path: Description file, named like ``page_5.md``.
        doc_name: Name of the source document; its parts provide the
            company / doc type / period metadata.
        processed_hashes: Hashes of files already ingested; updated in place.

    Returns:
        1 if the description was added, 0 if its hash was already known.
    """
    digest = compute_file_hash(desc_path)
    if digest in processed_hashes:
        return 0

    text = desc_path.read_text(encoding='utf-8')
    base_metadata = extract_metadata_from_filename(doc_name + '.md')

    # page_<p>: the second underscore-separated part is the page number.
    _, page_token, *_ = desc_path.stem.split('_')
    page_number = int(page_token)

    metadata = {
        **base_metadata,
        'content_type': 'image',
        'page': page_number,
        'file_hash': digest,
        'source_file': desc_path.name,
    }

    vector_store.add_documents([Document(page_content=text, metadata=metadata)])
    processed_hashes.add(digest)

    return 1

In [None]:
def ingest_company_tables(company_dir: Path, processed_hashes: set) -> int:
    """Ingest every ``table_*.md`` file found in the company's document folders.

    Only direct sub-directories of ``company_dir`` are scanned; each
    sub-directory name is passed on as the document name.

    Returns:
        Number of tables actually ingested.
    """
    return sum(
        ingest_table_file(table_file, doc_dir.name, processed_hashes)
        for doc_dir in company_dir.iterdir()
        if doc_dir.is_dir()
        for table_file in doc_dir.glob("table_*.md")
    )

In [None]:
def ingest_company_image_descriptions(company_dir: Path, processed_hashes: set) -> int:
    """Ingest every ``page_*.md`` description found in the company's document folders.

    Only direct sub-directories of ``company_dir`` are scanned; each
    sub-directory name is passed on as the document name.

    Returns:
        Number of descriptions actually ingested.
    """
    return sum(
        ingest_image_description(desc_file, doc_dir.name, processed_hashes)
        for doc_dir in company_dir.iterdir()
        if doc_dir.is_dir()
        for desc_file in doc_dir.glob("page_*.md")
    )

### 7. Process All Data

In [None]:
# Start from the hashes already stored in Qdrant; the same set is shared by
# the table and image cells that follow.
processed_hashes = get_processed_hashes()

markdown_path = Path(MARKDOWN_DIR)
md_files = list(markdown_path.rglob("*.md"))

# Each call returns the number of page documents it added.
total_pages = sum(
    ingest_markdown_file(md_file, processed_hashes) for md_file in md_files
)

print(f"Markdown: {total_pages} pages ingested")

In [None]:
# One sub-directory per company under the tables root.
tables_path = Path(TABLES_DIR)
company_dirs = [entry for entry in tables_path.iterdir() if entry.is_dir()]

total_tables = sum(
    ingest_company_tables(company, processed_hashes) for company in company_dirs
)

print(f"Tables: {total_tables} tables ingested")

In [None]:
# One sub-directory per company under the image descriptions root.
images_desc_path = Path(IMAGES_DESC_DIR)
company_dirs = [entry for entry in images_desc_path.iterdir() if entry.is_dir()]

total_images = sum(
    ingest_company_image_descriptions(company, processed_hashes)
    for company in company_dirs
)

print(f"Images: {total_images} descriptions ingested")

### 8. Verify Ingestion

In [None]:
# Fetch the collection description from Qdrant and display it as the cell output.
collection_info = vector_store.client.get_collection(
    collection_name=COLLECTION_NAME,
)
collection_info

### 9. Test Search

In [None]:
# Smoke-test retrieval: fetch the five best matches for a sample question.
query = "What is Amazon's revenue?"
results = vector_store.similarity_search(query=query, k=5)

results