# S - Single Responsibility Principle (SRP)

In [None]:
from typing import List, Dict
from datetime import datetime
import uuid
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, filename='order_processing.log',
                    format='%(asctime)s - %(levelname)s - %(message)s')

class Order:
    """Class responsible solely for managing order data"""
    def __init__(self, customer_id: str, items: List[Dict[str, float]]):
        self._order_id = str(uuid.uuid4())
        self._customer_id = customer_id
        self._items = items  # List of dicts with item name and price
        self._created_at = datetime.now()
        self._status = "PENDING"

    @property
    def order_id(self) -> str:
        return self._order_id

    @property
    def total(self) -> float:
        return sum(item["price"] for item in self._items)

    @property
    def status(self) -> str:
        return self._status

    def update_status(self, new_status: str):
        """Update order status"""
        self._status = new_status

    def get_details(self) -> Dict[str, any]:
        """Return order details"""
        return {
            "order_id": self._order_id,
            "customer_id": self._customer_id,
            "items": self._items,
            "total": self.total,
            "status": self._status,
            "created_at": self._created_at.isoformat()
        }

In [None]:
class PaymentProcessor:
    """Class responsible solely for handling payments"""
    def __init__(self):
        self._transactions: Dict[str, float] = {}

    def process_payment(self, order: Order, payment_method: str) -> bool:
        """Process payment for an order"""
        if order.total <= 0:
            return False
        
        # Simulate payment processing
        success = self._simulate_payment(order.total, payment_method)
        if success:
            self._transactions[order.order_id] = order.total
            order.update_status("PAID")
        return success

    def refund(self, order: Order) -> bool:
        """Process refund for an order"""
        if order.order_id in self._transactions:
            success = self._simulate_refund(order.total)
            if success:
                del self._transactions[order.order_id]
                order.update_status("REFUNDED")
            return success
        return False

    def _simulate_payment(self, amount: float, method: str) -> bool:
        """Simulate payment processing"""
        # In real scenario, this would call an external payment gateway
        return True if amount > 0 else False

    def _simulate_refund(self, amount: float) -> bool:
        """Simulate refund processing"""
        return True

In [None]:
class NotificationService:
    """Class responsible solely for sending notifications"""
    def send_order_confirmation(self, order: Order, email: str):
        """Send order confirmation to customer"""
        message = (
            f"Order {order.order_id} confirmed!\n"
            f"Total: ${order.total:.2f}\n"
            f"Items: {', '.join(item['name'] for item in order.get_details()['items'])}"
        )
        self._send_email(email, "Order Confirmation", message)

    def send_payment_failure(self, order: Order, email: str):
        """Send payment failure notification"""
        message = f"Payment failed for Order {order.order_id}. Total: ${order.total:.2f}"
        self._send_email(email, "Payment Failure", message)

    def _send_email(self, to: str, subject: str, body: str):
        """Simulate email sending"""
        # In real scenario, this would use an email service
        print(f"Sending email to {to}: Subject: {subject}\n{body}")

In [None]:
class OrderLogger:
    """Class responsible solely for logging order-related events"""
    def __init__(self):
        self._logger = logging.getLogger("OrderLogger")

    def log_order_creation(self, order: Order):
        """Log order creation"""
        self._logger.info(f"Order created: {order.order_id} for customer {order.get_details()['customer_id']}")

    def log_payment_status(self, order: Order, success: bool):
        """Log payment status"""
        status = "successful" if success else "failed"
        self._logger.info(f"Payment {status} for Order {order.order_id} - Total: ${order.total:.2f}")

    def log_status_change(self, order: Order, old_status: str):
        """Log status changes"""
        self._logger.info(f"Order {order.order_id} status changed from {old_status} to {order.status}")

In [None]:
class OrderManager:
    """Orchestrates the order processing workflow"""
    def __init__(self):
        self.payment_processor = PaymentProcessor()
        self.notification_service = NotificationService()
        self.logger = OrderLogger()

    def create_and_process_order(self, customer_id: str, items: List[Dict[str, float]], 
                               email: str, payment_method: str) -> Order:
        # Create order
        order = Order(customer_id, items)
        self.logger.log_order_creation(order)

        # Process payment
        payment_success = self.payment_processor.process_payment(order, payment_method)
        self.logger.log_payment_status(order, payment_success)

        # Send notification
        if payment_success:
            self.notification_service.send_order_confirmation(order, email)
        else:
            self.notification_service.send_payment_failure(order, email)
            order.update_status("FAILED")

        return order

