# Memory Benchmark Interactive Notebook

This notebook allows you to run memory benchmark workloads interactively, similar to `test_memory_benchmark.py` but with direct control over server management and workload execution.

## Prerequisites

1. **Valkey Server**: Located at `/home/ubuntu/valkey/build/bin/valkey-server`
2. **Search Module**: Built at `/home/ubuntu/valkey-search/.build-release/libsearch.so`
3. **Python Dependencies**: `valkey-py` and `numpy`; optionally `psutil` and `matplotlib`. `pandas` is required only for the memory-growth monitoring cell.

## Key Differences from Test Framework

- **Manual Server Management**: You explicitly start/stop the server
- **No Test Fixtures**: Direct client and server management
- **Interactive Execution**: Modify and run workloads on the fly
- **Explicit Paths**: All paths are hardcoded for your environment

In [1]:
# Cell 1: Environment Setup and Module Paths
import os
import sys
import subprocess
import time
import signal
from pathlib import Path

# Set up paths explicitly.
# Every default below can be overridden with an environment variable of the
# same name, so the notebook also runs on machines with a different layout.
def env_path(var_name, default):
    """Return the path held in environment variable ``var_name``.

    Falls back to ``default`` (converted to ``str``) when the variable is
    unset or empty.
    """
    return os.environ.get(var_name) or str(default)


VALKEY_SEARCH_ROOT = Path(env_path('VALKEY_SEARCH_ROOT', '/home/ubuntu/valkey-search'))
VALKEY_SERVER_PATH = env_path('VALKEY_SERVER_PATH', '/home/ubuntu/valkey/build/bin/valkey-server')
MODULE_PATH = env_path('MODULE_PATH', VALKEY_SEARCH_ROOT / '.build-release' / 'libsearch.so')

# JSON module path - adjust if needed
JSON_MODULE_PATH = env_path(
    'JSON_MODULE_PATH',
    VALKEY_SEARCH_ROOT / '.build-release/integration/valkey-json/.build-release/src/libjson.so',
)

# Add valkey-search to Python path
sys.path.insert(0, str(VALKEY_SEARCH_ROOT))
sys.path.insert(0, str(VALKEY_SEARCH_ROOT / 'integration'))

# Verify paths exist
print(f"Valkey Search Root: {VALKEY_SEARCH_ROOT}")
print(f"Valkey Server Path: {VALKEY_SERVER_PATH}")
print(f"Module Path: {MODULE_PATH}")
print(f"JSON Module Path: {JSON_MODULE_PATH}")


def report_path(label, path, required=True):
    """Print whether ``path`` exists and return the result of the check.

    A missing required path is reported as an error; a missing optional path
    is reported as a warning followed by a hint that it is optional.
    """
    found = Path(path).exists()
    if found:
        print(f"✅ {label} found")
    elif required:
        print(f"❌ {label} not found at {path}")
    else:
        print(f"⚠️ {label} not found at {path}")
        print(f"{label} is optional but recommended for full functionality")
    return found


report_path("Valkey server", VALKEY_SERVER_PATH)
report_path("Search module", MODULE_PATH)
report_path("JSON module", JSON_MODULE_PATH, required=False)

Valkey Search Root: /home/ubuntu/valkey-search
Valkey Server Path: /home/ubuntu/valkey/build/bin/valkey-server
Module Path: /home/ubuntu/valkey-search/.build-release/libsearch.so
JSON Module Path: /home/ubuntu/valkey-search/.build-release/integration/valkey-json/.build-release/src/libjson.so
✅ Valkey server found
✅ Search module found
✅ JSON module found


In [None]:
# Cell 2: Import Required Libraries and Test Components
import logging
import json
import numpy as np
from datetime import datetime
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass, field
import asyncio

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Import valkey client
# NOTE: the server manager and workload cells below use Valkey / AsyncValkey,
# so this cell has to be executed before them.
from valkey import ResponseError
from valkey.client import Valkey
from valkey.asyncio import Valkey as AsyncValkey

# Import test framework components
# These modules are looked up through the sys.path entries added in Cell 1.
# A failed import is only reported here; the names stay undefined, so the
# example cells that use them will fail later with NameError.
try:
    from hash_generator import (
        HashKeyGenerator, HashGeneratorConfig, IndexSchema, FieldSchema,
        FieldType, VectorFieldSchema, VectorAlgorithm, VectorMetric
    )
    from tags_builder import (
        TagsConfig, TagDistribution, TagSharingConfig, TagSharingMode
    )
    from string_generator import (
        LengthConfig, PrefixConfig, Distribution, StringType
    )
    print("✅ Successfully imported hash generator components")
