
# Design Patterns in Python (Creational, Structural, Behavioral)

This notebook covers classic "Gang of Four" design patterns adapted for Python, emphasizing Pythonic implementations that leverage Python's dynamic nature and built-in features.

## Overview

Design patterns are general, reusable solutions to common problems in software design. The classic patterns are categorized as:
- **Creational**: Object creation mechanisms
- **Structural**: Composition of classes/objects  
- **Behavioral**: Interaction and responsibility of objects

We'll explore 6 important patterns with practical examples using real-world data scenarios.

In [None]:
# Required imports for all patterns
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any, Protocol, Callable
import time
import functools
from dataclasses import dataclass
from datetime import datetime
import json
import random

## 1.1 Singleton Pattern (Creational)

**Problem**: Ensure a class has only one instance globally, and provide a global point of access to that instance.

**Use Case**: Database connection manager, configuration settings, logging service.

**Real-world Example**: Application configuration that should be loaded once and shared across the entire application.
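
As a lighter-weight point of comparison, the "load once, share everywhere" idea can also be sketched with a cached function instead of a class. This is only an illustrative sketch: the name `load_app_config` and the settings it returns are hypothetical, not part of this notebook's classes.

```python
import functools
from typing import Any, Dict

@functools.lru_cache(maxsize=None)
def load_app_config() -> Dict[str, Any]:
    """Build the configuration once; later calls return the cached dict."""
    # Hypothetical settings; a real loader might read a file or environment variables.
    return {"debug": False, "timeout": 30}

first = load_app_config()
second = load_app_config()
print(first is second)  # True: both names refer to the same cached object
```

Because the cached dict is mutable, a change made through one reference is visible through every other, which is the shared-state behavior a singleton is meant to provide.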

In [None]:
class DatabaseConnection:
    """Singleton database connection manager."""
    _instance: Optional['DatabaseConnection'] = None
    _initialized: bool = False

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, host: str = "localhost", port: int = 5432):
        if not self._initialized:
            self.host = host
            self.port = port
            self.connected = False
            self.connection_count = 0
            self._initialized = True
            print(f"Initializing database connection to {host}:{port}")

    def connect(self):
        if not self.connected:
            self.connected = True
            self.connection_count += 1
            print(f"Connected to database (connection #{self.connection_count})")
        else:
            print("Already connected")

    def get_stats(self) -> Dict[str, Any]:
        return {
            "host": self.host,
            "port": self.port,
            "connected": self.connected,
            "total_connections": self.connection_count,
            "instance_id": id(self)
        }

# Borg Pattern Alternative - Multiple instances, shared state
class ConfigurationManager:
    """Borg pattern: all instances share state."""
    _shared_state: Dict[str, Any] = {}

    def __init__(self):
        self.__dict__ = ConfigurationManager._shared_state
        if not hasattr(self, 'initialized'):
            self.initialized = True
            self.settings = {
                "debug": False,
                "api_url": "https://api.example.com",
                "timeout": 30
            }
            print("Configuration initialized")

    def update_setting(self, key: str, value: Any):
        self.settings[key] = value
        print(f"Updated {key} = {value}")

    def get_settings(self) -> Dict[str, Any]:
        return self.settings.copy()

In [None]:
# Demo: Singleton Pattern
print("=== Singleton Pattern Demo ===")

# Create multiple instances - should be the same object
db1 = DatabaseConnection("prod-server", 5432)
db2 = DatabaseConnection("dev-server", 3306)  # These params are ignored!

print(f"db1 is db2: {db1 is db2}")
print(f"db1 stats: {db1.get_stats()}")
print(f"db2 stats: {db2.get_stats()}")

# Connect using either instance
db1.connect()
print(f"After db1.connect() - db2 stats: {db2.get_stats()}")

print("\n=== Borg Pattern Demo ===")

# Create multiple instances - different objects, same state
config1 = ConfigurationManager()
config2 = ConfigurationManager()

print(f"config1 is config2: {config1 is config2}")
print(f"Initial settings: {config1.get_settings()}")

# Modify through one instance
config1.update_setting("debug", True)
print(f"config2 sees the change: {config2.get_settings()}")

## 1.2 Factory Pattern (Creational)

**Problem**: Decouple object creation from object usage, providing a flexible way to create objects.

**Use Case**: Creating different types of data processors, model factories, or UI components.

**Real-world Example**: Data processing pipeline that needs different processors for different file types.
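
A minimal sketch of the core idea, assuming plain functions are enough: a registry maps a format name to a callable, and a lookup function plays the role of the factory. The names `_REGISTRY`, `register`, `clean_csv`, `clean_json`, and `create_processor` are hypothetical and exist only for this illustration.

```python
from typing import Callable, Dict, List

Processor = Callable[[List[str]], List[str]]

# Hypothetical registry mapping a format name to a processing function.
_REGISTRY: Dict[str, Processor] = {}

def register(fmt: str) -> Callable[[Processor], Processor]:
    """Return a decorator that files a function under the given format name."""
    def decorator(func: Processor) -> Processor:
        _REGISTRY[fmt.lower()] = func
        return func
    return decorator

@register("csv")
def clean_csv(rows: List[str]) -> List[str]:
    # Trim surrounding whitespace from each row
    return [row.strip() for row in rows]

@register("json")
def clean_json(rows: List[str]) -> List[str]:
    # Remove all spaces from each row
    return [row.replace(" ", "") for row in rows]

def create_processor(fmt: str) -> Processor:
    """Look up the processor for a format, failing loudly on unknown formats."""
    try:
        return _REGISTRY[fmt.lower()]
    except KeyError:
        raise ValueError(f"Unsupported format: {fmt}") from None

processor = create_processor("CSV")
print(processor(["  a,b  ", " c,d "]))  # ['a,b', 'c,d']
```

Callers depend only on `create_processor`, so supporting a new format means registering one more function rather than editing every call site.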

In [None]:
# Data processors for different file types
class DataProcessor(ABC):
    """Abstract base class for data processors."""

    @abstractmethod
    def process(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        pass

    @abstractmethod
    def get_supported_formats(self) -> List[str]:
        pass

class CSVProcessor(DataProcessor):
    """Processor for CSV-like data."""

    def process(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        # Clean and standardize CSV data
        processed = []
        for row in data:
            cleaned_row = {k.strip().lower(): str(v).strip() for k, v in row.items()}
            processed.append(cleaned_row)
        print(f"CSV Processor: Cleaned {len(processed)} rows")
        return processed

    def get_supported_formats(self) -> List[str]:
        return ["csv", "tsv"]

class JSONProcessor(DataProcessor):
    """Processor for JSON data."""

    def process(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        # Flatten nested JSON structures
        processed = []
        for item in data:
            flattened = self._flatten_dict(item)
            processed.append(flattened)
        print(f"JSON Processor: Flattened {len(processed)} objects")
        return processed

    def _flatten_dict(self, d: Dict[str, Any], prefix: str = '') -> Dict[str, Any]:
        """Flatten nested dictionary."""
        result = {}
        for key, value in d.items():
            new_key = f"{prefix}_{key}" if prefix else key
            if isinstance(value, dict):
                result.update(self._flatten_dict(value, new_key))
            else:
                result[new_key] = value
        return result

    def get_supported_formats(self) -> List[str]:
        return ["json", "jsonl"]

class XMLProcessor(DataProcessor):
    """Processor for XML-like structured data."""

    def process(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        # Add XML-specific processing logic
        processed = []
        for item in data:
            # Convert to consistent structure
            standardized = {}
            for key, value in item.items():
                # Remove XML namespace prefixes
                clean_key = key.split(':')[-1] if ':' in key else key
                standardized[clean_key] = value
            processed.append(standardized)
        print(f"XML Processor: Standardized {len(processed)} elements")
        return processed

    def get_supported_formats(self) -> List[str]:
        return ["xml", "html"]

# Factory implementation
class DataProcessorFactory:
    """Factory for creating data processors."""

    _processors = {
        'csv': CSVProcessor,
        'tsv': CSVProcessor,
        'json': JSONProcessor,
        'jsonl': JSONProcessor,
        'xml': XMLProcessor,
        'html': XMLProcessor
    }

    @classmethod
    def create_processor(cls, file_format: str) -> DataProcessor:
        """Create a processor for the given format."""
        format_lower = file_format.lower()
        if format_lower not in cls._processors:
            raise ValueError(f"Unsupported format: {file_format}. "
                           f"Supported formats: {list(cls._processors.keys())}")

        processor_class = cls._processors[format_lower]
        return processor_class()

    @classmethod
    def get_supported_formats(cls) -> List[str]:
        """Get all supported formats."""
        return list(cls._processors.keys())

    @classmethod
    def register_processor(cls, formats: List[str], processor_class: type):
        """Register a new processor for given formats."""
        for fmt in formats:
            cls._processors[fmt.lower()] = processor_class

In [None]:
# Demo: Factory Pattern with sample data
print("=== Factory Pattern Demo ===")

# Sample data for different formats
csv_data = [
    {"Name ": "John Doe", " Age": "25", "City": " New York "},
    {"Name ": "Jane Smith", " Age": "30", "City": " Los Angeles "}
]

json_data = [
    {
        "user": {"name": "Alice", "age": 28},
        "address": {"city": "Chicago", "zip": "60601"}
    },
    {
        "user": {"name": "Bob", "age": 32},
        "address": {"city": "Miami", "zip": "33101"}
    }
]

xml_data = [
    {"ns:name": "Charlie", "ns:age": "35", "ns:department": "Engineering"},
    {"ns:name": "Diana", "ns:age": "29", "ns:department": "Marketing"}
]

# Process different data types using factory
data_samples = [
    ("csv", csv_data),
    ("json", json_data),
    ("xml", xml_data)
]

print(f"Supported formats: {DataProcessorFactory.get_supported_formats()}")
print()

results = []
for format_type, data in data_samples:
    print(f"Processing {format_type.upper()} data:")
    print(f"Input: {data}")

    # Factory creates appropriate processor
    processor = DataProcessorFactory.create_processor(format_type)
    processed_data = processor.process(data)

    print(f"Output: {processed_data}")
    print(f"Supported formats for this processor: {processor.get_supported_formats()}")
    print("-" * 50)

    results.append((format_type, processed_data))

# Try to create processor for unsupported format
try:
    unsupported_processor = DataProcessorFactory.create_processor("pdf")
except ValueError as e:
    print(f"Expected error: {e}")

## 1.3 Builder Pattern (Creational)

**Problem**: Separate the construction of complex objects from their representation, allowing step-by-step construction.

**Use Case**: Building complex configuration objects, SQL queries, or data transformation pipelines.

**Real-world Example**: Building a data analysis report with various optional components.
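
The SQL-query use case mentioned above can be sketched in a few lines. This is a hypothetical `QueryBuilder`, not a real library API, and it concatenates strings purely for illustration; production code should use parameterized queries.

```python
from typing import List, Optional

class QueryBuilder:
    """Hypothetical fluent builder that assembles a SELECT statement."""

    def __init__(self, table: str):
        self._table = table
        self._columns: List[str] = []
        self._conditions: List[str] = []
        self._limit: Optional[int] = None

    def select(self, *columns: str) -> "QueryBuilder":
        self._columns.extend(columns)
        return self  # returning self is what enables method chaining

    def where(self, condition: str) -> "QueryBuilder":
        self._conditions.append(condition)
        return self

    def limit(self, n: int) -> "QueryBuilder":
        self._limit = n
        return self

    def build(self) -> str:
        # Optional parts are added only if the caller configured them
        columns = ", ".join(self._columns) or "*"
        query = f"SELECT {columns} FROM {self._table}"
        if self._conditions:
            query += " WHERE " + " AND ".join(self._conditions)
        if self._limit is not None:
            query += f" LIMIT {self._limit}"
        return query

query = QueryBuilder("sales").select("product", "price").where("price > 100").limit(5).build()
print(query)  # SELECT product, price FROM sales WHERE price > 100 LIMIT 5
```

Each step configures one optional part, and nothing is assembled until `build()` is called, which is the separation of construction from representation that the pattern describes.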

In [None]:
@dataclass
class DataReport:
    """A data analysis report with multiple optional components."""
    title: str
    data: List[Dict[str, Any]]
    summary_stats: Optional[Dict[str, Any]] = None
    charts: Optional[List[str]] = None
    filters: Optional[Dict[str, Any]] = None
    export_format: str = "json"
    include_metadata: bool = False

    def to_dict(self) -> Dict[str, Any]:
        """Convert report to dictionary format."""
        result = {
            "title": self.title,
            "data_count": len(self.data),
            "export_format": self.export_format
        }

        if self.summary_stats:
            result["summary_stats"] = self.summary_stats

        if self.charts:
            result["charts"] = self.charts

        if self.filters:
            result["applied_filters"] = self.filters

        if self.include_metadata:
            result["metadata"] = {
                "created_at": datetime.now().isoformat(),
                "total_records": len(self.data)
            }

        return result

class DataReportBuilder:
    """Builder for creating data reports with fluent interface."""

    def __init__(self):
        self.reset()

    def reset(self) -> 'DataReportBuilder':
        """Reset builder to initial state."""
        self._title: Optional[str] = None
        self._data: List[Dict[str, Any]] = []
        self._summary_stats: Optional[Dict[str, Any]] = None
        self._charts: List[str] = []
        self._filters: Dict[str, Any] = {}
        self._export_format: str = "json"
        self._include_metadata: bool = False
        return self

    def set_title(self, title: str) -> 'DataReportBuilder':
        """Set report title."""
        self._title = title
        return self

    def add_data(self, data: List[Dict[str, Any]]) -> 'DataReportBuilder':
        """Add data to the report."""
        self._data.extend(data)
        return self

    def calculate_summary_stats(self) -> 'DataReportBuilder':
        """Calculate and add summary statistics."""
        if not self._data:
            return self

        # Collect every field that holds a numeric value in at least one record.
        # Sorting keeps the report output stable across runs (set order is arbitrary).
        numeric_fields = sorted({
            key
            for record in self._data
            for key, value in record.items()
            if isinstance(value, (int, float))
        })

        stats = {
            "total_records": len(self._data),
            "fields": sorted(set().union(*(d.keys() for d in self._data))),
            "numeric_fields": numeric_fields
        }

        # Calculate means for numeric fields
        for field in numeric_fields:
            values = [record.get(field, 0) for record in self._data
                     if isinstance(record.get(field), (int, float))]
            if values:
                stats[f"{field}_mean"] = sum(values) / len(values)
                stats[f"{field}_min"] = min(values)
                stats[f"{field}_max"] = max(values)

        self._summary_stats = stats
        return self

    def add_chart(self, chart_type: str) -> 'DataReportBuilder':
        """Add a chart to the report."""
        if chart_type not in self._charts:
            self._charts.append(chart_type)
        return self

    def add_filter(self, field: str, condition: str, value: Any) -> 'DataReportBuilder':
        """Record a filter specification (stored with the report; rows are not filtered here)."""
        if field not in self._filters:
            self._filters[field] = []
        self._filters[field].append({"condition": condition, "value": value})
        return self

    def set_export_format(self, format_type: str) -> 'DataReportBuilder':
        """Set export format."""
        self._export_format = format_type
        return self

    def include_metadata(self, include: bool = True) -> 'DataReportBuilder':
        """Include metadata in the report."""
        self._include_metadata = include
        return self

    def build(self) -> DataReport:
        """Build the final report."""
        if not self._title:
            raise ValueError("Report title is required")

        report = DataReport(
            title=self._title,
            data=self._data.copy(),
            summary_stats=self._summary_stats,
            charts=self._charts.copy() if self._charts else None,
            filters=self._filters.copy() if self._filters else None,
            export_format=self._export_format,
            include_metadata=self._include_metadata
        )

        return report

In [None]:
# Demo: Builder Pattern with sample sales data
print("=== Builder Pattern Demo ===")

# Sample sales data
sales_data = [
    {"product": "Laptop", "price": 999.99, "quantity": 2, "region": "North"},
    {"product": "Mouse", "price": 29.99, "quantity": 5, "region": "South"},
    {"product": "Keyboard", "price": 79.99, "quantity": 3, "region": "North"},
    {"product": "Monitor", "price": 299.99, "quantity": 1, "region": "East"},
    {"product": "Laptop", "price": 1199.99, "quantity": 1, "region": "West"}
]

customer_data = [
    {"name": "Alice Johnson", "age": 32, "purchases": 5, "total_spent": 1500.50},
    {"name": "Bob Wilson", "age": 28, "purchases": 3, "total_spent": 890.25},
    {"name": "Carol Davis", "age": 45, "purchases": 8, "total_spent": 2300.75}
]

# Create different types of reports using the builder

# 1. Simple sales report
builder = DataReportBuilder()
simple_report = (builder
                .set_title("Basic Sales Report")
                .add_data(sales_data)
                .set_export_format("csv")
                .build())

print("Simple Report:")
print(json.dumps(simple_report.to_dict(), indent=2))
print()

# 2. Comprehensive sales report with stats and charts
builder.reset()
comprehensive_report = (builder
                       .set_title("Comprehensive Sales Analysis")
                       .add_data(sales_data)
                       .calculate_summary_stats()
                       .add_chart("bar_chart")
                       .add_chart("pie_chart")
                       .add_filter("region", "equals", "North")
                       .add_filter("price", "greater_than", 100)
                       .set_export_format("pdf")
                       .include_metadata(True)
                       .build())

print("Comprehensive Report:")
print(json.dumps(comprehensive_report.to_dict(), indent=2))
print()

# 3. Customer analysis report
builder.reset()
customer_report = (builder
                  .set_title("Customer Demographics Report")
                  .add_data(customer_data)
                  .calculate_summary_stats()
                  .add_chart("histogram")
                  .add_filter("age", "between", [25, 40])
                  .include_metadata(True)
                  .build())

print("Customer Report:")
print(json.dumps(customer_report.to_dict(), indent=2))
print()

# Demonstrate error handling
try:
    builder.reset()
    invalid_report = builder.add_data(sales_data).build()  # Missing title
except ValueError as e:
    print(f"Expected error: {e}")

## 1.4 Decorator Pattern (Structural)

**Problem**: Add responsibilities to objects dynamically without altering their structure.

**Use Case**: Adding features to data sources, enhancing API responses, or adding logging/caching to functions.

**Real-world Example**: Data source that can be enhanced with caching, logging, and validation.
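
The key mechanic is that each wrapper exposes the same call signature as the thing it wraps, so layers can be stacked in any order. A minimal sketch with plain functions follows; the names `fetch`, `with_logging`, `with_validation`, and the `calls` list are hypothetical and serve only to make the call order visible.

```python
from typing import Callable, List

calls: List[str] = []  # records the order in which the layers run

def fetch(query: str) -> str:
    """Hypothetical innermost operation standing in for a real data source."""
    calls.append("fetch")
    return f"rows for {query}"

def with_logging(func: Callable[[str], str]) -> Callable[[str], str]:
    def wrapper(query: str) -> str:
        calls.append("log")
        return func(query)
    return wrapper

def with_validation(func: Callable[[str], str]) -> Callable[[str], str]:
    def wrapper(query: str) -> str:
        calls.append("validate")
        if not query:
            raise ValueError("query must not be empty")
        return func(query)
    return wrapper

# The outermost wrapper runs first, then delegates inward.
enhanced = with_logging(with_validation(fetch))
result = enhanced("users")
print(calls)   # ['log', 'validate', 'fetch']
print(result)  # rows for users
```

Swapping the nesting order changes which behavior runs first without touching `fetch` itself.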

In [None]:
# Object-based Decorator Pattern
class DataSource(ABC):
    """Abstract data source interface."""

    @abstractmethod
    def get_data(self, query: str) -> List[Dict[str, Any]]:
        pass

class DatabaseDataSource(DataSource):
    """Concrete data source that simulates database queries."""

    def __init__(self, name: str = "ProductionDB"):
        self.name = name
        # Simulate database records
        self._data = {
            "users": [
                {"id": 1, "name": "Alice", "email": "alice@example.com", "age": 30},
                {"id": 2, "name": "Bob", "email": "bob@example.com", "age": 25},
                {"id": 3, "name": "Charlie", "email": "charlie@example.com", "age": 35}
            ],
            "products": [
                {"id": 1, "name": "Laptop", "price": 999.99, "category": "Electronics"},
                {"id": 2, "name": "Book", "price": 19.99, "category": "Education"},
                {"id": 3, "name": "Headphones", "price": 199.99, "category": "Electronics"}
            ]
        }

    def get_data(self, query: str) -> List[Dict[str, Any]]:
        """Simulate database query execution."""
        # Simple query simulation
        table = query.lower().strip()
        if table in self._data:
            # Simulate some processing time
            time.sleep(0.1)
            return self._data[table].copy()
        return []

class DataSourceDecorator(DataSource):
    """Base decorator class."""

    def __init__(self, data_source: DataSource):
        self._data_source = data_source

    def get_data(self, query: str) -> List[Dict[str, Any]]:
        return self._data_source.get_data(query)

class CachingDataSource(DataSourceDecorator):
    """Decorator that adds caching functionality."""

    def __init__(self, data_source: DataSource, cache_size: int = 100):
        super().__init__(data_source)
        self._cache: Dict[str, List[Dict[str, Any]]] = {}
        self._cache_size = cache_size
        self._cache_hits = 0
        self._cache_misses = 0

    def get_data(self, query: str) -> List[Dict[str, Any]]:
        if query in self._cache:
            self._cache_hits += 1
            print(f"Cache HIT for query: {query}")
            return self._cache[query].copy()

        self._cache_misses += 1
        print(f"Cache MISS for query: {query}")

        result = super().get_data(query)

        # FIFO eviction when the cache is full: dicts preserve insertion order,
        # so the first key is the oldest entry
        if len(self._cache) >= self._cache_size:
            oldest_key = next(iter(self._cache))
            del self._cache[oldest_key]

        self._cache[query] = result.copy()
        return result

    def get_cache_stats(self) -> Dict[str, Any]:
        total_lookups = self._cache_hits + self._cache_misses
        return {
            "hits": self._cache_hits,
            "misses": self._cache_misses,
            # hit_rate is a fraction between 0 and 1; 0 when nothing has been looked up yet
            "hit_rate": self._cache_hits / total_lookups if total_lookups else 0,
            "cached_queries": len(self._cache)
        }

class LoggingDataSource(DataSourceDecorator):
    """Decorator that adds logging functionality."""

    def __init__(self, data_source: DataSource, log_level: str = "INFO"):
        super().__init__(data_source)
        self.log_level = log_level
        self.query_log: List[Dict[str, Any]] = []

    def get_data(self, query: str) -> List[Dict[str, Any]]:
        # perf_counter() is a monotonic clock, better suited to measuring intervals than time.time()
        start_time = time.perf_counter()
        print(f"[{self.log_level}] Executing query: {query}")

        try:
            result = super().get_data(query)
            execution_time = time.perf_counter() - start_time

            log_entry = {
                "timestamp": datetime.now().isoformat(),
                "query": query,
                "execution_time_ms": round(execution_time * 1000, 2),
                "result_count": len(result),
                "status": "success"
            }

            self.query_log.append(log_entry)
            print(f"[{self.log_level}] Query completed in {log_entry['execution_time_ms']}ms, returned {len(result)} records")

            return result

        except Exception as e:
            execution_time = time.perf_counter() - start_time
            log_entry = {
                "timestamp": datetime.now().isoformat(),
                "query": query,
                "execution_time_ms": round(execution_time * 1000, 2),
                "error": str(e),
                "status": "error"
            }

            self.query_log.append(log_entry)
            print(f"[ERROR] Query failed: {e}")
            raise

    def get_query_log(self) -> List[Dict[str, Any]]:
        return self.query_log.copy()

class ValidationDataSource(DataSourceDecorator):
    """Decorator that adds data validation."""

    def __init__(self, data_source: DataSource, required_fields: Optional[List[str]] = None):
        super().__init__(data_source)
        self.required_fields = required_fields or []
        self.validation_errors: List[str] = []

    def get_data(self, query: str) -> List[Dict[str, Any]]:
        result = super().get_data(query)
        validated_result = []

        for i, record in enumerate(result):
            is_valid = True

            # Check required fields
            for field in self.required_fields:
                if field not in record or record[field] is None:
                    error_msg = f"Record {i}: Missing required field '{field}'"
                    self.validation_errors.append(error_msg)
                    print(f"[VALIDATION ERROR] {error_msg}")
                    is_valid = False

            if is_valid:
                validated_result.append(record)

        print(f"[VALIDATION] Validated {len(result)} records, {len(validated_result)} passed validation")
        return validated_result

    def get_validation_errors(self) -> List[str]:
        return self.validation_errors.copy()

In [None]:
# Function-based Decorator Pattern
def timing_decorator(func: Callable) -> Callable:
    """Decorator that measures function execution time."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
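        # perf_counter() is monotonic, so the interval is unaffected by system clock changes
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        execution_time = time.perf_counter() - start_time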
        print(f"Function '{func.__name__}' executed in {execution_time*1000:.2f}ms")
        return result
    return wrapper

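def retry_decorator(max_attempts: int = 3, delay: float = 0.1):
    """Decorator that retries function execution on failure."""
    if max_attempts < 1:
        # With zero attempts the loop below never runs and there is no exception to re-raise
        raise ValueError("max_attempts must be at least 1")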
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
                        time.sleep(delay)
                    else:
                        print(f"All {max_attempts} attempts failed.")

            raise last_exception
        return wrapper
    return decorator

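# Example function using stacked decorators. timing_decorator is outermost, so the
# reported time covers every retry attempt (and is printed only if a call succeeds).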
@timing_decorator
@retry_decorator(max_attempts=2, delay=0.05)
def unreliable_data_fetch(dataset_name: str, fail_rate: float = 0.3) -> List[Dict[str, Any]]:
    """Simulates an unreliable data fetching operation."""
    if random.random() < fail_rate:
        raise ConnectionError(f"Failed to fetch {dataset_name}")

    # Simulate data fetching
    time.sleep(0.05)  # Simulate network delay
    return [
        {"id": i, "dataset": dataset_name, "value": random.randint(1, 100)}
        for i in range(3)
    ]

In [None]:
# Demo: Decorator Pattern
print("=== Object-based Decorator Pattern Demo ===")

# Create base data source
db_source = DatabaseDataSource("SalesDB")

# Wrap with multiple decorators. Caching is outermost, so a cache hit
# returns immediately and skips logging and validation.
enhanced_source = CachingDataSource(
    LoggingDataSource(
        ValidationDataSource(db_source, required_fields=["id", "name"]),
        log_level="DEBUG"
    ),
    cache_size=10
)

print("First query (cache miss):")
users1 = enhanced_source.get_data("users")
print(f"Retrieved {len(users1)} users")
print()

print("Second query (cache hit):")
users2 = enhanced_source.get_data("users")
print(f"Retrieved {len(users2)} users")
print()

print("Query products:")
products = enhanced_source.get_data("products")
print(f"Retrieved {len(products)} products")
print()

# Check cache statistics
if hasattr(enhanced_source, 'get_cache_stats'):
    cache_stats = enhanced_source.get_cache_stats()
    print(f"Cache statistics: {cache_stats}")

print("\n=== Function-based Decorator Pattern Demo ===")

# Test function decorators with unreliable data fetching
datasets = ["sensor_data", "user_metrics", "error_logs"]

for dataset in datasets:
    try:
        print(f"\nFetching {dataset}:")
        data = unreliable_data_fetch(dataset, fail_rate=0.4)
        print(f"Successfully fetched {len(data)} records")
        print(f"Sample: {data[0] if data else 'No data'}")
    except Exception as e:
        print(f"Failed to fetch {dataset}: {e}")

## 1.5 Strategy Pattern (Behavioral)

**Problem**: Define a family of algorithms, encapsulate each one, and make them interchangeable at runtime.

**Use Case**: Different sorting algorithms, payment methods, or data processing strategies.

**Real-world Example**: Different algorithms for processing and analyzing customer data based on business requirements.
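
Because Python functions are first-class objects, the simplest form of the pattern is to pass the algorithm in as a callable. The sketch below assumes hypothetical names (`by_total_spent`, `by_purchase_count`, `top_customer`) and made-up sample data, purely for illustration.

```python
from typing import Any, Callable, Dict, List

Customer = Dict[str, Any]

def by_total_spent(customer: Customer) -> float:
    return customer["total_spent"]

def by_purchase_count(customer: Customer) -> int:
    return customer["purchases"]

def top_customer(customers: List[Customer], strategy: Callable[[Customer], Any]) -> str:
    """Rank customers with whichever scoring strategy the caller supplies."""
    return max(customers, key=strategy)["name"]

# Hypothetical sample data for illustration only.
customers = [
    {"name": "Alice", "total_spent": 1500.0, "purchases": 5},
    {"name": "Carol", "total_spent": 900.0, "purchases": 8},
]

print(top_customer(customers, by_total_spent))     # Alice
print(top_customer(customers, by_purchase_count))  # Carol
```

`top_customer` never changes; only the strategy passed to it does, which is what makes the algorithms interchangeable at runtime.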

In [None]:
# Strategy Pattern for Customer Analysis
class CustomerAnalysisStrategy(ABC):
    """Abstract strategy for customer analysis."""

    @abstractmethod
    def analyze(self, customers: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Analyze customer data and return insights."""
        pass

    @abstractmethod
    def get_strategy_name(self) -> str:
        """Return the name of this strategy."""
        pass

class SegmentationStrategy(CustomerAnalysisStrategy):
    """Strategy for customer segmentation analysis."""

    def analyze(self, customers: List[Dict[str, Any]]) -> Dict[str, Any]:
        segments = {"high_value": [], "medium_value": [], "low_value": []}

        # Calculate spending thresholds
        total_spends = [c.get("total_spent", 0) for c in customers]
        if not total_spends:
            return {"error": "No spending data available"}

        avg_spend = sum(total_spends) / len(total_spends)
        high_threshold = avg_spend * 1.5
        low_threshold = avg_spend * 0.5

        for customer in customers:
            spend = customer.get("total_spent", 0)
            if spend >= high_threshold:
                segments["high_value"].append(customer)
            elif spend >= low_threshold:
                segments["medium_value"].append(customer)
            else:
                segments["low_value"].append(customer)

        return {
            "strategy": self.get_strategy_name(),
            "total_customers": len(customers),
            "thresholds": {"high": high_threshold, "low": low_threshold},
            "segments": {
                "high_value": len(segments["high_value"]),
                "medium_value": len(segments["medium_value"]),
                "low_value": len(segments["low_value"])
            },
            "segment_percentages": {
                "high_value": round(len(segments["high_value"]) / len(customers) * 100, 1),
                "medium_value": round(len(segments["medium_value"]) / len(customers) * 100, 1),
                "low_value": round(len(segments["low_value"]) / len(customers) * 100, 1)
            }
        }

    def get_strategy_name(self) -> str:
        return "Customer Segmentation"

class ChurnRiskStrategy(CustomerAnalysisStrategy):
    """Strategy for churn risk analysis."""

    def analyze(self, customers: List[Dict[str, Any]]) -> Dict[str, Any]:
        risk_levels = {"high_risk": [], "medium_risk": [], "low_risk": []}

        for customer in customers:
            # Simple churn risk calculation based on recency and frequency
            days_since_purchase = customer.get("days_since_last_purchase", 0)
            purchase_frequency = customer.get("purchases", 0)

            # Risk scoring logic
            risk_score = 0
            if days_since_purchase > 90:
                risk_score += 3
            elif days_since_purchase > 30:
                risk_score += 1

            if purchase_frequency < 2:
                risk_score += 2
            elif purchase_frequency < 5:
                risk_score += 1

            # Categorize risk
            if risk_score >= 4:
                risk_levels["high_risk"].append(customer)
            elif risk_score >= 2:
                risk_levels["medium_risk"].append(customer)
            else:
                risk_levels["low_risk"].append(customer)

        return {
            "strategy": self.get_strategy_name(),
            "total_customers": len(customers),
            "risk_distribution": {
                "high_risk": len(risk_levels["high_risk"]),
                "medium_risk": len(risk_levels["medium_risk"]),
                "low_risk": len(risk_levels["low_risk"])
            },
            "recommendations": {
                "high_risk": "Immediate retention campaign",
                "medium_risk": "Engagement improvement program",
                "low_risk": "Continue current service level"
            }
        }

    def get_strategy_name(self) -> str:
        return "Churn Risk Analysis"

class LifetimeValueStrategy(CustomerAnalysisStrategy):
    """Strategy for customer lifetime value analysis."""

    def analyze(self, customers: List[Dict[str, Any]]) -> Dict[str, Any]:
        clv_data = []

        for customer in customers:
            # Simple CLV calculation: avg_order_value * purchase_frequency * customer_lifespan
            total_spent = customer.get("total_spent", 0)
            purchases = customer.get("purchases", 1)
            avg_order_value = total_spent / purchases if purchases > 0 else 0

            # Estimate annual purchase frequency
            days_active = customer.get("days_active", 365)
            annual_frequency = (purchases / days_active) * 365 if days_active > 0 else 0

            # Estimate customer lifespan (simplified)
            estimated_lifespan = 3  # years

            clv = avg_order_value * annual_frequency * estimated_lifespan

            clv_data.append({
                "customer": customer.get("name", "Unknown"),
                "clv": round(clv, 2),
                "avg_order_value": round(avg_order_value, 2),
                "annual_frequency": round(annual_frequency, 2)
            })

        # Calculate statistics
        clv_values = [item["clv"] for item in clv_data]
        total_clv = sum(clv_values)
        avg_clv = total_clv / len(clv_values) if clv_values else 0

        # Sort by CLV
        clv_data.sort(key=lambda x: x["clv"], reverse=True)

        return {
            "strategy": self.get_strategy_name(),
            "total_customers": len(customers),
            "total_clv": round(total_clv, 2),
            "average_clv": round(avg_clv, 2),
            "top_customers": clv_data[:3],  # Top 3 by CLV
            "clv_distribution": {
                "min": round(min(clv_values), 2) if clv_values else 0,
                "max": round(max(clv_values), 2) if clv_values else 0
            }
        }

    def get_strategy_name(self) -> str:
        return "Customer Lifetime Value"

class CustomerAnalyzer:
    """Context class that uses different analysis strategies."""

    def __init__(self, strategy: CustomerAnalysisStrategy):
        self._strategy = strategy

    def set_strategy(self, strategy: CustomerAnalysisStrategy):
        """Change the analysis strategy at runtime."""
        self._strategy = strategy

    def analyze_customers(self, customers: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Perform customer analysis using the current strategy."""
        return self._strategy.analyze(customers)

    def get_current_strategy(self) -> str:
        """Get the name of the current strategy."""
        return self._strategy.get_strategy_name()

# Function-based strategy alternative
def demographic_analysis(customers: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Function-based strategy for demographic analysis."""
    age_groups = {"18-25": 0, "26-35": 0, "36-45": 0, "46+": 0}
    total_customers = len(customers)

    for customer in customers:
        age = customer.get("age", 0)
        if 18 <= age <= 25:
            age_groups["18-25"] += 1
        elif 26 <= age <= 35:
            age_groups["26-35"] += 1
        elif 36 <= age <= 45:
            age_groups["36-45"] += 1
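        elif age > 45:
            age_groups["46+"] += 1
        # Ages below 18 (including a missing age, which defaults to 0) fall into
        # no group, so the percentages below may not sum to 100.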

    return {
        "strategy": "Demographic Analysis",
        "total_customers": total_customers,
        "age_distribution": age_groups,
        "age_percentages": {
            group: round(count / total_customers * 100, 1) if total_customers > 0 else 0
            for group, count in age_groups.items()
        }
    }

# Strategy registry for dynamic strategy selection
ANALYSIS_STRATEGIES = {
    "segmentation": SegmentationStrategy,
    "churn_risk": ChurnRiskStrategy,
    "lifetime_value": LifetimeValueStrategy
}

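# Function-based strategies are plain callables and are invoked directly;
# create_analyzer() below only looks up the class-based registry.
FUNCTION_STRATEGIES = {
    "demographic": demographic_analysis
}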

def create_analyzer(strategy_name: str) -> CustomerAnalyzer:
    """Factory function to create analyzer with specified strategy."""
    if strategy_name not in ANALYSIS_STRATEGIES:
        raise ValueError(f"Unknown strategy: {strategy_name}. Available: {list(ANALYSIS_STRATEGIES.keys())}")

    strategy_class = ANALYSIS_STRATEGIES[strategy_name]
    return CustomerAnalyzer(strategy_class())

In [None]:
# Demo: Strategy Pattern
print("=== Strategy Pattern Demo ===")

# Sample customer data with more attributes for analysis
customer_data = [
    {
        "name": "Alice Johnson",
        "age": 32,
        "purchases": 8,
        "total_spent": 2400.50,
        "days_since_last_purchase": 15,
        "days_active": 545
    },
    {
        "name": "Bob Wilson",
        "age": 28,
        "purchases": 3,
        "total_spent": 450.25,
        "days_since_last_purchase": 120,
        "days_active": 180
    },
    {
        "name": "Carol Davis",
        "age": 45,
        "purchases": 15,
        "total_spent": 3200.75,
        "days_since_last_purchase": 5,
        "days_active": 820
    },
    {
        "name": "David Brown",
        "age": 23,
        "purchases": 1,
        "total_spent": 89.99,
        "days_since_last_purchase": 200,
        "days_active": 30
    },
    {
        "name": "Eva Martinez",
        "age": 38,
        "purchases": 12,
        "total_spent": 1850.00,
        "days_since_last_purchase": 25,
        "days_active": 400
    }
]

print(f"Analyzing {len(customer_data)} customers...\n")

# Test each class-based strategy
strategies = ["segmentation", "churn_risk", "lifetime_value"]

for strategy_name in strategies:
    print(f"--- {strategy_name.upper()} ANALYSIS ---")

    analyzer = create_analyzer(strategy_name)
    result = analyzer.analyze_customers(customer_data)

    print(json.dumps(result, indent=2))
    print()

# Test function-based strategy
print("--- DEMOGRAPHIC ANALYSIS (Function-based) ---")
demographic_result = demographic_analysis(customer_data)
print(json.dumps(demographic_result, indent=2))
print()

# Demonstrate strategy switching at runtime
print("--- RUNTIME STRATEGY SWITCHING ---")
analyzer = create_analyzer("segmentation")
print(f"Current strategy: {analyzer.get_current_strategy()}")

# Switch to churn risk analysis
analyzer.set_strategy(ChurnRiskStrategy())
print(f"Switched to: {analyzer.get_current_strategy()}")

churn_result = analyzer.analyze_customers(customer_data)
print(f"Churn analysis result: {churn_result['risk_distribution']}")

# Switch to lifetime value analysis
analyzer.set_strategy(LifetimeValueStrategy())
print(f"Switched to: {analyzer.get_current_strategy()}")

clv_result = analyzer.analyze_customers(customer_data)
print(f"CLV analysis - Average CLV: ${clv_result['average_clv']}")
print(f"Top customer: {clv_result['top_customers'][0]['customer']} with CLV: ${clv_result['top_customers'][0]['clv']}")
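
The function-based `demographic_analysis` is called directly in the demo rather than through `CustomerAnalyzer`. One way to bridge the two styles is a small adapter that gives any callable the two methods the context relies on. The sketch below is illustrative only: `FunctionStrategy` and `count_by_age_band` are hypothetical names that do not appear elsewhere in this notebook, and the stand-in function is much simpler than `demographic_analysis`.

```python
from typing import Any, Callable, Dict, List

Customer = Dict[str, Any]
AnalysisFn = Callable[[List[Customer]], Dict[str, Any]]


class FunctionStrategy:
    """Hypothetical adapter: exposes a plain function through the strategy interface."""

    def __init__(self, func: AnalysisFn, name: str):
        self._func = func
        self._name = name

    def analyze(self, customers: List[Customer]) -> Dict[str, Any]:
        # Delegate to the wrapped function unchanged.
        return self._func(customers)

    def get_strategy_name(self) -> str:
        return self._name


def count_by_age_band(customers: List[Customer]) -> Dict[str, Any]:
    """Stand-in analysis function: counts customers under 35 and 35 or older."""
    under = sum(1 for c in customers if c.get("age", 0) < 35)
    return {"under_35": under, "35_plus": len(customers) - under}


strategy = FunctionStrategy(count_by_age_band, "Age Bands")
sample = [{"name": "A", "age": 32}, {"name": "B", "age": 45}, {"name": "C", "age": 23}]
result = strategy.analyze(sample)
print(strategy.get_strategy_name(), result)
```

In the notebook itself, `CustomerAnalyzer(FunctionStrategy(demographic_analysis, "Demographic Analysis"))` should behave the same way at runtime, since `CustomerAnalyzer` only calls `analyze` and `get_strategy_name`. A static type checker may still complain, because the adapter does not subclass `CustomerAnalysisStrategy`.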

## 1.6 Observer Pattern (Behavioral)

**Problem**: Define a one-to-many dependency between objects so that when one object changes state, all dependents are notified automatically.

**Use Case**: Event systems, model-view architectures, or notification systems.

**Real-world Example**: Stock price monitoring system where multiple components need to react to price changes.
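
Before the class-based version, the core mechanism can be sketched with plain callables: the subject keeps a list of listeners and calls each one when something changes. `PriceFeed`, `subscribe`, and `publish` are hypothetical names chosen for this sketch and are not used in the implementation that follows.

```python
from typing import Callable, List

Listener = Callable[[float], None]


class PriceFeed:
    """Hypothetical minimal subject: stores callables and calls each on publish."""

    def __init__(self) -> None:
        self._listeners: List[Listener] = []

    def subscribe(self, listener: Listener) -> None:
        self._listeners.append(listener)

    def publish(self, price: float) -> None:
        # Iterate over a copy so a listener could unsubscribe while being notified.
        for listener in list(self._listeners):
            listener(price)


received: List[str] = []
feed = PriceFeed()
feed.subscribe(lambda p: received.append(f"logger saw {p:.2f}"))
feed.subscribe(lambda p: received.append("alert!" if p >= 160 else "ok"))
feed.publish(161.5)
print(received)  # ['logger saw 161.50', 'alert!']
```

The class-based version trades this brevity for named observers that can hold their own state, such as alert counters or trade history.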

In [None]:
# Observer Pattern for Stock Price Monitoring
class Observer(ABC):
    """Abstract observer interface."""

    @abstractmethod
    def update(self, subject: 'Subject', data: Any) -> None:
        """Called when the subject's state changes."""
        pass

class Subject(ABC):
    """Base subject that manages the observer list and sends notifications."""

    def __init__(self):
        self._observers: List[Observer] = []

    def attach(self, observer: Observer) -> None:
        """Add an observer."""
        if observer not in self._observers:
            self._observers.append(observer)
            print(f"Observer {observer.__class__.__name__} attached")

    def detach(self, observer: Observer) -> None:
        """Remove an observer."""
        if observer in self._observers:
            self._observers.remove(observer)
            print(f"Observer {observer.__class__.__name__} detached")

    def notify(self, data: Any = None) -> None:
        """Notify all observers of a state change."""
        print(f"Notifying {len(self._observers)} observers...")
        for observer in self._observers.copy():  # Copy to avoid issues if list is modified during iteration
            try:
                observer.update(self, data)
            except Exception as e:
                print(f"Error notifying observer {observer.__class__.__name__}: {e}")

    def get_observer_count(self) -> int:
        """Get the number of attached observers."""
        return len(self._observers)

@dataclass
class StockPrice:
    """Data class for stock price information."""
    symbol: str
    price: float
    change: float
    change_percent: float
    volume: int
    timestamp: str

class StockPriceMonitor(Subject):
    """Subject that monitors stock prices and notifies observers."""

    def __init__(self, symbol: str, initial_price: float = 100.0):
        super().__init__()
        self.symbol = symbol
        self._current_price = initial_price
        self._previous_price = initial_price
        self._price_history: List[StockPrice] = []

    def update_price(self, new_price: float, volume: int = 1000) -> None:
        """Update the stock price and notify observers."""
        self._previous_price = self._current_price
        self._current_price = new_price

        change = new_price - self._previous_price
        change_percent = (change / self._previous_price) * 100 if self._previous_price != 0 else 0

        stock_data = StockPrice(
            symbol=self.symbol,
            price=new_price,
            change=round(change, 2),
            change_percent=round(change_percent, 2),
            volume=volume,
            timestamp=datetime.now().isoformat()
        )

        self._price_history.append(stock_data)
        print(f"\n📈 {self.symbol} price updated: ${new_price:.2f} ({change:+.2f}, {change_percent:+.2f}%)")

        # Notify observers
        self.notify(stock_data)

    def get_current_price(self) -> float:
        return self._current_price

    def get_price_history(self) -> List[StockPrice]:
        return self._price_history.copy()

class PriceAlertObserver(Observer):
    """Observer that triggers price alerts."""

    def __init__(self, name: str, high_threshold: float, low_threshold: float):
        self.name = name
        self.high_threshold = high_threshold
        self.low_threshold = low_threshold
        self.alerts_sent = 0

    def update(self, subject: Subject, data: StockPrice) -> None:
        """Check price thresholds and send alerts."""
        if data.price >= self.high_threshold:
            self._send_alert("HIGH", data, f"Price ${data.price:.2f} >= ${self.high_threshold:.2f}")
        elif data.price <= self.low_threshold:
            self._send_alert("LOW", data, f"Price ${data.price:.2f} <= ${self.low_threshold:.2f}")

    def _send_alert(self, alert_type: str, data: StockPrice, message: str) -> None:
        self.alerts_sent += 1
        print(f"🚨 [{self.name}] {alert_type} ALERT for {data.symbol}: {message}")

class TradingBotObserver(Observer):
    """Observer that simulates automated trading decisions."""

    def __init__(self, name: str, buy_threshold_percent: float = -5.0, sell_threshold_percent: float = 10.0):
        self.name = name
        self.buy_threshold = buy_threshold_percent
        self.sell_threshold = sell_threshold_percent
        self.positions: Dict[str, int] = {}  # symbol -> quantity
        self.trades: List[Dict[str, Any]] = []

    def update(self, subject: Subject, data: StockPrice) -> None:
        """Make trading decisions based on price changes."""
        current_position = self.positions.get(data.symbol, 0)

        if data.change_percent <= self.buy_threshold and current_position <= 0:
            # Buy signal
            quantity = 100
            self._execute_trade("BUY", data, quantity)
        elif data.change_percent >= self.sell_threshold and current_position > 0:
            # Sell signal
            quantity = current_position
            self._execute_trade("SELL", data, quantity)

    def _execute_trade(self, action: str, data: StockPrice, quantity: int) -> None:
        """Execute a trade and update position."""
        if action == "BUY":
            self.positions[data.symbol] = self.positions.get(data.symbol, 0) + quantity
        elif action == "SELL":
            self.positions[data.symbol] = max(0, self.positions.get(data.symbol, 0) - quantity)

        trade = {
            "timestamp": data.timestamp,
            "action": action,
            "symbol": data.symbol,
            "quantity": quantity,
            "price": data.price,
            "total_value": quantity * data.price
        }

        self.trades.append(trade)
        print(f"🤖 [{self.name}] {action} {quantity} shares of {data.symbol} at ${data.price:.2f}")

    def get_portfolio_summary(self) -> Dict[str, Any]:
        """Get current portfolio summary."""
        return {
            "positions": self.positions.copy(),
            "total_trades": len(self.trades),
            "recent_trades": self.trades[-3:] if self.trades else []
        }

class AnalyticsObserver(Observer):
    """Observer that collects analytics and statistics."""

    def __init__(self, name: str):
        self.name = name
        self.price_updates = 0
        self.volatility_data: List[float] = []
        self.volume_data: List[int] = []

    def update(self, subject: Subject, data: StockPrice) -> None:
        """Collect analytics data."""
        self.price_updates += 1
        self.volatility_data.append(abs(data.change_percent))
        self.volume_data.append(data.volume)

        # Print analytics every 3 updates
        if self.price_updates % 3 == 0:
            self._print_analytics(data.symbol)

    def _print_analytics(self, symbol: str) -> None:
        """Print current analytics."""
        if not self.volatility_data:
            return

        avg_volatility = sum(self.volatility_data) / len(self.volatility_data)
        avg_volume = sum(self.volume_data) / len(self.volume_data)

        print(f"📊 [{self.name}] Analytics for {symbol}:")
        print(f"    • Updates processed: {self.price_updates}")
        print(f"    • Average volatility: {avg_volatility:.2f}%")
        print(f"    • Average volume: {avg_volume:,.0f}")

    def get_analytics_summary(self) -> Dict[str, Any]:
        """Get complete analytics summary."""
        if not self.volatility_data:
            return {"error": "No data available"}

        return {
            "total_updates": self.price_updates,
            "volatility": {
                "average": round(sum(self.volatility_data) / len(self.volatility_data), 2),
                "max": round(max(self.volatility_data), 2),
                "min": round(min(self.volatility_data), 2)
            },
            "volume": {
                "average": round(sum(self.volume_data) / len(self.volume_data), 0),
                "max": max(self.volume_data),
                "min": min(self.volume_data)
            }
        }

In [None]:
# Demo: Observer Pattern
print("=== Observer Pattern Demo ===")

# Create stock price monitor
apple_monitor = StockPriceMonitor("AAPL", initial_price=150.0)

# Create observers
price_alert = PriceAlertObserver("PriceAlert", high_threshold=160.0, low_threshold=140.0)
trading_bot = TradingBotObserver("AlgoBot", buy_threshold_percent=-3.0, sell_threshold_percent=5.0)
analytics = AnalyticsObserver("MarketAnalytics")

# Attach observers
print("Attaching observers...")
apple_monitor.attach(price_alert)
apple_monitor.attach(trading_bot)
apple_monitor.attach(analytics)

print(f"\nMonitoring started for {apple_monitor.symbol} with {apple_monitor.get_observer_count()} observers\n")

# Simulate price changes
price_changes = [
    (145.50, 1200),  # Drop triggers buy
    (148.20, 1500),  # Slight recovery
    (152.80, 2000),  # Rise
    (158.00, 1800),  # Continued rise (+3.40%, below the 5% sell threshold, so no sell)
    (155.30, 1600),  # Slight drop
    (161.50, 2200),  # High threshold breach
    (138.75, 2500),  # Significant drop triggers low alert
]

for price, volume in price_changes:
    apple_monitor.update_price(price, volume)
    time.sleep(0.1)  # Small delay for demonstration

print("\n" + "="*60)
print("FINAL SUMMARY")
print("="*60)

# Print final statistics
print(f"\n📈 Price History for {apple_monitor.symbol}:")
history = apple_monitor.get_price_history()
for i, price_data in enumerate(history[-3:], 1):  # Last 3 updates
    print(f"  {i}. ${price_data.price:.2f} ({price_data.change:+.2f}, {price_data.change_percent:+.2f}%)")

print("\n🚨 Price Alerts Summary:")
print(f"  • Total alerts sent: {price_alert.alerts_sent}")

print("\n🤖 Trading Bot Summary:")
portfolio = trading_bot.get_portfolio_summary()
print(f"  • Current positions: {portfolio['positions']}")
print(f"  • Total trades executed: {portfolio['total_trades']}")
if portfolio['recent_trades']:
    last_trade = portfolio['recent_trades'][-1]
    print(f"  • Last trade: {last_trade['action']} {last_trade['quantity']} @ ${last_trade['price']:.2f}")

print("\n📊 Analytics Summary:")
analytics_summary = analytics.get_analytics_summary()
if 'error' not in analytics_summary:
    print(f"  • Total updates processed: {analytics_summary['total_updates']}")
    print(f"  • Average volatility: {analytics_summary['volatility']['average']}%")
    print(f"  • Max volatility: {analytics_summary['volatility']['max']}%")
    print(f"  • Average volume: {analytics_summary['volume']['average']:,.0f}")

# Demonstrate observer detachment
print("\n🔌 Detaching trading bot...")
apple_monitor.detach(trading_bot)

print("\nFinal update with fewer observers:")
apple_monitor.update_price(142.00, 1900)

print(f"\n✅ Demo completed. Final observer count: {apple_monitor.get_observer_count()}")
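
The trading bot compares each update's percentage change, not the absolute price, against its thresholds. As a rough check of which demo updates reach the -3% buy and +5% sell thresholds passed to `TradingBotObserver`, the changes can be recomputed separately. `percent_changes` is a hypothetical helper written for this sketch; it mirrors the two-decimal rounding used in `update_price`.

```python
from typing import List


def percent_changes(start: float, prices: List[float]) -> List[float]:
    """Percentage change of each price relative to the price before it."""
    changes = []
    previous = start
    for price in prices:
        changes.append(round((price - previous) / previous * 100, 2))
        previous = price
    return changes


demo_prices = [145.50, 148.20, 152.80, 158.00, 155.30, 161.50, 138.75]
changes = percent_changes(150.0, demo_prices)

# Prices whose change is at or beyond each threshold used in the demo.
buy_signals = [p for p, c in zip(demo_prices, changes) if c <= -3.0]
sell_signals = [p for p, c in zip(demo_prices, changes) if c >= 5.0]

print(changes)
print("at or below buy threshold:", buy_signals)
print("at or above sell threshold:", sell_signals)
```

Under these assumptions no update reaches the +5% sell threshold. The final drop also crosses the buy threshold, but the bot already holds a position at that point, so it would not buy again.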

## Summary

This notebook demonstrated 6 essential design patterns in Python:

### 1. **Singleton Pattern**
- **Use Case**: Database connections, configuration management
- **Key Learning**: Python imports a module only once per process, so module-level objects often provide a simpler alternative
- **Demo**: Database connection manager with shared state
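
As a minimal sketch of that simpler alternative, a cached accessor function can hand out one shared object without overriding `__new__`. `Settings` and `get_settings` are hypothetical names for illustration, not part of the notebook's `DatabaseConnection` example.

```python
import functools
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    """Hypothetical configuration object shared across the application."""
    host: str = "localhost"
    port: int = 5432


@functools.lru_cache(maxsize=None)
def get_settings() -> Settings:
    """Build the settings object on the first call and reuse it afterwards."""
    return Settings()


first = get_settings()
second = get_settings()
print(first is second)  # True
```

Unlike a class-level singleton, callers can still construct a separate `Settings(...)` directly, which tends to make testing easier.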

### 2. **Factory Pattern**
- **Use Case**: Creating objects based on runtime parameters
- **Key Learning**: Decouples object creation from usage
- **Demo**: Data processor factory for different file formats

### 3. **Builder Pattern**
- **Use Case**: Complex object construction with optional components
- **Key Learning**: Fluent interfaces improve readability
- **Demo**: Data report builder with configurable components

### 4. **Decorator Pattern**
- **Use Case**: Adding functionality without modifying original objects
- **Key Learning**: Python's decorator syntax provides a functional alternative
- **Demo**: Enhanced data sources with caching, logging, and validation
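
The functional alternative can be sketched with the `@` syntax and `functools.wraps`. `log_calls` and `load_rows` are hypothetical names used only in this example; the wrapper adds call logging without touching the wrapped function's body.

```python
import functools
from typing import Any, Callable, List

call_log: List[str] = []


def log_calls(func: Callable[..., Any]) -> Callable[..., Any]:
    """Record each call to the wrapped function, then delegate to it."""

    @functools.wraps(func)  # Preserve the wrapped function's name and docstring.
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        call_log.append(func.__name__)
        return func(*args, **kwargs)

    return wrapper


@log_calls
def load_rows(limit: int) -> List[int]:
    """Hypothetical data source returning the first `limit` integers."""
    return list(range(limit))


rows = load_rows(3)
print(rows, call_log, load_rows.__name__)
```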

### 5. **Strategy Pattern**
- **Use Case**: Interchangeable algorithms or behaviors
- **Key Learning**: Functions can serve as strategies in Python
- **Demo**: Customer analysis with multiple analysis strategies

### 6. **Observer Pattern**
- **Use Case**: Event-driven systems and notifications
- **Key Learning**: Enables loose coupling between subjects and observers
- **Demo**: Stock price monitoring with multiple reactive components

### Key Takeaways
- **Pythonic Implementation**: Leverage Python's features (functions as first-class objects, duck typing, decorators)
- **Real-world Application**: Each pattern addresses common software design challenges
- **Trade-offs**: Consider simplicity vs. flexibility when applying patterns
- **Modern Python**: Use dataclasses, type hints, and modern syntax for cleaner implementations
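
The duck-typing and dataclass points can be combined in one short sketch: `typing.Protocol` describes the interface structurally, so a class matches it without inheriting from anything. `Notifiable`, `ThresholdAlert`, and `broadcast` are hypothetical names, not classes from this notebook.

```python
from dataclasses import dataclass
from typing import List, Protocol


class Notifiable(Protocol):
    """Anything with a matching update() method satisfies this protocol."""

    def update(self, price: float) -> None: ...


@dataclass
class ThresholdAlert:
    """Does not inherit from Notifiable; it matches by shape alone."""
    threshold: float
    triggered: bool = False

    def update(self, price: float) -> None:
        self.triggered = price >= self.threshold


def broadcast(observers: List[Notifiable], price: float) -> None:
    """Send the price to every observer."""
    for observer in observers:
        observer.update(price)


alert = ThresholdAlert(threshold=160.0)
broadcast([alert], 161.5)
print(alert)  # ThresholdAlert(threshold=160.0, triggered=True)
```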