In [None]:
# Demonstration
def main():
    # Sample items
    items = [
        {"name": "Laptop", "price": 999.99},
        {"name": "Mouse", "price": 29.99}
    ]

    # Create order manager
    manager = OrderManager()

    # Process an order
    order = manager.create_and_process_order(
        customer_id="CUST123",
        items=items,
        email="customer@example.com",
        payment_method="CreditCard"
    )

    print("\nOrder Details:")
    for key, value in order.get_details().items():
        print(f"{key}: {value}")

    # Simulate a refund
    old_status = order.status
    if manager.payment_processor.refund(order):
        manager.logger.log_status_change(order, old_status)

if __name__ == "__main__":
    main()

# O - Open/Closed Principle (OCP) 

In [None]:
from abc import ABC, abstractmethod
from typing import List, Dict
from datetime import datetime
import json
import csv

# Abstract base class for report generators
class ReportGenerator(ABC):
    @abstractmethod
    def generate(self, data: List[Dict]) -> str:
        """Generate a report from the provided data"""
        pass

    @abstractmethod
    def get_file_extension(self) -> str:
        """Return the file extension for this report type"""
        pass

# Concrete class for JSON report
class JSONReportGenerator(ReportGenerator):
    def generate(self, data: List[Dict]) -> str:
        """Generate a JSON formatted report"""
        report = {
            "timestamp": datetime.now().isoformat(),
            "record_count": len(data),
            "data": data
        }
        return json.dumps(report, indent=2)

    def get_file_extension(self) -> str:
        return ".json"

# Concrete class for CSV report
class CSVReportGenerator(ReportGenerator):
    def generate(self, data: List[Dict]) -> str:
        """Generate a CSV formatted report"""
        if not data:
            return ""
        
        # Get headers from first record
        headers = list(data[0].keys())
        output = [",".join(headers)]
        
        # Add data rows
        for record in data:
            row = [str(record.get(header, "")) for header in headers]
            output.append(",".join(row))
        
        return "\n".join(output)

    def get_file_extension(self) -> str:
        return ".csv"

# Concrete class for HTML report
class HTMLReportGenerator(ReportGenerator):
    def generate(self, data: List[Dict]) -> str:
        """Generate an HTML formatted report"""
        if not data:
            return "<html><body>No data</body></html>"
        
        headers = list(data[0].keys())
        html = [
            "<html>",
            "<head><style>",
            "table { border-collapse: collapse; }",
            "th, td { border: 1px solid black; padding: 8px; }",
            "</style></head>",
            "<body>",
            f"<h2>Report - {datetime.now().isoformat()}</h2>",
            "<table>",
            "<tr>"
        ]
        
        # Headers
        html.extend(f"<th>{header}</th>" for header in headers)
        html.append("</tr>")
        
        # Data rows
        for record in data:
            html.append("<tr>")
            html.extend(f"<td>{record.get(header, '')}</td>" for header in headers)
            html.append("</tr>")
        
        html.extend(["</table>", "</body>", "</html>"])
        return "\n".join(html)

    def get_file_extension(self) -> str:
        return ".html"

# Report manager class
class ReportManager:
    def __init__(self):
        self._generators: List[ReportGenerator] = []

    def add_generator(self, generator: ReportGenerator):
        """Add a new report generator (open for extension)"""
        self._generators.append(generator)

    def generate_reports(self, data: List[Dict], output_dir: str = "./"):
        """Generate all registered reports without modifying this method"""
        results = []
        for generator in self._generators:
            report_content = generator.generate(data)
            file_ext = generator.get_file_extension()
            filename = f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}{file_ext}"
            filepath = f"{output_dir}{filename}"
            
            with open(filepath, "w", encoding="utf-8") as f:
                f.write(report_content)
            
            results.append(f"Generated {filepath}")
        return results

    def get_supported_formats(self) -> List[str]:
        """Return list of supported file extensions"""
        return [gen.get_file_extension() for gen in self._generators]


