In [1]:
import time
from pathlib import Path
import socket
import pandas as pd
import asyncio
import aiohttp
import sqlalchemy
import uuid
from datetime import datetime
import random

print('All imports are ready in one place.')

All imports are ready in one place.


# Step 1: Pre-check and Environment Setup

This cell checks for Docker, determines the correct `docker-compose` command, and verifies that no conflicting containers are running. This is a safety check to prevent errors when starting the services.

In [2]:
# Utility functions for service connectivity
def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Check if a TCP port is open on the given host."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except Exception:
        return False


In [3]:
# Health check endpoints
HEALTH_ENDPOINTS = [
    ("choreography", "order-service", 'http://localhost:8011/health'),
    ("orchestration", "saga-orchestrator", 'http://localhost:8005/health'),
]

In [4]:
# Service endpoints for performance testing
ENDPOINTS = {
    'choreography': 'http://localhost:8001/orders',
    'orchestration': 'http://localhost:8011/orders'
}

# Health check endpoints (streamlined)
HEALTH_ENDPOINTS = [
    ("choreography", "order-service", 'http://localhost:8001/health'),
    ("orchestration", "saga-orchestrator", 'http://localhost:8005/health'),
]

In [5]:
# Quick health check for services
print("=== Service Health Check ===")
async def quick_health_check():
    async with aiohttp.ClientSession() as session:
        for pattern, service_name, health_url in HEALTH_ENDPOINTS:
            try:
                async with session.get(health_url, timeout=aiohttp.ClientTimeout(total=5)) as response:
                    status = "✓" if response.status == 200 else f"✗ {response.status}"
                    print(f"{pattern:12} | {service_name:15} | {status}")
            except Exception as e:
                print(f"{pattern:12} | {service_name:15} | ✗ {str(e)[:30]}...")

await quick_health_check()
print("="*50)

=== Service Health Check ===
choreography | order-service   | ✗ Cannot connect to host localho...
orchestration | saga-orchestrator | ✓


# Quick Service Health Check

This cell performs a quick health check of the required services before running performance tests.

# Performance Testing and CSV Export for Saga Pattern

## データリセットが必要な場合

テストデータが古い、または不整合がある場合は以下を実行してください：

```bash
cd saga_pattern/choreography_pattern
docker-compose down
docker-compose build --no-cache
docker-compose up -d

cd ../orchestration_pattern  
docker-compose down
docker-compose build --no-cache
docker-compose up -d
```

## テスト概要

以下の3段階のテストを実行し、Sagaパターンの性能と整合性を検証します：

1. **単発テスト**: Choreography/Orchestration 各5回（機能確認）
2. **異常ケーステスト**: 在庫不足・決済失敗を各10回（補償確認）  
3. **負荷テスト**: 100 VU × 3分間で約15,000注文（分位数計算用）

## 出力CSV

- `e2e_latency.csv`: E2Eレスポンス時間（p50/p95/p99算出用）
- `convergence_events.csv`: イベント収束時間（ヒストグラム用）
- `saga_steps.csv`: サガステップ詳細（補償タイムライン用）

In [18]:
# Database connection settings
DB_CONFIG = {
    'user': 'cloudmart_user',
    'password': 'cloudmart_pass',
    'host': 'localhost',
    'port': 3307,  # adjusted to match docker-compose host mapping (host 3307 -> container 3306)
    'database': 'cloudmart_saga'
}

CONN_STR = f"mysql+pymysql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"

# Test endpoints
ENDPOINTS = {
    'choreography': 'http://localhost:8011/orders',
    'orchestration': 'http://localhost:8005/saga/start'
}

# Test results storage
test_results = []

print("Performance testing setup complete.")
print(f"Database connection: {CONN_STR}")
print(f"Endpoints: {ENDPOINTS}")

# Utility function to generate test data with failure injection
def generate_test_payload(scenario='success', pattern='choreography'):
    """Generate test payload with failure injection logic"""
    # Use valid customer and book IDs that exist in the database
    valid_customers = ["customer-001", "customer-002", "customer-003", "customer-004", "customer-005"]
    valid_books = ["book-123", "book-456", "book-789", "book-101", "book-202"]

    base_customer = random.choice(valid_customers)
    base_book = random.choice(valid_books)

    # Failure injection logic
    if scenario == 'stock_failure':
        # Use very high quantity to trigger stock failure (inventory won't have enough)
        return {
            "customer_id": base_customer,
            "items": [{"book_id": base_book, "quantity": 9999}]
        }
    elif scenario == 'payment_failure':
        # Use high-value book to trigger payment failure (amount > 5000)
        # The book price in the system is around 3500, so quantity 2 should exceed 5000
        return {
            "customer_id": base_customer,
            "items": [{"book_id": base_book, "quantity": 2}]
        }
    else:
        # Normal success case
        return {
            "customer_id": base_customer,
            "items": [{"book_id": base_book, "quantity": 1}]
        }
