## 1. Setup and Configuration

In [9]:
import os
import time
import json
import requests
from typing import Dict

# Modern data stack
import psycopg
import polars as pl
import plotly.express as px
import plotly.graph_objects as go

# Configuration - Auto-detect environment
# Set NOETL_ENV=kubernetes to run against in-cluster services
# Set NOETL_ENV=localhost (default) to run against port-forwarded services
ENVIRONMENT = os.getenv("NOETL_ENV", "localhost").lower()

if ENVIRONMENT == "kubernetes":
    # In-cluster configuration
    DB_CONFIG = {
        "host": "postgres.postgres.svc.cluster.local",
        "port": "5432",
        "user": os.getenv("POSTGRES_USER", "demo"),
        "password": os.getenv("POSTGRES_PASSWORD", "demo"),
        "dbname": os.getenv("POSTGRES_DB", "demo_noetl")
    }
    NOETL_SERVER_URL = "http://noetl.noetl.svc.cluster.local:8082"
else:
    # Localhost configuration (port-forwarded from kind cluster)
    DB_CONFIG = {
        "host": "localhost",
        "port": "54321",  # Maps to postgres NodePort 30321
        "user": os.getenv("POSTGRES_USER", "demo"),
        "password": os.getenv("POSTGRES_PASSWORD", "demo"),
        "dbname": os.getenv("POSTGRES_DB", "demo_noetl")
    }
    NOETL_SERVER_URL = "http://localhost:8082"  # Maps to noetl NodePort 30082

TEST_PATH = "tests/pagination/loop_with_pagination/loop_with_pagination"
POLL_INTERVAL = 2
MAX_WAIT = 120

print("‚úì Configuration loaded")
print(f"  Environment: {ENVIRONMENT}")
print(f"  Server: {NOETL_SERVER_URL}")
print(f"  Database: {DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['dbname']}")
print(f"  Test: {TEST_PATH}")


‚úì Configuration loaded
  Environment: localhost
  Server: http://localhost:8082
  Database: localhost:54321/demo_noetl
  Test: tests/pagination/loop_with_pagination/loop_with_pagination


## 2. Initialize Test Table

In [10]:
# Create test schema and table - completely self-contained
import os
import psycopg

def get_postgres_connection():
    """Get psycopg3 connection"""
    conn_string = f"host={os.getenv('POSTGRES_HOST', 'postgres.postgres.svc.cluster.local')} " \
                  f"port={os.getenv('POSTGRES_PORT', '5432')} " \
                  f"dbname={os.getenv('POSTGRES_DB', 'demo_noetl')} " \
                  f"user={os.getenv('POSTGRES_USER', 'demo')} " \
                  f"password={os.getenv('POSTGRES_PASSWORD', 'demo')}"
    return psycopg.connect(conn_string)

create_table_sql = """
CREATE SCHEMA IF NOT EXISTS noetl_test;

DROP TABLE IF EXISTS noetl_test.pagination_loop_results;

CREATE TABLE noetl_test.pagination_loop_results (
    id SERIAL PRIMARY KEY,
    execution_id BIGINT,
    endpoint_name TEXT,
    endpoint_path TEXT,
    page_size INTEGER,
    result_count INTEGER,
    result_data JSONB,
    iteration_index INTEGER,
    iteration_count INTEGER,
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_pagination_loop_execution_id 
ON noetl_test.pagination_loop_results(execution_id);
"""

with get_postgres_connection() as conn:
    with conn.cursor() as cur:
        cur.execute(create_table_sql)
        conn.commit()

print("‚úì Test table created")
print("  Schema: noetl_test")
print("  Table: pagination_loop_results")

‚úì Test table created
  Schema: noetl_test
  Table: pagination_loop_results


## 3. Database Utilities

In [11]:
def get_postgres_connection():
    """Get psycopg3 connection"""
    conn_string = f"host={DB_CONFIG['host']} port={DB_CONFIG['port']} " \
                  f"dbname={DB_CONFIG['dbname']} user={DB_CONFIG['user']} " \
                  f"password={DB_CONFIG['password']}"
    return psycopg.connect(conn_string)