except ImportError as e:
    print(f"❌ Failed to import components: {e}")
    print("Make sure hash_generator.py, tags_builder.py, and string_generator.py are in the integration directory")

# Try to import psutil for memory monitoring
# PSUTIL_AVAILABLE records whether the optional import succeeded.
try:
    import psutil
    PSUTIL_AVAILABLE = True
    print("✅ psutil available for memory monitoring")
except ImportError:
    PSUTIL_AVAILABLE = False
    print("⚠️ psutil not available - memory monitoring will be limited")

In [2]:
# Cell 3: Valkey Server Management
class ValkeyServerManager:
    """Manages Valkey server lifecycle for notebook testing.

    The manager writes a config file into ``working_dir``, launches
    ``VALKEY_SERVER_PATH`` with it, and hands out client connections.
    It can also be used as a context manager (start on enter, stop on exit).
    """

    def __init__(self, port=6379, working_dir="/tmp/valkey-notebook"):
        """Remember the port and make sure the working directory exists."""
        self.port = port
        self.working_dir = Path(working_dir)
        # parents=True so a nested, not-yet-existing directory is accepted too.
        self.working_dir.mkdir(parents=True, exist_ok=True)
        self.process = None
        self.client = None

    def start(self, startup_timeout=30.0, poll_interval=0.2):
        """Start Valkey server with search and JSON modules.

        Args:
            startup_timeout: Maximum number of seconds to wait for the server
                to answer PING.
            poll_interval: Seconds to sleep between connection attempts.

        Raises:
            RuntimeError: If the server process exits during startup.
            Exception: The last connection error when the server does not
                become reachable in time, or any other startup failure. The
                server is stopped before the error is re-raised.
        """
        if self.process is not None:
            if self.process.poll() is None:
                print("Server already running")
                return
            # The previous server process has exited; forget it and restart.
            self._close_client()
            self.process = None

        # Create config file. The JSON module is optional and, when present,
        # is loaded before the search module.
        module_paths = [MODULE_PATH]
        if Path(JSON_MODULE_PATH).exists():
            module_paths.insert(0, JSON_MODULE_PATH)
        config_lines = [
            f"port {self.port}",
            f"dir {self.working_dir}",
            'save ""',
            "enable-debug-command yes",
        ]
        config_lines.extend(f"loadmodule {path}" for path in module_paths)

        config_path = self.working_dir / "valkey.conf"
        config_path.write_text("\n".join(config_lines) + "\n")
        print(f"✅ Config file created at {config_path}")

        # Start server. Output goes to a log file rather than to pipes that
        # nobody reads: an unread pipe eventually fills up and blocks the
        # child process on its next write.
        log_path = self.working_dir / "valkey-server.log"
        cmd = [VALKEY_SERVER_PATH, str(config_path)]
        with open(log_path, "wb") as log_file:
            self.process = subprocess.Popen(cmd, stdout=log_file, stderr=subprocess.STDOUT)
        print(f"Starting Valkey server with command: {' '.join(cmd)}")

        # Wait for the server to accept connections instead of sleeping for
        # a fixed amount of time.
        try:
            self.client = self._wait_until_ready(startup_timeout, poll_interval, log_path)
            print(f"✅ Valkey server started on port {self.port}")
        except Exception as e:
            print(f"❌ Failed to connect to server: {e}")
            self.stop()
            raise

    def _wait_until_ready(self, startup_timeout, poll_interval, log_path):
        """Poll the server with PING and return a connected client."""
        from valkey.exceptions import (
            BusyLoadingError,
            ConnectionError as ValkeyConnectionError,
            TimeoutError as ValkeyTimeoutError,
        )

        # Only these errors are worth retrying during startup; anything else
        # is reported immediately.
        retryable = (ValkeyConnectionError, ValkeyTimeoutError, BusyLoadingError)
        deadline = time.monotonic() + startup_timeout
        while True:
            exit_code = self.process.poll()
            if exit_code is not None:
                raise RuntimeError(
                    f"valkey-server exited with code {exit_code}; see {log_path}"
                )
            client = Valkey(host='localhost', port=self.port, decode_responses=True)
            try:
                client.ping()
                return client
            except retryable:
                client.close()
                if time.monotonic() >= deadline:
                    raise
            time.sleep(poll_interval)

    def _close_client(self):
        """Close the manager's own client connection, ignoring close errors."""
        if self.client is not None:
            try:
                self.client.close()
            except Exception:
                # Best effort: a failing close must not prevent shutdown.
                pass
            self.client = None

    def stop(self):
        """Stop Valkey server.

        Sends SIGTERM, waits up to five seconds, then kills the process.
        Calling this when no server was started does nothing.
        """
        self._close_client()

        if self.process is not None:
            self.process.terminate()
            try:
                self.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self.process.kill()
                self.process.wait()
            self.process = None
            print("✅ Valkey server stopped")

    def get_client(self, decode_responses=True):
        """Get a new client connection.

        Raises:
            RuntimeError: If the server has not been started.
        """
        if not self.process:
            raise RuntimeError("Server not running")
        return Valkey(host='localhost', port=self.port, decode_responses=decode_responses)

    def get_async_client(self, decode_responses=False):
        """Get an async client connection.

        Raises:
            RuntimeError: If the server has not been started.
        """
        if not self.process:
            raise RuntimeError("Server not running")
        return AsyncValkey(host='localhost', port=self.port, decode_responses=decode_responses)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()