# Async HTTP client utilities
async def make_request(session, url, payload, pattern, scenario):
    """Make async HTTP request and record timing"""
    request_id = uuid.uuid4().hex[:8]
    start_time = time.time()

    try:
        async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=30)) as response:
            response_time = time.time() - start_time

            if response.status == 200 or response.status == 201:
                try:
                    result = await response.json()
                    order_id = result.get('order_id', result.get('id', request_id))
                except Exception:
                    order_id = request_id
                    result = await response.text()
            else:
                order_id = request_id
                result = await response.text()

            return {
                'saga_pattern': pattern,
                'scenario': scenario,
                'order_id': order_id,
                'request_id': request_id,
                'status_code': response.status,
                'response_time': response_time,
                'timestamp': datetime.now().isoformat(),
                'result': str(result)[:200],  # Truncate long results",
                'load_phase': 'single'  # Default for single tests, will be updated for load tests",
            }

    except Exception as e:
        response_time = time.time() - start_time
        return {
            'saga_pattern': pattern,
            'scenario': scenario,
            'order_id': request_id,
            'request_id': request_id,
            'status_code': 'ERROR',
            'response_time': response_time,
            'timestamp': datetime.now().isoformat(),
            'result': str(e)[:200],
            'load_phase': 'single'  # Default for single tests, will be updated for load tests",
        }

print("Utility functions loaded successfully.")

Performance testing setup complete.
Database connection: mysql+pymysql://cloudmart_user:cloudmart_pass@localhost:3307/cloudmart_saga
Endpoints: {'choreography': 'http://localhost:8011/orders', 'orchestration': 'http://localhost:8005/saga/start'}
Utility functions loaded successfully.


In [19]:
# Clear previous test results to start fresh
test_results = []
print("Previous test results cleared. Ready for new tests.")

Previous test results cleared. Ready for new tests.


In [20]:
# Test the updated failure injection logic with a few manual tests
async def test_failure_injection():
    """Quick test of the updated failure injection logic"""
    print("=== Testing Updated Failure Injection ===")

    async with aiohttp.ClientSession() as session:
        # Test 1: Success case
        print("\n1. Testing success case:")
        payload = generate_test_payload('success')
        print(f"   Payload: {payload}")
        result = await make_request(session, ENDPOINTS['choreography'], payload, 'choreography', 'success')
        print(f"   Result: {result['status_code']} - {result['response_time']:.3f}s")

        # Test 2: Stock failure case
        print("\n2. Testing stock failure (high quantity):")
        payload = generate_test_payload('stock_failure')
        print(f"   Payload: {payload}")
        result = await make_request(session, ENDPOINTS['choreography'], payload, 'choreography', 'stock_failure')
        print(f"   Result: {result['status_code']} - {result['response_time']:.3f}s")

        # Test 3: Payment failure case
        print("\n3. Testing payment failure (high amount):")
        payload = generate_test_payload('payment_failure')
        print(f"   Payload: {payload}")
        result = await make_request(session, ENDPOINTS['choreography'], payload, 'choreography', 'payment_failure')
        print(f"   Result: {result['status_code']} - {result['response_time']:.3f}s")

await test_failure_injection()

=== Testing Updated Failure Injection ===

1. Testing success case:
   Payload: {'customer_id': 'customer-001', 'items': [{'book_id': 'book-123', 'quantity': 1}]}
   Result: 200 - 0.059s

2. Testing stock failure (high quantity):
   Payload: {'customer_id': 'customer-003', 'items': [{'book_id': 'book-101', 'quantity': 9999}]}
   Result: 200 - 0.020s

3. Testing payment failure (high amount):
   Payload: {'customer_id': 'customer-003', 'items': [{'book_id': 'book-202', 'quantity': 2}]}
   Result: 200 - 0.011s


In [21]:
# Single-shot and abnormal case tests
async def run_single_tests():
    """Run single-shot tests for functional verification"""
    print("=== Running Single-shot Tests ===")

    test_cases = [
        # Normal success cases (5 times each)
        ('choreography', 'success', 5),
        ('orchestration', 'success', 5),

        # Failure cases (10 times each)
        ('choreography', 'stock_failure', 10),
        ('choreography', 'payment_failure', 10),
        ('orchestration', 'stock_failure', 10),
        ('orchestration', 'payment_failure', 10),
    ]

    async with aiohttp.ClientSession() as session:
        for pattern, scenario, count in test_cases:
            print(f"\nTesting {pattern} - {scenario} ({count} times)")
            url = ENDPOINTS[pattern]

            for i in range(count):
                payload = generate_test_payload(scenario, pattern)
                result = await make_request(session, url, payload, pattern, scenario)
                test_results.append(result)

                status_symbol = "✓" if result['status_code'] in [200, 201] else "✗"
                print(f"  {i+1:2d}. {status_symbol} {result['status_code']} - {result['response_time']:.3f}s - {result['order_id']}")

                # Brief delay between requests
                await asyncio.sleep(0.1)

# Run single tests
await run_single_tests()

