# 🚀 RAG Document Ingestion Pipeline - From Folder

Step-by-step pipeline: Folder (file_to_ingest) → Storage → DB → Text → Chunks → Embeddings → pgvector

## 1️⃣ Set Up Database Connection & Test

In [None]:
import os
import sys
from pathlib import Path
from dotenv import load_dotenv
from sqlalchemy import create_engine, text as sql_text
from sqlalchemy.orm import sessionmaker
import uuid

load_dotenv(Path(".env"))
# Make the project root importable (the insert step imports app.models).
sys.path.insert(0, str(Path.cwd()))

DATABASE_URL = os.getenv("DATABASE_URL")
if not DATABASE_URL:
    # Fail fast with a clear message instead of a TypeError from the
    # substring check below when the variable is missing.
    raise ValueError("DATABASE_URL not set in .env")

# Connections to localhost:5432 are redirected to 127.0.0.1:5433.
# NOTE(review): presumably a port-forward / non-default local Postgres — confirm
# this rewrite is still wanted.
if "localhost:5432" in DATABASE_URL:
    DATABASE_URL = DATABASE_URL.replace("localhost:5432", "127.0.0.1:5433")

# pool_pre_ping validates pooled connections before use; pool_recycle=3600
# replaces connections older than one hour.
engine = create_engine(DATABASE_URL, pool_pre_ping=True, pool_size=5, pool_recycle=3600)
SessionLocal = sessionmaker(bind=engine)

print("‚úÖ Database engine created")

In [None]:
# Smoke-test the connection: report the connected database name and the
# first segment of the server version string.
with engine.connect() as connection:
    row = connection.execute(sql_text("SELECT current_database(), version()")).fetchone()
    db_name, version = row
    print(f"‚úÖ Connected to: {db_name}")
    print(f"‚úÖ PostgreSQL: {version.split(',')[0]}")

## 2️⃣ Set Up Storage Provider & Create file_to_ingest Folder

In [None]:
class LocalFileStorage:
    """Store uploaded PDFs as ``<file_id>.pdf`` files under a local directory."""

    def __init__(self, base_path="storage/uploads"):
        # Create the directory (with parents) up front so put() can write directly.
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)

    def _path_for(self, file_id: str) -> Path:
        """Return the on-disk location used for *file_id*."""
        return self.base_path / f"{file_id}.pdf"

    def put(self, file_id: str, content: bytes) -> str:
        """Write *content* under *file_id* (overwriting any previous copy) and return *file_id*."""
        self._path_for(file_id).write_bytes(content)
        return file_id

    def get(self, file_id: str) -> bytes:
        """Return the bytes stored under *file_id*.

        Raises:
            FileNotFoundError: if nothing has been stored under *file_id*.
        """
        target = self._path_for(file_id)
        if target.exists():
            return target.read_bytes()
        raise FileNotFoundError(f"File not found: {file_id}")

class GCSStorageProvider:
    """Store uploaded PDFs in a Google Cloud Storage bucket as ``uploads/<file_id>.pdf``."""

    def __init__(self, bucket_name: str, credentials_path: str = None, project_id: str = None):
        """Create a GCS client bound to *bucket_name*.

        Args:
            bucket_name: Bucket name, optionally prefixed with ``gs://``.
            credentials_path: Service-account JSON key file. It is used only if the
                path exists; otherwise the client is built with default credentials.
            project_id: GCP project id. When a key file is used and this is not
                given, the key file's project is used.

        Raises:
            ValueError: if *bucket_name* is None/empty (e.g. GCS_BUCKET_NAME unset)
                or is just ``gs://``.
        """
        # Normalize and validate before importing the GCS libraries, so a missing
        # env var gives a clear error instead of AttributeError on None.startswith().
        if bucket_name and bucket_name.startswith('gs://'):
            bucket_name = bucket_name[len('gs://'):]
        if not bucket_name:
            raise ValueError("GCS bucket name is required (set GCS_BUCKET_NAME in .env)")

        from google.cloud import storage as gcs_storage
        from google.oauth2 import service_account

        self.bucket_name = bucket_name

        # NOTE(review): a credentials_path that does not exist silently falls back to
        # default credentials — confirm that is intended rather than an error.
        if credentials_path and os.path.exists(credentials_path):
            creds = service_account.Credentials.from_service_account_file(credentials_path)
            self.client = gcs_storage.Client(credentials=creds, project=project_id or creds.project_id)
        else:
            self.client = gcs_storage.Client(project=project_id)

        self.bucket = self.client.bucket(self.bucket_name)

    def put(self, file_id: str, content: bytes) -> str:
        """Upload *content* to ``uploads/<file_id>.pdf`` and return *file_id*."""
        blob = self.bucket.blob(f"uploads/{file_id}.pdf")
        blob.upload_from_string(content)
        return file_id

    def get(self, file_id: str) -> bytes:
        """Download and return the bytes stored under *file_id*.

        Raises:
            FileNotFoundError: if the blob does not exist.
        """
        blob = self.bucket.blob(f"uploads/{file_id}.pdf")
        if not blob.exists():
            raise FileNotFoundError(f"File not found: {file_id}")
        return blob.download_as_bytes()

