# Python DevOps Session 1: Advanced Automation & Tooling

Welcome to our advanced Python DevOps session! This notebook covers practical automation tools and techniques that DevOps engineers use daily.

## Learning Objectives
By the end of this session, you will be able to:
- Build command-line tools with `argparse`
- Process large files efficiently with streaming
- Create installable Python packages
- Handle configuration files (YAML, JSON)
- Implement concurrent programming for performance
- Work with data formats (CSV, Excel, JSON)
- Apply testing best practices with `pytest`

## Session Overview
1. **Log Analysis** - CLI tools and streaming file processing
2. **Package Development** - Creating installable Python packages
3. **File Operations** - Configuration-driven file management
4. **Data Processing** - CSV to Excel reporting
5. **Concurrent Programming** - Async HTTP health checking
6. **Text Processing** - Building normalization toolkits

## Prerequisites
- Completion of Session 0 fundamentals
- Understanding of Python functions and classes
- Basic command-line experience

## Section 1: Command-Line Tools and Streaming File Processing

### Theory: Building Professional CLI Tools

Command-line interfaces are the backbone of DevOps automation. Key concepts:

- **Argument Parsing**: Using `argparse` for professional CLI interfaces
- **Streaming Processing**: Handling large files without loading into memory
- **File I/O Patterns**: Efficient reading and writing of large datasets
- **Error Handling**: Graceful handling of malformed data
- **Output Formats**: Supporting multiple output formats (JSON, TSV, CSV)
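
To make the last bullet concrete, here is a minimal sketch of rendering the same aggregated counts as JSON, TSV, or CSV. The function name `format_counts` and the `key`/`count` column headers are assumptions made for illustration; they are not part of the session's tooling.

```python
import csv
import io
import json

def format_counts(counts, fmt="json"):
    """Render a {key: count} mapping as JSON, TSV, or CSV text."""
    if fmt == "json":
        return json.dumps(counts, indent=2)
    if fmt in ("tsv", "csv"):
        buffer = io.StringIO()
        delimiter = "\t" if fmt == "tsv" else ","
        # csv.writer handles quoting for us; lineterminator keeps output Unix-style
        writer = csv.writer(buffer, delimiter=delimiter, lineterminator="\n")
        writer.writerow(["key", "count"])
        writer.writerows(counts.items())
        return buffer.getvalue()
    raise ValueError(f"Unsupported format: {fmt}")

sample_counts = {"/api/users": 10, "/api/login": 7}
print(format_counts(sample_counts, "tsv"))
```

Using the `csv` module for both TSV and CSV means values containing the delimiter are quoted correctly, which hand-rolled `str.join` output would not do.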

### Essential CLI Methods:
- **`argparse.ArgumentParser()`** - Creates command-line argument parser
- **`parser.add_argument()`** - Defines command-line options and flags
- **`parser.parse_args()`** - Parses command-line arguments
- **`open(file, mode)`** - Opens files with different modes ('r', 'w', 'a')
- **File iteration** - `for line in file:` for memory-efficient line processing
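
The three `argparse` calls listed above fit together roughly as follows. This is a sketch for a hypothetical `log-analyzer` command; the option names `--top` and `--format` are assumptions chosen to match the topics of this section.

```python
import argparse

def build_parser():
    """Build the parser for a hypothetical log-analysis CLI."""
    parser = argparse.ArgumentParser(
        prog="log-analyzer",
        description="Summarize a web server access log.",
    )
    # Positional argument: required, no leading dashes
    parser.add_argument("logfile", help="Path to the log file to analyze")
    # Optional arguments with types, defaults, and restricted choices
    parser.add_argument("--top", type=int, default=10,
                        help="Number of top entries to report (default: 10)")
    parser.add_argument("--format", choices=["json", "tsv", "csv"], default="json",
                        help="Output format (default: json)")
    return parser

# Passing a list to parse_args() avoids reading sys.argv, which is handy in notebooks
args = build_parser().parse_args(["sample_web.log", "--top", "5", "--format", "tsv"])
print(args.logfile, args.top, args.format)
```

In a real script you would call `parse_args()` with no arguments so that the values come from the command line.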

In [1]:
# Example 1: Sample Log Generator (test input for the CLI tool)
import argparse
import json
import sys
from datetime import datetime
import re

def create_sample_log(filename, num_lines=100):
    """Create a sample web server log for testing."""
    sample_entries = [
        '192.168.1.100 - - [26/Oct/2025:10:15:30 +0000] "GET /api/users HTTP/1.1" 200 1234',
        '10.0.0.50 - - [26/Oct/2025:10:16:45 +0000] "POST /api/login HTTP/1.1" 401 567',
        '192.168.1.200 - - [26/Oct/2025:10:17:12 +0000] "GET /static/css/main.css HTTP/1.1" 304 0',
        '203.0.113.45 - - [26/Oct/2025:10:18:03 +0000] "GET /api/data HTTP/1.1" 500 890',
        '192.168.1.100 - - [26/Oct/2025:11:20:15 +0000] "DELETE /api/user/123 HTTP/1.1" 204 0'
    ]

    with open(filename, 'w') as f:
        for i in range(num_lines):
            # Cycle through the sample entries
            entry = sample_entries[i % len(sample_entries)]
            f.write(entry + '\n')

    print(f"Created sample log with {num_lines} lines: {filename}")

# Create a sample log file for demonstration
create_sample_log('sample_web.log', 50)
print("Sample log created successfully!")

Created sample log with 50 lines: sample_web.log
Sample log created successfully!


In [None]:
# Example 2: Streaming Log Parser with CLI
class LogAnalyzer:
    """A streaming log analyzer that processes web server logs efficiently."""

    def __init__(self):
        self.stats = {
            'total_requests': 0,
            'status_codes': {'2xx': 0, '3xx': 0, '4xx': 0, '5xx': 0},
            'client_ips': {},
            'paths': {},
            'hourly_counts': {}
        }

    def parse_log_line(self, line):
        """Parse a single log line in Common Log Format."""
        # Common Log Format pattern
        pattern = r'(\S+) \S+ \S+ \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) \S+" (\d{3}) (\d+|-)'

        match = re.match(pattern, line.strip())
        if not match:
            return None

        ip, timestamp, method, path, status, size = match.groups()

        return {
            'ip': ip,
            'timestamp': timestamp,
            'method': method,
            'path': path,
            'status': int(status),
            'size': int(size) if size != '-' else 0
        }

    def update_stats(self, parsed_line):
        """Update statistics with parsed log entry."""
        if not parsed_line:
            return

        # Total requests
        self.stats['total_requests'] += 1

        # Status code categories
        status = parsed_line['status']
        if 200 <= status < 300:
            self.stats['status_codes']['2xx'] += 1
        elif 300 <= status < 400:
            self.stats['status_codes']['3xx'] += 1
        elif 400 <= status < 500:
            self.stats['status_codes']['4xx'] += 1
        elif 500 <= status < 600:
            self.stats['status_codes']['5xx'] += 1

        # Client IPs
        ip = parsed_line['ip']
        self.stats['client_ips'][ip] = self.stats['client_ips'].get(ip, 0) + 1

        # Request paths
        path = parsed_line['path']
        self.stats['paths'][path] = self.stats['paths'].get(path, 0) + 1

        # Hourly counts (extract hour from timestamp)
        timestamp = parsed_line['timestamp']
        # Take the hour field; datetime.strptime() or dateutil.parser.parse() would be more robust
        hour = timestamp.split(':')[1]
        self.stats['hourly_counts'][hour] = self.stats['hourly_counts'].get(hour, 0) + 1

    def process_file(self, filename):
        """Process log file line by line (streaming)."""
        malformed_lines = 0

        try:
            with open(filename, 'r') as file:
                for line_num, line in enumerate(file, 1):
                    parsed = self.parse_log_line(line)
                    if parsed is None:
                        malformed_lines += 1
                        print(f"Warning: Malformed line {line_num}: {line.strip()}")
                        continue

                    self.update_stats(parsed)

        except FileNotFoundError:
            print(f"Error: File {filename} not found")
            return False
        except Exception as e:
            print(f"Error processing file: {e}")
            return False

        if malformed_lines > 0:
            print(f"Processed file with {malformed_lines} malformed lines")

        return True

    def get_top_n(self, data_dict, n=10):
        """Get top N items from a dictionary by value."""
        return sorted(data_dict.items(), key=lambda x: x[1], reverse=True)[:n]

    def generate_report(self, top_n=10):
        """Generate comprehensive report."""
        return {
            'summary': {
                'total_requests': self.stats['total_requests'],
                'status_distribution': self.stats['status_codes']
            },
            'top_client_ips': dict(self.get_top_n(self.stats['client_ips'], top_n)),
            'top_paths': dict(self.get_top_n(self.stats['paths'], top_n)),
            'hourly_distribution': self.stats['hourly_counts']
        }

# Test the log analyzer
analyzer = LogAnalyzer()
success = analyzer.process_file('sample_web.log')

if success:
    report = analyzer.generate_report(top_n=5)
    print("\n=== LOG ANALYSIS REPORT ===")
    print(f"Total Requests: {report['summary']['total_requests']}")
    print(f"Status Distribution: {report['summary']['status_distribution']}")
    print(f"\nTop Client IPs: {report['top_client_ips']}")
    print(f"Top Paths: {report['top_paths']}")
    print(f"Hourly Distribution: {report['hourly_distribution']}")
else:
    print("Failed to process log file")


=== LOG ANALYSIS REPORT ===
Total Requests: 50
Status Distribution: {'2xx': 20, '3xx': 10, '4xx': 10, '5xx': 10}

Top Client IPs: {'192.168.1.100': 20, '10.0.0.50': 10, '192.168.1.200': 10, '203.0.113.45': 10}
Top Paths: {'/api/users': 10, '/api/login': 10, '/static/css/main.css': 10, '/api/data': 10, '/api/user/123': 10}
Hourly Distribution: {'10': 40, '11': 10}


### Key Methods Explained:

**File Processing Methods:**
- **`open(filename, 'r')`** - Opens file for reading in text mode
- **`enumerate(file, 1)`** - Provides line numbers starting from 1 for each line
- **`for line in file:`** - Iterates through file line by line (memory efficient)
- **`line.strip()`** - Removes leading/trailing whitespace including newlines

**Regular Expression Methods:**
- **`re.match(pattern, string)`** - Matches pattern at the beginning of string
- **`match.groups()`** - Returns captured groups from regex match
- **Common Log Format** - Standard web server log format with IP, timestamp, request, status
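
The timestamp captured by the regex can also be parsed into a real `datetime` rather than split on `:`. A minimal sketch, assuming the timestamp layout used in the sample log and an English locale for the month abbreviation (`%b` is locale-dependent); `parse_clf_timestamp` is a name chosen for illustration:

```python
from datetime import datetime

def parse_clf_timestamp(timestamp):
    """Parse a Common Log Format timestamp such as '26/Oct/2025:10:15:30 +0000'."""
    return datetime.strptime(timestamp, "%d/%b/%Y:%H:%M:%S %z")

parsed = parse_clf_timestamp("26/Oct/2025:10:15:30 +0000")
hour_key = f"{parsed.hour:02d}"  # zero-padded hour, usable as a dictionary key
print(hour_key)
```

A parsed `datetime` also rejects malformed timestamps with a `ValueError` instead of silently producing a wrong hour.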

**Dictionary Methods for Aggregation:**
- **`dict.get(key, default)`** - Returns value for key, or default if key doesn't exist
- **`sorted(dict.items(), key=lambda x: x[1], reverse=True)`** - Sorts dictionary by values descending
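
As an alternative to combining `dict.get()` with `sorted()`, the standard library's `collections.Counter` covers both steps. This is a sketch of the same aggregation idea, not a change to the `LogAnalyzer` class above:

```python
from collections import Counter

ip_counts = Counter()
for ip in ["192.168.1.100", "10.0.0.50", "192.168.1.100", "203.0.113.45"]:
    ip_counts[ip] += 1  # missing keys start at 0, so no .get() is needed

# most_common(n) returns the n highest (key, count) pairs, sorted by count
top_two = ip_counts.most_common(2)
print(top_two)
```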

**Exception Handling:**
- **`try/except`** - Handles file operations and parsing errors gracefully
- **`FileNotFoundError`** - Specific exception for missing files

## Section 2: Python Package Development and Virtual Environments

### Theory: Creating Installable Packages

Professional Python development requires understanding:

- **Package Structure**: Organizing code into installable packages
- **Virtual Environments**: Isolating dependencies for reproducible builds
- **Configuration Files**: Working with YAML, JSON for application config
- **Entry Points**: Creating command-line tools from packages
- **Testing**: Using pytest for comprehensive testing

### Essential Package Development Methods:
- **`setup.py`** - Traditional package configuration
- **`pyproject.toml`** - Modern package configuration (build system per PEP 518, project metadata per PEP 621)
- **`pip install -e .`** - Install package in development mode
- **`yaml.safe_load()`** - Parse YAML configuration files safely
- **`json.load()`** - Parse JSON configuration files
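
For the JSON side of the list above, a minimal sketch of loading a config file and checking its top-level fields. The helper name `load_json_config` and the file name `app_config.json` are assumptions; the required fields mirror the ones used by the validator in Example 3.

```python
import json
import os
import tempfile

REQUIRED_FIELDS = ("name", "version", "services")

def load_json_config(path):
    """Load a JSON config file and reject it if top-level fields are missing."""
    with open(path, "r", encoding="utf-8") as handle:
        config = json.load(handle)
    missing = [field for field in REQUIRED_FIELDS if field not in config]
    if missing:
        raise ValueError(f"Missing required fields: {', '.join(missing)}")
    return config

# Write a throwaway config into a temporary directory, then load it back
with tempfile.TemporaryDirectory() as tmp_dir:
    config_path = os.path.join(tmp_dir, "app_config.json")
    with open(config_path, "w", encoding="utf-8") as handle:
        json.dump({"name": "MyApp", "version": "1.2.3", "services": []}, handle)
    loaded = load_json_config(config_path)

print(loaded["name"], loaded["version"])
```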

In [None]:
# Example 3: Configuration Tool Package
import yaml
import json
import os
from typing import Dict, List, Any

class ConfigValidator:
    """Validates and processes YAML configuration files."""

    def __init__(self):
        self.required_fields = ['name', 'version', 'services']
        self.valid_service_fields = ['name', 'port', 'enabled']

    def load_config(self, config_path: str) -> Dict[str, Any]:
        """Load configuration from YAML file."""
        if not os.path.exists(config_path):
            raise FileNotFoundError(f"Configuration file not found: {config_path}")

        try:
            with open(config_path, 'r') as file:
                config = yaml.safe_load(file)
        except yaml.YAMLError as e:
            raise ValueError(f"Invalid YAML format: {e}") from e
        except OSError as e:
            raise ValueError(f"Error reading configuration: {e}") from e

        # Checked outside the try block so this error is not caught and re-wrapped
        if config is None:
            raise ValueError("Configuration file is empty or invalid")
        return config

    def validate_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """Validate configuration structure and content."""
        errors = []

        # Check required fields
        for field in self.required_fields:
            if field not in config:
                errors.append(f"Missing required field: {field}")

        if errors:
            raise ValueError(f"Configuration validation failed: {', '.join(errors)}")

        # Validate name
        if not isinstance(config['name'], str) or not config['name'].strip():
            raise ValueError("Field 'name' must be a non-empty string")

        # Validate version
        if not isinstance(config['version'], str):
            raise ValueError("Field 'version' must be a string")

        # Validate services
        if not isinstance(config['services'], list):
            raise ValueError("Field 'services' must be a list")

        validated_services = []
        for i, service in enumerate(config['services']):
            if not isinstance(service, dict):
                raise ValueError(f"Service {i} must be a dictionary")

            validated_service = self._validate_service(service, i)
            validated_services.append(validated_service)

        return {
            'name': config['name'].strip(),
            'version': config['version'].strip(),
            'services': validated_services
        }

    def _validate_service(self, service: Dict[str, Any], index: int) -> Dict[str, Any]:
        """Validate individual service configuration."""
        if 'name' not in service:
            raise ValueError(f"Service {index}: missing required field 'name'")

        validated = {
            'name': str(service['name']).strip(),
            'port': service.get('port', 8080),
            'enabled': service.get('enabled', True)
        }

        # Validate port (bool is a subclass of int, so reject it explicitly)
        port = validated['port']
        if isinstance(port, bool) or not isinstance(port, int) or not (1 <= port <= 65535):
            raise ValueError(f"Service {index}: port must be an integer between 1 and 65535")

        # Validate enabled
        if not isinstance(validated['enabled'], bool):
            raise ValueError(f"Service {index}: enabled must be a boolean")

        return validated

    def generate_summary(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """Generate a summary of the validated configuration."""
        enabled_services = [s for s in config['services'] if s['enabled']]
        disabled_services = [s for s in config['services'] if not s['enabled']]

        return {
            'application': {
                'name': config['name'],
                'version': config['version']
            },
            'services': {
                'total': len(config['services']),
                'enabled': len(enabled_services),
                'disabled': len(disabled_services),
                'enabled_list': [s['name'] for s in enabled_services],
                'disabled_list': [s['name'] for s in disabled_services]
            },
            'ports_in_use': [s['port'] for s in enabled_services]
        }

# Create a sample configuration file
sample_config = {
    'name': 'MyApp',
    'version': '1.2.3',
    'services': [
        {'name': 'web-server', 'port': 8080, 'enabled': True},
        {'name': 'database', 'port': 5432, 'enabled': True},
        {'name': 'cache', 'port': 6379, 'enabled': False},
        {'name': 'api-gateway', 'port': 3000, 'enabled': True}
    ]
}

# Write sample config to file
with open('sample_config.yml', 'w') as f:
    yaml.dump(sample_config, f, default_flow_style=False)

# Test the configuration validator
validator = ConfigValidator()

try:
    # Load and validate configuration
    config = validator.load_config('sample_config.yml')
    print("✓ Configuration loaded successfully")

    validated_config = validator.validate_config(config)
    print("✓ Configuration validated successfully")

    # Generate summary
    summary = validator.generate_summary(validated_config)
    print("\n=== CONFIGURATION SUMMARY ===")
    print(json.dumps(summary, indent=2))

except Exception as e:
    print(f"✗ Configuration error: {e}")

# Test with invalid configuration
print("\n=== TESTING ERROR HANDLING ===")
invalid_config = {'name': '', 'version': 123}  # Missing services, invalid types

try:
    validator.validate_config(invalid_config)
except ValueError as e:
    print(f"✓ Correctly caught validation error: {e}")

✓ Configuration loaded successfully
✓ Configuration validated successfully

=== CONFIGURATION SUMMARY ===
{
  "application": {
    "name": "MyApp",
    "version": "1.2.3"
  },
  "services": {
    "total": 4,
    "enabled": 3,
    "disabled": 1,
    "enabled_list": [
      "web-server",
      "database",
      "api-gateway"
    ],
    "disabled_list": [
      "cache"
    ]
  },
  "ports_in_use": [
    8080,
    5432,
    3000
  ]
}

=== TESTING ERROR HANDLING ===
✓ Correctly caught validation error: Configuration validation failed: Missing required field: services


### Key Package Development Methods:

**YAML Processing:**
- **`yaml.safe_load(file)`** - Safely parse YAML content from file object
- **`yaml.dump(data, file)`** - Write Python data structures to YAML format
- **`yaml.YAMLError`** - Exception for YAML parsing errors

**Configuration Validation:**
- **`isinstance(obj, type)`** - Check if object is of specific type
- **`dict.get(key, default)`** - Get dictionary value with fallback default
- **Type hints** - `Dict[str, Any]`, `List[str]` for better code documentation
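
One `isinstance` pitfall is worth knowing when validating configuration: in Python, `bool` is a subclass of `int`, so `True` passes an integer check. A small sketch, using a hypothetical helper `is_valid_port`:

```python
def is_valid_port(value):
    """Return True only for real integers in the valid port range."""
    # bool is a subclass of int, so it has to be excluded explicitly
    if isinstance(value, bool) or not isinstance(value, int):
        return False
    return 1 <= value <= 65535

print(isinstance(True, int))
print(is_valid_port(8080), is_valid_port(True), is_valid_port("8080"))
```

This matters for YAML in particular, where a value such as `port: yes` is loaded as a boolean.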

**File Operations:**
- **`os.path.exists(path)`** - Check if file or directory exists
- **`with open(file, mode) as f:`** - Context manager for safe file handling
- **Exception chaining** - Raising new exceptions with original context
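
Exception chaining uses `raise ... from ...` so that the original error stays attached as `__cause__`. A minimal sketch with JSON parsing; the function name `parse_settings` is an assumption for illustration:

```python
import json

def parse_settings(text):
    """Parse JSON text, re-raising parse failures as ValueError with the cause attached."""
    try:
        return json.loads(text)
    except json.JSONDecodeError as exc:
        # "from exc" preserves the original exception for tracebacks and debugging
        raise ValueError(f"Invalid settings: {exc.msg}") from exc

try:
    parse_settings("{not valid json")
except ValueError as err:
    caught = err  # keep a reference; "err" is cleared when the except block ends
    print(f"{err} (caused by {type(err.__cause__).__name__})")
```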

**Package Structure Concepts:**
- **Entry points** - Command-line interfaces defined in `setup.py` or `pyproject.toml`
- **Development install** - `pip install -e .` for live code updates
- **`requirements.txt`** - Dependency specification for reproducible environments
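
This section lists testing with `pytest` among its topics. The sketch below shows the shape such tests usually take, written so it also runs without pytest installed. `normalize_service` is a hypothetical stand-in for the service validation logic, not the class from Example 3.

```python
def normalize_service(service):
    """Hypothetical helper: apply defaults to a service entry."""
    if "name" not in service:
        raise ValueError("missing required field 'name'")
    return {
        "name": str(service["name"]).strip(),
        "port": service.get("port", 8080),
        "enabled": service.get("enabled", True),
    }

def test_defaults_are_applied():
    result = normalize_service({"name": " web "})
    assert result == {"name": "web", "port": 8080, "enabled": True}

def test_missing_name_is_rejected():
    try:
        normalize_service({"port": 80})
    except ValueError as err:
        assert "name" in str(err)
    else:
        raise AssertionError("expected ValueError")

# pytest collects test_* functions automatically; calling them directly also works
test_defaults_are_applied()
test_missing_name_is_rejected()
```

With pytest available, the second test would typically use the `pytest.raises(ValueError)` context manager instead of the manual `try/except/else`.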

## Section 3: Configuration-Driven File Operations

### Theory: Safe File Management

File operations are critical in DevOps for:
- **Configuration management** - Updating config files across environments
- **Log rotation** - Managing log file naming and archival
- **Backup operations** - Systematic file organization
- **Deployment preparation** - File staging and organization

### Essential File Operation Methods:
- **`glob.glob(pattern)`** - Find files matching wildcard patterns
- **`os.path.join()`** - Platform-independent path construction
- **`shutil.move(src, dst)`** - Move/rename files and directories
- **`pathlib.Path`** - Modern object-oriented path handling
- **Pattern matching** - Regular expressions and glob patterns for file selection
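
A minimal sketch of how pattern matching and renaming combine, using `pathlib` in a temporary directory so nothing on disk is touched. The file names and the `archived_` prefix are made up for illustration.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp_dir:
    work_dir = Path(tmp_dir)
    for name in ("app.log", "error.log", "notes.txt"):
        (work_dir / name).write_text("sample\n")

    # sorted() materializes the matches first, so renaming does not disturb iteration
    for log_path in sorted(work_dir.glob("*.log")):
        log_path.rename(log_path.with_name(f"archived_{log_path.name}"))

    remaining = sorted(p.name for p in work_dir.iterdir())

print(remaining)
```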

In [1]:
# Example 4: Configuration-Driven File Renamer
import glob
import os
import shutil
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple

class FileRenamer:
    """Safely rename files based on YAML configuration rules."""

    def __init__(self, config_path: str):
        self.config = self._load_config(config_path)
        self.rename_manifest = []

    def _load_config(self, config_path: str) -> Dict:
        """Load rename configuration from YAML file."""
        # For this example, we'll use a Python dict instead of YAML
        # In practice, you'd use yaml.safe_load()
        sample_config = {
            'rules': [
                {
                    'pattern': '*.log',
                    'template': '{date}_{orig}',
                    'description': 'Add date prefix to log files'
                },
                {
                    'pattern': 'temp_*.txt',
                    'template': 'processed_{orig}',
                    'description': 'Rename temp files to processed'
                },
                {
                    'pattern': '*.backup',
                    'template': 'backup_{datetime}_{orig}',
                    'description': 'Add timestamp to backup files'
                }
            ]
        }
        return sample_config

    def create_test_files(self, directory: str):
        """Create sample files for testing."""
        os.makedirs(directory, exist_ok=True)

        test_files = [
            'application.log',
            'error.log',
            'temp_data.txt',
            'temp_report.txt',
            'database.backup',
            'config.backup',
            'readme.md'  # This won't match any pattern
        ]

        for filename in test_files:
            filepath = os.path.join(directory, filename)
            with open(filepath, 'w') as f:
                f.write(f"Sample content for {filename}\n")

        print(f"Created {len(test_files)} test files in {directory}")
        return test_files

    def find_matching_files(self, directory: str, pattern: str) -> List[str]:
        """Find files matching the given glob pattern."""
        search_pattern = os.path.join(directory, pattern)
        return glob.glob(search_pattern)

    def generate_new_filename(self, original_path: str, template: str) -> str:
        """Generate new filename based on template variables."""
        path_obj = Path(original_path)
        original_name = path_obj.stem  # filename without extension
        extension = path_obj.suffix    # file extension
        parent = path_obj.parent.name  # parent directory name

        # Template variables
        variables = {
            'orig': path_obj.name,  # original filename with extension
            'name': original_name,  # filename without extension
            'ext': extension.lstrip('.'),  # extension without dot
            'parent': parent,
            'date': datetime.now().strftime('%Y%m%d'),
            'datetime': datetime.now().strftime('%Y%m%d_%H%M%S')
        }

        # Replace template variables
        new_name = template
        for var, value in variables.items():
            new_name = new_name.replace(f'{{{var}}}', str(value))

        # Return full path with new filename
        return os.path.join(path_obj.parent, new_name)

    def check_conflicts(self, rename_operations: List[Tuple[str, str]]) -> List[str]:
        """Check for naming conflicts in rename operations."""
        conflicts = []
        target_files = set()

        for old_path, new_path in rename_operations:
            if new_path in target_files:
                conflicts.append(f"Duplicate target: {new_path}")

            if os.path.exists(new_path):
                conflicts.append(f"Target exists: {new_path}")

            target_files.add(new_path)

        return conflicts

    def dry_run(self, directory: str) -> List[Tuple[str, str]]:
        """Perform dry run to show what would be renamed."""
        rename_operations = []

        print(f"\n=== DRY RUN for directory: {directory} ===")

        for rule in self.config['rules']:
            pattern = rule['pattern']
            template = rule['template']
            description = rule['description']

            matching_files = self.find_matching_files(directory, pattern)

            if matching_files:
                print(f"\nRule: {description}")
                print(f"Pattern: {pattern} -> {template}")

                for file_path in matching_files:
                    new_path = self.generate_new_filename(file_path, template)
                    rename_operations.append((file_path, new_path))
                    print(f"  {os.path.basename(file_path)} -> {os.path.basename(new_path)}")

        # Check for conflicts
        conflicts = self.check_conflicts(rename_operations)
        if conflicts:
            print(f"\n  CONFLICTS DETECTED:")
            for conflict in conflicts:
                print(f"  - {conflict}")

        return rename_operations

    def commit_renames(self, rename_operations: List[Tuple[str, str]], force: bool = False):
        """Perform actual file renames."""
        if not force:
            conflicts = self.check_conflicts(rename_operations)
            if conflicts:
                raise ValueError(f"Conflicts detected. Use --force to override: {conflicts}")

        print(f"\n=== COMMITTING {len(rename_operations)} RENAMES ===")

        for old_path, new_path in rename_operations:
            try:
                shutil.move(old_path, new_path)

                # Record in manifest
                self.rename_manifest.append({
                    'original': old_path,
                    'renamed': new_path,
                    'timestamp': datetime.now().isoformat()
                })

                print(f"{os.path.basename(old_path)} -> {os.path.basename(new_path)}")

            except Exception as e:
                print(f"Failed to rename {old_path}: {e}")

    def save_manifest(self, directory: str):
        """Save rename manifest for rollback capability."""
        manifest_path = os.path.join(directory, 'rename_manifest.json')

        with open(manifest_path, 'w') as f:
            json.dump(self.rename_manifest, f, indent=2)

        print(f"\n Manifest saved: {manifest_path}")

# Test the file renamer
test_dir = 'test_files'
renamer = FileRenamer('rename_config.yml')

# Create test files
created_files = renamer.create_test_files(test_dir)

# Show initial files
print(f"\nInitial files in {test_dir}:")
for f in os.listdir(test_dir):
    print(f"  - {f}")

# Perform dry run
rename_ops = renamer.dry_run(test_dir)

# Simulate committing renames (in practice, you'd have CLI flags)
print(f"\n Proceeding with renames...")
renamer.commit_renames(rename_ops, force=True)
# Clean up the renamed files so this cell can be re-run without conflicts
print(f"\nCleaning up previously renamed files...")
date_prefix = datetime.now().strftime('%Y%m%d') + '_'  # matches the {date} template prefix
for file in os.listdir(test_dir):
    filepath = os.path.join(test_dir, file)
    if file.startswith((date_prefix, 'processed_', 'backup_')):
        os.remove(filepath)
        print(f"  Removed: {file}")

# Recreate original test files
print(f"\nRecreating original test files...")
renamer.create_test_files(test_dir)
# Save manifest
renamer.save_manifest(test_dir)

# Show final files
print(f"\nFinal files in {test_dir}:")
for f in os.listdir(test_dir):
    if f.endswith('.json'):
        print(f"   {f} (manifest)")
    else:
        print(f"  - {f}")

Created 7 test files in test_files

Initial files in test_files:
  - application.log
  - config.backup
  - database.backup
  - error.log
  - readme.md
  - rename_manifest.json
  - temp_data.txt
  - temp_report.txt

=== DRY RUN for directory: test_files ===

Rule: Add date prefix to log files
Pattern: *.log -> {date}_{orig}
  application.log -> 20251202_application.log
  error.log -> 20251202_error.log

Rule: Rename temp files to processed
Pattern: temp_*.txt -> processed_{orig}
  temp_data.txt -> processed_temp_data.txt
  temp_report.txt -> processed_temp_report.txt

Rule: Add timestamp to backup files
Pattern: *.backup -> backup_{datetime}_{orig}
  config.backup -> backup_20251202_112959_config.backup
  database.backup -> backup_20251202_112959_database.backup

 Proceeding with renames...

=== COMMITTING 6 RENAMES ===
application.log -> 20251202_application.log
error.log -> 20251202_error.log
temp_data.txt -> processed_temp_data.txt
temp_report.txt -> processed_temp_report.txt
config.

### Key File Operation Methods:

**Path Manipulation:**
- **`pathlib.Path(path)`** - Modern object-oriented path handling
- **`path.stem`** - Filename without extension
- **`path.suffix`** - File extension including dot
- **`path.parent`** - Parent directory of the file
- **`os.path.join(dir, filename)`** - Platform-independent path joining
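
A quick illustration of these attributes on a path with a double extension, where `stem` and `suffix` only consider the last dot. The path itself is made up and does not need to exist:

```python
from pathlib import Path

path = Path("logs/archive/app.tar.gz")

print(path.name)         # app.tar.gz
print(path.stem)         # app.tar
print(path.suffix)       # .gz
print(path.suffixes)     # ['.tar', '.gz']
print(path.parent.name)  # archive
```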

**File Pattern Matching:**
- **`glob.glob(pattern)`** - Find files matching Unix shell-style wildcards
- **Glob patterns**: `*.log` (all .log files), `temp_*` (files starting with temp_)
- **Template variables**: `{date}`, `{datetime}`, `{orig}` for dynamic naming

**Safe File Operations:**
- **`shutil.move(src, dst)`** - Move/rename files (it raises on failure, so wrap calls in `try/except`)
- **`os.makedirs(path, exist_ok=True)`** - Create directories recursively
- **Conflict detection** - Check for existing files before rename operations
- **Manifest generation** - Record operations for rollback capability
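
The manifest is what makes a rollback possible. The sketch below assumes the manifest format written by `save_manifest` in Example 4 (a JSON list of entries with `original` and `renamed` keys); the function `rollback_renames` itself is hypothetical and not part of the example class.

```python
import json
import os
import shutil
import tempfile

def rollback_renames(manifest_path):
    """Undo renames recorded in a manifest, newest first. Returns the restored paths."""
    with open(manifest_path, "r", encoding="utf-8") as handle:
        entries = json.load(handle)

    restored = []
    for entry in reversed(entries):
        renamed, original = entry["renamed"], entry["original"]
        # Skip entries that cannot be undone safely
        if not os.path.exists(renamed) or os.path.exists(original):
            continue
        shutil.move(renamed, original)
        restored.append(original)
    return restored

# Demonstrate with one renamed file in a temporary directory
with tempfile.TemporaryDirectory() as tmp_dir:
    original = os.path.join(tmp_dir, "error.log")
    renamed = os.path.join(tmp_dir, "20251202_error.log")
    with open(renamed, "w") as handle:
        handle.write("sample\n")
    manifest_path = os.path.join(tmp_dir, "rename_manifest.json")
    with open(manifest_path, "w", encoding="utf-8") as handle:
        json.dump([{"original": original, "renamed": renamed}], handle)

    restored = rollback_renames(manifest_path)
    restored_names = [os.path.basename(p) for p in restored]
    file_is_back = os.path.exists(original)

print(restored_names, file_is_back)
```

Processing entries in reverse order matters when a file was renamed more than once, since the latest rename has to be undone first.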

**String Formatting:**
- **`str.replace(old, new)`** - Replace template variables with actual values
- **`datetime.strftime(format)`** - Format timestamps for filenames
- **`str.lstrip('.')`** - Remove leading characters (dots from extensions)

### Example 4.1: Handling Locked Files with Retry Logic and Queuing

When files are locked by other processes (open in Excel, being written to, etc.), we need strategies to handle these gracefully.
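
Retry with exponential backoff is one common strategy for this situation: wait a little after the first failure, then double the wait after each further failure. The sketch below is a generic, standalone illustration; `retry_with_backoff` and `flaky_move` are hypothetical names, and the tiny `base_delay` is chosen only so the demo runs quickly.

```python
import time

def retry_with_backoff(action, max_retries=3, base_delay=0.01, retry_on=(PermissionError,)):
    """Call action(); on a retryable error wait base_delay * 2**attempt, then try again."""
    for attempt in range(max_retries + 1):
        try:
            return action()
        except retry_on:
            if attempt == max_retries:
                raise  # out of retries: let the caller see the error
            time.sleep(base_delay * (2 ** attempt))

attempts = []

def flaky_move():
    """Stand-in for a file operation that fails twice, then succeeds."""
    attempts.append(1)
    if len(attempts) < 3:
        raise PermissionError("file is locked")
    return "moved"

result = retry_with_backoff(flaky_move)
print(result, len(attempts))
```

Doubling the delay keeps early retries fast while avoiding a tight loop against a file that stays locked for a long time.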

In [3]:
import time
import logging
import threading
from enum import Enum
from dataclasses import dataclass
from typing import Optional, Callable, List
from queue import Queue, Empty
from threading import Thread, Lock

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class FileOperationStatus(Enum):
    """Status of file operation attempts."""
    SUCCESS = "success"
    LOCKED = "locked"
    NOT_FOUND = "not_found"
    PERMISSION_DENIED = "permission_denied"
    FAILED = "failed"
    QUEUED = "queued"
    RETRYING = "retrying"


@dataclass
class FileOperation:
    """Represents a file operation to be performed."""
    operation_id: str
    source_path: str
    target_path: str
    operation_type: str  # 'rename', 'move', 'copy'
    max_retries: int = 3
    retry_delay: float = 2.0
    current_attempt: int = 0
    status: FileOperationStatus = FileOperationStatus.QUEUED
    error_message: Optional[str] = None


class LockedFileHandler:
    """
    Handle file operations with retry logic and queuing for locked files.
    
    Strategies implemented:
    1. Retry with exponential backoff
    2. Queue-based processing
    3. File lock detection
    4. Graceful degradation
    5. Operation logging and monitoring
    """
    
    def __init__(self, max_workers: int = 2, max_queue_size: int = 100):
        self.operation_queue = Queue(maxsize=max_queue_size)
        self.results = []
        self.results_lock = Lock()
        self.max_workers = max_workers
        self.workers = []
        self.running = False
    
    def is_file_locked(self, filepath: str) -> bool:
        """
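        Check if file is locked by another process.
        
        Strategy: open the file in append mode, then try to take a
        non-blocking exclusive lock. Opening in 'a' mode creates the file
        if it is missing, and on Unix flock() is advisory, so only locks
        taken by cooperating processes via flock() are detected.
        """
        try:
            # Open for appending (does not truncate existing content)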
            with open(filepath, 'a') as f:
                # On Windows, try to get exclusive lock
                if os.name == 'nt':
                    import msvcrt
                    try:
                        msvcrt.locking(f.fileno(), msvcrt.LK_NBLCK, 1)
                        msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, 1)
                        return False
                    except IOError:
                        return True
                else:
                    # On Unix, try fcntl lock
                    import fcntl
                    try:
                        fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
                        fcntl.flock(f.fileno(), fcntl.LOCK_UN)
                        return False
                    except IOError:
                        return True
        except PermissionError:
            return True
        except FileNotFoundError:
            return False
        except Exception as e:
            logger.warning(f"Could not check lock status for {filepath}: {e}")
            return True
    
    def wait_for_file_unlock(self, filepath: str, timeout: float = 30.0, check_interval: float = 1.0) -> bool:
        """
        Wait for file to become unlocked with timeout.
        
        Args:
            filepath: Path to file
            timeout: Maximum time to wait in seconds
            check_interval: Time between lock checks
        
        Returns:
            True if file became unlocked, False if timeout
        """
        start_time = time.time()
        
        while time.time() - start_time < timeout:
            if not self.is_file_locked(filepath):
                logger.info(f"File {filepath} is now unlocked")
                return True
            
            logger.debug(f"File {filepath} still locked, waiting...")
            time.sleep(check_interval)
        
        logger.warning(f"Timeout waiting for {filepath} to unlock after {timeout}s")
        return False
    
    def perform_file_operation(self, operation: FileOperation) -> FileOperationStatus:
        """
        Perform a file operation with lock handling.
        
        Returns:
            Status of the operation
        """
        source = operation.source_path
        target = operation.target_path
        op_type = operation.operation_type
        
        try:
            # Check if source file exists
            if not os.path.exists(source):
                operation.error_message = f"Source file not found: {source}"
                return FileOperationStatus.NOT_FOUND
            
            # Check if source is locked
            if self.is_file_locked(source):
                logger.warning(f"File {source} is locked, waiting...")
                
                # Wait for unlock with timeout
                if self.wait_for_file_unlock(source, timeout=10.0):
                    logger.info(f"File {source} unlocked, proceeding with operation")
                else:
                    operation.error_message = f"File remains locked: {source}"
                    return FileOperationStatus.LOCKED
            
            # Check if target already exists and is locked
            if os.path.exists(target) and self.is_file_locked(target):
                operation.error_message = f"Target file is locked: {target}"
                return FileOperationStatus.LOCKED
            
            # Perform the operation
            if op_type == 'rename' or op_type == 'move':
                shutil.move(source, target)
                logger.info(f"Successfully moved {source} -> {target}")
            elif op_type == 'copy':
                shutil.copy2(source, target)
                logger.info(f"Successfully copied {source} -> {target}")
            else:
                operation.error_message = f"Unknown operation type: {op_type}"
                return FileOperationStatus.FAILED
            
            return FileOperationStatus.SUCCESS
            
        except PermissionError as e:
            operation.error_message = f"Permission denied: {e}"
            return FileOperationStatus.PERMISSION_DENIED
        except Exception as e:
            operation.error_message = f"Operation failed: {e}"
            return FileOperationStatus.FAILED
    
    def process_operation_with_retry(self, operation: FileOperation) -> FileOperation:
        """
        Process operation with retry logic and exponential backoff.
        """
        operation.status = FileOperationStatus.RETRYING
        
        while operation.current_attempt < operation.max_retries:
            operation.current_attempt += 1
            
            logger.info(f"Attempt {operation.current_attempt}/{operation.max_retries} for {operation.operation_id}")
            
            status = self.perform_file_operation(operation)
            operation.status = status
            
            if status == FileOperationStatus.SUCCESS:
                logger.info(f"✓ Operation {operation.operation_id} completed successfully")
                return operation
            
            # If locked or other retryable error, wait and retry
            if status in [FileOperationStatus.LOCKED, FileOperationStatus.FAILED]:
                if operation.current_attempt < operation.max_retries:
                    # Exponential backoff: retry_delay * 2^(attempt-1), e.g. 2s, 4s, 8s when retry_delay=2.0
                    delay = operation.retry_delay * (2 ** (operation.current_attempt - 1))
                    logger.info(f"Retrying in {delay}s... ({status.value})")
                    time.sleep(delay)
                else:
                    logger.error(f"✗ Operation {operation.operation_id} failed after {operation.max_retries} attempts")
            else:
                # Non-retryable error (not found, permission denied)
                logger.error(f"✗ Operation {operation.operation_id} failed: {operation.error_message}")
                return operation
        
        return operation
    
    def worker_thread(self):
        """Worker thread that processes operations from queue."""
        logger.info(f"Worker thread started (Thread-{threading.current_thread().ident})")
        
        while self.running:
            try:
                # Get operation from queue with timeout
                operation = self.operation_queue.get(timeout=1.0)
            except Empty:
                continue

            try:
                logger.info(f"Processing operation: {operation.operation_id}")

                # Process with retry logic
                completed_operation = self.process_operation_with_retry(operation)

                # Store result
                with self.results_lock:
                    self.results.append(completed_operation)
            except Exception as e:
                logger.error(f"Worker thread error: {e}")
            finally:
                # Always mark the task as done so queue.join() cannot hang
                self.operation_queue.task_done()
        
        logger.info(f"Worker thread stopped (Thread-{threading.current_thread().ident})")
    
    def start_workers(self):
        """Start worker threads for processing operations."""
        if self.running:
            logger.warning("Workers already running")
            return
        
        self.running = True
        
        for i in range(self.max_workers):
            worker = Thread(target=self.worker_thread, daemon=True, name=f"FileWorker-{i}")
            worker.start()
            self.workers.append(worker)
        
        logger.info(f"Started {self.max_workers} worker threads")
    
    def stop_workers(self, wait: bool = True):
        """Stop worker threads."""
        self.running = False
        
        if wait:
            for worker in self.workers:
                worker.join(timeout=5.0)
        
        logger.info("All workers stopped")
    
    def queue_operation(self, operation: FileOperation) -> bool:
        """
        Add operation to queue.
        
        Returns:
            True if queued successfully, False if queue is full
        """
        try:
            self.operation_queue.put(operation, block=False)
            logger.info(f"Queued operation: {operation.operation_id}")
            return True
        except Exception:  # put(block=False) raises queue.Full when the queue is at capacity
            logger.error(f"Queue is full, cannot add operation: {operation.operation_id}")
            return False
    
    def wait_for_completion(self, timeout: Optional[float] = None):
        """Wait for all queued operations to complete."""
        logger.info("Waiting for all operations to complete...")
        
        if timeout is None:
            # No timeout: block until task_done() has been called for every item
            self.operation_queue.join()
        else:
            # Poll until every item is processed or the timeout expires.
            # unfinished_tasks also counts items a worker has taken but not
            # finished, which empty() would miss.
            start_time = time.time()
            while self.operation_queue.unfinished_tasks > 0:
                if (time.time() - start_time) > timeout:
                    logger.warning("Timeout waiting for queue completion")
                    return
                time.sleep(0.5)
        
        logger.info("All operations completed")
    
    def get_results_summary(self) -> dict:
        """Get summary of operation results."""
        with self.results_lock:
            total = len(self.results)
            success = sum(1 for r in self.results if r.status == FileOperationStatus.SUCCESS)
            locked = sum(1 for r in self.results if r.status == FileOperationStatus.LOCKED)
            failed = sum(1 for r in self.results if r.status == FileOperationStatus.FAILED)
            other = total - success - locked - failed
            
            return {
                'total_operations': total,
                'successful': success,
                'locked': locked,
                'failed': failed,
                'other': other,
                'success_rate': f"{(success/total*100):.1f}%" if total > 0 else "0%"
            }


# ===================== DEMONSTRATION =====================

print("=== LOCKED FILE HANDLER DEMONSTRATION ===\n")

# Create test directory and files
test_dir = Path("locked_files_test")
test_dir.mkdir(exist_ok=True)

# Create some test files
test_files = []
for i in range(5):
    filepath = test_dir / f"document_{i}.txt"
    with open(filepath, 'w') as f:
        f.write(f"Document {i} content\n" * 10)
    test_files.append(filepath)

print(f"Created {len(test_files)} test files\n")

# Initialize handler
handler = LockedFileHandler(max_workers=2)

# Start worker threads
handler.start_workers()

# Queue some operations
print("Queueing file operations...\n")

operations = [
    FileOperation(
        operation_id=f"op_{i}",
        source_path=str(test_files[i]),
        target_path=str(test_dir / f"renamed_document_{i}.txt"),
        operation_type='rename',
        max_retries=3,
        retry_delay=1.0
    )
    for i in range(5)
]

# Queue all operations
for op in operations:
    handler.queue_operation(op)

# Simulate a locked file scenario
print("\n Simulating locked file scenario...")
print("Opening document_2.txt to lock it...")

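# Hold an open handle on the file (in a real scenario, Excel/Word/etc. would hold it).
# On Windows this blocks rename/delete (WinError 32); on Linux/macOS an open
# handle takes no lock, so the rename normally still succeeds.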
locked_file = open(test_files[2], 'r')
print(f"✓ File {test_files[2].name} is now locked\n")

# Wait a bit, then unlock
time.sleep(5)
print("\n Unlocking file...")
locked_file.close()
print("✓ File unlocked\n")

# Wait for all operations to complete
handler.wait_for_completion()

# Stop workers
handler.stop_workers()

# Print results
print("\n" + "="*60)
print("RESULTS SUMMARY")
print("="*60)

summary = handler.get_results_summary()
for key, value in summary.items():
    print(f"{key.replace('_', ' ').title()}: {value}")

print("\n Detailed Results:")
with handler.results_lock:
    for result in handler.results:
        status_emoji = "✓" if result.status == FileOperationStatus.SUCCESS else "✗"
        print(f"{status_emoji} {result.operation_id}: {result.status.value} (attempts: {result.current_attempt})")
        if result.error_message:
            print(f"   Error: {result.error_message}")

print("\n Final file listing:")
for item in sorted(test_dir.iterdir()):
    print(f"  - {item.name}")

2025-12-02 12:25:58,826 - INFO - Worker thread started (Thread-26932)
2025-12-02 12:25:58,829 - INFO - Worker thread started (Thread-30796)
2025-12-02 12:25:58,830 - INFO - Started 2 worker threads


2025-12-02 12:25:58,838 - INFO - Queued operation: op_0
2025-12-02 12:25:58,839 - INFO - Processing operation: op_0
2025-12-02 12:25:58,839 - INFO - Queued operation: op_1
2025-12-02 12:25:58,840 - INFO - Processing operation: op_1
2025-12-02 12:25:58,840 - INFO - Attempt 1/3 for op_0
2025-12-02 12:25:58,841 - INFO - Queued operation: op_2
2025-12-02 12:25:58,843 - INFO - Attempt 1/3 for op_1
2025-12-02 12:25:58,846 - INFO - Queued operation: op_3
2025-12-02 12:25:58,850 - INFO - Queued operation: op_4
2025-12-02 12:25:58,850 - INFO - Success

=== LOCKED FILE HANDLER DEMONSTRATION ===

Created 5 test files

Queueing file operations...


 Simulating locked file scenario...
Opening document_2.txt to lock it...
✓ File document_2.txt is now locked



2025-12-02 12:26:03,855 - INFO - Waiting for all operations to complete...
2025-12-02 12:26:03,856 - INFO - All operations completed
2025-12-02 12:26:03,893 - INFO - Worker thread stopped (Thread-30796)
2025-12-02 12:26:03,893 - INFO - Worker thread stopped (Thread-26932)
2025-12-02 12:26:03,896 - INFO - All workers stopped



 Unlocking file...
✓ File unlocked


RESULTS SUMMARY
Total Operations: 5
Successful: 4
Locked: 0
Failed: 0
Other: 1
Success Rate: 80.0%

 Detailed Results:
✓ op_0: success (attempts: 1)
✓ op_1: success (attempts: 1)
✗ op_2: permission_denied (attempts: 1)
   Error: Permission denied: [WinError 32] The process cannot access the file because it is being used by another process: 'locked_files_test\\document_2.txt'
✓ op_3: success (attempts: 1)
✓ op_4: success (attempts: 1)

 Final file listing:
  - document_2.txt
  - renamed_document_0.txt
  - renamed_document_1.txt
  - renamed_document_2.txt
  - renamed_document_3.txt
  - renamed_document_4.txt


### **Explanation: Strategies for Locked File Handling**

**1. File Lock Detection:**
- **Windows**: Use `msvcrt.locking()` to check for exclusive file locks
- **Unix/Linux**: Use `fcntl.flock()` for file locking detection
- **Cross-platform**: Try opening file in append mode with exclusive access
- **Fallback**: Catch `PermissionError` and `IOError` exceptions
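
The Unix side of this check can be sketched as follows. This is a minimal illustration, assuming a local filesystem and advisory `flock` locks; `is_flock_held` is a hypothetical helper name, and it only detects locks taken through `flock`, not files that are merely open in another program.

```python
import fcntl  # Unix only; this module does not exist on Windows
import os
import tempfile


def is_flock_held(path: str) -> bool:
    """Return True if another open handle holds an exclusive flock on path."""
    with open(path, "a") as probe:
        try:
            # LOCK_NB makes the call fail immediately instead of blocking
            fcntl.flock(probe.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            return True
        fcntl.flock(probe.fileno(), fcntl.LOCK_UN)
        return False


# Demonstration: hold a lock through one handle, probe through another
fd, demo_path = tempfile.mkstemp()
os.close(fd)

holder = open(demo_path, "a")
fcntl.flock(holder.fileno(), fcntl.LOCK_EX)
locked_while_held = is_flock_held(demo_path)

fcntl.flock(holder.fileno(), fcntl.LOCK_UN)
holder.close()
locked_after_release = is_flock_held(demo_path)

os.remove(demo_path)
print(locked_while_held, locked_after_release)
```

Because `flock` is advisory, a process that never calls `flock` can still read or write the file, so treat a `False` result as "no cooperating process holds the lock" rather than "the file is free".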

**2. Retry with Exponential Backoff:**
- **First attempt**: Immediate execution
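- **Retry delays**: `retry_delay × 2^(attempt-1)`, which gives 2s → 4s → 8s for a base delay of 2s (the demonstration above sets `retry_delay=1.0`, so it waits 1s, then 2s)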
- **Max retries**: Configurable (default: 3 attempts)
- **Benefits**: Gives other processes time to release locks without aggressive polling
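
The delay schedule can be checked in isolation. A small sketch, assuming the same formula as `process_operation_with_retry`; `backoff_delays` is a hypothetical helper introduced only for illustration.

```python
def backoff_delays(retry_delay: float, max_retries: int) -> list[float]:
    """Delays slept between attempts: retry_delay * 2**(attempt - 1)."""
    # No delay follows the final attempt, so there are max_retries - 1 waits
    return [retry_delay * 2 ** (attempt - 1) for attempt in range(1, max_retries)]


print(backoff_delays(2.0, 4))  # [2.0, 4.0, 8.0]
print(backoff_delays(1.0, 3))  # [1.0, 2.0]
```

Note that three attempts produce only two waits, so the worst-case time spent sleeping is the sum of the list, not `max_retries` times the longest delay.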

**3. Queue-Based Processing:**
- **`Queue` class**: Thread-safe FIFO queue for operations
- **Worker threads**: Multiple threads process operations concurrently
- **Graceful degradation**: System continues working even if some files are locked
- **Non-blocking**: Main program can continue while operations are processed
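
The queue and worker pattern can be reduced to a few lines. The sketch below is a simplified stand-in, not the `LockedFileHandler` itself: the "operation" is just squaring a number, and the names `task_queue`, `stop_event`, and `worker` are chosen for illustration.

```python
import threading
from queue import Empty, Queue

task_queue: Queue = Queue(maxsize=100)  # bounded, so producers cannot exhaust memory
results = []
results_lock = threading.Lock()
stop_event = threading.Event()


def worker() -> None:
    while not stop_event.is_set():
        try:
            item = task_queue.get(timeout=0.1)
        except Empty:
            continue
        try:
            with results_lock:
                results.append(item * item)
        finally:
            # Always signal completion, even if processing raised
            task_queue.task_done()


threads = [threading.Thread(target=worker, daemon=True) for _ in range(2)]
for t in threads:
    t.start()

for n in range(5):
    task_queue.put(n)

task_queue.join()  # blocks until task_done() was called for every item
stop_event.set()
for t in threads:
    t.join(timeout=1.0)

print(sorted(results))  # [0, 1, 4, 9, 16]
```

Using a `threading.Event` rather than a plain boolean flag is a design choice here: it makes the shutdown signal explicit and lets other code wait on it if needed.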

**4. Operation States:**
- **QUEUED**: Operation added to queue, waiting for worker
- **RETRYING**: Currently attempting operation with retries
- **SUCCESS**: Operation completed successfully
- **LOCKED**: File remains locked after all retries
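- **FAILED**: Other error occurred (disk space, unknown operation type, etc.); retried like LOCKED
- **PERMISSION_DENIED**: Access to the file was refused; not retried
- **NOT_FOUND**: Source file doesn't exist; not retried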

**5. Thread Safety Patterns:**
- **`Lock()`**: Protects shared results list from race conditions
- **`daemon=True`**: Worker threads don't prevent program exit
- **`queue.task_done()`**: Signals completion for `queue.join()` synchronization
- **`timeout` parameters**: Prevent indefinite blocking

**6. Production Enhancements:**
```python
from datetime import datetime, timedelta
from itertools import count
from queue import PriorityQueue

# Add callback for completion notifications
# (send_notification and send_alert are placeholders for your own alerting hooks)
def on_operation_complete(operation: FileOperation):
    if operation.status == FileOperationStatus.SUCCESS:
        send_notification(f"✓ {operation.operation_id} completed")
    else:
        send_alert(f"✗ {operation.operation_id} failed: {operation.error_message}")

# Add operation priority queue
# The counter breaks ties between equal priorities, so the queue never has to
# compare two FileOperation objects (which may not define an ordering).
# high_priority_op and low_priority_op stand for existing FileOperation instances.
priority_queue = PriorityQueue()
sequence = count()
priority_queue.put((1, next(sequence), high_priority_op))  # Lower number = higher priority
priority_queue.put((5, next(sequence), low_priority_op))

# Add monitoring and metrics
metrics = {
    'total_operations': 0,
    'total_retries': 0,
    'avg_completion_time': 0,
    'lock_conflicts': 0
}

# Add dead letter queue for permanently failed operations
failed_operations_queue = []

# Add automatic cleanup of old operations
def cleanup_old_operations(age_hours: int = 24):
    cutoff = datetime.now() - timedelta(hours=age_hours)
    # Remove operations older than cutoff
```

**Alternative Strategies:**

**A. Shadow Copy / Snapshot Approach:**
```python
# On Windows, use Volume Shadow Copy
import win32com.client

def create_shadow_copy(volume: str) -> str:
    """Create VSS snapshot and return shadow copy path."""
    # Useful for backing up files that are locked
    pass

# Access locked files via shadow copy
shadow_path = create_shadow_copy("C:\\")
copy_from_shadow(shadow_path + "locked_file.xlsx")
```

**B. File System Watcher:**
```python
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class UnlockHandler(FileSystemEventHandler):
    def on_modified(self, event):
        # A modification event does not prove the lock was released,
        # so re-check the lock before retrying the operation
        if not is_file_locked(event.src_path):
            retry_pending_operation(event.src_path)
```

**C. Scheduled Retry (Cron-like):**
```python
# Instead of immediate retry, schedule for later
scheduler.schedule_retry(
    operation=failed_op,
    retry_after=timedelta(minutes=5),
    max_retries=10
)
```

**D. User Notification Pattern:**
```python
# Notify user to close file
def request_file_closure(filepath: str):
    # Send email/Slack message to file owner
    owner = get_file_owner(filepath)
    send_message(
        to=owner,
        subject=f"Please close {filepath}",
        body="Automated process needs access to this file."
    )
```

**DevOps Use Cases:**
- **Log rotation**: Rotate logs that may be locked by running services
- **Backup operations**: Backup files that users have open
- **Configuration updates**: Update config files in use by applications
- **Deployment scripts**: Replace DLLs/executables that are loaded
- **Report generation**: Process Excel files that analysts have open
- **Database file operations**: Handle locked SQLite databases

**Best Practices:**
1. Always implement timeout limits to prevent infinite waits
2. Log all retry attempts for debugging and monitoring
3. Provide clear error messages indicating why operations failed
4. Use exponential backoff to avoid overwhelming the system
5. Implement circuit breaker pattern for repeatedly failing operations
6. Monitor queue depth to detect systemic issues
7. Set maximum queue size to prevent memory exhaustion
8. Gracefully handle worker thread failures
9. Provide manual retry capability for operators
10. Keep audit trail of all file operations for compliance
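
Practice 5 (circuit breaker) is the least obvious item in the list, so here is a minimal sketch of one. The class name, the `failure_threshold` and `reset_after` parameters, and the tiny cool-down used in the demonstration are all assumptions chosen for illustration, not part of the handler above.

```python
import time


class CircuitBreaker:
    """Stop attempting an operation after too many consecutive failures."""

    def __init__(self, failure_threshold: int = 3, reset_after: float = 60.0):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at = None  # monotonic time at which the breaker opened

    def allow(self) -> bool:
        """Return True if a new attempt may be made."""
        if self.opened_at is None:
            return True
        if time.monotonic() - self.opened_at >= self.reset_after:
            # Cool-down elapsed: close the breaker and allow a fresh attempt
            self.opened_at = None
            self.failures = 0
            return True
        return False

    def record(self, success: bool) -> None:
        """Record the outcome of an attempt."""
        if success:
            self.failures = 0
            return
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.opened_at = time.monotonic()


breaker = CircuitBreaker(failure_threshold=2, reset_after=0.05)
breaker.record(success=False)
allowed_after_one = breaker.allow()        # still below the threshold
breaker.record(success=False)
allowed_after_two = breaker.allow()        # breaker is now open
time.sleep(0.1)
allowed_after_cooldown = breaker.allow()   # cool-down has elapsed
print(allowed_after_one, allowed_after_two, allowed_after_cooldown)
```

A worker would call `breaker.allow()` before touching a file and `breaker.record(...)` afterwards, keeping one breaker per file or per target directory so that a single stuck file does not block unrelated operations.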

## Section 4: Data Processing and Excel Reporting

### Theory: Data Pipeline Development

Data processing is central to DevOps observability:
- **ETL Operations** - Extract, Transform, Load data from multiple sources
- **Aggregation** - Summarizing metrics across time periods and services
- **Reporting** - Creating consumable reports for stakeholders
- **Data Validation** - Ensuring data quality and consistency

### Essential Data Processing Methods:
- **`pandas.read_csv()`** - Read CSV files into DataFrames
- **`DataFrame.groupby()`** - Group data for aggregation operations
- **`DataFrame.agg()`** - Apply aggregation functions (sum, mean, count)
- **`ExcelWriter`** - Create multi-sheet Excel workbooks
- **`to_excel()`** - Write DataFrames to Excel sheets

In [None]:
%pip install openpyxl

Defaulting to user installation because normal site-packages is not writeable
Collecting openpyxl
  Using cached openpyxl-3.1.5-py2.py3-none-any.whl.metadata (2.5 kB)
Collecting et-xmlfile (from openpyxl)
  Using cached et_xmlfile-2.0.0-py3-none-any.whl.metadata (2.7 kB)
Using cached openpyxl-3.1.5-py2.py3-none-any.whl (250 kB)
Using cached et_xmlfile-2.0.0-py3-none-any.whl (18 kB)
Installing collected packages: et-xmlfile, openpyxl

Successfully installed et-xmlfile-2.0.0 openpyxl-3.1.5





In [None]:
# Example 5: CSV to Excel Report Builder
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import os
from openpyxl.styles import Font, PatternFill, Alignment

class ReportBuilder:
    """Build comprehensive Excel reports from CSV transaction data."""

    def __init__(self, input_folder: str):
        self.input_folder = input_folder
        self.raw_data = None
        self.daily_summary = None
        self.service_summary = None

    def create_sample_data(self):
        """Create sample CSV files for testing."""
        os.makedirs(self.input_folder, exist_ok=True)

        # Generate sample transaction data
        services = ['auth-service', 'user-service', 'payment-service', 'notification-service']
        users = [f'user_{i:03d}' for i in range(1, 51)]  # 50 users

        # Create 3 CSV files representing different days
        for day_offset in range(3):
            date = datetime.now() - timedelta(days=day_offset)
            filename = f"transactions_{date.strftime('%Y%m%d')}.csv"
            filepath = os.path.join(self.input_folder, filename)

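            # Generate random transactions for this day.
            # Note: `date` keeps the current time of day, so adding up to 24 hours
            # can push a timestamp into the next calendar day. The daily summary can
            # therefore contain one more date than there are files, with uneven counts.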
            transactions = []
            for _ in range(100):  # 100 transactions per day
                timestamp = date + timedelta(
                    hours=np.random.randint(0, 24),
                    minutes=np.random.randint(0, 60)
                )

                transaction = {
                    'timestamp': timestamp.strftime('%Y-%m-%d %H:%M:%S'),
                    'user_id': np.random.choice(users),
                    'service': np.random.choice(services),
                    'amount': round(np.random.uniform(5.0, 500.0), 2)
                }
                transactions.append(transaction)

            # Write to CSV
            df = pd.DataFrame(transactions)
            df.to_csv(filepath, index=False)
            print(f"Created {filename} with {len(transactions)} transactions")

    def load_csv_files(self):
        """Load and merge all CSV files from input folder."""
        csv_files = [f for f in os.listdir(self.input_folder) if f.endswith('.csv')]

        if not csv_files:
            raise ValueError(f"No CSV files found in {self.input_folder}")

        dataframes = []

        for csv_file in csv_files:
            filepath = os.path.join(self.input_folder, csv_file)
            try:
                df = pd.read_csv(filepath)

                # Validate required columns
                required_columns = ['timestamp', 'user_id', 'service', 'amount']
                missing_columns = [col for col in required_columns if col not in df.columns]

                if missing_columns:
                    print(f"Warning: {csv_file} missing columns: {missing_columns}")
                    continue

                # Convert timestamp to datetime
                df['timestamp'] = pd.to_datetime(df['timestamp'])
                df['date'] = df['timestamp'].dt.date

                dataframes.append(df)
                print(f"Loaded {csv_file}: {len(df)} transactions")

            except Exception as e:
                print(f"Error loading {csv_file}: {e}")

        if not dataframes:
            raise ValueError("No valid CSV files could be loaded")

        # Merge all data
        self.raw_data = pd.concat(dataframes, ignore_index=True)
        print(f"\nTotal transactions loaded: {len(self.raw_data)}")

        return self.raw_data

    def generate_daily_summary(self):
        """Generate daily aggregated summary."""
        if self.raw_data is None:
            raise ValueError("No data loaded. Call load_csv_files() first.")

        self.daily_summary = self.raw_data.groupby('date').agg({
            'amount': ['count', 'sum', 'mean'],
            'user_id': 'nunique',
            'service': 'nunique'
        }).round(2)

        # Flatten column names
        self.daily_summary.columns = [
            'total_transactions', 'total_amount', 'avg_amount',
            'unique_users', 'unique_services'
        ]

        # Reset index to make date a column
        self.daily_summary.reset_index(inplace=True)

        return self.daily_summary

    def generate_service_summary(self):
        """Generate service-level aggregated summary."""
        if self.raw_data is None:
            raise ValueError("No data loaded. Call load_csv_files() first.")

        self.service_summary = self.raw_data.groupby('service').agg({
            'amount': ['count', 'sum', 'mean', 'min', 'max'],
            'user_id': 'nunique'
        }).round(2)

        # Flatten column names
        self.service_summary.columns = [
            'total_transactions', 'total_amount', 'avg_amount',
            'min_amount', 'max_amount', 'unique_users'
        ]

        # Reset index to make service a column
        self.service_summary.reset_index(inplace=True)

        # Sort by total amount descending
        self.service_summary = self.service_summary.sort_values('total_amount', ascending=False)

        return self.service_summary

    def create_excel_report(self, output_filename: str):
        """Create comprehensive Excel report with multiple sheets."""
        with pd.ExcelWriter(output_filename, engine='openpyxl') as writer:
            # Write daily summary
            self.daily_summary.to_excel(writer, sheet_name='DailySummary', index=False)

            # Write service summary
            self.service_summary.to_excel(writer, sheet_name='ServiceSummary', index=False)

            # Write raw merged data (limited to first 1000 rows for performance)
            raw_limited = self.raw_data.head(1000).copy()
            raw_limited.to_excel(writer, sheet_name='RawMerged', index=False)

            # Apply formatting
            self._format_excel_sheets(writer)

        print(f"\n Excel report created: {output_filename}")

        # Print summary statistics
        self._print_summary_stats()

    def _format_excel_sheets(self, writer):
        """Apply formatting to Excel sheets."""
        # Format Daily Summary sheet
        ws_daily = writer.sheets['DailySummary']

        # Header formatting
        header_font = Font(bold=True, color="FFFFFF")
        header_fill = PatternFill(start_color="366092", end_color="366092", fill_type="solid")

        for cell in ws_daily[1]:  # First row (headers)
            cell.font = header_font
            cell.fill = header_fill
            cell.alignment = Alignment(horizontal="center")

        # Auto-adjust column widths
        for column in ws_daily.columns:
            max_length = 0
            column_letter = column[0].column_letter
            for cell in column:
                # str() never raises here (None becomes "None"), so no try/except is needed
                max_length = max(max_length, len(str(cell.value)))
            adjusted_width = min(max_length + 2, 20)
            ws_daily.column_dimensions[column_letter].width = adjusted_width

    def _print_summary_stats(self):
        """Print summary statistics for validation."""
        print("\n=== REPORT SUMMARY ===")
        print(f"Daily Summary: {len(self.daily_summary)} days")
        print(f"Service Summary: {len(self.service_summary)} services")
        print(f"Raw Data: {len(self.raw_data)} total transactions")

        total_amount = self.raw_data['amount'].sum()
        avg_amount = self.raw_data['amount'].mean()
        print(f"Total Amount: ${total_amount:,.2f}")
        print(f"Average Transaction: ${avg_amount:.2f}")

        print("\nTop 3 Services by Volume:")
        for _, row in self.service_summary.head(3).iterrows():
            print(f"  {row['service']}: ${row['total_amount']:,.2f} ({row['total_transactions']} transactions)")

# Test the report builder
print("=== CSV TO EXCEL REPORT BUILDER ===")

# Create report builder and sample data
builder = ReportBuilder('transaction_data')
builder.create_sample_data()

# Load and process data
raw_data = builder.load_csv_files()
daily_summary = builder.generate_daily_summary()
service_summary = builder.generate_service_summary()

print("\n Daily Summary (first 3 rows):")
print(daily_summary.head(3))

print("\n Service Summary:")
print(service_summary)

# Create Excel report
builder.create_excel_report('transaction_report.xlsx')

=== CSV TO EXCEL REPORT BUILDER ===
Created transactions_20251026.csv with 100 transactions
Created transactions_20251025.csv with 100 transactions
Created transactions_20251024.csv with 100 transactions
Loaded transactions_20251024.csv: 100 transactions
Loaded transactions_20251025.csv: 100 transactions
Loaded transactions_20251026.csv: 100 transactions

Total transactions loaded: 300

 Daily Summary (first 3 rows):
         date  total_transactions  total_amount  avg_amount  unique_users  \
0  2025-10-24                  33       8455.02      256.21            24   
1  2025-10-25                 108      26036.21      241.08            41   
2  2025-10-26                  97      25493.74      262.82            46   

   unique_services  
0                4  
1                4  
2                4  

 Service Summary:
                service  total_transactions  total_amount  avg_amount  \
2       payment-service                  79      20750.73      262.67   
3          user-servi

### Key Data Processing Methods:

**Pandas DataFrame Operations:**
- **`pd.read_csv(filepath)`** - Read CSV files into DataFrame objects
- **`pd.concat(dataframes)`** - Concatenate multiple DataFrames vertically
- **`df.groupby(column)`** - Group DataFrame rows by column values
- **`df.agg(functions)`** - Apply aggregation functions to grouped data
- **`pd.to_datetime(series)`** - Convert strings to datetime objects

**Data Aggregation Functions:**
- **`'count'`** - Count non-null values in each group
- **`'sum'`** - Sum numeric values in each group
- **`'mean'`** - Calculate average values in each group
- **`'nunique'`** - Count unique values in each group
- **`'min'/'max'`** - Find minimum/maximum values
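
As an alternative to flattening the column names by hand, pandas also supports named aggregation, where each output column is declared as `name=(source_column, function)`. The sketch below uses a small made-up DataFrame, not the generated transaction data.

```python
import pandas as pd

# Illustrative data only; column names mirror the transaction CSVs
df = pd.DataFrame({
    "service": ["auth", "auth", "payment", "payment", "payment"],
    "user_id": ["u1", "u2", "u1", "u1", "u3"],
    "amount": [10.0, 20.0, 5.0, 15.0, 40.0],
})

summary = (
    df.groupby("service")
      .agg(
          total_transactions=("amount", "count"),
          total_amount=("amount", "sum"),
          avg_amount=("amount", "mean"),
          unique_users=("user_id", "nunique"),
      )
      .reset_index()
)
print(summary)
```

The result already has flat, descriptive column names, so the order of the names can no longer drift out of step with the order of the aggregation functions.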

**Excel Integration:**
- **`pd.ExcelWriter(filename, engine='openpyxl')`** - Create Excel file writer
- **`df.to_excel(writer, sheet_name)`** - Write DataFrame to Excel sheet
- **`openpyxl.styles`** - Apply formatting (Font, PatternFill, Alignment)
- **Context managers** - `with pd.ExcelWriter(...) as writer:` saves and closes the workbook when the block exits

**Data Validation Patterns:**
- **Column existence checking** - Validate required columns are present
- **Exception handling** - Graceful handling of file format errors
- **Data type conversion** - Convert strings to appropriate types
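
These three patterns can also be applied row by row, which is useful when a file is too large for a DataFrame or when bad rows should be reported rather than aborting the load. The following is a sketch using only the standard library; `validate_rows` and the sample text are hypothetical.

```python
import csv
import io
from datetime import datetime

REQUIRED_COLUMNS = ["timestamp", "user_id", "service", "amount"]


def validate_rows(text: str):
    """Return (valid_rows, errors) for CSV text with the required columns."""
    reader = csv.DictReader(io.StringIO(text))
    missing = [c for c in REQUIRED_COLUMNS if c not in (reader.fieldnames or [])]
    if missing:
        raise ValueError(f"Missing columns: {missing}")

    valid, errors = [], []
    for line_no, row in enumerate(reader, start=2):  # line 1 is the header
        try:
            row["timestamp"] = datetime.strptime(row["timestamp"], "%Y-%m-%d %H:%M:%S")
            row["amount"] = float(row["amount"])
        except (ValueError, TypeError) as exc:
            # Keep the line number so the bad row can be found in the source file
            errors.append((line_no, str(exc)))
            continue
        valid.append(row)
    return valid, errors


sample = (
    "timestamp,user_id,service,amount\n"
    "2025-10-24 09:15:00,user_001,auth-service,12.50\n"
    "not-a-date,user_002,payment-service,99.00\n"
    "2025-10-24 10:00:00,user_003,user-service,abc\n"
)
valid_rows, row_errors = validate_rows(sample)
print(len(valid_rows), [line for line, _ in row_errors])  # 1 [3, 4]
```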

## Section 5: Concurrent Programming and HTTP Operations

### Theory: Asynchronous Operations in DevOps

Concurrent programming is essential for:
- **Health monitoring** - Checking multiple services simultaneously
- **API testing** - Parallel validation of endpoints
- **Data collection** - Gathering metrics from multiple sources
- **Performance optimization** - Reducing total execution time

### Essential Async and HTTP Methods:
- **`asyncio.run()`** - Run async functions from synchronous code
- **`aiohttp.ClientSession()`** - HTTP client for async requests
- **`asyncio.gather()`** - Run multiple async functions concurrently
- **`asyncio.sleep()`** - Non-blocking delay for retry logic
- **Exponential backoff** - Progressive retry delays for resilience
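
Before adding HTTP, it helps to see `asyncio.gather()` on its own. This sketch replaces real requests with `asyncio.sleep()`, so it needs no network access; `fake_check` and `run_checks` are hypothetical names.

```python
import asyncio
import time


async def fake_check(name: str, delay: float) -> dict:
    """Stand-in for an HTTP request: waits `delay` seconds without blocking."""
    await asyncio.sleep(delay)
    return {"name": name, "delay": delay}


async def run_checks() -> list:
    # return_exceptions=True returns exceptions as results
    # instead of raising the first one that occurs
    return await asyncio.gather(
        fake_check("service-a", 0.2),
        fake_check("service-b", 0.2),
        fake_check("service-c", 0.2),
        return_exceptions=True,
    )


started = time.perf_counter()
check_results = asyncio.run(run_checks())
elapsed = time.perf_counter() - started

print([r["name"] for r in check_results])
print(f"elapsed: {elapsed:.2f}s")  # close to 0.2s rather than 0.6s
```

The three waits overlap, so total time is close to the longest single delay. Results come back in the order the coroutines were passed in, regardless of which finished first. Inside a Jupyter notebook an event loop is already running, so use `await run_checks()` in place of `asyncio.run(run_checks())`.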

In [None]:
%pip install aiohttp

Note: you may need to restart the kernel to use updated packages.





In [None]:
# Example 6: Concurrent REST Health Checker
import asyncio
import aiohttp
import json
import time
from typing import List, Dict, Any
import csv
from datetime import datetime

class HealthChecker:
    """Concurrent HTTP health checker with retry logic and exponential backoff."""

    def __init__(self, timeout: int = 10, max_retries: int = 3):
        self.timeout = timeout
        self.max_retries = max_retries
        self.results = []

    def create_sample_endpoints(self, filename: str):
        """Create sample endpoint configuration for testing."""
        endpoints = [
            {"name": "httpbin-get", "url": "https://httpbin.org/get", "expected_status": 200},
            {"name": "httpbin-status-200", "url": "https://httpbin.org/status/200", "expected_status": 200},
            {"name": "httpbin-delay", "url": "https://httpbin.org/delay/1", "expected_status": 200},
            {"name": "google", "url": "https://www.google.com", "expected_status": 200},
            {"name": "invalid-url", "url": "https://this-domain-does-not-exist-12345.com", "expected_status": 200}
        ]

        with open(filename, 'w') as f:
            json.dump(endpoints, f, indent=2)

        print(f"Created sample endpoints configuration: {filename}")
        return endpoints

    def load_endpoints(self, filename: str) -> List[Dict[str, Any]]:
        """Load endpoint configuration from JSON file."""
        try:
            with open(filename, 'r') as f:
                endpoints = json.load(f)

            # Validate endpoint structure
            for i, endpoint in enumerate(endpoints):
                required_fields = ['name', 'url', 'expected_status']
                missing_fields = [field for field in required_fields if field not in endpoint]

                if missing_fields:
                    raise ValueError(f"Endpoint {i}: missing fields {missing_fields}")

            print(f"Loaded {len(endpoints)} endpoints from {filename}")
            return endpoints

        except FileNotFoundError:
            raise FileNotFoundError(f"Endpoints file not found: {filename}")
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid JSON in endpoints file: {e}")

    async def check_endpoint_with_retry(self, session: aiohttp.ClientSession, endpoint: Dict[str, Any]) -> Dict[str, Any]:
        """Check a single endpoint with retry logic and exponential backoff."""
        name = endpoint['name']
        url = endpoint['url']
        expected_status = endpoint['expected_status']

        start_time = time.time()
        attempts = 0
        last_error = None

        for attempt in range(self.max_retries):
            attempts += 1

            try:
                # Backoff delay before retry n is 2^(n-1) seconds (1, 2, 4, ...)
                if attempt > 0:
                    backoff_delay = 2 ** (attempt - 1)
                    print(f"   {name}: Retry {attempt} after {backoff_delay}s delay")
                    await asyncio.sleep(backoff_delay)

                # Make HTTP request
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=self.timeout)) as response:
                    response_time = (time.time() - start_time) * 1000  # Convert to milliseconds

                    result = {
                        'name': name,
                        'url': url,
                        'final_status_code': response.status,
                        'ok': response.status == expected_status,
                        'response_time_ms': round(response_time, 2),
                        'attempts': attempts,
                        'error': None
                    }

                    if result['ok']:
                        print(f" {name}: OK ({response.status}) in {response_time:.1f}ms")
                        return result
                    else:
                        last_error = f"Status {response.status}, expected {expected_status}"
                        if attempt < self.max_retries - 1:
                            print(f" {name}: {last_error}, retrying...")
                            continue

            except asyncio.TimeoutError:
                last_error = f"Timeout after {self.timeout}s"
                if attempt < self.max_retries - 1:
                    print(f" {name}: {last_error}, retrying...")
                    continue

            except aiohttp.ClientError as e:
                last_error = f"Connection error: {str(e)[:100]}"
                if attempt < self.max_retries - 1:
                    print(f" {name}: {last_error}, retrying...")
                    continue

            except Exception as e:
                last_error = f"Unexpected error: {str(e)[:100]}"
                if attempt < self.max_retries - 1:
                    print(f" {name}: {last_error}, retrying...")
                    continue

        # All retries failed
        response_time = (time.time() - start_time) * 1000
        result = {
            'name': name,
            'url': url,
            'final_status_code': None,
            'ok': False,
            'response_time_ms': round(response_time, 2),
            'attempts': attempts,
            'error': last_error
        }

        print(f" {name}: FAILED after {attempts} attempts - {last_error}")
        return result

    async def check_all_endpoints(self, endpoints: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Check all endpoints concurrently."""
        print(f"\n Starting health checks for {len(endpoints)} endpoints...")
        start_time = time.time()

        # Create HTTP session with connection pooling
        connector = aiohttp.TCPConnector(limit=10, limit_per_host=3)
        timeout = aiohttp.ClientTimeout(total=self.timeout)

        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
            # Run all health checks concurrently
            tasks = [self.check_endpoint_with_retry(session, endpoint) for endpoint in endpoints]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # Handle any exceptions that weren't caught
            processed_results = []
            for i, result in enumerate(results):
                if isinstance(result, Exception):
                    endpoint = endpoints[i]
                    processed_results.append({
                        'name': endpoint['name'],
                        'url': endpoint['url'],
                        'final_status_code': None,
                        'ok': False,
                        'response_time_ms': 0,
                        'attempts': 1,
                        'error': f"Unhandled exception: {str(result)}"
                    })
                else:
                    processed_results.append(result)

        total_time = time.time() - start_time
        print(f"\n  Total execution time: {total_time:.2f}s")

        self.results = processed_results
        return processed_results

    def save_results_csv(self, filename: str):
        """Save results to CSV file."""
        fieldnames = ['name', 'url', 'final_status_code', 'ok', 'response_time_ms', 'attempts', 'error']

        with open(filename, 'w', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.results)

        print(f" Results saved to CSV: {filename}")

    def save_results_json(self, filename: str):
        """Save results to JSON file."""
        report = {
            'timestamp': datetime.now().isoformat(),
            'summary': {
                'total_endpoints': len(self.results),
                'successful': sum(1 for r in self.results if r['ok']),
                'failed': sum(1 for r in self.results if not r['ok']),
                'avg_response_time_ms': round(sum(r['response_time_ms'] for r in self.results) / len(self.results), 2) if self.results else 0
            },
            'results': self.results
        }

        with open(filename, 'w') as f:
            json.dump(report, f, indent=2)

        print(f" Results saved to JSON: {filename}")

    def print_summary(self):
        """Print summary of health check results."""
        if not self.results:
            print("No results to summarize")
            return

        successful = [r for r in self.results if r['ok']]
        failed = [r for r in self.results if not r['ok']]

        print(f"\n HEALTH CHECK SUMMARY")
        print(f"Total endpoints: {len(self.results)}")
        print(f" Successful: {len(successful)}")
        print(f" Failed: {len(failed)}")

        if successful:
            avg_response_time = sum(r['response_time_ms'] for r in successful) / len(successful)
            print(f" Average response time: {avg_response_time:.1f}ms")

        if failed:
            print(f"\n Failed endpoints:")
            for result in failed:
                print(f"  - {result['name']}: {result['error']}")

# Test the health checker
async def main():
    print("=== CONCURRENT REST HEALTH CHECKER ===")

    checker = HealthChecker(timeout=5, max_retries=3)

    # Create sample endpoints
    endpoints = checker.create_sample_endpoints('endpoints.json')

    # Load endpoints and run health checks
    endpoints = checker.load_endpoints('endpoints.json')
    results = await checker.check_all_endpoints(endpoints)

    # Save results and print summary
    checker.save_results_csv('health_check_results.csv')
    checker.save_results_json('health_check_results.json')
    checker.print_summary()

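# Run the async main function. Top-level `await` works in Jupyter/IPython,
# where an event loop is already running; in a plain script use asyncio.run(main()).
await main()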

=== CONCURRENT REST HEALTH CHECKER ===
Created sample endpoints configuration: endpoints.json
Loaded 5 endpoints from endpoints.json

 Starting health checks for 5 endpoints...
 invalid-url: Connection error: Cannot connect to host this-domain-does-not-exist-12345.com:443 ssl:default [getaddrinfo failed], retrying...
   invalid-url: Retry 1 after 1s delay
 invalid-url: Connection error: Cannot connect to host this-domain-does-not-exist-12345.com:443 ssl:default [getaddrinfo failed], retrying...
   invalid-url: Retry 2 after 2s delay
 invalid-url: FAILED after 3 attempts - Connection error: Cannot connect to host this-domain-does-n

### Key Async and HTTP Methods:

**Async Programming Fundamentals:**
- **`async def function()`** - Define asynchronous function
- **`await expression`** - Wait for async operation to complete
- **`asyncio.gather(*tasks)`** - Run multiple async functions concurrently
- **`asyncio.sleep(seconds)`** - Non-blocking delay (vs `time.sleep()`)

**HTTP Client Operations:**
- **`aiohttp.ClientSession()`** - HTTP client with connection pooling
- **`session.get(url, timeout=timeout)`** - Async HTTP GET request
- **`aiohttp.ClientTimeout(total=seconds)`** - Request timeout configuration
- **`aiohttp.TCPConnector(limit=n)`** - Connection pool limits

**Error Handling and Retries:**
- **Exponential backoff** - `2 ** (attempt - 1)` for progressive delays
- **`aiohttp.ClientError`** - Base exception for HTTP client errors
- **`asyncio.TimeoutError`** - Exception for request timeouts
- **`return_exceptions=True`** - Handle exceptions in `asyncio.gather()`
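
The retry pattern above can be sketched independently of `aiohttp`. The helper below is an illustrative assumption, not the notebook's implementation: `retry_with_backoff`, `FlakyService`, and the injectable `sleep` parameter are hypothetical names, and the sleep function is replaced by a recorder so the demo finishes instantly.

```python
import asyncio


async def retry_with_backoff(operation, max_retries=3, base_delay=1.0, sleep=asyncio.sleep):
    """Await operation() up to max_retries times, waiting 1x, 2x, 4x... base_delay between tries."""
    last_error = None
    for attempt in range(max_retries):
        if attempt > 0:
            # Same schedule as the health checker: 2 ** (attempt - 1)
            await sleep(base_delay * 2 ** (attempt - 1))
        try:
            return await operation()
        except (ConnectionError, asyncio.TimeoutError) as exc:
            last_error = exc
    raise last_error


class FlakyService:
    """Hypothetical service that fails a fixed number of times before succeeding."""

    def __init__(self, failures: int):
        self.failures = failures
        self.calls = 0

    async def __call__(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError(f"attempt {self.calls} failed")
        return "healthy"


async def demo():
    delays = []

    async def fake_sleep(seconds):
        delays.append(seconds)  # record the delay instead of waiting

    recovering = FlakyService(failures=2)   # succeeds on the third attempt
    dead = FlakyService(failures=10)        # never succeeds within three attempts
    results = await asyncio.gather(
        retry_with_backoff(recovering, sleep=fake_sleep),
        retry_with_backoff(dead, sleep=fake_sleep),
        return_exceptions=True,  # the failure comes back as a value, not a raise
    )
    return results, sorted(delays)


results, delays = asyncio.run(demo())
print(results, delays)
```

With `return_exceptions=True`, the exhausted retry shows up as an exception object in the result list, so one dead endpoint does not discard the results of the others.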

**Performance Measurement:**
- **`time.time()`** - Wall-clock timestamp used here for response time calculation (`time.perf_counter()` is the monotonic, higher-resolution choice for measuring durations)
- **Concurrent execution** - Multiple requests in parallel vs sequential
- **Connection pooling** - Reuse HTTP connections for efficiency

## Section 6: Text Processing and Package Development

### Theory: Text Normalization Pipelines

Text processing is fundamental for:
- **Log analysis** - Extracting insights from unstructured log data
- **Configuration processing** - Normalizing config file formats
- **Documentation generation** - Processing README and help text
- **Data preprocessing** - Cleaning text data for analysis

### Essential Text Processing Methods:
- **`str.lower()`** - Convert text to lowercase
- **`re.sub(pattern, replacement, text)`** - Replace text patterns with regex
- **`str.split()`** - Split text into tokens
- **`collections.Counter()`** - Count occurrences of items
- **`str.join(iterable)`** - Join text elements with separator
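
To make the list concrete, here is a small sketch that applies these methods to a few log lines. The log lines and the `normalize` helper are invented for illustration.

```python
import re
from collections import Counter

# Hypothetical log lines with inconsistent case, spacing, and punctuation
log_lines = [
    "ERROR   Disk usage at 91% on /dev/sda1",
    "error: disk usage at 95% on /dev/sda1!!",
    "WARN  Disk latency high",
]


def normalize(line: str) -> str:
    lowered = line.lower()                      # str.lower()
    no_punct = re.sub(r"[^\w\s]", "", lowered)  # re.sub() drops punctuation
    return " ".join(no_punct.split())           # split() + join() collapse whitespace


tokens = [tok for line in log_lines for tok in normalize(line).split()]
counts = Counter(tokens)

print(normalize(log_lines[1]))
print(counts.most_common(3))
```

Note that removing punctuation without inserting a space fuses `/dev/sda1` into `devsda1`; whether that is acceptable depends on the kind of text being processed.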

In [None]:
# Example 7: Text Normalization Toolkit
import re
import json
import sys
from collections import Counter
from typing import List, Dict, Set

class TextNormalizer:
    """Comprehensive text normalization toolkit for DevOps text processing."""

    def __init__(self, stopwords: Set[str] = None):
        """Initialize with optional custom stopwords set."""
        self.default_stopwords = {
            'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to',
            'for', 'of', 'with', 'by', 'is', 'are', 'was', 'were', 'be',
            'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did',
            'will', 'would', 'could', 'should', 'may', 'might', 'can'
        }
        self.stopwords = stopwords if stopwords is not None else self.default_stopwords

    def normalize_whitespace(self, text: str) -> str:
        """Normalize whitespace by collapsing multiple spaces and removing leading/trailing space."""
        if not isinstance(text, str):
            raise TypeError("Input must be a string")

        # Replace multiple whitespace characters with single space
        normalized = re.sub(r'\s+', ' ', text)

        # Remove leading and trailing whitespace
        return normalized.strip()

    def remove_punctuation(self, text: str) -> str:
        """Remove punctuation while preserving word boundaries."""
        if not isinstance(text, str):
            raise TypeError("Input must be a string")

        # Drop every character that is neither a word character nor whitespace.
        # Note that \w matches letters, digits, and the underscore.
        cleaned = re.sub(r'[^\w\s]', '', text)

        return cleaned

    def lowercase(self, text: str) -> str:
        """Convert text to lowercase."""
        if not isinstance(text, str):
            raise TypeError("Input must be a string")

        return text.lower()

    def tokenize(self, text: str) -> List[str]:
        """Tokenize text into words by splitting on whitespace."""
        if not isinstance(text, str):
            raise TypeError("Input must be a string")

        # str.split() with no arguments splits on runs of whitespace
        # and never yields empty strings
        tokens = text.split()

        return tokens

    def unique_word_counts(self, tokens: List[str], stopwords: Set[str] = None) -> Dict[str, int]:
        """Count unique words, optionally removing stopwords."""
        if not isinstance(tokens, list):
            raise TypeError("Tokens must be a list")

        if stopwords is None:
            stopwords = self.stopwords

        # Filter out stopwords and count occurrences
        filtered_tokens = [token for token in tokens if token.lower() not in stopwords]

        return dict(Counter(filtered_tokens))

    def process_text_pipeline(self, text: str, remove_stopwords: bool = True) -> Dict[str, object]:
        """Complete text processing pipeline with all normalization steps."""
        original_length = len(text)

        # Step 1: Normalize whitespace
        normalized = self.normalize_whitespace(text)

        # Step 2: Remove punctuation
        no_punct = self.remove_punctuation(normalized)

        # Step 3: Convert to lowercase
        lowercased = self.lowercase(no_punct)

        # Step 4: Tokenize
        tokens = self.tokenize(lowercased)

        # Step 5: Count unique words
        word_counts = self.unique_word_counts(tokens, self.stopwords if remove_stopwords else set())

        # Get top 10 words
        top_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:10]

        return {
            'original_length': original_length,
            'normalized_length': len(lowercased),
            'total_tokens': len(tokens),
            'unique_words_count': len(word_counts),
            'top_10_words': dict(top_words),
            'processed_text': lowercased
        }

    def load_stopwords_file(self, filename: str) -> Set[str]:
        """Load custom stopwords from file (one word per line)."""
        try:
            with open(filename, 'r') as f:
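                # Skip blank lines and '#' comment lines such as those in the sample file
                stopwords = {
                    line.strip().lower() for line in f
                    if line.strip() and not line.lstrip().startswith('#')
                }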

            print(f"Loaded {len(stopwords)} custom stopwords from {filename}")
            self.stopwords = stopwords
            return stopwords

        except FileNotFoundError:
            print(f"Stopwords file not found: {filename}. Using default stopwords.")
            return self.stopwords
        except Exception as e:
            print(f"Error loading stopwords file: {e}. Using default stopwords.")
            return self.stopwords

    def create_sample_stopwords_file(self, filename: str):
        """Create a sample stopwords file for testing."""
        sample_stopwords = [
            '# Common English stopwords',
            'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at',
            'to', 'for', 'of', 'with', 'by', 'is', 'are', 'was', 'were',
            '# Technical stopwords for DevOps',
            'server', 'service', 'docker', 'kubernetes', 'config'
        ]

        with open(filename, 'w') as f:
            for word in sample_stopwords:
                f.write(word + '\n')

        print(f"Created sample stopwords file: {filename}")

# Test the text normalizer
print("=== TEXT NORMALIZATION TOOLKIT ===")

# Create normalizer instance
normalizer = TextNormalizer()

# Create sample stopwords file
normalizer.create_sample_stopwords_file('custom_stopwords.txt')

# Test individual functions
sample_text = """
    This is a SAMPLE text with   multiple    spaces!!!
    It has punctuation, UPPERCASE letters, and other issues.
    We need to normalize this text for processing...
    Docker containers and Kubernetes services are running.
""".strip()

print(f"\nOriginal text ({len(sample_text)} chars):")
print(f'"{sample_text}"')

# Test normalization steps
print("\n=== STEP-BY-STEP NORMALIZATION ===")

step1 = normalizer.normalize_whitespace(sample_text)
print(f"1. Normalize whitespace: \"{step1}\"")

step2 = normalizer.remove_punctuation(step1)
print(f"2. Remove punctuation: \"{step2}\"")

step3 = normalizer.lowercase(step2)
print(f"3. Lowercase: \"{step3}\"")

step4 = normalizer.tokenize(step3)
print(f"4. Tokenize: {step4}")

step5 = normalizer.unique_word_counts(step4)
print(f"5. Word counts: {dict(list(step5.items())[:10])}")  # Show first 10

# Test complete pipeline
print("\n=== COMPLETE PIPELINE RESULTS ===")
result = normalizer.process_text_pipeline(sample_text)

print(json.dumps(result, indent=2))

# Test with custom stopwords
print("\n=== TESTING CUSTOM STOPWORDS ===")
normalizer.load_stopwords_file('custom_stopwords.txt')
result_custom = normalizer.process_text_pipeline(sample_text)

print(f"With custom stopwords: {result_custom['unique_words_count']} unique words")
print(f"Top words: {list(result_custom['top_10_words'].keys())[:5]}")

=== TEXT NORMALIZATION TOOLKIT ===
Created sample stopwords file: custom_stopwords.txt

Original text (225 chars):
"This is a SAMPLE text with   multiple    spaces!!! 
    It has punctuation, UPPERCASE letters, and other issues.
    We need to normalize this text for processing... 
    Docker containers and Kubernetes services are running."

=== STEP-BY-STEP NORMALIZATION ===
1. Normalize whitespace: "This is a SAMPLE text with multiple spaces!!! It has punctuation, UPPERCASE letters, and other issues. We need to normalize this text for processing... Docker containers and Kubernetes services are running."
2. Remove punctuation: "This is a SAMPLE text with multiple spaces It has punctuation UPPERCASE letters and other issues We need to normalize this text for processing Docker containers and Kubernetes services are running"
3. Lowercase: "this is a sample text with multiple spaces it has punctuation uppercase letters and other issues we need to normalize this text for processing docker 

### Key Text Processing Methods:

**Regular Expression Operations:**
- **`re.sub(pattern, replacement, text)`** - Replace all matches of pattern with replacement
- **`r'\s+'`** - Raw string pattern matching one or more whitespace characters
- **`r'[^\w\s]'`** - Pattern matching non-word, non-space characters (punctuation, except `_`)
- **Character classes** - `\w` (word chars), `\s` (whitespace), `^` (negation when first inside `[...]`)
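
A short sketch of what the two patterns actually match, using an invented input string. It also shows a detail that is easy to miss: `\w` includes the underscore, so `[^\w\s]` leaves underscores in place.

```python
import re

# Hypothetical input with tabs, repeated spaces, a trailing newline, and punctuation
raw = "deploy_id=42\t\tstatus:   OK!\n"

# \s+ matches any run of whitespace (spaces, tabs, newlines)
collapsed = re.sub(r"\s+", " ", raw).strip()

# [^\w\s] matches anything that is neither a word character nor whitespace
stripped = re.sub(r"[^\w\s]", "", collapsed)

print(repr(collapsed))
print(repr(stripped))
```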

**String Processing:**
- **`str.split()`** - Split string on whitespace into list of tokens
- **`str.strip()`** - Remove leading and trailing whitespace
- **`str.lower()`** - Convert string to lowercase for normalization
- **List comprehensions** - `[token for token in tokens if condition]` for filtering

**Data Structures for Counting:**
- **`collections.Counter(iterable)`** - Count occurrences of hashable objects
- **`dict(Counter(items))`** - Convert Counter to regular dictionary
- **`sorted(items, key=lambda x: x[1], reverse=True)`** - Sort by count descending
- **Set operations** - `token in stopwords` for efficient membership testing
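
One detail worth seeing in code: sorting by count alone leaves tied words in first-seen order, which can make reports differ between inputs that contain the same words. The sketch below, with made-up tokens, adds an alphabetical tie-breaker as one possible way to get a deterministic ranking.

```python
from collections import Counter

# Hypothetical token stream
tokens = ["nginx", "restart", "nginx", "the", "timeout", "restart", "nginx", "a", "failed"]
stopwords = {"the", "a"}  # set membership tests are O(1) on average

counts = Counter(tok for tok in tokens if tok not in stopwords)

# Sort by count only: ties keep first-seen order (sorted() is stable)
by_count = sorted(counts.items(), key=lambda item: item[1], reverse=True)

# Sort by count descending, then word ascending: ties are resolved alphabetically
by_count_then_word = sorted(counts.items(), key=lambda item: (-item[1], item[0]))

print(dict(counts))
print(by_count)
print(by_count_then_word)
```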

**Type Safety and Validation:**
- **`isinstance(obj, type)`** - Runtime type checking for function inputs
- **Type hints** - `Set[str]`, `List[str]`, `Dict[str, int]` for documentation
- **Input validation** - Raising `TypeError` for invalid input types
- **Default parameters** - Using `None` with conditional assignment for optional params
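
The `None`-default pattern can be sketched as follows. `filter_tokens` and `DEFAULT_STOPWORDS` are hypothetical names used only for this illustration; the point is that the check is `is None`, not truthiness, so an explicitly passed empty set still means "filter nothing".

```python
from typing import List, Optional, Set

DEFAULT_STOPWORDS = frozenset({"the", "a", "an"})


def filter_tokens(tokens: List[str], stopwords: Optional[Set[str]] = None) -> List[str]:
    """Drop stopwords; fall back to the defaults only when no set is given."""
    if not isinstance(tokens, list):
        raise TypeError("tokens must be a list")

    # `is None` keeps an explicit empty set distinct from "not provided"
    active = DEFAULT_STOPWORDS if stopwords is None else stopwords
    return [tok for tok in tokens if tok.lower() not in active]


print(filter_tokens(["The", "server"]))         # defaults applied
print(filter_tokens(["The", "server"], set()))  # explicit empty set: nothing removed
```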

## Homework Assignment: Complete DevOps Automation Tasks

### 🎯 **Task 1: Log Analyzer CLI Tool**

**Objective**: Build a professional command-line log analyzer that processes web server logs efficiently.

**Requirements**:
- Create `log_analyzer.py` with `argparse` CLI interface
- Support options: `--input`, `--top-n`, `--output-json`, `--output-tsv`
- Implement streaming file processing (no full file load)
- Calculate: total requests, status code distribution (2xx/3xx/4xx/5xx), top N client IPs, top N paths, hourly counts
- Handle malformed lines gracefully (log and skip)
- Write unit tests with `pytest`

**Deliverables**:
```bash
python log_analyzer.py --input large_log.log --top-n 10 --output-json report.json --output-tsv report.tsv
```

**Acceptance Criteria**:
- Tool processes 100k+ lines without memory issues
- JSON output contains all required metrics
- TSV file opens correctly in Excel
- Tests cover edge cases and malformed lines
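
A possible starting point, offered as a sketch and not as a reference solution. It assumes log lines in Common Log Format (the `LINE_RE` pattern is an assumption to adapt to the real logs) and leaves out hourly counts and file output. `build_parser` and `analyze` are hypothetical names.

```python
import argparse
import re
from collections import Counter
from typing import Any, Dict, Iterable

# Assumed Common Log Format shape; adapt the pattern to the real log layout
LINE_RE = re.compile(r'^(\S+) \S+ \S+ \[[^\]]+\] "\S+ (\S+)[^"]*" (\d{3})')


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Streaming log analyzer (sketch)")
    parser.add_argument("--input", required=True, help="Path to the log file")
    parser.add_argument("--top-n", type=int, default=10, help="Number of top items to report")
    parser.add_argument("--output-json", help="Optional JSON report path")
    parser.add_argument("--output-tsv", help="Optional TSV report path")
    return parser


def analyze(lines: Iterable[str], top_n: int = 10) -> Dict[str, Any]:
    """Consume lines one at a time so memory use does not grow with file size."""
    total = malformed = 0
    status_classes, ips, paths = Counter(), Counter(), Counter()
    for line in lines:
        match = LINE_RE.match(line)
        if match is None:
            malformed += 1  # count and skip malformed lines
            continue
        ip, path, status = match.groups()
        total += 1
        status_classes[f"{status[0]}xx"] += 1
        ips[ip] += 1
        paths[path] += 1
    return {
        "total_requests": total,
        "malformed_lines": malformed,
        "status_classes": dict(status_classes),
        "top_ips": ips.most_common(top_n),
        "top_paths": paths.most_common(top_n),
    }


sample = [
    '10.0.0.1 - - [01/Jan/2024:10:00:00 +0000] "GET /index.html HTTP/1.1" 200 512',
    '10.0.0.2 - - [01/Jan/2024:10:00:01 +0000] "GET /missing HTTP/1.1" 404 0',
    'this line is malformed',
    '10.0.0.1 - - [01/Jan/2024:10:00:02 +0000] "POST /api HTTP/1.1" 500 17',
]
args = build_parser().parse_args(["--input", "access.log", "--top-n", "1"])
report = analyze(sample, args.top_n)
print(report)
```

Because `analyze` accepts any iterable of lines, the real tool can pass an open file object directly (`with open(args.input) as f: analyze(f, args.top_n)`), and the tests can pass a plain list.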

---

### 🎯 **Task 2: Configuration Tool Package**

**Objective**: Create an installable Python package for YAML configuration validation.

**Requirements**:
- Package structure with `setup.py` or `pyproject.toml`
- CLI tool `cfgtool` that validates YAML configs
- Validate fields: name, version, services list
- Provide `requirements.txt` and `bootstrap.sh` script
- Include 4+ unit tests for config parsing and error handling

**Deliverables**:
```bash
pip install -e .
cfgtool config.yml  # Prints validated summary
```

**Package Structure**:
```
cfgtool/
├── setup.py
├── requirements.txt
├── bootstrap.sh
├── cfgtool/
│   ├── __init__.py
│   ├── cli.py
│   └── validator.py
└── tests/
    └── test_cfgtool.py
```
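
One way `validator.py` might begin, shown as a sketch under assumptions: the function works on an already-parsed dictionary (for example the result of PyYAML's `yaml.safe_load`), it only checks that the three fields exist and that `services` is a list, and the names `validate_config` and `REQUIRED_FIELDS` are hypothetical.

```python
from typing import Any, List

REQUIRED_FIELDS = ("name", "version", "services")


def validate_config(config: Any) -> List[str]:
    """Return a list of human-readable problems; an empty list means valid."""
    if not isinstance(config, dict):
        return ["top-level document must be a mapping"]

    errors = [f"missing field: {field}" for field in REQUIRED_FIELDS if field not in config]
    if "services" in config and not isinstance(config["services"], list):
        errors.append("services must be a list")
    return errors


good = {"name": "billing", "version": "1.2.0", "services": ["api", "worker"]}
bad = {"name": "billing", "services": "api"}

print(validate_config(good))
print(validate_config(bad))
```

Returning all problems at once, instead of raising on the first one, lets the CLI print a complete report in a single run.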

---

### 🎯 **Task 3: Configuration-Driven File Renamer**

**Objective**: Build a safe file renaming tool with dry-run capability and rollback support.

**Requirements**:
- YAML config defines glob patterns → rename templates
- CLI options: `--config`, `--dir`, `--dry-run`, `--commit`, `--force`
- Template variables: `{date}`, `{datetime}`, `{orig}`, `{ext}`, `{parent}`
- Conflict detection and prevention
- Generate `rename_manifest.json` for rollback
- Unit tests for pattern matching and conflict detection

**Example Usage**:
```bash
python renamer.py --config rules.yml --dir /logs --dry-run
python renamer.py --config rules.yml --dir /logs --commit
```
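
The planning step of a dry run can be sketched without touching the filesystem. Everything here is an assumption made for illustration: the function names, the date formats chosen for `{date}` and `{datetime}`, and the use of `PurePosixPath` so the example behaves the same on every platform. It detects two sources mapping to the same target, but does not check for files that already exist on disk.

```python
from datetime import datetime
from pathlib import PurePosixPath
from typing import Dict, List, Tuple


def render_name(path: PurePosixPath, template: str, now: datetime) -> str:
    """Fill the template variables for one file."""
    return template.format(
        date=now.strftime("%Y%m%d"),             # assumed date format
        datetime=now.strftime("%Y%m%d_%H%M%S"),  # assumed datetime format
        orig=path.stem,
        ext=path.suffix.lstrip("."),
        parent=path.parent.name,
    )


def plan_renames(
    paths: List[PurePosixPath], template: str, now: datetime
) -> Tuple[Dict[PurePosixPath, PurePosixPath], List[tuple]]:
    """Compute source -> target pairs and report sources that collide on a target."""
    plan: Dict[PurePosixPath, PurePosixPath] = {}
    claimed: Dict[PurePosixPath, PurePosixPath] = {}
    conflicts: List[tuple] = []
    for path in paths:
        target = path.with_name(render_name(path, template, now))
        if target in claimed:
            conflicts.append((claimed[target], path, target))
        else:
            claimed[target] = path
            plan[path] = target
    return plan, conflicts


now = datetime(2024, 1, 15, 9, 30, 0)
paths = [PurePosixPath("/logs/app/error.log"), PurePosixPath("/logs/app/error.txt")]

safe_plan, safe_conflicts = plan_renames(paths, "{parent}_{orig}_{date}.{ext}", now)
bad_plan, bad_conflicts = plan_renames(paths, "{date}.log", now)
print(safe_plan)
print(bad_conflicts)
```

A `--dry-run` mode could print `plan` and `conflicts` and stop, while `--commit` would perform the renames and write the same mapping to `rename_manifest.json`.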

---

### 🎯 **Task 4: CSV Aggregator → Excel Report**

**Objective**: Process multiple CSV transaction files into comprehensive Excel reports.

**Requirements**:
- Input: folder with CSV files (timestamp, user_id, service, amount columns)
- Aggregate by day and service: count, sum, average amounts
- Output Excel with 3 sheets: DailySummary, ServiceSummary, RawMerged
- Use `pandas` and `openpyxl` for processing
- Include data validation script
- Format Excel sheets professionally

**Expected Output**:
- Multi-sheet Excel workbook
- Proper formatting and column headers
- Validation that aggregates match raw data
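
For the validation step, one option is to recompute the aggregates with the standard library only and compare them with what the `pandas` pipeline produced. The sketch below shows just the recomputation; the function name, the in-memory sample data, and the assumption that timestamps are ISO 8601 strings are all illustrative.

```python
import csv
import io
from collections import defaultdict


def aggregate_by_day_and_service(rows):
    """Recompute count, sum, and average per (day, service) without pandas."""
    buckets = defaultdict(lambda: {"count": 0, "sum": 0.0})
    for row in rows:
        day = row["timestamp"][:10]  # assumes ISO 8601 timestamps (YYYY-MM-DD...)
        bucket = buckets[(day, row["service"])]
        bucket["count"] += 1
        bucket["sum"] += float(row["amount"])
    return {
        key: {"count": b["count"], "sum": b["sum"], "avg": b["sum"] / b["count"]}
        for key, b in buckets.items()
    }


# Hypothetical sample data standing in for one transaction file
sample_csv = io.StringIO(
    "timestamp,user_id,service,amount\n"
    "2024-01-01T09:00:00,u1,api,10.0\n"
    "2024-01-01T17:30:00,u2,api,20.0\n"
    "2024-01-02T08:15:00,u1,db,5.5\n"
)
summary = aggregate_by_day_and_service(csv.DictReader(sample_csv))
print(summary)
```

When comparing against values read back from Excel, a tolerance such as `math.isclose` is safer than exact equality for the float sums and averages.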

---

### 🎯 **Task 5: Concurrent REST Health Checker**

**Objective**: Implement async HTTP health monitoring with retry logic.

**Requirements**:
- JSON config: `[{"name": "...", "url": "...", "expected_status": 200}]`
- Use `asyncio` and `aiohttp` for concurrent requests
- Retry logic: 3 attempts with exponential backoff
- Output CSV/JSON with: name, url, status_code, ok, response_time_ms, attempts
- Unit tests with mocked HTTP responses
- Performance comparison vs sequential execution

**Key Features**:
- Exponential backoff: delays of 1s, 2s, 4s, ... between attempts (3 attempts use only the 1s and 2s delays)
- Connection pooling for efficiency
- Timeout handling and error reporting
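
For the mocked tests, one lightweight approach is a hand-written fake that mimics only the part of the session interface the checker uses. The sketch below assumes the checker calls `session.get(url)` as an async context manager and reads `response.status`; `FakeResponse`, `FakeSession`, `check_once`, and the example URLs are hypothetical.

```python
import asyncio


class FakeResponse:
    """Minimal stand-in for a response used as an async context manager."""

    def __init__(self, status: int):
        self.status = status

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False  # do not suppress exceptions


class FakeSession:
    """Returns canned status codes instead of making network calls."""

    def __init__(self, statuses):
        self.statuses = statuses
        self.requested = []

    def get(self, url, **kwargs):
        self.requested.append(url)
        return FakeResponse(self.statuses[url])


async def check_once(session, url, expected_status=200):
    """Simplified single check, written against the session interface only."""
    async with session.get(url) as response:
        return {
            "url": url,
            "status_code": response.status,
            "ok": response.status == expected_status,
        }


def test_check_once_flags_unexpected_status():
    session = FakeSession({
        "https://svc.example/health": 200,
        "https://svc.example/broken": 503,
    })
    ok = asyncio.run(check_once(session, "https://svc.example/health"))
    bad = asyncio.run(check_once(session, "https://svc.example/broken"))
    assert ok["ok"] is True
    assert bad == {"url": "https://svc.example/broken", "status_code": 503, "ok": False}
    assert session.requested == ["https://svc.example/health", "https://svc.example/broken"]


test_check_once_flags_unexpected_status()
```

`pytest` would collect the `test_` function on its own; the direct call at the end only makes the sketch runnable as a plain script.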

---

### 🎯 **Task 6: Text Normalization Package & CLI**

**Objective**: Create a reusable text processing package with CLI interface.

**Requirements**:
- Package `textnorm` with functions:
  - `normalize_whitespace(text)`
  - `remove_punctuation(text)`
  - `lowercase(text)`
  - `tokenize(text)`
  - `unique_word_counts(tokens, stopwords=None)`
- CLI reads from file/stdin, outputs JSON summary
- Support custom stopwords file
- Include type hints and comprehensive tests

**CLI Output Example**:
```json
{
  "original_length": 150,
  "normalized_length": 145,
  "unique_words_count": 25,
  "top_10_words": {"python": 5, "devops": 3, "automation": 2}
}
```
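
A compact sketch of how the CLI might produce that summary. The layout is an assumption: a single optional positional `file` argument with a fallback to stdin, a `--stopwords` option, and a `main` function that returns the JSON string so it is easy to test. `summarize` collapses the package functions into a few lines for brevity.

```python
import argparse
import io
import json
import re
import sys
from collections import Counter


def summarize(text: str, stopwords=frozenset()) -> dict:
    """Normalize the text and build the summary fields shown above."""
    cleaned = re.sub(r"[^\w\s]", "", text).lower()
    normalized = " ".join(cleaned.split())
    counts = Counter(tok for tok in normalized.split() if tok not in stopwords)
    return {
        "original_length": len(text),
        "normalized_length": len(normalized),
        "unique_words_count": len(counts),
        "top_10_words": dict(counts.most_common(10)),
    }


def main(argv=None, stdin=None) -> str:
    parser = argparse.ArgumentParser(prog="textnorm")
    parser.add_argument("file", nargs="?", help="Input file; stdin is read when omitted")
    parser.add_argument("--stopwords", help="File with one stopword per line")
    args = parser.parse_args(argv)

    stopwords = frozenset()
    if args.stopwords:
        with open(args.stopwords, encoding="utf-8") as f:
            stopwords = frozenset(
                line.strip().lower() for line in f
                if line.strip() and not line.lstrip().startswith("#")
            )

    if args.file:
        with open(args.file, encoding="utf-8") as f:
            text = f.read()
    else:
        text = (stdin or sys.stdin).read()

    return json.dumps(summarize(text, stopwords), indent=2)


# Demo: feed text through a fake stdin instead of a real pipe
demo_text = "Python automates DevOps. Python scales!"
output = main([], stdin=io.StringIO(demo_text))
print(output)
```

In the installed package, the console entry point would call `print(main())` so that `argv` and `stdin` fall back to the real command line and standard input.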

---

### 📋 **Submission Guidelines**

**Project Structure**:
```
session1_homework/
├── task1_log_analyzer/
│   ├── log_analyzer.py
│   ├── tests/
│   └── sample_data/
├── task2_cfgtool/
│   ├── setup.py
│   ├── cfgtool/
│   └── tests/
├── task3_renamer/
├── task4_report_builder/
├── task5_health_checker/
├── task6_textnorm/
├── requirements.txt
└── README.md
```

**Quality Requirements**:
- ✅ All code includes error handling
- ✅ Unit tests with >80% coverage
- ✅ Type hints where appropriate
- ✅ Documentation strings
- ✅ CLI help messages
- ✅ Professional logging output

**Testing Commands**:
```bash
# Run all tests from the repository root (--cov requires the pytest-cov plugin)
python -m pytest -v --cov

# Test individual tasks
python -m pytest task1_log_analyzer/tests/ -v
python -m pytest task2_cfgtool/tests/ -v
```

### 🏆 **Success Criteria**

Each task will be evaluated on:
1. **Functionality** - Meets all requirements
2. **Code Quality** - Clean, readable, well-structured
3. **Error Handling** - Graceful failure modes
4. **Testing** - Comprehensive test coverage
5. **Documentation** - Clear usage instructions
6. **Performance** - Efficient for expected workloads

**Due Date**: [Insert appropriate deadline]
**Submission**: Git repository with complete implementation and README

## Summary and Best Practices

### What We've Learned

In this advanced session, we covered essential DevOps automation techniques:

1. **CLI Development** - Professional command-line tools with `argparse`
2. **File Processing** - Memory-efficient streaming and pattern matching
3. **Package Development** - Creating installable Python packages
4. **Data Pipelines** - ETL operations with pandas and Excel output
5. **Concurrent Programming** - Async HTTP operations with proper error handling
6. **Text Processing** - Comprehensive normalization and analysis pipelines

### Key DevOps Principles Applied

**Automation First**
- Every manual task should be automated
- Tools should be reusable and configurable
- Error handling must be comprehensive

**Performance Considerations**
- Stream large files instead of loading into memory
- Use async operations for I/O-bound tasks
- Implement connection pooling and retry logic

**Reliability and Safety**
- Dry-run modes for destructive operations
- Comprehensive logging and error reporting
- Rollback capabilities for file operations
- Input validation and type checking

**Professional Development**
- Complete test coverage with pytest
- Type hints for better code documentation
- CLI interfaces with proper help messages
- Package structure following Python conventions

### Production Deployment Checklist

✅ **Code Quality**
- Error handling for all failure modes
- Comprehensive logging at appropriate levels
- Input validation and sanitization
- Type hints and documentation

✅ **Testing**
- Unit tests for all functions
- Integration tests for complete workflows
- Mocked external dependencies
- Performance tests for large datasets

✅ **Documentation**
- Clear README with usage examples
- API documentation for functions
- CLI help messages
- Error message clarity

✅ **Deployment**
- `requirements.txt` with pinned versions
- Virtual environment setup scripts
- Configuration file examples
- Installation and setup instructions

### Next Steps

These tools form the foundation for more advanced DevOps automation:

- **CI/CD Integration** - Incorporating tools into pipeline automation
- **Monitoring and Alerting** - Using health checkers for system monitoring
- **Infrastructure as Code** - Configuration-driven infrastructure management
- **Observability** - Log analysis and metrics collection at scale

Practice building and deploying these tools in real environments to gain experience with production considerations like security, scalability, and maintainability.