<a href="https://colab.research.google.com/github/jeremiahoclark/Books/blob/main/07_api_web_development_patterns.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# API and Web Development Patterns

This notebook covers essential patterns for building web applications and APIs in Python, including MVC architecture, application factories, RESTful API design, and error handling patterns.

## 1. Model-View-Controller (MVC) Pattern

The MVC pattern separates concerns in web applications: Models (data), Views (presentation), Controllers (business logic).

In [None]:
# MVC Pattern Implementation
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime
import json

# MODEL LAYER
@dataclass
class User:
    """User model representing user data"""
    id: int
    username: str
    email: str
    created_at: datetime
    is_active: bool = True

    def to_dict(self) -> Dict[str, Any]:
        """Convert user to dictionary for serialization"""
        return {
            'id': self.id,
            'username': self.username,
            'email': self.email,
            'created_at': self.created_at.isoformat(),
            'is_active': self.is_active
        }

class UserRepository:
    """Data access layer for users (Model layer)"""

    def __init__(self):
        self._users: Dict[int, User] = {}
        self._next_id = 1

    def create(self, username: str, email: str) -> User:
        """Create a new user"""
        user = User(
            id=self._next_id,
            username=username,
            email=email,
            created_at=datetime.now()
        )
        self._users[self._next_id] = user
        self._next_id += 1
        return user

    def get_by_id(self, user_id: int) -> Optional[User]:
        """Get user by ID"""
        return self._users.get(user_id)

    def get_all(self) -> List[User]:
        """Get all users"""
        return list(self._users.values())

    def update(self, user_id: int, **kwargs) -> Optional[User]:
        """Update user fields"""
        user = self._users.get(user_id)
        if user:
            for key, value in kwargs.items():
                if hasattr(user, key):
                    setattr(user, key, value)
        return user

    def delete(self, user_id: int) -> bool:
        """Delete user by ID"""
        if user_id in self._users:
            del self._users[user_id]
            return True
        return False

# Demonstrate the model layer
print("=== Model Layer Demo ===")
user_repo = UserRepository()

# Create users
user1 = user_repo.create("alice", "alice@example.com")
user2 = user_repo.create("bob", "bob@example.com")

print(f"Created user: {user1.username} ({user1.id})")
print(f"Created user: {user2.username} ({user2.id})")
print(f"Total users: {len(user_repo.get_all())}")

In [None]:
# CONTROLLER LAYER (Business Logic)
class UserController:
    """Controller handling business logic for users"""

    def __init__(self, user_repository: UserRepository):
        self.user_repo = user_repository

    def create_user(self, username: str, email: str) -> Dict[str, Any]:
        """Create user with validation"""
        # Business logic validation
        if not username or len(username) < 3:
            return {
                'success': False,
                'error': 'Username must be at least 3 characters long'
            }

        if not email or '@' not in email:
            return {
                'success': False,
                'error': 'Valid email is required'
            }

        # Check if username already exists
        existing_users = self.user_repo.get_all()
        if any(u.username == username for u in existing_users):
            return {
                'success': False,
                'error': 'Username already exists'
            }

        # Create user if validation passes
        user = self.user_repo.create(username, email)
        return {
            'success': True,
            'user': user.to_dict()
        }

    def get_user(self, user_id: int) -> Dict[str, Any]:
        """Get user by ID"""
        user = self.user_repo.get_by_id(user_id)
        if user:
            return {
                'success': True,
                'user': user.to_dict()
            }
        return {
            'success': False,
            'error': 'User not found'
        }

    def list_users(self) -> Dict[str, Any]:
        """Get all users"""
        users = self.user_repo.get_all()
        return {
            'success': True,
            'users': [user.to_dict() for user in users],
            'count': len(users)
        }

    def update_user(self, user_id: int, **kwargs) -> Dict[str, Any]:
        """Update user with validation"""
        user = self.user_repo.get_by_id(user_id)
        if not user:
            return {
                'success': False,
                'error': 'User not found'
            }

        # Validate updates
        if 'username' in kwargs and len(kwargs['username']) < 3:
            return {
                'success': False,
                'error': 'Username must be at least 3 characters long'
            }

        updated_user = self.user_repo.update(user_id, **kwargs)
        return {
            'success': True,
            'user': updated_user.to_dict()
        }

    def delete_user(self, user_id: int) -> Dict[str, Any]:
        """Delete user"""
        success = self.user_repo.delete(user_id)
        if success:
            return {'success': True, 'message': 'User deleted successfully'}
        return {'success': False, 'error': 'User not found'}

# Demonstrate the controller layer
print("\n=== Controller Layer Demo ===")
user_controller = UserController(user_repo)

# Test validation
result = user_controller.create_user("ab", "invalid-email")
print(f"Invalid user creation: {result}")

# Valid user creation
result = user_controller.create_user("charlie", "charlie@example.com")
print(f"Valid user creation: {result['success']}")

# List users
result = user_controller.list_users()
print(f"Total users: {result['count']}")