# Create server manager instance
# Constructing the manager only creates the working directory; the server
# process itself is launched by server_manager.start().
server_manager = ValkeyServerManager(port=6380)  # Using 6380 to avoid conflicts

In [3]:
# Cell 4: Start/Stop Server Controls
# Run Cell 2 (imports) before this cell: start() needs the Valkey client
# class and fails with "NameError: name 'Valkey' is not defined" otherwise.
# Start the server
server_manager.start()

# To stop the server when done:
# server_manager.stop()

✅ Config file created at /tmp/valkey-notebook/valkey.conf
Starting Valkey server with command: /home/ubuntu/valkey/build/bin/valkey-server /tmp/valkey-notebook/valkey.conf
❌ Failed to connect to server: name 'Valkey' is not defined
✅ Valkey server stopped


NameError: name 'Valkey' is not defined

In [None]:
# Cell 5: Memory Benchmark Helper Functions
@dataclass
class MemorySnapshot:
    """Memory usage snapshot"""
    # The first four fields are copied from the server's INFO memory reply.
    used_memory: int
    used_memory_human: str
    used_memory_rss: int
    used_memory_peak: int
    # Memory attributed to the search module; 0 when it could not be queried.
    used_memory_search: int = 0
    # Wall-clock time at which the snapshot object was created.
    timestamp: float = field(default_factory=time.time)


def _total_memory_from_reply(reply) -> Optional[object]:
    """Extract the ``total_memory_bytes`` value from a SEARCH.MEMORY reply.

    Accepts a flat ``[key, value, key, value, ...]`` list or a dict, with keys
    given as ``str`` or ``bytes``. Returns None when the key is absent.
    """
    if isinstance(reply, dict):
        pairs = reply.items()
    elif isinstance(reply, (list, tuple)):
        # zip() drops a trailing key that has no value instead of indexing
        # past the end of the reply.
        pairs = zip(reply[::2], reply[1::2])
    else:
        return None
    for key, value in pairs:
        if key in (b'total_memory_bytes', 'total_memory_bytes'):
            return value
    return None


def _query_search_memory(client) -> int:
    """Return the search module's memory usage in bytes, or 0 if unknown."""
    probes = (
        lambda: _total_memory_from_reply(client.execute_command("SEARCH.MEMORY")),
        # NOTE(review): fallback assumes the module reports a
        # 'search_used_memory_bytes' field under INFO search - confirm against
        # the valkey-search version in use.
        lambda: client.info("search").get("search_used_memory_bytes"),
    )
    for probe in probes:
        try:
            value = probe()
            if value is not None:
                return int(value)
        except Exception:
            # Best effort: the command or field may be missing on this server.
            # Exception (not a bare except) keeps Ctrl-C working.
            continue
    return 0


def get_memory_info(client) -> MemorySnapshot:
    """Get current memory usage from Valkey.

    Args:
        client: Synchronous client exposing ``info()`` and
            ``execute_command()``.

    Returns:
        A MemorySnapshot built from ``INFO memory``. ``used_memory_search`` is
        filled in on a best-effort basis and is 0 when the search module's
        memory usage cannot be determined.
    """
    info = client.info('memory')

    return MemorySnapshot(
        used_memory=info['used_memory'],
        used_memory_human=info['used_memory_human'],
        used_memory_rss=info['used_memory_rss'],
        used_memory_peak=info['used_memory_peak'],
        used_memory_search=_query_search_memory(client)
    )

