In [5]:
import asyncio
import concurrent.futures
import time
import requests



# Using asyncio and concurrent.futures Together

## Overview:

- **asyncio**: For async/await patterns, I/O-bound async operations
- **concurrent.futures**: For running blocking code in thread/process pools
- **Together**: Run blocking code in executor pools from async functions

In [2]:
# ============================================================================
# Example 1: Running Blocking I/O in ThreadPoolExecutor from Async Code
# ============================================================================

# Blocking function (simulates I/O-bound operation)
def blocking_io_task(url_id):
    """Stand in for a blocking network call.

    Prints a start line, blocks the calling thread for one second, prints a
    completion line, and returns a label built from ``url_id``.

    Args:
        url_id: Identifier interpolated into the log lines and the result.

    Returns:
        str: ``"Result from task <url_id>"``.
    """
    simulated_latency_s = 1  # how long the fake "network request" blocks

    print(f"Starting I/O task {url_id}")
    time.sleep(simulated_latency_s)
    print(f"Completed I/O task {url_id}")

    label = f"Result from task {url_id}"
    return label

# Async function that uses ThreadPoolExecutor
async def run_io_tasks_async():
    """Run five blocking I/O tasks concurrently on a three-worker thread pool.

    Each ``blocking_io_task`` call is handed to a ``ThreadPoolExecutor``
    through ``loop.run_in_executor`` so the blocking work happens on worker
    threads rather than on the event loop.

    Returns:
        list: Results of ``blocking_io_task(0)`` .. ``blocking_io_task(4)``,
        in submission order (``asyncio.gather`` keeps argument order).
    """
    print("=" * 70)
    print("Example 1: Blocking I/O in ThreadPoolExecutor from Async")
    print("=" * 70)

    # Inside a coroutine a loop is always running; get_running_loop() is the
    # documented way to obtain it (preferred over get_event_loop()).
    loop = asyncio.get_running_loop()

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        # Only three workers exist, so tasks 3 and 4 start once a worker frees up.
        tasks = [
            loop.run_in_executor(executor, blocking_io_task, i)
            for i in range(5)
        ]
        # Suspend this coroutine until every submitted task has finished.
        results = await asyncio.gather(*tasks)

    print(f"\nAll results: {results}")
    return results

# Run the async function
results = await run_io_tasks_async()

Example 1: Blocking I/O in ThreadPoolExecutor from Async
Starting I/O task 0
Starting I/O task 1
Starting I/O task 2
Completed I/O task 0
Starting I/O task 3
Completed I/O task 1
Starting I/O task 4
Completed I/O task 2
Completed I/O task 3
Completed I/O task 4

All results: ['Result from task 0', 'Result from task 1', 'Result from task 2', 'Result from task 3', 'Result from task 4']


In [3]:
# ============================================================================
# Example 2: CPU-Bound Tasks with ProcessPoolExecutor
# ============================================================================

# CPU-bound blocking function
def cpu_bound_task(n):
    """Sum the squares of ``0 .. n-1`` as a stand-in for compute-heavy work.

    Args:
        n: Exclusive upper bound of the integers whose squares are summed.

    Returns:
        int: ``0*0 + 1*1 + ... + (n-1)*(n-1)`` (``0`` when the range is empty).
    """
    print(f"Starting CPU task {n}")

    total = 0
    for value in range(n):
        total += value * value

    print(f"Completed CPU task {n}")
    return total

# Async function using ProcessPoolExecutor
async def run_cpu_tasks_async():
    """Run four CPU-bound tasks on a two-worker process pool.

    Each ``cpu_bound_task(1000000)`` call is submitted to a
    ``ProcessPoolExecutor`` through ``loop.run_in_executor`` and awaited
    together with the others.

    Returns:
        list: The four task results, in submission order.
    """
    print("=" * 70)
    print("Example 2: CPU-Bound Tasks with ProcessPoolExecutor")
    print("=" * 70)

    # Inside a coroutine a loop is always running; get_running_loop() is the
    # documented way to obtain it (preferred over get_event_loop()).
    loop = asyncio.get_running_loop()

    # Separate processes sidestep the GIL, which is why a process pool is
    # used for CPU-bound work.
    # NOTE(review): worker processes must be able to import `cpu_bound_task`.
    # With the "spawn" start method, functions defined in a notebook cell
    # generally cannot be resolved by workers -- confirm on the target
    # platform or move the function into an importable module.
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        tasks = [
            loop.run_in_executor(executor, cpu_bound_task, 1000000)
            for _ in range(4)
        ]
        results = await asyncio.gather(*tasks)

    print(f"\nResults: {results}")
    return results

# Run the async function
results = await run_cpu_tasks_async()

Example 2: CPU-Bound Tasks with ProcessPoolExecutor
Starting CPU task 1000000Starting CPU task 1000000

Completed CPU task 1000000Completed CPU task 1000000

Starting CPU task 1000000Starting CPU task 1000000

Completed CPU task 1000000Completed CPU task 1000000


Results: [333332833333500000, 333332833333500000, 333332833333500000, 333332833333500000]


In [None]:
# ============================================================================
# Example 3: Mixing Async and Blocking Code
# ============================================================================

# Blocking function
def blocking_function(x):
    """Block the calling thread for half a second, then return ``x * 2``.

    Args:
        x: Value to double.

    Returns:
        The result of ``x * 2``.
    """
    pause_s = 0.5  # simulated blocking work
    time.sleep(pause_s)
    doubled = x * 2
    return doubled

# Async function
async def async_function(x):
    """Yield to the event loop for half a second, then return ``x + 1``.

    Args:
        x: Value to increment.

    Returns:
        The result of ``x + 1``.
    """
    pause_s = 0.5  # non-blocking wait; other coroutines may run meanwhile
    await asyncio.sleep(pause_s)
    incremented = x + 1
    return incremented

# Main async function that mixes both
async def mixed_operations():
    """Run native coroutines, then blocking calls offloaded to a thread pool.

    Stage 1 awaits ``async_function`` for 1, 2 and 3 concurrently.
    Stage 2 submits ``blocking_function`` for 10, 20 and 30 to a
    default-sized ``ThreadPoolExecutor`` and awaits them concurrently.

    Returns:
        tuple: ``(async_results, blocking_results)``; each is a list in
        argument order.
    """
    print("=" * 70)
    print("Example 3: Mixing Async and Blocking Code")
    print("=" * 70)

    # Stage 1: coroutines are awaited directly -- no executor involved.
    async_results = await asyncio.gather(
        async_function(1),
        async_function(2),
        async_function(3)
    )
    print(f"Async results: {async_results}")

    # Stage 2: blocking callables go through a thread pool.
    # Inside a coroutine a loop is always running; get_running_loop() is the
    # documented way to obtain it (preferred over get_event_loop()).
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        blocking_results = await asyncio.gather(
            loop.run_in_executor(executor, blocking_function, 10),
            loop.run_in_executor(executor, blocking_function, 20),
            loop.run_in_executor(executor, blocking_function, 30)
        )
    print(f"Blocking results: {blocking_results}")

    return async_results, blocking_results

# Run
# results = await mixed_operations()

In [None]:
# ============================================================================
# Example 4: Real-World: Fetching URLs with Requests (Blocking) in Async
# ============================================================================

def fetch_url(url):
    """Fetch ``url`` with a blocking GET and summarise the outcome as a dict.

    Any exception raised while requesting or reading the response is caught
    and reported in the returned dict instead of propagating.

    Args:
        url: Address passed to ``requests.get`` (5-second timeout).

    Returns:
        dict: ``{"url", "status", "length"}`` on success, or
        ``{"url", "error"}`` carrying the exception text on failure.
    """
    try:
        resp = requests.get(url, timeout=5)
        summary = {
            "url": url,
            "status": resp.status_code,
            "length": len(resp.text),
        }
    except Exception as exc:
        # Best-effort by design: callers get an error record, never a raise.
        summary = {"url": url, "error": str(exc)}
    return summary

async def fetch_urls_async(urls, max_workers=5):
    """Fetch several URLs concurrently by running ``fetch_url`` on a thread pool.

    Args:
        urls: Iterable of URLs; each one is passed to ``fetch_url``.
        max_workers: Size of the thread pool, i.e. the maximum number of
            requests in flight at once. Defaults to 5.

    Returns:
        list: One ``fetch_url`` result per URL, in the same order as ``urls``.
    """
    print("=" * 70)
    print("Example 4: Fetching URLs Concurrently")
    print("=" * 70)

    # Inside a coroutine a loop is always running; get_running_loop() is the
    # documented way to obtain it (preferred over get_event_loop()).
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit every request up front so the pool can overlap them.
        tasks = [
            loop.run_in_executor(executor, fetch_url, url)
            for url in urls
        ]
        # Suspend until all requests have produced a result.
        results = await asyncio.gather(*tasks)

    for result in results:
        print(f"  {result}")

    return results

# Three identical httpbin endpoints used as test targets for Example 4.
urls = ["https://httpbin.org/delay/1"] * 3

# Run (uncomment to test - requires internet)
# results = await fetch_urls_async(urls)

In [None]:
# ============================================================================
# Example 5: Using run_in_executor with Different Executors
# ============================================================================

def heavy_computation(n):
    """Return the sum of ``i ** 2`` for every ``i`` in ``range(n)``.

    Args:
        n: Exclusive upper bound of the integers to square and sum.

    Returns:
        int: The accumulated sum (``0`` when the range is empty).
    """
    accumulated = 0
    for k in range(n):
        accumulated += k ** 2
    return accumulated

async def compare_executors():
    """Time the same CPU-bound workload on a thread pool and a process pool.

    Runs ``heavy_computation(100000)`` four times on a four-worker
    ``ThreadPoolExecutor``, then four times on a four-worker
    ``ProcessPoolExecutor``. Each measurement covers pool creation, the
    work itself and pool shutdown. Prints both timings and which was faster.
    """
    print("=" * 70)
    print("Example 5: Comparing Executors")
    print("=" * 70)

    # Inside a coroutine a loop is always running; get_running_loop() is the
    # documented way to obtain it (preferred over get_event_loop()).
    loop = asyncio.get_running_loop()
    n = 100000

    # Method 1: ThreadPoolExecutor (GIL limited for CPU-bound)
    print("\n1. Using ThreadPoolExecutor:")
    # perf_counter() is monotonic and high-resolution, so the interval is not
    # distorted by system clock adjustments the way time.time() can be.
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        tasks = [
            loop.run_in_executor(executor, heavy_computation, n)
            for _ in range(4)
        ]
        results_thread = await asyncio.gather(*tasks)
    thread_time = time.perf_counter() - start
    print(f"   Time: {thread_time:.2f}s")
    print(f"   Results: {results_thread[:2]}...")

    # Method 2: ProcessPoolExecutor (better for CPU-bound)
    print("\n2. Using ProcessPoolExecutor:")
    start = time.perf_counter()
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        tasks = [
            loop.run_in_executor(executor, heavy_computation, n)
            for _ in range(4)
        ]
        results_process = await asyncio.gather(*tasks)
    process_time = time.perf_counter() - start
    print(f"   Time: {process_time:.2f}s")
    print(f"   Results: {results_process[:2]}...")

    verdict = "faster" if process_time < thread_time else "slower"
    print(f"\nProcessPoolExecutor is {verdict} for CPU-bound tasks")

# Run
# await compare_executors()

In [None]:
# ============================================================================
# Example 6: Error Handling with Executors
# ============================================================================

def task_with_error(n):
    """Return ``n * 2`` after a short pause, failing for the input ``2``.

    Args:
        n: Task number; the value ``2`` triggers the failure path.

    Returns:
        The result of ``n * 2``.

    Raises:
        ValueError: If ``n == 2``; raised immediately, before any pause.
    """
    if n != 2:
        time.sleep(0.1)
        return n * 2
    raise ValueError(f"Error in task {n}")

async def handle_errors():
    """Collect a per-task outcome so one failure does not hide the others.

    Submits ``task_with_error(0)`` .. ``task_with_error(4)`` to a thread pool
    and awaits them one by one via ``asyncio.as_completed``. Each success or
    exception is recorded as a string, then all records are printed. Records
    appear in completion order, not submission order.
    """
    print("=" * 70)
    print("Example 6: Error Handling")
    print("=" * 70)

    # Inside a coroutine a loop is always running; get_running_loop() is the
    # documented way to obtain it (preferred over get_event_loop()).
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        tasks = [
            loop.run_in_executor(executor, task_with_error, i)
            for i in range(5)
        ]

        # Await each future separately so an exception from one task is
        # caught here instead of aborting the whole batch.
        results = []
        for task in asyncio.as_completed(tasks):
            try:
                result = await task
                results.append(f"Success: {result}")
            except Exception as e:
                results.append(f"Error: {e}")

    for result in results:
        print(f"  {result}")

# Run
# await handle_errors()

In [None]:
# ============================================================================
# Example 7: Complete Example - Async Main with Blocking Operations
# ============================================================================

def blocking_database_query(query_id):
    """Pretend to run a database query that blocks for half a second.

    Args:
        query_id: Identifier interpolated into the log line and the result.

    Returns:
        str: ``"Query <query_id> result"``.
    """
    print(f"  Executing query {query_id}...")
    query_duration_s = 0.5  # simulated time the database takes to answer
    time.sleep(query_duration_s)
    answer = f"Query {query_id} result"
    return answer

async def async_api_call(api_id):
    """Pretend to call a remote API, waiting 0.3 s without blocking the loop.

    Args:
        api_id: Identifier interpolated into the log line and the result.

    Returns:
        str: ``"API <api_id> response"``.
    """
    print(f"  Calling API {api_id}...")
    network_delay_s = 0.3  # simulated round-trip time
    await asyncio.sleep(network_delay_s)
    reply = f"API {api_id} response"
    return reply

async def main_application():
    """Run async API calls and pooled blocking queries, then pair the results.

    Step 1 awaits ``async_api_call`` for ids 1-3 concurrently.
    Step 2 runs ``blocking_database_query`` for ids 1-3 on a three-worker
    thread pool and awaits them concurrently.
    Step 3 zips the two result lists together and prints each pair.

    Returns:
        list: ``(api_response, query_result)`` tuples, one per id.
    """
    print("=" * 70)
    print("Example 7: Complete Application Example")
    print("=" * 70)

    print("\n1. Running async operations:")
    async_results = await asyncio.gather(
        async_api_call(1),
        async_api_call(2),
        async_api_call(3)
    )
    print(f"   Async results: {async_results}")

    print("\n2. Running blocking operations in thread pool:")
    # Inside a coroutine a loop is always running; get_running_loop() is the
    # documented way to obtain it (preferred over get_event_loop()).
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        blocking_results = await asyncio.gather(
            loop.run_in_executor(executor, blocking_database_query, 1),
            loop.run_in_executor(executor, blocking_database_query, 2),
            loop.run_in_executor(executor, blocking_database_query, 3)
        )
    print(f"   Blocking results: {blocking_results}")

    print("\n3. Combining results:")
    # Both lists are in argument order, so position i of each refers to id i+1.
    combined = list(zip(async_results, blocking_results))
    for async_res, blocking_res in combined:
        print(f"   {async_res} + {blocking_res}")

    return combined

# Run the main application
# results = await main_application()

## Summary: When to Use Each

### asyncio:
- ✅ I/O-bound operations (network, file I/O)
- ✅ Many concurrent operations
- ✅ Async/await patterns
- ❌ CPU-bound tasks (they run on the single event-loop thread and block every other coroutine)

### concurrent.futures.ThreadPoolExecutor:
- ✅ I/O-bound blocking code from async context
- ✅ Running blocking libraries (requests, etc.) from async code
- ❌ CPU-bound tasks (GIL limitation)

### concurrent.futures.ProcessPoolExecutor:
- ✅ CPU-bound tasks
- ✅ Parallel computation
- ❌ I/O-bound tasks (process startup and argument/result pickling overhead outweigh the benefit)

### Using Together:
```python
# Pattern: Run blocking code in executor from async function
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as executor:
    result = await loop.run_in_executor(executor, blocking_function, *args)
```

In [None]:
# ============================================================================
# Quick Test: Run All Examples
# ============================================================================

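# Uncomment the call at the bottom of this cell to run the examples.
# Note: Example 4 (requires an internet connection) is not part of this run,
# and the CPU-bound Examples 2 and 5 are disabled inside the function.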

async def run_all_examples():
    """Run the quick examples one after another.

    Awaits Examples 1, 3, 6 and 7 in that order, printing a separator after
    each of the first three. Examples 2 and 5 are disabled in the body, and
    Example 4 is not called from here.
    """
    gap = "\n"  # separator printed between examples
    print("Running all examples...\n")

    await run_io_tasks_async()      # Example 1
    print(gap)

    # Example 2 is CPU-bound and may take a moment; to include it, add:
    #     await run_cpu_tasks_async()
    #     print(gap)

    await mixed_operations()        # Example 3
    print(gap)

    # Example 5 is disabled; to include it, add:
    #     await compare_executors()
    #     print(gap)

    await handle_errors()           # Example 6
    print(gap)

    await main_application()        # Example 7 (no trailing separator)

# Run all examples
# await run_all_examples()

In [15]:
def blocking_task(n):
    """Block the calling thread for one second and return a labelled result.

    Args:
        n: Task number interpolated into the log lines and the result.

    Returns:
        str: ``"结果-<n>"``.
    """
    print(f"开始阻塞任务 {n} ...")
    block_seconds = 1  # simulates blocking I/O or CPU work for 1 second
    time.sleep(block_seconds)
    print(f"阻塞任务 {n} 完成")
    outcome = f"结果-{n}"
    return outcome

async def main():
    """Run ten ``blocking_task`` calls on the event loop's default executor.

    Passing ``None`` as the executor to ``run_in_executor`` selects the
    loop's default ``ThreadPoolExecutor``, so no pool is created or shut
    down here. Prints the list of results, in submission order.
    """
    # Inside a coroutine a loop is always running; get_running_loop() is the
    # documented way to obtain it (preferred over get_event_loop()).
    loop = asyncio.get_running_loop()

    # None -> use the default ThreadPoolExecutor.
    futures = [
        loop.run_in_executor(None, blocking_task, i)
        for i in range(10)
    ]
    results = await asyncio.gather(*futures)
    print(results)

# In Jupyter notebooks, use await directly (don't use asyncio.run())
# Jupyter already has a running event loop
await main()

开始阻塞任务 0 ...开始阻塞任务 1 ...
开始阻塞任务 2 ...
开始阻塞任务 3 ...
开始阻塞任务 4 ...

开始阻塞任务 5 ...
开始阻塞任务 6 ...
开始阻塞任务 7 ...
开始阻塞任务 8 ...
开始阻塞任务 9 ...
阻塞任务 1 完成
阻塞任务 3 完成
阻塞任务 0 完成
阻塞任务 2 完成
阻塞任务 8 完成
阻塞任务 5 完成
阻塞任务 6 完成
阻塞任务 4 完成
阻塞任务 9 完成
阻塞任务 7 完成
['结果-0', '结果-1', '结果-2', '结果-3', '结果-4', '结果-5', '结果-6', '结果-7', '结果-8', '结果-9']


In [3]:
import asyncio

async def say_hello():
    """Print "Hello", yield to the event loop for one second, print "World"."""
    greeting, subject = "Hello", "World"
    print(greeting)
    # Non-blocking pause: other coroutines get to run during this second.
    await asyncio.sleep(1)
    print(subject)

async def main():
    """Run three ``say_hello`` coroutines concurrently between two log lines.

    NOTE: this shares its name with the earlier ``main`` defined in this
    notebook and replaces it once this cell has run.
    """
    print("开始运行 main")
    # Create the three coroutines up front, then let gather interleave them.
    greeters = [say_hello() for _ in range(3)]
    await asyncio.gather(*greeters)
    print("main 结束")

await main()

开始运行 main
Hello
Hello
Hello


World
World
World
main 结束
