#### [1]

In [1]:
class Display:
    def show_message(self, message):
        print(f"Display shows: {message}")

class Speaker:
    def play_sound(self, sound):
        print(f"Speaker plays: {sound}")

class Smartphone:
    def __init__(self):
        self.display = Display()
        self.speaker = Speaker()
    
    def notification(self, message, sound):
        self.display.show_message(message)
        self.speaker.play_sound(sound)

if __name__ == "__main__":
    phone = Smartphone()
    phone.notification("New message", "Ding!")

Display shows: New message
Speaker plays: Ding!


#### [2]

In [2]:
class Database:
    def save_data(self, data):
        print(f"Saving data: {data}")
    
    def retrieve_data(self, query):
        print(f"Retrieving data for query: {query}")
        return f"Results for {query}"

class Logger:
    def log(self, message):
        print(f"LOG: {message}")

class UserService:
    def __init__(self):
        self.database = Database()
        self.logger = Logger()
    
    def create_user(self, user_data):
        self.logger.log(f"Creating user with data: {user_data}")
        self.database.save_data(user_data)
        
    def find_user(self, user_id):
        self.logger.log(f"Finding user with ID: {user_id}")
        return self.database.retrieve_data(f"user_id={user_id}")

if __name__ == "__main__":
    user_service = UserService()
    user_service.create_user({"name": "Alice", "email": "alice@example.com"})
    result = user_service.find_user(42)
    print(f"Found: {result}")

LOG: Creating user with data: {'name': 'Alice', 'email': 'alice@example.com'}
Saving data: {'name': 'Alice', 'email': 'alice@example.com'}
LOG: Finding user with ID: 42
Retrieving data for query: user_id=42
Found: Results for user_id=42


#### [3]

In [3]:
class PaymentProcessor:
    def process_payment(self, amount):
        print(f"Processing payment of ${amount}")
        return True

class ShippingService:
    def calculate_shipping(self, weight, destination):
        shipping_cost = weight * 2.5
        print(f"Shipping to {destination} costs ${shipping_cost}")
        return shipping_cost
    
    def ship(self, product, destination):
        print(f"Shipping {product} to {destination}")

class OrderManager:
    def __init__(self):
        self.payment_processor = PaymentProcessor()
        self.shipping_service = ShippingService()
    
    def place_order(self, product, price, weight, destination):
        print(f"New order: {product}")
        
        # Handle payment
        payment_success = self.payment_processor.process_payment(price)
        
        if payment_success:
            # Calculate shipping
            shipping_cost = self.shipping_service.calculate_shipping(weight, destination)
            
            # Ship the product
            self.shipping_service.ship(product, destination)
            
            total_cost = price + shipping_cost
            print(f"Order completed! Total cost: ${total_cost}")
            return True
        else:
            print("Order failed: Payment unsuccessful")
            return False

if __name__ == "__main__":
    order_manager = OrderManager()
    order_manager.place_order("Laptop", 999.99, 4.5, "New York")

New order: Laptop
Processing payment of $999.99
Shipping to New York costs $11.25
Shipping Laptop to New York
Order completed! Total cost: $1011.24


#### [4]

In [4]:
class EmailSender:
    def send_email(self, to_address, subject, content):
        print(f"Sending email to {to_address}")
        print(f"Subject: {subject}")
        print(f"Content: {content}")

class SMSSender:
    def send_sms(self, phone_number, message):
        print(f"Sending SMS to {phone_number}")
        print(f"Message: {message}")

class NotificationManager:
    def __init__(self):
        self.email_sender = EmailSender()
        self.sms_sender = SMSSender()
    
    def notify_user(self, user, notification_type, message):
        print(f"Notification for user: {user['name']}")
        
        if notification_type == "email" and "email" in user:
            self.email_sender.send_email(
                user["email"], 
                "New Notification", 
                message
            )
        elif notification_type == "sms" and "phone" in user:
            self.sms_sender.send_sms(
                user["phone"], 
                message
            )
        elif notification_type == "both":
            if "email" in user:
                self.email_sender.send_email(
                    user["email"], 
                    "New Notification", 
                    message
                )
            if "phone" in user:
                self.sms_sender.send_sms(
                    user["phone"], 
                    message
                )
        else:
            print("Could not send notification: Missing contact information")

if __name__ == "__main__":
    user_info = {
        "name": "John Doe",
        "email": "john@example.com",
        "phone": "555-123-4567"
    }
    
    notification = NotificationManager()
    notification.notify_user(user_info, "both", "Your package has shipped!")

Notification for user: John Doe
Sending email to john@example.com
Subject: New Notification
Content: Your package has shipped!
Sending SMS to 555-123-4567
Message: Your package has shipped!


#### [5]

In [5]:
class FileStorage:
    def save(self, file_name, data):
        print(f"Saving {len(data)} bytes to file: {file_name}")
    
    def load(self, file_name):
        print(f"Loading data from file: {file_name}")
        return f"Data from {file_name}"

class Encryptor:
    def encrypt(self, data):
        print(f"Encrypting data: {data[:10]}...")
        return f"ENCRYPTED({data})"
    
    def decrypt(self, encrypted_data):
        print(f"Decrypting data: {encrypted_data[:15]}...")
        # Extract original data from inside ENCRYPTED()
        return encrypted_data[10:-1]

class SecureFileManager:
    def __init__(self):
        self.storage = FileStorage()
        self.encryptor = Encryptor()
    
    def save_secure_file(self, file_name, data):
        print(f"Saving secure file: {file_name}")
        
        # First encrypt the data
        encrypted_data = self.encryptor.encrypt(data)
        
        # Then store the encrypted data
        self.storage.save(file_name, encrypted_data)
    
    def load_secure_file(self, file_name):
        print(f"Loading secure file: {file_name}")
        
        # First load the encrypted data
        encrypted_data = self.storage.load(file_name)
        
        # Then decrypt it
        decrypted_data = self.encryptor.decrypt(encrypted_data)
        
        return decrypted_data

if __name__ == "__main__":
    manager = SecureFileManager()
    manager.save_secure_file("secret.txt", "This is confidential information")
    content = manager.load_secure_file("secret.txt")
    print(f"Loaded content: {content}")

Saving secure file: secret.txt
Encrypting data: This is co...
Saving 43 bytes to file: secret.txt
Loading secure file: secret.txt
Loading data from file: secret.txt
Decrypting data: Data from secre...
Loaded content: secret.tx


#### [6]

In [6]:
class DataAnalyzer:
    def analyze(self, data):
        print(f"Analyzing data with {len(data)} entries")
        return {"average": sum(data) / len(data), "max": max(data), "min": min(data)}

class DataVisualizer:
    def create_chart(self, data, chart_type):
        print(f"Creating {chart_type} chart with {len(data)} data points")
        return f"{chart_type} chart created"
    
    def export_image(self, chart, file_name):
        print(f"Exporting chart to {file_name}")
        return True

class ReportGenerator:
    def __init__(self):
        self.analyzer = DataAnalyzer()
        self.visualizer = DataVisualizer()
    
    def generate_report(self, data, report_name):
        print(f"Generating report: {report_name}")
        
        # Analyze the data
        analysis_results = self.analyzer.analyze(data)
        
        # Create visualizations
        bar_chart = self.visualizer.create_chart(data, "bar")
        line_chart = self.visualizer.create_chart(data, "line")
        
        # Export the charts
        self.visualizer.export_image(bar_chart, f"{report_name}_bar.png")
        self.visualizer.export_image(line_chart, f"{report_name}_line.png")
        
        # Compile report summary
        report = {
            "name": report_name,
            "analysis": analysis_results,
            "charts": [f"{report_name}_bar.png", f"{report_name}_line.png"]
        }
        
        print(f"Report completed with {len(report['charts'])} visualizations")
        return report