# Initialize storage
STORAGE_TYPE = os.getenv("STORAGE_TYPE", "local").lower()
if STORAGE_TYPE == "gcs":
    storage = GCSStorageProvider(
        os.getenv("GCS_BUCKET_NAME"),
        os.getenv("GCS_CREDENTIALS_PATH"),
        os.getenv("GCS_PROJECT_ID")
    )
else:
    storage = LocalFileStorage()

print(f"‚úÖ Storage provider: {storage.__class__.__name__}")

# Create file_to_ingest folder
ingest_folder = Path("file_to_ingest")
ingest_folder.mkdir(exist_ok=True)
print(f"‚úÖ Created/verified folder: {ingest_folder}")

## 3️⃣ Discover & List Files in file_to_ingest Folder

In [None]:
# Scan folder for PDF files.
# - Match the extension case-insensitively: Path.glob("*.pdf") is case-sensitive on
#   POSIX, so e.g. "REPORT.PDF" used to be listed as neither a PDF nor an "other"
#   file and was silently skipped.
# - Only regular files are considered (a directory named "x.pdf" is ignored).
# - Sort so the 1-based indices used in the selection step are stable
#   (directory iteration order is arbitrary).
folder_entries = sorted(p for p in ingest_folder.iterdir() if p.is_file())
pdf_files = [p for p in folder_entries if p.suffix.lower() == ".pdf"]
other_files = [p for p in folder_entries if p.suffix.lower() != ".pdf"]

print(f"üìÅ Scanning: {ingest_folder.absolute()}\n")
print(f"Found {len(pdf_files)} PDF file(s)")

if pdf_files:
    print("\nüìÑ PDF Files:")
    for i, file_path in enumerate(pdf_files, 1):
        file_size = file_path.stat().st_size
        print(f"  [{i}] {file_path.name} ({file_size:,} bytes)")
else:
    print("\n‚ö†Ô∏è  No PDF files found in file_to_ingest folder")

if other_files:
    # Show at most five non-PDF names, then a count of the rest.
    print(f"\n‚ö†Ô∏è  {len(other_files)} other file(s) (not PDF):")
    for f in other_files[:5]:
        print(f"  - {f.name}")
    if len(other_files) > 5:
        print(f"  ... and {len(other_files) - 5} more")

In [None]:
# Select files to ingest by their 1-based position in pdf_files (default: all).
file_indices = list(range(1, len(pdf_files) + 1))  # Change this to select specific files
# Example: [1, 2] to ingest first and second file, or [1] for just the first

selected_files = []

if not pdf_files:
    print("‚ùå No PDF files to select")
else:
    print(f"üìã Selected files to ingest:\n")
    for choice in file_indices:
        # Out-of-range indices are reported and skipped rather than raising.
        if not 1 <= choice <= len(pdf_files):
            print(f"  ‚ùå [{choice}] Invalid index (out of range)")
            continue
        chosen = pdf_files[choice - 1]
        selected_files.append(chosen)
        print(f"  ‚úÖ [{choice}] {chosen.name}")

    print(f"\n‚úÖ Total files to ingest: {len(selected_files)}" if selected_files else "\n‚ùå No valid files selected")

## 4️⃣ Upload Files to Storage & Create Document Records in DB

In [None]:
from datetime import datetime

uploaded_documents = []  # Store document info {index, db_id, storage_filename, original_filename, file_size}