In [None]:
# VIEW LAYER (Presentation)
class UserView:
    """View layer for formatting user data"""

    @staticmethod
    def render_user_list(users_data: Dict[str, Any]) -> str:
        """Render users list as formatted text"""
        if not users_data['success']:
            return f"Error: {users_data.get('error', 'Unknown error')}"

        users = users_data['users']
        if not users:
            return "No users found."

        output = ["Users List:"]
        output.append("-" * 50)
        for user in users:
            status = "Active" if user['is_active'] else "Inactive"
            output.append(f"ID: {user['id']} | {user['username']} ({user['email']}) - {status}")
        output.append(f"\nTotal: {len(users)} users")
        return "\n".join(output)

    @staticmethod
    def render_user_detail(user_data: Dict[str, Any]) -> str:
        """Render single user details"""
        if not user_data['success']:
            return f"Error: {user_data.get('error', 'Unknown error')}"

        user = user_data['user']
        return f"""
User Details:
============
ID: {user['id']}
Username: {user['username']}
Email: {user['email']}
Status: {'Active' if user['is_active'] else 'Inactive'}
Created: {user['created_at']}
""".strip()

    @staticmethod
    def render_json(data: Dict[str, Any]) -> str:
        """Render data as JSON (for API responses)"""
        return json.dumps(data, indent=2, default=str)

    @staticmethod
    def render_operation_result(result: Dict[str, Any]) -> str:
        """Render operation result message"""
        if result['success']:
            message = result.get('message', 'Operation completed successfully')
            return f"✓ {message}"
        else:
            error = result.get('error', 'Unknown error')
            return f"✗ Error: {error}"

# Demonstrate the view layer
print("\n=== View Layer Demo ===")
user_view = UserView()

# Render users list
users_data = user_controller.list_users()
print(user_view.render_user_list(users_data))

# Render single user
user_data = user_controller.get_user(1)
print("\n" + user_view.render_user_detail(user_data))

# Render as JSON
print("\nJSON representation:")
print(user_view.render_json(user_data))

In [None]:
# COMPLETE MVC APPLICATION SIMULATION
class WebApplication:
    """Simulates a web application using MVC pattern"""

    def __init__(self):
        # Initialize layers
        self.user_repo = UserRepository()
        self.user_controller = UserController(self.user_repo)
        self.user_view = UserView()

    def handle_request(self, method: str, path: str, data: Dict[str, Any] = None) -> str:
        """Simulate handling web requests"""
        try:
            if path == "/users" and method == "GET":
                # List users
                result = self.user_controller.list_users()
                return self.user_view.render_user_list(result)

            elif path == "/users" and method == "POST":
                # Create user
                if not data:
                    return "✗ Error: No data provided"
                result = self.user_controller.create_user(
                    data.get('username', ''),
                    data.get('email', '')
                )
                return self.user_view.render_operation_result(result)

            elif path.startswith("/users/") and method == "GET":
                # Get single user
                user_id = int(path.split("/")[-1])
                result = self.user_controller.get_user(user_id)
                return self.user_view.render_user_detail(result)

            elif path.startswith("/users/") and method == "PUT":
                # Update user
                user_id = int(path.split("/")[-1])
                if not data:
                    return "✗ Error: No data provided"
                result = self.user_controller.update_user(user_id, **data)
                return self.user_view.render_operation_result(result)

            elif path.startswith("/users/") and method == "DELETE":
                # Delete user
                user_id = int(path.split("/")[-1])
                result = self.user_controller.delete_user(user_id)
                return self.user_view.render_operation_result(result)

            else:
                return "✗ Error: 404 Not Found"

        except Exception as e:
            return f"✗ Error: 500 Internal Server Error - {str(e)}"

# Demonstrate complete MVC application
print("\n=== Complete MVC Application Demo ===")
app = WebApplication()

# Simulate various HTTP requests
print("1. Create users:")
print(app.handle_request("POST", "/users", {"username": "alice", "email": "alice@example.com"}))
print(app.handle_request("POST", "/users", {"username": "bob", "email": "bob@example.com"}))

print("\n2. List all users:")
print(app.handle_request("GET", "/users"))

print("\n3. Get specific user:")
print(app.handle_request("GET", "/users/1"))

print("\n4. Update user:")
print(app.handle_request("PUT", "/users/1", {"email": "alice.updated@example.com"}))

print("\n5. Try to create invalid user:")
print(app.handle_request("POST", "/users", {"username": "ab", "email": "invalid"}))

print("\n6. Delete user:")
print(app.handle_request("DELETE", "/users/2"))

print("\n7. Final user list:")
print(app.handle_request("GET", "/users"))

## 2. Application Factory and Blueprints Pattern

The Application Factory pattern creates Flask apps in a function, enabling different configurations and modular organization with blueprints.

In [None]:
# Application Factory Pattern Simulation
from typing import Dict, Any, Callable, List
from dataclasses import dataclass, field

@dataclass
class Config:
    """Configuration class"""
    DATABASE_URL: str = "sqlite:///default.db"
    DEBUG: bool = False
    SECRET_KEY: str = "default-secret-key"
    API_PREFIX: str = "/api/v1"

@dataclass
class TestConfig(Config):
    """Test configuration"""
    DATABASE_URL: str = "sqlite:///:memory:"
    DEBUG: bool = True
    SECRET_KEY: str = "test-secret-key"
    TESTING: bool = True

@dataclass
class Route:
    """Simple route representation"""
    path: str
    method: str
    handler: Callable
    blueprint: str = "main"

class Blueprint:
    """Simplified blueprint implementation"""

    def __init__(self, name: str, url_prefix: str = ""):
        self.name = name
        self.url_prefix = url_prefix
        self.routes: List[Route] = []
        self.before_request_handlers: List[Callable] = []

    def route(self, path: str, methods: List[str] = None):
        """Decorator to register routes"""
        if methods is None:
            methods = ["GET"]

        def decorator(func):
            for method in methods:
                route = Route(
                    path=path,
                    method=method,
                    handler=func,
                    blueprint=self.name
                )
                self.routes.append(route)
            return func
        return decorator

    def before_request(self, func):
        """Register before request handler"""
        self.before_request_handlers.append(func)
        return func

