# ⚡ AsyncValkeyStore: Enterprise Async Patterns

This notebook demonstrates **when and why** to use `AsyncValkeyStore` for production workloads.

## 🎯 What You'll Learn

1. **🚀 Concurrent Search**: Handle multiple users simultaneously
2. **📦 Batch Processing**: Process hundreds of items efficiently
3. **🌐 FastAPI Integration**: Real async web service patterns
4. **📊 Performance Gains**: Measure actual speedup vs sync
5. **🏗️ Production Patterns**: Connection pooling, rate limiting, error handling

## 📚 Prerequisites

This notebook builds on concepts from:
- [`valkey_store.ipynb`](./valkey_store.ipynb) - Core ValkeyStore patterns (start here!)
- [`agentcore_valkey_store.ipynb`](./agentcore_valkey_store.ipynb) - Basic sync/async parity

## ⚖️ When to Use Async

| Scenario | Use Async | Use Sync |
|----------|-----------|----------|
| Single user CLI/script | ❌ | ✅ |
| Web API (FastAPI, aiohttp) | ✅ | ❌ |
| Batch processing 100+ items | ✅ | ❌ |
| Concurrent multi-user access | ✅ | ❌ |
| Simple notebook exploration | ❌ | ✅ |
| High-throughput production | ✅ | ⚠️ |

## 🔧 Environment Setup

**AWS Credentials**: Configure with `aws configure` or environment variables  
**Valkey Server**: Local (`localhost:6379`) or AWS MemoryDB connection string

In [1]:
# Install required packages
# !pip install langgraph-checkpoint-aws langchain-aws boto3 valkey

## 📦 Imports and Configuration

In [2]:
import asyncio
import time
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
import logging

# LangChain and AWS imports
from langchain_core.embeddings import Embeddings
from langchain_aws import BedrockEmbeddings

# ValkeyStore imports
import valkey
from langgraph_checkpoint_aws.store.valkey import AsyncValkeyStore, ValkeyIndexConfig

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Configuration
VALKEY_CONN_STRING = "valkey://localhost:6379"  # Update for your environment
VECTOR_DIMENSION = 1024  # Titan Text Embeddings v2: 1024, v1: 1536
NAMESPACE = ("async_demo",)
AWS_REGION = "us-east-1"

# Embedding mode: 'mock' (default, no AWS credentials needed) or 'bedrock' (production)
EMBEDDING_MODE = "mock"  # Change to "bedrock" for production with real embeddings

print("✅ Imports complete")


✅ Imports complete


## 🎯 Embeddings Setup

This notebook supports two modes:

### 1. Mock Embeddings (Default) - No AWS Credentials Required ✅
- **Instant setup**: Works locally without any configuration
- **Simulated latency**: Async methods sleep 80-120ms to approximate a Bedrock API round trip; sync methods return immediately
- **Perfect for**: Development, testing, learning async patterns
- **Limitations**: Not semantically meaningful, uses hash-based vectors

### 2. Bedrock Embeddings (Production) - Real Semantic Search
- **Production-ready**: Uses AWS Bedrock Titan embeddings
- **Semantic search**: Real similarity scores based on meaning
- **Requires**: AWS credentials configured (via AWS CLI or environment variables)

**Current Mode: `EMBEDDING_MODE = "mock"`**
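The hash-based idea behind the mock mode can be illustrated in a few lines. The sketch below is a simplified variant, not the notebook's `MockEmbeddings` class: `hash_embedding` is a hypothetical helper that seeds a private random generator from the text's SHA-256 digest, so the same text always yields the same unit vector. As with any mock, the vectors carry no semantic meaning.

```python
import hashlib
import math
import random
from typing import List


def hash_embedding(text: str, dim: int = 1024) -> List[float]:
    """Return a deterministic unit vector derived from the text's SHA-256 hash."""
    # Seed a private RNG from the hash so the same text always maps to the same vector.
    seed = int.from_bytes(hashlib.sha256(text.encode("utf-8")).digest(), "big")
    rng = random.Random(seed)
    vector = [rng.uniform(-1.0, 1.0) for _ in range(dim)]
    # Scale to unit length so cosine similarity behaves predictably.
    norm = math.sqrt(sum(x * x for x in vector))
    return [x / norm for x in vector]


a = hash_embedding("vector databases")
b = hash_embedding("vector databases")
c = hash_embedding("marathon training")
print(a == b)  # same text, same vector
print(a == c)  # different text, different vector
```

Determinism matters here because it keeps search results reproducible between runs, which makes timing experiments easier to compare.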

### 🔄 How to Switch to Bedrock

1. **Configure AWS credentials** (one-time setup):
   ```bash
   aws configure
   # OR set environment variables:
   # export AWS_ACCESS_KEY_ID=your_key
   # export AWS_SECRET_ACCESS_KEY=your_secret
   # export AWS_DEFAULT_REGION=us-east-1
   ```

2. **Change the mode in the configuration cell (`In [2]`)**:
   ```python
   EMBEDDING_MODE = "bedrock"  # Change from "mock" to "bedrock"
   ```

3. **Re-run all cells** - that's it! 🎉

In [3]:
class MockEmbeddings(Embeddings):
    """Mock embeddings with realistic Bedrock API timing.
    
    Simulates Bedrock Titan embed-text-v2 behavior:
    - Latency: 80-120ms per async request (sync methods return immediately)
    - Vector dimension: 1024 (configurable)
    - Deterministic: Same text always produces same vector
    """
    
    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Generate mock embeddings based on text characteristics."""
        return [self._generate_embedding(text) for text in texts]
    
    def embed_query(self, text: str) -> List[float]:
        """Generate mock embedding for query."""
        return self._generate_embedding(text)
    
    async def aembed_documents(self, texts: List[str]) -> List[List[float]]:
        """Async version - simulates Bedrock API latency (80-120ms)."""
        import random
        # Simulate realistic Bedrock latency: 80-120ms
        latency = random.uniform(0.08, 0.12)
        await asyncio.sleep(latency)
        return self.embed_documents(texts)
    
    async def aembed_query(self, text: str) -> List[float]:
        """Async query embedding with simulated latency."""
        import random
        latency = random.uniform(0.08, 0.12)
        await asyncio.sleep(latency)
        return self.embed_query(text)
    
    def _generate_embedding(self, text: str) -> List[float]:
        """Create a deterministic vector based on text hash."""
        import hashlib
        import struct
        
        # Use text hash for reproducible but varied embeddings
        hash_obj = hashlib.sha256(text.encode())
        hash_bytes = hash_obj.digest()
        
        # Generate VECTOR_DIMENSION floats from the hash.
        # The 32-byte digest is reused cyclically, so values repeat every 8 floats.
        vector = []
        for i in range(0, VECTOR_DIMENSION * 4, 4):
            byte_chunk = hash_bytes[(i % len(hash_bytes)):(i % len(hash_bytes)) + 4]
            if len(byte_chunk) < 4:
                byte_chunk = byte_chunk + b'\x00' * (4 - len(byte_chunk))
            value = struct.unpack('f', byte_chunk)[0]
            # Scale down, then clamp to the [-1, 1] range
            normalized = max(-1.0, min(1.0, value / 1e38))
            vector.append(normalized)
        
        # Normalize to unit vector for cosine similarity
        magnitude = sum(x * x for x in vector) ** 0.5
        if magnitude > 0:
            vector = [x / magnitude for x in vector]
        
        return vector


# Initialize embeddings based on mode
if EMBEDDING_MODE == "bedrock":
    # Production mode: Real Bedrock embeddings
    try:
        import boto3
        from botocore.config import Config
        
        # Configure boto3 for async concurrency
        # Create dedicated boto3 session for isolated connection pool
        boto_session = boto3.Session()

        bedrock_config = Config(
            max_pool_connections=50,  # Up from default 10 for concurrent operations
            retries={'max_attempts': 3, 'mode': 'adaptive'}
        )
        
        embeddings = BedrockEmbeddings(
            model_id="amazon.titan-embed-text-v2:0",
            region_name=AWS_REGION,
            client=boto_session.client(
                "bedrock-runtime",
                region_name=AWS_REGION,
                config=bedrock_config
            )
        )
        print("✅ Using BedrockEmbeddings (Production)")
        print(f"   Model: amazon.titan-embed-text-v2:0")
        print(f"   Region: {AWS_REGION}")
        print(f"   Connection pool: 50")
    except Exception as e:
        print(f"❌ Bedrock initialization failed: {e}")
        print("💡 Falling back to MockEmbeddings")
        print("   Tip: Check AWS credentials with 'aws sts get-caller-identity'")
        embeddings = MockEmbeddings()
        EMBEDDING_MODE = "mock"
else:
    # Development mode: Mock embeddings (default)
    embeddings = MockEmbeddings()
    print("✅ Using MockEmbeddings (Development)")
    print("   Simulated latency: 80-120ms (matches Bedrock)")
    print(f"   Vector dimension: {VECTOR_DIMENSION}")
    print("   💡 To use real Bedrock: Set EMBEDDING_MODE = 'bedrock' in Cell 3")

✅ Using MockEmbeddings (Development)
   Simulated latency: 80-120ms (matches Bedrock)
   Vector dimension: 1024
   💡 To use real Bedrock: Set EMBEDDING_MODE = 'bedrock' in Cell 3


## 🏗️ AsyncValkeyStore Setup

Create store with vector search configuration.

In [4]:
valkey_client = None
async_store = None
    
try:
    # Initialize Valkey client with enterprise configuration
    valkey_client = valkey.from_url(
        VALKEY_CONN_STRING,
        # Enterprise connection settings
        socket_connect_timeout=10,
        socket_timeout=10,
        retry_on_timeout=True,
        health_check_interval=30
    )
    
    # Test connection
    valkey_client.ping()
    logger.info("✅ Valkey connection established")
    # Create index configuration for vector search
    index_config: ValkeyIndexConfig = {
        "collection_name": "enterprise_memory_vectors",
        "dims": VECTOR_DIMENSION,  # Match embedding dimensions
        "embed": embeddings,
        "distance_metric": "COSINE",
        "index_type": "HNSW",
        # Searchable fields for hybrid queries
        "fields": ["user_id", "category", "priority"],
    }

    # Initialize AsyncValkeyStore
    async_store = AsyncValkeyStore(
        valkey_client,
        index=index_config,
    )
    await async_store.setup()
    # Single quotes inside the f-strings keep this valid on Python versions before 3.12
    logger.info(f"✅ AsyncValkeyStore configured with vector search: {index_config['collection_name']}")
    logger.info(f"   Connection: {VALKEY_CONN_STRING}")
    logger.info(f"📐 Dimension: {index_config['dims']}, Algorithm: {index_config['index_type']}")
except Exception as e:
    logger.error(f"❌ Valkey Client initialization failed: {e}")
    logger.info("💡 Ensure Valkey is running: docker run -p 6379:6379 -d valkey/valkey")
    raise

2025-12-04 22:32:57,045 - __main__ - INFO - ✅ Valkey connection established
2025-12-04 22:32:57,050 - langgraph_checkpoint_aws.store.valkey.async_store - INFO - Created search index enterprise_memory_vectors with TAG fields
2025-12-04 22:32:57,051 - __main__ - INFO - ✅ AsyncValkeyStore configured with vector search: enterprise_memory_vectors
2025-12-04 22:32:57,051 - __main__ - INFO -    Connection: valkey://localhost:6379
2025-12-04 22:32:57,051 - __main__ - INFO - 📐 Dimension: 1024, Algorithm: HNSW


## 📊 Data Models

In [5]:
@dataclass
class UserQuery:
    """Represents a user search query."""
    user_id: str
    query_text: str
    filters: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)

@dataclass
class SearchResult:
    """Enhanced search result with timing info."""
    user_id: str
    query: str
    results_count: int
    duration_ms: float
    top_result: Optional[Dict[str, Any]] = None
    
    def __str__(self):
        return f"User {self.user_id}: {self.results_count} results in {self.duration_ms:.1f}ms"

print("✅ Data models defined")

✅ Data models defined


## 🔄 Pattern 1: Concurrent Multi-User Search

**Use Case**: Web application serving multiple users simultaneously.

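**Why Async Wins**: While one search waits on the embedding API or Valkey, the event loop runs the others, so 10 concurrent searches finish in roughly the time of the slowest one rather than the sum of all ten.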
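In a multi-user setting, one failed search should not discard everyone else's results. A minimal sketch of that isolation, assuming a stand-in coroutine (`fake_search` is hypothetical and replaces a real store call), uses `asyncio.gather(..., return_exceptions=True)` so that errors come back as values in the result list:

```python
import asyncio
from typing import List, Union


async def fake_search(user_id: str) -> str:
    """Stand-in for a store search; fails for one user to show isolation."""
    await asyncio.sleep(0.01)  # simulated I/O wait
    if user_id == "user-3":
        raise ConnectionError(f"search failed for {user_id}")
    return f"results for {user_id}"


async def search_all(user_ids: List[str]) -> List[Union[str, BaseException]]:
    # return_exceptions=True keeps one failure from discarding the other results.
    return await asyncio.gather(
        *(fake_search(u) for u in user_ids), return_exceptions=True
    )


outcomes = asyncio.run(search_all(["user-1", "user-2", "user-3"]))
for outcome in outcomes:
    status = "error" if isinstance(outcome, BaseException) else "ok"
    print(status, outcome)
```

Inside a notebook, where an event loop is already running, replace `asyncio.run(...)` with `await search_all(...)`.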

In [6]:
async def setup_demo_data():
    """Populate store with sample user preferences."""
    demo_data = [
        # User 1 - Tech enthusiast
        {
            "namespace": ("preferences", "user-1"),
            "key": "pref-tech",
            "value": {
                "content": "I love machine learning, AI, and cloud computing. Especially interested in AWS services and vector databases.",
                "user_id": "user-1",
                "category": "technology",
                "priority": "high"
            }
        },
        # User 2 - Fitness focused
        {
            "namespace": ("preferences", "user-2"),
            "key": "pref-fitness",
            "value": {
                "content": "Passionate about running, yoga, and healthy eating. Training for a marathon and exploring plant-based nutrition.",
                "user_id": "user-2",
                "category": "health",
                "priority": "high"
            }
        },
        # User 3 - Travel lover
        {
            "namespace": ("preferences", "user-3"),
            "key": "pref-travel",
            "value": {
                "content": "Love exploring new countries, trying local cuisines, and photography. Favorite destinations include Japan and Iceland.",
                "user_id": "user-3",
                "category": "lifestyle",
                "priority": "medium"
            }
        },
        # User 4 - Finance professional
        {
            "namespace": ("preferences", "user-4"),
            "key": "pref-finance",
            "value": {
                "content": "Interested in stock markets, cryptocurrency, and investment strategies. Following market trends and economic indicators.",
                "user_id": "user-4",
                "category": "finance",
                "priority": "high"
            }
        },
        # User 5 - Book reader
        {
            "namespace": ("preferences", "user-5"),
            "key": "pref-reading",
            "value": {
                "content": "Avid reader of science fiction, fantasy novels, and historical biographies. Current favorite authors include Brandon Sanderson.",
                "user_id": "user-5",
                "category": "entertainment",
                "priority": "medium"
            }
        },
    ]
    
    # Store all items concurrently
    tasks = [
        async_store.aput(
            namespace=item["namespace"],
            key=item["key"],
            value=item["value"]
        )
        for item in demo_data
    ]
    await asyncio.gather(*tasks)
    logger.info(f"✅ Stored {len(demo_data)} demo items")

# Run setup
await setup_demo_data()
print("\n🎪 Demo data ready!")

2025-12-04 22:32:57,171 - __main__ - INFO - ✅ Stored 5 demo items



🎪 Demo data ready!


In [7]:
async def search_for_user(query: UserQuery) -> SearchResult:
    """Execute search for a single user with timing."""
    start = time.perf_counter()
    
    # Perform vector search
    results = await async_store.asearch(
        namespace_prefix=("preferences", query.user_id),
        query=query.query_text,
        limit=5,
        filter=query.filters or None
    )
    
    duration = (time.perf_counter() - start) * 1000  # Convert to ms
    
    top_result = None
    if results:
        top = results[0]
        top_result = {
            "key": top.key,
            "score": top.score,
            "content": top.value.get("content", "")[:80] + "..."
        }
    
    return SearchResult(
        user_id=query.user_id,
        query=query.query_text,
        results_count=len(results),
        duration_ms=duration,
        top_result=top_result
    )

async def concurrent_search_demo():
    """Simulate 10 concurrent searches spread across 5 users."""
    queries = [
        UserQuery("user-1", "What are the latest developments in artificial intelligence?"),
        UserQuery("user-2", "How can I improve my marathon training?"),
        UserQuery("user-3", "Best photography spots in Iceland"),
        UserQuery("user-4", "Current trends in cryptocurrency markets"),
        UserQuery("user-5", "Recommend fantasy books similar to Brandon Sanderson"),
        UserQuery("user-1", "Explain vector databases and their use cases"),
        UserQuery("user-2", "Plant-based meal prep ideas for athletes"),
        UserQuery("user-3", "Travel tips for visiting Japan"),
        UserQuery("user-4", "Understanding stock market indicators"),
        UserQuery("user-5", "Historical biographies worth reading"),
    ]
    
    print("🚀 Launching 10 concurrent searches...\n")
    start = time.perf_counter()
    
    # Execute all searches concurrently
    results = await asyncio.gather(*[search_for_user(q) for q in queries])
    
    total_duration = (time.perf_counter() - start) * 1000
    
    # Display results
    print("📊 Results:")
    for result in results:
        print(f"  {result}")
        if result.top_result:
            print(f"    ↳ Top: {result.top_result['content']} (score: {result.top_result['score']:.3f})")
    
    print(f"\n⏱️  Total wall time: {total_duration:.1f}ms")
    print(f"📈 Average per search: {total_duration / len(queries):.1f}ms")
    print(f"🎯 Throughput: {len(queries) / (total_duration / 1000):.1f} searches/sec")
    
    # Compare with hypothetical sequential execution
    sequential_estimate = sum(r.duration_ms for r in results)
    speedup = sequential_estimate / total_duration
    print(f"\n⚡ Async speedup: {speedup:.1f}x faster than sequential")

# Run the demo
await concurrent_search_demo()

🚀 Launching 10 concurrent searches...

📊 Results:
  User user-1: 1 results in 101.2ms
    ↳ Top: I love machine learning, AI, and cloud computing. Especially interested in AWS s... (score: 0.600)
  User user-2: 1 results in 120.8ms
    ↳ Top: Passionate about running, yoga, and healthy eating. Training for a marathon and ... (score: 0.600)
  User user-3: 1 results in 102.2ms
    ↳ Top: Love exploring new countries, trying local cuisines, and photography. Favorite d... (score: 0.100)
  User user-4: 1 results in 113.6ms
    ↳ Top: Interested in stock markets, cryptocurrency, and investment strategies. Followin... (score: 0.600)
  User user-5: 1 results in 110.6ms
    ↳ Top: Avid reader of science fiction, fantasy novels, and historical biographies. Curr... (score: 0.600)
  User user-1: 1 results in 115.7ms
    ↳ Top: I love machine learning, AI, and cloud computing. Especially interested in AWS s... (score: 0.600)
  User user-2: 1 results in 96.5ms
    ↳ Top: Passionate about running, yo

## 📦 Pattern 2: Batch Processing with Parallelism

**Use Case**: Processing large volumes of data (ETL, migrations, bulk analysis).

**Why Async Wins**: Process hundreds of items with controlled parallelism. In the run below, 100 writes took 10.62s at concurrency 1 and 0.58s at concurrency 20, roughly an 18x speedup.

In [8]:
async def batch_store_with_semaphore(items: List[Dict[str, Any]], max_concurrent: int = 20):
    """Store items in batches with controlled concurrency."""
    semaphore = asyncio.Semaphore(max_concurrent)
    
    async def store_with_limit(item):
        async with semaphore:
            await async_store.aput(
                namespace=item["namespace"],
                key=item["key"],
                value=item["value"]
            )
    
    start = time.perf_counter()
    await asyncio.gather(*[store_with_limit(item) for item in items])
    duration = time.perf_counter() - start
    
    return duration

async def batch_processing_demo():
    """Demonstrate batch processing efficiency."""
    # Generate 100 sample items
    batch_items = [
        {
            "namespace": ("batch", f"user-{i % 10}"),
            "key": f"item-{i}",
            "value": {
                "content": f"Batch item {i}: Sample data for testing parallel processing patterns with AsyncValkeyStore",
                "user_id": f"user-{i % 10}",
                "category": ["tech", "health", "finance"][i % 3],
                "batch_id": i // 10,
            }
        }
        for i in range(100)
    ]
    
    print("📦 Batch Processing Demo: 100 items\n")
    
    # Test different concurrency levels
    for max_concurrent in [1, 5, 10, 20]:
        duration = await batch_store_with_semaphore(batch_items, max_concurrent)
        throughput = len(batch_items) / duration
        print(f"  Concurrency {max_concurrent:2d}: {duration:.2f}s ({throughput:.1f} items/sec)")
    
    print("\n✅ Batch processing complete")
    print("💡 Tip: Tune max_concurrent based on your Valkey/MemoryDB capacity")

# Run the demo
await batch_processing_demo()

📦 Batch Processing Demo: 100 items

  Concurrency  1: 10.62s (9.4 items/sec)
  Concurrency  5: 2.15s (46.5 items/sec)
  Concurrency 10: 1.07s (93.7 items/sec)
  Concurrency 20: 0.58s (172.7 items/sec)

✅ Batch processing complete
💡 Tip: Tune max_concurrent based on your Valkey/MemoryDB capacity
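The semaphore version above creates one task per item up front, which is fine for 100 items but can become wasteful for very large batches. An alternative sketch, assuming a fixed pool of workers that pull from an `asyncio.Queue`, is shown below. `run_worker_pool` and `fake_put` are hypothetical names; `fake_put` stands in for `async_store.aput(...)`.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def run_worker_pool(
    items: List[Any],
    handler: Callable[[Any], Awaitable[None]],
    workers: int = 20,
) -> None:
    """Process items with a fixed number of worker tasks pulling from a queue."""
    queue: asyncio.Queue = asyncio.Queue()
    for item in items:
        queue.put_nowait(item)

    async def worker() -> None:
        while True:
            try:
                item = queue.get_nowait()
            except asyncio.QueueEmpty:
                return  # nothing left to do
            await handler(item)

    # Only `workers` coroutines run at any time, regardless of len(items).
    await asyncio.gather(*(worker() for _ in range(workers)))


stored: List[int] = []


async def fake_put(item: int) -> None:
    await asyncio.sleep(0.001)  # stand-in for async_store.aput(...)
    stored.append(item)


asyncio.run(run_worker_pool(list(range(100)), fake_put, workers=10))
print(len(stored))
```

Both approaches bound concurrency; the worker pool also bounds the number of live tasks, at the cost of slightly more code.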


## 🌐 Pattern 3: FastAPI Integration

**Use Case**: Building production REST APIs with async frameworks.

**Example**: Search endpoint that serves multiple concurrent requests efficiently.

In [9]:
# Example FastAPI-style service (framework-free stand-in for demonstration)

from typing import List, Optional
from dataclasses import asdict

# This demonstrates the pattern - actual FastAPI would be in separate service
class SearchAPI:
    """Example FastAPI-style search service."""
    
    def __init__(self, store: AsyncValkeyStore):
        self.store = store
    
    async def search_endpoint(self, user_id: str, query: str, category: Optional[str] = None) -> Dict[str, Any]:
        """Async search endpoint handler.
        
        Usage in FastAPI:
            @app.get("/search/{user_id}")
            async def search(user_id: str, query: str, category: Optional[str] = None):
                return await search_api.search_endpoint(user_id, query, category)
        """
        filters = {"category": category} if category else None
        
        start = time.perf_counter()
        
        # Async search - doesn't block other requests
        results = await self.store.asearch(
            namespace_prefix=("preferences", user_id),
            query=query,
            limit=10,
            filter=filters
        )
        
        duration_ms = (time.perf_counter() - start) * 1000
        
        return {
            "user_id": user_id,
            "query": query,
            "results": [
                {
                    "key": r.key,
                    "score": r.score,
                    "content": r.value.get("content", ""),
                    "category": r.value.get("category"),
                }
                for r in results
            ],
            "count": len(results),
            "duration_ms": duration_ms
        }
    
    async def health_check(self) -> Dict[str, str]:
        """Health check endpoint."""
        try:
            # Quick connectivity check
            await self.store.asearch(
                namespace_prefix=("health",),
                query="test",
                limit=1
            )
            return {"status": "healthy", "store": "connected"}
        except Exception as e:
            return {"status": "unhealthy", "error": str(e)}

# Simulate FastAPI usage
async def fastapi_demo():
    """Demonstrate FastAPI integration patterns."""
    api = SearchAPI(async_store)
    
    print("🌐 FastAPI Integration Demo\n")
    
    # Simulate health check
    health = await api.health_check()
    print(f"Health Check: {health}\n")
    
    # Simulate concurrent API requests
    requests = [
        ("user-1", "artificial intelligence trends", "technology"),
        ("user-2", "marathon training tips", "health"),
        ("user-3", "best travel destinations", "lifestyle"),
    ]
    
    print("📞 Simulating 3 concurrent API requests...\n")
    start = time.perf_counter()
    
    responses = await asyncio.gather(*[
        api.search_endpoint(user_id, query, category)
        for user_id, query, category in requests
    ])
    
    total_time = (time.perf_counter() - start) * 1000
    
    for resp in responses:
        print(f"User {resp['user_id']}: {resp['count']} results in {resp['duration_ms']:.1f}ms")
        if resp['results']:
            top = resp['results'][0]
            print(f"  ↳ Top result: {top['content'][:60]}... (score: {top['score']:.3f})")
    
    print(f"\n⏱️  Total API response time: {total_time:.1f}ms")
    print(f"💡 With sync, this would block other requests during I/O")

# Run the demo
await fastapi_demo()

print("\n" + "="*60)
print("📝 Production FastAPI Example:")
print("="*60)
print("""
from fastapi import FastAPI, HTTPException
from langgraph_checkpoint_aws.store.valkey import AsyncValkeyStore

app = FastAPI()
store = AsyncValkeyStore(conn_string=CONN_STRING, ...)

@app.on_event("startup")
async def startup():
    # Store is ready - async context handles connections
    pass

@app.get("/search/{user_id}")
async def search(user_id: str, query: str):
    results = await store.asearch(
        namespace_prefix=("prefs", user_id),
        query=query,
        limit=10
    )
    return {"results": [{"key": r.key, "score": r.score, "value": r.value} for r in results]}

@app.get("/health")
async def health():
    # Quick connectivity check
    await store.asearch(("health",), "test", limit=1)
    return {"status": "ok"}
""")

🌐 FastAPI Integration Demo

Health Check: {'status': 'healthy', 'store': 'connected'}

📞 Simulating 3 concurrent API requests...

User user-1: 1 results in 113.7ms
  ↳ Top result: I love machine learning, AI, and cloud computing. Especially... (score: 0.100)
User user-2: 1 results in 86.7ms
  ↳ Top result: Passionate about running, yoga, and healthy eating. Training... (score: 0.600)
User user-3: 1 results in 109.3ms
  ↳ Top result: Love exploring new countries, trying local cuisines, and pho... (score: 0.600)

⏱️  Total API response time: 113.8ms
💡 With sync, this would block other requests during I/O

📝 Production FastAPI Example:

from fastapi import FastAPI, HTTPException
from langgraph_checkpoint_aws.store.valkey import AsyncValkeyStore

app = FastAPI()
store = AsyncValkeyStore(conn_string=CONN_STRING, ...)

@app.on_event("startup")
async def startup():
    # Store is ready - async context handles connections
    pass

@app.get("/search/{user_id}")
async def search(user_id: str, q

## ⚖️ Pattern 4: Sync vs Async Performance Comparison

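Direct measurement of sync versus async search latency.

**Caveat for mock mode**: `MockEmbeddings` simulates 80-120ms of latency only in its async methods (`aembed_query`, `aembed_documents`); the sync methods return immediately. With `EMBEDDING_MODE = "mock"` the sync path therefore skips the dominant cost and appears faster, as in the recorded output below (0.21x). For a fair comparison, run with `EMBEDDING_MODE = "bedrock"` so both paths make real API calls.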
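A sync-versus-async measurement is only meaningful when both paths pay the same I/O cost. The sketch below is independent of the store and uses an assumed, illustrative latency (`LATENCY`) for every call: the sync version blocks for each call in turn, while the async version overlaps the waits.

```python
import asyncio
import time

LATENCY = 0.02  # assumed per-call I/O latency in seconds (illustrative)


def sync_call() -> None:
    time.sleep(LATENCY)  # blocks the thread for the whole wait


async def async_call() -> None:
    await asyncio.sleep(LATENCY)  # yields to the event loop while waiting


def run_sync(n: int) -> float:
    start = time.perf_counter()
    for _ in range(n):
        sync_call()
    return time.perf_counter() - start


async def run_async(n: int) -> float:
    start = time.perf_counter()
    await asyncio.gather(*(async_call() for _ in range(n)))
    return time.perf_counter() - start


sync_s = run_sync(5)
async_s = asyncio.run(run_async(5))
print(f"sync: {sync_s * 1000:.0f}ms, async: {async_s * 1000:.0f}ms")
```

With equal latency on both sides, sequential time grows with the number of calls while concurrent time stays close to a single call's latency.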

In [10]:
from langgraph_checkpoint_aws.store.valkey import ValkeyStore

# Create sync store for comparison
sync_store = ValkeyStore(
    valkey_client,
    index=index_config
)

async def performance_comparison():
    """Compare sync vs async for multiple searches."""
    queries = [
        (("preferences", "user-1"), "machine learning applications"),
        (("preferences", "user-2"), "fitness and nutrition"),
        (("preferences", "user-3"), "travel photography"),
        (("preferences", "user-4"), "investment strategies"),
        (("preferences", "user-5"), "fantasy literature"),
    ]
    
    print("⚖️  Sync vs Async Performance Comparison\n")
    print("Testing 5 searches...\n")
    
    # Sync approach (sequential)
    print("🔵 Sync (Sequential):")
    sync_start = time.perf_counter()
    sync_results = []
    for namespace, query in queries:
        results = sync_store.search(namespace, query=query, limit=3)
        sync_results.append(results)
    sync_duration = time.perf_counter() - sync_start
    print(f"   Duration: {sync_duration * 1000:.1f}ms")
    print(f"   Throughput: {len(queries) / sync_duration:.1f} searches/sec\n")
    
    # Async approach (concurrent)
    print("🟢 Async (Concurrent):")
    async_start = time.perf_counter()
    async_results = await asyncio.gather(*[
        async_store.asearch(namespace, query=query, limit=3)
        for namespace, query in queries
    ])
    async_duration = time.perf_counter() - async_start
    print(f"   Duration: {async_duration * 1000:.1f}ms")
    print(f"   Throughput: {len(queries) / async_duration:.1f} searches/sec\n")
    
    # Analysis
    speedup = sync_duration / async_duration
    print(f"📊 Results:")
    print(f"   Speedup: {speedup:.2f}x")
    print(f"   Time saved: {(sync_duration - async_duration) * 1000:.1f}ms")
    print(f"   Efficiency gain: {((speedup - 1) * 100):.1f}%")
    
    # Verify results match
    print(f"\n🔍 Verification:")
    for i, (sync_res, async_res) in enumerate(zip(sync_results, async_results)):
        if len(sync_res) == len(async_res):
            print(f"   Query {i+1}: ✅ Both returned {len(sync_res)} results")
        else:
            print(f"   Query {i+1}: ⚠️  Count mismatch: sync={len(sync_res)}, async={len(async_res)}")

# Run comparison
await performance_comparison()

⚖️  Sync vs Async Performance Comparison

Testing 5 searches...

🔵 Sync (Sequential):
   Duration: 25.7ms
   Throughput: 194.3 searches/sec

🟢 Async (Concurrent):
   Duration: 122.6ms
   Throughput: 40.8 searches/sec

📊 Results:
   Speedup: 0.21x
   Time saved: -96.8ms
   Efficiency gain: -79.0%

🔍 Verification:
   Query 1: ✅ Both returned 1 results
   Query 2: ✅ Both returned 1 results
   Query 3: ✅ Both returned 1 results
   Query 4: ✅ Both returned 1 results
   Query 5: ✅ Both returned 1 results


## 🏗️ Pattern 5: Production-Ready Patterns

### Connection Pooling & Resource Management

In [11]:
from contextlib import asynccontextmanager
from typing import AsyncGenerator

class ProductionValkeyManager:
    """Production-grade async ValkeyStore manager."""
    
    def __init__(self, valkey_client: Any, index_config: ValkeyIndexConfig):
        self.client = valkey_client
        self.index_config = index_config
        self._store: Optional[AsyncValkeyStore] = None
    
    async def initialize(self):
        """Initialize store with connection validation."""
        self._store = AsyncValkeyStore(
            self.client,
            index=self.index_config
        )
        await self._store.setup()
        logger.info("✅ AsyncValkeyStore initialized")
    
    async def shutdown(self):
        """Graceful shutdown."""
        if self._store:
            # Cleanup if needed
            logger.info("🛑 AsyncValkeyStore shutdown")
    
    @asynccontextmanager
    async def get_store(self) -> AsyncGenerator[AsyncValkeyStore, None]:
        """Get store with automatic resource management."""
        if not self._store:
            raise RuntimeError("Manager not initialized")
        try:
            yield self._store
        except Exception as e:
            logger.error(f"Store operation failed: {e}")
            raise

async def production_patterns_demo():
    """Demonstrate production patterns."""
    print("🏗️  Production Patterns Demo\n")
    
    # Initialize manager
    manager = ProductionValkeyManager(
        valkey_client=valkey_client,
        index_config=index_config
    )
    await manager.initialize()
    
    # Use with context manager
    print("1️⃣  Context Manager Pattern:")
    async with manager.get_store() as store:
        results = await store.asearch(
            namespace_prefix=("preferences",),
            query="technology interests",
            limit=3
        )
        print(f"   Found {len(results)} results\n")
    
    # Error handling pattern
    print("2️⃣  Error Handling Pattern:")
    try:
        async with manager.get_store() as store:
            results = await store.asearch(
                namespace_prefix=("nonexistent",),
                query="test",
                limit=3
            )
            print(f"   Search completed: {len(results)} results\n")
    except Exception as e:
        print(f"   ⚠️  Handled error: {type(e).__name__}\n")
    
    # Rate limiting pattern
    print("3️⃣  Rate Limiting Pattern:")
    rate_limit = asyncio.Semaphore(5)  # Max 5 concurrent ops
    
    async def rate_limited_search(query: str):
        async with rate_limit:
            async with manager.get_store() as store:
                return await store.asearch(
                    namespace_prefix=("preferences",),
                    query=query,
                    limit=1
                )
    
    queries = [f"query-{i}" for i in range(10)]
    start = time.perf_counter()
    results = await asyncio.gather(*[rate_limited_search(q) for q in queries])
    duration = time.perf_counter() - start
    print(f"   Processed {len(queries)} queries in {duration*1000:.1f}ms")
    print(f"   (Max 5 concurrent due to rate limiting)\n")
    
    # Retry pattern
    print("4️⃣  Retry Pattern:")
    async def search_with_retry(query: str, max_retries: int = 3):
        for attempt in range(max_retries):
            try:
                async with manager.get_store() as store:
                    return await store.asearch(
                        namespace_prefix=("preferences",),
                        query=query,
                        limit=3
                    )
            except Exception as e:
                if attempt == max_retries - 1:
                    raise
                wait_time = 2 ** attempt  # Exponential backoff
                logger.warning(f"Attempt {attempt + 1} failed, retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)
    
    results = await search_with_retry("resilient search")
    print(f"   Search with retry: {len(results)} results\n")
    
    # Cleanup
    await manager.shutdown()
    print("✅ Production patterns demonstration complete")

# Run demo
await production_patterns_demo()

2025-12-04 22:33:12,128 - __main__ - INFO - ✅ AsyncValkeyStore initialized


🏗️  Production Patterns Demo

1️⃣  Context Manager Pattern:
   Found 3 results

2️⃣  Error Handling Pattern:
   Search completed: 0 results

3️⃣  Rate Limiting Pattern:


2025-12-04 22:33:12,741 - __main__ - INFO - 🛑 AsyncValkeyStore shutdown


   Processed 10 queries in 269.1ms
   (Max 5 concurrent due to rate limiting)

4️⃣  Retry Pattern:
   Search with retry: 3 results

✅ Production patterns demonstration complete
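
The retry helper above retries on any exception and sleeps for a fixed power-of-two delay. A common refinement is to retry only errors that are plausibly transient and to add random jitter so that many clients do not retry in lockstep. The sketch below is a minimal, store-agnostic version: `retry_async` and its parameters are hypothetical names, and the default `retry_on` tuple is an assumption about which errors are transient in your setup, not something the library guarantees.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 0.5,
    # Assumption: these are the transient errors worth retrying in your environment
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Retry `operation` on selected exceptions with exponential backoff and jitter."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        try:
            return await operation()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # Out of attempts: surface the last error
            # Full jitter: sleep a random amount up to the exponential cap
            delay = random.uniform(0, base_delay * (2 ** attempt))
            await asyncio.sleep(delay)
    raise AssertionError("unreachable")  # The loop always returns or raises
```

In the notebook this could wrap a search as `await retry_async(lambda: store.asearch(...))`. Exceptions outside `retry_on` propagate immediately, so programming errors are not retried.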


## 🎯 Pattern 6: Timeout & Circuit Breaker

Protect your application from slow or failing dependencies.

In [12]:
async def search_with_timeout(store: AsyncValkeyStore, query: str, timeout_seconds: float = 2.0):
    """Search with timeout protection."""
    try:
        return await asyncio.wait_for(
            store.asearch(
                namespace_prefix=("preferences",),
                query=query,
                limit=5
            ),
            timeout=timeout_seconds
        )
    except asyncio.TimeoutError:
        logger.warning(f"Search timed out after {timeout_seconds}s")
        return []  # Degrade gracefully: treat a timeout as "no results"

class CircuitBreaker:
    """Simple circuit breaker for fault tolerance."""
    
    def __init__(self, failure_threshold: int = 5, timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.timeout = timeout  # Seconds to stay open before allowing a trial request
        self.failures = 0
        self.last_failure_time: Optional[float] = None
        self.state = "closed"  # closed, open, half-open
    
    def is_open(self) -> bool:
        if self.state == "open":
            # Monotonic clock: elapsed time is unaffected by system clock changes
            if time.monotonic() - self.last_failure_time > self.timeout:
                self.state = "half-open"  # Let the next request through as a probe
                return False
            return True
        return False
    
    def record_success(self):
        self.failures = 0
        self.state = "closed"
    
    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.monotonic()
        # A failed half-open probe re-opens the breaker: failures is still at the threshold
        if self.failures >= self.failure_threshold:
            self.state = "open"
            logger.warning(f"Circuit breaker opened after {self.failures} failures")

async def resilience_demo():
    """Demonstrate timeout and circuit breaker patterns."""
    print("🛡️  Resilience Patterns Demo\n")
    
    # Timeout demo
    print("1️⃣  Timeout Protection:")
    results = await search_with_timeout(async_store, "test query", timeout_seconds=5.0)
    print(f"   Search completed: {len(results)} results\n")
    
    # Circuit breaker demo
    print("2️⃣  Circuit Breaker Pattern:")
    breaker = CircuitBreaker(failure_threshold=3, timeout=10.0)
    
    async def protected_search(query: str):
        if breaker.is_open():
            logger.warning("Circuit breaker is open, skipping request")
            return []
        
        try:
            results = await async_store.asearch(
                namespace_prefix=("preferences",),
                query=query,
                limit=3
            )
            breaker.record_success()
            return results
        except Exception as e:
            breaker.record_failure()
            logger.error(f"Search failed: {e}")
            return []
    
    # Simulate searches
    for i in range(5):
        results = await protected_search(f"query-{i}")
        print(f"   Query {i+1}: {len(results)} results (breaker: {breaker.state})")
    
    print("\n✅ Resilience patterns complete")

# Run demo
await resilience_demo()

🛡️  Resilience Patterns Demo

1️⃣  Timeout Protection:
   Search completed: 5 results

2️⃣  Circuit Breaker Pattern:
   Query 1: 3 results (breaker: closed)
   Query 2: 3 results (breaker: closed)
   Query 3: 3 results (breaker: closed)
   Query 4: 3 results (breaker: closed)
   Query 5: 3 results (breaker: closed)

✅ Resilience patterns complete
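
Every search in the run above succeeds, so the breaker never leaves the `closed` state. To see the full closed → open → half-open → closed cycle, the sketch below swaps the store for a simulated backend that fails until it is marked healthy. `DemoBreaker`, `fake_search`, and `run_breaker_demo` are hypothetical names introduced for illustration, and the short `recovery_timeout` is chosen only to keep the demo fast.

```python
import asyncio
import time
from typing import List


class DemoBreaker:
    """Hypothetical minimal breaker with the same three states as above."""

    def __init__(self, failure_threshold: int = 3, recovery_timeout: float = 0.2):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.opened_at = 0.0
        self.state = "closed"

    def allow_request(self) -> bool:
        if self.state == "open":
            if time.monotonic() - self.opened_at >= self.recovery_timeout:
                self.state = "half-open"  # Let one probe request through
                return True
            return False
        return True

    def record_success(self) -> None:
        self.failures = 0
        self.state = "closed"

    def record_failure(self) -> None:
        self.failures += 1
        if self.state == "half-open" or self.failures >= self.failure_threshold:
            self.state = "open"
            self.opened_at = time.monotonic()


async def run_breaker_demo() -> List[str]:
    """Return the breaker state observed after each of five calls."""
    breaker = DemoBreaker(failure_threshold=3, recovery_timeout=0.2)
    backend_healthy = False
    states: List[str] = []

    async def fake_search() -> List[str]:
        await asyncio.sleep(0.01)  # Simulated network latency
        if not backend_healthy:
            raise ConnectionError("simulated outage")
        return ["result"]

    async def protected_call() -> List[str]:
        if not breaker.allow_request():
            return []  # Fail fast while the breaker is open
        try:
            results = await fake_search()
        except ConnectionError:
            breaker.record_failure()
            return []
        breaker.record_success()
        return results

    for _ in range(4):  # Three failures trip the breaker; the fourth call is skipped
        await protected_call()
        states.append(breaker.state)

    backend_healthy = True
    await asyncio.sleep(0.25)  # Wait past recovery_timeout so a probe is allowed
    await protected_call()
    states.append(breaker.state)
    return states
```

Running `await run_breaker_demo()` in a notebook cell (or `asyncio.run(run_breaker_demo())` in a script) returns the state after each call, which makes the transition points easy to check.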


## 📊 Summary & Best Practices

### When to Use AsyncValkeyStore

✅ **Use Async When:**
- Building web APIs (FastAPI, aiohttp, etc.)
- Handling concurrent users
- Processing large batches (100+ items)
- Integrating with other async services
- Maximizing I/O throughput

❌ **Use Sync When:**
- Simple CLI scripts
- Single-user notebooks
- Sequential processing is sufficient
- Codebase is entirely synchronous

### Performance Tips

1. **Concurrency Control**: Use `asyncio.Semaphore` to limit concurrent operations
2. **Batching**: Group independent operations with `asyncio.gather()` so they run concurrently
3. **Timeouts**: Always set timeouts with `asyncio.wait_for()`
4. **Circuit Breakers**: Protect against cascading failures
5. **Connection Pooling**: Reuse store instances; avoid creating a new one per request
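
The first three tips compose naturally. The sketch below caps concurrency with a semaphore, fans out with `asyncio.gather()`, and bounds each call with `asyncio.wait_for()`. It uses a hypothetical `fake_search` coroutine in place of `store.asearch()` so it runs without a Valkey server; `bounded_search_all` and the peak counter are likewise illustrative names, not part of the library.

```python
import asyncio
from typing import List, Tuple


async def fake_search(query: str, latency: float) -> List[str]:
    """Stand-in for store.asearch(); sleeps to simulate network I/O."""
    await asyncio.sleep(latency)
    return [f"{query}-hit"]


async def bounded_search_all(
    queries: List[str],
    max_concurrency: int = 5,
    timeout_seconds: float = 1.0,
    latency: float = 0.05,
) -> Tuple[List[List[str]], int]:
    """Run all queries concurrently, capped by a semaphore, each with a timeout.

    Returns (results in query order, peak number of simultaneous searches).
    """
    semaphore = asyncio.Semaphore(max_concurrency)
    in_flight = 0
    peak = 0

    async def search_one(query: str) -> List[str]:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await asyncio.wait_for(
                    fake_search(query, latency), timeout=timeout_seconds
                )
            except asyncio.TimeoutError:
                return []  # Treat a timed-out search as "no results"
            finally:
                in_flight -= 1

    results = await asyncio.gather(*(search_one(q) for q in queries))
    return list(results), peak
```

`asyncio.gather()` preserves input order, so `results[i]` corresponds to `queries[i]` even though the searches finish in arbitrary order. The returned peak is only there to confirm that the semaphore is doing its job.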

### Production Checklist

- [ ] Use `AsyncValkeyStore` in async web frameworks
- [ ] Implement timeout protection (2-5s typical)
- [ ] Add circuit breaker for fault tolerance
- [ ] Monitor connection pool metrics
- [ ] Set appropriate concurrency limits (10-50 typical)
- [ ] Use exponential backoff for retries
- [ ] Implement graceful shutdown
- [ ] Add comprehensive error logging
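
For the graceful-shutdown item, one possible approach is to track in-flight tasks, stop accepting new work, give running tasks a grace period, and cancel whatever remains before closing the store. The sketch below is an assumption about how you might structure this; `TaskTracker` is a hypothetical helper, not part of `langgraph_checkpoint_aws`.

```python
import asyncio
from typing import Any, Coroutine, Set


class TaskTracker:
    """Hypothetical helper: tracks in-flight work so shutdown can drain it."""

    def __init__(self) -> None:
        self._tasks: Set[asyncio.Task] = set()
        self._closing = False

    def submit(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
        """Schedule a coroutine, unless shutdown has already started."""
        if self._closing:
            coro.close()  # Avoid a "coroutine was never awaited" warning
            raise RuntimeError("shutting down; not accepting new work")
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)  # Forget finished tasks
        return task

    async def shutdown(self, grace_seconds: float = 5.0) -> int:
        """Wait up to grace_seconds for in-flight tasks, then cancel the rest.

        Returns the number of tasks that had to be cancelled.
        """
        self._closing = True
        tasks = set(self._tasks)  # Snapshot: callbacks mutate the live set
        if not tasks:
            return 0
        _done, pending = await asyncio.wait(tasks, timeout=grace_seconds)
        for task in pending:
            task.cancel()
        if pending:
            # Let cancelled tasks unwind before the caller closes connections
            await asyncio.gather(*pending, return_exceptions=True)
        return len(pending)
```

In a web service, `tracker.shutdown()` would typically run in the framework's shutdown hook, followed by the store cleanup (for example the `manager.shutdown()` call shown earlier), so that no task touches a closed connection.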

### Typical Speedup Ranges

| Operation | Expected Speedup vs. Sync | Notes |
|-----------|-----------------|-------|
| Single search | 1.0x | No benefit |
| 5 concurrent searches | 2-4x | Network latency dependent |
| 10 concurrent searches | 3-6x | Ideal for web APIs |
| 100 batch operations | 5-15x | With proper semaphore limits |
| High-concurrency API | 10-20x+ | With connection pooling |
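
Where your workload lands in these ranges depends on latency and concurrency, so it is worth measuring directly. The sketch below shows the shape of such a measurement: time the same operations run one after another, then run together with `asyncio.gather()`. It uses `asyncio.sleep()` as a simulated I/O call (`fake_io` and `measure_speedup` are hypothetical names), so the ratio it reports is an idealized upper bound rather than a Valkey benchmark. Replace `fake_io` with a real `store.asearch()` call to get numbers for your environment.

```python
import asyncio
import time


async def fake_io(latency: float) -> None:
    """Simulated I/O-bound call; replace with a real store operation."""
    await asyncio.sleep(latency)


async def measure_speedup(n_ops: int = 10, latency: float = 0.05) -> float:
    """Return sequential time divided by concurrent time for n_ops calls."""
    # Sequential: each call waits for the previous one to finish
    start = time.perf_counter()
    for _ in range(n_ops):
        await fake_io(latency)
    sequential = time.perf_counter() - start

    # Concurrent: all calls wait on I/O at the same time
    start = time.perf_counter()
    await asyncio.gather(*(fake_io(latency) for _ in range(n_ops)))
    concurrent = time.perf_counter() - start

    return sequential / concurrent
```

With pure simulated latency the ratio approaches `n_ops`. Real searches also spend time on embedding calls, serialization, and server-side work, which is why measured speedups are usually lower.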

### Next Steps

1. Review [`valkey_store.ipynb`](./valkey_store.ipynb) for core concepts
2. Explore [`agentcore_valkey_store.ipynb`](./agentcore_valkey_store.ipynb) for state management
3. Adapt patterns above to your production use case
4. Monitor performance metrics in your environment
5. Tune concurrency limits based on your infrastructure

## 🎓 Key Takeaways

1. **Async shines with I/O-bound operations** - Network calls, database queries, API requests
2. **Measure before optimizing** - Profile your specific workload
3. **Production needs resilience** - Timeouts, retries, circuit breakers
4. **Context managers are your friend** - Proper resource management
5. **Tune concurrency for your setup** - Start conservative, measure, adjust

---

**Happy async coding with ValkeyStore! 🚀**