# Demonstration
def main():
    # Sample data
    sales_data = [
        {"product": "Laptop", "price": 999.99, "quantity": 5},
        {"product": "Mouse", "price": 29.99, "quantity": 10},
        {"product": "Keyboard", "price": 59.99, "quantity": 8}
    ]

    # Create report manager
    report_manager = ReportManager()

    # Add report generators (extending functionality)
    report_manager.add_generator(JSONReportGenerator())
    report_manager.add_generator(CSVReportGenerator())
    report_manager.add_generator(HTMLReportGenerator())

    # Generate reports
    print("Supported formats:", report_manager.get_supported_formats())
    results = report_manager.generate_reports(sales_data)
    
    print("\nReport generation results:")
    for result in results:
        print(result)

    # Simulate adding a new report type without modifying ReportManager
    class XMLReportGenerator(ReportGenerator):
        def generate(self, data: List[Dict]) -> str:
            lines = ["<?xml version='1.0' encoding='UTF-8'?>"]
            lines.append(f"<report timestamp='{datetime.now().isoformat()}'>")
            for record in data:
                lines.append("  <item>")
                for key, value in record.items():
                    lines.append(f"    <{key}>{value}</{key}>")
                lines.append("  </item>")
            lines.append("</report>")
            return "\n".join(lines)

        def get_file_extension(self) -> str:
            return ".xml"

    # Add new generator without changing existing code
    report_manager.add_generator(XMLReportGenerator())
    print("\nUpdated supported formats:", report_manager.get_supported_formats())
    
    # Generate reports with new type
    more_results = report_manager.generate_reports(sales_data)
    print("\nNew report generation results:")
    for result in more_results:
        print(result)


if __name__ == "__main__":
    main()

In [None]:
class MarkdownReportGenerator(ReportGenerator):
    def generate(self, data: List[Dict]) -> str:
        lines = [f"# Sales Report - {datetime.now().isoformat()}", ""]
        lines.append("| " + " | ".join(data[0].keys()) + " |")
        lines.append("| " + " | ".join(["---"] * len(data[0])) + " |")
        for record in data:
            lines.append("| " + " | ".join(str(value) for value in record.values()) + " |")
        return "\n".join(lines)

    def get_file_extension(self) -> str:
        return ".md"

# L - Liskov Substitution Principle (LSP)

In [None]:
from abc import ABC, abstractmethod
from typing import List, Dict, Optional
from datetime import datetime, timedelta
import time

# Abstract base class for playable media
class PlayableMedia(ABC):
    def __init__(self, title: str, duration: float):  # duration in seconds
        self._title = title
        self._duration = duration
        self._is_playing = False
        self._start_time: Optional[datetime] = None

    @property
    def title(self) -> str:
        return self._title

    @property
    def duration(self) -> float:
        return self._duration

    @abstractmethod
    def play(self) -> str:
        """Start playing the media"""
        pass

    @abstractmethod
    def stop(self) -> str:
        """Stop playing the media"""
        pass

    def get_remaining_time(self) -> float:
        """Calculate remaining playback time"""
        if not self._is_playing or not self._start_time:
            return self._duration
        
        elapsed = (datetime.now() - self._start_time).total_seconds()
        remaining = self._duration - elapsed
        return max(0.0, remaining)

    def __str__(self) -> str:
        status = "playing" if self._is_playing else "stopped"
        return f"{self._title} ({status}, {self._duration}s)"


# Concrete class for audio files
class AudioFile(PlayableMedia):
    def __init__(self, title: str, duration: float, bitrate: int):
        super().__init__(title, duration)
        self._bitrate = bitrate

    def play(self) -> str:
        if self._is_playing:
            return f"{self._title} is already playing"
        self._is_playing = True
        self._start_time = datetime.now()
        return f"Playing audio: {self._title} at {self._bitrate}kbps"

    def stop(self) -> str:
        if not self._is_playing:
            return f"{self._title} is not playing"
        self._is_playing = False
        self._start_time = None
        return f"Stopped audio: {self._title}"

    def __str__(self) -> str:
        return f"{super().__str__()} [Audio, {self._bitrate}kbps]"