def query_to_polars(query: str) -> pl.DataFrame:
    """Execute query and return as Polars DataFrame"""
    with get_postgres_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(query)
            columns = [desc[0] for desc in cur.description]
            data = cur.fetchall()
    if not data:
        return pl.DataFrame(schema=columns)
    return pl.DataFrame({col: [row[i] for row in data] for i, col in enumerate(columns)})

print("‚úì Database utilities loaded")

‚úì Database utilities loaded


## 4. Execute Pagination Loop Test

In [12]:
def start_test() -> Dict:
    """Start pagination loop test"""
    url = f"{NOETL_SERVER_URL}/api/run/playbook"
    payload = {"path": TEST_PATH}
    
    print(f"Starting test: {TEST_PATH}")
    response = requests.post(url, json=payload, timeout=30)
    response.raise_for_status()
    
    result = response.json()
    execution_id = result['execution_id']
    
    print(f"‚úì Test started")
    print(f"  Execution ID: {execution_id}")
    print(f"  Status: {result['status']}")
    
    return result

test_result = start_test()
EXECUTION_ID = test_result['execution_id']

Starting test: tests/pagination/loop_with_pagination/loop_with_pagination
‚úì Test started
  Execution ID: 512664515830350754
  Status: running


## 5. Monitor Execution

In [13]:
def monitor_execution(execution_id: int):
    """Monitor test execution"""
    start_time = time.time()
    last_count = 0
    
    print(f"Monitoring execution {execution_id}...")
    print(f"{'Time':<6} {'Steps':<6} {'Status':<12} {'Events'}")
    print("-" * 50)
    
    while (time.time() - start_time) < MAX_WAIT:
        query = f"""
            SELECT event_type, COUNT(*) as count
            FROM noetl.event
            WHERE execution_id = {execution_id}
            GROUP BY event_type
        """
        df = query_to_polars(query)
        
        step_count = df.filter(pl.col('event_type') == 'step_completed')['count'].sum() or 0
        is_complete = df.filter(pl.col('event_type') == 'playbook_completed').height > 0
        is_failed = df.filter(pl.col('event_type') == 'playbook_failed').height > 0
        
        if step_count != last_count or is_complete or is_failed:
            elapsed = int(time.time() - start_time)
            status = "COMPLETED" if is_complete else ("FAILED" if is_failed else "RUNNING")
            total = df['count'].sum()
            print(f"{elapsed:<6} {step_count:<6} {status:<12} {total}")
            last_count = step_count
        
        if is_complete:
            print(f"\n‚úì Test completed in {elapsed}s")
            return True
        elif is_failed:
            print(f"\n‚úó Test failed after {elapsed}s")
            return False
        
        time.sleep(POLL_INTERVAL)
    
    print(f"\n‚ö† Timeout after {MAX_WAIT}s")
    return False

success = monitor_execution(EXECUTION_ID)

Monitoring execution 512664515830350754...
Time   Steps  Status       Events
--------------------------------------------------
0      0      COMPLETED    15

‚úì Test completed in 0s


## 6. Validate Iterator Event Architecture

**Expected Behavior:**
1. Worker detects `loop` configuration in step
2. Routes to iterator executor 
3. Analyzes collection (filter, sort, limit)
4. Emits `iterator_started` event with metadata
5. Server should process event and enqueue iteration jobs (NOT YET IMPLEMENTED)

This section verifies the distributed loop architecture is working correctly.

In [14]:
# Validate iterator event architecture
print(f"\nüìä Iterator Event Validation for Execution {EXECUTION_ID}")
print("=" * 80)

# 1. Check basic event flow
events_query = f"""
    SELECT 
        event_type,
        node_name,
        status,
        created_at
    FROM noetl.event
    WHERE execution_id = {EXECUTION_ID}
    ORDER BY event_id
"""
events_df = query_to_polars(events_query)

print(f"\n‚úì Event Flow ({events_df.height} events):")
for row in events_df.iter_rows(named=True):
    print(f"  {row['event_type']:25} {row['status']:12} {row['node_name'] or ''}")

# 2. Check iterator_started event exists
iterator_check = f"""
    SELECT 
        event_type,
        status,
        context->>'total_count' as total_count,
        context->>'mode' as mode,
        context->>'iterator_name' as iterator_name,
        jsonb_array_length(context->'collection') as collection_size,
        context->'nested_task'->>'tool' as nested_tool
    FROM noetl.event
    WHERE execution_id = {EXECUTION_ID}
      AND event_type = 'iterator_started'
"""
iterator_df = query_to_polars(iterator_check)