class FlaskApp:
    """Simplified Flask app simulation"""

    def __init__(self, name: str):
        self.name = name
        self.config: Dict[str, Any] = {}
        self.routes: List[Route] = []
        self.blueprints: Dict[str, Blueprint] = {}
        self.extensions: Dict[str, Any] = {}
        self.before_request_handlers: List[Callable] = []

    def configure(self, config_object):
        """Load configuration from object"""
        for key in dir(config_object):
            if not key.startswith('_'):
                self.config[key] = getattr(config_object, key)

    def register_blueprint(self, blueprint: Blueprint):
        """Register a blueprint with the app"""
        self.blueprints[blueprint.name] = blueprint

        # Add blueprint routes to app
        for route in blueprint.routes:
            full_path = blueprint.url_prefix + route.path
            app_route = Route(
                path=full_path,
                method=route.method,
                handler=route.handler,
                blueprint=blueprint.name
            )
            self.routes.append(app_route)

    def route(self, path: str, methods: List[str] = None):
        """Decorator to register routes directly on app"""
        if methods is None:
            methods = ["GET"]

        def decorator(func):
            for method in methods:
                route = Route(
                    path=path,
                    method=method,
                    handler=func
                )
                self.routes.append(route)
            return func
        return decorator

    def handle_request(self, method: str, path: str, data: Dict[str, Any] = None) -> str:
        """Handle incoming request"""
        # Find matching route
        for route in self.routes:
            if route.path == path and route.method == method:
                try:
                    # Execute before request handlers
                    for handler in self.before_request_handlers:
                        handler()

                    # Execute blueprint-specific before request handlers
                    if route.blueprint in self.blueprints:
                        blueprint = self.blueprints[route.blueprint]
                        for handler in blueprint.before_request_handlers:
                            handler()

                    # Execute route handler
                    if data:
                        return route.handler(data)
                    else:
                        return route.handler()
                except Exception as e:
                    return f"Error: {str(e)}"

        return "404 Not Found"

    def list_routes(self) -> str:
        """List all registered routes"""
        output = ["Registered Routes:"]
        output.append("-" * 50)
        for route in self.routes:
            blueprint_info = f" [{route.blueprint}]" if route.blueprint != "main" else ""
            output.append(f"{route.method:6} {route.path:<30} {route.handler.__name__}{blueprint_info}")
        return "\n".join(output)

print("Application Factory and Blueprint classes defined.")

In [None]:
# Create blueprints for different modules

# Main blueprint for core functionality
main_bp = Blueprint("main")

@main_bp.route("/")
def index():
    return "Welcome to the main page!"

@main_bp.route("/health")
def health_check():
    return "Application is healthy"

# Authentication blueprint
auth_bp = Blueprint("auth", url_prefix="/auth")

@auth_bp.before_request
def auth_before_request():
    print("[Auth Blueprint] Before request handler executed")

@auth_bp.route("/login", methods=["GET", "POST"])
def login(data=None):
    if data:
        username = data.get('username')
        password = data.get('password')
        if username and password:
            return f"Login successful for user: {username}"
        return "Invalid credentials"
    return "Login page - GET request"

@auth_bp.route("/logout", methods=["POST"])
def logout():
    return "Logged out successfully"

@auth_bp.route("/register", methods=["GET", "POST"])
def register(data=None):
    if data:
        username = data.get('username')
        email = data.get('email')
        if username and email:
            return f"User {username} registered with email {email}"
        return "Username and email required"
    return "Registration page - GET request"

# API blueprint for REST endpoints
api_bp = Blueprint("api", url_prefix="/api/v1")

@api_bp.before_request
def api_before_request():
    print("[API Blueprint] Before request handler executed")

@api_bp.route("/users", methods=["GET"])
def api_get_users():
    return '{"users": [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]}'

@api_bp.route("/users", methods=["POST"])
def api_create_user(data=None):
    if data and 'name' in data:
        return f'{{"id": 3, "name": "{data["name"]}", "status": "created"}}'
    return '{"error": "Name is required"}'

@api_bp.route("/users/<user_id>", methods=["GET"])
def api_get_user(user_id=1):
    return f'{{"id": {user_id}, "name": "User {user_id}"}}'

print("Blueprints created with routes.")

In [None]:
# Application Factory Implementation
def create_app(config_object=None):
    """Application factory function"""
    # Create app instance
    app = FlaskApp(__name__)

    # Load configuration
    if config_object:
        app.configure(config_object)
    else:
        app.configure(Config())

    # Initialize extensions (simulated)
    initialize_extensions(app)

    # Register blueprints
    app.register_blueprint(main_bp)
    app.register_blueprint(auth_bp)
    app.register_blueprint(api_bp)

    # Register global error handlers
    register_error_handlers(app)

    return app

def initialize_extensions(app: FlaskApp):
    """Initialize Flask extensions"""
    # Simulate database initialization
    app.extensions['database'] = {
        'url': app.config.get('DATABASE_URL'),
        'connected': True
    }
    print(f"Database initialized: {app.config.get('DATABASE_URL')}")

    # Simulate other extensions
    if app.config.get('DEBUG'):
        app.extensions['debug_toolbar'] = True
        print("Debug toolbar enabled")