print(f"\nSingle tests completed. Total results: {len(test_results)}")

# Show summary
df_single = pd.DataFrame(test_results)
if not df_single.empty:
    summary = df_single.groupby(['pattern', 'scenario']).agg({
        'response_time': ['count', 'mean', 'std'],
        'status_code': lambda x: (x.isin([200, 201])).sum()
    }).round(3)
    print("\nSingle Test Summary:")
    print(summary)

=== Running Single-shot Tests ===

Testing choreography - success (5 times)
   1. ✓ 200 - 0.028s - order-bceddc45
   2. ✓ 200 - 0.022s - order-bc903ba9
   3. ✓ 200 - 0.023s - order-887d0c5a
   4. ✓ 200 - 0.030s - order-f5f45cb7
   3. ✓ 200 - 0.023s - order-887d0c5a
   4. ✓ 200 - 0.030s - order-f5f45cb7
   5. ✓ 200 - 0.030s - order-3685466d

Testing orchestration - success (5 times)
   1. ✓ 200 - 0.049s - order-746e17e6
   5. ✓ 200 - 0.030s - order-3685466d

Testing orchestration - success (5 times)
   1. ✓ 200 - 0.049s - order-746e17e6
   2. ✓ 200 - 0.026s - order-6ca9fc2b
   3. ✓ 200 - 0.025s - order-1a3a9281
   2. ✓ 200 - 0.026s - order-6ca9fc2b
   3. ✓ 200 - 0.025s - order-1a3a9281
   4. ✓ 200 - 0.033s - order-9de98541
   5. ✓ 200 - 0.028s - order-32e49f29
   4. ✓ 200 - 0.033s - order-9de98541
   5. ✓ 200 - 0.028s - order-32e49f29

Testing choreography - stock_failure (10 times)
   1. ✓ 200 - 0.015s - order-94a48bb2
   2. ✓ 200 - 0.033s - order-ec1da7f7

Testing choreography - stock

In [22]:
# Check test results summary
print("=== Test Results Summary ===")
print(f"Total test results: {len(test_results)}")

if test_results:
    df = pd.DataFrame(test_results)
    success_count = (df['status_code'].isin([200, 201])).sum()
    error_count = (df['status_code'] == 'ERROR').sum()
    print(f"Successful requests: {success_count}")
    print(f"Error requests: {error_count}")
    print(f"Success rate: {success_count / len(test_results) * 100:.1f}%")

    if error_count > 0:
        print("\nSample error details:")
        error_results = df[df['status_code'] == 'ERROR'].head(3)
        for _, row in error_results.iterrows():
            print(f"  {row['pattern']} - {row['scenario']}: {row['result'][:100]}...")

    print("\nStatus code distribution:")
    print(df['status_code'].value_counts())

=== Test Results Summary ===
Total test results: 50
Successful requests: 50
Error requests: 0
Success rate: 100.0%

Status code distribution:
status_code
200    50
Name: count, dtype: int64


In [23]:
# Load test with failure injection
async def run_load_test(duration_seconds=180, virtual_users=100):
    """Run load test with failure injection"""
    print("=== Running Load Test ===")
    print(f"Duration: {duration_seconds}s, Virtual Users: {virtual_users}")
    print(f"Expected requests: ~{duration_seconds * virtual_users // 2} (assuming 0.5 req/s per VU)")

    start_time = time.time()
    end_time = start_time + duration_seconds

    async def worker(session, worker_id):
        """Individual worker generating load"""
        worker_results = []
        request_count = 0

        while time.time() < end_time:
            request_count += 1

            # Determine pattern (50/50 split)
            pattern = 'choreography' if worker_id % 2 == 0 else 'orchestration'

            # Failure injection logic
            rand_val = random.random()
            if rand_val < 0.08:  # 8% stock failure
                scenario = 'stock_failure'
            elif rand_val < 0.11:  # 3% payment failure (8% + 3%)
                scenario = 'payment_failure'
            else:
                scenario = 'success'

            url = ENDPOINTS[pattern]
            payload = generate_test_payload(scenario, pattern)

            result = await make_request(session, url, payload, pattern, f"load_{scenario}")
            worker_results.append(result)

            # Control request rate (roughly 0.5 requests per second per worker)
            await asyncio.sleep(random.uniform(1.5, 2.5))

        print(f"Worker {worker_id:3d} completed {request_count} requests")
        return worker_results

    # Run concurrent workers
    async with aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(limit=virtual_users, limit_per_host=virtual_users//2)
    ) as session:

        tasks = [worker(session, i) for i in range(virtual_users)]
        worker_results = await asyncio.gather(*tasks, return_exceptions=True)

        # Flatten results
        load_results = []
        for worker_result in worker_results:
            if isinstance(worker_result, list):
                load_results.extend(worker_result)
            else:
                print(f"Worker error: {worker_result}")

        test_results.extend(load_results)

    actual_duration = time.time() - start_time
    print(f"\nLoad test completed in {actual_duration:.1f}s")
    print(f"Total requests generated: {len(load_results)}")

    # Quick analysis
    if load_results:
        df_load = pd.DataFrame(load_results)
        success_rate = (df_load['status_code'].isin([200, 201])).mean() * 100
        avg_response_time = df_load['response_time'].mean()
        p95_response_time = df_load['response_time'].quantile(0.95)

        scenario_counts = df_load['scenario'].value_counts()

        print("Load Test Summary:")
        print(f"Success rate: {success_rate:.1f}%")
        print(f"Average response time: {avg_response_time:.3f}s")
        print(f"P95 response time: {p95_response_time:.3f}s")
        print("Scenario distribution:")
        for scenario, count in scenario_counts.items():
            print(f"  {scenario}: {count} ({count/len(load_results)*100:.1f}%)")