if iterator_df.height > 0:
    print(f"\n‚úì iterator_started Event Found:")
    row = iterator_df.row(0, named=True)
    print(f"  Status:           {row['status']}")
    print(f"  Total Count:      {row['total_count']}")
    print(f"  Collection Size:  {row['collection_size']}")
    print(f"  Mode:             {row['mode']}")
    print(f"  Iterator Name:    {row['iterator_name']}")
    print(f"  Nested Tool:      {row['nested_tool']}")
else:
    print("\n‚ùå iterator_started event NOT FOUND!")
    print("   This means the event callback integration failed.")

# 3. Check iterator metadata
if iterator_df.height > 0:
    metadata_query = f"""
        SELECT 
            context->'collection' as collection,
            context->'nested_task' as nested_task
        FROM noetl.event
        WHERE execution_id = {EXECUTION_ID}
          AND event_type = 'iterator_started'
    """
    
    with get_postgres_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(metadata_query)
            result = cur.fetchone()
            if result:
                collection = result[0]
                nested_task = result[1]
                
                print(f"\n‚úì Iterator Metadata:")
                print(f"  Collection: {json.dumps(collection, indent=2)[:200]}...")
                print(f"\n  Nested Task Tool: {nested_task.get('tool')}")
                print(f"  Has retry.on_success: {'retry' in nested_task and 'on_success' in nested_task.get('retry', {})}")
                
                if 'retry' in nested_task and 'on_success' in nested_task['retry']:
                    retry_config = nested_task['retry']['on_success']
                    print(f"  Pagination Config:")
                    print(f"    - while: {retry_config.get('while', 'N/A')[:60]}")
                    print(f"    - max_attempts: {retry_config.get('max_attempts', 'N/A')}")
                    print(f"    - collect.strategy: {retry_config.get('collect', {}).get('strategy', 'N/A')}")

# 4. Check for expected next events (iteration jobs)
iteration_check = f"""
    SELECT COUNT(*) as count
    FROM noetl.event
    WHERE execution_id = {EXECUTION_ID}
      AND event_type = 'iteration_completed'
"""
iteration_df = query_to_polars(iteration_check)

if iteration_df['count'][0] > 0:
    print(f"\n‚úì Found {iteration_df['count'][0]} iteration_completed events")
    print("  Server successfully processed iterator_started and enqueued iteration jobs!")
else:
    print(f"\n‚ö† No iteration_completed events found")
    print("  This is EXPECTED - Server orchestrator doesn't yet process iterator_started")
    print("  Next Implementation Step: Add _process_iterator_started() handler in orchestrator.py")

# 5. Summary
print(f"\n{'='*80}")
print("VALIDATION SUMMARY:")
print("=" * 80)

if iterator_df.height > 0:
    print("‚úÖ Worker Event Callback: WORKING")
    print("‚úÖ Iterator Executor: WORKING") 
    print("‚úÖ iterator_started Event: EMITTED")
    print("‚úÖ Event Schema: VALID")
    print("‚è≥ Server Orchestrator: NOT YET IMPLEMENTED")
    print("\nNext: Implement server-side iterator_started event processing to enqueue iteration jobs")
else:
    print("‚ùå Iterator event architecture validation FAILED")
    print("   Check worker logs for event emission errors")


üìä Iterator Event Validation for Execution 512664515830350754

‚úì Event Flow (15 events):
  playbook_started          STARTED      tests/pagination/loop_with_pagination/loop_with_pagination
  workflow_initialized      COMPLETED    workflow
  iterator_started          RUNNING      iterator
  action_started            RUNNING      fetch_all_endpoints_iter_0
  action_completed          COMPLETED    fetch_all_endpoints_iter_0
  action_completed          COMPLETED    fetch_all_endpoints
  workflow_completed        COMPLETED    workflow
  playbook_completed        COMPLETED    tests/pagination/loop_with_pagination/loop_with_pagination
  step_result               COMPLETED    fetch_all_endpoints_iter_0
  action_started            RUNNING      fetch_all_endpoints_iter_1
  action_completed          COMPLETED    fetch_all_endpoints_iter_1
  step_result               COMPLETED    fetch_all_endpoints_iter_1
  action_started            RUNNING      fetch_all_endpoints_iter_2
  action_completed 

