# ZeroIPC Tutorial: High-Performance Shared Memory IPC

This notebook demonstrates ZeroIPC, a lock-free shared memory IPC library that enables zero-copy data sharing between processes.

## Table of Contents
1. [Installation & Setup](#1-installation--setup)
2. [Core Concepts](#2-core-concepts)
3. [Basic Data Structures](#3-basic-data-structures)
4. [Codata Structures](#4-codata-structures)
5. [Cross-Process Communication](#5-cross-process-communication)
6. [Performance Comparison](#6-performance-comparison)
7. [Best Practices](#7-best-practices)

## 1. Installation & Setup

In [None]:
# Install ZeroIPC in editable mode (run once); adjust the path to your local checkout
import sys
!{sys.executable} -m pip install -e /home/spinoza/github/beta/zeroipc/python

In [None]:
# Import required modules
import numpy as np
import time
import os
from multiprocessing import Process

# Import ZeroIPC components
from zeroipc import (
    Memory, Array, Queue, Stack, Table,
    Map, Set, Pool, Ring, Future, Lazy, Stream, Channel
)

print("ZeroIPC successfully imported!")

## 2. Core Concepts

### Shared Memory
ZeroIPC uses POSIX shared memory for zero-copy data sharing between processes.
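
To make "zero-copy" concrete without relying on ZeroIPC itself, here is a minimal sketch using Python's standard `multiprocessing.shared_memory` module. It is not ZeroIPC's API; the block size and values are illustrative. Two handles attach to the same named block, and a write through one is visible through the other without serializing or sending any bytes.

```python
import struct
from multiprocessing import shared_memory

# Create a small named block (standard library, not ZeroIPC).
writer = shared_memory.SharedMemory(create=True, size=64)
try:
    # Attach a second handle by name, as another process would.
    reader = shared_memory.SharedMemory(name=writer.name)
    try:
        # Write three float32 values through the first handle...
        struct.pack_into("<3f", writer.buf, 0, 20.0, 20.5, 21.0)
        # ...and read them back through the second handle.
        readings = struct.unpack_from("<3f", reader.buf, 0)
        print(readings)
    finally:
        reader.close()
finally:
    writer.close()
    writer.unlink()  # remove the block once every handle is closed
```

In a real deployment the second handle would live in a different process; attaching in the same process just keeps the sketch short.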

In [None]:
# Create shared memory region
mem = Memory("/demo_memory", 10 * 1024 * 1024)  # 10MB
print(f"Created shared memory: {mem}")

# The metadata table tracks all data structures
print(f"\nTable entries: {mem.table.size()} / {mem.table.capacity()}")

## 3. Basic Data Structures

### Array: Fixed-size contiguous storage

In [None]:
# Create an array of floats
temperatures = Array(mem, "temperatures", 100, dtype=np.float32)

# Write some data
for i in range(10):
    temperatures[i] = 20.0 + i * 0.5

# Read data
print(f"Temperature readings: {temperatures[:10]}")
print(f"Array size: {len(temperatures)} elements")

### Queue: Lock-free FIFO buffer

In [None]:
# Create a message queue
messages = Queue(mem, "message_queue", capacity=10, dtype=np.int32)

# Producer: push messages
for i in range(5):
    if messages.push(100 + i):
        print(f"Pushed message: {100 + i}")

print(f"\nQueue size: {messages.size()}")

# Consumer: pop messages
while not messages.empty():
    msg = messages.pop()
    print(f"Received message: {msg}")
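
The `push`/`pop` behaviour used above (a failed push when full, `None` when empty) can be illustrated with a bounded FIFO over a fixed-size buffer. This is a single-threaded sketch of the index arithmetic only; `BoundedQueue` is a hypothetical class, not ZeroIPC's implementation. A genuinely lock-free queue would also need atomic updates of the head and tail counters, which plain Python does not provide, and ZeroIPC's exact capacity semantics may differ.

```python
class BoundedQueue:
    """Single-threaded sketch of a bounded FIFO over a fixed-size buffer."""

    def __init__(self, capacity):
        self._slots = [None] * capacity
        self._capacity = capacity
        self._head = 0  # total number of items popped so far
        self._tail = 0  # total number of items pushed so far

    def size(self):
        return self._tail - self._head

    def push(self, value):
        if self.size() == self._capacity:
            return False  # full: the caller decides whether to retry or drop
        self._slots[self._tail % self._capacity] = value
        self._tail += 1
        return True

    def pop(self):
        if self.size() == 0:
            return None  # empty
        value = self._slots[self._head % self._capacity]
        self._head += 1
        return value


q = BoundedQueue(capacity=2)
pushed = [q.push(i) for i in range(4)]   # the last two pushes find the queue full
drained = [q.pop() for _ in range(3)]    # the third pop finds it empty
print(pushed)
print(drained)
```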

### Map: Hash table with string keys

In [None]:
# Create a configuration map
config = Map(mem, "config", capacity=16, dtype=np.float32)

# Set configuration values
config.set("threshold", 0.95)
config.set("timeout", 30.0)
config.set("rate", 100.0)

# Read configuration
print("Configuration:")
for key in ["threshold", "timeout", "rate"]:
    value = config.get(key)
    if value is not None:
        print(f"  {key}: {value}")

print(f"\nMap contains 'threshold': {config.contains('threshold')}")
print(f"Map size: {config.size()} entries")

### Set: Unique value collection

In [None]:
# Create a set to track active IDs
active_ids = Set(mem, "active_ids", capacity=32, dtype=np.int32)

# Add some IDs
ids_to_add = [101, 102, 103, 102, 104]  # Note: 102 appears twice
for id_val in ids_to_add:
    if active_ids.insert(id_val):
        print(f"Added ID: {id_val}")
    else:
        print(f"ID already exists: {id_val}")

print(f"\nActive IDs: {active_ids.to_list()}")
print(f"Set size: {active_ids.size()} unique elements")

## 4. Codata Structures

### Future: Async computation results

In [None]:
# Create a future for async result
result_future = Future(mem, "computation_result", dtype=np.float64)

def compute_async():
    """Simulate async computation in another process"""
    mem = Memory("/demo_memory")
    future = Future(mem, "computation_result", dtype=np.float64, open_existing=True)
    
    # Simulate computation
    time.sleep(0.5)
    result = 3.14159
    
    # Set the result
    future.set(result)
    print(f"Computation complete: {result}")

# Start async computation (functions defined in a notebook cell typically need the 'fork' start method)
process = Process(target=compute_async)
process.start()

# Wait for result; get() returns None if the timeout expires
print("Waiting for computation...")
result = result_future.get(timeout=2.0)
print(f"Got result: {result}")

process.join()
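
The set-once, wait-with-timeout contract of a future can be sketched in-process with `threading.Event` from the standard library. `SimpleFuture` is a hypothetical stand-in and works across threads only, not processes. It mirrors the convention used in this notebook that `get` returns `None` on timeout.

```python
import threading


class SimpleFuture:
    """Thread-based sketch of a set-once result with a blocking get."""

    def __init__(self):
        self._ready = threading.Event()
        self._value = None

    def set(self, value):
        self._value = value
        self._ready.set()  # wake up any waiting getters

    def get(self, timeout=None):
        if self._ready.wait(timeout):
            return self._value
        return None  # timed out before a value was set


future = SimpleFuture()
timed_out = future.get(timeout=0.05)  # nothing has been set yet

worker = threading.Thread(target=lambda: future.set(3.14159))
worker.start()
result = future.get(timeout=2.0)
worker.join()
print(timed_out, result)
```

Returning `None` on timeout is ambiguous if `None` can itself be a valid result; that is not a concern for the numeric dtypes used here.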

### Lazy: Deferred evaluation with memoization

In [None]:
# Create lazy-evaluated value
expensive_calc = Lazy(mem, "expensive_calc", dtype=np.float64)

def expensive_computation():
    """Simulate expensive computation"""
    print("Starting expensive computation...")
    time.sleep(0.2)  # Simulate work
    return 42.0

# Initialize with computation (not executed yet)
if not expensive_calc.is_computed():
    expensive_calc.init(expensive_computation)
    print("Lazy value initialized (not computed)")

# First access triggers computation
print(f"\nFirst access: {expensive_calc.force()}")

# Second access uses memoized value
print(f"Second access (memoized): {expensive_calc.force()}")
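
The memoization behaviour shown above can be sketched in a few lines of plain Python. `LazyValue` is a hypothetical, in-process class that borrows the `init`/`force`/`is_computed` names from this notebook. It does not share state across processes and says nothing about how ZeroIPC stores the computation.

```python
class LazyValue:
    """In-process sketch of deferred evaluation with memoization."""

    def __init__(self):
        self._thunk = None
        self._computed = False
        self._value = None

    def init(self, thunk):
        self._thunk = thunk  # store the computation without running it

    def is_computed(self):
        return self._computed

    def force(self):
        if not self._computed:
            self._value = self._thunk()
            self._computed = True
            self._thunk = None  # the computation is no longer needed
        return self._value


calls = []


def expensive():
    calls.append(1)  # record each real evaluation
    return 42.0


lazy = LazyValue()
lazy.init(expensive)
first = lazy.force()   # runs expensive()
second = lazy.force()  # returns the cached value
print(first, second, len(calls))
```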

### Ring: Circular buffer for streaming

In [None]:
# Create a ring buffer for sensor data
sensor_stream = Ring(mem, "sensor_data", capacity=100, dtype=np.float32)

# Producer: write sensor readings
print("Writing sensor data...")
for i in range(10):
    reading = 100.0 + i * 0.1
    sensor_stream.push(reading)
    print(f"  Wrote: {reading:.1f}")

print(f"\nRing buffer size: {sensor_stream.size()} readings")

# Consumer: read sensor data
print("\nReading sensor data:")
data = sensor_stream.read(max_bytes=5 * np.dtype(np.float32).itemsize)  # room for 5 float32 values
if data is not None:
    for value in data:
        print(f"  Read: {value:.1f}")

### Channel: CSP-style synchronous communication

In [None]:
# Create buffered channel for task distribution
task_channel = Channel(mem, "tasks", capacity=5, dtype=np.int32)

def worker_process():
    """Worker that processes tasks from channel"""
    mem = Memory("/demo_memory")
    tasks = Channel(mem, "tasks", dtype=np.int32, open_existing=True)
    
    while True:
        task_id = tasks.receive(timeout=1.0)
        if task_id is None:
            break
        print(f"  Worker: Processing task {task_id}")
        time.sleep(0.1)  # Simulate work

# Start worker
worker = Process(target=worker_process)
worker.start()

# Send tasks
print("Sending tasks to worker...")
for task_id in range(1, 4):
    task_channel.send(task_id)
    print(f"Sent task {task_id}")

# Close channel and wait for worker
time.sleep(0.5)
task_channel.close()
worker.join(timeout=2.0)
if worker.is_alive():
    worker.terminate()
print("\nWorker finished")
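
The example above relies on a fixed `time.sleep(0.5)` before closing the channel. A more deterministic shutdown pattern is to send an explicit "closed" marker after the last task. The sketch below shows the idea with the standard library's `queue.Queue` and a thread; the `_CLOSED` sentinel is a hypothetical stand-in for a channel close, not ZeroIPC's mechanism.

```python
import queue
import threading

_CLOSED = object()  # sentinel meaning "no more tasks"

tasks = queue.Queue(maxsize=5)  # bounded buffer, like capacity=5 above
processed = []


def worker():
    while True:
        try:
            task_id = tasks.get(timeout=1.0)
        except queue.Empty:
            break  # nothing arrived in time
        if task_id is _CLOSED:
            break  # sender signalled completion
        processed.append(task_id)


t = threading.Thread(target=worker)
t.start()

for task_id in range(1, 4):
    tasks.put(task_id)
tasks.put(_CLOSED)  # queued after the tasks, so all of them are seen first

t.join()
print(processed)
```

Because the sentinel travels through the same FIFO as the tasks, the worker exits only after it has handled everything sent before it.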

## 5. Cross-Process Communication

### Producer-Consumer Pattern

In [None]:
def producer(name):
    """Producer process that generates data"""
    mem = Memory("/demo_memory")
    queue = Queue(mem, "shared_queue", dtype=np.int32, open_existing=True)
    
    for i in range(3):
        # str hashes are salted per interpreter run by default, so these values differ between runs
        value = hash(name) % 1000 + i
        while not queue.push(value):
            time.sleep(0.01)
        print(f"  {name}: Produced {value}")
        time.sleep(0.1)

def consumer(name):
    """Consumer process that processes data"""
    mem = Memory("/demo_memory")
    queue = Queue(mem, "shared_queue", dtype=np.int32, open_existing=True)
    
    consumed = 0
    while consumed < 3:
        value = queue.pop()
        if value is not None:
            print(f"  {name}: Consumed {value}")
            consumed += 1
        else:
            time.sleep(0.01)

# Create shared queue
shared_queue = Queue(mem, "shared_queue", capacity=10, dtype=np.int32)
print("Starting producer-consumer demo...\n")

# Start producers and consumers
producers = [Process(target=producer, args=(f"Producer{i}",)) for i in range(2)]
consumers = [Process(target=consumer, args=(f"Consumer{i}",)) for i in range(2)]

for p in producers + consumers:
    p.start()

for p in producers + consumers:
    p.join(timeout=2.0)
    if p.is_alive():
        p.terminate()

print("\nDemo complete")

### Publish-Subscribe Pattern

In [None]:
def publisher():
    """Publisher that broadcasts events"""
    mem = Memory("/demo_memory")
    events = Ring(mem, "event_stream", capacity=100, dtype=np.int32, open_existing=True)
    
    for i in range(5):
        event_id = 1000 + i
        events.push(event_id)
        print(f"Published event: {event_id}")
        time.sleep(0.1)

def subscriber(name):
    """Subscriber that receives events"""
    mem = Memory("/demo_memory")
    events = Ring(mem, "event_stream", capacity=100, dtype=np.int32, open_existing=True)
    
    # Track our read position
    last_size = 0
    events_received = 0
    
    while events_received < 5:
        current_size = events.size()
        if current_size > last_size:
            # New events available
            new_events = events.read(max_bytes=(current_size - last_size) * np.dtype(np.int32).itemsize)
            if new_events is not None:
                for event in new_events:
                    print(f"  {name}: Received event {event}")
                    events_received += 1
            last_size = current_size
        time.sleep(0.05)

# Create event stream
event_stream = Ring(mem, "event_stream", capacity=100, dtype=np.int32)
print("Starting publish-subscribe demo...\n")

# Start publisher and subscribers
pub = Process(target=publisher)
subs = [Process(target=subscriber, args=(f"Subscriber{i}",)) for i in range(2)]

pub.start()
for s in subs:
    s.start()

pub.join(timeout=2.0)
for s in subs:
    s.join(timeout=2.0)
    if s.is_alive():
        s.terminate()

print("\nDemo complete")
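
Publish-subscribe differs from a queue in that every subscriber should see every event, so reading must not consume data on behalf of other readers. One common way to get this is for each subscriber to keep its own read cursor over a shared log. The sketch below is in-process, with a hypothetical `EventLog` class and an unbounded list. It does not describe how `Ring.read` behaves, and a real ring buffer would overwrite old entries, so slow subscribers could miss events.

```python
class EventLog:
    """Append-only log; each subscriber tracks its own read position."""

    def __init__(self):
        self._events = []

    def publish(self, event):
        self._events.append(event)

    def read_from(self, cursor):
        """Return (new_events, new_cursor) without removing anything."""
        new_events = self._events[cursor:]
        return new_events, cursor + len(new_events)


log = EventLog()
cursors = {"Subscriber0": 0, "Subscriber1": 0}
received = {name: [] for name in cursors}

for event_id in (1000, 1001):
    log.publish(event_id)

# Subscriber0 polls now; Subscriber1 polls only after more events arrive.
events, cursors["Subscriber0"] = log.read_from(cursors["Subscriber0"])
received["Subscriber0"].extend(events)

for event_id in (1002, 1003, 1004):
    log.publish(event_id)

# Both subscribers catch up from their own positions.
for name in cursors:
    events, cursors[name] = log.read_from(cursors[name])
    received[name].extend(events)

print(received)
```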

## 6. Performance Comparison

### ZeroIPC vs Traditional IPC

In [None]:
from multiprocessing import Queue as MPQueue  # pickles each payload internally

# Test data
data_size = 1000
test_data = np.random.rand(data_size).astype(np.float32)

print(f"Benchmarking with {data_size} float32 elements...\n")

# ZeroIPC shared memory array
print("ZeroIPC Array (zero-copy):")
zero_array = Array(mem, "perf_test", data_size, dtype=np.float32)  # allocate outside the timed region
start = time.perf_counter()
for i, val in enumerate(test_data):  # element-wise writes; Python loop overhead is included
    zero_array[i] = val
result = zero_array[:data_size]
zero_time = time.perf_counter() - start
print(f"  Time: {zero_time*1000:.3f} ms")

# Traditional multiprocessing Queue (serialization)
print("\nMultiprocessing Queue (pickle serialization):")
mp_queue = MPQueue()
start = time.perf_counter()
mp_queue.put(test_data)
result = mp_queue.get()
mp_time = time.perf_counter() - start
print(f"  Time: {mp_time*1000:.3f} ms")

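# Performance comparison (a single run; report whichever direction the ratio goes)
speedup = mp_time / zero_time
if speedup >= 1:
    print(f"\nZeroIPC was {speedup:.1f}x faster for this workload")
else:
    print(f"\nZeroIPC was {1 / speedup:.1f}x slower for this workload")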

## 7. Best Practices

### Resource Management

In [None]:
# Use context managers for automatic cleanup
print("Best Practice: Use context managers\n")

with Channel(mem, "auto_channel", capacity=5, dtype=np.int32) as ch:
    ch.send(42)
    print(f"Channel is open: {ch.is_open()}")
    print(f"Received: {ch.receive()}")
# Channel automatically closed here

# Verify it's closed
reopened = Channel(mem, "auto_channel", dtype=np.int32, open_existing=True)
print(f"\nChannel after context exit: closed={reopened.is_closed()}")
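
The reason to prefer `with` over a manual `close()` call is that cleanup also runs when the body raises. The sketch below shows this with a hypothetical `Resource` class that implements the context manager protocol; it is a generic Python illustration, not ZeroIPC's `Channel`.

```python
class Resource:
    """Minimal context manager that records whether it was closed."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # do not swallow exceptions from the body


res = Resource()
try:
    with res:
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

print(res.closed)  # close() ran even though the body raised
```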

### Error Handling

In [None]:
# Demonstrate proper error handling
print("Best Practice: Handle errors gracefully\n")

# Example 1: Check queue capacity
small_queue = Queue(mem, "small_queue", capacity=2, dtype=np.int32)

for i in range(4):
    if small_queue.push(i):
        print(f"Successfully pushed {i}")
    else:
        print(f"Queue full, could not push {i}")

# Example 2: Use timeouts for blocking operations
print("\nUsing timeouts:")
future_timeout = Future(mem, "timeout_test", dtype=np.int32)
result = future_timeout.get(timeout=0.5)
if result is None:
    print("Future timed out (expected)")
else:
    print(f"Got result: {result}")

### Memory Layout Inspection

In [None]:
# Inspect memory layout
print("Memory Layout Overview:")
print(f"Total size: {mem.size:,} bytes")
print(f"\nMetadata table:")
print(f"  Entries used: {mem.table.size()} / {mem.table.capacity()}")
print(f"\nRegistered structures:")

# Listing every structure would need a table iteration API; look up known names instead
structures = [
    "temperatures", "message_queue", "config", "active_ids",
    "computation_result", "expensive_calc", "sensor_data",
    "tasks", "shared_queue", "event_stream"
]

for name in structures[:5]:  # Show first 5
    entry = mem.table.find(name)
    if entry:
        print(f"  {name:20} offset={entry.offset:8} size={entry.size:8} bytes")

## Cleanup

In [None]:
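# Clean up shared memory
print("Cleaning up shared memory...")
del mem  # drops this name only; structures created above may still reference the region

# Remove the backing file (Linux exposes POSIX shared memory under /dev/shm);
# os was imported in the setup cell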
shm_path = "/dev/shm/demo_memory"
if os.path.exists(shm_path):
    os.unlink(shm_path)
    print(f"Removed {shm_path}")

print("\nTutorial complete!")

## Summary

ZeroIPC provides:
- **Zero-copy performance** through shared memory
- **Lock-free data structures** for safe concurrent access
- **Codata structures** for advanced computation patterns
- **Cross-language support** (C++, Python, C)
- **Simple API** that feels native to each language

Key use cases:
- High-frequency trading systems
- Real-time sensor data processing
- Video/audio streaming pipelines
- Distributed computing frameworks
- Inter-process communication in robotics

For more information:
- GitHub: https://github.com/nullpy/zeroipc
- Documentation: See `/docs` in the repository
- C++ Examples: `/cpp/examples/`
- Python Tests: `/python/tests/`