# Kamiwaza RAG Demo: Document Processing with Catalog Integration

This notebook demonstrates the complete RAG (Retrieval-Augmented Generation) workflow using Kamiwaza SDK:
1. Upload documents to the catalog
2. Chunk documents into manageable pieces
3. Generate embeddings for each chunk
4. Store vectors with chunk text as metadata
5. Perform semantic search on the content

## 1. Setup and Initialization

In [1]:
# Import required libraries
import os
import json
from pathlib import Path
from typing import List, Dict, Any
from datetime import datetime

# Import Kamiwaza SDK
from kamiwaza_client import KamiwazaClient
from kamiwaza_client.schemas.catalog import Dataset
from kamiwaza_client.schemas.vectordb import (
    InsertVectorsRequest,
    SearchVectorsRequest,
    SearchResult
)

# Initialize the Kamiwaza client
KAMIWAZA_API_URL = "http://localhost:7777/api/"
client = KamiwazaClient(base_url=KAMIWAZA_API_URL)

print(f"Connected to Kamiwaza at: {KAMIWAZA_API_URL}")

Connected to Kamiwaza at: http://localhost:7777/api/


## 2. Document Upload to Catalog

In [2]:
# Helper function to add files to catalog (based on the example)
def add_files_to_catalog(filepaths, client, platform="file", recursive=True, description=""):
    """
    Add files to the Kamiwaza catalog.
    
    Args:
        filepaths: List of file paths or a single file path
        client: KamiwazaClient instance
        platform: Platform identifier (default: "file")
        recursive: Whether to process directories recursively
        description: Description for the datasets
    
    Returns:
        List of URNs for created datasets
    """
    if isinstance(filepaths, str):
        filepaths = [filepaths]
    
    urns = []
    datasets = []  # Keep track of dataset objects
    
    for filepath in filepaths:
        try:
            # Create dataset for each file
            dataset = client.catalog.create_dataset(
                dataset_name=filepath,
                platform=platform,
                environment="PROD",
                description=description or f"File: {Path(filepath).name}"
            )
            
            if dataset.urn:
                urns.append(dataset.urn)
                datasets.append(dataset)  # Store the dataset object
                print(f"✅ Added to catalog: {Path(filepath).name}")
                print(f"   URN: {dataset.urn}")
            
        except Exception as e:
            print(f"❌ Error adding {filepath}: {str(e)}")
    
    # Return both URNs and datasets
    return urns, datasets

# Function to show dataset info
def show_dataset_info(client, urns):
    """Display information about datasets in the catalog."""
    all_datasets = client.catalog.list_datasets()
    my_datasets = [d for d in all_datasets if d.urn in urns]
    
    print(f"Total datasets in catalog: {len(all_datasets)}")
    print(f"Matching datasets: {len(my_datasets)}")
    
    for d in my_datasets:
        print(f"\nURN: {d.urn}")
        print(f"ID: {d.id}")
        print(f"Platform: {d.platform}")
        print(f"Environment: {d.environment}")
        print(f"Name: {d.name if d.name else 'None'}")
        print("-" * 50)
    
    return my_datasets

In [3]:
# Example: Upload documents to catalog
# Replace with your document paths
DOCUMENT_PATHS = [
    "./kamiwaza.md",  
    "./kz_info.md"
    # Add more documents as needed
]

# Add files to catalog - now returns both URNs and datasets
urns, datasets_created = add_files_to_catalog(
    filepaths=DOCUMENT_PATHS,
    client=client,
    platform="file",
    description="RAG documents"
)

print(f"\nCreated {len(urns)} datasets in catalog")

# Show dataset information from list_datasets
if urns:
    print("\nDataset Information from Catalog:")
    datasets_from_catalog = show_dataset_info(client, urns)
    
    # Use the datasets we created directly if list_datasets doesn't return them
    if not datasets_from_catalog and datasets_created:
        print("\nUsing created datasets directly:")
        datasets = datasets_created
        for d in datasets:
            print(f"\nURN: {d.urn}")
            print(f"ID: {d.id}")
            print(f"Platform: {d.platform}")
            print(f"Environment: {d.environment}")
            print(f"Name: {d.name if d.name else 'None'}")
            print("-" * 50)
    else:
        datasets = datasets_from_catalog

