# Concurrent Queue Comparison

Compare the three queue implementations:
- **SCQ** (Scalable Circular Queue) - Portable, single-width CAS
- **LCRQ** (Linked Concurrent Ring Queue) - x86-64 only, fastest
- **WCQ** (Wait-Free Circular Queue) - Bounded latency guarantee

This notebook helps you choose the right queue for your application.

In [None]:
import time
import random
import threading
import platform
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import List, Callable, Optional
import matplotlib.pyplot as plt
import numpy as np

# Detect platform
# platform.machine() passes through whatever architecture string the OS
# reports, and the casing is not uniform across systems (some report
# 'amd64' in lowercase), so the comparison is done case-insensitively.
IS_X86_64 = platform.machine().lower() in ('x86_64', 'amd64')
print(f"Platform: {platform.machine()}")
print(f"LCRQ available: {IS_X86_64}")

# Try to import real library
# Only the import itself sits in the try body, so an ImportError cannot be
# confused with a failure in the code that follows it.
try:
    from concurrent_collections import LockFreeQueue, FastQueue, WaitFreeQueue
except ImportError:
    # Later cells check SIMULATION_MODE to decide whether stand-in queue
    # classes have to be defined.
    SIMULATION_MODE = True
    print("Library not installed - running in simulation mode")
else:
    SIMULATION_MODE = False
    print("Using concurrent_collections library")

In [None]:
# Simulation classes
if SIMULATION_MODE:
    import queue

    class SimulatedQueue:
        """Bounded, non-blocking stand-in queue backed by ``queue.Queue``.

        ``put`` and ``get`` never block: they report a full or empty queue
        through their return value instead of raising.
        """

        def __init__(self, name, capacity=1024):
            self.name = name
            self._capacity = capacity
            self._queue = queue.Queue(maxsize=capacity)

        def put(self, item):
            """Enqueue ``item``; return True on success, False if the queue is full."""
            try:
                self._queue.put_nowait(item)
            except queue.Full:
                return False
            return True

        def get(self):
            """Dequeue and return one item, or None if the queue is empty."""
            try:
                item = self._queue.get_nowait()
            except queue.Empty:
                return None
            return item

        def __len__(self):
            return self._queue.qsize()

    # Factories with the same names as the real library's queue types, so the
    # rest of the notebook runs unchanged in simulation mode.
    def LockFreeQueue(cap=1024):
        return SimulatedQueue("SCQ", cap)

    def FastQueue(cap=1024):
        # Labelled LCRQ only where the platform check says it is available.
        label = "LCRQ" if IS_X86_64 else "SCQ"
        return SimulatedQueue(label, cap)

    def WaitFreeQueue(cap=1024, threads=8):
        # `threads` is accepted for signature compatibility but not used by
        # the simulated queue.
        return SimulatedQueue("WCQ", cap)

## Benchmark Framework

In [None]:
@dataclass
class QueueBenchmarkResult:
    """Summary of one benchmark run for a single queue and thread configuration.

    Instances are produced by ``benchmark_queue``; all ``*_us`` fields are
    latencies in microseconds.
    """
    name: str  # Label of the queue that was benchmarked (e.g. "SCQ")
    producers: int  # Number of producer threads used for the run
    consumers: int  # Number of consumer threads used for the run
    ops_per_sec: float  # Throughput reported for the whole run
    enqueue_p50_us: float  # Median latency of a single put call
    enqueue_p99_us: float  # 99th-percentile latency of a single put call
    enqueue_max_us: float  # Important for wait-free verification
    dequeue_p50_us: float  # Median latency of a single get call
    dequeue_p99_us: float  # 99th-percentile latency of a single get call
    dequeue_max_us: float  # Slowest single get call observed
    failed_enqueues: int  # put calls that reported failure
    failed_dequeues: int  # get calls that returned None