def format_bytes(bytes_val: int) -> str:
    """Format bytes to human readable string.

    The value is scaled by powers of 1024 and rendered with two decimals and
    one of the units B, KB, MB, GB or TB. Negative values (for example a
    memory difference that shrank) are scaled by magnitude and keep their sign.
    """
    scaled = bytes_val
    for unit in ('B', 'KB', 'MB', 'GB'):
        # Compare the magnitude so that negative values are scaled as well.
        if abs(scaled) < 1024.0:
            return f"{scaled:.2f} {unit}"
        scaled /= 1024.0
    return f"{scaled:.2f} TB"

def calculate_memory_overhead(before: MemorySnapshot, after: MemorySnapshot, num_keys: int) -> Dict:
    """Summarise how much memory was added between two snapshots.

    Returns a dict with the raw and human-readable growth of total and search
    memory, the same growth divided by ``num_keys`` (0 when ``num_keys`` is
    not positive), and ``num_keys`` itself.
    """
    total_delta = after.used_memory - before.used_memory
    search_delta = after.used_memory_search - before.used_memory_search

    if num_keys > 0:
        per_key_total = total_delta / num_keys
        per_key_search = search_delta / num_keys
    else:
        # Avoid dividing by zero when no keys were inserted.
        per_key_total = 0
        per_key_search = 0

    overhead = {}
    overhead['total_memory_used'] = total_delta
    overhead['total_memory_human'] = format_bytes(total_delta)
    overhead['search_memory_used'] = search_delta
    overhead['search_memory_human'] = format_bytes(search_delta)
    overhead['bytes_per_key'] = per_key_total
    overhead['search_bytes_per_key'] = per_key_search
    overhead['num_keys'] = num_keys
    return overhead

In [None]:
# Cell 6: Workload Runner
class WorkloadRunner:
    """Run memory benchmark workloads.

    A workload creates a search index, inserts generated hashes in pipelined
    batches and reports the memory growth measured around the insertion.
    """

    # A progress line is printed each time this many keys have been inserted.
    PROGRESS_EVERY = 10000

    def __init__(self, client):
        """Store the synchronous client used for inserts and memory snapshots."""
        self.client = client
        self.results = []

    async def run_async_workload(self, generator_config, num_keys, batch_size=1000):
        """Run workload using async client for better performance.

        The async connection is obtained from the notebook-level
        ``server_manager`` and is closed even when the workload fails.
        Memory snapshots are taken through the synchronous ``self.client``.

        Returns:
            Dict with keys 'config', 'num_keys', 'memory_overhead' and
            'index_info'.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        batches = self._batch_ranges(num_keys, batch_size)
        async_client = server_manager.get_async_client()
        try:
            # Create index
            index_name = generator_config.index_schema.name
            create_cmd = self._build_create_index_command(generator_config.index_schema)
            await async_client.execute_command(*create_cmd)

            # Generate and insert data
            generator = HashKeyGenerator(generator_config)

            # Take memory snapshot before
            before = get_memory_info(self.client)

            # Insert data in batches
            for batch_start, batch_end in batches:
                pipe = async_client.pipeline(transaction=False)
                for key, hash_data in generator.generate_range(batch_start, batch_end):
                    pipe.hset(key, mapping=hash_data)
                await pipe.execute()
                self._report_progress(batch_start, batch_end, num_keys)

            # Take memory snapshot after
            after = get_memory_info(self.client)

            # Get index info
            index_info = await async_client.execute_command("FT.INFO", index_name)
        finally:
            # Release the connection on the error path as well.
            await async_client.close()

        return self._build_result(generator_config, num_keys, before, after, index_info)

    def run_sync_workload(self, generator_config, num_keys, batch_size=1000):
        """Run workload using sync client.

        Returns:
            Dict with keys 'config', 'num_keys', 'memory_overhead' and
            'index_info'.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        batches = self._batch_ranges(num_keys, batch_size)

        # Create index
        index_name = generator_config.index_schema.name
        create_cmd = self._build_create_index_command(generator_config.index_schema)
        self.client.execute_command(*create_cmd)

        # Generate and insert data
        generator = HashKeyGenerator(generator_config)

        # Take memory snapshot before
        before = get_memory_info(self.client)

        # Insert data in batches
        for batch_start, batch_end in batches:
            pipe = self.client.pipeline(transaction=False)
            for key, hash_data in generator.generate_range(batch_start, batch_end):
                pipe.hset(key, mapping=hash_data)
            pipe.execute()
            self._report_progress(batch_start, batch_end, num_keys)

        # Take memory snapshot after
        after = get_memory_info(self.client)

        # Get index info
        index_info = self.client.execute_command("FT.INFO", index_name)

        return self._build_result(generator_config, num_keys, before, after, index_info)

    @staticmethod
    def _batch_ranges(num_keys, batch_size):
        """Return the list of ``(start, end)`` key ranges, one per batch."""
        if batch_size < 1:
            raise ValueError(f"batch_size must be at least 1, got {batch_size}")
        return [
            (start, min(start + batch_size, num_keys))
            for start in range(0, num_keys, batch_size)
        ]

    def _report_progress(self, batch_start, batch_end, num_keys):
        """Print a progress line when a batch crosses a PROGRESS_EVERY mark."""
        # Comparing the marks on both sides of the batch also works for batch
        # sizes that do not divide PROGRESS_EVERY evenly.
        if batch_end // self.PROGRESS_EVERY > batch_start // self.PROGRESS_EVERY:
            print(f"Inserted {batch_end}/{num_keys} keys...")

    def _build_result(self, generator_config, num_keys, before, after, index_info):
        """Assemble the result dict shared by the sync and async runners."""
        return {
            'config': generator_config,
            'num_keys': num_keys,
            'memory_overhead': calculate_memory_overhead(before, after, num_keys),
            'index_info': self._parse_index_info(index_info)
        }

    def _build_create_index_command(self, schema):
        """Build the FT.CREATE command (as an argument list) from IndexSchema."""
        cmd = ["FT.CREATE", schema.name]

        if schema.on_hash:
            cmd.extend(["ON", "HASH"])

        if schema.prefixes:
            cmd.extend(["PREFIX", len(schema.prefixes)])
            cmd.extend(schema.prefixes)

        cmd.append("SCHEMA")

        for field_schema in schema.fields:
            cmd.append(field_schema.name)

            if isinstance(field_schema, VectorFieldSchema):
                cmd.extend(self._vector_field_args(field_schema))
            else:
                cmd.append(field_schema.type.value)
                if getattr(field_schema, "separator", None):
                    cmd.extend(["SEPARATOR", field_schema.separator])

        return cmd

    @staticmethod
    def _vector_field_args(field_schema):
        """Return the ``VECTOR <algorithm> <count> ...`` arguments for a field."""
        params = [
            "TYPE", field_schema.data_type,
            "DIM", str(field_schema.dimension),
            "DISTANCE_METRIC", field_schema.metric.value
        ]
        # The count is the number of arguments that follow it, independent of
        # the vector dimension.
        return ["VECTOR", field_schema.algorithm.value, str(len(params))] + params

    def _parse_index_info(self, info):
        """Parse index info response.

        Accepts a flat ``[key, value, ...]`` list or a dict. Keys are decoded
        to ``str``; ``bytes`` values are decoded, other values kept as-is.
        A trailing key without a value is ignored.
        """
        if isinstance(info, dict):
            pairs = info.items()
        else:
            pairs = zip(info[::2], info[1::2])

        result = {}
        for raw_key, value in pairs:
            key = raw_key.decode() if isinstance(raw_key, bytes) else str(raw_key)
            if isinstance(value, bytes):
                value = value.decode()
            result[key] = value

        return result

    def cleanup(self):
        """Clean up all data (issues FLUSHALL on the connected server)."""
        self.client.flushall()

