# Concurrency & Parallelism

This lesson covers concurrent and parallel programming concepts in Python.

## Learning Objectives

By the end of this lesson, you will be able to:

1. **Concurrency vs Parallelism**
   - Understand the difference between concurrency and parallelism
   - Know when to use each approach
   - Understand Python's Global Interpreter Lock (GIL)

2. **Threading**
   - Use threading for I/O-bound tasks
   - Handle shared data safely
   - Use ThreadPoolExecutor for thread management

3. **Multiprocessing**
   - Use multiprocessing for CPU-bound tasks
   - Handle inter-process communication
   - Use ProcessPoolExecutor for process management

4. **Asyncio**
   - Use asyncio for asynchronous programming
   - Handle coroutines and tasks
   - Implement async/await patterns

5. **Synchronization**
   - Use locks, semaphores, and other synchronization primitives
   - Handle race conditions
   - Implement thread-safe data structures

6. **Performance Optimization**
   - Choose the right concurrency model
   - Optimize for different types of workloads
   - Measure and improve performance


## 1. Concurrency vs Parallelism

Understanding the difference between concurrency and parallelism is crucial for choosing the right approach.

### Key Concepts

- **Concurrency**: Multiple tasks making progress but not necessarily simultaneously
- **Parallelism**: Multiple tasks executing simultaneously
- **GIL**: Global Interpreter Lock; in CPython it lets only one thread execute Python bytecode at a time, which prevents true parallelism for CPU-bound threads


In [None]:
# 1. Concurrency vs Parallelism
print("1. Concurrency vs Parallelism")
print("-" * 30)

import time
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound_task(n):
    """Return the sum of ``i ** 2`` for every ``i`` in ``range(n)``.

    The work is pure arithmetic with no waiting, which makes it the
    CPU-bound workload for the comparisons in this cell.
    """
    return sum(value ** 2 for value in range(n))

def io_bound_task(duration):
    """Sleep for ``duration`` seconds, then return a completion message.

    The sleep stands in for waiting on I/O.
    """
    time.sleep(duration)
    message = f"Completed after {duration} seconds"
    return message

# Sequential execution: the three 1-second tasks run one after another, so
# the measured time is roughly the sum of the individual waits.
print("Sequential execution:")
start_time = time.time()
results = []
for i in range(3):
    result = io_bound_task(1)
    results.append(result)
sequential_time = time.time() - start_time  # wall-clock duration of the loop
print(f"Sequential time: {sequential_time:.2f} seconds")
print(f"Results: {results}")

# Concurrent execution with threading: one thread per task, so the three
# sleeps overlap instead of running back to back.
print("\nConcurrent execution with threading:")
start_time = time.time()
threads = []
results = []  # shared list that every worker thread appends to

def thread_worker(duration):
    """Run io_bound_task(duration) and append its message to ``results``."""
    result = io_bound_task(duration)
    results.append(result)

for i in range(3):
    thread = threading.Thread(target=thread_worker, args=(1,))
    threads.append(thread)
    thread.start()

# Block until every worker has finished before stopping the clock.
for thread in threads:
    thread.join()

concurrent_time = time.time() - start_time
print(f"Concurrent time: {concurrent_time:.2f} seconds")
print(f"Results: {results}")

# Parallel execution with multiprocessing: the same three 1-second tasks are
# submitted to a pool of three worker processes.
print("\nParallel execution with multiprocessing:")
start_time = time.time()
with ProcessPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(io_bound_task, 1) for _ in range(3)]
    # result() blocks until the corresponding task has finished.
    results = [future.result() for future in futures]
# The clock stops after the `with` block, so pool shutdown is included.
parallel_time = time.time() - start_time
print(f"Parallel time: {parallel_time:.2f} seconds")
print(f"Results: {results}")

# CPU-bound task comparison: time the same amount of loop iterations run
# sequentially, in a thread pool, and in a process pool.
print("\nCPU-bound task comparison:")
n = 1000000

# Sequential: a single call covering all n iterations.
start_time = time.time()
sequential_result = cpu_bound_task(n)
sequential_time = time.time() - start_time
print(f"Sequential CPU task: {sequential_time:.2f} seconds")