✅ Added to catalog: kamiwaza.md
   URN: urn:li:dataset:(urn:li:dataPlatform:file,/home/kamiwaza/kamiwaza-040/notebooks/sdk-rag/kamiwaza.md,PROD)
✅ Added to catalog: kz_info.md
   URN: urn:li:dataset:(urn:li:dataPlatform:file,/home/kamiwaza/kamiwaza-040/notebooks/sdk-rag/kz_info.md,PROD)

Created 2 datasets in catalog

Dataset Information from Catalog:
Total datasets in catalog: 2
Matching datasets: 2

URN: urn:li:dataset:(urn:li:dataPlatform:file,/home/kamiwaza/kamiwaza-040/notebooks/sdk-rag/kamiwaza.md,PROD)
ID: /home/kamiwaza/kamiwaza-040/notebooks/sdk-rag/kamiwaza.md
Platform: file
Environment: PROD
Name: None
--------------------------------------------------

URN: urn:li:dataset:(urn:li:dataPlatform:file,/home/kamiwaza/kamiwaza-040/notebooks/sdk-rag/kz_info.md,PROD)
ID: /home/kamiwaza/kamiwaza-040/notebooks/sdk-rag/kz_info.md
Platform: file
Environment: PROD
Name: None
--------------------------------------------------


## 3. Document Chunking and Embedding

In [4]:
# Configuration for chunking and embedding
CHUNK_SIZE = 1024  # Maximum tokens per chunk
CHUNK_OVERLAP = 102  # Token overlap between chunks
EMBEDDER_MODEL = "BAAI/bge-base-en-v1.5"  # Use the same model as the RAG app

# Use a unique collection name to avoid schema conflicts
import time
timestamp = int(time.time())
COLLECTION_NAME = f"TestRAG3"

print(f"Chunking configuration:")
print(f"  - Chunk size: {CHUNK_SIZE} tokens")
print(f"  - Overlap: {CHUNK_OVERLAP} tokens")
print(f"  - Embedding model: {EMBEDDER_MODEL}")
print(f"  - Collection name: {COLLECTION_NAME}")
print(f"\nNote: Using a new collection to ensure clean schema")

# Create a global embedder instance to prevent cleanup between operations
print(f"\nInitializing global embedder...")
GLOBAL_EMBEDDER = client.embedding.get_embedder(
    model=EMBEDDER_MODEL,
    provider_type="huggingface_embedding"
)
print(f"✅ Global embedder initialized and ready")

Chunking configuration:
  - Chunk size: 1024 tokens
  - Overlap: 102 tokens
  - Embedding model: BAAI/bge-base-en-v1.5
  - Collection name: TestRAG3

Note: Using a new collection to ensure clean schema

Initializing global embedder...
✅ Global embedder initialized and ready