if selected_files:
    print(f"üì§ Uploading {len(selected_files)} file(s) to storage & creating DB records...\n")
    
    for file_idx, file_path in enumerate(selected_files, 1):
        try:
            # Read file content
            file_content = file_path.read_bytes()
            file_size = len(file_content)
            
            # Generate unique storage filename (the storage provider appends ".pdf")
            storage_filename = str(uuid.uuid4())
            
            # Upload to storage.
            # NOTE(review): if the DB insert below fails, this object stays in storage
            # with no document row referencing it.
            storage.put(storage_filename, file_content)
            
            # Insert the document record and get its id back from the same statement
            # (RETURNING) instead of a second SELECT by filename. The id keeps the type
            # the driver returns for the column, as the previous SELECT did.
            db = SessionLocal()
            try:
                result = db.execute(sql_text("""
                    INSERT INTO document (id, original_filename, filename, file_size, status, created_at)
                    VALUES (:id, :original_filename, :filename, :file_size, :status, :created_at)
                    RETURNING id
                """), {
                    "id": str(uuid.uuid4()),
                    "original_filename": file_path.name,
                    "filename": f"{storage_filename}.pdf",
                    "file_size": file_size,
                    "status": "UPLOADED",
                    "created_at": datetime.utcnow()
                })
                # Fetch before commit so the RETURNING row is read within the transaction.
                document_id = result.scalar_one()
                db.commit()
                
                uploaded_documents.append({
                    "index": file_idx,
                    "db_id": document_id,
                    "storage_filename": storage_filename,
                    "original_filename": file_path.name,
                    "file_size": file_size
                })
                
                print(f"  ‚úÖ [{file_idx}] {file_path.name}")
                print(f"     Size: {file_size:,} bytes")
                print(f"     Storage ID: {storage_filename}")
                print(f"     DB ID: {document_id}\n")
            finally:
                db.close()
        
        except Exception as e:
            # Per-file failures are reported and the loop continues with the next file.
            print(f"  ‚ùå [{file_idx}] {file_path.name}: {e}\n")
    
    if uploaded_documents:
        print(f"‚úÖ Successfully uploaded {len(uploaded_documents)}/{len(selected_files)} file(s)")
    else:
        print(f"‚ùå No files uploaded successfully")
else:
    print("‚ö†Ô∏è  No files selected for upload")

## 5️⃣ Extract Text from PDF

In [None]:
import pdfplumber
import io

def extract_text_from_pdf(pdf_bytes: bytes) -> dict:
    """
    Extract the text of each page of an in-memory PDF with pdfplumber.

    Pages are numbered from 1. Pages for which ``extract_text()`` returns
    nothing (None or an empty string) are left out of the result.

    Returns: {page_number: full_text_for_page, ...}
    Raises: any exception from pdfplumber, after printing it.
    """
    try:
        with pdfplumber.open(io.BytesIO(pdf_bytes)) as pdf:
            page_texts = ((number, page.extract_text()) for number, page in enumerate(pdf.pages, start=1))
            return {number: text for number, text in page_texts if text}
    except Exception as e:
        print(f"Error extracting PDF: {e}")
        raise

print("‚úÖ PDF extraction function loaded (with page tracking)")

In [None]:
extracted_texts = {}  # Dictionary: {db_id: {page_num: text}}

if not uploaded_documents:
    print("‚ö†Ô∏è  No uploaded documents to extract")
else:
    print(f"üìÑ Extracting text from {len(uploaded_documents)} document(s)...\n")

    for doc in uploaded_documents:
        doc_key, stored_name, name = doc["db_id"], doc["storage_filename"], doc["original_filename"]
        try:
            # Read the copy back from storage (not the local file) so this step
            # exercises the same path later re-processing would use.
            pages_text = extract_text_from_pdf(storage.get(stored_name))
            extracted_texts[doc_key] = pages_text
            char_count = sum(map(len, pages_text.values()))
            print(f"  ‚úÖ {name}: {len(pages_text)} pages, {char_count} characters")
        except Exception as e:
            # Per-document failures are reported; remaining documents still run.
            print(f"  ‚ùå {name}: {e}")

    if extracted_texts:
        print(f"\n‚úÖ Extracted {len(extracted_texts)} document(s) with page tracking")
    else:
        print(f"‚ùå No documents extracted successfully")

## 6️⃣ Execute Text Chunking

In [None]:
import re
from typing import List, Dict, Tuple