# Run multi-phase load tests (WARNING: This will take ~6 minutes total)
print("=== Starting Multi-Phase Load Tests ===")
print("This will take approximately 6 minutes total (3 phases).")

# Phase 1: Light load (Warm-up)
print("\n--- Phase 1: Light Load (Warm-up) ---")
print("Duration: 90s, Virtual Users: 30")
light_start = len(test_results)
await run_load_test(duration_seconds=90, virtual_users=30)
light_end = len(test_results)

# Add phase identifier to light load results
for i in range(light_start, light_end):
    if i < len(test_results):
        test_results[i]['load_phase'] = 'light'

# Brief pause between phases
print("Pausing 10 seconds between phases...")
await asyncio.sleep(10)

# Phase 2: Medium load (Standard)
print("\n--- Phase 2: Medium Load (Standard) ---")
print("Duration: 120s, Virtual Users: 80")
medium_start = len(test_results)
await run_load_test(duration_seconds=120, virtual_users=80)
medium_end = len(test_results)

# Add phase identifier to medium load results
for i in range(medium_start, medium_end):
    if i < len(test_results):
        test_results[i]['load_phase'] = 'medium'

# Brief pause between phases
print("Pausing 10 seconds between phases...")
await asyncio.sleep(10)

# Phase 3: Heavy load (Stress test)
print("\n--- Phase 3: Heavy Load (Stress Test) ---")
print("Duration: 90s, Virtual Users: 150")
heavy_start = len(test_results)
await run_load_test(duration_seconds=90, virtual_users=150)
heavy_end = len(test_results)

# Add phase identifier to heavy load results
for i in range(heavy_start, heavy_end):
    if i < len(test_results):
        test_results[i]['load_phase'] = 'heavy'

print("\n=== Multi-Phase Load Test Summary ===")
print(f"Light phase: {light_end - light_start} requests")
print(f"Medium phase: {medium_end - medium_start} requests")
print(f"Heavy phase: {len(test_results) - heavy_start} requests")
print(f"Total load test requests: {len(test_results) - light_start}")
print(f"All tests completed. Total results collected: {len(test_results)}")

=== Starting Multi-Phase Load Tests ===
This will take approximately 6 minutes total (3 phases).

--- Phase 1: Light Load (Warm-up) ---
Duration: 90s, Virtual Users: 30
=== Running Load Test ===
Duration: 90s, Virtual Users: 30
Expected requests: ~1350 (assuming 0.5 req/s per VU)
Worker  11 completed 46 requests
Worker   5 completed 45 requests
Worker  11 completed 46 requests
Worker   5 completed 45 requests
Worker  29 completed 44 requests
Worker  21 completed 45 requests
Worker  16 completed 46 requests
Worker  29 completed 44 requests
Worker  21 completed 45 requests
Worker  16 completed 46 requests
Worker   7 completed 45 requests
Worker   4 completed 45 requests
Worker   9 completed 44 requests
Worker  15 completed 46 requests
Worker  17 completed 44 requests
Worker  27 completed 45 requests
Worker   7 completed 45 requests
Worker   4 completed 45 requests
Worker   9 completed 44 requests
Worker  15 completed 46 requests
Worker  17 completed 44 requests
Worker  27 completed 45 re

In [25]:
# Database aggregation and CSV export (robust refactor with schema inspection)
import traceback
import numpy as np
from sqlalchemy import inspect