def register_error_handlers(app: FlaskApp):
    """Register global error handlers"""
    # This would be more complex in a real Flask app
    pass

# Create different app instances
print("=== Creating Apps with Different Configurations ===")

# Production app
print("\n1. Production App:")
prod_app = create_app(Config())
print(f"Debug mode: {prod_app.config.get('DEBUG')}")
print(f"Database URL: {prod_app.config.get('DATABASE_URL')}")

# Test app
print("\n2. Test App:")
test_app = create_app(TestConfig())
print(f"Debug mode: {test_app.config.get('DEBUG')}")
print(f"Database URL: {test_app.config.get('DATABASE_URL')}")
print(f"Testing mode: {test_app.config.get('TESTING')}")

print("\n3. Available Routes:")
print(prod_app.list_routes())

In [None]:
# Demonstrate application usage
print("=== Testing Application Functionality ===")

# Use the production app for testing
app = prod_app

print("\n1. Main blueprint routes:")
print(f"GET /: {app.handle_request('GET', '/')}")
print(f"GET /health: {app.handle_request('GET', '/health')}")

print("\n2. Auth blueprint routes:")
print(f"GET /auth/login: {app.handle_request('GET', '/auth/login')}")
print(f"POST /auth/login: {app.handle_request('POST', '/auth/login', {'username': 'alice', 'password': 'secret'})}")
print(f"POST /auth/register: {app.handle_request('POST', '/auth/register', {'username': 'bob', 'email': 'bob@example.com'})}")

print("\n3. API blueprint routes:")
print(f"GET /api/v1/users: {app.handle_request('GET', '/api/v1/users')}")
print(f"POST /api/v1/users: {app.handle_request('POST', '/api/v1/users', {'name': 'Charlie'})}")

print("\n4. Testing 404:")
print(f"GET /nonexistent: {app.handle_request('GET', '/nonexistent')}")

# Test with different app configuration
print("\n=== Testing with Test Configuration ===")
print(f"Test app database: {test_app.config.get('DATABASE_URL')}")
print(f"Same route on test app: {test_app.handle_request('GET', '/')}")

## 3. RESTful API Design Patterns

RESTful APIs use HTTP methods and status codes effectively, following resource-based URL patterns and consistent response formats.

In [None]:
# RESTful API Implementation
import json
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, asdict
from datetime import datetime
from enum import Enum

class HTTPStatus(Enum):
    """HTTP status codes"""
    OK = 200
    CREATED = 201
    NO_CONTENT = 204
    BAD_REQUEST = 400
    UNAUTHORIZED = 401
    FORBIDDEN = 403
    NOT_FOUND = 404
    METHOD_NOT_ALLOWED = 405
    CONFLICT = 409
    INTERNAL_SERVER_ERROR = 500

@dataclass
class APIResponse:
    """Standard API response format"""
    status_code: int
    data: Any = None
    message: str = ""
    errors: List[str] = None
    meta: Dict[str, Any] = None

    def to_json(self) -> str:
        """Convert response to JSON"""
        response_dict = {
            'status_code': self.status_code,
            'data': self.data,
            'message': self.message
        }

        if self.errors:
            response_dict['errors'] = self.errors

        if self.meta:
            response_dict['meta'] = self.meta

        return json.dumps(response_dict, default=str, indent=2)