if __name__ == "__main__":
    sample_data = [12, 34, 56, 78, 90, 23, 45, 67]
    report_gen = ReportGenerator()
    report = report_gen.generate_report(sample_data, "quarterly_sales")
    print(f"Report summary: {report}")

Generating report: quarterly_sales
Analyzing data with 8 entries
Creating bar chart with 8 data points
Creating line chart with 8 data points
Exporting chart to quarterly_sales_bar.png
Exporting chart to quarterly_sales_line.png
Report completed with 2 visualizations
Report summary: {'name': 'quarterly_sales', 'analysis': {'average': 50.625, 'max': 90, 'min': 12}, 'charts': ['quarterly_sales_bar.png', 'quarterly_sales_line.png']}


#### [7]

In [11]:
from dataclasses import dataclass
from typing import Dict, List, Optional
import logging


class ConfigManager:
    def __init__(self, config_path: str):
        self.config_path = config_path
        self.settings: Dict = {"default_timeout": 30, "retry_attempts": 3}
        
    def get_setting(self, key: str) -> Optional[object]:
        return self.settings.get(key)


class RequestHandler:
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.logger = logging.getLogger("requests")
        
    def fetch_data(self, endpoint: str) -> Dict:
        self.logger.info(f"Fetching data from {endpoint}")
        # In real code, this would use requests or httpx
        return {"status": "success", "data": ["item1", "item2"]}


@dataclass
class ApiResponse:
    success: bool
    data: Optional[List] = None
    error: Optional[str] = None


class ApiClient:
    def __init__(self, api_url: str, config_file: str = "config.json"):
        # Compose functionality through contained objects
        self.config = ConfigManager(config_file)
        self.requester = RequestHandler(api_url)
        self.logger = logging.getLogger("api_client")
    
    def get_resources(self, resource_type: str) -> ApiResponse:
        timeout = self.config.get_setting("default_timeout")
        self.logger.debug(f"Using timeout of {timeout}s")
        
        try:
            endpoint = f"/api/{resource_type}"
            result = self.requester.fetch_data(endpoint)
            return ApiResponse(success=True, data=result.get("data"))
        except Exception as e:
            self.logger.error(f"Failed to fetch {resource_type}: {str(e)}")
            return ApiResponse(success=False, error=str(e))


if __name__ == "__main__":
    client = ApiClient("https://api.example.com")
    response = client.get_resources("users")
    print(f"API Response: {response}")

API Response: ApiResponse(success=True, data=['item1', 'item2'], error=None)


#### [8]

In [12]:
from typing import List, Dict, Optional
from datetime import datetime
import json


class EventStore:
    def __init__(self, store_path: str = "events.json"):
        self.store_path = store_path
        self.events: List[Dict] = []
    
    def add_event(self, event_data: Dict) -> bool:
        timestamp = datetime.now().isoformat()
        self.events.append({"timestamp": timestamp, **event_data})
        return True
    
    def query_events(self, event_type: Optional[str] = None) -> List[Dict]:
        if event_type:
            return [e for e in self.events if e.get("type") == event_type]
        return self.events


class NotificationSender:
    def send_email(self, recipient: str, subject: str, message: str) -> bool:
        print(f"Sending email to {recipient}")
        print(f"Subject: {subject}")
        print(f"Message: {message}")
        return True
    
    def send_sms(self, phone: str, message: str) -> bool:
        print(f"Sending SMS to {phone}")
        print(f"Message: {message}")
        return True


class AuditLogger:
    def log_action(self, user: str, action: str, details: Dict) -> None:
        print(f"AUDIT: User {user} performed {action}")
        print(f"Details: {json.dumps(details, indent=2)}")


class UserManager:
    def __init__(self):
        # Compose functionality through contained objects
        self.event_store = EventStore()
        self.notifier = NotificationSender()
        self.audit = AuditLogger()
    
    def create_user(self, username: str, email: str, phone: str) -> Dict:
        user_data = {"username": username, "email": email, "phone": phone}
        
        # Log event
        self.event_store.add_event({
            "type": "user_created",
            "user": username
        })
        
        # Audit logging
        self.audit.log_action("system", "create_user", user_data)
        
        # Send welcome notification
        self.notifier.send_email(
            email, 
            "Welcome to our platform!", 
            f"Hello {username}, your account has been created successfully."
        )
        
        return {"success": True, "user": user_data}
    
    def get_user_activity(self, username: str) -> List[Dict]:
        events = self.event_store.query_events()
        return [e for e in events if e.get("user") == username]


if __name__ == "__main__":
    user_mgr = UserManager()
    result = user_mgr.create_user("johndoe", "john@example.com", "555-123-4567")
    print(f"User creation result: {result}")
    
    # Check activity
    activity = user_mgr.get_user_activity("johndoe")
    print(f"User activity: {activity}")

AUDIT: User system performed create_user
Details: {
  "username": "johndoe",
  "email": "john@example.com",
  "phone": "555-123-4567"
}
Sending email to john@example.com
Subject: Welcome to our platform!
Message: Hello johndoe, your account has been created successfully.
User creation result: {'success': True, 'user': {'username': 'johndoe', 'email': 'john@example.com', 'phone': '555-123-4567'}}
User activity: [{'timestamp': '2025-04-12T05:08:35.754239', 'type': 'user_created', 'user': 'johndoe'}]


#### [9]

In [13]:
import time
from typing import Dict, List, Any, Optional
from enum import Enum


class LogLevel(Enum):
    DEBUG = 1
    INFO = 2
    WARNING = 3
    ERROR = 4


class MetricsCollector:
    def __init__(self):
        self.metrics: Dict[str, int] = {}
    
    def increment(self, metric_name: str, value: int = 1) -> None:
        if metric_name not in self.metrics:
            self.metrics[metric_name] = 0
        self.metrics[metric_name] += value
    
    def get_metrics(self) -> Dict[str, int]:
        return self.metrics.copy()


class SimpleLogger:
    def __init__(self, min_level: LogLevel = LogLevel.INFO):
        self.min_level = min_level
    
    def log(self, level: LogLevel, message: str) -> None:
        if level.value >= self.min_level.value:
            timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
            print(f"{timestamp} [{level.name}] {message}")


class DataProcessor:
    def process(self, data: List[Dict]) -> List[Dict]:
        """Process a list of data items."""
        return [self._transform_item(item) for item in data]
    
    def _transform_item(self, item: Dict) -> Dict:
        """Transform a single data item."""
        return {k: v.upper() if isinstance(v, str) else v for k, v in item.items()}


class DataPipeline:
    def __init__(self):
        # Using composition to include logging, metrics, and processing
        self.logger = SimpleLogger(LogLevel.INFO)
        self.metrics = MetricsCollector()
        self.processor = DataProcessor()
    
    def run_pipeline(self, input_data: List[Dict]) -> List[Dict]:
        self.logger.log(LogLevel.INFO, f"Starting pipeline with {len(input_data)} items")
        start_time = time.time()
        
        # Process the data
        try:
            self.metrics.increment("pipeline_runs")
            self.metrics.increment("items_processed", len(input_data))
            
            result = self.processor.process(input_data)
            
            elapsed = time.time() - start_time
            self.logger.log(LogLevel.INFO, f"Pipeline completed in {elapsed:.2f} seconds")
            return result
            
        except Exception as e:
            self.logger.log(LogLevel.ERROR, f"Pipeline error: {str(e)}")
            self.metrics.increment("pipeline_errors")
            raise
    
    def get_statistics(self) -> Dict[str, Any]:
        return {
            "metrics": self.metrics.get_metrics()
        }


if __name__ == "__main__":
    # Sample data
    sample_data = [
        {"id": 1, "name": "apple", "category": "fruit"},
        {"id": 2, "name": "banana", "category": "fruit"},
        {"id": 3, "name": "carrot", "category": "vegetable"}
    ]
    
    # Create and run the pipeline
    pipeline = DataPipeline()
    processed_data = pipeline.run_pipeline(sample_data)
    
    print("\nProcessed data:")
    for item in processed_data:
        print(f"  {item}")
    
    print("\nPipeline statistics:")
    stats = pipeline.get_statistics()
    print(f"  {stats}")