def chunk_text_with_pages(pages_text: Dict[int, str], chunk_size: int = 500, overlap: int = 50) -> List[Tuple[str, int]]:
    """
    Chunk text while preserving page numbers.

    Each page is split into sentences (at whitespace following '.', '!' or '?'),
    and whole sentences are packed greedily into chunks of about ``chunk_size``
    words. A single sentence longer than ``chunk_size`` words is not split, so it
    yields an oversized chunk. Chunks never span pages.

    When a chunk is emitted, its last ``int(overlap / 10)`` words are carried into
    the next chunk on the same page (5 words for the default overlap=50).
    NOTE(review): the /10 scaling looks unintended if ``overlap`` was meant as a
    word count — confirm before changing it, since that alters chunk output.

    Args:
        pages_text: {page_number: page_text}; pages are processed in ascending order.
        chunk_size: target maximum number of words per chunk.
        overlap: controls the carried-over tail as described above; values below
            10 (including negative values) carry nothing.

    Returns: [(chunk_content, page_number), ...]
    """
    chunks_with_pages = []
    # Guard: int(overlap / 10) == 0 for overlap < 10, and lst[-0:] is the WHOLE
    # list, which previously made chunks keep growing instead of resetting.
    carry = max(int(overlap / 10), 0)

    for page_num in sorted(pages_text):
        sentences = re.split(r'(?<=[.!?])\s+', pages_text[page_num])

        current_chunk = []
        current_size = 0

        for sentence in sentences:
            words = sentence.split()
            if current_size + len(words) > chunk_size and current_chunk:
                chunks_with_pages.append((' '.join(current_chunk), page_num))
                current_chunk = current_chunk[-carry:] if carry else []
                # Items are whitespace-free words, so the word count is the list length.
                current_size = len(current_chunk)

            current_chunk.extend(words)
            current_size += len(words)

        if current_chunk:
            chunks_with_pages.append((' '.join(current_chunk), page_num))

    return chunks_with_pages

print("‚úÖ Text chunking function loaded (with page tracking)")

In [None]:
chunks_by_document = {}  # Dictionary: {db_id: [(content, page_num), ...]}

if not extracted_texts:
    print("‚ö†Ô∏è  No extracted text to chunk")
else:
    print(f"üì¶ Creating chunks for {len(extracted_texts)} document(s)...\n")

    for doc_id, pages_text in extracted_texts.items():
        doc_chunks = chunk_text_with_pages(pages_text, chunk_size=500)
        chunks_by_document[doc_id] = doc_chunks

        # The upload record is looked up only to print a readable filename.
        doc = next((d for d in uploaded_documents if d["db_id"] == doc_id), None)
        if doc is None:
            continue
        print(f"  ‚úÖ {doc['original_filename']}: {len(doc_chunks)} chunks")
        if doc_chunks:
            first_content, first_page = doc_chunks[0]
            print(f"     Sample: Page {first_page}: {first_content[:80]}...")

    total_chunks = sum(map(len, chunks_by_document.values()))
    print(f"\n‚úÖ Total chunks created: {total_chunks}")

## 7️⃣ Load OpenAI & Generate Embeddings

In [None]:
from openai import OpenAI

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise ValueError("OPENAI_API_KEY not set in .env")

client = OpenAI(api_key=OPENAI_API_KEY)

def generate_embedding(text: str) -> List[float]:
    """Return the text-embedding-3-small embedding of *text* (one API call per call).

    The full-length vector is returned unchanged; the original notes that its
    1536 dimensions were chosen to match the database schema.
    """
    reply = client.embeddings.create(input=text, model="text-embedding-3-small")
    return reply.data[0].embedding

print("‚úÖ OpenAI embedding function loaded (1536 dimensions)")

In [None]:
if not chunks_by_document:
    print("‚ö†Ô∏è  No chunks to embed")
else:
    print("üîó Generating sample embedding...\n")

    # Smoke test: embed only the first chunk of the first document.
    first_doc_id, first_chunks = next(iter(chunks_by_document.items()))
    if first_chunks:
        doc = next((d for d in uploaded_documents if d["db_id"] == first_doc_id), None)
        if doc:
            chunk_content, page_num = first_chunks[0]
            print(f"  Testing: {doc['original_filename']} (Page {page_num})")
            embedding = generate_embedding(chunk_content)

            print(f"  ‚úÖ Embedding generated")
            print(f"     Dimension: {len(embedding)}")
            print(f"     First 5 values: {embedding[:5]}")

## 8️⃣ Insert Chunks & Finalize Documents

In [None]:
from app.models.document_chunk import DocumentChunk