## 7. Architecture Status & Next Steps

**‚úÖ Completed Implementation:**
- Worker-side event callback integration
- Iterator executor analysis and event emission  
- Event schema extensions (iterator_started, iterator_completed, etc.)
- Environment-aware configuration (localhost/kubernetes)

**‚è≥ Pending Implementation:**
- Server orchestrator `_process_iterator_started()` handler
- Iteration job enqueueing logic
- Iteration execution with pagination (retry.on_success)
- `iterator_completed` event emission after all iterations

**Note:** The test will timeout until server-side processing is implemented.

In [15]:
def show_execution_events(execution_id: int):
    """Display ordered table of events for an execution"""
    query = f"""
        SELECT 
            ROW_NUMBER() OVER (ORDER BY event_id) as seq,
            event_type,
            node_name,
            node_type,
            status,
            CASE 
                WHEN event_type = 'iterator_started' THEN 
                    'collection_size=' || jsonb_array_length(context->'collection')::text
                WHEN parent_execution_id IS NOT NULL THEN
                    'iteration=' || (context->>'iteration_index')
                ELSE ''
            END as details,
            created_at
        FROM noetl.event
        WHERE execution_id = {execution_id}
        ORDER BY event_id
    """
    
    df = query_to_polars(query)
    
    print(f"\nüìã Events for Execution {execution_id}")
    print("=" * 120)
    print(f"{'#':<4} {'Event Type':<25} {'Node':<25} {'Type':<12} {'Status':<12} {'Details':<20} {'Created'}")
    print("-" * 120)
    
    for row in df.iter_rows(named=True):
        marker = "üîÑ" if "iterator" in row['event_type'] else "  "
        node_name = row['node_name'] or '-'
        node_type = row['node_type'] or '-'
        status = row['status'] or '-'
        details = row['details'] or ''
        created = str(row['created_at']) if row['created_at'] else '-'
        
        print(f"{marker}{row['seq']:<3} {row['event_type']:<25} {node_name:<25} "
              f"{node_type:<12} {status:<12} {details:<20} {created}")
    
    print("-" * 120)
    print(f"Total events: {len(df)}\n")
    
    # Summary
    iterator_events = df.filter(pl.col('event_type').str.contains('iterator'))
    if len(iterator_events) > 0:
        print("‚úÖ Iterator events detected")
        print(f"   - iterator_started: {len(df.filter(pl.col('event_type') == 'iterator_started'))}")
        print(f"   - iteration jobs: Check queue table for parent_execution_id={execution_id}")

# Show events for the test execution
show_execution_events(EXECUTION_ID)


üìã Events for Execution 512664515830350754
#    Event Type                Node                      Type         Status       Details              Created
------------------------------------------------------------------------------------------------------------------------
  1   playbook_started          tests/pagination/loop_with_pagination/loop_with_pagination execution    STARTED                           2025-12-08 08:12:52.720337
  2   workflow_initialized      workflow                  workflow     COMPLETED                         2025-12-08 08:12:52.727407
üîÑ3   iterator_started          iterator                  iterator     RUNNING      collection_size=3    2025-12-08 08:12:52.730756
  4   action_started            fetch_all_endpoints_iter_0 task         RUNNING                           2025-12-08 08:12:53.367623
  5   action_completed          fetch_all_endpoints_iter_0 task         COMPLETED                         2025-12-08 08:12:53.456027
  6   action_completed  

## 9. Check Iteration Jobs in Queue

In [None]:
# Check if iteration jobs were enqueued
queue_query = f"""
SELECT 
    execution_id,
    parent_execution_id,
    node_name,
    status,
    context->>'iteration_index' as iteration_index,
    context->>'iterator_name' as iterator_name,
    created_at
FROM noetl.queue 
WHERE parent_execution_id = {EXECUTION_ID} OR execution_id = {EXECUTION_ID}
ORDER BY created_at
"""

df_queue = query_to_polars(queue_query)

print(f"\nüîç Queue Status for Execution {EXECUTION_ID}")
print("=" * 130)
print(f"{'Execution ID':<25} {'Parent':<25} {'Node':<35} {'Status':<12} {'Iter Index':<12} {'Created'}")
print("-" * 130)