2025-04-12 05:11:15 [INFO] Starting pipeline with 3 items
2025-04-12 05:11:15 [INFO] Pipeline completed in 0.00 seconds

Processed data:
  {'id': 1, 'name': 'APPLE', 'category': 'FRUIT'}
  {'id': 2, 'name': 'BANANA', 'category': 'FRUIT'}
  {'id': 3, 'name': 'CARROT', 'category': 'VEGETABLE'}

Pipeline statistics:
  {'metrics': {'pipeline_runs': 1, 'items_processed': 3}}


#### [10]

In [14]:
from typing import Protocol, List
from dataclasses import dataclass

class PaymentMethod(Protocol):
    def process_payment(self, amount: float) -> bool:
        ...

class CreditCardPayment:
    def process_payment(self, amount: float) -> bool:
        print(f"Processing credit card payment for ${amount:.2f}")
        return True

class PayPalPayment:
    def process_payment(self, amount: float) -> bool:
        print(f"Processing PayPal payment for ${amount:.2f}")
        return True

@dataclass
class PaymentProcessor:
    payment_methods: List[PaymentMethod]
    
    def process(self, amount: float) -> bool:
        for method in self.payment_methods:
            if method.process_payment(amount):
                return True
        return False

# Usage
if __name__ == "__main__":
    payment_methods = [CreditCardPayment(), PayPalPayment()]
    processor = PaymentProcessor(payment_methods)
    
    result = processor.process(100.50)
    print(f"Payment successful: {result}")

Processing credit card payment for $100.50
Payment successful: True


#### [11]

In [16]:
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
import json

# Interfaces
class CacheStorage(ABC):
    @abstractmethod
    def get(self, key: str) -> Optional[Any]:
        pass
    
    @abstractmethod
    def set(self, key: str, value: Any, ttl: Optional[timedelta] = None) -> None:
        pass

class MetricsProvider(ABC):
    @abstractmethod
    def increment(self, metric: str, tags: Dict[str, str] = None) -> None:
        pass

# Implementations
class RedisStorage(CacheStorage):
    def get(self, key: str) -> Optional[Any]:
        print(f"[Redis] GET {key}")
        # Actual implementation would use redis-py client
        return json.loads('{"mock": "data"}') if key == "valid" else None
    
    def set(self, key: str, value: Any, ttl: Optional[timedelta] = None) -> None:
        print(f"[Redis] SET {key} (TTL: {ttl})")

class MemoryStorage(CacheStorage):
    def __init__(self):
        self._store: Dict[str, Any] = {}
    
    def get(self, key: str) -> Optional[Any]:
        print(f"[Memory] GET {key}")
        return self._store.get(key)
    
    def set(self, key: str, value: Any, ttl: Optional[timedelta] = None) -> None:
        print(f"[Memory] SET {key} (TTL: {ttl})")
        self._store[key] = value

class StatsDMetrics(MetricsProvider):
    def increment(self, metric: str, tags: Dict[str, str] = None) -> None:
        print(f"[StatsD] Incrementing {metric} with tags {tags}")

# Composed Cache Service
class CacheService:
    def __init__(
        self,
        storage: CacheStorage,
        metrics: MetricsProvider,
        fallback_storage: Optional[CacheStorage] = None
    ):
        self._primary = storage
        self._fallback = fallback_storage
        self._metrics = metrics
    
    def get(self, key: str) -> Optional[Any]:
        try:
            self._metrics.increment("cache.attempt", {"key": key})
            value = self._primary.get(key)
            
            if value is None and self._fallback:
                value = self._fallback.get(key)
                if value:
                    self._metrics.increment("cache.fallback_hit")
            
            if value:
                self._metrics.increment("cache.hit")
            else:
                self._metrics.increment("cache.miss")
            
            return value
        except Exception as e:
            self._metrics.increment("cache.error", {"error": str(e)})
            raise

    def set(self, key: str, value: Any, ttl: Optional[timedelta] = None) -> None:
        self._primary.set(key, value, ttl)
        if self._fallback:
            self._fallback.set(key, value, ttl)

# Usage
if __name__ == "__main__":
    redis = RedisStorage()
    memory = MemoryStorage()
    metrics = StatsDMetrics()
    
    cache = CacheService(
        storage=redis,
        fallback_storage=memory,
        metrics=metrics
    )
    
    # Will use Redis -> fallback to Memory
    data = cache.get("some_key")
    cache.set("important", {"data": 123}, timedelta(minutes=5))

[StatsD] Incrementing cache.attempt with tags {'key': 'some_key'}
[Redis] GET some_key
[Memory] GET some_key
[StatsD] Incrementing cache.miss with tags None
[Redis] SET important (TTL: 0:05:00)
[Memory] SET important (TTL: 0:05:00)


#### [12]

In [18]:
import requests
from typing import Optional, Callable, Any, Dict, Protocol
from unittest.mock import MagicMock
import time

# Mock server setup - This replaces the real API call
requests.get = MagicMock(return_value=MagicMock(
    status_code=200,
    json=lambda: {"id": "123", "data": "mock_response"}
))

# Interfaces
class Logger(Protocol):
    def log(self, message: str, level: str = "INFO") -> None: ...

class Cache(Protocol):
    def get(self, key: str) -> Optional[Any]: ...
    def set(self, key: str, value: Any, ttl: int = 0) -> None: ...

# Implementations
class ConsoleLogger:
    def log(self, message: str, level: str = "INFO") -> None:
        print(f"[{level}] {message}")

class MemoryCache:
    def __init__(self):
        self._store = {}
    
    def get(self, key: str) -> Optional[Any]:
        return self._store.get(key)
    
    def set(self, key: str, value: Any, ttl: int = 0) -> None:
        self._store[key] = value

# API Client with Composition
class JsonApiClient:
    def __init__(
        self,
        base_url: str,
        retry_strategy: Optional[Callable[[int], float]] = None,
        logger: Optional[Logger] = None,
        cache: Optional[Cache] = None
    ):
        self.base_url = base_url
        self.retry = retry_strategy or self._default_retry
        self.logger = logger
        self.cache = cache

    def _default_retry(self, attempt: int) -> float:
        return min(2 ** attempt, 5)

    def get_resource(self, resource_id: str) -> Dict[str, Any]:
        cache_key = f"{self.base_url}/resources/{resource_id}"
        
        if self.cache and (cached := self.cache.get(cache_key)):
            if self.logger:
                self.logger.log(f"Cache hit for {cache_key}")
            return cached

        attempt = 0
        while True:
            attempt += 1
            try:
                response = requests.get(f"{self.base_url}/resources/{resource_id}")
                response.raise_for_status()
                data = response.json()
                
                if self.cache:
                    self.cache.set(cache_key, data, ttl=300)
                
                return data
            except Exception as e:
                if attempt >= 3:
                    if self.logger:
                        self.logger.log(f"Final failure after {attempt} attempts", "ERROR")
                    raise
                
                delay = self.retry(attempt)
                if self.logger:
                    self.logger.log(f"Retry #{attempt} after {delay}s: {str(e)}", "WARNING")
                time.sleep(delay)

# Working Example
if __name__ == "__main__":
    client = JsonApiClient(
        base_url="https://mock-api.example.com",
        logger=ConsoleLogger(),
        cache=MemoryCache(),
        retry_strategy=lambda attempt: attempt * 0.1  # Shorter delays for demo
    )

    # First call - hits the mock API
    print("First call:", client.get_resource("123"))
    
    # Second call - returns cached result
    print("Cached call:", client.get_resource("123"))

First call: {'id': '123', 'data': 'mock_response'}
[INFO] Cache hit for https://mock-api.example.com/resources/123
Cached call: {'id': '123', 'data': 'mock_response'}