def insert_chunks_and_finalize(document_id: str, chunks_with_pages: list):
    """
    Embed and insert a document's chunks, then mark the document PROCESSED.

    Chunks are written as DocumentChunk ORM objects so the model's column type
    handles converting the ``embedding`` list (from generate_embedding) for
    pgvector.

    Status changes on the ``document`` row:
      * PROCESSING — committed before any chunk work starts;
      * PROCESSED (+ processed_at) — committed in the SAME transaction as all
        chunk rows, so chunks and final status land together or not at all;
      * FAILED — if anything raises; staged chunks are rolled back.

    Args:
        document_id: id of the ``document`` row the chunks belong to.
        chunks_with_pages: [(chunk_content, page_number), ...] in chunk order.

    Raises:
        Re-raises the original exception after attempting to record FAILED.
    """
    db = SessionLocal()
    try:
        # 1. Update status to PROCESSING
        db.execute(sql_text("""
            UPDATE document SET status = 'PROCESSING' WHERE id = :id
        """), {"id": document_id})
        db.commit()
        print(f"  ‚úì Status: PROCESSING")

        # 2. Generate embeddings and stage chunk rows (one embedding API call per chunk)
        print(f"  ‚úì Inserting {len(chunks_with_pages)} chunks...")
        for idx, (chunk_content, page_number) in enumerate(chunks_with_pages):
            embedding = generate_embedding(chunk_content)

            chunk = DocumentChunk(
                document_id=document_id,
                chunk_index=idx,
                content=chunk_content,
                page_number=page_number,
                embedding=embedding,  # list of floats; the model column converts it
                chunk_metadata={"page": page_number, "chunk_sequence": idx},
            )
            db.add(chunk)
            # Flush per chunk so a DB error surfaces before paying for more embedding calls.
            db.flush()

            if (idx + 1) % 5 == 0:
                print(f"     {idx + 1}/{len(chunks_with_pages)} inserted...")

        # 3. Mark PROCESSED in the same transaction as the chunk rows.
        db.execute(sql_text("""
            UPDATE document SET status = 'PROCESSED', processed_at = :now WHERE id = :id
        """), {"now": datetime.utcnow(), "id": document_id})
        db.commit()
        print(f"  ‚úì All {len(chunks_with_pages)} chunks inserted")
        print(f"  ‚úì Status: PROCESSED")

    except Exception as e:
        # After a failed statement/flush the session's transaction is unusable; roll it
        # back (discarding staged chunks) before recording FAILED, otherwise the UPDATE
        # below raises and masks the original error.
        db.rollback()
        try:
            db.execute(sql_text("""
                UPDATE document SET status = 'FAILED' WHERE id = :id
            """), {"id": document_id})
            db.commit()
        except Exception as status_err:
            db.rollback()
            print(f"\nWARNING: could not mark document FAILED: {status_err}")
        print(f"\n‚ùå Error: {e}")
        raise
    finally:
        db.close()

print("‚úÖ Insert function loaded - using SQLAlchemy ORM with pgvector support")

## 9️⃣ EXECUTION - Process & Ingest All Documents

In [None]:
# Run the embed-and-insert step for every uploaded document that has chunks.
if not (uploaded_documents and chunks_by_document):
    print("‚ö†Ô∏è  Missing prerequisites")
    if not uploaded_documents:
        print("   - Upload documents first (run Steps 3-4)")
    if not chunks_by_document:
        print("   - Extract and chunk text first (run Steps 5-6)")
else:
    doc_total = len(uploaded_documents)
    print(f"üöÄ Starting RAG ingestion for {doc_total} document(s)\n")

    for position, doc in enumerate(uploaded_documents, start=1):
        document_id = doc["db_id"]
        filename = doc["original_filename"]

        doc_chunks = chunks_by_document.get(document_id)
        if doc_chunks is None:
            print(f"‚ö†Ô∏è  [{position}] {filename} - Skipping (no chunks available)")
            continue

        try:
            print(f"\n[{position}/{doc_total}] Processing: {filename}")
            print(f"    Chunks: {len(doc_chunks)}")
            insert_chunks_and_finalize(document_id, doc_chunks)
        except Exception as e:
            # Report and move on so one bad document does not stop the batch.
            print(f"‚ùå Error processing document: {e}\n")
            import traceback
            traceback.print_exc()

## 🔟 Quick Verification - Embeddings

In [None]:
print("‚úÖ Quick Embedding Verification\n")
print("=" * 80)

