# Document Ingestion Pipeline - Step-by-Step Testing

This notebook tests each pipeline step individually to help diagnose issues.

## Prerequisites

Run this in an OpenShift AI workbench with network access to:
- MinIO S3 endpoint
- vector-search-service
- PostgreSQL database

## Configuration

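Update these values before running. At a minimum, replace the placeholder S3 credentials and set `DB_PASSWORD` (Step 5 refuses to run while it is empty):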

In [None]:
# Configuration
#
# Secrets (S3 keys, database password) are read from environment variables of
# the same name so they do not have to be typed into, and saved with, the
# notebook. When a variable is unset the original placeholder value is used.
import os

# S3/MinIO Configuration
S3_ENDPOINT = "https://minio-api-minio.apps.meshtest.llnl.gov"
S3_BUCKET = "kb-documents"
S3_PREFIX = "data/"  # Include trailing slash!
S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY", "minioadmin")  # Update with actual key
S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY", "minioadmin")  # Update with actual key

# Local paths
DOWNLOAD_PATH = "/tmp/documents"
FILE_EXTENSIONS = [".md", ".txt", ".html"]

# Service Configuration
SERVICE_URL = "http://vector-search-service.servicenow-ai-poc.svc.cluster.local:8000"
COLLECTION_NAME = "default"
BATCH_SIZE = 10

# Database Configuration
DB_HOST = "postgres-pgvector.servicenow-ai-poc.svc.cluster.local"
DB_PORT = "5432"
DB_USER = "raguser"
DB_PASSWORD = os.environ.get("DB_PASSWORD", "")  # MUST be set (env var or here)
DB_NAME = "ragdb"

# Only non-secret values are echoed back.
print("Configuration loaded")
print(f"  S3: {S3_ENDPOINT}/{S3_BUCKET}/{S3_PREFIX}")
print(f"  Service: {SERVICE_URL}")
print(f"  Collection: {COLLECTION_NAME}")
print(f"  Database: {DB_HOST}:{DB_PORT}/{DB_NAME}")

## Step 0: Install Dependencies

In [None]:
!pip install boto3 requests psycopg2-binary

## Step 1: Download from S3/MinIO

In [None]:
import os
import warnings
from pathlib import Path

import boto3
from botocore.config import Config

# verify=False (below) triggers an "Unverified HTTPS request ..." warning on
# every call. Silence only that message; a blanket filterwarnings('ignore')
# would hide every other warning for the rest of the kernel session.
warnings.filterwarnings('ignore', message='Unverified HTTPS request')

print(f"Downloading from S3: {S3_ENDPOINT}/{S3_BUCKET}/{S3_PREFIX}")

# Configure with explicit timeouts so an unreachable endpoint fails quickly
config = Config(
    connect_timeout=10,
    read_timeout=30,
    retries={'max_attempts': 3}
)

# Create S3 client
s3_client = boto3.client(
    's3',
    endpoint_url=S3_ENDPOINT,
    aws_access_key_id=S3_ACCESS_KEY,
    aws_secret_access_key=S3_SECRET_KEY,
    verify=False,  # For self-signed certs; TLS certificate is NOT validated
    config=config
)

# Create download directory and remember its real location for the
# containment check performed on every object key below.
Path(DOWNLOAD_PATH).mkdir(parents=True, exist_ok=True)
download_root = os.path.realpath(DOWNLOAD_PATH)

# List and download all objects
paginator = s3_client.get_paginator('list_objects_v2')
downloaded_count = 0
downloaded_files = []

print("Listing files...")
for page in paginator.paginate(Bucket=S3_BUCKET, Prefix=S3_PREFIX):
    if 'Contents' not in page:
        print("No contents found!")
        continue

    for obj in page['Contents']:
        s3_key = obj['Key']

        # Skip directory markers
        if s3_key.endswith('/'):
            continue

        # Calculate local path. Leading slashes are stripped because
        # os.path.join discards DOWNLOAD_PATH when its second argument is
        # absolute (happens when S3_PREFIX lacks its trailing slash).
        relative_path = s3_key[len(S3_PREFIX):] if S3_PREFIX else s3_key
        relative_path = relative_path.lstrip('/')
        if not relative_path:
            # Key is exactly the prefix: nothing to name the local file after.
            continue
        local_file = os.path.join(DOWNLOAD_PATH, relative_path)

        # Refuse keys (e.g. containing "..") that resolve outside DOWNLOAD_PATH.
        resolved_target = os.path.realpath(local_file)
        if os.path.commonpath([download_root, resolved_target]) != download_root:
            print(f"  SKIPPED (resolves outside {DOWNLOAD_PATH}): {s3_key}")
            continue

        # Create local directory
        Path(local_file).parent.mkdir(parents=True, exist_ok=True)

        # Download file
        print(f"  [{downloaded_count+1}] {s3_key}")
        s3_client.download_file(S3_BUCKET, s3_key, local_file)
        downloaded_count += 1
        downloaded_files.append(local_file)

print(f"\nDownloaded {downloaded_count} files to {DOWNLOAD_PATH}")
print(f"First 5 files: {downloaded_files[:5]}")

## Step 2: Discover Documents

In [None]:
import os

# Walk the download directory recursively and keep every file whose name ends
# with one of the configured extensions (str.endswith accepts a tuple).
wanted_suffixes = tuple(FILE_EXTENSIONS)
files = [
    os.path.join(dirpath, name)
    for dirpath, _, names in os.walk(DOWNLOAD_PATH)
    for name in names
    if name.endswith(wanted_suffixes)
]

print(f"Discovered {len(files)} files")
print(f"\nFirst 10 files:")
for path in files[:10]:
    print(f"  {path}")

# Save for next step
discovered_files = files

## Step 3: Test Single Document Ingestion

Let's test ingesting just ONE document first to see if the API works.

In [None]:
import requests
from pathlib import Path
import json
import traceback

if discovered_files:
    # Use the first discovered file as the smoke-test document
    test_file = discovered_files[0]
    print(f"Testing ingestion with: {test_file}")

    # Endpoint that receives the document for the configured collection
    ingest_url = f"{SERVICE_URL}/api/v1/collections/{COLLECTION_NAME}/documents"

    try:
        # Load the whole document as UTF-8 text
        file_content = Path(test_file).read_text(encoding='utf-8')

        print(f"File size: {len(file_content)} characters")
        print(f"First 200 chars: {file_content[:200]}...")

        # Body of the request: the text plus where it came from
        filename = Path(test_file).name
        metadata = {
            "source": "jupyter-test",
            "file_path": test_file,
            "filename": filename
        }
        payload = {"content": file_content, "metadata": metadata}

        print(f"\nSending to: {ingest_url}")

        # Generous timeout: the request may take a long time to complete
        response = requests.post(
            ingest_url,
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=300
        )

        # Dump the full response so failures can be diagnosed from the output
        print(f"\nResponse status: {response.status_code}")
        print(f"Response headers: {dict(response.headers)}")
        print(f"Response body: {response.text}")

        if response.status_code not in (200, 201):
            print(f"\nFAILED: HTTP {response.status_code}")
            print(f"Error: {response.text}")
        else:
            result = response.json()
            doc_id = result.get('document_id', 'unknown')
            print(f"\nSUCCESS: Document ID {doc_id}")

    except Exception as e:
        # Diagnostic cell: report the error with its traceback instead of aborting
        print(f"\nERROR: {e}")
        traceback.print_exc()
else:
    print("No files discovered! Run Step 2 first.")

## Step 4: Batch Ingestion (All Documents)

Only run this after Step 3 works successfully!

In [None]:
import requests
from pathlib import Path


def _ingest_file(session, url, file_path):
    """Upload one file to the ingestion endpoint and report the outcome.

    Args:
        session: requests.Session used to send the request, so consecutive
            uploads can reuse the same HTTP connection.
        url: Full URL of the collection's documents endpoint.
        file_path: Path of the UTF-8 text file to upload.

    Returns:
        A dict with "file" and "success" keys, plus "document_id" on success
        or "error" on failure. Exceptions are caught and reported as a failed
        record so that one bad file does not stop the batch.
    """
    try:
        # Read file content
        with open(file_path, 'r', encoding='utf-8') as f:
            file_content = f.read()

        # Prepare request
        filename = Path(file_path).name
        payload = {
            "content": file_content,
            "metadata": {
                "source": "jupyter-batch",
                "file_path": file_path,
                "filename": filename
            }
        }

        # Send request
        response = session.post(
            url,
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=300
        )

        if response.status_code in [200, 201]:
            doc_id = response.json().get('document_id', 'unknown')
            print(f"  SUCCESS: {filename}: Document ID {doc_id}")
            return {"file": file_path, "success": True, "document_id": doc_id}

        error_detail = response.text
        print(f"  FAILED: {filename}: HTTP {response.status_code} - {error_detail}")
        return {
            "file": file_path,
            "success": False,
            "error": f"HTTP {response.status_code}: {error_detail}"
        }

    except Exception as e:
        print(f"  ERROR: {file_path}: {str(e)}")
        return {"file": file_path, "success": False, "error": str(e)}


if not discovered_files:
    print("No files discovered! Run Step 2 first.")
else:
    print(f"Processing {len(discovered_files)} files in batches of {BATCH_SIZE}")
    print(f"Target collection: {COLLECTION_NAME}")

    ingest_url = f"{SERVICE_URL}/api/v1/collections/{COLLECTION_NAME}/documents"
    batch_results = []
    successful = 0
    failed = 0

    # One Session for the whole run: the underlying connection is reused
    # instead of being re-established for every document.
    with requests.Session() as session:
        # Batches only group the progress output; files are sent one by one.
        for i in range(0, len(discovered_files), BATCH_SIZE):
            batch = discovered_files[i:i + BATCH_SIZE]
            print(f"\nProcessing batch {i//BATCH_SIZE + 1}: {len(batch)} files")

            for file_path in batch:
                record = _ingest_file(session, ingest_url, file_path)
                batch_results.append(record)
                if record["success"]:
                    successful += 1
                else:
                    failed += 1

    print(f"\n{'='*60}")
    print(f"Summary: {successful}/{len(discovered_files)} files ingested successfully")
    print(f"Failed: {failed}")
    print(f"{'='*60}")

    # Show failed files if any
    if failed > 0:
        print("\nFailed files:")
        for result in batch_results:
            if not result["success"]:
                print(f"  {result['file']}: {result['error']}")

## Step 5: Verify Ingestion in Database

In [None]:
import psycopg2

if not DB_PASSWORD:
    print("ERROR: DB_PASSWORD not set! Update configuration cell.")
else:
    print(f"Connecting to database: {DB_HOST}:{DB_PORT}/{DB_NAME}")

    # Connect to database. connect_timeout (seconds) stops the cell from
    # hanging indefinitely when the database host is unreachable.
    conn = psycopg2.connect(
        host=DB_HOST,
        port=DB_PORT,
        user=DB_USER,
        password=DB_PASSWORD,
        database=DB_NAME,
        connect_timeout=10
    )

    # try/finally guarantees the connection is released even when one of the
    # queries fails (e.g. a table or column is missing); the cursor is closed
    # by its own context manager.
    try:
        with conn.cursor() as cur:
            # Query document statistics
            print("\nQuerying documents table...")
            cur.execute("""
                SELECT
                    COUNT(*) as total_documents,
                    COUNT(DISTINCT collection_id) as total_collections
                FROM documents
            """)
            doc_stats = cur.fetchone()

            # Query embedding statistics
            print("Querying embeddings table...")
            cur.execute("""
                SELECT COUNT(*) as total_embeddings
                FROM embeddings
            """)
            emb_stats = cur.fetchone()

            # Query collection info
            print("Querying collections table...")
            cur.execute("""
                SELECT id, name, description, created_at
                FROM collections
            """)
            collections = cur.fetchall()

            print(f"\n{'='*60}")
            print("Database Statistics:")
            print(f"  Total documents: {doc_stats[0]}")
            print(f"  Total collections: {doc_stats[1]}")
            print(f"  Total embeddings: {emb_stats[0]}")
            print(f"\nCollections:")
            for col in collections:
                print(f"  - {col[1]} (ID: {col[0]}): {col[2]}")
                print(f"    Created: {col[3]}")
            print(f"{'='*60}")

            # Sample the five most recently created documents
            print("\nSample documents (first 5):")
            cur.execute("""
                SELECT id, collection_id, content_preview, created_at
                FROM documents
                ORDER BY created_at DESC
                LIMIT 5
            """)
            docs = cur.fetchall()
            for doc in docs:
                # content_preview may be NULL; slicing None would raise TypeError
                preview = (doc[2] or "")[:100]
                print(f"\n  Document ID: {doc[0]}")
                print(f"  Collection ID: {doc[1]}")
                print(f"  Preview: {preview}...")
                print(f"  Created: {doc[3]}")
    finally:
        conn.close()
        print("\nDatabase connection closed.")

## Step 6: Test Search

Verify we can search the ingested documents.

In [None]:
import requests
import json

# Query text and maximum number of hits to request
search_query = "troubleshooting"
limit = 5

print(f"Searching for: '{search_query}'")
print(f"Limit: {limit} results")

# Search endpoint of the configured collection
search_url = f"{SERVICE_URL}/api/v1/collections/{COLLECTION_NAME}/search"
payload = dict(query=search_query, limit=limit)

response = requests.post(
    search_url,
    json=payload,
    headers={"Content-Type": "application/json"},
    timeout=30
)

print(f"\nResponse status: {response.status_code}")

if response.status_code != 200:
    print(f"Search failed: {response.text}")
else:
    # NOTE(review): the code below iterates the decoded body directly, so it
    # presumably expects a JSON list of result objects - confirm against the API.
    results = response.json()
    separator = '=' * 60

    print(f"\nFound {len(results)} results:")
    print(f"\n{separator}")

    for rank, hit in enumerate(results, start=1):
        print(f"\nResult {rank}:")
        print(f"  Document ID: {hit.get('document_id', 'N/A')}")
        print(f"  Score: {hit.get('score', 'N/A')}")
        preview = hit.get('content', 'N/A')[:200]
        print(f"  Preview: {preview}...")
        if 'metadata' in hit:
            print(f"  Metadata: {hit['metadata']}")

    print(f"\n{separator}")

## Debugging Cell

Use this cell for ad-hoc debugging queries.

In [None]:
# Add any debugging code here
# For example, check if collection exists:

import requests

# requests has no default timeout; without one this cell blocks forever when
# the service is unreachable.
response = requests.get(f"{SERVICE_URL}/api/v1/collections", timeout=30)
print(f"Response status: {response.status_code}")

# Only decode JSON on success; an error response may not be JSON.
if response.ok:
    print(f"Collections: {response.json()}")
else:
    print(f"Request failed: {response.text}")