#### [13]

In [19]:
from abc import ABC, abstractmethod
from typing import List, Dict, Optional
from dataclasses import dataclass
import time

# Interfaces
class NotificationChannel(ABC):
    @abstractmethod
    def send(self, recipient: str, message: str) -> bool:
        pass

class AnalyticsTracker(ABC):
    @abstractmethod
    def track_event(self, event_name: str, metadata: Dict[str, Any]) -> None:
        pass

class RateLimiter(ABC):
    @abstractmethod
    def check_limit(self, key: str) -> bool:
        pass

# Implementations
class EmailChannel(NotificationChannel):
    def send(self, recipient: str, message: str) -> bool:
        print(f"Sending email to {recipient}: {message}")
        return True

class SMSChannel(NotificationChannel):
    def send(self, recipient: str, message: str) -> bool:
        print(f"Sending SMS to {recipient}: {message[:30]}...")
        return True

class GoogleAnalyticsTracker(AnalyticsTracker):
    def track_event(self, event_name: str, metadata: Dict[str, Any]) -> None:
        print(f"[Analytics] {event_name}: {metadata}")

class SlidingWindowRateLimiter(RateLimiter):
    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window = window_seconds
        self.requests = {}

    def check_limit(self, key: str) -> bool:
        current_time = time.time()
        if key not in self.requests:
            self.requests[key] = []
        
        # Remove old requests
        self.requests[key] = [
            t for t in self.requests[key] 
            if current_time - t < self.window
        ]
        
        if len(self.requests[key]) >= self.max_requests:
            return False
            
        self.requests[key].append(current_time)
        return True

# Composed Notification Service
@dataclass
class NotificationService:
    channels: List[NotificationChannel]
    analytics: AnalyticsTracker
    rate_limiter: RateLimiter
    default_channel: NotificationChannel = None

    def __post_init__(self):
        self.default_channel = self.channels[0] if self.channels else None

    def send_notification(
        self,
        recipient: str,
        message: str,
        channel_type: Optional[str] = None
    ) -> bool:
        if not self.rate_limiter.check_limit(recipient):
            self.analytics.track_event("rate_limit_exceeded", {"recipient": recipient})
            return False

        channel = self.default_channel
        if channel_type:
            channel = next((c for c in self.channels if c.__class__.__name__.lower().startswith(channel_type)), None)

        if not channel:
            return False

        success = channel.send(recipient, message)
        self.analytics.track_event(
            "notification_sent",
            {
                "channel": channel.__class__.__name__,
                "recipient": recipient,
                "success": success
            }
        )
        return success

# Usage Example
if __name__ == "__main__":
    # Configure dependencies
    email = EmailChannel()
    sms = SMSChannel()
    analytics = GoogleAnalyticsTracker()
    rate_limiter = SlidingWindowRateLimiter(max_requests=2, window_seconds=60)

    # Compose the service
    notifier = NotificationService(
        channels=[email, sms],
        analytics=analytics,
        rate_limiter=rate_limiter
    )

    # Send notifications
    notifier.send_notification("user@example.com", "Welcome to our service!", "email")
    notifier.send_notification("+1234567890", "Your verification code is 123456", "sms")
    
    # This will hit rate limit
    notifier.send_notification("user@example.com", "Reminder: Your subscription is expiring")

Sending email to user@example.com: Welcome to our service!
[Analytics] notification_sent: {'channel': 'EmailChannel', 'recipient': 'user@example.com', 'success': True}
Sending SMS to +1234567890: Your verification code is 1234...
[Analytics] notification_sent: {'channel': 'SMSChannel', 'recipient': '+1234567890', 'success': True}
Sending email to user@example.com: Reminder: Your subscription is expiring
[Analytics] notification_sent: {'channel': 'EmailChannel', 'recipient': 'user@example.com', 'success': True}


#### [14]

In [21]:
from abc import ABC, abstractmethod
from typing import List, Dict, BinaryIO
import zipfile
import json
import csv
from io import StringIO, BytesIO
import time

# Interfaces
class DataFormatter(ABC):
    @abstractmethod
    def format(self, data: List[Dict]) -> bytes:
        pass

class CompressionStrategy(ABC):
    @abstractmethod
    def compress(self, data: bytes) -> bytes:
        pass

class ProgressTracker(ABC):
    @abstractmethod
    def update(self, progress: float) -> None:
        pass

# Implementations
class JSONFormatter(DataFormatter):
    def format(self, data: List[Dict]) -> bytes:
        return json.dumps(data, indent=2).encode('utf-8')

class CSVFormatter(DataFormatter):
    def format(self, data: List[Dict]) -> bytes:
        if not data:
            return b''
        
        output = StringIO()
        writer = csv.DictWriter(output, fieldnames=data[0].keys())
        writer.writeheader()
        writer.writerows(data)
        return output.getvalue().encode('utf-8')

class ZIPCompression(CompressionStrategy):
    def compress(self, data: bytes) -> bytes:
        output = BytesIO()
        with zipfile.ZipFile(output, 'w') as zipf:
            zipf.writestr('data', data)
        return output.getvalue()

class ConsoleProgressTracker(ProgressTracker):
    def update(self, progress: float) -> None:
        print(f"\rExport progress: {progress:.1%}", end='', flush=True)

# Composed File Exporter
class DataExporter:
    def __init__(
        self,
        formatter: DataFormatter,
        compressor: Optional[CompressionStrategy] = None,
        progress_tracker: Optional[ProgressTracker] = None
    ):
        self.formatter = formatter
        self.compressor = compressor
        self.progress = progress_tracker

    def export(self, data: List[Dict], output_stream: BinaryIO) -> None:
        total_steps = 3 if self.compressor else 2
        current_step = 1
        
        # Step 1: Format data
        if self.progress:
            self.progress.update(current_step/total_steps)
        formatted = self.formatter.format(data)
        current_step += 1
        
        # Step 2: Compress (if configured)
        if self.compressor:
            if self.progress:
                self.progress.update(current_step/total_steps)
            formatted = self.compressor.compress(formatted)
            current_step += 1
        
        # Step 3: Write output
        if self.progress:
            self.progress.update(1.0)
        output_stream.write(formatted)
        
        if self.progress:
            print()  # New line after progress

# Usage Example
if __name__ == "__main__":
    # Sample data
    sales_data = [
        {"id": 1, "product": "Laptop", "amount": 999.99},
        {"id": 2, "product": "Mouse", "amount": 19.99},
        {"id": 3, "product": "Keyboard", "amount": 49.99}
    ]

    # Configure export options
    json_exporter = DataExporter(
        formatter=JSONFormatter(),
        progress_tracker=ConsoleProgressTracker()
    )
    
    csv_compressed_exporter = DataExporter(
        formatter=CSVFormatter(),
        compressor=ZIPCompression(),
        progress_tracker=ConsoleProgressTracker()
    )

    # Perform exports
    print("Exporting JSON:")
    with open('sales.json', 'wb') as f:
        json_exporter.export(sales_data, f)
    
    print("\nExporting Compressed CSV:")
    with open('sales.csv.zip', 'wb') as f:
        csv_compressed_exporter.export(sales_data, f)

Exporting JSON:
Export progress: 100.0%

Exporting Compressed CSV:
Export progress: 100.0%


#### [15]

In [22]:
from abc import ABC, abstractmethod
from typing import List, Dict, Optional
import re
from dataclasses import dataclass
from pathlib import Path
import json

# Interfaces
class TextExtractor(ABC):
    @abstractmethod
    def extract_text(self, file_path: Path) -> str:
        pass

class TextAnalyzer(ABC):
    @abstractmethod
    def analyze(self, text: str) -> Dict[str, Any]:
        pass

class DocumentExporter(ABC):
    @abstractmethod
    def export(self, analysis: Dict[str, Any], output_path: Path) -> None:
        pass