@dataclass
class Book:
    """Book resource model"""
    id: int
    title: str
    author: str
    isbn: str
    publication_year: int
    available: bool = True
    created_at: datetime = None
    updated_at: datetime = None

    def __post_init__(self):
        if self.created_at is None:
            self.created_at = datetime.now()
        self.updated_at = datetime.now()

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for API response"""
        return asdict(self)

    def update(self, **kwargs):
        """Update book fields"""
        for key, value in kwargs.items():
            if hasattr(self, key) and key not in ['id', 'created_at']:
                setattr(self, key, value)
        self.updated_at = datetime.now()

class BookRepository:
    """Repository for book data management"""

    def __init__(self):
        self._books: Dict[int, Book] = {}
        self._next_id = 1
        self._seed_data()

    def _seed_data(self):
        """Add some initial data"""
        sample_books = [
            {"title": "Python Tricks", "author": "Dan Bader", "isbn": "978-1775093305", "publication_year": 2017},
            {"title": "Clean Code", "author": "Robert Martin", "isbn": "978-0132350884", "publication_year": 2008},
            {"title": "Effective Python", "author": "Brett Slatkin", "isbn": "978-0134853987", "publication_year": 2019}
        ]

        for book_data in sample_books:
            self.create(**book_data)

    def create(self, title: str, author: str, isbn: str, publication_year: int) -> Book:
        """Create a new book"""
        book = Book(
            id=self._next_id,
            title=title,
            author=author,
            isbn=isbn,
            publication_year=publication_year
        )
        self._books[self._next_id] = book
        self._next_id += 1
        return book

    def get_all(self, limit: int = None, offset: int = 0) -> List[Book]:
        """Get all books with pagination"""
        books = list(self._books.values())
        if limit:
            return books[offset:offset + limit]
        return books[offset:]

    def get_by_id(self, book_id: int) -> Optional[Book]:
        """Get book by ID"""
        return self._books.get(book_id)

    def update(self, book_id: int, **kwargs) -> Optional[Book]:
        """Update book"""
        book = self._books.get(book_id)
        if book:
            book.update(**kwargs)
        return book

    def delete(self, book_id: int) -> bool:
        """Delete book"""
        if book_id in self._books:
            del self._books[book_id]
            return True
        return False

    def count(self) -> int:
        """Get total count of books"""
        return len(self._books)

    def search(self, query: str) -> List[Book]:
        """Search books by title or author"""
        query = query.lower()
        return [
            book for book in self._books.values()
            if query in book.title.lower() or query in book.author.lower()
        ]

print("Book model and repository defined.")

In [None]:
# RESTful API Controller
class BooksAPIController:
    """RESTful API controller for books resource"""

    def __init__(self, repository: BookRepository):
        self.repository = repository

    def list_books(self, query_params: Dict[str, Any] = None) -> APIResponse:
        """GET /books - List all books with pagination"""
        try:
            query_params = query_params or {}

            # Handle search
            if 'search' in query_params:
                books = self.repository.search(query_params['search'])
            else:
                # Handle pagination
                limit = query_params.get('limit', 10)
                offset = query_params.get('offset', 0)

                try:
                    limit = int(limit)
                    offset = int(offset)
                except ValueError:
                    return APIResponse(
                        status_code=HTTPStatus.BAD_REQUEST.value,
                        errors=["Invalid pagination parameters"]
                    )

                if limit <= 0 or offset < 0:
                    return APIResponse(
                        status_code=HTTPStatus.BAD_REQUEST.value,
                        errors=["Limit must be positive, offset must be non-negative"]
                    )

                books = self.repository.get_all(limit=limit, offset=offset)

            # Prepare response with metadata
            total_count = self.repository.count()
            meta = {
                'total_count': total_count,
                'count': len(books),
            }

            if 'search' not in query_params:
                meta['limit'] = limit
                meta['offset'] = offset
                meta['has_next'] = offset + limit < total_count
                meta['has_prev'] = offset > 0

            return APIResponse(
                status_code=HTTPStatus.OK.value,
                data=[book.to_dict() for book in books],
                meta=meta
            )

        except Exception as e:
            return APIResponse(
                status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value,
                errors=["Internal server error"]
            )

    def get_book(self, book_id: int) -> APIResponse:
        """GET /books/{id} - Get single book"""
        book = self.repository.get_by_id(book_id)

        if not book:
            return APIResponse(
                status_code=HTTPStatus.NOT_FOUND.value,
                errors=[f"Book with ID {book_id} not found"]
            )

        return APIResponse(
            status_code=HTTPStatus.OK.value,
            data=book.to_dict()
        )

    def create_book(self, book_data: Dict[str, Any]) -> APIResponse:
        """POST /books - Create new book"""
        # Validate required fields
        required_fields = ['title', 'author', 'isbn', 'publication_year']
        errors = []

        for field in required_fields:
            if field not in book_data or not book_data[field]:
                errors.append(f"{field} is required")

        # Validate data types and values
        if 'publication_year' in book_data:
            try:
                year = int(book_data['publication_year'])
                current_year = datetime.now().year
                if year < 1000 or year > current_year + 1:
                    errors.append("Publication year must be between 1000 and next year")
            except ValueError:
                errors.append("Publication year must be a valid integer")

        if 'isbn' in book_data and len(book_data['isbn']) < 10:
            errors.append("ISBN must be at least 10 characters")

        if errors:
            return APIResponse(
                status_code=HTTPStatus.BAD_REQUEST.value,
                errors=errors
            )

        try:
            book = self.repository.create(
                title=book_data['title'],
                author=book_data['author'],
                isbn=book_data['isbn'],
                publication_year=int(book_data['publication_year'])
            )

            return APIResponse(
                status_code=HTTPStatus.CREATED.value,
                data=book.to_dict(),
                message="Book created successfully"
            )

        except Exception as e:
            return APIResponse(
                status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value,
                errors=["Failed to create book"]
            )

    def update_book(self, book_id: int, book_data: Dict[str, Any]) -> APIResponse:
        """PUT /books/{id} - Update book"""
        book = self.repository.get_by_id(book_id)

        if not book:
            return APIResponse(
                status_code=HTTPStatus.NOT_FOUND.value,
                errors=[f"Book with ID {book_id} not found"]
            )

        # Validate update data
        errors = []

        if 'publication_year' in book_data:
            try:
                year = int(book_data['publication_year'])
                current_year = datetime.now().year
                if year < 1000 or year > current_year + 1:
                    errors.append("Publication year must be between 1000 and next year")
            except ValueError:
                errors.append("Publication year must be a valid integer")

        if 'isbn' in book_data and len(book_data['isbn']) < 10:
            errors.append("ISBN must be at least 10 characters")

        if errors:
            return APIResponse(
                status_code=HTTPStatus.BAD_REQUEST.value,
                errors=errors
            )

        try:
            updated_book = self.repository.update(book_id, **book_data)

            return APIResponse(
                status_code=HTTPStatus.OK.value,
                data=updated_book.to_dict(),
                message="Book updated successfully"
            )

        except Exception as e:
            return APIResponse(
                status_code=HTTPStatus.INTERNAL_SERVER_ERROR.value,
                errors=["Failed to update book"]
            )

    def delete_book(self, book_id: int) -> APIResponse:
        """DELETE /books/{id} - Delete book"""
        success = self.repository.delete(book_id)

        if not success:
            return APIResponse(
                status_code=HTTPStatus.NOT_FOUND.value,
                errors=[f"Book with ID {book_id} not found"]
            )

        return APIResponse(
            status_code=HTTPStatus.NO_CONTENT.value,
            message="Book deleted successfully"
        )

print("RESTful API controller defined.")

In [None]:
# Demonstrate RESTful API usage
print("=== RESTful API Demo ===")

# Initialize repository and controller
book_repo = BookRepository()
api_controller = BooksAPIController(book_repo)

print("\n1. GET /books - List all books:")
response = api_controller.list_books()
print(f"Status: {response.status_code}")
print(f"Total books: {response.meta['total_count']}")
for book in response.data:
    print(f"  - {book['title']} by {book['author']}")

print("\n2. GET /books?limit=2&offset=1 - Paginated list:")
response = api_controller.list_books({'limit': 2, 'offset': 1})
print(f"Status: {response.status_code}")
print(f"Showing {response.meta['count']} of {response.meta['total_count']} books")
print(f"Has next: {response.meta['has_next']}, Has prev: {response.meta['has_prev']}")

print("\n3. GET /books/1 - Get specific book:")
response = api_controller.get_book(1)
print(f"Status: {response.status_code}")
if response.data:
    book = response.data
    print(f"Book: {book['title']} by {book['author']} ({book['publication_year']})")

print("\n4. POST /books - Create new book:")
new_book_data = {
    'title': 'Design Patterns',
    'author': 'Gang of Four',
    'isbn': '978-0201633610',
    'publication_year': 1994
}
response = api_controller.create_book(new_book_data)
print(f"Status: {response.status_code}")
print(f"Message: {response.message}")
if response.data:
    print(f"Created book ID: {response.data['id']}")

print("\n5. PUT /books/4 - Update book:")
update_data = {'available': False, 'title': 'Design Patterns (Updated)'}
response = api_controller.update_book(4, update_data)
print(f"Status: {response.status_code}")
print(f"Message: {response.message}")
if response.data:
    print(f"Updated title: {response.data['title']}")
    print(f"Available: {response.data['available']}")

print("\n6. GET /books?search=python - Search books:")
response = api_controller.list_books({'search': 'python'})
print(f"Status: {response.status_code}")
print(f"Found {response.meta['count']} books matching 'python':")
for book in response.data:
    print(f"  - {book['title']}")

print("\n7. DELETE /books/3 - Delete book:")
response = api_controller.delete_book(3)
print(f"Status: {response.status_code}")
print(f"Message: {response.message}")

print("\n8. Error handling - GET /books/999 (non-existent):")
response = api_controller.get_book(999)
print(f"Status: {response.status_code}")
print(f"Errors: {response.errors}")

print("\n9. Error handling - POST with invalid data:")
invalid_data = {'title': '', 'author': 'Test', 'publication_year': 'invalid'}
response = api_controller.create_book(invalid_data)
print(f"Status: {response.status_code}")
print(f"Errors: {response.errors}")

print("\n10. Final book count:")
response = api_controller.list_books()
print(f"Total books remaining: {response.meta['total_count']}")

## 4. Error Handling in Web Applications

Centralized error handling provides consistent error responses and proper logging for web applications.

In [None]:
# Web Application Error Handling Patterns
import logging
import traceback
from typing import Dict, Any, Optional, Union
from datetime import datetime
from enum import Enum
import uuid

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

class ErrorType(Enum):
    """Types of application errors"""
    VALIDATION_ERROR = "validation_error"
    NOT_FOUND = "not_found"
    UNAUTHORIZED = "unauthorized"
    FORBIDDEN = "forbidden"
    RATE_LIMITED = "rate_limited"
    DATABASE_ERROR = "database_error"
    EXTERNAL_SERVICE_ERROR = "external_service_error"
    INTERNAL_SERVER_ERROR = "internal_server_error"

class ApplicationError(Exception):
    """Base application error class"""

    def __init__(self, message: str, error_type: ErrorType, status_code: int = 500,
                 details: Dict[str, Any] = None, cause: Exception = None):
        super().__init__(message)
        self.message = message
        self.error_type = error_type
        self.status_code = status_code
        self.details = details or {}
        self.cause = cause
        self.error_id = str(uuid.uuid4())
        self.timestamp = datetime.now()

class ValidationError(ApplicationError):
    """Validation error (400)"""
    def __init__(self, message: str, field_errors: Dict[str, str] = None):
        super().__init__(
            message=message,
            error_type=ErrorType.VALIDATION_ERROR,
            status_code=400,
            details={'field_errors': field_errors or {}}
        )

class NotFoundError(ApplicationError):
    """Resource not found error (404)"""
    def __init__(self, resource: str, identifier: Any = None):
        message = f"{resource} not found"
        if identifier:
            message += f" (ID: {identifier})"

        super().__init__(
            message=message,
            error_type=ErrorType.NOT_FOUND,
            status_code=404,
            details={'resource': resource, 'identifier': identifier}
        )

class UnauthorizedError(ApplicationError):
    """Authentication error (401)"""
    def __init__(self, message: str = "Authentication required"):
        super().__init__(
            message=message,
            error_type=ErrorType.UNAUTHORIZED,
            status_code=401
        )

class DatabaseError(ApplicationError):
    """Database operation error (500)"""
    def __init__(self, operation: str, cause: Exception = None):
        super().__init__(
            message=f"Database operation failed: {operation}",
            error_type=ErrorType.DATABASE_ERROR,
            status_code=500,
            details={'operation': operation},
            cause=cause
        )

class ExternalServiceError(ApplicationError):
    """External service error (503)"""
    def __init__(self, service: str, cause: Exception = None):
        super().__init__(
            message=f"External service unavailable: {service}",
            error_type=ErrorType.EXTERNAL_SERVICE_ERROR,
            status_code=503,
            details={'service': service},
            cause=cause
        )

print("Error classes defined.")

In [None]:
# Centralized Error Handler
class ErrorHandler:
    """Centralized error handling for web applications"""

    def __init__(self, debug: bool = False, log_errors: bool = True):
        self.debug = debug
        self.log_errors = log_errors
        self.logger = logging.getLogger(__name__)

    def handle_error(self, error: Exception, request_path: str = "",
                    request_method: str = "", user_id: str = None) -> Dict[str, Any]:
        """Handle any error and return appropriate response"""

        if isinstance(error, ApplicationError):
            return self._handle_application_error(error, request_path, request_method, user_id)
        else:
            return self._handle_unexpected_error(error, request_path, request_method, user_id)

    def _handle_application_error(self, error: ApplicationError, request_path: str,
                                 request_method: str, user_id: str) -> Dict[str, Any]:
        """Handle known application errors"""

        # Log the error based on severity
        if self.log_errors:
            log_data = {
                'error_id': error.error_id,
                'error_type': error.error_type.value,
                'message': error.message,
                'status_code': error.status_code,
                'request_path': request_path,
                'request_method': request_method,
                'user_id': user_id,
                'details': error.details
            }

            if error.status_code >= 500:
                self.logger.error(f"Server error: {log_data}", exc_info=error.cause)
            elif error.status_code >= 400:
                self.logger.warning(f"Client error: {log_data}")
            else:
                self.logger.info(f"Application error: {log_data}")

        # Prepare response
        response = {
            'error': {
                'type': error.error_type.value,
                'message': error.message,
                'error_id': error.error_id,
                'timestamp': error.timestamp.isoformat()
            },
            'status_code': error.status_code
        }

        # Include details for client errors if they're helpful
        if error.status_code < 500 and error.details:
            response['error']['details'] = error.details

        # Include stack trace in debug mode
        if self.debug and error.cause:
            response['error']['debug_info'] = {
                'cause': str(error.cause),
                'traceback': traceback.format_exception(type(error.cause), error.cause, error.cause.__traceback__)
            }

        return response

    def _handle_unexpected_error(self, error: Exception, request_path: str,
                                request_method: str, user_id: str) -> Dict[str, Any]:
        """Handle unexpected errors"""

        error_id = str(uuid.uuid4())

        if self.log_errors:
            log_data = {
                'error_id': error_id,
                'error_type': 'unexpected_error',
                'message': str(error),
                'request_path': request_path,
                'request_method': request_method,
                'user_id': user_id
            }
            self.logger.error(f"Unexpected error: {log_data}", exc_info=True)

        response = {
            'error': {
                'type': 'internal_server_error',
                'message': 'An unexpected error occurred',
                'error_id': error_id,
                'timestamp': datetime.now().isoformat()
            },
            'status_code': 500
        }

        # Include error details in debug mode
        if self.debug:
            response['error']['debug_info'] = {
                'original_error': str(error),
                'traceback': traceback.format_exception(type(error), error, error.__traceback__)
            }

        return response

    def create_error_response_json(self, error_response: Dict[str, Any]) -> str:
        """Convert error response to JSON"""
        return json.dumps(error_response, indent=2, default=str)

# Error-prone service for demonstration
class UserService:
    """Example service that can throw various errors"""

    def __init__(self):
        self.users = {
            1: {'id': 1, 'name': 'Alice', 'email': 'alice@example.com'},
            2: {'id': 2, 'name': 'Bob', 'email': 'bob@example.com'}
        }

    def get_user(self, user_id: int) -> Dict[str, Any]:
        """Get user by ID - can raise NotFoundError"""
        if user_id not in self.users:
            raise NotFoundError('User', user_id)
        return self.users[user_id]

    def create_user(self, user_data: Dict[str, Any]) -> Dict[str, Any]:
        """Create user - can raise ValidationError"""
        errors = {}

        if not user_data.get('name'):
            errors['name'] = 'Name is required'

        if not user_data.get('email'):
            errors['email'] = 'Email is required'
        elif '@' not in user_data.get('email', ''):
            errors['email'] = 'Invalid email format'

        if errors:
            raise ValidationError('Validation failed', field_errors=errors)

        # Simulate database error
        if user_data.get('name') == 'error':
            raise DatabaseError('user_creation', cause=Exception('Connection timeout'))

        user_id = max(self.users.keys()) + 1
        user = {'id': user_id, **user_data}
        self.users[user_id] = user
        return user

    def authenticate_user(self, token: str) -> Dict[str, Any]:
        """Authenticate user - can raise UnauthorizedError"""
        if not token or token != 'valid_token':
            raise UnauthorizedError('Invalid or missing authentication token')
        return {'user_id': 1, 'username': 'alice'}

    def call_external_service(self) -> str:
        """Call external service - can raise ExternalServiceError"""
        # Simulate service failure
        raise ExternalServiceError('payment_service', cause=Exception('Service timeout'))

    def cause_unexpected_error(self):
        """Cause an unexpected error"""
        return 1 / 0  # ZeroDivisionError

print("Error handler and service classes defined.")

In [None]:
# Demonstrate error handling patterns
print("=== Error Handling Demo ===")

# Initialize error handler and service
error_handler = ErrorHandler(debug=True)  # Enable debug mode for demo
user_service = UserService()

def simulate_request(operation: str, **kwargs):
    """Simulate handling a web request with error handling"""
    print(f"\n--- {operation} ---")
    try:
        if operation == "get_user":
            result = user_service.get_user(kwargs['user_id'])
            print(f"Success: {result}")

        elif operation == "create_user":
            result = user_service.create_user(kwargs['user_data'])
            print(f"Success: {result}")

        elif operation == "authenticate":
            result = user_service.authenticate_user(kwargs['token'])
            print(f"Success: {result}")

        elif operation == "external_service":
            result = user_service.call_external_service()
            print(f"Success: {result}")

        elif operation == "unexpected_error":
            result = user_service.cause_unexpected_error()
            print(f"Success: {result}")

    except Exception as e:
        # Handle error using centralized error handler
        error_response = error_handler.handle_error(
            error=e,
            request_path=f"/{operation}",
            request_method="POST",
            user_id="user_123"
        )

        print(f"Error Response (Status: {error_response['status_code']}):")
        print(error_handler.create_error_response_json(error_response))

# Test different types of errors

print("1. Valid request:")
simulate_request("get_user", user_id=1)

print("\n2. Not Found Error:")
simulate_request("get_user", user_id=999)

print("\n3. Validation Error:")
simulate_request("create_user", user_data={'name': '', 'email': 'invalid-email'})

print("\n4. Unauthorized Error:")
simulate_request("authenticate", token="invalid_token")

print("\n5. Database Error:")
simulate_request("create_user", user_data={'name': 'error', 'email': 'test@example.com'})

print("\n6. External Service Error:")
simulate_request("external_service")

print("\n7. Unexpected Error:")
simulate_request("unexpected_error")

print("\n8. Valid user creation:")
simulate_request("create_user", user_data={'name': 'Charlie', 'email': 'charlie@example.com'})

In [None]:
# Production vs Development Error Handling
print("\n=== Production vs Development Error Handling ===")

# Development error handler (with debug info)
dev_error_handler = ErrorHandler(debug=True, log_errors=True)

# Production error handler (no debug info)
prod_error_handler = ErrorHandler(debug=False, log_errors=True)

# Simulate the same error in both environments
def test_error_in_environments():
    try:
        # Cause a database error
        raise DatabaseError('user_lookup', cause=Exception('Connection refused: Database server down'))
    except Exception as e:
        print("\nDevelopment Environment Response:")
        dev_response = dev_error_handler.handle_error(e, "/api/users", "GET", "user_123")
        print(dev_error_handler.create_error_response_json(dev_response))

        print("\nProduction Environment Response:")
        prod_response = prod_error_handler.handle_error(e, "/api/users", "GET", "user_123")
        print(prod_error_handler.create_error_response_json(prod_response))

test_error_in_environments()

# Custom error response formatting for different content types
class ErrorResponseFormatter:
    """Format error responses for different content types"""

    @staticmethod
    def format_for_api(error_response: Dict[str, Any]) -> str:
        """Format error for JSON API response"""
        return json.dumps(error_response, indent=2, default=str)

    @staticmethod
    def format_for_html(error_response: Dict[str, Any]) -> str:
        """Format error for HTML response"""
        error = error_response['error']
        status_code = error_response['status_code']

        return f"""