In [5]:
def process_document_from_catalog(dataset: Dataset, collection_name: str, embedder=None):
    """
    Process a document from the catalog: chunk, embed, and store in vector DB.
    
    Args:
        dataset: The catalog dataset containing the document
        collection_name: Name of the vector collection to store chunks
        embedder: Optional embedder instance (uses GLOBAL_EMBEDDER if not provided)
    
    Returns:
        Number of chunks processed
    """
    # Use provided embedder or global instance
    if embedder is None:
        embedder = GLOBAL_EMBEDDER
    
    # Extract file path from dataset
    # The dataset id contains the file path
    doc_path = Path(dataset.id)
    
    # Fallback: Parse path from URN if needed
    if not doc_path.exists() and dataset.urn:
        # URN format: urn:li:dataset:(urn:li:dataPlatform:file,/path/to/file,PROD)
        parts = dataset.urn.split(',')
        if len(parts) >= 2:
            doc_path = Path(parts[1])
    
    if not doc_path.exists():
        raise FileNotFoundError(f"File not found: {doc_path}")
    
    # Read document content
    with open(doc_path, 'r', encoding='utf-8') as f:
        content = f.read()
    
    print(f"📄 Processing document: {doc_path.name}")
    print(f"   - Size: {len(content)} characters")
    print(f"   - URN: {dataset.urn}")
    
    try:
        # Chunk the document
        print(f"\n📋 Chunking document...")
        chunks = embedder.chunk_text(
            text=content,
            max_length=CHUNK_SIZE,
            overlap=CHUNK_OVERLAP
        )
        
        print(f"   - Created {len(chunks)} chunks")
        
        # Generate embeddings for all chunks
        print(f"\n🧮 Generating embeddings...")
        embeddings = embedder.embed_chunks(chunks)
        
        # Prepare metadata for each chunk
        # Include both autofields and custom fields
        metadata_list = []
        
        for i, chunk in enumerate(chunks):
            # Truncate chunk text if needed to fit VARCHAR limit (1000 chars)
            chunk_text = chunk[:900] + "..." if len(chunk) > 900 else chunk
            
            metadata = {
                # Required autofields (these MUST be included)
                "model_name": EMBEDDER_MODEL,
                "source": str(doc_path),
                "catalog_urn": dataset.urn or "",
                "offset": i,
                "filename": doc_path.name,
                
                # Custom fields - these will be added to the schema
                "chunk_text": chunk_text,
                "chunk_index": i,
                "chunk_size": len(chunk),
            }
            metadata_list.append(metadata)
        
        # Insert vectors into the database
        print(f"\n💾 Inserting vectors into collection '{collection_name}'...")
        
        # Define custom fields for the collection schema
        # IMPORTANT: Use tuple format (field_name, field_type)
        field_list = [
            ("chunk_text", "str"),      # Store the actual chunk text
            ("chunk_index", "int"),     # Store chunk index
            ("chunk_size", "int"),      # Store chunk size
        ]
        
        # Use the SDK's insert method with field_list
        response = client.vectordb.insert(
            vectors=embeddings,
            metadata=metadata_list,
            collection_name=collection_name,
            field_list=field_list  # Pass custom fields
        )
        
        print(f"✅ Successfully inserted {len(chunks)} vectors")
        print(f"   - Collection: {collection_name}")
        print(f"   - Custom fields added: chunk_text, chunk_index, chunk_size")
        print(f"   - Chunk text stored directly in Milvus!")
        
        return len(chunks)
        
    except Exception as e:
        print(f"❌ Error during processing: {str(e)}")
        print(f"\nDebug info:")
        print(f"  - Collection name: {collection_name}")
        print(f"  - Number of vectors: {len(embeddings) if 'embeddings' in locals() else 'N/A'}")
        print(f"  - Number of metadata entries: {len(metadata_list) if 'metadata_list' in locals() else 'N/A'}")
        print(f"  - Field list: {field_list if 'field_list' in locals() else 'N/A'}")
        if 'metadata_list' in locals() and metadata_list:
            print(f"  - Sample metadata keys: {list(metadata_list[0].keys())}")
        raise

# Function to process all datasets from catalog
def process_catalog_datasets(datasets, collection_name):
    """Process multiple datasets from the catalog."""
    total_chunks = 0
    
    # Use the global embedder for all datasets
    for dataset in datasets:
        print(dataset)
        try:
            print(f"\n{'='*80}")
            chunks = process_document_from_catalog(dataset, collection_name, embedder=GLOBAL_EMBEDDER)
            total_chunks += chunks
        except Exception as e:
            print(f"❌ Error processing dataset {dataset.id}: {str(e)}")
    
    print(f"\n🎉 Total chunks processed: {total_chunks}")
    return total_chunks

In [6]:
# Process documents from catalog
if datasets:
    total_chunks = process_catalog_datasets(datasets, COLLECTION_NAME)
else:
    print("No datasets found in catalog to process")