# Implementations
class PDFExtractor(TextExtractor):
    def extract_text(self, file_path: Path) -> str:
        print(f"Extracting text from PDF: {file_path}")
        # In a real implementation, use PyPDF2 or pdfminer
        return "Sample PDF text with keywords: python, composition, OOP"

class TXTExtractor(TextExtractor):
    def extract_text(self, file_path: Path) -> str:
        print(f"Extracting text from TXT: {file_path}")
        return file_path.read_text()

class KeywordAnalyzer(TextAnalyzer):
    def __init__(self, keywords: List[str]):
        self.keywords = keywords
    
    def analyze(self, text: str) -> Dict[str, Any]:
        word_counts = {kw: len(re.findall(rf"\b{kw}\b", text.lower())) 
                      for kw in self.keywords}
        return {
            "word_counts": word_counts,
            "total_keywords": sum(word_counts.values())
        }

class SentimentAnalyzer(TextAnalyzer):
    def analyze(self, text: str) -> Dict[str, Any]:
        # Simplified sentiment analysis
        positive_words = {"good", "excellent", "happy"}
        negative_words = {"bad", "poor", "unhappy"}
        
        pos_count = sum(1 for word in text.lower().split() if word in positive_words)
        neg_count = sum(1 for word in text.lower().split() if word in negative_words)
        
        return {
            "positive_words": pos_count,
            "negative_words": neg_count,
            "sentiment": "positive" if pos_count > neg_count else "negative"
        }

class JSONExporter(DocumentExporter):
    def export(self, analysis: Dict[str, Any], output_path: Path) -> None:
        print(f"Exporting analysis to JSON: {output_path}")
        output_path.write_text(json.dumps(analysis, indent=2))

class MarkdownExporter(DocumentExporter):
    def export(self, analysis: Dict[str, Any], output_path: Path) -> None:
        print(f"Exporting analysis to Markdown: {output_path}")
        markdown = "## Document Analysis\n\n"
        markdown += "\n".join(f"- **{k}**: {v}" for k, v in analysis.items())
        output_path.write_text(markdown)

# Composed Document Processor
@dataclass
class DocumentProcessor:
    extractor: TextExtractor
    analyzers: List[TextAnalyzer]
    exporter: DocumentExporter
    
    def process(self, input_path: Path, output_path: Path) -> None:
        # Step 1: Extract text
        text = self.extractor.extract_text(input_path)
        
        # Step 2: Analyze text
        analysis = {}
        for analyzer in self.analyzers:
            analysis.update(analyzer.analyze(text))
        
        # Step 3: Export results
        self.exporter.export(analysis, output_path)

# Usage Example
if __name__ == "__main__":
    # Create sample text file
    sample_text = Path("sample.txt")
    sample_text.write_text("This document discusses Python design patterns. "
                         "Composition over inheritance is a good practice. "
                         "It makes your code more flexible and maintainable.")
    
    # Configure processor
    processor = DocumentProcessor(
        extractor=TXTExtractor(),
        analyzers=[
            KeywordAnalyzer(["python", "composition", "inheritance"]),
            SentimentAnalyzer()
        ],
        exporter=MarkdownExporter()
    )
    
    # Process document
    processor.process(sample_text, Path("analysis.md"))
    
    # Clean up
    sample_text.unlink()

Extracting text from TXT: sample.txt
Exporting analysis to Markdown: analysis.md


#### [16]

In [23]:
from abc import ABC, abstractmethod
from typing import List, Dict, Tuple, Optional
import numpy as np
from dataclasses import dataclass
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score

# Interfaces
class DataPreprocessor(ABC):
    @abstractmethod
    def preprocess(self, X: np.ndarray) -> np.ndarray:
        pass

class Model(ABC):
    @abstractmethod
    def train(self, X: np.ndarray, y: np.ndarray) -> None:
        pass
    
    @abstractmethod
    def predict(self, X: np.ndarray) -> np.ndarray:
        pass

class Evaluator(ABC):
    @abstractmethod
    def evaluate(self, y_true: np.ndarray, y_pred: np.ndarray) -> Dict[str, float]:
        pass

# Implementations
class StandardScalerPreprocessor(DataPreprocessor):
    def preprocess(self, X: np.ndarray) -> np.ndarray:
        print("Standardizing features...")
        return (X - X.mean(axis=0)) / X.std(axis=0)

class MinMaxPreprocessor(DataPreprocessor):
    def preprocess(self, X: np.ndarray) -> np.ndarray:
        print("Normalizing features...")
        return (X - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0))

class LogisticRegressionModel(Model):
    def __init__(self, learning_rate: float = 0.01, n_iters: int = 1000):
        self.learning_rate = learning_rate
        self.n_iters = n_iters
        self.weights = None
        self.bias = None
        
    def train(self, X: np.ndarray, y: np.ndarray) -> None:
        n_samples, n_features = X.shape
        self.weights = np.zeros(n_features)
        self.bias = 0
        
        # Gradient descent
        for _ in range(self.n_iters):
            linear = np.dot(X, self.weights) + self.bias
            predictions = self._sigmoid(linear)
            
            dw = (1 / n_samples) * np.dot(X.T, (predictions - y))
            db = (1 / n_samples) * np.sum(predictions - y)
            
            self.weights -= self.learning_rate * dw
            self.bias -= self.learning_rate * db
    
    def predict(self, X: np.ndarray) -> np.ndarray:
        linear = np.dot(X, self.weights) + self.bias
        return (self._sigmoid(linear) > 0.5).astype(int)
    
    def _sigmoid(self, x: np.ndarray) -> np.ndarray:
        return 1 / (1 + np.exp(-x))