def export_performance_csvs():
    """Export performance data to 3 CSV files with robust handling when DB is down and adaptive SQL.

    Guarantees: always returns three DataFrames (possibly empty). Does not raise on DB connection errors.
    """
    print("=== Exporting Performance Data to CSV (robust) ===")

    csv_dir = Path.cwd() / 'data' / 'orchestration_pattern'
    try:
        csv_dir.mkdir(parents=True, exist_ok=True)
        print(f"Ensured output directory exists: {csv_dir}")
    except Exception as e:
        print(f"Failed to create output directory {csv_dir}: {e}")
        traceback.print_exc()

    # Prepare empty DataFrames to return in any failure case
    df_e2e = pd.DataFrame()
    df_conv = pd.DataFrame()
    df_saga = pd.DataFrame()

    # Attempt to create engine but don't let failure abort the function
    engine = None
    try:
        engine = sqlalchemy.create_engine(CONN_STR)
    except Exception as e:
        print(f"Failed to create SQLAlchemy engine: {e}")
        traceback.print_exc()
        engine = None

    # Helper to safely execute read_sql_query
    def safe_read_sql(sql_text, params=None, parse_dates=None):
        nonlocal engine
        try:
            if engine is None:
                raise RuntimeError("DB engine is not available")
            return pd.read_sql_query(sql_text, engine, params=params, parse_dates=parse_dates)
        except Exception as e:
            print(f"safe_read_sql failed: {e}")
            traceback.print_exc()
            return pd.DataFrame()

    # Discover columns in events table (if DB is available)
    events_columns = []
    chosen_ts = None
    if engine is not None:
        try:
            inspector = inspect(engine)
            if 'events' in inspector.get_table_names():
                cols = inspector.get_columns('events')
                events_columns = [c['name'] for c in cols]
                print(f"Events table columns discovered: {events_columns}")

                # Prefer processed_at, then common alternatives
                candidates = ['processed_at', 'processed_time', 'processed_ts', 'processed_on', 'timestamp', 'event_time', 'created_at', 'occurred_at']
                for cand in candidates:
                    if cand in events_columns:
                        chosen_ts = cand
                        break
                if chosen_ts:
                    print(f"Using timestamp column for events queries: {chosen_ts}")
                else:
                    print("No known timestamp column found in events table; convergence/saga queries will be skipped unless fallback from other tables is possible.")
            else:
                print("No 'events' table found in database (inspector).")
        except Exception as e:
            print(f"Failed to inspect 'events' table: {e}")
            traceback.print_exc()

    # If we have in-memory test_results, write e2e from that first
    try:
        print("Exporting E2E latency data...")
        csv_path_e2e = csv_dir / 'e2e_latency.csv'

        if 'test_results' in globals() and test_results:
            df_raw = pd.DataFrame(test_results)
            try:
                df_raw[['pattern', 'scenario', 'status_code', 'response_time', 'timestamp', 'load_phase']].to_csv(csv_path_e2e, index=False)
                print(f"✓ E2E latency data exported from memory: {csv_path_e2e} ({len(df_raw)} rows)")
            except Exception as e:
                print(f"Failed to write in-memory e2e CSV: {e}")
                traceback.print_exc()

            df_e2e = df_raw.copy()
            if 'response_time' in df_e2e.columns:
                df_e2e['e2e_ms'] = df_e2e['response_time'] * 1000
        else:
            # Fallback to DB
            q_e2e = sqlalchemy.text("""
            SELECT
              'orchestration' AS pattern,
              CASE
                WHEN o.status IN ('CANCELLED', 'FAILED') THEN 'failure'
                ELSE 'success'
              END AS scenario,
              o.order_id,
              o.created_at,
              COALESCE(o.confirmed_at, o.cancelled_at, o.updated_at) AS finished_at,
              TIMESTAMPDIFF(MICROSECOND, o.created_at,
                COALESCE(o.confirmed_at, o.cancelled_at, o.updated_at)) / 1000 AS e2e_ms,
              NULL AS http_response_time_s
            FROM orders o
            WHERE o.created_at IS NOT NULL
              AND COALESCE(o.confirmed_at, o.cancelled_at, o.updated_at) IS NOT NULL
            ORDER BY o.created_at DESC;
            """)
            df_e2e = safe_read_sql(q_e2e, parse_dates=['created_at', 'finished_at'])
            try:
                df_e2e.to_csv(csv_path_e2e, index=False)
                print(f"✓ E2E latency data exported from DB (if any): {csv_path_e2e} ({len(df_e2e)} rows)")
            except Exception as e:
                print(f"Failed to write DB-derived e2e CSV: {e}")
                traceback.print_exc()
    except Exception as e:
        print(f"Unexpected error during E2E export: {e}")
        traceback.print_exc()

    # Convergence events
    try:
        print("Exporting convergence events data...")
        csv_path_conv = csv_dir / 'convergence_events.csv'

        if chosen_ts is None:
            print("Skipping convergence events SQL because no timestamp column was determined.")
            df_conv = pd.DataFrame()
        else:
            q_conv = sqlalchemy.text(f"""
            SELECT aggregate_id, event_type, {chosen_ts} as processed_at
            FROM events
            WHERE {chosen_ts} IS NOT NULL
            ORDER BY aggregate_id, {chosen_ts};
            """)

            df_conv = safe_read_sql(q_conv, parse_dates=['processed_at'])
            try:
                df_conv.to_csv(csv_path_conv, index=False)
                print(f"✓ Convergence events exported: {csv_path_conv} ({len(df_conv)} rows)")
            except Exception as e:
                print(f"Failed to write convergence events CSV: {e}")
                traceback.print_exc()
    except Exception as e:
        print(f"Unexpected error during convergence export: {e}")
        traceback.print_exc()

    # Saga steps: try SQL with window functions if chosen_ts is available; otherwise rely on pandas fallback
    try:
        print("Exporting saga steps data (SQL attempt)...")
        csv_path_saga = csv_dir / 'saga_steps.csv'

        if chosen_ts is None:
            print("Skipping saga SQL because no timestamp column was determined. Will rely on pandas fallback (if df_conv available).")
            df_saga = pd.DataFrame()
        else:
            # Build SQL dynamically using chosen timestamp column
            q_saga = sqlalchemy.text(f"""
            WITH step_durations AS (
              SELECT
                aggregate_id,
                event_type,
                {chosen_ts} as processed_at,
                LAG({chosen_ts}, 1, {chosen_ts}) OVER (PARTITION BY aggregate_id ORDER BY {chosen_ts}) as prev_processed_at
              FROM events
              WHERE {chosen_ts} IS NOT NULL
            )
            SELECT
              s.aggregate_id AS saga_id,
              s.aggregate_id AS order_id,
              ROW_NUMBER() OVER (PARTITION BY s.aggregate_id ORDER BY s.processed_at) AS step_number,
              s.event_type AS step_name,
              CASE
                WHEN s.event_type LIKE :cancel OR s.event_type LIKE :fail THEN 'compensation'
                ELSE 'forward'
              END AS command_type,
              'completed' AS status,
              s.prev_processed_at AS started_at,
              s.processed_at AS completed_at,
              TIMESTAMPDIFF(MICROSECOND, s.prev_processed_at, s.processed_at) / 1000 AS duration_ms
            FROM step_durations s
            ORDER BY s.aggregate_id, s.processed_at;
            """)

            df_saga = safe_read_sql(q_saga, params={'cancel': '%Cancel%', 'fail': '%Fail%'}, parse_dates=['started_at', 'completed_at'])

            if not df_saga.empty:
                try:
                    df_saga.to_csv(csv_path_saga, index=False)
                    print(f"✓ Saga steps exported from SQL: {csv_path_saga} ({len(df_saga)} rows)")
                except Exception as e:
                    print(f"Failed to write saga_steps CSV from SQL result: {e}")
                    traceback.print_exc()
            else:
                print("SQL returned no saga rows; will attempt pandas fallback if events exist")

    except Exception as e:
        print(f"Saga steps SQL attempt failed: {e}")
        traceback.print_exc()

    # Pandas fallback: build saga steps from df_conv if df_saga is empty
    if (df_saga is None) or (isinstance(df_saga, pd.DataFrame) and df_saga.empty):
        try:
            if not df_conv.empty:
                print("Attempting Pandas fallback to build saga steps from convergence events...")
                df_tmp = df_conv.sort_values(['aggregate_id', 'processed_at']).copy()
                df_tmp['step_number'] = df_tmp.groupby('aggregate_id').cumcount() + 1
                df_tmp['prev_processed_at'] = df_tmp.groupby('aggregate_id')['processed_at'].shift(1)
                df_tmp['prev_processed_at'] = df_tmp['prev_processed_at'].fillna(df_tmp['processed_at'])

                df_saga = pd.DataFrame({
                    'saga_id': df_tmp['aggregate_id'],
                    'order_id': df_tmp['aggregate_id'],
                    'step_number': df_tmp['step_number'],
                    'step_name': df_tmp['event_type'],
                    'command_type': np.where(df_tmp['event_type'].str.contains('Cancel|Fail', case=False, na=False), 'compensation', 'forward'),
                    'status': 'completed',
                    'started_at': df_tmp['prev_processed_at'],
                    'completed_at': df_tmp['processed_at'],
                    'duration_ms': ((df_tmp['processed_at'] - df_tmp['prev_processed_at']).dt.total_seconds() * 1000).fillna(0)
                })

                try:
                    df_saga.to_csv(csv_path_saga, index=False)
                    print(f"✓ Saga steps exported via Pandas fallback: {csv_path_saga} ({len(df_saga)} rows)")
                except Exception as e:
                    print(f"Failed to write saga_steps CSV during pandas fallback: {e}")
                    traceback.print_exc()
            else:
                print("Pandas fallback skipped: no convergence events data (df_conv empty)")
        except Exception as e:
            print(f"Pandas fallback failed unexpectedly: {e}")
            traceback.print_exc()

    # Final summary and guaranteed return of DataFrames
    print("\n=== Export Summary (final) ===")
    print(f"E2E latency records: {len(df_e2e) if isinstance(df_e2e, pd.DataFrame) else 0}")
    print(f"Event records: {len(df_conv) if isinstance(df_conv, pd.DataFrame) else 0}")
    print(f"Saga step records: {len(df_saga) if isinstance(df_saga, pd.DataFrame) else 0}")

    # Ensure non-None DataFrames
    df_e2e = df_e2e if isinstance(df_e2e, pd.DataFrame) else pd.DataFrame()
    df_conv = df_conv if isinstance(df_conv, pd.DataFrame) else pd.DataFrame()
    df_saga = df_saga if isinstance(df_saga, pd.DataFrame) else pd.DataFrame()

    return df_e2e, df_conv, df_saga