## Example Workloads

Below are several example workloads demonstrating different tag sharing patterns and configurations.

In [None]:
# Cell 7: Example 1 - Simple Tag Workload with Low Sharing
# Tags are drawn from a large pool, so few keys end up sharing a tag.

# Index over hashes whose keys start with "user:", with one TAG field
low_sharing_index = IndexSchema(
    name="idx_low_sharing",
    fields=[FieldSchema(name="tags", type=FieldType.TAG, separator="|")],
    on_hash=True,
    prefixes=["user:"],
)

# Tag generation settings for low sharing
low_sharing_tags = TagsConfig(
    num_tags_per_key=(3, 5),  # 3-5 tags per key
    unique_tags_range=(10000, 50000),  # Large pool of unique tags
    distribution=TagDistribution.UNIFORM,
    sharing_config=TagSharingConfig(
        mode=TagSharingMode.LOW_SHARING,
        shared_tags_ratio=0.1,  # Only 10% of tags are shared
        max_keys_per_shared_tag=10,
    ),
)

low_sharing_config = HashGeneratorConfig(
    index_schema=low_sharing_index,
    tags_config=low_sharing_tags,
    prefix="user:",
    key_pattern="user:{id}",
)

# The runner is reused by the example cells that follow
runner = WorkloadRunner(server_manager.get_client())

# Run the workload
print("Running low sharing workload...")
result = runner.run_sync_workload(low_sharing_config, num_keys=10000)

