# Database Pipeline

Load embeddings from JSON and store them in PostgreSQL with pgvector support.

**Features:**
- Load embeddings from chunks_primitive_embedded.json
- Create tables with pgvector extension
- Insert embeddings into a table backed by an IVFFlat vector index
- Query similar chunks by vector similarity
- Database statistics and management

## Database Setup

Before using this module, ensure PostgreSQL is configured:

```sql
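-- Create database
CREATE DATABASE marigold_rag;

-- Extensions are installed per database, so connect to the new
-- database first (psql meta-command), then create the pgvector extension
\c marigold_rag
CREATE EXTENSION IF NOT EXISTS vector;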
```

## Implementation

In [11]:
from typing import List, Optional, Tuple
import os
import json


class DatabaseConnector:
    """PostgreSQL database connector with pgvector support.

    Wraps a single psycopg2 connection/cursor pair and offers helpers to
    create the schema, store chunks together with their embeddings, run
    cosine-similarity searches and collect simple statistics.

    Every method that talks to the database rolls the transaction back when
    a statement fails, so one bad statement does not leave the connection in
    an aborted-transaction state for the calls that follow.
    """

    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        database: Optional[str] = None,
        user: Optional[str] = None,
        password: Optional[str] = None
    ):
        """
        Initialize database connection parameters.

        No connection is opened here; call connect() for that.

        Args:
            host: Database host (defaults to DB_HOST env var or localhost)
            port: Database port (defaults to DB_PORT env var or 5432)
            database: Database name (defaults to DB_NAME env var or marigold_rag)
            user: Database user (defaults to DB_USER env var or postgres)
            password: Database password (defaults to DB_PASSWORD env var)
        """
        self.host = host or os.getenv("DB_HOST", "localhost")
        # int() accepts both a real int and the string an env var provides.
        self.port = int(port or os.getenv("DB_PORT", 5432))
        self.database = database or os.getenv("DB_NAME", "marigold_rag")
        self.user = user or os.getenv("DB_USER", "postgres")
        self.password = password or os.getenv("DB_PASSWORD")

        self.connection = None
        self.cursor = None

    @staticmethod
    def _format_vector(embedding: List[float]) -> str:
        """Render an embedding as the '[x1,x2,...]' text literal pgvector parses."""
        return "[" + ",".join(str(x) for x in embedding) + "]"

    def connect(self):
        """
        Establish database connection and open a cursor.

        Raises:
            ImportError: If psycopg2 is not installed.
            ConnectionError: If the server cannot be reached or rejects the login.
        """
        try:
            import psycopg2
        except ImportError as e:
            raise ImportError(
                "psycopg2 package required. Install with: pip install psycopg2-binary"
            ) from e

        try:
            self.connection = psycopg2.connect(
                host=self.host,
                port=self.port,
                database=self.database,
                user=self.user,
                password=self.password
            )
            self.cursor = self.connection.cursor()
            print(f"Connected to {self.database} on {self.host}:{self.port}")
        except psycopg2.OperationalError as e:
            raise ConnectionError(f"Failed to connect to database: {e}") from e

    def disconnect(self):
        """Close cursor and connection; safe to call repeatedly or when never connected."""
        if self.cursor:
            self.cursor.close()
            # Drop the closed handle so a second disconnect() is a no-op.
            self.cursor = None
        if self.connection:
            self.connection.close()
            self.connection = None
            print("Disconnected from database")

    def _execute(self, query: str, params: tuple = None):
        """
        Execute a statement and commit it.

        Raises:
            RuntimeError: If execution fails; the transaction is rolled back
                and the original exception is attached as the cause.
        """
        try:
            self.cursor.execute(query, params or ())
            self.connection.commit()
        except Exception as e:
            self.connection.rollback()
            raise RuntimeError(f"Query execution failed: {e}\nQuery: {query}") from e

    def _fetch_all(self, query: str, params: tuple = None):
        """
        Execute a query and fetch all results.

        On failure the transaction is rolled back and the original exception
        is re-raised unchanged.
        """
        try:
            self.cursor.execute(query, params or ())
            return self.cursor.fetchall()
        except Exception:
            # Without a rollback every later statement on this connection
            # would fail because the transaction stays aborted.
            self.connection.rollback()
            raise

    def create_tables(self):
        """
        Create necessary tables and the vector index if they don't exist.

        Raises:
            RuntimeError: If creating a table or the index fails.
        """
        # Enable pgvector extension. Best effort: the role may lack the
        # privilege while the extension is already installed, so a failure
        # here is reported but does not abort table creation.
        try:
            self._execute("CREATE EXTENSION IF NOT EXISTS vector;")
        except Exception as e:
            print(f"Warning: Could not create vector extension: {e}")

        # Create chunks table
        create_chunks_table = """
        CREATE TABLE IF NOT EXISTS chunks (
            id SERIAL PRIMARY KEY,
            component VARCHAR(255) NOT NULL,
            section_title VARCHAR(500),
            section_path TEXT,
            content TEXT NOT NULL,
            demo_files TEXT,
            images TEXT,
            token_count INTEGER,
            embedding vector(768),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            CONSTRAINT valid_content CHECK (char_length(content) > 0)
        );
        """

        # Create index for vector similarity search
        # NOTE(review): pgvector's documentation advises building IVFFlat
        # indexes after the table holds data; this one may be built on an
        # empty table - consider a REINDEX after the initial bulk load.
        create_index = """
        CREATE INDEX IF NOT EXISTS chunks_embedding_idx 
        ON chunks USING ivfflat (embedding vector_cosine_ops)
        WITH (lists = 100);
        """

        # Create metadata table for tracking
        create_metadata_table = """
        CREATE TABLE IF NOT EXISTS processing_metadata (
            id SERIAL PRIMARY KEY,
            component VARCHAR(255),
            total_chunks INTEGER,
            processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """

        self._execute(create_chunks_table)
        self._execute(create_index)
        self._execute(create_metadata_table)

        print("Database tables created successfully")

    def store_chunk(
        self,
        component: str,
        section_title: Optional[str],
        section_path: Optional[str],
        content: str,
        embedding: List[float],
        demo_files: Optional[List[str]] = None,
        images: Optional[List[str]] = None,
        token_count: int = 0
    ) -> int:
        """
        Store a chunk with its embedding.

        Args:
            component: Component name
            section_title: Section title
            section_path: Section path
            content: Chunk content
            embedding: Embedding vector
            demo_files: List of demo files
            images: List of image paths
            token_count: Token count for chunk

        Returns:
            Chunk ID

        Raises:
            Exception: Whatever the driver raises for a failed insert (for
                psycopg2 a psycopg2.Error subclass), re-raised unchanged
                after the transaction has been rolled back.
        """
        # Convert lists to JSON strings
        demo_files_json = json.dumps(demo_files or [])
        images_json = json.dumps(images or [])

        insert_query = """
        INSERT INTO chunks 
        (component, section_title, section_path, content, embedding, 
         demo_files, images, token_count)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        RETURNING id;
        """

        try:
            self.cursor.execute(
                insert_query,
                (component, section_title, section_path, content,
                 self._format_vector(embedding),
                 demo_files_json, images_json, token_count)
            )
            # Read the generated id before committing so a failure in either
            # step is covered by the same rollback.
            chunk_id = self.cursor.fetchone()[0]
            self.connection.commit()
        except Exception:
            # Keep the connection usable for the next chunk; the exception
            # type is preserved so callers can still catch psycopg2.Error.
            self.connection.rollback()
            raise

        return chunk_id

    def search_similar(
        self,
        embedding: List[float],
        limit: int = 5,
        threshold: float = 0.7
    ) -> List[Tuple]:
        """
        Search for similar chunks using vector similarity.

        Args:
            embedding: Query embedding vector
            limit: Maximum number of results
            threshold: Cosine similarity threshold (0-1); only rows scoring
                strictly above it are returned

        Returns:
            List of (chunk_id, component, section_title, content, similarity_score),
            most similar first
        """
        embedding_str = self._format_vector(embedding)

        # `<=>` is pgvector's cosine distance operator, so similarity is
        # 1 - distance and ordering by distance puts the best match first.
        search_query = """
        SELECT id, component, section_title, content,
               1 - (embedding <=> %s::vector) as similarity
        FROM chunks
        WHERE 1 - (embedding <=> %s::vector) > %s
        ORDER BY embedding <=> %s::vector
        LIMIT %s;
        """

        return self._fetch_all(
            search_query,
            (embedding_str, embedding_str, threshold, embedding_str, limit)
        )

    def get_chunk(self, chunk_id: int) -> Optional[Tuple]:
        """Get a chunk by ID, or None when no row has that ID."""
        query = """
        SELECT id, component, section_title, content, demo_files, images, 
               token_count, created_at
        FROM chunks
        WHERE id = %s;
        """

        result = self._fetch_all(query, (chunk_id,))
        return result[0] if result else None

    def get_chunks_by_component(self, component: str) -> List[Tuple]:
        """Get all chunks for a component, oldest first."""
        query = """
        SELECT id, component, section_title, content, token_count
        FROM chunks
        WHERE component = %s
        ORDER BY created_at;
        """

        return self._fetch_all(query, (component,))

    def delete_component_chunks(self, component: str) -> int:
        """
        Delete all chunks for a component (for reprocessing).

        Returns:
            Number of rows deleted

        Raises:
            Exception: The driver's error, re-raised unchanged after rollback.
        """
        query = "DELETE FROM chunks WHERE component = %s;"

        try:
            self.cursor.execute(query, (component,))
            # Capture the count right after the DELETE, before anything else
            # touches the cursor.
            deleted = self.cursor.rowcount
            self.connection.commit()
        except Exception:
            self.connection.rollback()
            raise

        return deleted

    def get_stats(self) -> dict:
        """
        Get database statistics.

        Returns:
            Dict with keys "total_chunks" (int), "chunks_by_component"
            (component name -> count) and "avg_token_count" (float, 0.0 when
            there is nothing to average).
        """
        stats = {}

        # Total chunks
        result = self._fetch_all("SELECT COUNT(*) FROM chunks;")
        stats["total_chunks"] = result[0][0] if result else 0

        # Chunks per component
        result = self._fetch_all(
            "SELECT component, COUNT(*) as count FROM chunks GROUP BY component;"
        )
        stats["chunks_by_component"] = {row[0]: row[1] for row in result}

        # Average token count; AVG() yields NULL (None) on an empty table.
        result = self._fetch_all("SELECT AVG(token_count) FROM chunks;")
        average = result[0][0] if result else None
        stats["avg_token_count"] = float(average) if average is not None else 0.0

        return stats


# Notebook-cell confirmation: reached only once the definitions above ran without error.
print("DatabaseConnector module loaded successfully")

DatabaseConnector module loaded successfully


## Section 2: Load and Store Embeddings

Load embeddings from JSON file and insert into database.

In [12]:
import json
from pathlib import Path
import os

# Load embeddings from file.
# The directory defaults to the original development location and can be
# overridden with the CHUNKS_DIR environment variable on other machines.
CHUNKS_DIR = Path(os.getenv(
    'CHUNKS_DIR',
    '/home/sinan/GitHub/reservix/ai-assistant/etl/data/chunks'
))
embeddings_file = CHUNKS_DIR / 'chunks_primitive_embedded.json'

print(f"Loading embeddings from: {embeddings_file}")
print(f"File exists: {embeddings_file.exists()}")

try:
    # Only the file access sits in the try body, so the handler below cannot
    # hide an unrelated error from the reporting code.
    with open(embeddings_file, encoding='utf-8') as f:
        chunks_with_embeddings = json.load(f)
except FileNotFoundError as e:
    print(f"Error: {e}")
    print("Run the embedder notebook first to generate embeddings")
    # Later cells treat an empty list as "nothing to insert".
    chunks_with_embeddings = []
else:
    print(f"Loaded {len(chunks_with_embeddings)} chunks with embeddings")

    # Check embedding dimensions; guarded because a valid but empty JSON
    # list has no first element to inspect.
    if chunks_with_embeddings:
        first_chunk = chunks_with_embeddings[0]
        first_embedding = first_chunk.get('embedding', [])
        print(f"Embedding dimensions: {len(first_embedding)}")
        print(f"Sample keys: {list(first_chunk.keys())}")

Loading embeddings from: /home/sinan/GitHub/reservix/ai-assistant/etl/data/chunks/chunks_primitive_embedded.json
File exists: True
Loaded 186 chunks with embeddings
Embedding dimensions: 768
Sample keys: ['component', 'section_title', 'section_path', 'content', 'demo_files', 'images', 'token_count', 'embedding']


## Environment Variables

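Configure the database connection via environment variables (`DB_HOST`, `DB_PORT`, `DB_NAME`, `DB_USER`, `DB_PASSWORD`), then create the tables and store the loaded chunks: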

In [13]:
from psycopg2 import Error

# Initialize database connector from environment variables, falling back to
# local development defaults.
db = DatabaseConnector(
    host=os.getenv("DB_HOST", "localhost"),
    port=int(os.getenv("DB_PORT", 5432)),
    database=os.getenv("DB_NAME", "marigold_rag"),
    user=os.getenv("DB_USER", "postgres"),
    password=os.getenv("DB_PASSWORD", "postgres")
)

try:
    # Connect and create tables
    db.connect()
    db.create_tables()
    print("Tables created successfully\n")

    # Store all chunks with embeddings
    if chunks_with_embeddings:
        total = len(chunks_with_embeddings)
        print(f"Storing {total} chunks...")

        stored_count = 0
        for i, chunk in enumerate(chunks_with_embeddings):
            try:
                db.store_chunk(
                    component=chunk.get('component', 'unknown'),
                    section_title=chunk.get('section_title'),
                    section_path=chunk.get('section_path'),
                    content=chunk.get('content', ''),
                    embedding=chunk.get('embedding', []),
                    demo_files=chunk.get('demo_files', []),
                    images=chunk.get('images', []),
                    token_count=chunk.get('token_count', 0)
                )
                stored_count += 1

                # Progress indicator
                if (i + 1) % 50 == 0:
                    print(f"  Stored {i + 1}/{total} chunks...")

            except Error as e:
                print(f"Error storing chunk {i}: {e}")
                # A failed statement leaves the transaction aborted; roll it
                # back so the remaining chunks can still be inserted instead
                # of all failing on the same broken transaction.
                db.connection.rollback()
                continue

        print(f"\nSuccessfully stored {stored_count}/{total} chunks")

        # Show statistics
        stats = db.get_stats()
        print(f"\nDatabase Statistics:")
        print(f"  Total chunks: {stats['total_chunks']}")
        print(f"  Chunks by component: {stats['chunks_by_component']}")
        print(f"  Avg token count: {stats['avg_token_count']:.1f}")
    else:
        print("No embeddings loaded - skipping insert")

except Exception as e:
    # Top-level boundary of the notebook cell: report the failure with a
    # traceback rather than letting it abort the notebook run.
    print(f"Database error: {e}")
    import traceback
    traceback.print_exc()

finally:
    # Always release the connection, even when connecting or inserting failed.
    db.disconnect()

Connected to marigold_rag on localhost:5432
Database tables created successfully
Tables created successfully

Storing 186 chunks...
  Stored 50/186 chunks...
  Stored 100/186 chunks...
  Stored 150/186 chunks...

Successfully stored 186/186 chunks

Database Statistics:
  Total chunks: 186
  Chunks by component: {'tiles': 2, 'inset': 2, 'cva': 1, 'cn': 1, 'breadcrumbs': 2, 'inline': 3, 'button': 7, 'stack': 3, 'textarea': 2, 'multiselect': 4, 'slider': 4, 'selectlist': 2, 'link-button': 2, 'svg': 2, 'switch': 2, 'center': 1, 'number-field': 6, 'dialog': 4, 'toast': 2, 'useListData': 1, 'drawer': 3, 'file-field': 1, 'accordion': 2, 'aside': 2, 'section-message': 3, 'columns': 3, 'useAsyncListData': 1, 'calendar': 2, 'aspect': 2, 'scrollable': 2, 'useTheme': 1, 'numericformat': 3, 'timefield': 2, 'visually-hidden': 2, 'dateformat': 3, 'checkbox': 6, 'text': 2, 'overview': 4, 'pagination': 5, 'grid': 2, 'tooltip': 2, 'datepicker': 3, 'select': 4, 'provider': 2, 'textfield': 4, 'divider': 1