def benchmark_queue(queue_factory: Callable, name: str,
                    producers: int, consumers: int,
                    ops_per_thread: int = 5000) -> QueueBenchmarkResult:
    """Benchmark a queue with producer/consumer pattern.

    Starts ``producers`` producer threads and ``consumers`` consumer threads
    against a single queue built by ``queue_factory`` and times every
    individual ``put`` and ``get`` call.

    Args:
        queue_factory: Zero-argument callable returning the queue under test.
            The queue must provide ``put(item)`` returning a truthy value on
            success and ``get()`` returning ``None`` when no item is available.
        name: Label copied into the result.
        producers: Number of producer threads.
        consumers: Number of consumer threads.
        ops_per_thread: Number of ``put`` attempts made by each producer.

    Returns:
        A ``QueueBenchmarkResult``. Latencies are in microseconds and include
        failed attempts. ``ops_per_sec`` counts only operations that actually
        moved an item (successful puts plus successful gets), so rejected
        puts on a full queue and empty polls do not inflate the throughput.

    Raises:
        Exception: Whatever a worker thread raised is re-raised here.
        concurrent.futures.TimeoutError: If a consumer has not finished within
            5 seconds of the producers completing.
    """
    q = queue_factory()

    enqueue_latencies = []
    dequeue_latencies = []
    failed_enq = 0
    failed_deq = 0
    lock = threading.Lock()
    done = threading.Event()

    def producer():
        nonlocal failed_enq
        # Collect per-thread and merge once at the end so the shared lock is
        # not taken inside the timed loop.
        local_lat = []
        failures = 0
        for i in range(ops_per_thread):
            start = time.perf_counter_ns()
            success = q.put(i)
            elapsed = (time.perf_counter_ns() - start) / 1000  # ns -> us
            local_lat.append(elapsed)
            if not success:
                failures += 1
        with lock:
            enqueue_latencies.extend(local_lat)
            failed_enq += failures

    def consumer():
        nonlocal failed_deq
        local_lat = []
        failures = 0
        count = 0
        # Each consumer aims for an equal share of the items produced, but
        # gives up as soon as the producers are reported finished.
        target = (producers * ops_per_thread) // consumers
        while count < target and not done.is_set():
            start = time.perf_counter_ns()
            item = q.get()
            elapsed = (time.perf_counter_ns() - start) / 1000  # ns -> us
            local_lat.append(elapsed)
            if item is None:
                failures += 1
                time.sleep(0.0001)  # Brief sleep on empty
            else:
                count += 1
        with lock:
            dequeue_latencies.extend(local_lat)
            failed_deq += failures

    # perf_counter is monotonic; wall-clock time can jump during the run.
    start_time = time.perf_counter()

    with ThreadPoolExecutor(max_workers=producers + consumers) as executor:
        cons_futures = []
        try:
            prod_futures = [executor.submit(producer) for _ in range(producers)]
            cons_futures = [executor.submit(consumer) for _ in range(consumers)]
            for f in prod_futures:
                f.result()
        finally:
            # Always release the consumers, even if a producer raised;
            # otherwise leaving the executor would wait on them forever.
            done.set()
        for f in cons_futures:
            f.result(timeout=5)

    duration = time.perf_counter() - start_time

    # Throughput is based on operations that really transferred an item.
    successful_enq = len(enqueue_latencies) - failed_enq
    successful_deq = len(dequeue_latencies) - failed_deq
    total_ops = successful_enq + successful_deq  # enqueue + dequeue
    ops_per_sec = total_ops / duration if duration > 0 else 0.0

    # Sort once; percentiles and maxima are all read from the sorted lists.
    enq_sorted = sorted(enqueue_latencies)
    deq_sorted = sorted(dequeue_latencies)

    def percentile(sorted_data, p):
        if not sorted_data:
            return 0
        idx = min(int(len(sorted_data) * p), len(sorted_data) - 1)
        return sorted_data[idx]

    return QueueBenchmarkResult(
        name=name,
        producers=producers,
        consumers=consumers,
        ops_per_sec=ops_per_sec,
        enqueue_p50_us=percentile(enq_sorted, 0.50),
        enqueue_p99_us=percentile(enq_sorted, 0.99),
        enqueue_max_us=enq_sorted[-1] if enq_sorted else 0,
        dequeue_p50_us=percentile(deq_sorted, 0.50),
        dequeue_p99_us=percentile(deq_sorted, 0.99),
        dequeue_max_us=deq_sorted[-1] if deq_sorted else 0,
        failed_enqueues=failed_enq,
        failed_dequeues=failed_deq
    )

## Throughput Comparison

In [None]:
# Run benchmarks for different configurations
# Each entry is (producer threads, consumer threads).
configs = [(1, 1), (2, 2), (4, 4), (8, 8)]

