# AsyncPulsarMemvidAdapter Tutorial

This notebook demonstrates `AsyncPulsarMemvidAdapter`, which sends video-memory operations through Apache Pulsar topics for asynchronous processing.

## Prerequisites
1. Start Pulsar: `docker-compose up -d`
2. Wait 30-60 seconds for startup
3. Verify with: `docker logs pulsar-standalone`
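
Rather than waiting a fixed interval, the broker's binary port can be polled until it accepts connections. The sketch below is a minimal example, assuming Pulsar listens on `localhost:6650` (the address used for `PULSAR_URL` later in this notebook). `wait_for_port` is a hypothetical helper written for this tutorial, not part of pydapter or the Pulsar client. An open port only shows that the broker process is listening, so the adapter's own health check remains the real test.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Connection refused or timed out: retry until the deadline passes.
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, `wait_for_port("localhost", 6650)` blocks for up to 60 seconds and returns `False` if the port never opens.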

In [1]:
from pydapter import AsyncAdaptable
from pydapter.extras.async_memvid_pulsar import AsyncPulsarMemvidAdapter
from pydantic import BaseModel

In [2]:
# Define data models
class ResearchPaper(AsyncAdaptable, BaseModel):
    """Research paper with rich metadata."""
    id: str
    title: str
    abstract: str
    authors: list[str]
    category: str

class SearchResult(AsyncAdaptable, BaseModel):
    """Search result from video memory."""
    text: str
    id: str = "0"
    score: float = 0.0

# Register adapters
ResearchPaper.register_async_adapter(AsyncPulsarMemvidAdapter)
SearchResult.register_async_adapter(AsyncPulsarMemvidAdapter)

# Create sample data
papers = [
    ResearchPaper(
        id="paper_001",
        title="Deep Learning for Natural Language Processing",
        abstract="This paper presents novel approaches to deep learning architectures for natural language understanding.",
        authors=["Dr. Sarah Chen", "Prof. Michael Rodriguez"],
        category="AI"
    ),
    ResearchPaper(
        id="paper_002",
        title="Quantum Computing Applications",
        abstract="We explore quantum computing implications for modern cryptographic systems.",
        authors=["Dr. Alan Quantum"],
        category="Quantum"
    )
]

print(f"Created {len(papers)} research papers:")
for paper in papers:
    print(f"- {paper.title} ({paper.category})")

Created 2 research papers:
- Deep Learning for Natural Language Processing (AI)
- Quantum Computing Applications (Quantum)


In [3]:
# Import required libraries
import asyncio
import tempfile
import os
import sys
from pathlib import Path
from typing import List
from pydantic import BaseModel
from pydapter.async_core import AsyncAdaptable

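# Add the local source tree (two levels above the working directory) to the import path.
# Path().parent is still ".", so resolve from the current working directory instead.
sys.path.insert(0, str(Path.cwd().parent.parent / "src"))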

# Import AsyncPulsarMemvidAdapter
from pydapter.extras.async_memvid_pulsar import (
    AsyncPulsarMemvidAdapter,
    PulsarMemvidMessage,
    MemoryOperationResult
)

print("✅ All imports successful")

# Configuration
PULSAR_URL = "pulsar://localhost:6650"
print(f"Pulsar URL: {PULSAR_URL}")

✅ All imports successful
Pulsar URL: pulsar://localhost:6650


In [4]:
# Test Pulsar connectivity
async def test_pulsar_connectivity():
    """Test if Pulsar is accessible."""
    try:
        print("🔍 Testing Pulsar connectivity...")
        health = await AsyncPulsarMemvidAdapter.health_check(PULSAR_URL)
        
        if health.get('healthy', False):
            print("✅ Pulsar connection successful!")
            print(f"- Connection: {health.get('pulsar_connection')}")
            print(f"- Dependencies: {health.get('dependencies')}")
            return True
        else:
            print(f"❌ Health check failed: {health.get('error')}")
            return False
            
    except Exception as e:
        print(f"❌ Connection test failed: {e}")
        return False

# Run connectivity test
pulsar_ready = await test_pulsar_connectivity()

🔍 Testing Pulsar connectivity...


2025-06-11 15:18:03.922 INFO  [0x1fe901f00] ClientImpl:666 | Closing Pulsar client with 0 producers and 0 consumers
✅ Pulsar connection successful!
- Connection: ok
- Dependencies: ok


In [5]:
# Async video memory operations
async def demo_async_operations():
    """Demonstrate async video memory operations."""
    if not pulsar_ready:
        print("❌ Pulsar not ready, skipping demo")
        return
    
    print("🎬 Testing async memory operations...")
    
    # Create temporary files
    temp_dir = tempfile.mkdtemp()
    video_file = os.path.join(temp_dir, "research.mp4")
    index_file = os.path.join(temp_dir, "index.json")
    
    try:
        # Test async encoding (sends to Pulsar topic)
        print("\n📨 Async encoding (queued via Pulsar)...")
        async_result = await AsyncPulsarMemvidAdapter.to_obj(
            papers[:1],  # Single paper for demo
            pulsar_url=PULSAR_URL,
            topic="memory-operations",
            memory_id="research-async",
            video_file=video_file,
            index_file=index_file,
            text_field="abstract",
            async_processing=True
        )
        
        print("✅ Async operation queued:")
        print(f"- Message sent: {async_result.get('message_sent')}")
        print(f"- Memory ID: {async_result.get('memory_id')}")
        print(f"- Message ID: {async_result.get('message_id')}")
        
        print("\n💡 In production, background workers would process this message.")
        
    except Exception as e:
        print(f"❌ Operation failed: {e}")

# Run async operations demo
await demo_async_operations()

🎬 Testing async memory operations...

📨 Async encoding (queued via Pulsar)...
2025-06-11 15:18:03.928 INFO  [0x1fe901f00] ClientConnection:193 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2025-06-11 15:18:03.928 INFO  [0x1fe901f00] ConnectionPool:124 | Created connection for pulsar://localhost:6650-pulsar://localhost:6650-0
2025-06-11 15:18:03.935 INFO  [0x33cd97000] ClientConnection:410 | [[::1]:51893 -> [::1]:6650] Connected to broker
2025-06-11 15:18:03.943 INFO  [0x33cd97000] HandlerBase:115 | [persistent://public/default/memory-operations, ] Getting connection from pool
2025-06-11 15:18:03.944 INFO  [0x33cd97000] BinaryProtoLookupService:86 | Lookup response for persistent://public/default/memory-operations, lookup-broker-url pulsar://localhost:6650, from [[::1]:51893 -> [::1]:6650] 
2025-06-11 15:18:03.944 INFO  [0x33cd97000] ProducerImpl:148 | Creating producer for topic:persistent://public/default/memory-operations, producerName: on [[::1]:51893 

In [6]:
# Background worker pattern
async def demo_background_worker():
    """Demonstrate background worker creation."""
    if not pulsar_ready:
        print("❌ Pulsar not ready, skipping worker demo")
        return
    
    print("👷 Creating background memory worker...")
    
    try:
        worker_func = await AsyncPulsarMemvidAdapter.create_memory_worker(
            pulsar_url=PULSAR_URL,
            topic="memory-operations",
            subscription="worker-group-1",
            result_topic="operation-results",
            worker_id="demo-worker-001"
        )
        
        print(f"✅ Worker function created: {type(worker_func).__name__}")
        print("- Worker ID: demo-worker-001")
        print("- Subscription: worker-group-1")
        print("- Input topic: memory-operations")
        print("- Result topic: operation-results")
        
        print("\n💡 In production, you would run this worker function")
        print("   in separate processes/containers to handle the message queue.")
        
    except Exception as e:
        print(f"❌ Worker creation failed: {e}")

# Run worker demo
await demo_background_worker()

👷 Creating background memory worker...
✅ Worker function created: function
- Worker ID: demo-worker-001
- Subscription: worker-group-1
- Input topic: memory-operations
- Result topic: operation-results

💡 In production, you would run this worker function
   in separate processes/containers to handle the message queue.
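
One way to host such a worker is as a background asyncio task that is cancelled on shutdown. The sketch below assumes the object returned by `create_memory_worker` is a zero-argument coroutine function that runs until cancelled. The notebook only shows that it is a `function`, so check the adapter's source before relying on this. `run_worker_for` and `fake_worker` are hypothetical names used for illustration.

```python
import asyncio
from typing import Any, Callable, Coroutine


async def run_worker_for(
    worker: Callable[[], Coroutine[Any, Any, None]], seconds: float
) -> bool:
    """Run `worker` as a background task for `seconds`, then cancel it.

    Returns True if the task was still running and had to be cancelled,
    False if the worker returned on its own before the deadline.
    """
    task = asyncio.create_task(worker())
    done, _ = await asyncio.wait({task}, timeout=seconds)
    if task in done:
        task.result()  # re-raise any exception raised inside the worker
        return False
    task.cancel()
    try:
        await task  # wait for the cancellation to take effect
    except asyncio.CancelledError:
        pass
    return True


async def fake_worker() -> None:
    """Stand-in for the real worker (assumption): loops until cancelled."""
    while True:
        await asyncio.sleep(0.01)
```

In a notebook cell, `await run_worker_for(fake_worker, 0.1)` exercises the pattern without a broker. If the assumption about the worker's signature holds, `worker_func` could be passed in place of `fake_worker`.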


## Summary

This tutorial demonstrated:

✅ **Enterprise Streaming**: Apache Pulsar integration for scalable messaging  
✅ **Async Operations**: Encoding queued through a Pulsar topic with `async_processing=True`  
✅ **Background Workers**: Scalable worker pattern for production  
✅ **Type Safety**: Structured message models with Pydantic  
✅ **Health Monitoring**: Built-in connectivity and health checks  

### Key Benefits

- **Scalability**: Horizontal scaling via Pulsar topics
- **Reliability**: Message persistence and delivery guarantees  
- **Flexibility**: Both sync and async processing modes
- **Enterprise-Ready**: Production-grade streaming architecture

Don't forget to stop Pulsar: `docker-compose down`