<!DOCTYPE html>
<html>
<head><title>Error {status_code}</title></head>
<body>
    <h1>Error {status_code}</h1>
    <p>{error['message']}</p>
    <small>Error ID: {error['error_id']}</small>
</body>
</html>
""".strip()

    @staticmethod
    def format_for_text(error_response: Dict[str, Any]) -> str:
        """Format error for plain text response"""
        error = error_response['error']
        status_code = error_response['status_code']

        return f"Error {status_code}: {error['message']} (ID: {error['error_id']})"

print("\n=== Different Response Formats ===")
try:
    raise ValidationError("Invalid input data", {'email': 'Email format is invalid'})
except Exception as e:
    error_response = prod_error_handler.handle_error(e, "/contact", "POST", "guest")

    formatter = ErrorResponseFormatter()

    print("\nJSON API Format:")
    print(formatter.format_for_api(error_response))

    print("\nHTML Format:")
    print(formatter.format_for_html(error_response))

    print("\nPlain Text Format:")
    print(formatter.format_for_text(error_response))

## Key Takeaways

1. **MVC Pattern**: Separates concerns into Models (data), Views (presentation), and Controllers (business logic)
2. **Application Factory**: Enables configurable app creation and modular organization with blueprints
3. **RESTful APIs**: Use proper HTTP methods, status codes, and resource-based URLs for predictable interfaces
4. **Error Handling**: Centralized error handling provides consistent responses and proper logging

## Best Practices

- Follow REST conventions for predictable API design
- Use application factories for flexible configuration management
- Implement proper error hierarchies for different error types
- Log errors appropriately based on severity and environment
- Validate input data consistently across all endpoints
- Use proper HTTP status codes and response formats

## Exercises

1. Implement a complete MVC pattern for a blog application with posts and comments
2. Create Flask blueprints for user management and content management
3. Design a RESTful API for a library system with books, authors, and borrowers
4. Implement rate limiting and authentication error handling
5. Create error handlers that format responses differently for API vs web requests