urn='urn:li:dataset:(urn:li:dataPlatform:file,/home/kamiwaza/kamiwaza-040/notebooks/sdk-rag/kamiwaza.md,PROD)' id='/home/kamiwaza/kamiwaza-040/notebooks/sdk-rag/kamiwaza.md' platform='file' environment='PROD' paths=None name=None actor=None customProperties=None removed=None tags=None

📄 Processing document: kamiwaza.md
   - Size: 5590 characters
   - URN: urn:li:dataset:(urn:li:dataPlatform:file,/home/kamiwaza/kamiwaza-040/notebooks/sdk-rag/kamiwaza.md,PROD)

📋 Chunking document...


2025-07-17 14:37:03,709 - kamiwaza_client.services.embedding - INFO - Starting embedding generation for 4 chunks (batch size: 64)


   - Created 4 chunks

🧮 Generating embeddings...


2025-07-17 14:37:07,420 - kamiwaza_client.services.embedding - INFO - Successfully generated embeddings for 4 chunks



💾 Inserting vectors into collection 'TestRAG3'...


2025-07-17 14:37:09,189 - kamiwaza_client.services.embedding - INFO - Starting embedding generation for 12 chunks (batch size: 64)


✅ Successfully inserted 4 vectors
   - Collection: TestRAG3
   - Custom fields added: chunk_text, chunk_index, chunk_size
   - Chunk text stored directly in Milvus!
urn='urn:li:dataset:(urn:li:dataPlatform:file,/home/kamiwaza/kamiwaza-040/notebooks/sdk-rag/kz_info.md,PROD)' id='/home/kamiwaza/kamiwaza-040/notebooks/sdk-rag/kz_info.md' platform='file' environment='PROD' paths=None name=None actor=None customProperties=None removed=None tags=None

📄 Processing document: kz_info.md
   - Size: 17416 characters
   - URN: urn:li:dataset:(urn:li:dataPlatform:file,/home/kamiwaza/kamiwaza-040/notebooks/sdk-rag/kz_info.md,PROD)

📋 Chunking document...
   - Created 12 chunks

🧮 Generating embeddings...


2025-07-17 14:37:20,808 - kamiwaza_client.services.embedding - INFO - Successfully generated embeddings for 12 chunks



💾 Inserting vectors into collection 'TestRAG3'...
✅ Successfully inserted 12 vectors
   - Collection: TestRAG3
   - Custom fields added: chunk_text, chunk_index, chunk_size
   - Chunk text stored directly in Milvus!

🎉 Total chunks processed: 16


## 4. Semantic Search

In [7]:
def semantic_search(query: str, collection_name: str, limit: int = 5, embedder=None):
    """
    Perform semantic search on the document collection.
    
    Args:
        query: The search query
        collection_name: Name of the vector collection
        limit: Maximum number of results to return
        embedder: Optional embedder instance (uses GLOBAL_EMBEDDER if not provided)
    
    Returns:
        Search results with chunk text and metadata
    """
    # Use provided embedder or global instance
    if embedder is None:
        embedder = GLOBAL_EMBEDDER
        
    print(f"🔍 Searching for: '{query}'")
    print(f"   - Collection: {collection_name}")
    print(f"   - Max results: {limit}\n")
    
    # Generate embedding for the query
    query_embedding = embedder.create_embedding(query).embedding
    
    # Perform vector search using the simplified API
    # Include custom fields in output_fields
    results = client.vectordb.search(
        query_vector=query_embedding,
        collection_name=collection_name,
        limit=limit,
        output_fields=["source", "offset", "filename", "catalog_urn", "model_name", "chunk_text", "chunk_index", "chunk_size"]
    )
    
    # The search returns a list directly, not an object with .results
    print(f"Found {len(results)} relevant chunks:\n")
    
    for i, result in enumerate(results, 1):
        # Each result is likely a dict or object with metadata
        # Let's check the structure
        if hasattr(result, 'metadata'):
            metadata = result.metadata
        elif isinstance(result, dict) and 'metadata' in result:
            metadata = result['metadata']
        else:
            metadata = {}
            
        # Get score
        score = result.score if hasattr(result, 'score') else result.get('score', 0.0) if isinstance(result, dict) else 0.0
        
        # Get chunk text from metadata
        chunk_text = metadata.get('chunk_text', None)
        
        print(f"Result {i}:")
        print(f"  📊 Score: {score:.4f}")
        print(f"  📄 Source: {metadata.get('source', 'Unknown')}")
        print(f"  📁 Filename: {metadata.get('filename', 'Unknown')}")
        print(f"  📍 Chunk Index: {metadata.get('chunk_index', metadata.get('offset', 'N/A'))}")
        print(f"  🤖 Model: {metadata.get('model_name', 'N/A')}")
        print(f"  🔗 Catalog URN: {metadata.get('catalog_urn', 'N/A')}")
        print(f"  📏 Chunk Size: {metadata.get('chunk_size', 'N/A')} chars")
        
        # Display chunk text
        if chunk_text:
            print(f"  📝 Content:")
            # Indent the chunk text
            for line in chunk_text.split('\n'):
                print(f"     {line}")
        else:
            print(f"  📝 Note: Chunk text not available in metadata")
            
        print("-" * 80)
    
    return results