# Concrete class for video files
class VideoFile(PlayableMedia):
    def __init__(self, title: str, duration: float, resolution: str):
        super().__init__(title, duration)
        self._resolution = resolution

    def play(self) -> str:
        if self._is_playing:
            return f"{self._title} is already playing"
        self._is_playing = True
        self._start_time = datetime.now()
        return f"Playing video: {self._title} at {self._resolution}"

    def stop(self) -> str:
        if not self._is_playing:
            return f"{self._title} is not playing"
        self._is_playing = False
        self._start_time = None
        return f"Stopped video: {self._title}"

    def __str__(self) -> str:
        return f"{super().__str__()} [Video, {self._resolution}]"


# Concrete class for live streams (special case)
class LiveStream(PlayableMedia):
    def __init__(self, title: str, source_url: str):
        # Live streams have "infinite" duration until stopped
        super().__init__(title, duration=float("inf"))
        self._source_url = source_url

    def play(self) -> str:
        if self._is_playing:
            return f"{self._title} is already streaming"
        self._is_playing = True
        self._start_time = datetime.now()
        return f"Streaming live: {self._title} from {self._source_url}"

    def stop(self) -> str:
        if not self._is_playing:
            return f"{self._title} is not streaming"
        self._is_playing = False
        self._start_time = None
        return f"Stopped stream: {self._title}"

    def get_remaining_time(self) -> float:
        # Override for live streams: always "infinite" while playing
        return float("inf") if self._is_playing else 0.0

    def __str__(self) -> str:
        return f"{super().__str__()} [Live Stream, {self._source_url}]"


# Media player class demonstrating LSP
class MediaPlayer:
    def __init__(self):
        self._playlist: List[PlayableMedia] = []
        self._current_media: Optional[PlayableMedia] = None

    def add_to_playlist(self, media: PlayableMedia):
        self._playlist.append(media)

    def play_media(self, media: PlayableMedia) -> str:
        """Play any PlayableMedia subtype"""
        if self._current_media and self._current_media._is_playing:
            self._current_media.stop()
        self._current_media = media
        return media.play()

    def stop_current(self) -> str:
        """Stop current media if playing"""
        if self._current_media:
            return self._current_media.stop()
        return "Nothing is playing"

    def get_playlist_status(self) -> List[str]:
        """Get status of all media in playlist"""
        return [str(media) for media in self._playlist]

    def simulate_playback(self, seconds: float):
        """Simulate time passing for demonstration"""
        if self._current_media and self._current_media._is_playing:
            # Adjust start time to simulate elapsed time
            self._current_media._start_time -= timedelta(seconds=seconds)
            remaining = self._current_media.get_remaining_time()
            if remaining <= 0 and self._current_media.duration != float("inf"):
                self._current_media.stop()


# Demonstration
def main():
    # Create media instances
    audio = AudioFile("Song 1", 180.0, 320)  # 3 minutes
    video = VideoFile("Movie Clip", 300.0, "1080p")  # 5 minutes
    stream = LiveStream("Live News", "http://stream.example.com")

    # Create media player
    player = MediaPlayer()
    player.add_to_playlist(audio)
    player.add_to_playlist(video)
    player.add_to_playlist(stream)

    # Demonstrate substitution
    print("Initial playlist status:")
    for status in player.get_playlist_status():
        print(status)

    print("\nPlaying different media types:")
    print(player.play_media(audio))
    time.sleep(1)  # Simulate delay
    player.simulate_playback(100.0)  # Simulate 100s of playback
    print(f"Remaining time: {audio.get_remaining_time():.1f}s")
    print(player.stop_current())

    print(player.play_media(video))
    time.sleep(1)
    player.simulate_playback(200.0)
    print(f"Remaining time: {video.get_remaining_time():.1f}s")
    print(player.stop_current())

    print(player.play_media(stream))
    time.sleep(1)
    player.simulate_playback(1000.0)  # Long time, still infinite
    print(f"Remaining time: {stream.get_remaining_time():.1f}s")
    print(player.stop_current())

    print("\nFinal playlist status:")
    for status in player.get_playlist_status():
        print(status)


if __name__ == "__main__":
    main()

# I - Interface Segregation Principle (ISP)

# D - Dependency Inversion Principle (DIP)