In [None]:
# Install required packages
!pip install -q boto3 pandas pyarrow pymilvus sentence-transformers requests

## 1. Connect to Services

In [None]:
import io
import os

import boto3
import pandas as pd
import requests
from botocore.client import Config

# MinIO Configuration
# Each setting can be overridden through an environment variable of the same
# name so real credentials never have to be written into the notebook; the
# fallbacks are the values that were previously hard-coded here.
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://localhost:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "admin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "password")

# Create S3 client pointed at the MinIO endpoint (SigV4 request signing)
s3 = boto3.client(
    "s3",
    endpoint_url=MINIO_ENDPOINT,
    aws_access_key_id=MINIO_ACCESS_KEY,
    aws_secret_access_key=MINIO_SECRET_KEY,
    config=Config(signature_version="s3v4"),
    region_name="us-east-1",
)

# Test connection: listing buckets fails here if the endpoint or credentials are wrong
buckets = s3.list_buckets()
print("MinIO Buckets:", [b['Name'] for b in buckets['Buckets']])

In [None]:
# Check RavenDB
ravendb_url = "http://localhost:8080"
try:
    resp = requests.get(f"{ravendb_url}/databases", timeout=5)
except requests.RequestException:
    # Only network/HTTP-level failures mean "not available"; a bare except
    # would also swallow KeyboardInterrupt and genuine programming errors.
    print("RavenDB: ✗ Not available")
else:
    ravendb_status = "✓ Connected" if resp.status_code == 200 else "✗ Error"
    print(f"RavenDB: {ravendb_status}")

In [None]:
# Check Nessie
nessie_url = "http://localhost:19120"
try:
    resp = requests.get(f"{nessie_url}/api/v1/config", timeout=5)
    # Treat 4xx/5xx responses as failures instead of reporting "Connected"
    resp.raise_for_status()
    config = resp.json()
    default_branch = config.get('defaultBranch', 'main')
except (requests.RequestException, ValueError, AttributeError):
    # RequestException: connection/timeout/HTTP status errors.
    # ValueError: body is not valid JSON.
    # AttributeError: JSON payload is not an object with .get().
    print("Nessie: ✗ Not available")
else:
    print("Nessie: ✓ Connected")
    print(f"  Default branch: {default_branch}")

In [None]:
# Check Milvus
from pymilvus import connections, utility

# Connect and list collections; any failure is reported rather than raised.
try:
    connections.connect(host="localhost", port="19530")
    collections = utility.list_collections()
except Exception as err:
    print(f"Milvus: ✗ Error - {err}")
else:
    print("Milvus: ✓ Connected")
    print(f"  Collections: {collections}")

## 2. Explore the Data Lake

In [None]:
# List contents of the lakehouse bucket
def list_s3_prefix(prefix, max_keys=20, bucket="lakehouse"):
    """List object keys stored under a prefix.

    Args:
        prefix: Key prefix to filter on.
        max_keys: Value passed as MaxKeys. Only a single list_objects_v2 call
            is made, so keys beyond this limit are not returned.
        bucket: Bucket to list; defaults to "lakehouse".

    Returns:
        List of object keys, empty when the response has no 'Contents' entry.
    """
    resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=max_keys)
    return [obj['Key'] for obj in resp.get('Contents', [])]

# (heading, key prefix) for each layer, in the order they are printed
layers = (
    ("Bronze Layer (Raw Files):", "bronze/"),
    ("\nSilver Layer (RavenDB Landing):", "silver/"),
    ("\nGold Layer (Vectors):", "gold/"),
)

for heading, layer_prefix in layers:
    print(heading)
    for key in list_s3_prefix(layer_prefix):
        print(f"  {key}")

In [None]:
# Read order data from landing zone
def read_parquet_from_s3(key, bucket="lakehouse"):
    """Read a Parquet file from MinIO into a DataFrame.

    Args:
        key: Object key of the Parquet file.
        bucket: Bucket holding the object; defaults to "lakehouse".

    Returns:
        DataFrame parsed from the object's bytes.
    """
    obj = s3.get_object(Bucket=bucket, Key=key)
    # The whole object is read into memory and handed to pandas as a file-like buffer
    return pd.read_parquet(io.BytesIO(obj['Body'].read()))

# Collect the Parquet keys under the orders landing prefix (at most 100 keys listed)
parquet_files = []
for candidate in list_s3_prefix("silver/ravendb_landing/orders", 100):
    if candidate.endswith('.parquet'):
        parquet_files.append(candidate)

if not parquet_files:
    print("No order data found. Run ravendb_sync.py first.")
else:
    # Preview only the first file found
    first_key = parquet_files[0]
    df_orders = read_parquet_from_s3(first_key)
    print(f"Sample orders from: {first_key}")
    display(df_orders.head())

## 3. Vector Search Demo

In [None]:
from sentence_transformers import SentenceTransformer
from pymilvus import Collection

# Load the sentence-embedding model used to encode search queries
EMBEDDING_MODEL_NAME = 'all-MiniLM-L6-v2'
model = SentenceTransformer(EMBEDDING_MODEL_NAME)
print("✓ Loaded embedding model")

In [None]:
# Open the Milvus collection that holds the order vectors
COLLECTION_NAME = "orders_vector_index"

