# Notebook 02: Threading & I/O-Bound Tasks

## Topic 2: When Waiting is the Bottleneck

In this notebook, we'll explore:
1. Why threading works for I/O-bound tasks
2. `ThreadPoolExecutor` basics
3. Two patterns: `map()` vs `submit()` + `as_completed()`
4. Exception handling in threads
5. Practical file processing example

---

In [None]:
import time
import os
import numpy as np
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import matplotlib.pyplot as plt

print(f"Number of CPU cores: {os.cpu_count()}")

---

## Section 1: I/O-Bound vs CPU-Bound Demo

Let's first understand the difference between I/O-bound and CPU-bound tasks, and see why threading helps with I/O but not CPU work.

In [None]:
# Simulate an I/O-bound task (like fetching data from an API)
def io_bound_task(task_id):
    """Simulate network I/O - waiting for data."""
    time.sleep(0.3)  # Simulate network delay
    return f"Task {task_id} completed"

# Simulate a CPU-bound task (like heavy computation)
def cpu_bound_task(task_id):
    """Simulate CPU-intensive work."""
    total = 0
    for i in range(2_000_000):
        total += i * i
    return f"Task {task_id}: {total}"

In [None]:
# Test I/O-bound: Sequential vs Threaded
n_tasks = 8

print("=== I/O-BOUND TASKS ===")
print(f"Running {n_tasks} tasks that each wait 0.3 seconds...\n")

# Sequential
start = time.time()
results = [io_bound_task(i) for i in range(n_tasks)]
seq_time = time.time() - start
print(f"Sequential: {seq_time:.2f}s")

# Threaded
start = time.time()
with ThreadPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(io_bound_task, range(n_tasks)))
thread_time = time.time() - start
print(f"Threaded:   {thread_time:.2f}s")
print(f"Speedup:    {seq_time/thread_time:.1f}x")

In [None]:
# Test CPU-bound: Sequential vs Threaded
n_tasks = 4

print("\n=== CPU-BOUND TASKS ===")
print(f"Running {n_tasks} CPU-intensive tasks...\n")

# Sequential
start = time.time()
results = [cpu_bound_task(i) for i in range(n_tasks)]
seq_time = time.time() - start
print(f"Sequential: {seq_time:.2f}s")

# Threaded
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(cpu_bound_task, range(n_tasks)))
thread_time = time.time() - start
print(f"Threaded:   {thread_time:.2f}s")
print(f"Speedup:    {seq_time/thread_time:.1f}x (expect about 1x: the GIL serializes CPU-bound threads)")

### Key Observation

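- **I/O-bound tasks**: Threading gives a near-linear speedup here. With 8 tasks on 8 workers, the batch finishes in roughly the time of one task (about 8x faster), because the threads spend their time waiting rather than computing.
- **CPU-bound tasks**: Threading gives little or no speedup, and can even be slower, because on standard CPython builds the GIL lets only one thread execute Python bytecode at a time.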

**Remember**: Use `ThreadPoolExecutor` for I/O, `ProcessPoolExecutor` for CPU work.
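
The 8x figure above relies on having as many workers as tasks. As a rough sketch, assuming every task waits the same fixed time and thread overhead is negligible, the wall time is about `ceil(n_tasks / max_workers) * delay`. The helper names `estimated_wall_time` and `measure_wall_time` are illustrative and not part of the notebook:

```python
import math
import time
from concurrent.futures import ThreadPoolExecutor

def estimated_wall_time(n_tasks, max_workers, delay):
    """Idealized wall time: tasks run in 'waves' of max_workers at a time."""
    return math.ceil(n_tasks / max_workers) * delay

def measure_wall_time(n_tasks, max_workers, delay):
    """Run n_tasks sleeping tasks on a thread pool and time the whole batch."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        list(executor.map(time.sleep, [delay] * n_tasks))
    return time.perf_counter() - start

# Compare the estimate with a measurement for a few pool sizes
for workers in (8, 4, 2):
    est = estimated_wall_time(8, workers, 0.05)
    actual = measure_wall_time(8, workers, 0.05)
    print(f"{workers} workers: estimated {est:.2f}s, measured {actual:.2f}s")
```

Under the same assumptions, the 0.3-second tasks from the demo would take roughly 0.6s on 4 workers (two waves), which is a 4x speedup, not 8x.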

---

## Section 2: Pattern 1 - executor.map()

The simplest pattern: apply the same function to many inputs.

### Finance Example: Fetching Stock Data

In [None]:
def simulate_fetch_stock_data(ticker):
    """
    Simulate fetching stock data from an API.
    In real life, this would be a call to Yahoo Finance, Alpha Vantage, etc.
    """
    # Simulate variable network latency (100-500ms)
    delay = 0.1 + 0.4 * np.random.random()
    time.sleep(delay)
    
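    # Generate fake stock data from a per-call generator. Seeding the global
    # NumPy RNG would share state across threads, and hash() of a string
    # changes between Python sessions, so the seed comes from the ticker bytes.
    rng = np.random.default_rng(list(ticker.encode()))
    base_price = rng.uniform(10, 500)
    
    return {
        'ticker': ticker,
        'price': round(base_price, 2),
        'change': round(rng.uniform(-5, 5), 2),
        'volume': int(rng.integers(100000, 10000000)),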
        'fetch_time': round(delay, 3)
    }

# Test with one ticker
print(simulate_fetch_stock_data('AAPL'))

In [None]:
# Portfolio of tickers to fetch
tickers = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'META', 'NVDA', 'TSLA', 'JPM', 
           'BAC', 'WMT', 'PG', 'JNJ', 'UNH', 'HD', 'V', 'MA']

print(f"Fetching data for {len(tickers)} stocks...\n")

In [None]:
# Sequential fetching
print("Sequential fetching:")
start = time.time()
sequential_results = [simulate_fetch_stock_data(t) for t in tickers]
seq_time = time.time() - start
print(f"Time: {seq_time:.2f}s\n")

In [None]:
# Parallel fetching with executor.map()
print("Parallel fetching with ThreadPoolExecutor:")
start = time.time()
with ThreadPoolExecutor(max_workers=8) as executor:
    parallel_results = list(executor.map(simulate_fetch_stock_data, tickers))
par_time = time.time() - start
print(f"Time: {par_time:.2f}s")
print(f"Speedup: {seq_time/par_time:.1f}x")

In [None]:
# Results come back in INPUT ORDER (same as tickers list)
df = pd.DataFrame(parallel_results)
print("\nResults (note: order matches input ticker list):")
print(df.to_string(index=False))

### When to use `executor.map()`

- Same function applied to many inputs
- You need results in the **same order** as inputs
- Simple, clean code is important
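
A small sketch of the ordering guarantee, assuming a hypothetical `slow_square` task whose delay shrinks as the input grows, so later inputs finish first:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(n):
    """Hypothetical task: larger inputs finish sooner (shorter sleep)."""
    time.sleep(0.05 * (4 - n))
    return n * n

inputs = [0, 1, 2, 3]
with ThreadPoolExecutor(max_workers=4) as executor:
    # map() yields results in input order, even though input 3 finishes first
    squares = list(executor.map(slow_square, inputs))

print(squares)  # [0, 1, 4, 9]
```

The cost of this guarantee is that a slow early input holds back every result behind it, even if those results are already available.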

---

## Section 3: Pattern 2 - submit() + as_completed()

More flexible pattern: process results as they arrive.

### When is this useful?
- Show progress to the user
- Process fast results while waiting for slow ones
- Early termination when you find what you need
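
As a minimal sketch of the first use case, a completion counter can be updated inside the `as_completed()` loop. The `fake_fetch` function and its delays are made up for illustration:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fake_fetch(item_id):
    """Hypothetical fetch: odd ids are slower than even ids."""
    time.sleep(0.1 if item_id % 2 else 0.02)
    return item_id

item_ids = list(range(6))
completed_order = []

with ThreadPoolExecutor(max_workers=6) as executor:
    futures = [executor.submit(fake_fetch, i) for i in item_ids]
    # enumerate() counts finished tasks as as_completed() yields them
    for done_count, future in enumerate(as_completed(futures), start=1):
        completed_order.append(future.result())
        print(f"[{done_count}/{len(futures)}] finished item {completed_order[-1]}")
```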

In [None]:
def fetch_with_variable_delay(ticker):
    """
    Simulate fetching where some tickers are fast and some are slow.
    """
    # Some tickers are "slow" (like international exchanges)
    slow_tickers = {'TSLA', 'META', 'NVDA'}
    delay = 0.8 if ticker in slow_tickers else 0.2
    time.sleep(delay)
    
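    # Per-call generator seeded from the ticker bytes: no shared global RNG
    # state between threads, and the same price on every run
    rng = np.random.default_rng(list(ticker.encode()))
    return {
        'ticker': ticker,
        'price': round(rng.uniform(10, 500), 2),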
        'delay': delay
    }

In [None]:
# Using submit() + as_completed() to process results as they arrive
tickers = ['AAPL', 'TSLA', 'GOOGL', 'META', 'MSFT', 'NVDA', 'AMZN', 'JPM']

print("Processing results as they complete:\n")

start = time.time()
with ThreadPoolExecutor(max_workers=8) as executor:
    # Submit all tasks
    futures = {executor.submit(fetch_with_variable_delay, t): t for t in tickers}
    
    # Process results as they complete (NOT in submission order!)
    results = []
    for future in as_completed(futures):
        ticker = futures[future]
        result = future.result()
        elapsed = time.time() - start
        print(f"  [{elapsed:.2f}s] Got {ticker}: ${result['price']} (took {result['delay']}s)")
        results.append(result)

total_time = time.time() - start
print(f"\nTotal time: {total_time:.2f}s")

### Notice!

- Fast tickers (AAPL, GOOGL, MSFT, AMZN, JPM) complete first
- Slow tickers (TSLA, META, NVDA) complete last
- Results arrive in **completion order**, not submission order

This is useful for:
- Showing a progress bar
- Processing available data while waiting for the rest
- Canceling tasks that have not started yet once you've found what you need (a task that is already running cannot be cancelled)
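
A sketch of the last point, under stated assumptions: `Future.cancel()` only succeeds for tasks that have not started running, so the pool is kept small here to leave some tasks pending. The names `FAKE_PRICES`, `slow_lookup`, and `find_first_above` are hypothetical:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# Hypothetical fixed prices, for illustration only
FAKE_PRICES = {"AAPL": 120.0, "TSLA": 310.0, "GOOGL": 95.0,
               "MSFT": 250.0, "AMZN": 130.0, "JPM": 150.0}

def slow_lookup(ticker):
    """Hypothetical stand-in for a network fetch."""
    time.sleep(0.05)
    return ticker, FAKE_PRICES[ticker]

def find_first_above(tickers, threshold, max_workers=2):
    """Return (ticker, price, n_cancelled) for the first result above threshold."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(slow_lookup, t) for t in tickers]
        for future in as_completed(futures):
            ticker, price = future.result()
            if price > threshold:
                # cancel() returns True only for tasks that never started
                n_cancelled = sum(f.cancel() for f in futures)
                return ticker, price, n_cancelled
    return None

print(find_first_above(list(FAKE_PRICES), threshold=300))
```

Tasks that are already running still finish before the `with` block exits. The number of cancelled tasks depends on timing, so it can vary between runs.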

---

## Section 4: Exception Handling

In real applications, some operations will fail. Here's how to handle errors gracefully.

In [None]:
def unreliable_fetch(ticker):
    """
    Simulate an unreliable API that sometimes fails.
    """
    time.sleep(0.2)
    
    # Simulate failures for certain tickers
    if ticker in ['BAD', 'INVALID', 'ERROR']:
        raise ValueError(f"Invalid ticker symbol: {ticker}")
    
    # Simulate random network failures (10% chance)
    if np.random.random() < 0.1:
        raise ConnectionError(f"Network timeout for {ticker}")
    
    # Per-call generator seeded from the ticker bytes (thread-safe, reproducible)
    rng = np.random.default_rng(list(ticker.encode()))
    return {'ticker': ticker, 'price': round(rng.uniform(10, 500), 2)}

In [None]:
# Mix of valid and invalid tickers
tickers = ['AAPL', 'BAD', 'GOOGL', 'INVALID', 'MSFT', 'AMZN']

print("Fetching with error handling:\n")

successful = []
failed = []

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(unreliable_fetch, t): t for t in tickers}
    
    for future in as_completed(futures):
        ticker = futures[future]
        try:
            result = future.result()
            print(f"  SUCCESS: {ticker} = ${result['price']}")
            successful.append(result)
        except ValueError as e:
            print(f"  FAILED (invalid): {ticker} - {e}")
            failed.append({'ticker': ticker, 'error': str(e)})
        except ConnectionError as e:
            print(f"  FAILED (network): {ticker} - {e}")
            failed.append({'ticker': ticker, 'error': str(e)})

print(f"\nSuccessful: {len(successful)}, Failed: {len(failed)}")

### Best Practice: Always Handle Exceptions

`future.result()` re-raises any exception that was raised inside the worker, so always wrap the call in try/except:

```python
try:
    result = future.result()
except Exception as e:
    # Log the error, retry, or mark as failed
    print(f"Task failed: {e}")
```
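
The same rule applies to `executor.map()`: an exception from a worker is re-raised when its result is pulled from the iterator, which ends the iteration. One possible workaround, sketched here with hypothetical names (`safe_call`, `parse_price`), is to catch the exception inside the worker and return it as data:

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def parse_price(text):
    """Hypothetical task that fails on bad input."""
    return float(text)

def safe_call(func, arg):
    """Run func(arg); return (arg, result, None) or (arg, None, error)."""
    try:
        return arg, func(arg), None
    except Exception as exc:  # broad on purpose: report any failure as data
        return arg, None, exc

raw_values = ["101.5", "oops", "99.0"]
with ThreadPoolExecutor(max_workers=3) as executor:
    outcomes = list(executor.map(partial(safe_call, parse_price), raw_values))

for arg, result, error in outcomes:
    status = f"ok: {result}" if error is None else f"failed: {error!r}"
    print(f"{arg!r} -> {status}")
```

This keeps the input order of `map()` while still letting every task report its own success or failure.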

---

## Section 5: Practical Example - Parallel File Processing

A common finance task: processing multiple data files.

In [None]:
# First, let's generate some sample files
import tempfile  # os is already imported at the top of the notebook

# Create a temporary directory for our sample files
temp_dir = tempfile.mkdtemp()
print(f"Created temp directory: {temp_dir}\n")

# Generate sample stock data files
stocks = ['AAPL', 'GOOGL', 'MSFT', 'AMZN', 'META', 'NVDA', 'TSLA', 'JPM']
dates = pd.date_range('2020-01-01', '2023-12-31', freq='B')  # Business days, consistent with the 252-day annualization below

print("Generating sample data files...")
for ticker in stocks:
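    # Per-ticker generator seeded from the ticker bytes (hash() of a string
    # varies between Python sessions, so it is not a reproducible seed)
    rng = np.random.default_rng(list(ticker.encode()))
    
    # Generate random price data
    returns = rng.normal(0.0005, 0.02, len(dates))
    prices = 100 * np.cumprod(1 + returns)
    
    df = pd.DataFrame({
        'date': dates,
        'close': prices,
        'volume': rng.integers(1000000, 10000000, len(dates))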
    })
    
    filepath = os.path.join(temp_dir, f"{ticker}.csv")
    df.to_csv(filepath, index=False)
    print(f"  Created {ticker}.csv ({len(df)} rows)")

In [None]:
def process_stock_file(filepath):
    """
    Read a stock file and calculate summary statistics.
    This simulates I/O (file reading) followed by light computation.
    """
    # Simulate some I/O delay (file reading from slow storage)
    time.sleep(0.1)
    
    df = pd.read_csv(filepath)
    ticker = os.path.splitext(os.path.basename(filepath))[0]
    
    # Calculate returns
    df['returns'] = df['close'].pct_change()
    
    # Calculate statistics
    stats = {
        'ticker': ticker,
        'start_price': df['close'].iloc[0],
        'end_price': df['close'].iloc[-1],
        'total_return': (df['close'].iloc[-1] / df['close'].iloc[0] - 1) * 100,
        'volatility': df['returns'].std() * np.sqrt(252) * 100,  # Annualized
        'avg_volume': df['volume'].mean(),
        'sharpe': (df['returns'].mean() * 252) / (df['returns'].std() * np.sqrt(252))  # Annualized, risk-free rate taken as 0
    }
    
    return stats
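
For reference, the annualization used in `process_stock_file` can be written out as follows, with $\mu_d$ and $\sigma_d$ denoting the mean and standard deviation of daily returns (symbols introduced here for illustration). The code subtracts no risk-free rate, so the Sharpe ratio below takes it as zero, and 252 is the number of trading days per year that the code assumes. The volatility is additionally multiplied by 100 in the code to express it as a percentage.

$$
\sigma_{\text{ann}} = \sigma_d \sqrt{252}, \qquad
\text{Sharpe} = \frac{252\,\mu_d}{\sigma_d \sqrt{252}} = \sqrt{252}\,\frac{\mu_d}{\sigma_d}
$$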

In [None]:
# Get list of all CSV files
# os.listdir() returns names in arbitrary order, so sort for a stable result table
csv_files = sorted(os.path.join(temp_dir, f) for f in os.listdir(temp_dir) if f.endswith('.csv'))
print(f"Processing {len(csv_files)} files...\n")

In [None]:
# Sequential processing
print("Sequential processing:")
start = time.time()
sequential_stats = [process_stock_file(f) for f in csv_files]
seq_time = time.time() - start
print(f"Time: {seq_time:.2f}s")

In [None]:
# Parallel processing
print("\nParallel processing:")
start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    parallel_stats = list(executor.map(process_stock_file, csv_files))
par_time = time.time() - start
print(f"Time: {par_time:.2f}s")
print(f"Speedup: {seq_time/par_time:.1f}x")

In [None]:
# Display results
results_df = pd.DataFrame(parallel_stats)
results_df = results_df.round(2)
print("\nStock Summary Statistics:")
print(results_df.to_string(index=False))

In [None]:
# Cleanup temp files
import shutil
shutil.rmtree(temp_dir)
print("Cleaned up temp directory")

---

## Summary: Threading for I/O-Bound Tasks

### Key Takeaways

1. **Threading works for I/O-bound tasks** because a thread releases the GIL while it blocks on I/O (or in `time.sleep`), which lets other threads run in the meantime

2. **Two main patterns**:
   - `executor.map(func, items)` - Simple, results in order
   - `executor.submit()` + `as_completed()` - Process results as they arrive

3. **Always handle exceptions** when calling `future.result()`

4. **Don't use threading for CPU-bound work** - the GIL prevents a speedup; use `ProcessPoolExecutor` instead

### When to Use Threading

| Use Threading For | Don't Use Threading For |
|-------------------|------------------------|
| API calls | Monte Carlo simulations |
| Database queries | Heavy calculations |
| File I/O | Number crunching |
| Network requests | Optimization algorithms |

---

## Exercises

### Exercise 1: Parallel Data Fetching

Modify the `simulate_fetch_stock_data` function to also return the 52-week high and low. Then fetch data for 20 tickers in parallel.

In [None]:
# Your code here
def fetch_extended_data(ticker):
    """Fetch extended stock data including 52-week high/low."""
    pass  # Implement me!

### Exercise 2: Retry Logic

Implement a wrapper function that retries failed fetches up to 3 times before giving up.

In [None]:
# Your code here
def fetch_with_retry(ticker, max_retries=3):
    """Fetch data with automatic retry on failure."""
    pass  # Implement me!

### Exercise 3: Progress Tracking

Using `tqdm`, add a progress bar to the parallel file processing example.

In [None]:
# Hint: Use tqdm with as_completed
# from tqdm import tqdm
# for future in tqdm(as_completed(futures), total=len(futures)):
#     ...