# Part 4: Parallel Task Execution - Running Multiple SQL Queries Concurrently

## The Problem:
You have a loop that runs SQL queries one at a time (sequential):
```python
for sql_query in queries:
    result = execute_query(sql_query)  # Slow! Waits for each query
```

## The Solution:
Run multiple queries in parallel (3 at a time) to speed up execution.

**What you'll learn:**
1. How to use Prefect's `.submit()` to run tasks concurrently
2. How to limit concurrency (max 3 tasks at once)
3. How to collect results from parallel tasks
4. Best practices for database connection pooling

## Approach 1: Using `.submit()` with Manual Batching

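This approach splits the queries into batches of 3 and runs each batch concurrently. A batch must finish before the next one starts, so at most 3 queries run at once and each batch takes as long as its slowest query.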
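
The batching control flow does not depend on Prefect. As a minimal sketch, assuming a thread pool is an acceptable stand-in for Prefect's task runner, the same submit-then-wait pattern can be written with the standard library. The names `chunked`, `run_in_batches`, and `fake_query` are hypothetical helpers introduced only for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterator, List, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def chunked(items: Sequence[T], size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of `items`, each at most `size` long."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


def run_in_batches(fn: Callable[[T], R], items: Sequence[T], batch_size: int = 3) -> List[R]:
    """Run `fn` over `items`, at most `batch_size` calls at a time.

    Each batch must finish completely before the next one starts.
    """
    results: List[R] = []
    with ThreadPoolExecutor(max_workers=batch_size) as pool:
        for batch in chunked(items, batch_size):
            futures = [pool.submit(fn, item) for item in batch]
            # Collect in submission order; this waits until the batch is done.
            results.extend(f.result() for f in futures)
    return results


def fake_query(name: str) -> str:
    """Hypothetical stand-in for a real database call."""
    return f"{name}: ok"


batches = [list(b) for b in chunked(list(range(8)), 3)]
outcome = run_in_batches(fake_query, ["a", "b", "c", "d"], batch_size=3)
```

With 8 items and a batch size of 3, `chunked` produces two full batches and one batch of 2, which matches the "Expected batches" figure printed by the cell below.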

In [None]:
from prefect import flow, task
from typing import List, Dict, Any
import time
from datetime import datetime

# Simulate SQL query execution
@task(name="execute_sql_query")
def execute_sql_query(query_name: str, query: str) -> Dict[str, Any]:
    """
    Simulates executing a SQL query.
    In production, this would connect to your database.
    """
    print(f"🔵 Starting query: {query_name} at {datetime.now().strftime('%H:%M:%S')}")
    
    # Simulate query execution time (1-3 seconds)
    import random
    execution_time = random.uniform(1, 3)
    time.sleep(execution_time)
    
    # Simulate results
    result = {
        "query_name": query_name,
        "query": query,
        "row_count": random.randint(100, 1000),
        "execution_time": execution_time,
        "completed_at": datetime.now().strftime('%H:%M:%S')
    }
    
    print(f"✅ Completed query: {query_name} ({execution_time:.2f}s)")
    return result


@flow(name="parallel_sql_batch_approach")
def run_queries_in_batches(queries: List[Dict[str, str]], batch_size: int = 3):
    """
    Run SQL queries in batches of N at a time.
    
    Args:
        queries: List of dicts with 'name' and 'sql' keys
        batch_size: Number of queries to run concurrently (default: 3)
    """
    all_results = []
    
    # Process queries in batches
    for i in range(0, len(queries), batch_size):
        batch = queries[i:i + batch_size]
        print(f"\n📦 Processing batch {i//batch_size + 1} ({len(batch)} queries)")
        
        # Submit all tasks in this batch
        futures = []
        for q in batch:
            future = execute_sql_query.submit(q["name"], q["sql"])
            futures.append(future)
        
        # Wait for all tasks in this batch to complete
        batch_results = [future.result() for future in futures]
        all_results.extend(batch_results)
        
        print(f"✅ Batch {i//batch_size + 1} complete")
    
    return all_results


# Example: Simulate reading queries from a config file
sample_queries = [
    {"name": "customer_orders", "sql": "SELECT * FROM orders WHERE customer_id = 123"},
    {"name": "product_sales", "sql": "SELECT product_id, SUM(sales) FROM sales GROUP BY product_id"},
    {"name": "inventory_check", "sql": "SELECT * FROM inventory WHERE stock < 10"},
    {"name": "user_activity", "sql": "SELECT user_id, COUNT(*) FROM activity GROUP BY user_id"},
    {"name": "revenue_report", "sql": "SELECT date, SUM(revenue) FROM transactions GROUP BY date"},
    {"name": "top_customers", "sql": "SELECT customer_id, total_spent FROM customers ORDER BY total_spent DESC LIMIT 100"},
    {"name": "shipping_status", "sql": "SELECT order_id, status FROM shipments WHERE status = 'pending'"},
    {"name": "returns_analysis", "sql": "SELECT product_id, COUNT(*) FROM returns GROUP BY product_id"},
]

print("=" * 70)
print("APPROACH 1: Manual Batching (3 queries at a time)")
print("=" * 70)
print(f"Total queries: {len(sample_queries)}")
print("Batch size: 3")
print(f"Expected batches: {len(sample_queries) // 3 + (1 if len(sample_queries) % 3 else 0)}")
print("=" * 70)

# Run the flow
start_time = time.time()
results = run_queries_in_batches(sample_queries, batch_size=3)
end_time = time.time()

print("\n" + "=" * 70)
print("RESULTS:")
print("=" * 70)
for r in results:
    print(f"  • {r['query_name']}: {r['row_count']} rows in {r['execution_time']:.2f}s")

print(f"\n⏱️  Total execution time: {end_time - start_time:.2f} seconds")
print(f"📊 Wall-clock time per query: {(end_time - start_time) / len(sample_queries):.2f} seconds")

## Approach 2: Using Task Concurrency Limits (Prefect 2.x Feature)

This approach uses Prefect's built-in concurrency limiting to automatically control how many tasks run simultaneously.
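
The "submit everything, let a shared limiter gate execution" idea can be sketched with the standard library alone. This is not how Prefect implements tag-based limits; it is only an in-process analogy, assuming a `threading.BoundedSemaphore` plays the role of the server-side limit. `gated_query` and `peak_running` are hypothetical names used to observe the effect.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List

MAX_CONCURRENT = 3  # plays the role of the limit on the "database" tag
limiter = threading.BoundedSemaphore(MAX_CONCURRENT)

_lock = threading.Lock()
_running = 0
peak_running = 0


def gated_query(name: str) -> str:
    """Hypothetical query: waits for a free slot, then simulates work."""
    global _running, peak_running
    with limiter:  # blocks this worker thread until a slot is free
        with _lock:
            _running += 1
            peak_running = max(peak_running, _running)
        time.sleep(0.05)  # stand-in for the real query
        with _lock:
            _running -= 1
    return name


names: List[str] = [f"query_{i}" for i in range(8)]

# Submit everything at once; the semaphore, not the caller, enforces the limit.
with ThreadPoolExecutor(max_workers=len(names)) as pool:
    futures = [pool.submit(gated_query, n) for n in names]
    results = [f.result() for f in futures]
```

Unlike batching, a finished query frees its slot immediately, so the next waiting query starts without waiting for the rest of a batch.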

In [None]:
from prefect import flow, task

# Define task with concurrency limit
@task(
    name="execute_sql_with_limit",
    tags=["database"],
    retries=2,
    retry_delay_seconds=10
)
def execute_sql_with_concurrency_limit(query_name: str, query: str) -> Dict[str, Any]:
    """
    Execute SQL query with automatic concurrency limiting.
    Prefect allows at most 3 of these to run at once, provided a concurrency
    limit of 3 has been created for the "database" tag.
    """
    print(f"🔵 Starting query: {query_name} at {datetime.now().strftime('%H:%M:%S')}")
    
    import random
    execution_time = random.uniform(1, 3)
    time.sleep(execution_time)
    
    result = {
        "query_name": query_name,
        "query": query,
        "row_count": random.randint(100, 1000),
        "execution_time": execution_time,
        "completed_at": datetime.now().strftime('%H:%M:%S')
    }
    
    print(f"✅ Completed query: {query_name} ({execution_time:.2f}s)")
    return result


@flow(name="parallel_sql_with_concurrency_limit")
def run_queries_with_limit(queries: List[Dict[str, str]]):
    """
    Submit all queries at once - Prefect controls concurrency automatically.
    
    Note: The limit is not set in this code. Create it on the Prefect server
    for the "database" tag before running, for example via:
      1. Prefect UI: Settings → Concurrency Limits → Create
      2. CLI: prefect concurrency-limit create database 3
    """
    # Submit ALL queries at once
    futures = []
    for q in queries:
        future = execute_sql_with_concurrency_limit.submit(q["name"], q["sql"])
        futures.append(future)
    
    # Prefect automatically limits concurrent execution to 3
    # (if concurrency limit is configured)
    results = [future.result() for future in futures]
    
    return results


print("=" * 70)
print("APPROACH 2: Task Concurrency Limits")
print("=" * 70)
print("This approach submits ALL tasks at once.")
print("Prefect's concurrency limiter ensures only 3 run simultaneously.")
print()
print("⚠️  Note: Concurrency limits must be configured on the Prefect server:")
print("   prefect concurrency-limit create database 3")
print("=" * 70)

# For this demo, we'll use manual batching since concurrency limits
# require server configuration (see the CLI command printed above).
print("\n(Running with manual batching for demo purposes...)")
results = run_queries_in_batches(sample_queries, batch_size=3)

## Approach 3: Using Python's asyncio with Semaphore (Most Control)

This approach gives you the most control over concurrency using Python's built-in asyncio.
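
Real database drivers are often blocking, and a blocking call inside an `async` function stalls the event loop for every other query. A minimal sketch of one way around this, assuming Python 3.9+ for `asyncio.to_thread`, is to hold the semaphore while the blocking call runs in a worker thread. `blocking_query`, `limited_query`, and `run_all` are hypothetical names, and the sketch deliberately leaves Prefect out.

```python
import asyncio
import time
from typing import List


def blocking_query(name: str) -> str:
    """Hypothetical blocking driver call (e.g. a synchronous DB client)."""
    time.sleep(0.05)
    return f"{name}: done"


async def limited_query(name: str, semaphore: asyncio.Semaphore) -> str:
    # Wait for a free slot without blocking the event loop.
    async with semaphore:
        # Run the blocking call in a worker thread (Python 3.9+).
        return await asyncio.to_thread(blocking_query, name)


async def run_all(names: List[str], max_concurrent: int = 3) -> List[str]:
    semaphore = asyncio.Semaphore(max_concurrent)
    # gather() returns results in the order the coroutines were passed in.
    return await asyncio.gather(*(limited_query(n, semaphore) for n in names))


async_results = asyncio.run(run_all(["a", "b", "c", "d", "e"]))
```

In a notebook, where an event loop is already running, call `await run_all([...])` instead of `asyncio.run(...)`.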

In [None]:
import asyncio
from prefect import flow, task

@task(name="async_execute_sql")
async def execute_sql_async(query_name: str, query: str, semaphore: asyncio.Semaphore) -> Dict[str, Any]:
    """
    Async SQL execution with semaphore-based concurrency control.
    The semaphore caps how many queries run at once (3 by default in the flow below).
    """
    async with semaphore:  # Waits here, without blocking the event loop, until a slot is free
        print(f"🔵 Starting query: {query_name} at {datetime.now().strftime('%H:%M:%S')}")
        
        import random
        execution_time = random.uniform(1, 3)
        await asyncio.sleep(execution_time)  # Async sleep
        
        result = {
            "query_name": query_name,
            "query": query,
            "row_count": random.randint(100, 1000),
            "execution_time": execution_time,
            "completed_at": datetime.now().strftime('%H:%M:%S')
        }
        
        print(f"✅ Completed query: {query_name} ({execution_time:.2f}s)")
        return result


@flow(name="parallel_sql_with_semaphore")
async def run_queries_with_semaphore(queries: List[Dict[str, str]], max_concurrent: int = 3):
    """
    Run SQL queries with semaphore-controlled concurrency.
    
    Args:
        queries: List of query configurations
        max_concurrent: Maximum number of concurrent queries (default: 3)
    """
    # Create a semaphore that allows max_concurrent tasks at once
    semaphore = asyncio.Semaphore(max_concurrent)
    
    # Create all tasks at once
    tasks = [
        execute_sql_async(q["name"], q["sql"], semaphore)
        for q in queries
    ]
    
    # Run all tasks - semaphore controls concurrency automatically
    results = await asyncio.gather(*tasks)
    
    return results


print("=" * 70)
print("APPROACH 3: asyncio with Semaphore")
print("=" * 70)
print("This uses Python's asyncio.Semaphore to limit concurrency.")
print("Most flexible and works without Prefect server configuration.")
print("=" * 70)

# Run the async flow
start_time = time.time()
results = await run_queries_with_semaphore(sample_queries, max_concurrent=3)
end_time = time.time()

print("\n" + "=" * 70)
print("RESULTS:")
print("=" * 70)
for r in results:
    print(f"  • {r['query_name']}: {r['row_count']} rows in {r['execution_time']:.2f}s")

print(f"\n⏱️  Total execution time: {end_time - start_time:.2f} seconds")
print(f"📊 Queries completed: {len(results)}")
print(f"🚀 Concurrency limit: 3 queries at a time")

## Real-World Example: Loading Queries from SQL Config File

Here's a complete example showing how to read queries from a config file and run them in parallel.
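
Config files are edited by hand, so it can help to validate the loaded structure before submitting any query. The following is a minimal sketch, assuming the `queries` / `name` / `sql` layout used in this notebook. `validate_queries` is a hypothetical helper, and the duplicate-name check is an assumption motivated by results later being saved under the query name.

```python
import json
from typing import Any, Dict, List


def validate_queries(config: Dict[str, Any]) -> List[Dict[str, str]]:
    """Return the query list, raising ValueError on malformed entries."""
    queries = config.get("queries", [])
    seen = set()
    for position, entry in enumerate(queries, start=1):
        for key in ("name", "sql"):
            value = entry.get(key)
            if not isinstance(value, str) or not value.strip():
                raise ValueError(f"query #{position} is missing a non-empty '{key}'")
        if entry["name"] in seen:
            raise ValueError(f"duplicate query name: {entry['name']}")
        seen.add(entry["name"])
    return queries


# Hypothetical config text standing in for the contents of a JSON file.
config_text = '{"queries": [{"name": "pending_orders", "sql": "SELECT 1"}]}'
valid = validate_queries(json.loads(config_text))

try:
    validate_queries({"queries": [{"name": "broken"}]})
    error_message = ""
except ValueError as exc:
    error_message = str(exc)
```

Failing fast here surfaces a typo in the config before any database connection is opened.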

In [None]:
import yaml
import json
from pathlib import Path
from prefect import flow, task
from typing import List, Dict, Any

# Example 1: Reading from YAML config file
sample_yaml_config = """
queries:
  - name: daily_sales
    sql: |
      SELECT 
        date,
        SUM(amount) as total_sales,
        COUNT(*) as transaction_count
      FROM sales
      WHERE date >= CURRENT_DATE - INTERVAL '7 days'
      GROUP BY date
    
  - name: top_products
    sql: |
      SELECT 
        product_id,
        product_name,
        SUM(quantity) as total_sold
      FROM order_items
      JOIN products USING (product_id)
      WHERE order_date >= CURRENT_DATE - INTERVAL '30 days'
      GROUP BY product_id, product_name
      ORDER BY total_sold DESC
      LIMIT 100
    
  - name: customer_segments
    sql: |
      SELECT 
        customer_segment,
        COUNT(DISTINCT customer_id) as customer_count,
        AVG(lifetime_value) as avg_ltv
      FROM customers
      GROUP BY customer_segment
"""

# Example 2: Reading from JSON config file
sample_json_config = """
{
  "database": {
    "host": "postgres.company.com",
    "port": 5432,
    "database": "analytics"
  },
  "queries": [
    {
      "name": "inventory_levels",
      "sql": "SELECT warehouse_id, product_id, quantity FROM inventory WHERE quantity < reorder_level",
      "priority": "high"
    },
    {
      "name": "pending_orders",
      "sql": "SELECT order_id, customer_id, status, created_at FROM orders WHERE status = 'pending'",
      "priority": "medium"
    }
  ]
}
"""


@task(name="load_queries_from_config")
def load_queries_from_file(config_path: str, format: str = "yaml") -> List[Dict[str, str]]:
    """
    Load SQL queries from a configuration file.
    
    Args:
        config_path: Path to the config file
        format: File format ('yaml' or 'json')
    
    Returns:
        List of query dictionaries with 'name' and 'sql' keys
    """
    # In this demo, we'll use the sample configs
    # In production, you'd read from actual files
    
    if format == "yaml":
        config = yaml.safe_load(sample_yaml_config)
    else:
        config = json.loads(sample_json_config)
    
    queries = config.get("queries", [])
    
    print(f"📄 Loaded {len(queries)} queries from config file")
    for q in queries:
        print(f"   • {q['name']}")
    
    return queries


@task(name="execute_database_query")
def execute_database_query(query_name: str, query: str, db_config: Dict[str, Any] = None) -> Dict[str, Any]:
    """
    Execute a SQL query against a database.
    
    In production, this would:
    1. Get connection from pool
    2. Execute query
    3. Fetch results
    4. Return connection to pool
    """
    print(f"🔵 Executing: {query_name} at {datetime.now().strftime('%H:%M:%S')}")
    
    # Simulate database query execution
    import random
    execution_time = random.uniform(1, 3)
    time.sleep(execution_time)
    
    # Simulate results
    result = {
        "query_name": query_name,
        "success": True,
        "row_count": random.randint(50, 500),
        "execution_time": execution_time,
        "data": f"[Sample data from {query_name}]"
    }
    
    print(f"✅ Completed: {query_name} - {result['row_count']} rows ({execution_time:.2f}s)")
    return result


@flow(name="production_etl_pipeline")
def run_etl_pipeline(config_path: str = "queries.yaml", max_concurrent: int = 3):
    """
    ETL pipeline skeleton (queries are simulated in this demo) that:
    1. Loads queries from config file
    2. Executes them in parallel (max_concurrent at a time, default 3)
    3. Collects and returns results
    
    Args:
        config_path: Path to query config file
        max_concurrent: Maximum concurrent queries
    """
    # Step 1: Load queries from config
    queries = load_queries_from_file(config_path, format="yaml")
    
    # Step 2: Run queries in batches
    all_results = []
    
    for i in range(0, len(queries), max_concurrent):
        batch = queries[i:i + max_concurrent]
        batch_num = i // max_concurrent + 1
        
        print(f"\n📦 Batch {batch_num}: Running {len(batch)} queries")
        
        # Submit batch
        futures = [
            execute_database_query.submit(q["name"], q["sql"])
            for q in batch
        ]
        
        # Wait for completion
        batch_results = [f.result() for f in futures]
        all_results.extend(batch_results)
    
    # Step 3: Summary
    successful = sum(1 for r in all_results if r["success"])
    total_rows = sum(r["row_count"] for r in all_results)
    
    print("\n" + "=" * 70)
    print("ETL PIPELINE SUMMARY:")
    print("=" * 70)
    print(f"✅ Successful queries: {successful}/{len(all_results)}")
    print(f"📊 Total rows processed: {total_rows:,}")
    print(f"🚀 Concurrency: {max_concurrent} queries at a time")
    
    return all_results


# Run the production pipeline
print("=" * 70)
print("PRODUCTION EXAMPLE: ETL Pipeline with Config File")
print("=" * 70)

start_time = time.time()
results = run_etl_pipeline(config_path="queries.yaml", max_concurrent=3)
end_time = time.time()

print(f"\n⏱️  Total pipeline time: {end_time - start_time:.2f} seconds")
print(f"📈 Wall-clock time per query: {(end_time - start_time) / len(results):.2f} seconds")

## Production Best Practices: Database Connection Pooling

When running parallel SQL queries, use connection pooling to avoid overwhelming your database.
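
A quick way to check whether a pool configuration can overwhelm the database is to compute the worst case, workers × (pool_size + max_overflow), and compare it with the database's connection limit. The sketch below does only that arithmetic. The function names and the `reserved` headroom parameter are hypothetical, and the numbers are illustrative.

```python
def max_connections_needed(workers: int, pool_size: int, max_overflow: int) -> int:
    """Worst-case connections if every worker's pool is fully used."""
    return workers * (pool_size + max_overflow)


def fits_database(workers: int, pool_size: int, max_overflow: int,
                  db_max_connections: int, reserved: int = 0) -> bool:
    """True if the worst case stays within the database limit.

    `reserved` is a hypothetical headroom for other applications.
    """
    needed = max_connections_needed(workers, pool_size, max_overflow)
    return needed <= db_max_connections - reserved


needed = max_connections_needed(workers=3, pool_size=5, max_overflow=10)
ok = fits_database(3, 5, 10, db_max_connections=100, reserved=20)
too_many = fits_database(8, 5, 10, db_max_connections=100, reserved=20)
```

Here 3 workers with `pool_size=5` and `max_overflow=10` need at most 45 connections, while 8 such workers would need 120 and exceed a limit of 100.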

In [None]:
# Production example with connection pooling
production_example = """
# ============================================================================
# PRODUCTION CODE: Parallel SQL Queries with Connection Pooling
# ============================================================================

from prefect import flow, task
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool
from typing import List, Dict, Any
import pandas as pd

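# Module-level engine: created once per process and shared by every task run
# in that process. With process-based or distributed task runners, each
# process builds its own engine and therefore its own pool.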
engine = create_engine(
    "postgresql://user:password@host:5432/database",
    poolclass=QueuePool,
    pool_size=5,          # Base pool size
    max_overflow=10,      # Can grow to 15 total connections
    pool_pre_ping=True,   # Test connections before using
    echo=False
)


@task(
    name="execute_query_with_pool",
    retries=3,
    retry_delay_seconds=10
)
def execute_query_with_pool(query_name: str, sql: str) -> pd.DataFrame:
    \"\"\"
    Execute SQL query using connection from pool.
    
    Connection pooling ensures:
    - We don't create too many database connections
    - Connections are reused efficiently
    - Failed connections are recycled
    \"\"\"
    try:
        # Get connection from pool
        with engine.connect() as conn:
            # Execute query
            result = pd.read_sql(text(sql), conn)
            
            print(f"✅ {query_name}: {len(result)} rows")
            return result
            
    except Exception as e:
        print(f"❌ {query_name} failed: {e}")
        raise


@flow(name="parallel_queries_production")
def run_parallel_queries(queries: List[Dict[str, str]], batch_size: int = 3):
    \"\"\"
    Run queries in parallel with connection pooling.
    
    The pool caps open connections at pool_size + max_overflow. Extra
    checkouts wait up to pool_timeout (30 seconds by default) and then
    raise, so keep batch_size within the pool's capacity.
    \"\"\"
    all_results = {}
    
    # Process in batches
    for i in range(0, len(queries), batch_size):
        batch = queries[i:i + batch_size]
        
        # Submit batch (pool handles connection limiting)
        futures = [
            execute_query_with_pool.submit(q["name"], q["sql"])
            for q in batch
        ]
        
        # Collect results
        for q, future in zip(batch, futures):
            all_results[q["name"]] = future.result()
    
    return all_results


# ============================================================================
# EXAMPLE USAGE
# ============================================================================

if __name__ == "__main__":
    # Load queries from config
    import yaml
    
    with open("queries.yaml") as f:
        config = yaml.safe_load(f)
    
    # Run with 3 concurrent queries
    results = run_parallel_queries(
        queries=config["queries"],
        batch_size=3
    )
    
    # Save results (create the output directory first)
    import os
    os.makedirs("output", exist_ok=True)
    for name, df in results.items():
        df.to_parquet(f"output/{name}.parquet")
        print(f"Saved {name}: {len(df)} rows")


# ============================================================================
# KEY BENEFITS:
# ============================================================================
# 
# ✅ Connection Pooling:
#    - Reuses database connections efficiently
#    - Prevents connection exhaustion
#    - Handles connection failures gracefully
#
# ✅ Parallel Execution:
#    - Runs 3 queries at once
#    - Reduces total pipeline time
#    - Better resource utilization
#
# ✅ Error Handling:
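#    - Automatic retries (3 retries, 10 seconds apart)
#    - After the final retry fails, future.result() re-raises and the flow fails;
#      catch the exception in the flow if the other queries should continue
#    - Errors are printed with the query name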
#
# ✅ Monitoring:
#    - All queries tracked in Prefect UI
#    - Can see which queries are slow
#    - Full execution history
#
# ============================================================================
"""

print(production_example)

print("\n" + "=" * 70)
print("CONNECTION POOL CONFIGURATION GUIDE")
print("=" * 70)

pooling_guide = """
When configuring connection pools for parallel queries:

1. pool_size (base connections):
   - Set to your typical concurrency level
   - Example: If running 3 queries at a time → pool_size=3 to 5

2. max_overflow (extra connections):
   - Handles bursts when needed
   - Example: max_overflow=5 allows up to 10 total (if pool_size=5)

3. pool_pre_ping (connection testing):
   - Always set to True for production
   - Tests connections before use to avoid stale connection errors

4. Database limits:
   - Check your database's max_connections setting
   - Leave headroom for other applications
   - Example: DB max=100, use pool_size=5 + max_overflow=10 per worker

5. Multiple workers:
   - If running 3 workers, each needs its own pool
   - Total connections = workers × (pool_size + max_overflow)
   - Example: 3 workers × 15 max = 45 total connections

RECOMMENDED SETTINGS FOR PARALLEL QUERIES:

# For 3 concurrent queries per worker:
engine = create_engine(
    connection_string,
    poolclass=QueuePool,
    pool_size=3,              # Match concurrency
    max_overflow=2,           # Allow bursts
    pool_pre_ping=True,       # Test connections
    pool_recycle=3600,        # Recycle connections every hour
    echo=False                # Set True for debugging
)
"""

print(pooling_guide)

## Summary: Comparison of Approaches

Let's compare the three approaches and help you choose the right one.
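
One practical difference between batching (Approach 1) and a limiter or semaphore (Approaches 2 and 3) is how they behave when query durations are uneven. The sketch below estimates wall time for both strategies from a list of durations. It ignores scheduling overhead and database contention, and the function names are hypothetical.

```python
import heapq
from typing import List


def batched_wall_time(durations: List[float], batch_size: int) -> float:
    """Wall time when each batch waits for its slowest query."""
    return sum(
        max(durations[i:i + batch_size])
        for i in range(0, len(durations), batch_size)
    )


def windowed_wall_time(durations: List[float], max_concurrent: int) -> float:
    """Wall time when a new query starts as soon as any slot frees up."""
    slots = [0.0] * max_concurrent  # time at which each slot becomes free
    heapq.heapify(slots)
    finish = 0.0
    for d in durations:
        start = heapq.heappop(slots)  # earliest free slot
        end = start + d
        finish = max(finish, end)
        heapq.heappush(slots, end)
    return finish


uniform = [2.0] * 10
uneven = [3.0, 1.0, 1.0, 1.0, 1.0, 1.0]

uniform_batched = batched_wall_time(uniform, 3)
uniform_windowed = windowed_wall_time(uniform, 3)
uneven_batched = batched_wall_time(uneven, 3)
uneven_windowed = windowed_wall_time(uneven, 3)
```

With ten uniform 2-second queries both strategies take 8 seconds. With one slow 3-second query among five 1-second queries, batching takes 4 seconds while the sliding window takes 3, because the short queries do not wait for the slow one.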

In [None]:
import pandas as pd

# Create comparison table
comparison_data = {
    "Approach": [
        "1. Manual Batching\n(.submit() + batching)",
        "2. Concurrency Limits\n(Prefect server config)",
        "3. asyncio Semaphore\n(Python native)"
    ],
    "Pros": [
        "• Simple to understand\n• No server config needed\n• Works immediately\n• Full control over batching",
        "• Clean code\n• Centralized control\n• Server enforces limits\n• Works across deployments",
        "• Most flexible\n• No dependencies\n• Great for async code\n• Fine-grained control"
    ],
    "Cons": [
        "• Manual batch logic\n• More verbose code\n• Each batch waits for its slowest query",
        "• Requires server setup\n• Must configure limits first\n• Less obvious in code",
        "• Requires async/await\n• More complex syntax\n• Need to understand asyncio"
    ],
    "Best For": [
        "• Quick prototypes\n• Simple pipelines\n• Learning Prefect\n• No server access",
        "• Production systems\n• Multiple deployments\n• Team workflows\n• Centralized governance",
        "• Complex async flows\n• Maximum performance\n• I/O-bound operations\n• Advanced users"
    ],
    "Recommended?": [
        "✅ Start here",
        "🚀 Production",
        "⚡ Advanced"
    ]
}

df = pd.DataFrame(comparison_data)

print("=" * 100)
print("COMPARISON OF PARALLEL EXECUTION APPROACHES")
print("=" * 100)
print()
for idx, row in df.iterrows():
    print(f"{'='*100}")
    print(f"{row['Approach']} - {row['Recommended?']}")
    print(f"{'='*100}")
    print(f"\n✅ PROS:\n{row['Pros']}")
    print(f"\n❌ CONS:\n{row['Cons']}")
    print(f"\n🎯 BEST FOR:\n{row['Best For']}")
    print()

print("\n" + "=" * 100)
print("RECOMMENDATIONS:")
print("=" * 100)

recommendations = """
📝 For Your Use Case (SQL queries from config file):

1️⃣ START WITH APPROACH 1 (Manual Batching):
   - Easy to implement right now
   - No extra setup required
   - Perfect for getting started
   - Code example provided above

2️⃣ MOVE TO APPROACH 2 (Concurrency Limits) when:
   - You have Prefect server running
   - Multiple people on the team
   - Want centralized control
   - Running in production

3️⃣ CONSIDER APPROACH 3 (Semaphore) if:
   - You want each new query to start as soon as a slot frees up
   - Already using async/await
   - Have complex async operations
   - Comfortable with asyncio


🏆 RECOMMENDED IMPLEMENTATION FOR YOU:

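from prefect import flow, task
from sqlalchemy import create_engine, text
import pandas as pd
import yaml

# Replace with your database connection string
engine = create_engine("postgresql://user:password@host:5432/database", pool_pre_ping=True)

@task(name="execute_sql", retries=2)
def execute_sql(query_name: str, sql: str) -> dict:
    # Borrow a connection from the engine's pool for the duration of the query
    with engine.connect() as conn:
        result = pd.read_sql(text(sql), conn)
        return {"name": query_name, "rows": len(result), "data": result}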

@flow(name="parallel_etl")
def run_etl(config_file: str, batch_size: int = 3):
    # Load queries from config
    with open(config_file) as f:
        queries = yaml.safe_load(f)["queries"]
    
    all_results = []
    
    # Process in batches of 3
    for i in range(0, len(queries), batch_size):
        batch = queries[i:i + batch_size]
        
        # Submit all queries in this batch
        futures = [execute_sql.submit(q["name"], q["sql"]) for q in batch]
        
        # Wait for batch to complete
        batch_results = [f.result() for f in futures]
        all_results.extend(batch_results)
    
    return all_results

# Usage:
results = run_etl("queries.yaml", batch_size=3)
"""

print(recommendations)

print("\n" + "=" * 100)
print("PERFORMANCE COMPARISON")
print("=" * 100)

perf_example = """
Example: 10 queries, each takes 2 seconds

Sequential (old way):
  • Query 1: 2s
  • Query 2: 2s
  • ... (8 more)
  • Query 10: 2s
  TOTAL: 20 seconds ❌

Parallel with batch_size=3 (new way):
  • Batch 1 (queries 1-3): 2s (all run together)
  • Batch 2 (queries 4-6): 2s (all run together)
  • Batch 3 (queries 7-9): 2s (all run together)
  • Batch 4 (query 10): 2s
  TOTAL: 8 seconds ✅ (2.5x faster!)

Parallel with batch_size=5:
  • Batch 1 (queries 1-5): 2s
  • Batch 2 (queries 6-10): 2s
  TOTAL: 4 seconds ✅ (5x faster!)

⚠️  BUT: More concurrency = more database load
    Always check your database can handle it!
"""

print(perf_example)

## 🎯 Quick Start: Your Exact Use Case

Here's the complete code you can copy and adapt for your SQL config file scenario.
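
The solution below reports success or failure per query instead of aborting on the first error. As a minimal sketch of that pattern outside Prefect, assuming retries are handled by a plain wrapper rather than by the task decorator, the outcome of each call can be captured as a dict. `run_with_retries`, `flaky_query`, and `broken_query` are hypothetical names for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List


def run_with_retries(fn: Callable[[], Any], retries: int = 2,
                     delay_seconds: float = 0.0) -> Dict[str, Any]:
    """Call `fn`, retrying on any exception; report the outcome as a dict."""
    last_error = None
    for attempt in range(1, retries + 2):  # first try plus `retries` retries
        try:
            return {"success": True, "value": fn(), "attempts": attempt, "error": None}
        except Exception as exc:  # broad on purpose: this is a sketch
            last_error = str(exc)
            if attempt <= retries:
                time.sleep(delay_seconds)
    return {"success": False, "value": None, "attempts": retries + 1, "error": last_error}


calls = {"flaky": 0}


def flaky_query() -> int:
    """Hypothetical query that fails once, then succeeds."""
    calls["flaky"] += 1
    if calls["flaky"] == 1:
        raise RuntimeError("connection reset")
    return 42


def broken_query() -> int:
    """Hypothetical query that always fails."""
    raise RuntimeError("relation does not exist")


jobs: List[Callable[[], int]] = [flaky_query, broken_query, lambda: 7]

with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(run_with_retries, job) for job in jobs]
    outcomes = [f.result() for f in futures]

failed = [o for o in outcomes if not o["success"]]
```

The retry loop sits inside the wrapper, so a transient failure is retried before the outcome is recorded, and a permanent failure is reported without stopping the other queries.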

In [None]:
complete_solution = """
# ============================================================================
# COMPLETE SOLUTION: Parallel SQL Queries from Config File
# ============================================================================
# 
# This is ready-to-use code that you can adapt to your project.
# It runs 3 SQL queries at a time instead of 1.
#

from prefect import flow, task
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool
import yaml
import pandas as pd
from typing import List, Dict, Any
from pathlib import Path


# ============================================================================
# STEP 1: Database Connection Pool Setup
# ============================================================================

def get_db_engine():
    \"\"\"
    Create database engine with connection pooling.
    Modify the connection string for your database.
    \"\"\"
    return create_engine(
        # Replace with your database connection string:
        "postgresql://user:password@host:5432/database",
        # Connection pool settings:
        poolclass=QueuePool,
        pool_size=3,           # Match your concurrency level
        max_overflow=2,        # Allow some burst capacity
        pool_pre_ping=True,    # Test connections before use
        pool_recycle=3600,     # Recycle connections hourly
        echo=False             # Set True for SQL logging
    )


# Global engine (created once, reused)
engine = get_db_engine()


# ============================================================================
# STEP 2: Task to Execute Individual Queries
# ============================================================================

@task(
    name="execute_single_query",
    retries=2,
    retry_delay_seconds=10,
    tags=["database", "sql"]
)
def execute_query(query_name: str, sql: str) -> Dict[str, Any]:
    \"\"\"
    Execute a single SQL query and return results.
    
    Args:
        query_name: Name/identifier for the query
        sql: SQL query string to execute
    
    Returns:
        Dictionary with query results and metadata
    \"\"\"
    try:
        # Get connection from pool
        with engine.connect() as conn:
            # Execute query and load into DataFrame
            df = pd.read_sql(text(sql), conn)
            
            result = {
                "name": query_name,
                "success": True,
                "row_count": len(df),
                "columns": list(df.columns),
                "data": df,
                "error": None
            }
            
            print(f"✅ {query_name}: Retrieved {len(df)} rows")
            return result
            
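    except Exception as e:
        # NOTE: Because the exception is caught here, Prefect sees the task as
        # successful and the retries=2 setting above never triggers. Re-raise
        # instead of returning if you want Prefect to retry failed queries.
        print(f"❌ {query_name} failed: {str(e)}")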
        return {
            "name": query_name,
            "success": False,
            "row_count": 0,
            "columns": [],
            "data": None,
            "error": str(e)
        }


# ============================================================================
# STEP 3: Load Queries from Config File
# ============================================================================

@task(name="load_query_config")
def load_queries(config_path: str) -> List[Dict[str, str]]:
    \"\"\"
    Load SQL queries from YAML or JSON config file.
    
    Expected format (YAML):
    queries:
      - name: query1
        sql: SELECT * FROM table1
      - name: query2
        sql: SELECT * FROM table2
    
    Args:
        config_path: Path to config file
    
    Returns:
        List of query dictionaries
    \"\"\"
    config_file = Path(config_path)
    
    if config_file.suffix in ['.yaml', '.yml']:
        with open(config_file) as f:
            config = yaml.safe_load(f)
    elif config_file.suffix == '.json':
        import json
        with open(config_file) as f:
            config = json.load(f)
    else:
        raise ValueError(f"Unsupported file format: {config_file.suffix}")
    
    queries = config.get("queries", [])
    print(f"📄 Loaded {len(queries)} queries from {config_path}")
    
    return queries


# ============================================================================
# STEP 4: Main Flow - Run Queries in Parallel (3 at a time)
# ============================================================================

@flow(name="parallel_sql_pipeline")
def run_parallel_queries(
    config_path: str,
    batch_size: int = 3,
    output_dir: str = "output"
) -> Dict[str, Any]:
    \"\"\"
    Main flow that executes SQL queries in parallel.
    
    Args:
        config_path: Path to query configuration file
        batch_size: Number of queries to run concurrently (default: 3)
        output_dir: Directory to save results (optional)
    
    Returns:
        Summary of execution results
    \"\"\"
    # Load queries from config file
    queries = load_queries(config_path)
    
    if not queries:
        print("⚠️  No queries found in config file")
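        # Return the same keys as the normal summary so callers can rely on them
        return {
            "success": False,
            "total_queries": 0,
            "successful": 0,
            "failed": 0,
            "total_rows": 0,
            "results": []
        }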
    
    all_results = []
    
    # Process queries in batches
    total_batches = (len(queries) + batch_size - 1) // batch_size
    
    for i in range(0, len(queries), batch_size):
        batch = queries[i:i + batch_size]
        batch_num = i // batch_size + 1
        
        print(f"\\n{'='*70}")
        print(f"📦 Batch {batch_num}/{total_batches}: Running {len(batch)} queries")
        print(f"{'='*70}")
        
        # Submit all queries in this batch (runs in parallel)
        futures = [
            execute_query.submit(q["name"], q["sql"])
            for q in batch
        ]
        
        # Wait for all queries in batch to complete
        batch_results = [future.result() for future in futures]
        all_results.extend(batch_results)
    
    # ========================================================================
    # Optional: Save results to files
    # ========================================================================
    if output_dir:
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        
        for result in all_results:
            if result["success"] and result["data"] is not None:
                filename = output_path / f"{result['name']}.parquet"
                result["data"].to_parquet(filename)
                print(f"💾 Saved {result['name']} to {filename}")
    
    # ========================================================================
    # Summary
    # ========================================================================
    successful = sum(1 for r in all_results if r["success"])
    failed = len(all_results) - successful
    total_rows = sum(r["row_count"] for r in all_results if r["success"])
    
    print(f"\\n{'='*70}")
    print("📊 PIPELINE SUMMARY")
    print(f"{'='*70}")
    print(f"✅ Successful queries: {successful}/{len(all_results)}")
    print(f"❌ Failed queries: {failed}")
    print(f"📈 Total rows retrieved: {total_rows:,}")
    print(f"🚀 Concurrency: {batch_size} queries at a time")
    print(f"{'='*70}")
    
    return {
        "success": failed == 0,
        "total_queries": len(all_results),
        "successful": successful,
        "failed": failed,
        "total_rows": total_rows,
        "results": all_results
    }


# ============================================================================
# USAGE EXAMPLE
# ============================================================================

if __name__ == "__main__":
    # Run the pipeline
    summary = run_parallel_queries(
        config_path="queries.yaml",
        batch_size=3,
        output_dir="query_results"
    )
    
    # Check results
    if summary["success"]:
        print(f"\\n🎉 All {summary['successful']} queries completed successfully!")
    else:
        print(f"\\n⚠️  {summary['failed']} queries failed. Check logs for details.")


# ============================================================================
# EXAMPLE CONFIG FILE (queries.yaml)
# ============================================================================
#
# queries:
#   - name: daily_sales
#     sql: |
#       SELECT 
#         date,
#         SUM(amount) as total_sales
#       FROM sales
#       WHERE date >= CURRENT_DATE - INTERVAL '7 days'
#       GROUP BY date
#   
#   - name: top_products
#     sql: |
#       SELECT 
#         product_id,
#         COUNT(*) as order_count
#       FROM orders
#       GROUP BY product_id
#       ORDER BY order_count DESC
#       LIMIT 100
#
# ============================================================================
"""

print(complete_solution)

print("\n" + "=" * 100)
print("🚀 TO USE THIS IN YOUR PROJECT:")
print("=" * 100)

usage_steps = """
1. Create queries.yaml with your SQL queries:
   
   queries:
     - name: my_query_1
       sql: SELECT * FROM table1
     - name: my_query_2
       sql: SELECT * FROM table2
     # ... add more queries

2. Update database connection string in get_db_engine():
   
   "postgresql://user:password@host:5432/database"
   # Or use environment variables:
   import os
   os.getenv("DATABASE_URL")

3. Run the flow:
   
   python your_script.py
   
   # Or deploy it:
   prefect deploy your_script.py:run_parallel_queries

4. Monitor in Prefect UI:
   
   http://localhost:4200
   You'll see each query as a separate task!

5. Adjust batch_size based on:
   
   • Your database capacity (check max_connections)
   • Query complexity (simple queries = higher batch_size)
   • Available resources (memory, CPU)
   
   Start with 3, then test 5, 10, etc.
"""

print(usage_steps)

print("\n" + "=" * 100)
print("✅ YOU'RE READY! Copy the code above and customize for your needs.")
print("=" * 100)
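Step 5 above suggests trying larger batch sizes. Because the batching loop waits for every query in a batch before starting the next one, each batch takes roughly as long as its slowest query. The sketch below estimates the wall-clock time for a few batch sizes. The helper `batched_wall_time` and the sample durations are assumptions made up for illustration; they are not part of the pipeline above.

```python
from typing import List


def batched_wall_time(durations: List[float], batch_size: int) -> float:
    """Estimate total wall-clock time when queries run in fixed batches.

    Each batch finishes only when its slowest query finishes, so the
    total is the sum of the per-batch maxima.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return sum(
        max(durations[i:i + batch_size])
        for i in range(0, len(durations), batch_size)
    )


# Hypothetical per-query durations in seconds (made up for illustration).
sample_durations = [1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0]

for size in (1, 3, 5):
    estimate = batched_wall_time(sample_durations, size)
    print(f"batch_size={size}: ~{estimate:.1f}s")
```

The estimate ignores database contention, so real runs will usually be slower once queries start competing for connections, memory, and CPU.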

# Learning Prefect: Hello World Flow

This notebook will teach you the basics of Prefect by creating a simple "Hello World" flow.

**What you'll learn:**
1. How to create a Prefect `@task` - a unit of work
2. How to create a Prefect `@flow` - orchestrates tasks
3. How to run flows and see them in the Prefect UI

## Step 1: Import Prefect and configure the API

In [3]:
import os
from prefect import flow, task

# Configure to use your local Prefect server
os.environ["PREFECT_API_URL"] = "http://localhost:4200/api"

print("✅ Prefect imported and configured to use:", os.environ["PREFECT_API_URL"])

✅ Prefect imported and configured to use: http://localhost:4200/api


## Step 2: Create a simple task

A **task** is a Python function decorated with `@task`. Tasks are the building blocks of flows.

In [5]:
@task
def print_hello():
    """A simple task that prints Hello World"""
    message = "Hello World from Prefect!"
    print(message)
    return message

# Note: Tasks must be called from within a flow!
# If you want to test the function directly, use: print_hello.fn()
print("✅ Task 'print_hello' created successfully!")
print("Note: Tasks can only be called from within flows")

✅ Task 'print_hello' created successfully!
Note: Tasks can only be called from within flows


## Step 3: Create a flow that uses the task

A **flow** is the main orchestrator. It calls tasks and manages the workflow.

### First, let's verify connection to the Prefect server

In [7]:
import requests

# Test connection to Prefect server
try:
    response = requests.get("http://localhost:4200/api/health", timeout=5)  # fail fast if the server is down
    if response.status_code == 200:
        print("✅ Successfully connected to Prefect server!")
        print(f"Server response: {response.json()}")
    else:
        print(f"❌ Server returned status code: {response.status_code}")
except Exception as e:
    print(f"❌ Could not connect to server: {e}")

✅ Successfully connected to Prefect server!
Server response: True


### Now let's run a simple flow!

To have the run tracked by the external Prefect server, we'll run a flow that lives in a Python file (`prefect-main.py`) as a subprocess with `PREFECT_API_URL` set.

In [8]:
# Run the existing prefect-main.py as a subprocess
import subprocess

result = subprocess.run(
    ["uv", "run", "python", "prefect-main.py"],
    cwd="/Users/jichong/projects/playground/BOS/prefect-poc",
    capture_output=True,
    text=True,
    env={**os.environ, "PREFECT_API_URL": "http://localhost:4200/api"}
)

print("Flow Output:")
print(result.stdout)
if result.stderr:
    print("\nWarnings/Errors:")
    print(result.stderr)

Flow Output:
What is your favorite number?
Favorite number: 42
Customer IDs: ['customer30', 'customer71', 'customer27', 'customer56', 'customer72', 'customer95', 'customer47', 'customer75', 'customer71', 'customer28']


19:18:10.244 | INFO    | prefect.engine - Created flow run 'organic-spoonbill' for flow 'my-favorite-function'
19:18:10.245 | INFO    | Flow run 'organic-spoonbill' - View at http://localhost:4200/flow-runs/flow-run/eddf86db-c258-473a-b62d-c85f973ed21b
19:18:10.327 | INFO    | Flow run 'organic-spoonbill' - Created task run 'get_customer_ids-0' for task 'get_customer_ids'
19:18:10.328 | INFO    | Flow run 'organic-spoonbill' - Executing 'get_customer_ids-0' immediately...
19:18:10.400 | INFO    | Task run 'get_customer_ids-0' - Finished in state Completed()
19:18:10.418 | INFO    | Flow run 'organic-spoonbill' - Finished in state Completed

### Check the Prefect UI!

🎉 **Success!** Your flow ran and was tracked by the Prefect server.

**What just happened:**
1. The flow `my-favorite-function` executed successfully
2. It created and executed the task `get_customer_ids`
3. All metadata was sent to your Prefect server at http://localhost:4200
4. You can see the run at the URL shown above (http://localhost:4200/flow-runs/flow-run/...)

### Query flow runs from the Prefect API

Now let's see how to programmatically query the flows and flow runs:

In [9]:
import asyncio
from prefect.client.orchestration import PrefectClient

async def get_recent_flow_runs():
    """Query recent flow runs from the Prefect server"""
    async with PrefectClient(api="http://localhost:4200/api") as client:
        # Get recent flow runs
        runs = await client.read_flow_runs(limit=5)
        
        print(f"Found {len(runs)} recent flow runs:\n")
        for run in runs:
            print(f"  • Flow: {run.name}")  # note: run.name is the flow *run* name, not the flow name
            print(f"    State: {run.state.type if run.state else 'Unknown'}")
            print(f"    Start time: {run.start_time}")
            print(f"    ID: {run.id}")
            print()
        
        return runs

# Run the async function
runs = await get_recent_flow_runs()

Found 1 recent flow runs:

  • Flow: organic-spoonbill
    State: COMPLETED
    Start time: 2025-10-14T11:18:10.248871+00:00
    ID: eddf86db-c258-473a-b62d-c85f973ed21b



# Part 2: Deployments and Work Pools

Now let's learn about **Deployments** and **Work Pools** - how to let the Prefect server schedule and orchestrate your flows!

## Concepts:

- **Deployment**: Packages your flow for remote execution and scheduling
- **Work Pool**: A queue where the server sends flow runs to be executed
- **Worker**: An agent that pulls work from the pool and executes it

Think of it like a restaurant:
- **Deployment** = Menu item (your flow, ready to be ordered)
- **Work Pool** = Kitchen ticket queue
- **Worker** = Chef who takes tickets and cooks
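To make the restaurant analogy concrete, here is a toy model of the three roles using only the standard library. It is a sketch for intuition, not how Prefect is implemented: `work_pool`, `deployment`, and `worker` below are plain Python stand-ins invented for this example, and no Prefect API is involved.

```python
import queue
import threading
from typing import Callable, List

# Toy stand-ins: these are NOT Prefect classes, just a model of the roles.
work_pool: queue.Queue = queue.Queue()  # the "kitchen ticket queue"
completed: List[str] = []


def deployment(name: str) -> Callable[[], str]:
    """A 'menu item': a named, ready-to-run unit of work."""
    def run() -> str:
        return f"{name} finished"
    return run


def worker() -> None:
    """The 'chef': pulls tickets from the queue until it sees None."""
    while True:
        job = work_pool.get()
        if job is None:  # sentinel: no more work
            break
        completed.append(job())


# The "server" queues two runs, then a sentinel that stops the worker.
work_pool.put(deployment("daily-report"))
work_pool.put(deployment("weekly-report"))
work_pool.put(None)

chef = threading.Thread(target=worker)
chef.start()
chef.join()
print(completed)
```

The key property the model shares with the real setup is that nothing runs until a worker pulls from the queue: queuing work and executing work are separate steps.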

## Step 1: Create a Work Pool

First, we need to create a work pool. A work pool is like a job queue where the server puts flow runs that need to be executed.

In [10]:
# Create a work pool using the Prefect CLI
import subprocess

# Create a process work pool (runs flows as local processes)
result = subprocess.run(
    ["uv", "run", "prefect", "work-pool", "create", "my-local-pool", "--type", "process"],
    capture_output=True,
    text=True,
    env={**os.environ, "PREFECT_API_URL": "http://localhost:4200/api"}
)

print("Work Pool Creation:")
print(result.stdout)
if result.stderr:
    print(result.stderr)

Work Pool Creation:
Created work pool 'my-local-pool'.




## Step 2: Create a Deployment

Now let's deploy our flow. This tells the Prefect server about the flow and how to run it.

In [16]:
# For demonstration, let's show how to create a deployment via the command line
# This is the recommended approach for Prefect 2.13

deployment_guide = """
To create the deployment, run these commands in your terminal:

cd /Users/jichong/projects/playground/BOS/prefect-poc
export PREFECT_API_URL=http://localhost:4200/api

# Method 1: Quick deploy (interactive)
uv run prefect deploy deployed_flow.py:data_pipeline \\
    --name my-data-pipeline \\
    --pool my-local-pool

# Method 2: Or use Python code
"""

print("📋 Deployment Creation Guide")
print("=" * 70)
print(deployment_guide)

# For this demo, let's verify the work pool exists and is ready
async def check_work_pool():
    async with PrefectClient(api="http://localhost:4200/api") as client:
        try:
            pools = await client.read_work_pools()
            print(f"\n✅ Work pools available on server:")
            for pool in pools:
                print(f"   • {pool.name} (type: {pool.type})")
            return pools
        except Exception as e:
            print(f"❌ Error checking work pools: {e}")
            return []

pools = await check_work_pool()

print("\n💡 Next step: Start a worker (see next cell)")

📋 Deployment Creation Guide

To create the deployment, run these commands in your terminal:

cd /Users/jichong/projects/playground/BOS/prefect-poc
export PREFECT_API_URL=http://localhost:4200/api

# Method 1: Quick deploy (interactive)
uv run prefect deploy deployed_flow.py:data_pipeline \
    --name my-data-pipeline \
    --pool my-local-pool

# Method 2: Or use Python code


✅ Work pools available on server:
   • default-agent-pool (type: prefect-agent)
   • my-local-pool (type: process)

💡 Next step: Start a worker (see next cell)


## Step 3: Start a Worker

A worker is the "chef" that pulls jobs from the work pool and executes them. Without a worker, deployments won't run!

**Note:** We'll start the worker in the background so it can pick up flow runs.

In [17]:
# We'll show you the command to start a worker
# In practice, you'd run this in a separate terminal

worker_command = """
# Run this in a separate terminal:
cd /Users/jichong/projects/playground/BOS/prefect-poc
export PREFECT_API_URL=http://localhost:4200/api
uv run prefect worker start --pool my-local-pool
"""

print("🔧 To start a worker, run this command in a SEPARATE TERMINAL:")
print(worker_command)
print("\nThe worker will:")
print("  1. Connect to the Prefect server")
print("  2. Poll the 'my-local-pool' work pool for flow runs")
print("  3. Execute any flows that are scheduled")
print("  4. Report results back to the server")
print("\n⚠️  Leave the worker running to process flows!")

🔧 To start a worker, run this command in a SEPARATE TERMINAL:

# Run this in a separate terminal:
cd /Users/jichong/projects/playground/BOS/prefect-poc
export PREFECT_API_URL=http://localhost:4200/api
uv run prefect worker start --pool my-local-pool


The worker will:
  1. Connect to the Prefect server
  2. Poll the 'my-local-pool' work pool for flow runs
  3. Execute any flows that are scheduled
  4. Report results back to the server

⚠️  Leave the worker running to process flows!


## Step 4: Trigger a Flow Run from the Server

Now that we have a deployment, we can trigger it through the server API (not by running the Python file directly!).

In [None]:
# Trigger a flow run via the API
# This creates a flow run that gets queued in the work pool

result = subprocess.run([
    "uv", "run", "prefect", "deployment", "run",
    "data-pipeline/my-data-pipeline",  # flow-name/deployment-name
    "--param", "source=production-api",
    "--param", "destination=warehouse"
],
    cwd="/Users/jichong/projects/playground/BOS/prefect-poc",
    capture_output=True,
    text=True,
    env={**os.environ, "PREFECT_API_URL": "http://localhost:4200/api"}
)

print("Flow Run Triggered!")
print(result.stdout)
if result.stderr:
    print("\nMessages:")
    print(result.stderr)
    
print("\n" + "="*60)
print("What happens now:")
print("="*60)
print("1. ✅ Server creates a flow run and puts it in the work pool queue")
print("2. ⏳ Worker (if running) picks up the flow run")
print("3. 🚀 Worker executes the flow on its machine")
print("4. 📊 Worker reports progress back to the server")
print("5. 🎉 You can watch it in the UI at http://localhost:4200")
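The `--param key=value` pairs passed to `prefect deployment run` in the cell above can also be generated from a dictionary, which is handy when the parameters come from configuration. This is a small sketch; `build_deployment_run_cmd` is a hypothetical helper written for this notebook, and it only builds the argument list without running anything.

```python
from typing import Dict, List, Optional, Sequence


def build_deployment_run_cmd(
    deployment: str,
    params: Optional[Dict[str, str]] = None,
    launcher: Sequence[str] = ("uv", "run"),
) -> List[str]:
    """Build the argument list for `prefect deployment run`.

    `deployment` uses the flow-name/deployment-name format.
    `launcher` is whatever prefixes the prefect CLI (here: `uv run`).
    """
    cmd = [*launcher, "prefect", "deployment", "run", deployment]
    for key, value in (params or {}).items():
        cmd += ["--param", f"{key}={value}"]
    return cmd


cmd = build_deployment_run_cmd(
    "data-pipeline/my-data-pipeline",
    {"source": "production-api", "destination": "warehouse"},
)
print(" ".join(cmd))
```

The resulting list can be passed straight to `subprocess.run(cmd, ...)` in place of the hand-written list.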

## Summary: How Deployments Work

### The Full Architecture:

```
┌─────────────────────────────────────────────────────┐
│  YOU (Developer)                                    │
│                                                     │
│  1. Create flow (deployed_flow.py)                 │
│  2. Deploy it: prefect deploy ...                  │
│     → Tells server about the flow                  │
│     → Server stores deployment metadata            │
└─────────────────────────────────────────────────────┘
                    │
                    │ deployment info
                    ▼
┌─────────────────────────────────────────────────────┐
│  PREFECT SERVER (Docker at :4200)                  │
│                                                     │
│  • Stores deployment definitions                   │
│  • Maintains work pool queues                      │
│  • Accepts flow run requests                       │
│  • Tracks all execution metadata                   │
└─────────────────────────────────────────────────────┘
                    │
                    │ flow runs queued here
                    ▼
┌─────────────────────────────────────────────────────┐
│  WORK POOL: my-local-pool                          │
│                                                     │
│  Queue of pending flow runs waiting to execute     │
└─────────────────────────────────────────────────────┘
                    │
                    │ worker polls for work
                    ▼
┌─────────────────────────────────────────────────────┐
│  WORKER (prefect worker start --pool ...)          │
│                                                     │
│  • Polls work pool every few seconds               │
│  • Picks up flow runs from queue                   │
│  • Executes the flow on its machine                │
│  • Sends progress/logs back to server              │
└─────────────────────────────────────────────────────┘
```

### Key Differences from Before:

| Aspect | Direct Execution | With Deployment |
|--------|-----------------|-----------------|
| **How to run** | `python flow.py` | Server API or schedule |
| **Who triggers** | You manually | Server (on schedule or API call) |
| **Where code runs** | Your terminal | Worker machine |
| **Code location** | Must be local | Must be accessible to worker |
| **Scheduling** | Manual only | Can schedule (cron, interval, etc.) |
| **Production ready** | No | Yes |

### Use Cases:

- **Direct execution** (what we did first): Development, testing, one-off runs
- **Deployments + Workers**: Production, scheduled jobs, distributed execution, team workflows

## 🎓 Try it Yourself!

Here's the complete workflow to see deployments in action:

### 1️⃣ Create the deployment (in a terminal):
```bash
cd /Users/jichong/projects/playground/BOS/prefect-poc
export PREFECT_API_URL=http://localhost:4200/api
uv run prefect deploy deployed_flow.py:data_pipeline --name my-data-pipeline --pool my-local-pool
```

### 2️⃣ Start a worker (in another terminal):
```bash
cd /Users/jichong/projects/playground/BOS/prefect-poc
export PREFECT_API_URL=http://localhost:4200/api
uv run prefect worker start --pool my-local-pool
```

### 3️⃣ Trigger a flow run (in a third terminal or from the UI):
```bash
# Via CLI:
uv run prefect deployment run data-pipeline/my-data-pipeline

# Or open http://localhost:4200/deployments and click "Run"
```

### 4️⃣ Watch it execute!
- The worker terminal will show the flow executing
- The UI at http://localhost:4200 will show real-time progress
- All logs and results are tracked by the server

---

### What you've learned:

✅ **Direct execution** (`python flow.py`):
- Code runs on your machine
- You trigger it manually
- Good for development/testing

✅ **Deployments + Workers**:
- Server orchestrates the execution
- Workers pull jobs from work pools
- Can schedule flows, handle retries, distribute work
- Production-ready approach

This is how Prefect powers production data pipelines at scale! 🚀

# Part 3: Blocks and Remote Storage

## What are Blocks?

**Blocks** are Prefect's way of storing reusable configuration and credentials. Think of them as secure configuration objects that can be:
- Created once and reused across many flows
- Stored on the Prefect server
- Referenced by name in deployments
- Used to connect to external systems (databases, cloud storage, git repos, etc.)

### Common Block Types:
- **Git Repository Blocks**: Connect to GitHub, GitLab, Bitbucket
- **Storage Blocks**: S3, GCS, Azure Blob Storage
- **Secret Blocks**: Store passwords, API keys
- **Infrastructure Blocks**: Docker, Kubernetes, cloud compute

### Why Blocks Matter for Private Bitbucket:
When your code is in a private Git repository:
1. **Worker needs access** to pull the code before running it
2. **Credentials must be secure** (not hardcoded in code)
3. **Blocks solve both problems** by storing Git credentials on the server
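Keeping credentials out of code also applies to the script that creates the block in the first place. One option, sketched below, is to read the token from an environment variable and fail loudly when it is missing. The variable name `BITBUCKET_TOKEN` and the helper `require_env` are assumptions chosen for this example.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or fail loudly."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value


# BITBUCKET_TOKEN is a hypothetical variable name chosen for this sketch.
# The fallback below only exists so the example runs in a notebook.
os.environ.setdefault("BITBUCKET_TOKEN", "demo-token-for-illustration")

token = require_env("BITBUCKET_TOKEN")
print(f"Loaded token of length {len(token)}")
```

The value returned here is what you would hand to a secret block instead of a string literal.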

## Step 1: Create a Bitbucket Repository Block

Let's create a block that stores credentials for your private Bitbucket server.

In [None]:
# Create a Bitbucket repository block programmatically

from prefect.blocks.system import Secret
from prefect_bitbucket.credentials import BitBucketCredentials
from prefect_bitbucket.repository import BitBucketRepository

async def create_bitbucket_block():
    """
    Create a Bitbucket repository block for private repo access.
    This block will be stored on the Prefect server.
    """
    
    # Step 1: Create a secret block for the password/token
    # (In production, you'd set this more securely)
    try:
        bitbucket_token = Secret(value="your-app-password-or-token")
        await bitbucket_token.save(name="bitbucket-token", overwrite=True)
        print("✅ Created secret block: bitbucket-token")
    except Exception as e:
        print(f"Note: {e}")
    
    # Step 2: Create credentials block
    # BitBucketCredentials expects the raw token string, so unwrap the Secret.
    # (Field names follow prefect-bitbucket; check them against your installed version.)
    try:
        credentials = BitBucketCredentials(
            username="your-username",
            token=bitbucket_token.get()
        )
        await credentials.save(name="my-bitbucket-creds", overwrite=True)
        print("✅ Created credentials block: my-bitbucket-creds")
    except Exception as e:
        print(f"⚠️  Could not create credentials block: {e}")
        return  # the repository block below depends on these credentials
    
    # Step 3: Create repository block
    try:
        repo_block = BitBucketRepository(
            repository="https://bitbucket.mycompany.com/scm/project/repo.git",
            bitbucket_credentials=credentials,
            reference="main"  # branch name
        )
        await repo_block.save(name="my-private-repo", overwrite=True)
        print("✅ Created repository block: my-private-repo")
    except Exception as e:
        print(f"⚠️  Could not create repo block: {e}")
        return
    
    print("\n📝 Block created and stored on Prefect server!")
    print("   Workers can now use this block to pull code from private Bitbucket")

# Note: This will fail without prefect-bitbucket installed
# Showing the pattern for demonstration
print("Example: Creating Bitbucket Block")
print("=" * 60)
print("\n⚠️  Note: This requires 'prefect-bitbucket' to be installed:")
print("   pip install prefect-bitbucket")
print("\nFor demonstration purposes, let's see what blocks we currently have:")

In [18]:
# List all available blocks on the server
async def list_blocks():
    """Query all blocks stored on the Prefect server"""
    async with PrefectClient(api="http://localhost:4200/api") as client:
        # Get block types
        block_types = await client.read_block_types()
        
        print(f"📦 Available Block Types on Server:")
        print(f"   Found {len(block_types)} block types\n")
        
        for bt in block_types[:10]:  # Show first 10
            print(f"   • {bt.name}")
            if bt.description:
                print(f"     {bt.description[:80]}...")
        
        return block_types

blocks = await list_blocks()
print(f"\n💡 These blocks can be used in deployments to store credentials and config")

📦 Available Block Types on Server:
   Found 37 block types

   • AWS Credentials
     Block used to manage authentication with AWS. AWS authentication is
handled via ...
   • Azure
     Store data as a file on Azure Datalake and Azure Blob Storage....
   • Azure Blob Storage Credentials
     Block used to manage Blob Storage authentication with Azure.
Azure authenticatio...
   • Azure Container Instance Credentials
     Block used to manage Azure Container Instances authentication. Stores Azure Serv...
   • Azure Container Instance Job
     Run tasks using Azure Container Instances. Note this block is experimental. The ...
   • Azure Cosmos DB Credentials
     Block used to manage Cosmos DB authentication with Azure.
Azure authentication i...
   • AzureML Credentials
     Block used to manage authentication with AzureML. Azure authentication is
handle...
   • BigQuery Warehouse
     A block for querying a database with BigQuery.

Upon instantiating, a connection...
   • Custom Webhook


## Step 2: How Deployments Work with Private Bitbucket

Here's the complete architecture when using a private Bitbucket server:

### The Flow:

```
┌─────────────────────────────────────────────────────────┐
│  BITBUCKET SERVER (Private)                             │
│  https://bitbucket.mycompany.com                        │
│                                                         │
│  📁 Your Repository:                                    │
│    • flow.py (your flow code)                          │
│    • requirements.txt                                   │
│    • prefect.yaml (deployment config)                  │
└─────────────────────────────────────────────────────────┘
                    ▲
                    │ 1. Git clone with credentials
                    │
┌─────────────────────────────────────────────────────────┐
│  PREFECT SERVER (Your deployment)                       │
│  Could be: Docker, cloud, or on-premises                │
│                                                         │
│  Stores:                                                │
│    • 🔐 Bitbucket credentials (Block)                  │
│    • 📋 Deployment definitions                         │
│    • 🎯 Work pool configurations                       │
│    • 📊 Flow run metadata                              │
└─────────────────────────────────────────────────────────┘
                    │
                    │ 2. Worker polls for work
                    ▼
┌─────────────────────────────────────────────────────────┐
│  WORKER (Running on your infrastructure)                │
│                                                         │
│  When a flow run is scheduled:                          │
│    1. Worker gets job from work pool                   │
│    2. Worker loads Bitbucket credentials from block     │
│    3. Worker clones code from private repo              │
│    4. Worker creates Python environment                 │
│    5. Worker installs dependencies                      │
│    6. Worker executes the flow                          │
│    7. Worker sends results back to server               │
└─────────────────────────────────────────────────────────┘
```

### Key Points:

1. **Prefect server doesn't execute code** - It only stores metadata
2. **Worker pulls code** - Worker machine needs network access to Bitbucket
3. **Credentials stored securely** - Blocks keep secrets on the server
4. **Code is ephemeral** - Worker clones fresh code for each run
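A common way for a Git client to authenticate over HTTPS is to carry the username and token in the clone URL. The sketch below shows that generic technique with the standard library. It is an illustration of the idea only: this notebook does not show what Prefect's clone step does internally, and `clone_url_with_credentials` and the sample values are hypothetical.

```python
from urllib.parse import quote, urlsplit, urlunsplit


def clone_url_with_credentials(repo_url: str, username: str, token: str) -> str:
    """Return an HTTPS clone URL with URL-encoded credentials embedded."""
    parts = urlsplit(repo_url)
    if parts.scheme != "https":
        raise ValueError("Only HTTPS repository URLs are supported in this sketch")
    # Percent-encode both values so characters like '@' or '/' stay unambiguous.
    userinfo = f"{quote(username, safe='')}:{quote(token, safe='')}"
    netloc = f"{userinfo}@{parts.hostname}"
    if parts.port:
        netloc += f":{parts.port}"
    return urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))


# Hypothetical values for illustration only.
url = clone_url_with_credentials(
    "https://bitbucket.mycompany.com/scm/project/repo.git",
    "svc-user",
    "p@ss/word",
)
# Mask the encoded secret before printing; never log the real URL.
print(url.replace(quote("p@ss/word", safe=""), "***"))
```

Because the secret ends up inside the URL, it should never be printed or logged unmasked, which is one more reason to give the credentials read-only scope.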

## Step 3: Creating a Deployment with Bitbucket Storage

Here's how to deploy a flow that lives in a private Bitbucket repository:

In [None]:
# Example deployment configuration with Bitbucket storage

deployment_example = """
# prefect.yaml - Deployment configuration file in your Bitbucket repo

name: production-deployment
prefect-version: 2.13.7

# Define where the code comes from
pull:
  - prefect.deployments.steps.git_clone:
      repository: https://bitbucket.mycompany.com/scm/project/myflows.git
      branch: main
      credentials: "{{ prefect.blocks.bitbucket-credentials.my-bitbucket-creds }}"

# Define deployments
deployments:
  - name: prod-data-pipeline
    entrypoint: flows/data_pipeline.py:data_pipeline
    work_pool:
      name: prod-worker-pool
    schedule:
      cron: "0 2 * * *"  # Run at 2 AM daily
    parameters:
      env: production
"""

print("📄 Example prefect.yaml with Bitbucket Storage")
print("=" * 70)
print(deployment_example)

print("\n" + "=" * 70)
print("How to deploy from your Bitbucket repository:")
print("=" * 70)

deployment_steps = """
1️⃣ Push your code to Bitbucket:
   • flows/data_pipeline.py (your flow code)
   • prefect.yaml (deployment config - shown above)
   • requirements.txt (Python dependencies)

2️⃣ Create the Bitbucket credentials block (on Prefect server):
   uv run prefect block register -m prefect_bitbucket
   # Then create the block via UI or Python as shown earlier

3️⃣ Deploy from your local machine (one time):
   cd /path/to/your/repo
   export PREFECT_API_URL=http://your-prefect-server:4200/api
   uv run prefect deploy --all

   This tells the server about your deployment and where to find the code.

4️⃣ Start workers (on your infrastructure):
   # These can run anywhere with access to Bitbucket
   export PREFECT_API_URL=http://your-prefect-server:4200/api
   uv run prefect worker start --pool prod-worker-pool

5️⃣ When a flow run is triggered:
   • Worker gets the job from the server
   • Worker uses the Bitbucket block to clone the repo
   • Worker runs the flow from the cloned code
   • All logs go back to the server
"""

print(deployment_steps)
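The schedule `cron: "0 2 * * *"` in the example reads as "minute 0, hour 2, every day of the month, every month, every day of the week". As a rough sketch of what that means in practice, the helper below computes the next 2 AM after a given moment. `next_daily_run` is a hypothetical function written for this notebook; it handles only this simple daily pattern and ignores time zones.

```python
from datetime import datetime, timedelta


def next_daily_run(now: datetime, hour: int = 2, minute: int = 0) -> datetime:
    """Return the next occurrence of hour:minute strictly after `now`."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot has passed
    return candidate


before_slot = next_daily_run(datetime(2025, 10, 14, 1, 30))
after_slot = next_daily_run(datetime(2025, 10, 14, 19, 18))
print(before_slot)  # same day, 02:00
print(after_slot)   # next day, 02:00
```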

## Step 4: Answering Your Questions

### Q: "If I deploy my own Prefect server, how will it connect to the private Bitbucket server?"

**Answer: The Prefect server DOESN'T connect to Bitbucket. The WORKER does!**

Here's the breakdown:

#### What the Prefect Server Does:
- ✅ Stores the Bitbucket credentials (in a Block)
- ✅ Stores deployment metadata (which repo, which branch)
- ✅ Manages the work queue
- ❌ **Does NOT** clone code
- ❌ **Does NOT** execute flows
- ❌ **Does NOT** need network access to Bitbucket

#### What the Worker Does:
- ✅ Runs on your infrastructure (can be same server, different server, laptop, cloud VM)
- ✅ **NEEDS** network access to your private Bitbucket server
- ✅ Pulls credentials from the Prefect server (via API)
- ✅ Uses credentials to clone code from Bitbucket
- ✅ Executes the flow
- ✅ Sends results back to Prefect server

### Network Requirements:

```
┌──────────────────────────────────────────────────┐
│  Your Network / VPN                              │
│                                                  │
│  ┌────────────────┐     ┌──────────────────┐   │
│  │ Prefect Server │     │ Bitbucket Server │   │
│  │ :4200          │     │ (Private)        │   │
│  └────────────────┘     └──────────────────┘   │
│         ▲                        ▲              │
│         │                        │              │
│         │  API calls             │ Git clone    │
│         │  (get credentials,     │ (needs auth) │
│         │   report status)       │              │
│         │                        │              │
│  ┌──────┴────────────────────────┴──────┐      │
│  │           WORKER                      │      │
│  │  • Needs access to BOTH servers       │      │
│  │  • Can be same or different machine   │      │
│  └───────────────────────────────────────┘      │
└──────────────────────────────────────────────────┘
```

### Security Best Practices:

1. **Worker Placement**: Deploy workers in the same network zone as Bitbucket
2. **Credentials**: Use SSH keys or app passwords, not your personal password
3. **Least Privilege**: Give the Bitbucket credentials read-only access
4. **Network Policies**: Workers need HTTPS/SSH to Bitbucket, HTTPS to Prefect server
5. **Secrets**: Never commit credentials to git - use Blocks!
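Before starting a worker on a new machine, it can help to confirm that the machine can open a TCP connection to both servers. The sketch below is a minimal reachability check using only the standard library. `can_reach` is a hypothetical helper, and the demo targets a throwaway local listener so that it runs anywhere.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers refused connections, timeouts, DNS failures
        return False


# Demo against a throwaway local listener so the sketch is self-contained.
listener = socket.socket()
listener.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
listener.listen(1)
demo_port = listener.getsockname()[1]

open_result = can_reach("127.0.0.1", demo_port, timeout=1.0)
listener.close()
closed_result = can_reach("127.0.0.1", demo_port, timeout=1.0)

print(f"listener up: {open_result}, listener closed: {closed_result}")
```

On a worker machine you would call `can_reach` with the Prefect server's host and port (4200 in this notebook) and with your Bitbucket server's host and the port it serves Git on. A successful TCP connection only proves the network path is open; authentication is checked separately when the clone runs.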

## Real-World Example: Production Setup

Let's walk through a complete production setup:

### Scenario:
- Company has private Bitbucket server at `bitbucket.company.com`
- Data engineering team writes flows in `data-team/etl-flows` repo
- Prefect server deployed on `prefect.company.com`
- Workers run on Kubernetes in the same VPC

### Setup Steps:

#### 1. Install Prefect Bitbucket Integration

In [19]:
# Complete production example walkthrough

production_guide = """
================================================================================
PRODUCTION SETUP: Prefect + Private Bitbucket
================================================================================

STEP 1: Install Prefect Bitbucket Integration
----------------------------------------------
On your development machine and in worker Docker images:

    pip install prefect-bitbucket

This provides:
  • BitBucketCredentials block
  • BitBucketRepository block
  • Git clone steps for deployments


STEP 2: Create Bitbucket App Password
--------------------------------------
On bitbucket.company.com:
  1. Go to Personal Settings → App Passwords
  2. Create new app password with 'Repository Read' permission
  3. Save the password securely (you'll need it once)


STEP 3: Register Block Types & Create Blocks
---------------------------------------------
# Register the block types with your Prefect server
export PREFECT_API_URL=https://prefect.company.com/api
prefect block register -m prefect_bitbucket

# Create credentials block (via Python or UI)
from prefect_bitbucket.credentials import BitBucketCredentials
from prefect.blocks.system import Secret

async def setup_blocks():
    # Create secret for the app password
    token = Secret(value="your-app-password-here")
    await token.save("bitbucket-token")
    
    # Create credentials
    creds = BitBucketCredentials(
        username="serviceaccount@company.com",
        token=token.get()  # pass the raw token string, not the Secret block
    )
    await creds.save("company-bitbucket-creds")

# Or use the Prefect UI:
# Navigate to Blocks → + → BitBucketCredentials


STEP 4: Structure Your Bitbucket Repository
--------------------------------------------
data-team/etl-flows/
├── flows/
│   ├── daily_etl.py
│   ├── weekly_report.py
│   └── __init__.py
├── requirements.txt
└── prefect.yaml

# requirements.txt
prefect==2.13.7
pandas
sqlalchemy
psycopg2-binary

# prefect.yaml
name: etl-flows
prefect-version: 2.13.7

pull:
  - prefect.deployments.steps.git_clone:
      repository: https://bitbucket.company.com/scm/data-team/etl-flows.git
      branch: main
      credentials: "{{ prefect.blocks.bitbucket-credentials.company-bitbucket-creds }}"

deployments:
  - name: daily-etl-prod
    entrypoint: flows/daily_etl.py:main_flow
    work_pool:
      name: k8s-prod-pool
    schedule:
      cron: "0 1 * * *"
    parameters:
      environment: production


STEP 5: Deploy from Your Machine (One Time)
--------------------------------------------
cd /path/to/etl-flows
export PREFECT_API_URL=https://prefect.company.com/api
prefect deploy --all

This registers the deployment with the server. The server now knows:
  • Where to find the code (Bitbucket URL)
  • Which credentials to use (the block name)
  • Which branch to use (main)
  • Schedule and parameters


STEP 6: Run Workers (Continuously)
-----------------------------------
Workers run on your infrastructure (K8s, VMs, etc.)

# Kubernetes deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prefect-worker
spec:
  replicas: 3
  # apps/v1 requires a selector that matches the pod template labels
  selector:
    matchLabels:
      app: prefect-worker
  template:
    metadata:
      labels:
        app: prefect-worker
    spec:
      containers:
      - name: worker
        image: prefecthq/prefect:2.13.7-python3.10
        command: ["prefect", "worker", "start"]
        args: ["--pool", "k8s-prod-pool"]
        env:
        - name: PREFECT_API_URL
          value: "https://prefect.company.com/api"

Workers will:
  1. Poll the work pool for jobs
  2. When a job arrives, load credentials from the server
  3. Clone code from Bitbucket using those credentials
  4. Execute the flow
  5. Report results back


STEP 7: Trigger & Monitor
--------------------------
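Flows run automatically on schedule, or trigger one manually. The
identifier is <flow-name>/<deployment-name>; a flow function called
main_flow with no explicit name defaults to the flow name "main-flow":

  prefect deployment run 'main-flow/daily-etl-prod'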

Monitor at: https://prefect.company.com/deployments


SECURITY NOTES:
--------------
✅ Credentials are never in code; they are stored in Blocks on the server
✅ Workers pull fresh code on each run, so there is no stale code
✅ Use a service account with read-only Bitbucket access
✅ Workers need network access to both the Prefect server AND Bitbucket
✅ The Prefect server must be reachable by workers and by whoever runs
   `prefect deploy`; it never needs to reach Bitbucket itself
"""

print(production_guide)



## Summary: Key Takeaways

### 🔑 Blocks Explained:
- **What**: Reusable configuration objects stored on Prefect server
- **Why**: Securely store credentials, avoid hardcoding secrets
- **How**: Create once, reference by name in deployments
- **Types**: Git repos, databases, cloud storage, secrets, infrastructure
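
To make "reference by name" concrete, here is a minimal sketch that takes apart the placeholder used in `prefect.yaml`. It assumes the `{{ prefect.blocks.<type-slug>.<block-name> }}` shape shown in the guide. `parse_block_reference` and `BlockReference` are hypothetical helpers written for illustration; they are not part of Prefect, which resolves these placeholders itself.

```python
import re
from typing import NamedTuple


class BlockReference(NamedTuple):
    """The two parts of a block placeholder (illustrative type, not Prefect's)."""
    type_slug: str
    block_name: str


# Matches "{{ prefect.blocks.<type-slug>.<block-name> }}"
_BLOCK_REF = re.compile(
    r"^\{\{\s*prefect\.blocks\.([a-z0-9-]+)\.([A-Za-z0-9_-]+)\s*\}\}$"
)


def parse_block_reference(placeholder: str) -> BlockReference:
    """Split a block placeholder into its block type slug and block name."""
    match = _BLOCK_REF.match(placeholder.strip())
    if match is None:
        raise ValueError(f"Not a block reference: {placeholder!r}")
    return BlockReference(*match.groups())


ref = parse_block_reference(
    "{{ prefect.blocks.bitbucket-credentials.company-bitbucket-creds }}"
)
print(ref.type_slug)   # block type
print(ref.block_name)  # the name passed to .save()
```

The block name is the only part you choose; it is the string passed to `save()` when the block is created.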

### 🔒 Private Bitbucket + Prefect Architecture:

| Component | Role | Needs Access To |
|-----------|------|-----------------|
| **Bitbucket Server** | Stores your flow code | Nothing (it's the source) |
| **Prefect Server** | Stores metadata, credentials, schedules | Nothing (workers connect to it) |
| **Worker** | Executes flows | **Both** Bitbucket AND Prefect Server |
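
The last row of the table can be checked from inside a worker container before any flow runs. The sketch below uses only the standard library to test whether a TCP connection to each endpoint can be opened. `can_reach` and `check_worker_connectivity` are hypothetical helper names. A successful connection only shows that the port is reachable, not that the credentials work.

```python
import socket
from typing import Dict
from urllib.parse import urlparse


def can_reach(url: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to the URL's host and port succeeds."""
    parsed = urlparse(url)
    # Fall back to the scheme's default port when the URL does not name one.
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    try:
        with socket.create_connection((parsed.hostname, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False


def check_worker_connectivity(
    targets: Dict[str, str], timeout: float = 3.0
) -> Dict[str, bool]:
    """Check every named endpoint and report which ones are reachable."""
    return {name: can_reach(url, timeout) for name, url in targets.items()}
```

From a worker, this could be called as `check_worker_connectivity({"prefect": "https://prefect.company.com/api", "bitbucket": "https://bitbucket.company.com"})`; both values should be `True` before the worker is started.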

### 📊 The Complete Flow:

1. **Developer** pushes code to private Bitbucket
2. **Developer** creates Bitbucket credentials Block on Prefect server
3. **Developer** runs `prefect deploy` to register deployment
4. **Prefect Server** stores deployment metadata (not the code!)
5. **Worker** polls Prefect server for work
6. **Worker** gets credentials from Prefect server (via Block)
7. **Worker** clones code from Bitbucket using those credentials
8. **Worker** executes the flow
9. **Worker** sends results back to Prefect server
10. **You** view results in Prefect UI
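
Steps 7 and 8 depend on the `entrypoint` value from `prefect.yaml` (`flows/daily_etl.py:main_flow`): a file path inside the cloned repository, then the name of the flow function. The sketch below shows roughly what resolving such a string involves, using `importlib`. `load_entrypoint` is a hypothetical helper for illustration; Prefect ships its own loader, and its real behavior may differ.

```python
import importlib.util
from pathlib import Path
from typing import Callable


def load_entrypoint(entrypoint: str, repo_root: Path) -> Callable:
    """Load the function named by a "<relative path>:<function>" entrypoint."""
    # rpartition splits on the last colon, so a path containing one still works.
    rel_path, _, func_name = entrypoint.rpartition(":")
    if not rel_path or not func_name:
        raise ValueError(f"Expected '<path>:<function>', got {entrypoint!r}")

    file_path = repo_root / rel_path
    spec = importlib.util.spec_from_file_location(file_path.stem, file_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot import {file_path}")

    # Execute the module's top-level code, then pick the function out of it.
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return getattr(module, func_name)
```

With the repository cloned to some directory `repo`, `load_entrypoint("flows/daily_etl.py:main_flow", repo)` would return the flow function. This is why the path in `entrypoint` has to be relative to the repository root.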

### ✅ What You've Learned:

1. **Blocks** are Prefect's way to store reusable config and credentials
2. **Prefect server** never executes code or clones repos
3. **Workers** do all the heavy lifting (clone + execute)
4. **Network access**: Workers need access to both Prefect server and Bitbucket
5. **Security**: Credentials in Blocks, never in code
6. **Deployment**: Code stays in git, workers pull it fresh each time

### 🚀 Next Steps:

1. Install `prefect-bitbucket`: `pip install prefect-bitbucket`
2. Create a service account in Bitbucket with read-only access
3. Create Bitbucket credentials Block on your Prefect server
4. Add `prefect.yaml` to your repository
5. Deploy: `prefect deploy --all`
6. Start workers in your infrastructure
7. Watch flows run from your private Bitbucket repo!
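
Before step 5, it can help to confirm that the repository and the shell are set up the way the guide expects. Below is a minimal pre-deploy check, assuming the layout from the guide (`prefect.yaml` and `requirements.txt` at the repository root) and the `PREFECT_API_URL` environment variable. `preflight_problems` is a hypothetical helper, not a Prefect command.

```python
import os
from pathlib import Path
from typing import List, Mapping, Optional


def preflight_problems(
    repo_root: Path, env: Optional[Mapping[str, str]] = None
) -> List[str]:
    """Return a list of human-readable problems; an empty list means ready."""
    env = os.environ if env is None else env
    problems: List[str] = []

    # Without this variable the CLI would not target the company server.
    if not env.get("PREFECT_API_URL"):
        problems.append("PREFECT_API_URL is not set")

    # Files the guide expects at the repository root.
    for required in ("prefect.yaml", "requirements.txt"):
        if not (repo_root / required).is_file():
            problems.append(f"{required} not found in {repo_root}")

    return problems
```

Running `preflight_problems(Path("/path/to/etl-flows"))` and getting an empty list back is a reasonable signal that `prefect deploy --all` is pointed at the right server and the right directory.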

---

**Remember**: The beauty of this architecture is that your Prefect server can be anywhere (cloud, on-prem, DMZ), and your workers just need network access to both the server and Bitbucket. The server itself never needs to reach Bitbucket! 🎯