overhead = result['memory_overhead']
indexed_keys = result['index_info'].get('num_indexed_keys', 'N/A')
print("\nResults:")
print(f"Total memory used: {overhead['total_memory_human']}")
print(f"Memory per key: {overhead['bytes_per_key']:.2f} bytes")
print(f"Search memory: {overhead['search_memory_human']}")
print(f"Index info: {indexed_keys} keys indexed")

# Clean up for next test
runner.cleanup()

In [None]:
# Cell 8: Example 2 - High Tag Sharing Workload
# Tags come from a small pool, so many keys share them (social network style)

# Index over hashes whose keys start with "post:", with one TAG field
high_sharing_index = IndexSchema(
    name="idx_high_sharing",
    fields=[FieldSchema(name="tags", type=FieldType.TAG, separator="|")],
    on_hash=True,
    prefixes=["post:"],
)

# Tag generation settings for high sharing
high_sharing_tags = TagsConfig(
    num_tags_per_key=(5, 10),  # 5-10 tags per key
    unique_tags_range=(100, 500),  # Small pool of tags (high reuse)
    distribution=TagDistribution.ZIPFIAN,  # Some tags much more popular
    sharing_config=TagSharingConfig(
        mode=TagSharingMode.HIGH_SHARING,
        shared_tags_ratio=0.8,  # 80% of tags are shared
        max_keys_per_shared_tag=1000,
    ),
)

high_sharing_config = HashGeneratorConfig(
    index_schema=high_sharing_index,
    tags_config=high_sharing_tags,
    prefix="post:",
    key_pattern="post:{id}",
)

# Run the workload with the runner created in the previous example
print("Running high sharing workload...")
result = runner.run_sync_workload(high_sharing_config, num_keys=10000)

overhead = result['memory_overhead']
indexed_keys = result['index_info'].get('num_indexed_keys', 'N/A')
print("\nResults:")
print(f"Total memory used: {overhead['total_memory_human']}")
print(f"Memory per key: {overhead['bytes_per_key']:.2f} bytes")
print(f"Search memory: {overhead['search_memory_human']}")
print(f"Index info: {indexed_keys} keys indexed")

# Compare with low sharing
print("\nNote: High sharing typically uses less memory per key due to tag reuse")

runner.cleanup()

In [None]:
# Cell 9: Example 3 - Async Workload with Prefix Sharing
# This demonstrates using async for better performance with larger datasets

# Configuration with prefix sharing
prefix_sharing_config = HashGeneratorConfig(
    index_schema=IndexSchema(
        name="idx_prefix_sharing",
        fields=[
            FieldSchema(name="category", type=FieldType.TAG, separator="|"),
            FieldSchema(name="tags", type=FieldType.TAG, separator="|")
        ],
        on_hash=True,
        prefixes=["item:"]
    ),
    tags_config=TagsConfig(
        num_tags_per_key=(3, 7),
        unique_tags_range=(1000, 5000),
        distribution=TagDistribution.UNIFORM,
        sharing_config=TagSharingConfig(
            mode=TagSharingMode.PREFIX_SHARING,
            shared_tags_ratio=0.5,
            prefix_patterns=["tech_", "news_", "blog_"],  # Common prefixes
            prefix_ratio=0.6  # 60% of tags have these prefixes
        )
    ),
    prefix="item:",
    key_pattern="item:{id}",
    additional_fields={
        "category": lambda i: f"cat_{i % 50}"  # 50 categories
    }
)

# Run async workload
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def run_async_example():
    """Run the prefix-sharing workload with the async client and print a summary."""
    print("Running async prefix sharing workload...")
    result = await runner.run_async_workload(prefix_sharing_config, num_keys=50000, batch_size=5000)

    print(f"\nResults:")
    print(f"Total memory used: {result['memory_overhead']['total_memory_human']}")
    print(f"Memory per key: {result['memory_overhead']['bytes_per_key']:.2f} bytes")
    print(f"Search memory: {result['memory_overhead']['search_memory_human']}")
    print(f"Index info: {result['index_info'].get('num_indexed_keys', 'N/A')} keys indexed")

    return result