# Threading (limited by GIL): four tasks of n//4 iterations each.
start_time = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(cpu_bound_task, n//4) for _ in range(4)]
    threading_results = [future.result() for future in futures]
threading_time = time.time() - start_time
print(f"Threading CPU task: {threading_time:.2f} seconds")

# Multiprocessing (true parallelism): the same four tasks, one per process.
# The measured time includes starting and shutting down the pool.
start_time = time.time()
with ProcessPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(cpu_bound_task, n//4) for _ in range(4)]
    multiprocessing_results = [future.result() for future in futures]
multiprocessing_time = time.time() - start_time
print(f"Multiprocessing CPU task: {multiprocessing_time:.2f} seconds")

# Performance comparison: speed-up relative to the sequential run
# (values above 1.00x are faster than sequential).
print(f"\nPerformance comparison:")
print(f"Sequential: 1.00x")
print(f"Threading: {sequential_time/threading_time:.2f}x")
print(f"Multiprocessing: {sequential_time/multiprocessing_time:.2f}x")


In [None]:
# 2. Threading
print("2. Threading")
print("-" * 20)

import threading
import time
import random
from concurrent.futures import ThreadPoolExecutor
import queue

# Basic threading
print("Basic threading:")
def worker(name, duration):
    """Print a start message, sleep ``duration`` seconds, print a finish message."""
    label = f"Worker {name}"
    print(f"{label} starting")
    time.sleep(duration)
    print(f"{label} finished")

# Create and start threads: three workers, each sleeping a random 1-3 seconds,
# so the order of the "finished" messages varies between runs.
threads = []
for i in range(3):
    thread = threading.Thread(target=worker, args=(f"Thread-{i+1}", random.uniform(1, 3)))
    threads.append(thread)
    thread.start()

# Wait for all threads to complete
for thread in threads:
    thread.join()

print("All threads completed")

# Threading with shared data
print("\nThreading with shared data:")
counter = 0  # module-level counter incremented by increment_counter()
lock = threading.Lock()  # held around every update to `counter`

def increment_counter():
    """Add 1 to the global ``counter`` 100000 times, holding ``lock`` for each update."""
    global counter
    remaining = 100000
    while remaining:
        with lock:
            counter += 1
        remaining -= 1

# Create threads: five threads each perform 100000 locked increments, so the
# expected final value is 500000.
threads = []
for i in range(5):
    thread = threading.Thread(target=increment_counter)
    threads.append(thread)
    thread.start()

# Wait for completion
for thread in threads:
    thread.join()

print(f"Final counter value: {counter}")

# ThreadPoolExecutor
print("\nThreadPoolExecutor:")
def square_number(n):
    """Return ``n`` raised to the power of two."""
    return pow(n, 2)

# Submit tasks to thread pool: ten tasks share four worker threads.
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(square_number, i) for i in range(10)]
    # Collecting in submission order keeps results aligned with range(10).
    results = [future.result() for future in futures]

print(f"Results: {results}")

# Producer-Consumer pattern
print("\nProducer-Consumer pattern:")
# Bounded queue: with maxsize=5, put() blocks while five items are waiting.
q = queue.Queue(maxsize=5)

def producer():
    """Put ``Item-0`` .. ``Item-9`` on the global queue ``q``, pausing 0.1 s after each."""
    for index in range(10):
        item = f"Item-{index}"
        q.put(item)
        print(f"Produced: {item}")
        time.sleep(0.1)

def consumer():
    """Consume items from the global queue ``q`` until none arrives for one second.

    Each item is printed, followed by a 0.2 s pause and a ``task_done()`` call.
    """
    while True:
        try:
            item = q.get(timeout=1)
        except queue.Empty:
            # Nothing arrived within the timeout: stop consuming.
            return
        print(f"Consumed: {item}")
        time.sleep(0.2)
        q.task_done()

# Start producer and consumer threads
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

producer_thread.start()
consumer_thread.start()

# The consumer exits on its own after a 1-second get() timeout, so joining
# both threads is enough to wait for the whole exchange.
producer_thread.join()
consumer_thread.join()

# Thread-safe data structures
print("\nThread-safe data structures:")
# NOTE(review): neither of these two imports is used in the visible part of
# this cell.
from collections import deque
import collections

# Thread-safe queue (unbounded: no maxsize given) shared by queue_worker()
# and queue_reader().
thread_safe_queue = queue.Queue()

def queue_worker():
    """Put ``Data-0`` .. ``Data-4`` on ``thread_safe_queue``, pausing 0.1 s after each."""
    for index in range(5):
        payload = f"Data-{index}"
        thread_safe_queue.put(payload)
        time.sleep(0.1)

def queue_reader():
    """Read and print items from ``thread_safe_queue`` until it stays empty.

    The loop relies only on the ``get()`` timeout to decide when to stop.
    Checking ``empty()`` first is racy: if the reader thread is scheduled
    before the writer's first ``put()``, it would see an empty queue and exit
    without reading anything.

    Returns:
        None. Stops once no item arrives within 0.5 seconds.
    """
    while True:
        try:
            data = thread_safe_queue.get(timeout=0.5)
        except queue.Empty:
            # No item arrived within the timeout: assume the writer is done.
            break
        print(f"Read: {data}")
        thread_safe_queue.task_done()

# Start threads: the writer and the reader run concurrently on the shared
# thread_safe_queue.
writer_thread = threading.Thread(target=queue_worker)
reader_thread = threading.Thread(target=queue_reader)

writer_thread.start()
reader_thread.start()

# Wait for both sides to finish.
writer_thread.join()
reader_thread.join()

# Thread local storage
print("\nThread local storage:")
# Attributes set on a threading.local() instance are visible only to the
# thread that set them.
thread_local_data = threading.local()

def thread_local_worker(thread_id):
    """Store a label and a counter on ``thread_local_data`` and count to three.

    Prints the label and the counter after each increment, pausing 0.1 s
    between iterations.
    """
    local = thread_local_data
    local.value = f"Thread-{thread_id}"
    local.counter = 0

    for _step in range(3):
        local.counter += 1
        print(f"{local.value}: counter = {local.counter}")
        time.sleep(0.1)

# Create threads with local storage: three threads, each given its own id
# (1, 2, 3) and each writing to its own view of thread_local_data.
threads = []
for i in range(3):
    thread = threading.Thread(target=thread_local_worker, args=(i+1,))
    threads.append(thread)
    thread.start()

# Wait for all three workers to finish.
for thread in threads:
    thread.join()


In [None]:
# 3. Multiprocessing
print("3. Multiprocessing")
print("-" * 20)

import multiprocessing
import time
import os
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp

# Basic multiprocessing
print("Basic multiprocessing:")
def worker_process(name, duration):
    """Announce start and finish with the current PID, sleeping ``duration`` seconds in between.

    Returns:
        The string ``"Result from <name>"``.
    """
    pid = os.getpid()
    print(f"Process {name} (PID: {pid}) starting")
    time.sleep(duration)
    print(f"Process {name} (PID: {pid}) finished")
    return f"Result from {name}"

# Create and start processes: three processes, each sleeping 2 seconds.
# The return value of worker_process is not collected here; only its printed
# output is visible.
processes = []
for i in range(3):
    process = multiprocessing.Process(target=worker_process, args=(f"Process-{i+1}", 2))
    processes.append(process)
    process.start()

# Wait for all processes to complete
for process in processes:
    process.join()

print("All processes completed")

# ProcessPoolExecutor
print("\nProcessPoolExecutor:")
def cpu_intensive_task(n):
    """Return the sum of the squares of all integers in ``range(n)``.

    Pure computation with no waiting, used as the workload for the process pool.
    """
    return sum(map(lambda value: value ** 2, range(n)))

# Submit tasks to process pool: four identical tasks, one per worker process.
with ProcessPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(cpu_intensive_task, 1000000) for _ in range(4)]
    # All four tasks receive the same argument, so the four results are equal.
    results = [future.result() for future in futures]

print(f"Results: {results}")