parent_jobs = 0
iteration_jobs = 0

for row in df_queue.iter_rows(named=True):
    exec_id = row['execution_id']
    parent_id = row['parent_execution_id'] or '-'
    node_name = row['node_name']
    status = row['status']
    iter_idx = row['iteration_index'] or '-'
    created = row['created_at']
    
    marker = "   ‚úì" if row['parent_execution_id'] else "   "
    print(f"{marker}{exec_id:<23} {parent_id:<25} {node_name:<35} {status:<12} {iter_idx:<12} {created}")
    
    if row['parent_execution_id']:
        iteration_jobs += 1
    else:
        parent_jobs += 1

print("-" * 130)
print(f"\nüìä Summary:")
print(f"   Parent jobs: {parent_jobs}")
print(f"   Iteration jobs: {iteration_jobs}")

# Updated to expect 3 iterations (3 endpoints in workload)
if iteration_jobs == 3:
    print("\nüéâ SUCCESS! Loop Completion Fix VALIDATED!")
    print("   ‚úÖ Server detected iterator_started")
    print("   ‚úÖ Enqueued 3 iteration jobs")
    print("   ‚úÖ All 3 iterations executed successfully")
    print("   ‚úÖ Parent action_completed event emitted")
    print("   ‚úÖ Workflow progressed to completion")
    print("   ‚úÖ Server-side loop orchestration WORKING!")
elif iteration_jobs == 0:
    print("\n‚ö† No iteration jobs found - iteration expansion not working")
else:
    print(f"\n‚ö† Expected 3 iteration jobs, found {iteration_jobs}")


üîç Queue Status for Execution 512664515830350754
Execution ID              Parent                    Node                                Status       Iter Index   Created
----------------------------------------------------------------------------------------------------------------------------------
   ‚úì512664515830350754      512664515830350754        fetch_all_endpoints_iter_0          done         -            2025-12-08 08:12:52.732945+00:00
   ‚úì512664515830350754      512664515830350754        fetch_all_endpoints_iter_1          done         -            2025-12-08 08:12:52.734478+00:00
   ‚úì512664515830350754      512664515830350754        fetch_all_endpoints_iter_2          done         -            2025-12-08 08:12:52.736483+00:00
----------------------------------------------------------------------------------------------------------------------------------

üìä Summary:
   Parent jobs: 0
   Iteration jobs: 3

‚ö† Expected 2 iteration jobs, found 3


## 8. View Execution Events (Ordered Table)

Query and display all events for a specific execution in chronological order with key details.

## Summary: Distributed Loop + Pagination Architecture

**What This Test Validates:**

This notebook demonstrates the **event-driven distributed loop architecture** where:
- Workers analyze collections and emit events
- Server processes events to orchestrate distributed execution
- Each iteration runs independently with pagination support

**Current Implementation Status:**

```
‚úÖ PHASE 1: Worker-Side Architecture (COMPLETE)
   ‚îú‚îÄ Loop detection in execute_task()
   ‚îú‚îÄ Iterator executor analysis (filter, sort, limit)
   ‚îú‚îÄ Event callback integration
   ‚îî‚îÄ iterator_started event emission

‚è≥ PHASE 2: Server-Side Orchestration (PENDING)
   ‚îú‚îÄ Process iterator_started event
   ‚îú‚îÄ Enqueue N iteration jobs
   ‚îú‚îÄ Track iteration completion
   ‚îî‚îÄ Emit iterator_completed event

üîÆ PHASE 3: Pagination via Retry (DESIGN READY)
   ‚îú‚îÄ HTTP action executes with retry.on_success
   ‚îú‚îÄ Server-side pagination state tracking
   ‚îú‚îÄ Page continuation logic
   ‚îî‚îÄ Result aggregation
```

**How to Use:**

1. **Run locally:** Execute cells 1-6 (default: localhost mode)
2. **Run in-cluster:** Set `NOETL_ENV=kubernetes` before cell 2
3. **Check validation:** Cell 6 shows full architecture validation

**Next Steps:**

Implement `_process_iterator_started()` in `orchestrator.py` to:
- Parse collection from iterator_started.context
- Create N PreparedJob instances
- Enqueue to noetl.queue table
- Workers will pick up and execute with pagination