class ClassificationEvaluator(Evaluator):
    def evaluate(self, y_true: np.ndarray, y_pred: np.ndarray) -> Dict[str, float]:
        return {
            "accuracy": accuracy_score(y_true, y_pred),
            "precision": self._precision(y_true, y_pred),
            "recall": self._recall(y_true, y_pred)
        }
    
    def _precision(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
        tp = np.sum((y_pred == 1) & (y_true == 1))
        fp = np.sum((y_pred == 1) & (y_true == 0))
        return tp / (tp + fp) if (tp + fp) > 0 else 0.0
    
    def _recall(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
        tp = np.sum((y_pred == 1) & (y_true == 1))
        fn = np.sum((y_pred == 0) & (y_true == 1))
        return tp / (tp + fn) if (tp + fn) > 0 else 0.0

# Composed ML Pipeline
@dataclass
class MLPipeline:
    preprocessor: DataPreprocessor
    model: Model
    evaluator: Evaluator
    
    def run(self, X: np.ndarray, y: np.ndarray, test_size: float = 0.2) -> Dict[str, float]:
        # Split data
        split_idx = int(len(X) * (1 - test_size))
        X_train, y_train = X[:split_idx], y[:split_idx]
        X_test, y_test = X[split_idx:], y[split_idx:]
        
        # Preprocess data
        X_train = self.preprocessor.preprocess(X_train)
        X_test = self.preprocessor.preprocess(X_test)
        
        # Train model
        self.model.train(X_train, y_train)
        
        # Evaluate
        y_pred = self.model.predict(X_test)
        return self.evaluator.evaluate(y_test, y_pred)

# Usage Example
if __name__ == "__main__":
    # Generate synthetic data
    X, y = make_classification(n_samples=1000, n_features=10, random_state=42)
    
    # Configure pipeline
    pipeline = MLPipeline(
        preprocessor=StandardScalerPreprocessor(),
        model=LogisticRegressionModel(learning_rate=0.1, n_iters=2000),
        evaluator=ClassificationEvaluator()
    )
    
    # Run pipeline
    results = pipeline.run(X, y)
    print("\nModel Evaluation Results:")
    for metric, value in results.items():
        print(f"{metric}: {value:.4f}")

Standardizing features...
Standardizing features...

Model Evaluation Results:
accuracy: 0.8900
precision: 0.8776
recall: 0.8958


In [None]:
#### [17]

In [24]:
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from datetime import datetime
import time
import random

# Interfaces
class Task(ABC):
    @abstractmethod
    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        pass

class Condition(ABC):
    @abstractmethod
    def evaluate(self, context: Dict[str, Any]) -> bool:
        pass

class Logger(ABC):
    @abstractmethod
    def log(self, message: str, level: str = "INFO") -> None:
        pass

# Implementations
class DataFetchTask(Task):
    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        print("Fetching data from API...")
        time.sleep(1)
        return {"raw_data": [random.randint(1, 100) for _ in range(5)]}

class DataTransformTask(Task):
    def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        print("Transforming data...")
        raw = context["raw_data"]
        return {
            "transformed_data": [x * 2 for x in raw],
            "stats": {
                "mean": sum(raw) / len(raw),
                "max": max(raw)
            }
        }

class ThresholdCondition(Condition):
    def __init__(self, threshold: float):
        self.threshold = threshold
        
    def evaluate(self, context: Dict[str, Any]) -> bool:
        stats = context.get("stats", {})
        return stats.get("mean", 0) > self.threshold

class ConsoleLogger(Logger):
    def log(self, message: str, level: str = "INFO") -> None:
        timestamp = datetime.now().isoformat()
        print(f"[{timestamp}][{level}] {message}")

# Composed Workflow Engine
class WorkflowEngine:
    def __init__(
        self,
        tasks: List[Task],
        conditions: Optional[List[Condition]] = None,
        logger: Optional[Logger] = None
    ):
        self.tasks = tasks
        self.conditions = conditions or []
        self.logger = logger
        
    def run(self, initial_context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        context = initial_context or {}
        
        if self.logger:
            self.logger.log("Workflow started")
        
        for task in self.tasks:
            try:
                if self.logger:
                    self.logger.log(f"Executing {task.__class__.__name__}")
                
                start_time = time.time()
                result = task.execute(context)
                context.update(result)
                
                if self.logger:
                    duration = time.time() - start_time
                    self.logger.log(f"Task completed in {duration:.2f}s")
                
                # Check conditions after each task
                if not all(cond.evaluate(context) for cond in self.conditions):
                    if self.logger:
                        self.logger.log("Condition failed, aborting workflow", "WARNING")
                    break
                    
            except Exception as e:
                if self.logger:
                    self.logger.log(f"Task failed: {str(e)}", "ERROR")
                raise
        
        if self.logger:
            self.logger.log("Workflow completed")
        
        return context

# Usage Example
if __name__ == "__main__":
    # Configure workflow
    engine = WorkflowEngine(
        tasks=[
            DataFetchTask(),
            DataTransformTask()
        ],
        conditions=[
            ThresholdCondition(threshold=30)
        ],
        logger=ConsoleLogger()
    )
    
    # Execute workflow
    print("Running workflow:")
    result = engine.run()
    
    print("\nFinal Context:")
    for k, v in result.items():
        print(f"{k}: {v}")

Running workflow:
[2025-04-12T05:45:27.719467][INFO] Workflow started
[2025-04-12T05:45:27.719505][INFO] Executing DataFetchTask
Fetching data from API...
[2025-04-12T05:45:28.721785][INFO] Task completed in 1.00s
[2025-04-12T05:45:28.722388][INFO] Workflow completed

Final Context:
raw_data: [57, 5, 10, 21, 73]


#### [17]

In [25]:
from abc import ABC, abstractmethod
from typing import Dict, Any, Callable, Optional
from dataclasses import dataclass
import threading
import queue
import time
import uuid
import json

# Interfaces
class TaskQueue(ABC):
    @abstractmethod
    def enqueue(self, task: Dict[str, Any]) -> str:
        pass
    
    @abstractmethod
    def dequeue(self) -> Optional[Dict[str, Any]]:
        pass

class ResultStore(ABC):
    @abstractmethod
    def store_result(self, task_id: str, result: Dict[str, Any]) -> None:
        pass
    
    @abstractmethod
    def get_result(self, task_id: str) -> Optional[Dict[str, Any]]:
        pass

class Worker(ABC):
    @abstractmethod
    def process(self, task: Dict[str, Any]) -> Dict[str, Any]:
        pass

# Implementations
class InMemoryQueue(TaskQueue):
    def __init__(self):
        self._queue = queue.Queue()
        self._published_ids = set()
    
    def enqueue(self, task: Dict[str, Any]) -> str:
        task_id = str(uuid.uuid4())
        self._queue.put({"id": task_id, **task})
        self._published_ids.add(task_id)
        return task_id
    
    def dequeue(self) -> Optional[Dict[str, Any]]:
        try:
            return self._queue.get_nowait()
        except queue.Empty:
            return None

class RedisResultStore(ResultStore):
    def __init__(self):
        # Simulating Redis with a dict
        self._store = {}
    
    def store_result(self, task_id: str, result: Dict[str, Any]) -> None:
        self._store[task_id] = {
            "status": "completed",
            "result": result,
            "timestamp": time.time()
        }
    
    def get_result(self, task_id: str) -> Optional[Dict[str, Any]]:
        return self._store.get(task_id)

class TaskWorker(Worker):
    def __init__(self, task_handlers: Dict[str, Callable]):
        self.handlers = task_handlers
    
    def process(self, task: Dict[str, Any]) -> Dict[str, Any]:
        handler = self.handlers.get(task["type"])
        if not handler:
            return {"error": f"No handler for task type {task['type']}"}
        
        try:
            result = handler(task["payload"])
            return {"success": True, "data": result}
        except Exception as e:
            return {"success": False, "error": str(e)}

# Composed Distributed Task System
class DistributedTaskSystem:
    def __init__(
        self,
        queue: TaskQueue,
        result_store: ResultStore,
        worker: Worker,
        num_workers: int = 3
    ):
        self.queue = queue
        self.result_store = result_store
        self.worker = worker
        self.workers = []
        self._shutdown = False
        
        for i in range(num_workers):
            t = threading.Thread(
                target=self._worker_loop,
                name=f"Worker-{i+1}",
                daemon=True
            )
            self.workers.append(t)
    
    def start(self) -> None:
        for worker in self.workers:
            worker.start()
    
    def shutdown(self) -> None:
        self._shutdown = True
        for worker in self.workers:
            worker.join()
    
    def _worker_loop(self) -> None:
        while not self._shutdown:
            task = self.queue.dequeue()
            if task is None:
                time.sleep(0.1)
                continue
            
            result = self.worker.process(task)
            self.result_store.store_result(task["id"], result)
    
    def submit_task(self, task_type: str, payload: Dict[str, Any]) -> str:
        return self.queue.enqueue({
            "type": task_type,
            "payload": payload
        })

# Usage Example
if __name__ == "__main__":
    # Define task handlers
    def process_image(payload: Dict[str, Any]) -> Dict[str, Any]:
        print(f"Processing image: {payload['filename']}")
        time.sleep(1)  # Simulate work
        return {"size": (800, 600), "format": "jpeg"}
    
    def analyze_data(payload: Dict[str, Any]) -> Dict[str, Any]:
        print(f"Analyzing dataset: {payload['dataset']}")
        time.sleep(2)  # Simulate work
        return {"mean": 42.5, "std_dev": 3.14}
    
    # Configure system
    system = DistributedTaskSystem(
        queue=InMemoryQueue(),
        result_store=RedisResultStore(),
        worker=TaskWorker({
            "process_image": process_image,
            "analyze_data": analyze_data
        }),
        num_workers=2
    )
    
    # Start workers
    system.start()
    
    # Submit tasks
    tasks = [
        ("process_image", {"filename": "photo1.jpg"}),
        ("analyze_data", {"dataset": "sales_q1"}),
        ("process_image", {"filename": "photo2.png"}),
        ("analyze_data", {"dataset": "inventory"})
    ]
    
    task_ids = [system.submit_task(t[0], t[1]) for t in tasks]
    
    # Monitor results
    while True:
        all_done = True
        for task_id in task_ids:
            result = system.result_store.get_result(task_id)
            if result is None:
                all_done = False
                continue
            print(f"Task {task_id} result: {json.dumps(result, indent=2)}")
        
        if all_done:
            break
        
        time.sleep(0.5)
    
    system.shutdown()

Processing image: photo1.jpgAnalyzing dataset: sales_q1

Processing image: photo2.png
Task be084208-4201-4d35-933d-83e7397ec257 result: {
  "status": "completed",
  "result": {
    "success": true,
    "data": {
      "size": [
        800,
        600
      ],
      "format": "jpeg"
    }
  },
  "timestamp": 1744424417.365863
}
Task be084208-4201-4d35-933d-83e7397ec257 result: {
  "status": "completed",
  "result": {
    "success": true,
    "data": {
      "size": [
        800,
        600
      ],
      "format": "jpeg"
    }
  },
  "timestamp": 1744424417.365863
}
Analyzing dataset: inventory
Task be084208-4201-4d35-933d-83e7397ec257 result: {
  "status": "completed",
  "result": {
    "success": true,
    "data": {
      "size": [
        800,
        600
      ],
      "format": "jpeg"
    }
  },
  "timestamp": 1744424417.365863
}
Task b0c602dc-c730-43a8-a545-3df3a1d3b8cd result: {
  "status": "completed",
  "result": {
    "success": true,
    "data": {
      "mean": 42.5,
    

#### [18]

In [26]:
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
import random
import time
from threading import Thread
from queue import Queue

# Interfaces
class DeviceProtocol(ABC):
    @abstractmethod
    def connect(self) -> bool:
        pass
    
    @abstractmethod
    def send_command(self, command: str) -> str:
        pass

class DataParser(ABC):
    @abstractmethod
    def parse(self, raw_data: str) -> Dict[str, float]:
        pass

class AlertHandler(ABC):
    @abstractmethod
    def trigger_alert(self, device_id: str, message: str) -> None:
        pass

# Implementations
class MQTTProtocol(DeviceProtocol):
    def __init__(self, broker: str):
        self.broker = broker
        self.connected = False
    
    def connect(self) -> bool:
        print(f"Connecting to MQTT broker at {self.broker}...")
        time.sleep(0.5)
        self.connected = True
        return True
    
    def send_command(self, command: str) -> str:
        return f"MQTT_ACK:{command}"

class CoAPProtocol(DeviceProtocol):
    def connect(self) -> bool:
        print("Establishing CoAP connection...")
        return True
    
    def send_command(self, command: str) -> str:
        return f"CoAP_RES:{command.upper()}"

class JSONParser(DataParser):
    def parse(self, raw_data: str) -> Dict[str, float]:
        # Simulate JSON parsing
        return {
            "temperature": random.uniform(20, 30),
            "humidity": random.uniform(40, 60),
            "voltage": random.uniform(3.2, 3.8)
        }

class CSVDataParser(DataParser):
    def parse(self, raw_data: str) -> Dict[str, float]:
        # Simulate CSV parsing
        values = raw_data.split(",")
        return {
            "temp": float(values[0]),
            "press": float(values[1]),
            "light": float(values[2])
        }

class EmailAlertHandler(AlertHandler):
    def trigger_alert(self, device_id: str, message: str) -> None:
        print(f"ALERT: Email sent for device {device_id} - {message}")

class SMSAlertHandler(AlertHandler):
    def trigger_alert(self, device_id: str, message: str) -> None:
        print(f"ALERT: SMS sent for device {device_id} - {message}")

# Composed IoT Device
class IoTDevice:
    def __init__(
        self,
        device_id: str,
        protocol: DeviceProtocol,
        parser: DataParser,
        alert_handlers: Optional[List[AlertHandler]] = None,
        health_check_interval: int = 60
    ):
        self.device_id = device_id
        self.protocol = protocol
        self.parser = parser
        self.alert_handlers = alert_handlers or []
        self.health_check_interval = health_check_interval
        self._data_queue = Queue()
        self._monitor_thread = Thread(target=self._monitor_device, daemon=True)
    
    def connect(self) -> bool:
        return self.protocol.connect()
    
    def send_command(self, command: str) -> str:
        return self.protocol.send_command(command)
    
    def process_data(self, raw_data: str) -> Dict[str, float]:
        return self.parser.parse(raw_data)
    
    def add_alert_handler(self, handler: AlertHandler) -> None:
        self.alert_handlers.append(handler)
    
    def start_monitoring(self) -> None:
        self._monitor_thread.start()
    
    def _monitor_device(self) -> None:
        while True:
            # Simulate device monitoring
            time.sleep(self.health_check_interval)
            status = self._check_device_health()
            
            if not status["healthy"]:
                for handler in self.alert_handlers:
                    handler.trigger_alert(
                        self.device_id,
                        f"Device unhealthy: {status['message']}"
                    )
    
    def _check_device_health(self) -> Dict[str, Any]:
        # Simulate health check (20% chance of failure)
        if random.random() < 0.2:
            return {"healthy": False, "message": "High temperature"}
        return {"healthy": True, "message": "OK"}

# Device Manager
class IoTDeviceManager:
    def __init__(self):
        self.devices: Dict[str, IoTDevice] = {}
    
    def register_device(self, device: IoTDevice) -> None:
        if device.connect():
            self.devices[device.device_id] = device
            device.start_monitoring()
    
    def send_command_to_device(self, device_id: str, command: str) -> Optional[str]:
        device = self.devices.get(device_id)
        if device:
            return device.send_command(command)
        return None
    
    def get_device_status(self) -> Dict[str, Dict[str, Any]]:
        return {
            dev_id: {
                "connected": True,
                "last_seen": time.time()
            }
            for dev_id in self.devices.keys()
        }

# Usage Example
if __name__ == "__main__":
    # Create device manager
    manager = IoTDeviceManager()

    # Create devices with different configurations
    sensor1 = IoTDevice(
        device_id="sensor-001",
        protocol=MQTTProtocol("mqtt.iot-server.com"),
        parser=JSONParser(),
        alert_handlers=[EmailAlertHandler()]
    )

    sensor2 = IoTDevice(
        device_id="sensor-002",
        protocol=CoAPProtocol(),
        parser=CSVDataParser(),
        alert_handlers=[SMSAlertHandler(), EmailAlertHandler()],
        health_check_interval=30
    )

    # Register devices
    manager.register_device(sensor1)
    manager.register_device(sensor2)

    # Simulate operations
    print("Sending commands to devices:")
    print("Sensor1 response:", manager.send_command_to_device("sensor-001", "read_temp"))
    print("Sensor2 response:", manager.send_command_to_device("sensor-002", "get_status"))

    # Process sample data
    print("\nProcessing data:")
    print("Sensor1 parsed:", sensor1.process_data('{"temp":22.5,"humidity":55}'))
    print("Sensor2 parsed:", sensor2.process_data("23.5,1013.25,780"))

    # Monitor for alerts (run for 2 minutes)
    print("\nMonitoring devices (wait 2 minutes for potential alerts)...")
    time.sleep(120)

Connecting to MQTT broker at mqtt.iot-server.com...
Establishing CoAP connection...
Sending commands to devices:
Sensor1 response: MQTT_ACK:read_temp
Sensor2 response: CoAP_RES:GET_STATUS

Processing data:
Sensor1 parsed: {'temperature': 26.57112620002397, 'humidity': 46.33726584168778, 'voltage': 3.33451292408693}
Sensor2 parsed: {'temp': 23.5, 'press': 1013.25, 'light': 780.0}

Monitoring devices (wait 2 minutes for potential alerts)...
ALERT: SMS sent for device sensor-002 - Device unhealthy: High temperature
ALERT: Email sent for device sensor-002 - Device unhealthy: High temperature
ALERT: Email sent for device sensor-001 - Device unhealthy: High temperature
ALERT: SMS sent for device sensor-002 - Device unhealthy: High temperature
ALERT: Email sent for device sensor-002 - Device unhealthy: High temperature
ALERT: SMS sent for device sensor-002 - Device unhealthy: High temperature
ALERT: Email sent for device sensor-002 - Device unhealthy: High temperature
ALERT: SMS sent for devi

#### [19]

In [28]:
from abc import ABC, abstractmethod
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass
import threading
import queue
import random
import time
from decimal import Decimal, getcontext


# Interfaces
class MarketDataFeed(ABC):
    @abstractmethod
    def connect(self) -> bool:
        pass
    
    @abstractmethod
    def subscribe(self, symbol: str) -> None:
        pass
    
    @abstractmethod
    def get_next_tick(self) -> Optional[Dict[str, Any]]:
        pass

class OrderExecution(ABC):
    @abstractmethod
    def send_order(self, order: Dict[str, Any]) -> str:
        pass
    
    @abstractmethod
    def cancel_order(self, order_id: str) -> bool:
        pass

class RiskManager(ABC):
    @abstractmethod
    def check_order(self, order: Dict[str, Any], portfolio: Dict[str, Any]) -> Tuple[bool, str]:
        pass

class Strategy(ABC):
    @abstractmethod
    def on_market_data(self, tick: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        pass

# Implementations
class BinanceMarketData(MarketDataFeed):
    def __init__(self):
        self._connected = False
        self._symbols = set()
        self._tick_queue = queue.Queue()
    
    def connect(self) -> bool:
        print("Connecting to Binance WebSocket...")
        self._connected = True
        return True
    
    def subscribe(self, symbol: str) -> None:
        self._symbols.add(symbol)
        print(f"Subscribed to {symbol} market data")
    
    def get_next_tick(self) -> Optional[Dict[str, Any]]:
        if not self._connected:
            return None
        
        # Simulate market data
        if random.random() > 0.7:  # 30% chance of new tick
            symbol = random.choice(list(self._symbols))
            return {
                "symbol": symbol,
                "price": Decimal(random.uniform(100, 200)).quantize(Decimal("0.01")),
                "quantity": Decimal(random.uniform(1, 10)).quantize(Decimal("0.0001")),
                "timestamp": int(time.time() * 1000)
            }
        return None

class AlpacaExecution(OrderExecution):
    def __init__(self):
        self._orders = {}
    
    def send_order(self, order: Dict[str, Any]) -> str:
        order_id = f"order_{int(time.time() * 1000)}"
        self._orders[order_id] = {
            **order,
            "status": "filled" if random.random() > 0.1 else "rejected"
        }
        print(f"Sent order {order_id} to Alpaca")
        return order_id
    
    def cancel_order(self, order_id: str) -> bool:
        if order_id in self._orders:
            self._orders[order_id]["status"] = "cancelled"
            return True
        return False

class BasicRiskManager(RiskManager):
    def __init__(self, max_position_size: Decimal = Decimal("10000")):
        self.max_position_size = max_position_size
    
    def check_order(self, order: Dict[str, Any], portfolio: Dict[str, Any]) -> Tuple[bool, str]:
        notional = order["price"] * order["quantity"]
        if notional > self.max_position_size:
            return False, f"Order size {notional} exceeds max {self.max_position_size}"
        return True, ""


class MeanReversionStrategy(Strategy):
    def __init__(self, symbol: str, lookback: int = 10):
        self.symbol = symbol
        self.lookback = lookback
        self.price_history = []
        # Set decimal precision
        getcontext().prec = 6
    
    def on_market_data(self, tick: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        if tick["symbol"] != self.symbol:
            return None
        
        # Convert price to Decimal for consistent arithmetic
        current_price = Decimal(str(tick["price"]))
        self.price_history.append(current_price)
        
        if len(self.price_history) > self.lookback:
            self.price_history.pop(0)
        
        if len(self.price_history) == self.lookback:
            avg_price = sum(self.price_history) / Decimal(self.lookback)
            
            if current_price < avg_price * Decimal("0.98"):
                return {
                    "symbol": self.symbol,
                    "side": "BUY",
                    "quantity": Decimal("1"),
                    "type": "LIMIT",
                    "price": current_price
                }
            elif current_price > avg_price * Decimal("1.02"):
                return {
                    "symbol": self.symbol,
                    "side": "SELL",
                    "quantity": Decimal("1"),
                    "type": "LIMIT",
                    "price": current_price
                }
        return None


# Composed Trading Engine
class TradingEngine:
    def __init__(
        self,
        data_feeds: List[MarketDataFeed],
        execution: OrderExecution,
        risk_manager: RiskManager,
        strategies: List[Strategy],
        portfolio: Dict[str, Any]
    ):
        self.data_feeds = data_feeds
        self.execution = execution
        self.risk_manager = risk_manager
        self.strategies = strategies
        self.portfolio = portfolio
        self._running = False
        self._engine_thread = threading.Thread(target=self._run_engine, daemon=True)
    
    def start(self) -> None:
        print("Starting trading engine...")
        for feed in self.data_feeds:
            feed.connect()
        self._running = True
        self._engine_thread.start()
    
    def stop(self) -> None:
        self._running = False
        self._engine_thread.join()
        print("Trading engine stopped")
    
    def _run_engine(self) -> None:
        while self._running:
            # Process market data
            for feed in self.data_feeds:
                tick = feed.get_next_tick()
                if tick:
                    self._process_tick(tick)
            
            time.sleep(0.1)  # Prevent CPU overload
    
    def _process_tick(self, tick: Dict[str, Any]) -> None:
        # Run strategies
        for strategy in self.strategies:
            order = strategy.on_market_data(tick)
            if order:
                # Check risk
                approved, reason = self.risk_manager.check_order(order, self.portfolio)
                if approved:
                    order_id = self.execution.send_order(order)
                    print(f"Executed order {order_id}")
                else:
                    print(f"Order rejected: {reason}")

# Usage Example
if __name__ == "__main__":
    # Setup components
    binance_feed = BinanceMarketData()
    binance_feed.subscribe("BTCUSDT")
    
    alpaca_exec = AlpacaExecution()
    risk_mgr = BasicRiskManager(max_position_size=Decimal("5000"))
    strategy = MeanReversionStrategy(symbol="BTCUSDT")
    
    # Create trading engine
    engine = TradingEngine(
        data_feeds=[binance_feed],
        execution=alpaca_exec,
        risk_manager=risk_mgr,
        strategies=[strategy],
        portfolio={"USD": Decimal("10000"), "BTC": Decimal("0")}
    )
    
    # Run for 30 seconds
    try:
        engine.start()
        time.sleep(30)
    finally:
        engine.stop()

Subscribed to BTCUSDT market data
Starting trading engine...
Connecting to Binance WebSocket...
Sent order order_1744425590522 to Alpaca
Executed order order_1744425590522
Sent order order_1744425590826 to Alpaca
Executed order order_1744425590826
Sent order order_1744425591128 to Alpaca
Executed order order_1744425591128
Sent order order_1744425591332 to Alpaca
Executed order order_1744425591332
Sent order order_1744425592140 to Alpaca
Executed order order_1744425592140
Sent order order_1744425592946 to Alpaca
Executed order order_1744425592946
Sent order order_1744425594162 to Alpaca
Executed order order_1744425594162
Sent order order_1744425594566 to Alpaca
Executed order order_1744425594566
Sent order order_1744425594869 to Alpaca
Executed order order_1744425594869
Sent order order_1744425595074 to Alpaca
Executed order order_1744425595074
Sent order order_1744425595175 to Alpaca
Executed order order_1744425595175
Sent order order_1744425595278 to Alpaca
Executed order order_174442