# Shared memory with multiprocessing
print("\nShared memory with multiprocessing:")
def worker_with_shared_data(shared_array, process_id):
    """Overwrite every slot ``i`` of ``shared_array`` with ``process_id * 100 + i``."""
    size = len(shared_array)
    for slot in range(size):
        shared_array[slot] = process_id * 100 + slot
    print(f"Process {process_id} modified shared array")

# Create shared array: ten C ints ('i') that all processes can access.
shared_array = multiprocessing.Array('i', 10)

# Create processes: each worker overwrites every slot, so the final contents
# depend on which writes land last.
processes = []
for i in range(3):
    process = multiprocessing.Process(target=worker_with_shared_data, args=(shared_array, i+1))
    processes.append(process)
    process.start()

# Wait for completion
for process in processes:
    process.join()

print(f"Final shared array: {list(shared_array)}")

# Process communication with Queue
print("\nProcess communication with Queue:")
def producer_process(queue):
    """Put ``Data-0`` .. ``Data-4`` on ``queue``, printing each and pausing 0.1 s after it."""
    for index in range(5):
        payload = f"Data-{index}"
        queue.put(payload)
        print(f"Producer put: {payload}")
        time.sleep(0.1)

def consumer_process(queue, timeout=1):
    """Consume items from ``queue`` until none arrives within ``timeout`` seconds.

    Each received item is printed, followed by a 0.2 s pause.

    Args:
        queue: Object with a ``get(timeout=...)`` method that raises
            ``queue.Empty`` when nothing arrives in time (for example
            ``multiprocessing.Queue`` or ``queue.Queue``).
        timeout: Seconds to wait for the next item before stopping.

    Returns:
        None.
    """
    # Imported locally because the parameter name shadows the ``queue`` module.
    from queue import Empty

    while True:
        try:
            data = queue.get(timeout=timeout)
        except Empty:
            # Only an empty queue ends the loop; a bare ``except`` would also
            # hide real errors and KeyboardInterrupt.
            break
        print(f"Consumer got: {data}")
        time.sleep(0.2)

# Create queue for inter-process communication
process_queue = multiprocessing.Queue()

# Create producer and consumer processes; both receive the same queue.
# (These names rebind `producer`/`consumer` at module level to Process objects.)
producer = multiprocessing.Process(target=producer_process, args=(process_queue,))
consumer = multiprocessing.Process(target=consumer_process, args=(process_queue,))

# Start processes
producer.start()
consumer.start()

# Wait for completion: the consumer stops by itself once get() times out.
producer.join()
consumer.join()

# Process synchronization
print("\nProcess synchronization:")
def synchronized_worker(lock, counter, process_id):
    """Increment ``counter.value`` three times, holding ``lock`` for each update.

    The new value is printed while the lock is still held; the worker then
    sleeps 0.1 s outside the lock before the next round.
    """
    for _round in range(3):
        with lock:
            counter.value += 1
            current = counter.value
            print(f"Process {process_id}: counter = {current}")
        time.sleep(0.1)

# Create shared counter and lock: a shared C int starting at 0 and a lock
# that is passed to every worker.
shared_counter = multiprocessing.Value('i', 0)
process_lock = multiprocessing.Lock()

# Create processes: three workers with three increments each, so the
# expected final value is 9.
processes = []
for i in range(3):
    process = multiprocessing.Process(target=synchronized_worker, args=(process_lock, shared_counter, i+1))
    processes.append(process)
    process.start()

# Wait for completion
for process in processes:
    process.join()

print(f"Final counter value: {shared_counter.value}")

# Process pool with map
print("\nProcess pool with map:")
def square_number(n):
    """Return the square of ``n``."""
    squared = pow(n, 2)
    return squared

# Use process pool to map function over data
with ProcessPoolExecutor(max_workers=4) as executor:
    numbers = list(range(10))
    # executor.map yields results in the order of the inputs.
    results = list(executor.map(square_number, numbers))

print(f"Numbers: {numbers}")
print(f"Squared: {results}")

# Process vs Thread performance comparison
print("\nProcess vs Thread performance comparison:")
def cpu_bound_task(n):
    """Return the sum of ``i ** 2`` over ``range(n)``; the workload for the timing table."""
    squares = (number ** 2 for number in range(n))
    return sum(squares)

# Test with different numbers of workers
# ThreadPoolExecutor is not among this cell's imports; import it here so the
# comparison also works when the cell is run on its own.
from concurrent.futures import ThreadPoolExecutor

n = 1000000
workers = [1, 2, 4, 8]

print("Workers | Threading | Multiprocessing")
print("-" * 40)