# One result list per queue; every loop iteration appends one entry, so the
# lists stay index-aligned with `configs`.
scq_results = []
wcq_results = []
# LCRQ is only benchmarked on x86-64; None marks it as not available.
if IS_X86_64:
    lcrq_results = []
else:
    lcrq_results = None

print("Running throughput benchmarks...")
for prod, cons in configs:
    print(f"  {prod}P/{cons}C...")
    scq_results.append(benchmark_queue(
        queue_factory=lambda: LockFreeQueue(1024),
        name="SCQ", producers=prod, consumers=cons))
    if IS_X86_64:
        lcrq_results.append(benchmark_queue(
            queue_factory=lambda: FastQueue(1024),
            name="LCRQ", producers=prod, consumers=cons))
    # The wait-free queue is also given the total number of threads.
    wcq_results.append(benchmark_queue(
        queue_factory=lambda: WaitFreeQueue(1024, prod + cons),
        name="WCQ", producers=prod, consumers=cons))

print("Done!")

In [None]:
# Two panels side by side: throughput bars on the left, worst-case enqueue
# latency lines on the right.
fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(14, 5))
ax1, ax2 = axes[0], axes[1]

# --- Left panel: throughput comparison ---
x_labels = [f"{p}P/{c}C" for p, c in configs]
width = 0.25
x = np.arange(len(configs))

# Scale ops/sec to thousands to match the axis label.
scq_tp = [res.ops_per_sec / 1000 for res in scq_results]
wcq_tp = [res.ops_per_sec / 1000 for res in wcq_results]

# Bars are grouped per configuration: SCQ left, LCRQ centre, WCQ right.
bars = []
bars.append(ax1.bar(x - width, scq_tp, width, label='SCQ', color='steelblue'))
if IS_X86_64 and lcrq_results:
    lcrq_tp = [res.ops_per_sec / 1000 for res in lcrq_results]
    bars.append(ax1.bar(x, lcrq_tp, width, label='LCRQ', color='coral'))
bars.append(ax1.bar(x + width, wcq_tp, width, label='WCQ', color='forestgreen'))

ax1.set_title('Queue Throughput Comparison')
ax1.set_xlabel('Configuration')
ax1.set_ylabel('Throughput (K ops/sec)')
ax1.set_xticks(x)
ax1.set_xticklabels(x_labels)
ax1.legend()
ax1.grid(axis='y', alpha=0.3)

# --- Right panel: max latency (critical for wait-free) ---
scq_max = [res.enqueue_max_us for res in scq_results]
wcq_max = [res.enqueue_max_us for res in wcq_results]

ax2.plot(x_labels, scq_max, 'o-', label='SCQ', color='steelblue')
if IS_X86_64 and lcrq_results:
    lcrq_max = [res.enqueue_max_us for res in lcrq_results]
    ax2.plot(x_labels, lcrq_max, 's-', label='LCRQ', color='coral')
ax2.plot(x_labels, wcq_max, '^-', label='WCQ', color='forestgreen')

ax2.set_title('Worst-Case Latency (Wait-Free Verification)')
ax2.set_xlabel('Configuration')
ax2.set_ylabel('Max Latency (μs)')
ax2.legend()
ax2.grid(alpha=0.3)
ax2.set_yscale('log')

plt.tight_layout()
plt.show()

## Latency Distribution

In [None]:
# Compare latency at 4P/4C configuration
fig, ax = plt.subplots(figsize=(10, 6))

idx = 2  # 4P/4C
percentiles = ['P50', 'P99', 'Max']
x = np.arange(len(percentiles))
width = 0.25

scq_lat = [scq_results[idx].enqueue_p50_us,
           scq_results[idx].enqueue_p99_us,
           scq_results[idx].enqueue_max_us]
wcq_lat = [wcq_results[idx].enqueue_p50_us,
           wcq_results[idx].enqueue_p99_us,
           wcq_results[idx].enqueue_max_us]

# The spacing depends on how many series are drawn. With three series the
# outer bars must sit a full bar width from the centre, otherwise the LCRQ bar
# centred on x is painted over half of each neighbour. With only two series a
# half-width offset places them flush against each other.
has_lcrq = bool(IS_X86_64 and lcrq_results)
bar_offset = width if has_lcrq else width / 2