# Export CSV files (robust)
df_e2e, df_conv, df_saga = export_performance_csvs()

# Check CSV export results
print("=== CSV Export Results ===")
print(f"E2E latency data: {len(df_e2e)} rows exported")
print(f"Convergence events data: {len(df_conv)} rows exported")
print(f"Saga steps data: {len(df_saga)} rows exported")


=== Exporting Performance Data to CSV (robust) ===
Ensured output directory exists: /Users/codefox/workspace/practice_infra_arch/saga_pattern/data/orchestration_pattern
Events table columns discovered: ['event_id', 'aggregate_id', 'aggregate_type', 'event_type', 'event_data', 'version', 'created_at']
Using timestamp column for events queries: created_at
Exporting E2E latency data...
✓ E2E latency data exported from memory: /Users/codefox/workspace/practice_infra_arch/saga_pattern/data/orchestration_pattern/e2e_latency.csv (12795 rows)
Exporting convergence events data...
✓ Convergence events exported: /Users/codefox/workspace/practice_infra_arch/saga_pattern/data/orchestration_pattern/convergence_events.csv (51056 rows)
Exporting saga steps data (SQL attempt)...
✓ Convergence events exported: /Users/codefox/workspace/practice_infra_arch/saga_pattern/data/orchestration_pattern/convergence_events.csv (51056 rows)
Exporting saga steps data (SQL attempt)...
✓ Saga steps exported from SQL: 