for num_workers in workers:
    # The n iterations are split evenly, so every row does the same total work.
    # Threading
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [executor.submit(cpu_bound_task, n//num_workers) for _ in range(num_workers)]
        results = [future.result() for future in futures]
    threading_time = time.time() - start_time

    # Multiprocessing (the measured time includes pool start-up and shutdown)
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        futures = [executor.submit(cpu_bound_task, n//num_workers) for _ in range(num_workers)]
        results = [future.result() for future in futures]
    multiprocessing_time = time.time() - start_time

    print(f"{num_workers:7} | {threading_time:8.2f}s | {multiprocessing_time:13.2f}s")


In [None]:
# 4. Asyncio
print("4. Asyncio")
print("-" * 15)

import asyncio
import aiohttp
import time
import random

# Basic asyncio
print("Basic asyncio:")
async def async_task(name, duration):
    """Print a start message, await ``asyncio.sleep(duration)``, print a finish message.

    Returns:
        The string ``"Result from <name>"``.
    """
    label = f"Task {name}"
    print(f"{label} starting")
    await asyncio.sleep(duration)
    print(f"{label} finished")
    return f"Result from {name}"

async def main():
    """Run three ``async_task`` coroutines concurrently and print their results."""
    # (name, sleep duration in seconds) for each task
    specs = [("Task-1", 1), ("Task-2", 2), ("Task-3", 1.5)]
    coroutines = [async_task(name, delay) for name, delay in specs]

    # gather() returns the results in the order the coroutines were passed.
    results = await asyncio.gather(*coroutines)
    print(f"Results: {results}")

# Run the async main function
# NOTE(review): asyncio.run() creates its own event loop and cannot be called
# while another loop is already running in the same thread (as is typically
# the case inside a Jupyter cell, where `await main()` is used instead).
# Confirm the intended execution environment.
asyncio.run(main())

# Async with asyncio.gather
print("\nAsync with asyncio.gather:")
async def fetch_data(url, min_delay=0.5, max_delay=2.0):
    """Simulate fetching data from a URL.

    No network request is made: the coroutine sleeps for a random time and
    then returns a canned string.

    Args:
        url: Label embedded in the returned string.
        min_delay: Lower bound, in seconds, of the simulated latency.
        max_delay: Upper bound, in seconds, of the simulated latency.

    Returns:
        The string ``"Data from <url>"``.
    """
    await asyncio.sleep(random.uniform(min_delay, max_delay))
    return f"Data from {url}"

async def gather_example():
    """Fetch three URLs concurrently and return the results in URL order."""
    targets = ("http://example.com", "http://test.com", "http://demo.com")
    pending_fetches = [fetch_data(target) for target in targets]
    return await asyncio.gather(*pending_fetches)

# Run gather example: the returned list is in the same order as the URLs.
results = asyncio.run(gather_example())
print(f"Gathered results: {results}")

# Async with asyncio.create_task
print("\nAsync with asyncio.create_task:")
async def create_task_example():
    """Start three fetches as tasks, then await them in creation order.

    All three tasks are created before the first ``await``, so they run
    concurrently.

    Returns:
        A list with the three results, in creation order.
    """
    endpoints = ("http://api1.com", "http://api2.com", "http://api3.com")
    tasks = [asyncio.create_task(fetch_data(endpoint)) for endpoint in endpoints]
    return [await task for task in tasks]

# Run create task example: results come back in task-creation order.
results = asyncio.run(create_task_example())
print(f"Task results: {results}")

# Async with asyncio.wait
print("\nAsync with asyncio.wait:")
async def wait_example():
    """Run three fetch tasks and collect their results with ``asyncio.wait``.

    Returns:
        A list of results taken by iterating the ``done`` set returned by
        ``asyncio.wait``, so the order is not tied to the task order.
    """
    urls = ("http://slow.com", "http://fast.com", "http://medium.com")
    tasks = [asyncio.create_task(fetch_data(url)) for url in urls]

    # With ALL_COMPLETED every task ends up in the first returned set.
    finished, _unfinished = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)

    return [task.result() for task in finished]

# Run wait example: the results come from iterating a set of finished tasks.
results = asyncio.run(wait_example())
print(f"Wait results: {results}")

# Async with asyncio.as_completed
print("\nAsync with asyncio.as_completed:")
async def as_completed_example():
    """Run three fetch tasks and handle each result as soon as it is ready.

    Returns:
        The results in completion order.
    """
    endpoints = ("http://api1.com", "http://api2.com", "http://api3.com")
    tasks = [asyncio.create_task(fetch_data(endpoint)) for endpoint in endpoints]

    collected = []
    for next_finished in asyncio.as_completed(tasks):
        outcome = await next_finished
        collected.append(outcome)
        print(f"Completed: {outcome}")

    return collected

# Run as_completed example: the list is ordered by completion time.
results = asyncio.run(as_completed_example())
print(f"As completed results: {results}")

# Async with asyncio.timeout
print("\nAsync with asyncio.timeout:")
async def timeout_example():
    """Await one simulated fetch under a 1-second ``asyncio.timeout`` limit.

    Prints a message if the limit is hit; prints nothing if the fetch
    finishes in time.
    """
    limit_seconds = 1.0
    target = "http://slow.com"
    try:
        async with asyncio.timeout(limit_seconds):
            await fetch_data(target)
    except asyncio.TimeoutError:
        print("Task timed out!")

# Run timeout example: whether the timeout fires depends on the random
# delay inside fetch_data.
asyncio.run(timeout_example())

# Async with asyncio.shield
print("\nAsync with asyncio.shield:")
async def shield_example(duration=2):
    """Show that cancelling a ``shield()`` future does not cancel the inner task.

    The inner coroutine is wrapped in a task that is kept in a local variable,
    so it stays referenced while it runs. The outer future returned by
    ``asyncio.shield`` is cancelled, which makes awaiting it raise
    ``CancelledError``. The inner task is then awaited directly to show that
    it still finishes normally.

    Args:
        duration: Seconds the inner task sleeps before returning.

    Returns:
        None. Progress is reported with ``print``.
    """
    async def long_running_task():
        await asyncio.sleep(duration)
        return "Long task completed"

    # Keep a reference to the inner task; the shield only wraps it.
    inner_task = asyncio.create_task(long_running_task())

    # Shield the task from cancellation
    shielded_task = asyncio.shield(inner_task)

    # Cancel the outer (shield) future; the inner task keeps running.
    shielded_task.cancel()

    try:
        result = await shielded_task
        print(f"Shielded result: {result}")
    except asyncio.CancelledError:
        print("Shielded task was cancelled")
        # The protected task was never cancelled, so its result is still
        # available once it finishes.
        result = await inner_task
        print(f"Inner task still completed: {result}")

# Run shield example (output is printed by shield_example itself)
asyncio.run(shield_example())

# Async with asyncio.gather and return_exceptions
print("\nAsync with asyncio.gather and return_exceptions:")
async def gather_with_exceptions():
    """Gather one failing and one succeeding coroutine with ``return_exceptions=True``.

    The raised ``ValueError`` is returned as a normal list element instead of
    propagating, and each outcome is reported with ``print``.
    """
    async def task_with_exception():
        await asyncio.sleep(0.5)
        raise ValueError("Task failed!")

    async def normal_task():
        await asyncio.sleep(1)
        return "Normal task completed"

    # Gather with return_exceptions=True
    outcomes = await asyncio.gather(
        task_with_exception(),
        normal_task(),
        return_exceptions=True,
    )

    for index, outcome in enumerate(outcomes):
        if not isinstance(outcome, Exception):
            print(f"Task {index} succeeded: {outcome}")
        else:
            print(f"Task {index} failed: {outcome}")

# Run gather with exceptions example (output is printed by the coroutine)
asyncio.run(gather_with_exceptions())

# Async with asyncio.wait and return_when
# (return_when is a parameter of asyncio.wait, not of asyncio.gather; the
# banner text printed below is left as it was.)
print("\nAsync with asyncio.gather and return_when:")
async def gather_with_return_when():
    """Wait for the first of three fetch tasks, then cancel the others.

    Prints how many tasks were done and how many were still pending when
    ``asyncio.wait`` returned, then cancels the pending ones and waits for the
    cancellations to finish.
    """
    urls = ("http://fast.com", "http://slow.com", "http://medium.com")
    tasks = [asyncio.create_task(fetch_data(url)) for url in urls]

    # Wait for first task to complete
    finished, unfinished = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    print(f"Completed tasks: {len(finished)}")
    print(f"Pending tasks: {len(unfinished)}")

    # Cancel whatever has not finished yet
    for straggler in unfinished:
        straggler.cancel()

    # return_exceptions=True collects the cancellations as results instead of
    # raising them here.
    await asyncio.gather(*unfinished, return_exceptions=True)

# Run the return_when example (uses asyncio.wait with FIRST_COMPLETED)
asyncio.run(gather_with_return_when())

# Async with asyncio.gather and timeout
print("\nAsync with asyncio.gather and timeout:")
async def gather_with_timeout():
    """Gather three fetch tasks under a 1.5-second timeout and report each task's state.

    When the timeout fires, the pending ``gather()`` is cancelled, which in
    turn cancels every child task that has not finished. A cancelled task
    reports ``done() == True`` and raises ``CancelledError`` from
    ``result()``, so cancellation is checked before the result is read.

    Returns:
        None. The outcome is reported with ``print``.
    """
    tasks = [
        asyncio.create_task(fetch_data("http://slow.com")),
        asyncio.create_task(fetch_data("http://very-slow.com")),
        asyncio.create_task(fetch_data("http://fast.com"))
    ]

    try:
        async with asyncio.timeout(1.5):
            results = await asyncio.gather(*tasks)
            print(f"All tasks completed: {results}")
    except asyncio.TimeoutError:
        print("Gather timed out!")

        # Check which tasks completed
        for i, task in enumerate(tasks):
            if task.cancelled():
                # Must come first: result() would raise on a cancelled task.
                print(f"Task {i} was cancelled")
            elif task.done():
                print(f"Task {i} completed: {task.result()}")
            else:
                print(f"Task {i} still running")

# Run gather with timeout example (output is printed by the coroutine)
asyncio.run(gather_with_timeout())


In [None]:
# 5. Performance Comparison
print("5. Performance Comparison")
print("-" * 25)

import time
import threading
import multiprocessing
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# Performance comparison functions
def cpu_bound_task(n):
    """Return the sum of squares of ``0 .. n-1``; the CPU-bound benchmark workload."""
    return sum(index ** 2 for index in range(n))

async def async_cpu_bound_task(n):
    """Coroutine version of the sum-of-squares workload.

    The body contains no ``await``, so the whole computation runs in one go
    once the coroutine is scheduled.
    """
    return sum(index ** 2 for index in range(n))

def io_bound_task(duration):
    """Block the calling thread for ``duration`` seconds and return a status string."""
    time.sleep(duration)
    status = f"Completed after {duration} seconds"
    return status

async def async_io_bound_task(duration):
    """Await ``asyncio.sleep(duration)`` and return a status string."""
    await asyncio.sleep(duration)
    status = f"Completed after {duration} seconds"
    return status

# CPU-bound performance comparison: for each worker count the same n
# iterations are split into num_workers equal chunks and timed four ways.
print("CPU-bound performance comparison:")
n = 1000000
workers = [1, 2, 4, 8]

print("Workers | Sequential | Threading | Multiprocessing | Asyncio")
print("-" * 60)

for num_workers in workers:
    # Sequential: run the chunks one after another in this thread.
    start_time = time.time()
    for _ in range(num_workers):
        cpu_bound_task(n//num_workers)
    sequential_time = time.time() - start_time

    # Threading
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [executor.submit(cpu_bound_task, n//num_workers) for _ in range(num_workers)]
        results = [future.result() for future in futures]
    threading_time = time.time() - start_time

    # Multiprocessing (the measured time includes pool start-up and shutdown)
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        futures = [executor.submit(cpu_bound_task, n//num_workers) for _ in range(num_workers)]
        results = [future.result() for future in futures]
    multiprocessing_time = time.time() - start_time

    # Asyncio: the coroutines never await, so gather() runs them one after
    # another on the event loop.
    async def async_cpu_test():
        tasks = [async_cpu_bound_task(n//num_workers) for _ in range(num_workers)]
        return await asyncio.gather(*tasks)

    start_time = time.time()
    asyncio.run(async_cpu_test())
    asyncio_time = time.time() - start_time

    print(f"{num_workers:7} | {sequential_time:9.2f}s | {threading_time:8.2f}s | {multiprocessing_time:13.2f}s | {asyncio_time:6.2f}s")

# I/O-bound performance comparison: each worker waits `duration` seconds, so
# total work grows with the worker count (unlike the CPU-bound table above).
print("\nI/O-bound performance comparison:")
duration = 1.0
workers = [1, 2, 4, 8]

print("Workers | Sequential | Threading | Multiprocessing | Asyncio")
print("-" * 60)

for num_workers in workers:
    # Sequential: the waits add up to num_workers * duration.
    start_time = time.time()
    for _ in range(num_workers):
        io_bound_task(duration)
    sequential_time = time.time() - start_time

    # Threading: one thread per task, so the waits overlap.
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [executor.submit(io_bound_task, duration) for _ in range(num_workers)]
        results = [future.result() for future in futures]
    threading_time = time.time() - start_time

    # Multiprocessing (the measured time includes pool start-up and shutdown)
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        futures = [executor.submit(io_bound_task, duration) for _ in range(num_workers)]
        results = [future.result() for future in futures]
    multiprocessing_time = time.time() - start_time

    # Asyncio: all sleeps are awaited concurrently on a single event loop.
    async def async_io_test():
        tasks = [async_io_bound_task(duration) for _ in range(num_workers)]
        return await asyncio.gather(*tasks)

    start_time = time.time()
    asyncio.run(async_io_test())
    asyncio_time = time.time() - start_time

    print(f"{num_workers:7} | {sequential_time:9.2f}s | {threading_time:8.2f}s | {multiprocessing_time:13.2f}s | {asyncio_time:6.2f}s")

# Memory usage comparison
print("\nMemory usage comparison:")
import psutil
import os

def get_memory_usage():
    """Return the resident set size (RSS) of the current process in MB."""
    current = psutil.Process(os.getpid())
    rss_bytes = current.memory_info().rss
    return rss_bytes / 1024 / 1024

# Test memory usage for different approaches
# Each figure is the change in this process's RSS across the run.
# get_memory_usage() only looks at os.getpid(), so memory used inside the
# worker processes of the pool is not included in the multiprocessing row.
print("Approach | Memory Usage (MB)")
print("-" * 30)

# Sequential
memory_before = get_memory_usage()
for _ in range(4):
    cpu_bound_task(100000)
memory_after = get_memory_usage()
print(f"Sequential | {memory_after - memory_before:.2f}")

# Threading
memory_before = get_memory_usage()
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(cpu_bound_task, 100000) for _ in range(4)]
    results = [future.result() for future in futures]
memory_after = get_memory_usage()
print(f"Threading | {memory_after - memory_before:.2f}")

# Multiprocessing (parent-process RSS only, see note above)
memory_before = get_memory_usage()
with ProcessPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(cpu_bound_task, 100000) for _ in range(4)]
    results = [future.result() for future in futures]
memory_after = get_memory_usage()
print(f"Multiprocessing | {memory_after - memory_before:.2f}")

# Asyncio
memory_before = get_memory_usage()
async def async_memory_test():
    """Gather four async_cpu_bound_task(100000) coroutines."""
    tasks = [async_cpu_bound_task(100000) for _ in range(4)]
    return await asyncio.gather(*tasks)

asyncio.run(async_memory_test())
memory_after = get_memory_usage()
print(f"Asyncio | {memory_after - memory_before:.2f}")

# Scalability analysis
# NOTE(review): in the strings below, n presumably stands for the amount of
# work and k for the number of workers/cores; the printed text does not
# define them.
print("\nScalability analysis:")
print("CPU-bound tasks:")
print("- Sequential: O(n) - linear scaling")
print("- Threading: O(n) - limited by GIL")
print("- Multiprocessing: O(n/k) - scales with CPU cores")
print("- Asyncio: O(n) - single-threaded")

print("\nI/O-bound tasks:")
print("- Sequential: O(n) - linear scaling")
print("- Threading: O(n/k) - good for I/O")
print("- Multiprocessing: O(n/k) - good for I/O")
print("- Asyncio: O(n/k) - excellent for I/O")

# Best practices summary: a fixed list of recommendations printed as text.
print("\nBest practices summary:")
print("1. Use sequential execution for simple, small tasks")
print("2. Use threading for I/O-bound tasks")
print("3. Use multiprocessing for CPU-bound tasks")
print("4. Use asyncio for I/O-bound tasks with many concurrent operations")
print("5. Consider memory usage and overhead")
print("6. Profile your code to identify bottlenecks")
print("7. Use appropriate synchronization primitives")
print("8. Handle exceptions properly in concurrent code")
print("9. Consider the Global Interpreter Lock (GIL) limitations")
print("10. Use context managers for resource management")


## Practice Exercises

### Exercise 1: Threading vs Multiprocessing
Create a program that compares the performance of threading and multiprocessing for both CPU-bound and I/O-bound tasks. Measure execution time and memory usage for different numbers of workers.

### Exercise 2: Async Web Scraper
Build an async web scraper that fetches data from multiple URLs concurrently. Use asyncio and aiohttp to implement the scraper with proper error handling and rate limiting.

### Exercise 3: Producer-Consumer Pattern
Implement a producer-consumer pattern using both threading and multiprocessing. The producer should generate data, and the consumer should process it. Use queues for communication between threads/processes.

### Exercise 4: Thread Pool vs Process Pool
Create a program that uses both ThreadPoolExecutor and ProcessPoolExecutor to process a list of tasks. Compare the performance and choose the appropriate executor based on the task type.

### Exercise 5: Async Task Management
Implement an async task manager that can handle multiple tasks with different priorities. Use asyncio to manage task execution, cancellation, and error handling.

### Exercise 6: Concurrent File Processing
Build a program that processes multiple files concurrently using different approaches (threading, multiprocessing, asyncio). Compare the performance and choose the best approach for your use case.

### Exercise 7: Real-time Data Processing
Create a real-time data processing system that uses threading or asyncio to handle incoming data streams. Implement proper synchronization and error handling.

### Exercise 8: Performance Profiling
Profile your concurrent code to identify bottlenecks and optimize performance. Use tools like cProfile, line_profiler, or memory_profiler to analyze your code.

### Exercise 9: Error Handling in Concurrent Code
Implement proper error handling for concurrent code. Handle exceptions, timeouts, and cancellation gracefully.

### Exercise 10: Custom Thread Pool
Create a custom thread pool implementation that provides more control over thread management. Implement features like dynamic scaling, task prioritization, and resource management.


## Summary

In this lesson, we've covered the fundamentals of concurrency and parallelism in Python:

### Key Concepts Covered:
1. **Concurrency vs Parallelism**: Understanding the difference between concurrent and parallel execution
2. **Threading**: Using threads for I/O-bound tasks, understanding the GIL limitations
3. **Multiprocessing**: Using processes for CPU-bound tasks, true parallelism
4. **Asyncio**: Asynchronous programming for I/O-bound tasks with many concurrent operations
5. **Performance Comparison**: Comparing different approaches for different types of tasks

### Key Takeaways:
- **Use threading for I/O-bound tasks** where you need to wait for external resources
- **Use multiprocessing for CPU-bound tasks** where you need true parallelism
- **Use asyncio for I/O-bound tasks** with many concurrent operations
- **Consider the GIL limitations** when choosing between threading and multiprocessing
- **Profile your code** to identify bottlenecks and optimize performance
- **Handle exceptions properly** in concurrent code
- **Use appropriate synchronization primitives** to avoid race conditions

### Next Steps:
- Practice with the exercises provided
- Explore advanced topics like distributed computing
- Learn about specific libraries like Celery for task queues
- Study design patterns for concurrent programming
- Experiment with different approaches for your specific use cases

### Resources for Further Learning:
- [Python Threading Documentation](https://docs.python.org/3/library/threading.html)
- [Python Multiprocessing Documentation](https://docs.python.org/3/library/multiprocessing.html)
- [Python Asyncio Documentation](https://docs.python.org/3/library/asyncio.html)
- [Concurrent Programming in Python](https://realpython.com/python-concurrency/)
- [AsyncIO Tutorial](https://docs.python.org/3/library/asyncio-task.html)


# 3. Concurrency & Parallelism - Working with Threads and Processes

Welcome to the third lesson of the Advanced Level! In this lesson, you'll learn how to write concurrent and parallel programs in Python.

## Learning Objectives

By the end of this lesson, you will be able to:
- Understand the difference between concurrency and parallelism
- Use threading for I/O-bound tasks
- Use multiprocessing for CPU-bound tasks
- Work with asyncio for asynchronous programming
- Understand the Global Interpreter Lock (GIL)
- Write efficient concurrent programs

## Table of Contents

1. [Concurrency vs Parallelism](#concurrency-vs-parallelism)
2. [Threading](#threading)
3. [Multiprocessing](#multiprocessing)
4. [Asyncio](#asyncio)
5. [Global Interpreter Lock](#global-interpreter-lock)
6. [Best Practices](#best-practices)


## Concurrency vs Parallelism

Understanding the difference between concurrency and parallelism is crucial for writing efficient programs.

### Concurrency
- **Definition**: Multiple tasks making progress over the same period
- **Use Case**: I/O-bound operations (file operations, network requests)
- **Python Tools**: `threading`, `asyncio`
- **Example**: Downloading multiple files at once, so that the time spent waiting on each download overlaps

### Parallelism
- **Definition**: Multiple tasks executing simultaneously
- **Use Case**: CPU-bound operations (mathematical calculations)
- **Python Tools**: `multiprocessing`
- **Example**: Processing large datasets with multiple CPU cores


In [None]:
# Threading Examples
import threading
import time
import requests
from concurrent.futures import ThreadPoolExecutor
import queue

# Basic threading example
def worker(name, delay):
    """Print a start message, sleep for ``delay`` seconds, print a finish message.

    Args:
        name: Label inserted into both printed messages.
        delay: Number of seconds handed to ``time.sleep``.
    """
    label = f"Worker {name}"
    print(f"{label} starting")
    time.sleep(delay)  # stands in for real work
    print(f"{label} finished")

print("Basic Threading Example", "=" * 30, sep="\n")

# Build the three worker threads up front, then launch them in order.
threads = [
    threading.Thread(target=worker, args=(f"Thread-{number}", 2))
    for number in range(1, 4)
]
for thread in threads:
    thread.start()

# Block here until every worker has returned.
for thread in threads:
    thread.join()

print("All threads completed!")

# Threading with shared data
print("\nThreading with Shared Data", "=" * 30, sep="\n")

# Counter shared by several threads, plus the lock guarding each update to it
counter = 0
lock = threading.Lock()

def increment_counter():
    """Add 100000 to the module-level ``counter``, one locked step at a time.

    ``lock`` is held around every single increment, so the read-modify-write
    on ``counter`` is never interleaved with another thread that uses the
    same lock.
    """
    global counter
    remaining = 100000
    while remaining:
        lock.acquire()
        try:
            counter = counter + 1
        finally:
            # Always hand the lock back, even if the update were to fail.
            lock.release()
        remaining -= 1

# Deliberately unsynchronised counter, used to demonstrate a race condition
counter_no_lock = 0

def increment_no_lock():
    """Add 100000 to the module-level ``counter_no_lock`` without any lock.

    The update is a separate read, add and write. When several threads run
    this function at once, increments may therefore be lost; whether that
    actually shows up in a given run depends on thread scheduling.
    """
    global counter_no_lock
    for _ in range(100_000):
        counter_no_lock = counter_no_lock + 1

# Three threads incrementing the lock-protected counter
threads_with_lock = [
    threading.Thread(target=increment_counter) for _ in range(3)
]
for locked_thread in threads_with_lock:
    locked_thread.start()
for locked_thread in threads_with_lock:
    locked_thread.join()

# Three threads incrementing the unprotected counter
threads_no_lock = [
    threading.Thread(target=increment_no_lock) for _ in range(3)
]
for unlocked_thread in threads_no_lock:
    unlocked_thread.start()
for unlocked_thread in threads_no_lock:
    unlocked_thread.join()

# Report both totals next to the value three full runs should produce.
summary_lines = (
    f"Counter with lock: {counter}",
    f"Counter without lock: {counter_no_lock}",
    "Expected: 300000",
)
for summary_line in summary_lines:
    print(summary_line)

# ThreadPoolExecutor example
print("\nThreadPoolExecutor Example", "=" * 30, sep="\n")

def fetch_url(url, delay=1):
    """Simulate fetching ``url`` and return a placeholder payload.

    No network request is made: the function only sleeps to imitate
    latency and then builds a string from the URL.

    Args:
        url: The URL to pretend to fetch; it is interpolated into the result.
        delay: Seconds to sleep in place of the network round trip.
            Defaults to 1, the delay this function has always used.

    Returns:
        The string ``"Data from <url>"``.
    """
    time.sleep(delay)  # Simulate network delay
    return f"Data from {url}"

urls = [
    "https://example.com",
    "https://python.org",
    "https://github.com",
    "https://stackoverflow.com",
]

# Intervals are measured with time.perf_counter(): it is monotonic, so the
# result cannot be skewed by system clock adjustments the way differences
# of time.time() values can.

# Sequential execution: each simulated fetch waits for the previous one.
start_time = time.perf_counter()
sequential_results = []
for url in urls:
    result = fetch_url(url)
    sequential_results.append(result)
sequential_time = time.perf_counter() - start_time

# Concurrent execution: one worker per URL so all simulated waits overlap.
start_time = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(urls)) as executor:
    # executor.map yields results in the same order as `urls`.
    concurrent_results = list(executor.map(fetch_url, urls))
concurrent_time = time.perf_counter() - start_time

print(f"Sequential time: {sequential_time:.2f} seconds")
print(f"Concurrent time: {concurrent_time:.2f} seconds")
print(f"Speedup: {sequential_time/concurrent_time:.2f}x")

# Producer-Consumer pattern
print("\nProducer-Consumer Pattern")
print("=" * 30)

def producer(q, items):
    """Put every element of ``items`` on ``q``, then a ``None`` end marker.

    Args:
        q: Queue-like object; only its ``put`` method is used.
        items: Iterable of values to enqueue, in order.
    """
    for produced in items:
        q.put(produced)
        print(f"Produced: {produced}")
        time.sleep(0.5)  # pause between items
    end_marker = None
    q.put(end_marker)  # Signal end

def consumer(q, name):
    """Consume items from ``q`` until a ``None`` end marker is received.

    Every item taken from the queue, including the end marker, is
    acknowledged with ``q.task_done()``. The unfinished-task count therefore
    returns to zero and a caller's ``q.join()`` can complete.

    Args:
        q: Queue to read from; ``get`` and ``task_done`` are used.
        name: Label for this consumer, used in the printed messages.
    """
    while True:
        item = q.get()
        try:
            if item is None:
                # End marker: stop consuming. The finally clause below still
                # acknowledges it, which the original loop skipped.
                break
            print(f"Consumer {name} consumed: {item}")
            time.sleep(0.3)  # pause after each consumed item
        finally:
            q.task_done()

# Queue shared by one producer thread and one consumer thread
q = queue.Queue()
items = [f"item{number}" for number in range(1, 6)]

producer_thread = threading.Thread(target=producer, args=(q, items))
consumer_thread = threading.Thread(target=consumer, args=(q, "A"))

# Launch the producer first, then the consumer; afterwards wait for both.
pipeline = (producer_thread, consumer_thread)
for stage in pipeline:
    stage.start()
for stage in pipeline:
    stage.join()

print("Producer-Consumer completed!")

# Thread-safe data structures
print("\nThread-Safe Data Structures", "=" * 30, sep="\n")

from collections import deque
import random

# Queue shared between the producer and consumer threads
thread_safe_queue = queue.Queue()

def producer_safe(q, name, count):
    """Enqueue ``count`` labelled items on ``q``, pausing briefly after each.

    Items are the strings ``"<name>-0"`` up to ``"<name>-<count - 1>"``.

    Args:
        q: Queue-like object; only its ``put`` method is used.
        name: Producer label, used in item names and printed messages.
        count: Number of items to produce.
    """
    for sequence in range(count):
        label = f"{name}-{sequence}"
        q.put(label)
        print(f"Producer {name} put: {label}")
        pause = random.uniform(0.1, 0.3)  # random pause between puts
        time.sleep(pause)

def consumer_safe(q, name, timeout=1):
    """Consume items from ``q`` until none arrives within ``timeout`` seconds.

    Each item that is taken is acknowledged with ``q.task_done()``, even if
    handling it raises, so a caller's ``q.join()`` does not hang on a failed
    item.

    Args:
        q: Queue to read from; ``get`` and ``task_done`` are used.
        name: Label for this consumer, used in the printed messages.
        timeout: Seconds to wait for the next item before giving up.
            Defaults to 1, the wait this function has always used.
    """
    while True:
        # Only the get() call can raise queue.Empty, so it alone is guarded.
        try:
            item = q.get(timeout=timeout)
        except queue.Empty:
            # Nothing arrived in time: treat the queue as drained and stop.
            break
        try:
            print(f"Consumer {name} got: {item}")
            time.sleep(random.uniform(0.1, 0.3))  # random pause per item
        finally:
            q.task_done()

# Two producers (three items each) and two consumers share one queue
producers = [
    threading.Thread(target=producer_safe, args=(thread_safe_queue, f"P{number}", 3))
    for number in (1, 2)
]
consumers = [
    threading.Thread(target=consumer_safe, args=(thread_safe_queue, f"C{number}"))
    for number in (1, 2)
]

# Launch the producers first, then the consumers.
for producer_worker in producers:
    producer_worker.start()
for consumer_worker in consumers:
    consumer_worker.start()

# Wait until every producer has enqueued all of its items.
for producer_worker in producers:
    producer_worker.join()

# Wait until every enqueued item has been marked done.
thread_safe_queue.join()

# Wait for the consumer threads to return.
for consumer_worker in consumers:
    consumer_worker.join()

print("Thread-safe operations completed!")

# Thread local storage
print("\nThread Local Storage", "=" * 30, sep="\n")

thread_local_data = threading.local()

def worker_with_local(name):
    """Count from 1 to 3 using attributes stored on ``thread_local_data``.

    The calling thread's ``name`` and ``counter`` attributes are set on the
    ``threading.local`` object, then the counter is incremented and printed
    three times with a 0.1 second pause after each print.

    Args:
        name: Label stored as ``thread_local_data.name`` and printed.
    """
    local = thread_local_data  # short alias for the threading.local object
    local.name = name
    local.counter = 0

    for _ in range(3):
        local.counter = local.counter + 1
        print(f"Thread {local.name}: counter = {local.counter}")
        time.sleep(0.1)

# Three threads, each running worker_with_local under its own label
local_threads = [
    threading.Thread(target=worker_with_local, args=(f"Local-{number}",))
    for number in range(1, 4)
]
for local_thread in local_threads:
    local_thread.start()

# Wait for all three to return before reporting completion.
for local_thread in local_threads:
    local_thread.join()

print("Thread local storage completed!")