def parse_vector_string(vector_str: str) -> list:
    """Convert pgvector's text form (e.g. ``'[0.1,0.2]'``) into a list of floats.

    None or an empty string yields ``[]``; empty components are skipped.
    """
    if not vector_str:
        return []
    body = vector_str.strip().strip('[]')
    pieces = (piece.strip() for piece in body.split(','))
    return [float(piece) for piece in pieces if piece]

with engine.connect() as conn:
    # Total chunk count plus one arbitrary stored embedding to inspect.
    result = conn.execute(sql_text("""
        SELECT COUNT(*) as total, 
               (SELECT embedding FROM document_chunk LIMIT 1) as sample_emb
        FROM document_chunk
    """)).fetchone()
    
    total, sample = result
    
    print(f"Total chunks in database: {total}")
    
    # Compare with None rather than truthiness: if the driver returns an array type
    # (pgvector's Python adapters can return numpy arrays) `if sample:` raises
    # "truth value of an array is ambiguous".
    if sample is not None:
        # Raw SQL returns pgvector's text form ('[...]') unless an adapter is registered.
        sample_array = parse_vector_string(sample) if isinstance(sample, str) else sample
        
        print(f"Sample embedding raw: {sample[:50]}..." if len(sample) > 50 else f"Sample embedding raw: {sample}")
        print(f"Sample embedding type (parsed): {type(sample_array).__name__}")
        print(f"Sample embedding dimensions: {len(sample_array)}")
        
        # 1536 = output size of text-embedding-3-small used by generate_embedding.
        if len(sample_array) == 1536:
            print("‚úÖ CORRECT: 1536 dimensions!")
            print(f"‚úÖ First 5 values: {sample_array[:5]}")
        else:
            print(f"‚ùå WRONG: {len(sample_array)} dimensions (should be 1536)")
    
    print("=" * 80)

## 1️⃣1️⃣ Semantic Search Test

In [None]:
def semantic_search(query_text: str, top_k: int = 5) -> list:
    """
    Return the ``top_k`` stored chunks most similar to ``query_text``.

    pgvector's ``<=>`` operator is cosine DISTANCE (0 = same direction, up to 2 =
    opposite), so rows are ordered by ascending distance. The reported score is
    ``1 - distance``, i.e. cosine similarity in [-1, 1], where higher means more
    similar.

    Returns: list of dicts with keys ``content``, ``filename``, ``page`` and
    ``similarity``, most similar first. A NULL score is reported as 0.0.
    """
    db = SessionLocal()
    try:
        query_embedding = generate_embedding(query_text)

        # pgvector text literal '[v1,v2,...]', sent as a bound parameter and cast to
        # vector rather than interpolated into the SQL string.
        embedding_str = '[' + ','.join(str(float(v)) for v in query_embedding) + ']'

        result = db.execute(sql_text("""
            SELECT
                dc.content,
                d.original_filename,
                dc.page_number,
                1 - (dc.embedding <=> CAST(:query_embedding AS vector)) AS similarity_score
            FROM document_chunk dc
            JOIN document d ON dc.document_id = d.id
            ORDER BY dc.embedding <=> CAST(:query_embedding AS vector)
            LIMIT :limit
        """), {"query_embedding": embedding_str, "limit": top_k})

        return [
            {
                "content": content,
                "filename": filename,
                "page": page_num,
                "similarity": float(similarity) if similarity is not None else 0.0,
            }
            for content, filename, page_num, similarity in result.fetchall()
        ]
    finally:
        db.close()

print("‚úÖ Semantic search function loaded - using cosine similarity (<=>)")

In [None]:
YOUR_QUERY = "your query here"  # ← Change this to test different queries

print(f"üéØ Custom Query: \"{YOUR_QUERY}\"\n")
print("=" * 80)

results = semantic_search(YOUR_QUERY, top_k=5)

if not results:
    print("‚ùå No relevant chunks found for this query")
else:
    print(f"Found {len(results)} relevant chunks:\n")
    for rank, hit in enumerate(results, start=1):
        # One block per hit: file, page, score, a 200-char preview, then a blank line.
        for line in (
            f"  [{rank}] üìÑ {hit['filename']}",
            f"      üìç Page {hit['page']}",
            f"      ‚≠ê Similarity: {hit['similarity']:.4f}",
            f"      üìù {hit['content'][:200]}...",
            "",
        ):
            print(line)

print("=" * 80)