In [26]:
# Multi-phase load test analysis
def analyze_load_phases():
    """Analyze results by load phase"""
    print("=== Load Phase Analysis ===")

    # Convert test_results to DataFrame for analysis
    df_all = pd.DataFrame(test_results)

    if df_all.empty or 'load_phase' not in df_all.columns:
        print("No load phase data available for analysis.")
        return

    # Filter only load test results (exclude single tests)
    load_phases = ['light', 'medium', 'heavy']
    df_load = df_all[df_all['load_phase'].isin(load_phases)].copy()

    if df_load.empty:
        print("No multi-phase load test data found.")
        return

    print(f"Total load test requests: {len(df_load)}")

    # Phase-by-phase analysis
    for phase in load_phases:
        phase_data = df_load[df_load['load_phase'] == phase]
        if phase_data.empty:
            continue

        success_rate = (phase_data['status_code'].isin([200, 201])).mean() * 100
        avg_response = phase_data['response_time'].mean()
        p95_response = phase_data['response_time'].quantile(0.95)
        p99_response = phase_data['response_time'].quantile(0.99)

        scenario_dist = phase_data['scenario'].value_counts()

        print(f"\n--- {phase.upper()} Load Phase ---")
        print(f"  Requests: {len(phase_data)}")
        print(f"  Success rate: {success_rate:.1f}%")
        print(f"  Avg response time: {avg_response:.3f}s")
        print(f"  P95 response time: {p95_response:.3f}s")
        print(f"  P99 response time: {p99_response:.3f}s")
        print("  Scenario distribution:")
        for scenario, count in scenario_dist.items():
            print(f"    {scenario}: {count} ({count/len(phase_data)*100:.1f}%)")

    # Export phase-specific CSV
    csv_path = Path.cwd() / 'data' / 'orchestration_pattern' / 'load_phase_results.csv'
    df_load.to_csv(csv_path, index=False)
    print(f"\n✓ Load phase results exported: 'orchestration_pattern/load_phase_results.csv' ({len(df_load)} rows)")

    # Performance comparison across phases
    phase_summary = df_load.groupby('load_phase').agg({
        'response_time': ['count', 'mean', 'std', lambda x: x.quantile(0.95), lambda x: x.quantile(0.99)],
        'status_code': lambda x: (x.isin([200, 201])).mean() * 100
    }).round(3)

    phase_summary.columns = ['Count', 'Avg_RT', 'Std_RT', 'P95_RT', 'P99_RT', 'Success_Rate']
    print("\n=== Phase Comparison Summary ===")
    print(phase_summary)

    return df_load

# Run phase analysis
df_load_phases = analyze_load_phases()

=== Load Phase Analysis ===
Total load test requests: 12745

--- LIGHT Load Phase ---
  Requests: 1351
  Success rate: 100.0%
  Avg response time: 0.024s
  P95 response time: 0.048s
  P99 response time: 0.206s
  Scenario distribution:
    load_success: 1220 (90.3%)
    load_stock_failure: 84 (6.2%)
    load_payment_failure: 47 (3.5%)

--- MEDIUM Load Phase ---
  Requests: 4764
  Success rate: 100.0%
  Avg response time: 0.025s
  P95 response time: 0.047s
  P99 response time: 0.392s
  Scenario distribution:
    load_success: 4226 (88.7%)
    load_stock_failure: 400 (8.4%)
    load_payment_failure: 138 (2.9%)