In [None]:
# Example searches
queries = [
    "What are the conditions for transitioning planning to execution?",
    "How does the operational art inform the development of a campaign plan?",
    "What is a “Center of Gravity” in operational design?"
]

for query in queries:
    results = semantic_search(query, COLLECTION_NAME, limit=3)
    print("\n" + "="*80 + "\n")

🔍 Searching for: 'What are the conditions for transitioning planning to execution?'
   - Collection: TestRAG3
   - Max results: 3

Found 3 relevant chunks:

Result 1:
  📊 Score: 0.5353
  📄 Source: /home/kamiwaza/kamiwaza-040/notebooks/sdk-rag/kz_info.md
  📁 Filename: kz_info.md
  📍 Chunk Index: 6
  🤖 Model: BAAI/bge-base-en-v1.5
  🔗 Catalog URN: urn:li:dataset:(urn:li:dataPlatform:file,/home/kamiwaza/kamiwaza-040/notebooks/sdk-rag/kz_info.md,PROD)
  📏 Chunk Size: 1991 chars
  📝 Content:
     unless explicitly specified in the deployment request:
     
     ```python
     # 1. Explicit engine specification (highest priority)
     if deployment_request.engine_name:
         engine_name = deployment_request.engine_name
     else:
         # 2. Automatic detection based on platform, hardware, and model format
         engine_name = determine_engine_automatically()
     ```
     
     #### Automatic Engine Selection Algorithm
     
     ```python
     def determine_engine_automatically():
 

## 5. Collection Management

In [28]:
# List all collections
def list_collections():
    """List all vector collections in the database."""
    collections = client.vectordb.list_collections()
    
    print("📚 Available Collections:")
    for collection in collections:
        print(f"   - {collection}")
    
    return collections

collections = list_collections()

📚 Available Collections:
   - JointPlanningProcess
   - KamiDocs


## 8. Cleanup Operations

In [None]:
# Function to clean up resources
def cleanup_collection(collection_name: str):
    """
    Drop a vector collection.
    
    Args:
        collection_name: Name of the collection to drop
    """
    try:
        client.vectordb.drop_collection(collection_name)
        print(f"✅ Dropped collection: {collection_name}")
    except Exception as e:
        print(f"❌ Error dropping collection: {str(e)}")

# Uncomment to clean up
# cleanup_collection(COLLECTION_NAME)

## Summary

This notebook demonstrated the complete RAG workflow using Kamiwaza SDK:

1. **Catalog Integration**: Documents are first uploaded to the catalog for centralized management
2. **Chunking**: Documents are split into overlapping chunks for better context
3. **Embedding**: Each chunk is converted to a vector representation
4. **Storage**: Vectors are stored with chunk text as metadata for instant retrieval
5. **Search**: Semantic search returns relevant chunks with their full text