if not utility.has_collection(COLLECTION_NAME):
    # `collection` is left undefined in this case
    print("✗ Collection not found. Run milvus_bulk_load.py first.")
else:
    collection = Collection(COLLECTION_NAME)
    collection.load()
    print(f"✓ Collection '{COLLECTION_NAME}' loaded")
    print(f"  Entities: {collection.num_entities}")

In [None]:
def semantic_search(query: str, limit: int = 5):
    """
    Run a vector similarity search over the orders collection and print the hits.

    Args:
        query: Natural language search query
        limit: Number of results to return

    Returns:
        List with the 'order_id' field of each hit, in the order returned
        by the search.
    """
    # Embed the query; encode() receives a one-element list
    vectors = model.encode([query]).tolist()

    results = collection.search(
        data=vectors,
        anns_field="embedding",
        param={"metric_type": "COSINE", "params": {"nprobe": 10}},
        limit=limit,
        output_fields=["order_id"],
    )

    print(f"Query: '{query}'")
    print(f"\nTop {limit} matching orders:")
    print("-" * 50)

    # Only one query vector was submitted, so the hits are in the first result set
    matched_ids = []
    for rank, hit in enumerate(results[0], start=1):
        order_id = hit.entity.get('order_id')
        matched_ids.append(order_id)
        print(f"{rank}. {order_id} (similarity: {hit.distance:.4f})")

    return matched_ids

In [None]:
# Try some semantic searches
# semantic_search prints the ranked hits; because the call is the last
# expression in the cell, the returned list of order IDs is echoed as well.
semantic_search("high value orders shipped to New York")

In [None]:
# Another natural-language query against the same collection
semantic_search("pending orders with multiple items")

In [None]:
# Another natural-language query against the same collection
semantic_search("delivered orders from Seattle")

## 4. Full Read Path: Vector Search → Metadata Lookup

In [None]:
def get_order_details(order_ids: list) -> pd.DataFrame:
    """
    Look up full order details from the landing zone.

    In production, this would query Iceberg via Spark/Trino.
    For demo, we read directly from Parquet.

    Args:
        order_ids: Values to match against the 'OrderId' column.

    Returns:
        Rows from the landing-zone Parquet files whose 'OrderId' is in
        order_ids. When no Parquet files are found, an empty DataFrame with
        only an 'OrderId' column is returned.
    """
    # Only the first 100 keys under the prefix are listed
    parquet_files = [
        k for k in list_s3_prefix("silver/ravendb_landing/orders", 100)
        if k.endswith('.parquet')
    ]

    # pd.concat raises ValueError on an empty list, so short-circuit when the
    # landing zone has no Parquet files yet.
    if not parquet_files:
        return pd.DataFrame(columns=['OrderId'])

    df_all = pd.concat(
        [read_parquet_from_s3(key) for key in parquet_files],
        ignore_index=True,
    )

    # Filter to requested order IDs
    return df_all[df_all['OrderId'].isin(order_ids)]

In [None]:
# End-to-end: run the vector search, then fetch the matching rows
query = "cancelled orders"
order_ids = semantic_search(query)

divider = "=" * 50
print(f"\n{divider}")
print("Full Order Details:")
print(divider)

df_details = get_order_details(order_ids)
display(df_details)

## 5. Generate Presigned URLs (for External Compute)

In [None]:
def generate_presigned_url(key: str, expiration: int = 3600, bucket: str = "lakehouse") -> str:
    """
    Generate a presigned URL for accessing a file in MinIO.

    Args:
        key: S3 object key
        expiration: URL expiration in seconds
        bucket: Bucket holding the object; defaults to "lakehouse"

    Returns:
        Presigned URL for a 'get_object' request on the given bucket and key.
    """
    return s3.generate_presigned_url(
        'get_object',
        Params={'Bucket': bucket, 'Key': key},
        ExpiresIn=expiration
    )

# Example: presign up to three Parquet files from the Milvus import prefix
gold_keys = list_s3_prefix("gold/milvus_import", 10)
parquet_files = [name for name in gold_keys if name.endswith('.parquet')]

print("Presigned URLs for External Compute:", "-" * 50, sep="\n")
for key in parquet_files[:3]:
    url = generate_presigned_url(key)
    # Only the first 80 characters of each URL are shown
    print(f"\n{key}:\n  {url[:80]}...")

## 6. Architecture Summary

```
┌─────────────┐     ┌──────────────┐     ┌─────────────────┐
│  RavenDB    │────▶│    MinIO     │────▶│  Apache Iceberg │
│  (Source)   │     │  (Storage)   │     │  (Table Format) │
└─────────────┘     └──────────────┘     └─────────────────┘
                           │                      │
                           ▼                      ▼
                    ┌─────────────┐      ┌────────────────┐
                    │   Nessie    │      │     Milvus     │
                    │  (Catalog)  │      │ (Vector Store) │
                    └─────────────┘      └────────────────┘
                           │                      │
                           └──────────┬───────────┘
                                      ▼
                              ┌──────────────┐
                              │  Application │
                              │   (Query)    │
                              └──────────────┘
```

### Service URLs

| Service | URL |
|---------|-----|
| RavenDB Studio | http://localhost:8080 |
| MinIO Console | http://localhost:9001 |
| Nessie API | http://localhost:19120 |
| Milvus | localhost:19530 |
| Spark UI | http://localhost:4040 |