--- HEAVY Load Phase ---
  Requests: 6630
  Success rate: 99.9%
  Avg response time: 0.055s
  P95 response time: 0.129s
  P99 response time: 0.732s
  Scenario distribution:
    load_success: 5857 (88.3%)
    load_stock_failure: 566 (8.5%)
    load_payment_failure: 207 (3.1%)

✓ Load phase results exported: 'orchestration_pattern/load_phase_results.csv' (12745 rows)

=== Phase Compa

In [27]:
# Export raw test response times directly to CSV
df_raw = pd.DataFrame(test_results)
csv_path_raw = Path.cwd() / 'data' / 'orchestration_pattern' / 'e2e_latency.csv'
df_raw[['pattern', 'scenario', 'status_code', 'response_time', 'timestamp', 'load_phase']].to_csv(csv_path_raw, index=False)
print(f"✓ Raw latency data exported: orchestration_pattern/e2e_latency.csv ({len(df_raw)} rows)")

✓ Raw latency data exported: orchestration_pattern/e2e_latency.csv (12795 rows)


# Saga Orchestrator Improvements Summary

## Fixed Issues in the 'Insufficient stock' Error Analysis

I've implemented three key improvements to the saga orchestrator to address the error you encountered:

### 1. Fixed Compensation Loop (Priority 1)
**Problem**: Compensation was being run for failed steps, not just completed steps.
- **Before**: `for j in range(i, -1, -1):` - included the failed step itself
- **After**: `for j in range(i-1, -1, -1):` - only compensates previously completed steps
- **Added Safety Check**: Verifies `SagaStepLog.status == COMPLETED` before compensation

### 2. Fixed Saga Status Timing (Priority 2)  
**Problem**: Saga status was updated before step execution, causing misleading state.
- **Before**: Status updated before step execution 
- **After**: Status updated only after successful step completion
- **Benefit**: External status queries now reflect actual completion state

### 3. Fixed Multi-Item Inventory Handling (Priority 3)
**Problem**: Only first item processed for inventory operations, causing inconsistency.
- **Before**: `first_item = payload["items"][0]` - processed only first item
- **After**: Loops through all items individually with proper compensation
- **Safety**: If any item fails, compensates already-reserved items immediately

### What the Original Error Showed
- Inventory service correctly returned 400 for insufficient stock
- Orchestrator properly triggered compensation flow  
- **Issues**: Compensation included failed step + status updated prematurely + multi-item inconsistency

### Testing Recommendation
Run the existing notebook performance tests to verify the improvements work correctly with both success and failure scenarios.

In [15]:
# Test the improved error handling with a specific inventory failure scenario
async def test_improved_error_handling():
    """Test the improved orchestrator with insufficient stock scenario"""
    print("=== Testing Improved Error Handling ===")

    # Test with insufficient stock (very high quantity)
    test_payload = {
        "customer_id": "customer-001",
        "items": [
            {"book_id": "book-123", "quantity": 9999},  # Will trigger stock failure
            {"book_id": "book-456", "quantity": 1}      # This should not be processed if first fails
        ]
    }

    async with aiohttp.ClientSession() as session:
        print("Testing multi-item order with insufficient stock...")
        print(f"Payload: {test_payload}")

        try:
            start_time = time.time()
            async with session.post(
                ENDPOINTS['orchestration'],
                json=test_payload,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                response_time = time.time() - start_time

                if response.status in [200, 201]:
                    result = await response.json()
                    saga_id = result.get('saga_id')
                    print(f"✓ Saga started: {saga_id}")

                    # Wait a moment for saga to process
                    await asyncio.sleep(2)

                    # Check saga status
                    async with session.get(f"http://localhost:8005/saga/{saga_id}/status") as status_response:
                        if status_response.status == 200:
                            status_data = await status_response.json()
                            print(f"Saga Status: {status_data['status']}")
                            print(f"Steps: {len(status_data.get('steps', []))}")

                            for step in status_data.get('steps', []):
                                print(f"  Step {step['step_number']}: {step['step_name']} - {step['status']}")
                                if step.get('error_message'):
                                    print(f"    Error: {step['error_message'][:100]}...")
                else:
                    error_text = await response.text()
                    print(f"✗ Request failed: {response.status} - {error_text[:200]}...")

        except Exception as e:
            print(f"✗ Exception occurred: {str(e)}")

# Run the improved error handling test
await test_improved_error_handling()

=== Testing Improved Error Handling ===
Testing multi-item order with insufficient stock...
Payload: {'customer_id': 'customer-001', 'items': [{'book_id': 'book-123', 'quantity': 9999}, {'book_id': 'book-456', 'quantity': 1}]}
✓ Saga started: saga-8a6bd4e5
Saga Status: FAILED
Steps: 1
  Step 1: inventory.reserve_stock - FAILED
    Error: Service error: 400 - {"detail":"Insufficient stock. Available: 0, Requested: 9999"}...