ax.bar(x - bar_offset, scq_lat, width, label='SCQ', color='steelblue')
if has_lcrq:
    lcrq_lat = [lcrq_results[idx].enqueue_p50_us,
                lcrq_results[idx].enqueue_p99_us,
                lcrq_results[idx].enqueue_max_us]
    ax.bar(x, lcrq_lat, width, label='LCRQ', color='coral')
ax.bar(x + bar_offset, wcq_lat, width, label='WCQ', color='forestgreen')

ax.set_xlabel('Percentile')
ax.set_ylabel('Latency (μs)')
ax.set_title('Enqueue Latency Distribution (4P/4C)')
ax.set_xticks(x)
ax.set_xticklabels(percentiles)
ax.legend()
ax.grid(axis='y', alpha=0.3)
# Log scale keeps P50 visible next to a much larger max value.
ax.set_yscale('log')

plt.tight_layout()
plt.show()

## Selection Guide

In [None]:
def recommend_queue(results_8p8c: Optional[dict] = None):
    """Print a queue selection guide followed by a benchmark results table.

    The guide text depends only on ``IS_X86_64``. The table is built from the
    last entry of the module-level ``scq_results``, ``wcq_results`` and, when
    available, ``lcrq_results`` lists.

    Args:
        results_8p8c: Currently not read by this function. It is kept, and may
            be ``None`` or omitted, so existing calls keep working.
    """
    banner = "=" * 60

    def print_row(label, result):
        # One table line: throughput in K ops/sec, latencies in microseconds.
        print(f"{label:<12} {result.ops_per_sec/1000:>12.1f} K/s "
              f"{result.enqueue_p99_us:>12.1f} μs {result.enqueue_max_us:>12.1f} μs")

    print("\n" + banner)
    print("QUEUE SELECTION GUIDE")
    print(banner)

    print("\n1. FOR MAXIMUM THROUGHPUT:")
    if IS_X86_64:
        print("   -> FastQueue (LCRQ on x86-64)")
        print("   Uses double-width CAS for best performance")
    else:
        print("   -> LockFreeQueue (SCQ)")
        print("   Best portable option")

    print("\n2. FOR PORTABILITY:")
    print("   -> LockFreeQueue (SCQ)")
    print("   Works on all platforms with single-width CAS")

    print("\n3. FOR BOUNDED LATENCY / REAL-TIME:")
    print("   -> WaitFreeQueue (WCQ)")
    print("   Guarantees O(n) completion regardless of contention")

    print("\n4. AUTOMATIC SELECTION:")
    print("   -> FastQueue (auto-selects best backend)")
    print(f"   On this machine: {'LCRQ' if IS_X86_64 else 'SCQ'}")

    scq = scq_results[-1]
    wcq = wcq_results[-1]

    # Take the configuration from the result itself so the heading stays
    # correct if the list of benchmarked configurations changes.
    print("\n" + banner)
    print(f"BENCHMARK RESULTS ({scq.producers}P/{scq.consumers}C configuration)")
    print(banner)

    # Header widths match the rows: the throughput column is 12 digits plus
    # " K/s" (16 characters), each latency column 12 digits plus " μs" (15).
    print(f"\n{'Queue':<12} {'Throughput':>16} {'P99 Latency':>15} {'Max Latency':>15}")
    print("-" * 61)
    print_row("SCQ", scq)
    if IS_X86_64 and lcrq_results:
        print_row("LCRQ", lcrq_results[-1])
    print_row("WCQ", wcq)

recommend_queue(None)

## Trade-off Summary

In [None]:
# Static reference table of the qualitative trade-offs between the three
# queues. The text is a fixed literal; none of the measured results are used.
print("""
╔══════════════════════════════════════════════════════════════════╗
║                    QUEUE TRADE-OFF SUMMARY                       ║
╠══════════════════════════════════════════════════════════════════╣
║ Queue │ Progress    │ Throughput │ Latency Bound │ Portability  ║
╠═══════╪═════════════╪════════════╪═══════════════╪══════════════╣
║ SCQ   │ Lock-free   │ Good       │ Unbounded     │ All platforms║
║ LCRQ  │ Lock-free   │ Excellent  │ Unbounded     │ x86-64 only  ║
║ WCQ   │ Wait-free   │ Moderate   │ O(n) bounded  │ All platforms║
╚═══════╧═════════════╧════════════╧═══════════════╧══════════════╝

Key:
- Lock-free: At least one thread makes progress
- Wait-free: Every thread makes progress in bounded steps
- O(n): Proportional to thread count
""")