def run_coroutine_blocking(coro):
    """Run ``coro`` to completion and return its result.

    asyncio.run() refuses to start while an event loop is already running in
    the current thread, which is the case inside a Jupyter kernel. When such
    a loop is detected, the coroutine is run by asyncio.run() in a worker
    thread, which gets an event loop of its own.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread (plain script execution).
        return asyncio.run(coro)
    with ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(asyncio.run, coro).result()


# Run the async workload
result = run_coroutine_blocking(run_async_example())

runner.cleanup()

In [None]:
# Cell 10: Benchmark Multiple Configurations
# Compare memory usage across different configurations

configurations = [
    {
        "name": "Single Unique Tag (100 keys per tag)",
        "config": HashGeneratorConfig(
            index_schema=IndexSchema(
                name="idx_single_tag",
                fields=[FieldSchema(name="tag", type=FieldType.TAG)],
                on_hash=True,
                prefixes=["key:"]
            ),
            tags_config=TagsConfig(
                num_tags_per_key=(1, 1),  # Exactly 1 tag per key
                unique_tags_range=(100, 100),  # 100 unique tags
                distribution=TagDistribution.UNIFORM
            ),
            prefix="key:",
            key_pattern="key:{id}"
        ),
        "num_keys": 10000
    },
    {
        "name": "Low Tag Sharing",
        "config": HashGeneratorConfig(
            index_schema=IndexSchema(
                name="idx_low_share",
                fields=[FieldSchema(name="tags", type=FieldType.TAG, separator="|")],
                on_hash=True,
                prefixes=["key:"]
            ),
            tags_config=TagsConfig(
                num_tags_per_key=(3, 5),
                unique_tags_range=(10000, 50000),
                distribution=TagDistribution.UNIFORM,
                sharing_config=TagSharingConfig(
                    mode=TagSharingMode.LOW_SHARING,
                    shared_tags_ratio=0.1
                )
            ),
            prefix="key:",
            key_pattern="key:{id}"
        ),
        "num_keys": 10000
    },
    {
        "name": "High Tag Sharing",
        "config": HashGeneratorConfig(
            index_schema=IndexSchema(
                name="idx_high_share",
                fields=[FieldSchema(name="tags", type=FieldType.TAG, separator="|")],
                on_hash=True,
                prefixes=["key:"]
            ),
            tags_config=TagsConfig(
                num_tags_per_key=(5, 10),
                unique_tags_range=(100, 500),
                distribution=TagDistribution.ZIPFIAN,
                sharing_config=TagSharingConfig(
                    mode=TagSharingMode.HIGH_SHARING,
                    shared_tags_ratio=0.8
                )
            ),
            prefix="key:",
            key_pattern="key:{id}"
        ),
        "num_keys": 10000
    }
]

# Run benchmarks
results = []
for config_info in configurations:
    print(f"\n{'='*50}")
    print(f"Running: {config_info['name']}")
    print(f"{'='*50}")

    try:
        result = runner.run_sync_workload(config_info['config'], config_info['num_keys'])
        result['name'] = config_info['name']
        results.append(result)

        print(f"Memory per key: {result['memory_overhead']['bytes_per_key']:.2f} bytes")
        print(f"Total memory: {result['memory_overhead']['total_memory_human']}")
    finally:
        # Flush even when a workload fails, so leftover keys cannot distort
        # the next measurement.
        runner.cleanup()

# Summary comparison
# The first column grows to fit the longest configuration name (at least 30
# characters) so that the table stays aligned.
name_width = max([30] + [len(result['name']) for result in results])
print(f"\n{'='*50}")
print("SUMMARY COMPARISON")
print(f"{'='*50}")
print(f"{'Configuration':<{name_width}} {'Bytes/Key':<15} {'Total Memory':<15}")
print("-" * (name_width + 30))
for result in results:
    print(f"{result['name']:<{name_width}} {result['memory_overhead']['bytes_per_key']:<15.2f} {result['memory_overhead']['total_memory_human']:<15}")

## Visualization and Analysis

In [None]:
# Cell 11: Visualization Support (requires matplotlib)
# Draws one bar per benchmarked configuration from the `results` list.
try:
    import matplotlib.pyplot as plt

    # Nothing is drawn when no benchmark results have been collected
    if results:
        labels = []
        per_key_bytes = []
        for entry in results:
            labels.append(entry['name'])
            per_key_bytes.append(entry['memory_overhead']['bytes_per_key'])

        plt.figure(figsize=(10, 6))
        plt.bar(labels, per_key_bytes)
        plt.title('Memory Usage Comparison Across Tag Configurations')
        plt.xlabel('Configuration')
        plt.ylabel('Bytes per Key')
        # Rotate the configuration names so they do not overlap
        plt.xticks(rotation=45, ha='right')
        plt.tight_layout()
        plt.show()

except ImportError:
    for message in (
        "matplotlib not installed. Install with: pip install matplotlib",
        "Skipping visualization...",
    ):
        print(message)

## Advanced Usage: Custom Workloads and Tag Distributions

In [None]:
# Cell 12: Custom Workload with Vector Fields
# Example workload whose index combines a TAG field with a vector field

# 128-dimensional FLOAT32 embedding indexed with HNSW and L2 distance
embedding_field = VectorFieldSchema(
    name="embedding",
    algorithm=VectorAlgorithm.HNSW,
    dimension=128,
    metric=VectorMetric.L2,
    data_type="FLOAT32",
)

vector_index = IndexSchema(
    name="idx_vector",
    fields=[
        FieldSchema(name="tags", type=FieldType.TAG, separator="|"),
        embedding_field,
    ],
    on_hash=True,
    prefixes=["doc:"],
)

# Configuration with vectors and tags
vector_config = HashGeneratorConfig(
    index_schema=vector_index,
    tags_config=TagsConfig(
        num_tags_per_key=(2, 4),
        unique_tags_range=(50, 200),
        distribution=TagDistribution.UNIFORM,
    ),
    prefix="doc:",
    key_pattern="doc:{id}",
    # Vector generation happens automatically based on schema
)

print("Running vector + tags workload...")
result = runner.run_sync_workload(vector_config, num_keys=1000)

overhead = result['memory_overhead']
print("\nResults with vectors:")
print(f"Total memory used: {overhead['total_memory_human']}")
print(f"Memory per key: {overhead['bytes_per_key']:.2f} bytes")
print(f"Search memory: {overhead['search_memory_human']}")

runner.cleanup()

In [None]:
# Cell 13: Memory Monitoring During Insertion
# Monitor memory growth during data insertion

import time
import pandas as pd

def monitor_memory_during_insertion(config, num_keys, sample_interval=1000):
    """Monitor memory usage during insertion.

    Creates the index described by ``config``, inserts ``num_keys`` generated
    hashes in batches of ``sample_interval`` keys through the notebook-level
    ``runner``, and records a memory sample before the first batch and after
    every batch.

    Returns:
        A pandas DataFrame with the columns 'keys', 'memory_mb' and
        'search_memory_mb', one row per sample.

    The server's data is flushed afterwards, also when insertion fails.
    """
    memory_samples = []

    def take_sample(key_count):
        """Append the current memory usage to the samples and return it."""
        snapshot = get_memory_info(runner.client)
        memory_samples.append({
            'keys': key_count,
            'memory_mb': snapshot.used_memory / (1024 * 1024),
            'search_memory_mb': snapshot.used_memory_search / (1024 * 1024)
        })
        return snapshot

    try:
        # Create index
        create_cmd = runner._build_create_index_command(config.index_schema)
        runner.client.execute_command(*create_cmd)

        # Get initial memory
        take_sample(0)

        # Generate and insert data with monitoring
        generator = HashKeyGenerator(config)

        for batch_start in range(0, num_keys, sample_interval):
            # Insert batch
            batch_end = min(batch_start + sample_interval, num_keys)
            pipe = runner.client.pipeline(transaction=False)
            for key, hash_data in generator.generate_range(batch_start, batch_end):
                pipe.hset(key, mapping=hash_data)
            pipe.execute()

            # Sample memory
            current = take_sample(batch_end)
            print(f"Inserted {batch_end} keys - Memory: {format_bytes(current.used_memory)}")
    finally:
        # Always flush, so that a failed run does not leave partial data
        # behind for the next experiment.
        runner.cleanup()

    return pd.DataFrame(memory_samples)

# Collect memory samples while inserting keys for the low-sharing configuration
print("Monitoring memory growth during insertion...")
df = monitor_memory_during_insertion(low_sharing_config, num_keys=20000, sample_interval=2000)

# Plot if matplotlib is available; otherwise print the raw samples
try:
    import matplotlib.pyplot as plt

    plt.figure(figsize=(10, 6))
    # One line per memory series: (DataFrame column, legend label, marker)
    for column, label, marker in (
        ('memory_mb', 'Total Memory', 'o'),
        ('search_memory_mb', 'Search Memory', 's'),
    ):
        plt.plot(df['keys'], df[column], label=label, marker=marker)
    plt.xlabel('Number of Keys')
    plt.ylabel('Memory Usage (MB)')
    plt.title('Memory Growth During Insertion')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()

except ImportError:
    print("Install matplotlib to see the plot")
    print(df)

## Cleanup and Notes

Remember to stop the server when you're done with your experiments.

In [None]:
# Cell 14: Cleanup
# Stop the server when done
server_manager.stop()

# Notes:
# 1. Make sure to build valkey-search with `./build.sh` before running
# 2. Update VALKEY_SERVER_PATH in Cell 1 if valkey-server is installed in a
#    different location
# 3. The notebook uses port 6380 by default to avoid conflicts
# 4. For production benchmarks, use larger datasets and multiple runs
# 5. Memory measurements include both Valkey overhead and search index overhead