# Week 1: AI Engineering Mindset & Python Foundations - SOLUTION

## Overview
This solution notebook provides complete implementations for all Week 1 exercises.

### Learning Objectives
By the end of this week, you will be able to:
- Distinguish between AI, ML, DL, and Agentic AI
- Understand the AI system lifecycle: data → model → system → production
- Write clean, production-quality Python code with OOP, modularity, and typing
- Apply NumPy for efficient numerical computations
- Process data using Pandas
- Implement data validation and logging

---

## Part 1: AI vs ML vs DL vs Agentic AI

### Solution 1.1: Map Use Cases to AI Categories

In [None]:
from typing import Dict, List
from enum import Enum

class AICategory(Enum):
    """Categories that `categorize_ai_system` can assign to a description."""
    RULE_BASED_AI = "rule_based_ai"    # chosen on rule / logic style keywords
    TRADITIONAL_ML = "traditional_ml"  # fallback when no keyword group matches
    DEEP_LEARNING = "deep_learning"    # chosen on neural-network style keywords
    AGENTIC_AI = "agentic_ai"          # chosen on agent / planning style keywords

def categorize_ai_system(description: str) -> AICategory:
    """
    Categorize an AI system based on keywords in its description.

    Keyword groups are checked in priority order (agentic AI, deep learning,
    rule-based AI); the first group with a hit wins, and a description with
    no hit at all is treated as traditional ML.

    A keyword only counts when it appears at the start of a word. Inflected
    forms such as "plans", "rules" or "convolutional" therefore still match,
    while words that merely contain a keyword do not ("airplane" contains
    "plan", "biological" contains "logic").

    Args:
        description: Description of the AI system

    Returns:
        AICategory enum value
    """
    # Local import: this function is defined before the notebook imports re.
    import re

    # Ordered from highest to lowest priority.
    keyword_groups = (
        (AICategory.AGENTIC_AI,
         ('autonomous', 'agent', 'plan', 'tool', 'memory')),
        (AICategory.DEEP_LEARNING,
         ('neural network', 'cnn', 'transformer', 'deep learning', 'convolution')),
        (AICategory.RULE_BASED_AI,
         ('rule', 'if-then', 'expert system', 'logic')),
    )

    description_lower = description.lower()

    for category, keywords in keyword_groups:
        # \b anchors the keyword to a word start; the tail is left open so
        # plurals and other suffixes are still recognised.
        if any(re.search(r'\b' + re.escape(keyword), description_lower)
               for keyword in keywords):
            return category

    # Default to traditional ML
    return AICategory.TRADITIONAL_ML

# Demo inputs: one description per category the classifier can return
test_cases = [
    "A spam filter using logistic regression",
    "A chess program with if-then rules",
    "An image classifier using convolutional neural networks",
    "An autonomous agent that plans and executes research tasks"
]

# Print each description next to the category it was assigned
for case in test_cases:
    category = categorize_ai_system(case)
    print(f"{case}: {category}")

---

## Part 2: AI System Lifecycle

### Solution 2.1: Design a System Lifecycle Tracker

In [None]:
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class LifecycleStage:
    """Represents a stage in the AI system lifecycle.

    Only ``name`` is required; a freshly created stage has no timestamps and
    the status "pending".
    """
    name: str  # display name of the stage
    started_at: Optional[datetime] = None  # None until a start time is assigned
    completed_at: Optional[datetime] = None  # None until a completion time is assigned
    status: str = "pending"  # pending, in_progress, completed, failed
    
class AISystemLifecycle:
    """Tracks the lifecycle of an AI system.

    A tracker owns four stages keyed "data", "model", "system" and
    "production". Each stage records its status and the timestamps of its
    most recent start and completion.
    """

    def __init__(self, project_name: str):
        """Create a tracker for *project_name* with all stages pending."""
        self.project_name = project_name
        self.stages = {
            "data": LifecycleStage("Data Collection & Preparation"),
            "model": LifecycleStage("Model Development"),
            "system": LifecycleStage("System Integration"),
            "production": LifecycleStage("Production Deployment")
        }

    def _get_stage(self, stage_name: str) -> "LifecycleStage":
        """Return the stage stored under *stage_name* or raise ValueError."""
        if stage_name not in self.stages:
            raise ValueError(f"Invalid stage: {stage_name}")
        return self.stages[stage_name]

    def start_stage(self, stage_name: str) -> None:
        """Mark a stage as started.

        Raises:
            ValueError: If *stage_name* is not a known stage key.
        """
        stage = self._get_stage(stage_name)
        stage.started_at = datetime.now()
        # A stage that is started again is no longer complete; dropping the
        # old completion time keeps it from predating the new start time.
        stage.completed_at = None
        stage.status = "in_progress"
        print(f"Started stage: {stage.name}")

    def complete_stage(self, stage_name: str) -> None:
        """Mark a stage as completed.

        Completing a stage that was never started is allowed but prints a
        warning first.

        Raises:
            ValueError: If *stage_name* is not a known stage key.
        """
        stage = self._get_stage(stage_name)
        if stage.status != "in_progress":
            print(f"Warning: Stage {stage.name} was not in progress")

        stage.completed_at = datetime.now()
        stage.status = "completed"
        print(f"Completed stage: {stage.name}")

    def get_current_stage(self) -> Optional[str]:
        """Return the key of the first in-progress stage, or None if there is none."""
        for name, stage in self.stages.items():
            if stage.status == "in_progress":
                return name
        return None

    def get_progress_report(self) -> Dict[str, str]:
        """Return a mapping of stage key to current status for every stage."""
        return {name: stage.status for name, stage in self.stages.items()}

# Walk a tracker through its first two stages and print the state on the way
lifecycle = AISystemLifecycle("Customer Churn Predictor")
initial_report = lifecycle.get_progress_report()
print(f"Project: {lifecycle.project_name}")
print(f"Initial progress: {initial_report}")

# Data stage: start, inspect, complete
lifecycle.start_stage("data")
current_stage = lifecycle.get_current_stage()
print(f"Current stage: {current_stage}")
lifecycle.complete_stage("data")

# Model stage: started but left in progress for the final report
lifecycle.start_stage("model")
progress_report = lifecycle.get_progress_report()
print(f"Progress: {progress_report}")

---

## Part 3: Production-Quality Python

### Solution 3.1: Build a Data Validator Class

In [None]:
from typing import Any, Callable, List, Tuple
import logging
import re

# Set up logging: configure the root logger (INFO level, lines formatted as
# "timestamp - level - message") and create the logger used by the classes below.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class ValidationRule:
    """A named predicate paired with the message reported when it fails."""

    def __init__(self, name: str, validator: Callable[[Any], bool], error_message: str):
        self.name = name
        self.validator = validator
        self.error_message = error_message

    def validate(self, value: Any) -> Tuple[bool, str]:
        """Apply the predicate to *value* and return (is_valid, message).

        The message is empty on success and ``error_message`` on failure.
        An exception raised while evaluating the predicate counts as a
        failure and is reported as "Validation error: <exception text>".
        """
        try:
            # The truth test stays inside the try block so that a result
            # which cannot be evaluated as a bool is reported, not raised.
            if self.validator(value):
                return (True, "")
            return (False, self.error_message)
        except Exception as exc:
            return (False, f"Validation error: {str(exc)}")

class DataValidator:
    """Runs a value through an ordered collection of validation rules."""

    def __init__(self):
        self.rules: List[ValidationRule] = []

    def add_rule(self, rule: ValidationRule) -> None:
        """Register *rule*; rules are evaluated in the order they were added."""
        self.rules.append(rule)
        logger.info(f"Added validation rule: {rule.name}")

    def validate(self, value: Any) -> Tuple[bool, List[str]]:
        """Check *value* against every registered rule.

        All rules are evaluated even after a failure, so the result lists
        every problem at once.

        Returns:
            Tuple of (is_valid, list of error messages)
        """
        failures = []

        for rule in self.rules:
            passed, detail = rule.validate(value)
            if passed:
                continue
            failures.append(f"{rule.name}: {detail}")
            logger.warning(f"Validation failed for rule '{rule.name}': {detail}")

        return (not failures, failures)

# Build the email validator from a table of rules, registered in order
email_validator = DataValidator()

email_rules = [
    # Rule 1: Must contain @
    ValidationRule(
        "contains_at",
        lambda email: '@' in str(email),
        "Email must contain @ symbol"
    ),
    # Rule 2: Must have domain (a dot somewhere after the last @)
    ValidationRule(
        "has_domain",
        lambda email: '@' in str(email) and '.' in str(email).split('@')[-1],
        "Email must have a valid domain"
    ),
    # Rule 3: Must not be empty
    ValidationRule(
        "not_empty",
        lambda email: len(str(email).strip()) > 0,
        "Email cannot be empty"
    ),
    # Rule 4: Basic format check
    ValidationRule(
        "valid_format",
        lambda email: re.match(r'^[\w\.-]+@[\w\.-]+\.\w+$', str(email)) is not None,
        "Email format is invalid"
    ),
]

for email_rule in email_rules:
    email_validator.add_rule(email_rule)

# Run a mix of well-formed and malformed addresses through the validator
test_emails = [
    "user@example.com",
    "invalid.email",
    "@nodomain.com",
    "user@domain",
    "",
    "valid.user@company.org"
]

print("\nEmail Validation Results:", "=" * 60, sep="\n")
for email in test_emails:
    is_valid, errors = email_validator.validate(email)
    if is_valid:
        status = "✓ VALID"
    else:
        status = "✗ INVALID"
    print(f"{status}: '{email}'")
    # One indented line per rule that rejected the address
    for error in errors:
        print(f"  - {error}")

---

## Part 4: NumPy Fundamentals & Vectorization

### Solution 4.1: Vectorize Data Processing

In [None]:
import numpy as np
import time

# Generate sample data: one million draws from the standard normal distribution.
# Seeding the generator first makes the draws reproducible across runs.
np.random.seed(42)
data = np.random.randn(1000000)

def normalize_loop(arr: np.ndarray) -> np.ndarray:
    """
    Normalize array to range [0, 1] using a loop.
    Formula: (x - min) / (max - min)

    The element-by-element loop is intentional: this is the slow baseline
    that the vectorized implementation is measured against.

    Args:
        arr: Array of numeric values.

    Returns:
        Array of the same shape. Floating-point input keeps its dtype, any
        other input is returned as float64. Empty input yields an empty
        array; constant input (max == min) yields all zeros instead of
        dividing by zero.
    """
    # Integer input must not be written into an integer buffer: the scaled
    # values lie in [0, 1] and would be truncated to 0 or 1.
    out_dtype = arr.dtype if np.issubdtype(arr.dtype, np.floating) else np.float64
    result = np.zeros_like(arr, dtype=out_dtype)

    if arr.size == 0:
        return result

    min_val = arr.min()
    value_range = arr.max() - min_val  # loop-invariant, computed once
    if value_range == 0:
        return result

    for i in range(len(arr)):
        result[i] = (arr[i] - min_val) / value_range

    return result

def normalize_vectorized(arr: np.ndarray) -> np.ndarray:
    """
    Normalize array to range [0, 1] using vectorized operations.
    Formula: (x - min) / (max - min)

    Args:
        arr: Array of numeric values.

    Returns:
        Array of the same shape. Empty input yields an empty array; constant
        input (max == min) yields all zeros instead of dividing by zero.
        In both special cases floating-point input keeps its dtype and any
        other input is returned as float64.
    """
    out_dtype = arr.dtype if np.issubdtype(arr.dtype, np.floating) else np.float64

    if arr.size == 0:
        # min()/max() raise on a zero-size array, so answer directly.
        return np.zeros_like(arr, dtype=out_dtype)

    min_val = arr.min()
    value_range = arr.max() - min_val
    if value_range == 0:
        # Every element equals the minimum; 0/0 would produce NaN.
        return np.zeros_like(arr, dtype=out_dtype)

    return (arr - min_val) / value_range

# Compare performance
print("Performance Comparison:")
print("=" * 60)

# Test with smaller array for loop version (to avoid long wait)
small_data = data[:10000]

# time.perf_counter() is the clock intended for measuring short durations;
# time.time() can be too coarse to resolve the vectorized run at all.

# Loop version
start = time.perf_counter()
result_loop = normalize_loop(small_data)
time_loop = time.perf_counter() - start
print(f"Loop version (10k elements): {time_loop:.4f} seconds")

# Vectorized version (small)
start = time.perf_counter()
result_vec_small = normalize_vectorized(small_data)
time_vec_small = time.perf_counter() - start
print(f"Vectorized version (10k elements): {time_vec_small:.6f} seconds")
if time_vec_small > 0:
    print(f"Speedup (small): {time_loop / time_vec_small:.1f}x faster")
else:
    # A measured duration of zero would make the ratio a division by zero.
    print("Speedup (small): vectorized run was too fast to measure")

# Vectorized version (full)
start = time.perf_counter()
result_vec = normalize_vectorized(data)
time_vec = time.perf_counter() - start
print(f"\nVectorized version (1M elements): {time_vec:.6f} seconds")

# Verify results are the same (explicit raise: assert statements are
# skipped when Python runs with -O)
if not np.allclose(result_loop, result_vec_small):
    raise AssertionError("Results don't match!")
print("\n✓ Results verified: Loop and vectorized versions produce same output")

### Solution 4.2: Statistical Analysis with NumPy

In [None]:
from typing import Dict

def compute_statistics(arr: np.ndarray) -> Dict[str, float]:
    """
    Summarize an array with nine descriptive statistics.

    Returns dict with: mean, median, std, min, max, q25, q75, variance,
    range (in that order), each converted to a plain Python float.
    """
    # (result key, reducer) pairs; dict order follows this table
    reducers = (
        ('mean', np.mean),
        ('median', np.median),
        ('std', np.std),
        ('min', np.min),
        ('max', np.max),
        ('q25', lambda values: np.percentile(values, 25)),
        ('q75', lambda values: np.percentile(values, 75)),
        ('variance', np.var),
        ('range', np.ptp),  # peak-to-peak (max - min)
    )
    return {label: float(reducer(arr)) for label, reducer in reducers}

# Summarize 1000 fresh standard-normal draws and print one statistic per line
test_data = np.random.randn(1000)
stats = compute_statistics(test_data)

print("Statistical Summary:", "=" * 60, sep="\n")
for key, value in stats.items():
    print(f"{key:12s}: {value:10.4f}")

---

## Part 5: Pandas for Data Processing

### Solution 5.1: Load and Explore Data

In [None]:
import pandas as pd
from typing import Any

# Create sample dataset (seeded so every run produces the same 100 users)
np.random.seed(42)
data = {
    'user_id': range(1, 101),
    'age': np.random.randint(18, 70, 100),
    # Stored as float from the start: missing values are injected below and
    # an integer column cannot hold NaN without an implicit dtype change.
    'income': np.random.randint(20000, 150000, 100).astype(float),
    'signup_date': pd.date_range('2023-01-01', periods=100),
    'is_active': np.random.choice([True, False], 100),
    'total_purchases': np.random.randint(0, 50, 100)
}
df = pd.DataFrame(data)

# Introduce some missing values: blank out income for 10 distinct random rows
df.loc[np.random.choice(df.index, 10, replace=False), 'income'] = np.nan

def explore_dataframe(df: pd.DataFrame) -> Dict[str, Any]:
    """
    Build an exploration report for a DataFrame.

    Returns dict with:
    - shape: (rows, columns)
    - dtypes: dict of column types
    - missing: dict of missing value counts per column
    - missing_pct: dict of missing value percentages per column
    - numeric_summary: output of ``describe()`` as a nested dict
    - memory_usage: total bytes, counting object contents
    - duplicate_rows: number of rows flagged by ``duplicated()``
    """
    exploration: Dict[str, Any] = {}
    exploration['shape'] = df.shape
    exploration['dtypes'] = df.dtypes.to_dict()
    exploration['missing'] = df.isnull().sum().to_dict()

    missing_share = df.isnull().sum() / len(df) * 100
    exploration['missing_pct'] = missing_share.to_dict()

    exploration['numeric_summary'] = df.describe().to_dict()
    exploration['memory_usage'] = df.memory_usage(deep=True).sum()
    exploration['duplicate_rows'] = df.duplicated().sum()
    return exploration

# Build the exploration report for the sample users and print it section by section
report = explore_dataframe(df)

print("DataFrame Exploration Report:", "=" * 60, sep="\n")
print(f"Shape: {report['shape']}")

print(f"\nData Types:")
for col, dtype in report['dtypes'].items():
    print(f"  {col:20s}: {dtype}")

print(f"\nMissing Values:")
for col, count in report['missing'].items():
    # Only columns that actually have gaps are listed
    if count <= 0:
        continue
    pct = report['missing_pct'][col]
    print(f"  {col:20s}: {count:3d} ({pct:.1f}%)")

memory_kb = report['memory_usage'] / 1024
print(f"\nMemory Usage: {memory_kb:.2f} KB")
print(f"Duplicate Rows: {report['duplicate_rows']}")

print("\nFirst 5 rows:")
print(df.head())

### Solution 5.2: Clean and Transform Data

In [None]:
class DataCleaner:
    """Handles data cleaning operations.

    All methods are static and leave their input untouched: each one works
    on a copy of the DataFrame and returns the result.
    """

    @staticmethod
    def handle_missing_values(df: pd.DataFrame, strategy: str = 'mean') -> pd.DataFrame:
        """
        Handle missing values in DataFrame.

        'drop' removes every row that contains a missing value. The other
        strategies fill gaps in numeric columns only; non-numeric columns
        are returned unchanged.

        Args:
            df: Input DataFrame
            strategy: 'mean', 'median', 'mode', or 'drop'

        Returns:
            Cleaned DataFrame

        Raises:
            ValueError: If *strategy* is not one of the supported names.
        """
        if strategy not in ('mean', 'median', 'mode', 'drop'):
            # An unrecognised strategy used to return the data unchanged,
            # which hides typos such as 'avg'.
            raise ValueError(
                f"Unknown strategy: {strategy!r} "
                "(expected 'mean', 'median', 'mode' or 'drop')"
            )

        df_clean = df.copy()

        if strategy == 'drop':
            return df_clean.dropna()

        numeric_cols = df_clean.select_dtypes(include=[np.number]).columns

        for col in numeric_cols:
            column = df_clean[col]
            if not column.isnull().any():
                continue

            if strategy == 'mean':
                fill_value = column.mean()
            elif strategy == 'median':
                fill_value = column.median()
            else:
                modes = column.mode()
                if modes.empty:
                    # Column holds no values at all, so there is no mode to use.
                    continue
                fill_value = modes.iloc[0]

            # Assign the filled column back instead of calling
            # fillna(inplace=True) on a selection: the in-place form acts on
            # a temporary object and does not reliably update df_clean.
            df_clean[col] = column.fillna(fill_value)

        return df_clean

    @staticmethod
    def remove_outliers(df: pd.DataFrame, column: str, n_std: float = 3.0) -> pd.DataFrame:
        """
        Remove outliers from a specific column using standard deviation method.

        Rows are kept when the value lies within ``n_std`` standard
        deviations of the column mean (bounds inclusive). Rows whose value
        is missing fail that comparison and are removed as well. The number
        of removed rows is printed.

        Args:
            df: Input DataFrame
            column: Column name to check for outliers
            n_std: Number of standard deviations for outlier threshold

        Returns:
            DataFrame with outliers removed
        """
        df_clean = df.copy()

        mean = df_clean[column].mean()
        std = df_clean[column].std()

        lower_bound = mean - n_std * std
        upper_bound = mean + n_std * std

        # True marks the rows to keep
        keep_mask = (df_clean[column] >= lower_bound) & (df_clean[column] <= upper_bound)
        outliers_removed = len(df_clean) - keep_mask.sum()

        print(f"Removed {outliers_removed} outliers from '{column}' column")

        return df_clean[keep_mask]

    @staticmethod
    def create_features(df: pd.DataFrame,
                        reference_date: Optional[pd.Timestamp] = None) -> pd.DataFrame:
        """
        Create derived features from existing columns.

        Reads the columns 'signup_date', 'total_purchases', 'income', 'age'
        and 'is_active' and adds 'days_since_signup', 'purchase_frequency',
        'income_bracket', 'age_group' and 'customer_value_score'.

        Args:
            df: Input DataFrame containing the columns listed above
            reference_date: Point in time used to compute the account age.
                Defaults to the current time; pass a fixed timestamp for
                reproducible results.

        Returns:
            Copy of *df* with the derived columns appended
        """
        df_featured = df.copy()

        if reference_date is None:
            reference_date = pd.Timestamp.now()

        # Calculate days since signup
        df_featured['days_since_signup'] = (reference_date - df_featured['signup_date']).dt.days

        # Purchase frequency (the +1 avoids division by zero for same-day signups)
        df_featured['purchase_frequency'] = (
            df_featured['total_purchases'] / (df_featured['days_since_signup'] + 1)
        )

        # Income bracket; include_lowest puts an income of exactly 0 into
        # 'low' instead of leaving it without a bracket
        df_featured['income_bracket'] = pd.cut(
            df_featured['income'],
            bins=[0, 40000, 80000, float('inf')],
            labels=['low', 'medium', 'high'],
            include_lowest=True
        )

        # Age group
        df_featured['age_group'] = pd.cut(
            df_featured['age'],
            bins=[0, 30, 45, 60, float('inf')],
            labels=['18-30', '31-45', '46-60', '60+']
        )

        # Customer value score (simple example): 10 points per purchase
        # plus a 50 point bonus for active customers
        df_featured['customer_value_score'] = (
            df_featured['total_purchases'] * 10 +
            df_featured['is_active'].astype(int) * 50
        )

        return df_featured

# Clean the sample users, derive features, and show the state after each step
missing_before = df['income'].isnull().sum()
print("Original DataFrame:")
print(f"Shape: {df.shape}")
print(f"Missing values in income: {missing_before}")

cleaner = DataCleaner()
df_cleaned = cleaner.handle_missing_values(df, strategy='median')
missing_after = df_cleaned['income'].isnull().sum()
print(f"\nAfter handling missing values:")
print(f"Missing values in income: {missing_after}")

df_featured = cleaner.create_features(df_cleaned)
# Columns that exist only after feature engineering
new_columns = [col for col in df_featured.columns if col not in df.columns]
print(f"\nAfter feature engineering:")
print(f"New columns: {new_columns}")

preview_columns = ['user_id', 'age', 'age_group', 'income', 'income_bracket', 'purchase_frequency']
print("\nSample of enhanced data:")
print(df_featured[preview_columns].head())

---

## Part 6: Data Validation & Logging

### Solution 6.1: Implement Pipeline Validation

In [None]:
from typing import Callable, Dict, List
import logging

class DataQualityChecker:
    """Validates data quality in a pipeline.

    Every ``check_*`` method returns True or False and also appends an entry
    of the form ``{'check': ..., 'passed': ..., 'details': ...}`` to
    ``validation_results``, from which ``get_validation_report`` builds its
    summary.
    """

    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.validation_results = []

    def _record(self, check: str, passed: bool, details: Any) -> bool:
        """Store the outcome of one check and return *passed* unchanged."""
        self.validation_results.append({
            'check': check,
            'passed': passed,
            'details': details
        })
        return passed

    def check_missing_values(self, df: pd.DataFrame, max_missing_pct: float = 0.1) -> bool:
        """
        Check if missing values are within acceptable threshold.

        Args:
            df: DataFrame to inspect
            max_missing_pct: Largest tolerated share of missing values per
                column, as a fraction (0.1 means 10%)

        Returns:
            True if no column exceeds the threshold, False otherwise
        """
        missing_pct = df.isnull().sum() / len(df)
        failed_columns = missing_pct[missing_pct > max_missing_pct]

        if len(failed_columns) > 0:
            details = failed_columns.to_dict()
            self.logger.error(f"Missing value threshold exceeded in columns: {details}")
            return self._record('missing_values', False, details)

        self.logger.info("Missing values check passed")
        return self._record('missing_values', True, 'All columns within threshold')

    def check_schema(self, df: pd.DataFrame, expected_columns: List[str],
                     allow_extra: bool = False) -> bool:
        """
        Check if DataFrame has expected columns.

        Args:
            df: DataFrame to inspect
            expected_columns: Column names that must be present
            allow_extra: When False (the default) columns that are not in
                *expected_columns* fail the check; when True they are ignored

        Returns:
            True if the schema matches, False otherwise
        """
        actual_columns = set(df.columns)
        expected_columns_set = set(expected_columns)

        missing_columns = expected_columns_set - actual_columns
        extra_columns = actual_columns - expected_columns_set

        if missing_columns or (extra_columns and not allow_extra):
            self.logger.error(f"Schema mismatch - Missing: {missing_columns}, Extra: {extra_columns}")
            # Sorted so the recorded details do not depend on set ordering
            return self._record('schema', False, {
                'missing': sorted(missing_columns, key=str),
                'extra': sorted(extra_columns, key=str)
            })

        self.logger.info("Schema check passed")
        return self._record('schema', True, 'All expected columns present')

    def check_data_types(self, df: pd.DataFrame, expected_types: Dict[str, str]) -> bool:
        """
        Check if columns have expected data types.

        A column passes when the expected type text occurs in the name of
        its dtype, so 'int' accepts 'int32' as well as 'int64'. Columns
        listed in *expected_types* but absent from *df* are skipped here.

        Args:
            df: DataFrame to inspect
            expected_types: Mapping of column name to expected dtype text

        Returns:
            True if every present column matches, False otherwise
        """
        type_mismatches = {}

        for col, expected_type in expected_types.items():
            if col not in df.columns:
                continue

            actual_type = str(df[col].dtype)
            if expected_type not in actual_type:
                type_mismatches[col] = {'expected': expected_type, 'actual': actual_type}

        if type_mismatches:
            self.logger.error(f"Data type mismatches: {type_mismatches}")
            return self._record('data_types', False, type_mismatches)

        self.logger.info("Data type check passed")
        return self._record('data_types', True, 'All data types match')

    def check_value_ranges(self, df: pd.DataFrame, column: str, min_val: float, max_val: float) -> bool:
        """
        Check if values in a column are within expected range.

        Args:
            df: DataFrame to inspect
            column: Column whose values are checked
            min_val: Smallest allowed value (inclusive)
            max_val: Largest allowed value (inclusive)

        Returns:
            True if every value lies in [min_val, max_val]; False if any
            value is outside the range or the column does not exist
        """
        check_name = f'value_range_{column}'

        if column not in df.columns:
            self.logger.error(f"Column '{column}' not found")
            # Recorded like any other failure so that the report cannot
            # show a clean result for a check that returned False.
            return self._record(check_name, False, f"Column '{column}' not found")

        out_of_range = df[(df[column] < min_val) | (df[column] > max_val)]

        if len(out_of_range) > 0:
            self.logger.error(f"{len(out_of_range)} values out of range [{min_val}, {max_val}] in '{column}'")
            return self._record(check_name, False, f'{len(out_of_range)} values out of range')

        self.logger.info(f"Value range check passed for '{column}'")
        return self._record(check_name, True, 'All values within range')

    def get_validation_report(self) -> Dict[str, Any]:
        """
        Generate a comprehensive validation report.

        Returns:
            Dict with 'total_checks', 'passed', 'failed', 'success_rate'
            (percentage, 0 when nothing has been checked) and 'results'
            (a snapshot of the recorded entries in execution order)
        """
        total_checks = len(self.validation_results)
        passed_checks = sum(1 for r in self.validation_results if r['passed'])

        return {
            'total_checks': total_checks,
            'passed': passed_checks,
            'failed': total_checks - passed_checks,
            'success_rate': (passed_checks / total_checks * 100) if total_checks > 0 else 0,
            # Copy so that checks run later do not alter an issued report
            'results': list(self.validation_results)
        }

# Run the quality checks against the feature-engineered users and print the outcome
print("Data Quality Validation:", "=" * 60, sep="\n")

qc = DataQualityChecker()

# Run various checks; each call records its own result inside the checker
qc.check_schema(df_featured, ['user_id', 'age', 'income', 'signup_date'])
qc.check_data_types(df_featured, {'user_id': 'int', 'age': 'int', 'income': 'float'})
qc.check_value_ranges(df_featured, 'age', 18, 70)
qc.check_missing_values(df_featured, max_missing_pct=0.15)

# Summary figures first
report = qc.get_validation_report()
print(f"\nValidation Report:")
print(f"Total Checks: {report['total_checks']}")
print(f"Passed: {report['passed']}")
print(f"Failed: {report['failed']}")
print(f"Success Rate: {report['success_rate']:.1f}%")

# Then one line per recorded check
print("\nDetailed Results:")
for result in report['results']:
    if result['passed']:
        status = "✓"
    else:
        status = "✗"
    print(f"{status} {result['check']}: {result['details']}")

---

## Part 7: Week 1 Project - Production Data Pipeline

### Solution 7.1: Implement Complete Pipeline

In [None]:
from datetime import datetime
from pathlib import Path
import json

class ProductionDataPipeline:
    """
    A production-grade data pipeline with validation, logging, and metrics.

    ``run`` chains the stages load -> validate -> clean -> metrics -> export
    and returns a report dict. The stage methods can also be called
    individually.
    """

    # Columns DataCleaner.create_features reads; features are only derived
    # when all of them are present.
    _FEATURE_COLUMNS = ('signup_date', 'total_purchases', 'income', 'age', 'is_active')

    def __init__(self, pipeline_name: str):
        self.pipeline_name = pipeline_name
        self.logger = self._setup_logging()
        self.cleaner = DataCleaner()
        self.quality_checker = DataQualityChecker()
        self.metrics = {}
        self.start_time = None

    def _setup_logging(self) -> logging.Logger:
        """Set up pipeline logging.

        Returns the logger named "Pipeline.<pipeline_name>" with a console
        handler attached.
        """
        logger = logging.getLogger(f"Pipeline.{self.pipeline_name}")
        logger.setLevel(logging.INFO)

        # logging.getLogger hands back the same object for the same name, so
        # creating a second pipeline with this name (or re-running the cell)
        # must not stack another handler: every message would be printed
        # once per handler.
        if not logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.INFO)
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            console_handler.setFormatter(formatter)
            logger.addHandler(console_handler)

        return logger

    def load_data(self, data_source: Any) -> pd.DataFrame:
        """
        Load data from source.

        Args:
            data_source: A DataFrame (copied, so the caller's object is never
                modified) or a path to a .csv or .json file given as ``str``
                or ``pathlib.Path``. The extension is matched
                case-insensitively.

        Returns:
            The loaded DataFrame

        Raises:
            ValueError: If the source type or file extension is unsupported.
            Exception: Any error raised while reading is logged and re-raised.
        """
        try:
            self.logger.info(f"Loading data from source")

            # Handle different source types
            if isinstance(data_source, pd.DataFrame):
                df = data_source.copy()
            elif isinstance(data_source, (str, Path)):
                # Assume it's a file path
                source_path = str(data_source)
                lowered = source_path.lower()
                if lowered.endswith('.csv'):
                    df = pd.read_csv(source_path)
                elif lowered.endswith('.json'):
                    df = pd.read_json(source_path)
                else:
                    raise ValueError(f"Unsupported file format: {data_source}")
            else:
                raise ValueError("Data source must be DataFrame or file path")

            self.logger.info(f"Loaded {len(df)} rows, {len(df.columns)} columns")
            return df

        except Exception as e:
            self.logger.error(f"Failed to load data: {str(e)}")
            raise

    def validate_raw_data(self, df: pd.DataFrame) -> bool:
        """
        Validate raw data before processing.

        Only an empty DataFrame makes validation fail. Exceeding the 50%
        missing-value threshold is recorded by the quality checker and
        logged as a warning, but does not stop the pipeline.

        Returns:
            False if *df* has no rows, True otherwise
        """
        self.logger.info("Validating raw data")

        # Check for empty dataframe
        if len(df) == 0:
            self.logger.error("DataFrame is empty")
            return False

        # Check missing values
        self.quality_checker.check_missing_values(df, max_missing_pct=0.5)

        # Get validation report
        report = self.quality_checker.get_validation_report()

        if report['failed'] > 0:
            self.logger.warning(f"Validation had {report['failed']} failed checks")

        return True

    def clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Clean and transform data.

        Missing numeric values are filled with the column median. Derived
        features are added only when every column needed for them exists.

        Returns:
            The cleaned DataFrame

        Raises:
            Exception: Any error raised while cleaning is logged and re-raised.
        """
        self.logger.info("Cleaning data")

        try:
            # Handle missing values
            df_clean = self.cleaner.handle_missing_values(df, strategy='median')
            self.logger.info(f"Handled missing values")

            # Create features (if applicable). All inputs of create_features
            # are checked, not just some of them, so data lacking e.g.
            # 'income' skips the step instead of failing with a KeyError.
            if all(col in df_clean.columns for col in self._FEATURE_COLUMNS):
                df_clean = self.cleaner.create_features(df_clean)
                self.logger.info("Created derived features")

            return df_clean

        except Exception as e:
            self.logger.error(f"Failed to clean data: {str(e)}")
            raise

    def compute_metrics(self, df_raw: pd.DataFrame, df_clean: pd.DataFrame) -> Dict[str, Any]:
        """
        Compute pipeline metrics.

        All values are plain Python ints and floats so that the result can be
        serialized to JSON as numbers.

        Args:
            df_raw: Data as loaded
            df_clean: Data after cleaning

        Returns:
            Dict of metrics; it is also stored in ``self.metrics``
        """
        # start_time is only set by run(); report zero elapsed time when the
        # method is called on its own.
        if self.start_time is None:
            processing_time = 0.0
        else:
            processing_time = (datetime.now() - self.start_time).total_seconds()

        # Calculate metrics (int() turns the numpy integers returned by
        # pandas into JSON-serializable Python ints)
        rows_processed = len(df_raw)
        rows_removed = len(df_raw) - len(df_clean)
        missing_before = int(df_raw.isnull().sum().sum())
        missing_after = int(df_clean.isnull().sum().sum())
        missing_handled = missing_before - missing_after

        # Data quality score (simple example): share of non-missing cells.
        # An output without any cells is scored as fully complete rather
        # than dividing by zero.
        total_cells = len(df_clean) * len(df_clean.columns)
        completeness = 1 - (missing_after / total_cells) if total_cells > 0 else 1.0
        quality_score = float(completeness * 100)

        metrics = {
            'rows_processed': rows_processed,
            'rows_in_output': len(df_clean),
            'rows_removed': rows_removed,
            'columns_input': len(df_raw.columns),
            'columns_output': len(df_clean.columns),
            'missing_values_handled': missing_handled,
            'processing_time_seconds': processing_time,
            'data_quality_score': quality_score,
            'throughput_rows_per_second': rows_processed / processing_time if processing_time > 0 else 0
        }

        self.metrics = metrics
        self.logger.info(f"Computed metrics: quality_score={quality_score:.2f}%, processing_time={processing_time:.2f}s")

        return metrics

    def export_data(self, df: pd.DataFrame, output_path: str) -> None:
        """
        Export cleaned data to file.

        The format follows the file extension (.csv, .json or .parquet,
        matched case-insensitively); any other extension is written as CSV.
        Missing parent directories are created.

        Raises:
            Exception: Any error raised while writing is logged and re-raised.
        """
        try:
            self.logger.info(f"Exporting data to {output_path}")

            # Create directory if needed
            Path(output_path).parent.mkdir(parents=True, exist_ok=True)

            # Export based on file extension
            lowered = str(output_path).lower()
            if lowered.endswith('.json'):
                df.to_json(output_path, orient='records', indent=2)
            elif lowered.endswith('.parquet'):
                df.to_parquet(output_path, index=False)
            else:
                # .csv and every unrecognised extension
                df.to_csv(output_path, index=False)

            self.logger.info(f"Successfully exported {len(df)} rows to {output_path}")

        except Exception as e:
            self.logger.error(f"Failed to export data: {str(e)}")
            raise

    def run(self, data_source: Any, output_path: str) -> Dict[str, Any]:
        """
        Execute the complete pipeline.

        Errors are not propagated: any exception raised by a stage is logged
        and turned into a report with status 'failed'.

        Args:
            data_source: DataFrame or file path, see ``load_data``
            output_path: Destination file, see ``export_data``

        Returns:
            Pipeline execution report
        """
        self.start_time = datetime.now()
        # Start every run with an empty checker so the validation report
        # covers this run only, not results left over from earlier runs.
        self.quality_checker = DataQualityChecker()
        self.logger.info(f"Starting pipeline: {self.pipeline_name}")

        try:
            # 1. Load data
            df_raw = self.load_data(data_source)

            # 2. Validate raw data
            if not self.validate_raw_data(df_raw):
                raise ValueError("Raw data validation failed")

            # 3. Clean data
            df_clean = self.clean_data(df_raw)

            # 4. Compute metrics
            metrics = self.compute_metrics(df_raw, df_clean)

            # 5. Export data
            self.export_data(df_clean, output_path)

            # 6. Create execution report
            report = {
                'pipeline_name': self.pipeline_name,
                'status': 'success',
                'timestamp': datetime.now().isoformat(),
                'metrics': metrics,
                'validation_report': self.quality_checker.get_validation_report(),
                'output_path': output_path
            }

            self.logger.info("Pipeline completed successfully")
            return report

        except Exception as e:
            self.logger.error(f"Pipeline failed: {str(e)}")
            return {
                'pipeline_name': self.pipeline_name,
                'status': 'failed',
                'timestamp': datetime.now().isoformat(),
                'error': str(e)
            }

# Run the whole pipeline on the sample users and print the resulting report
banner = "=" * 60

print("\n" + banner)
print("PRODUCTION DATA PIPELINE EXECUTION")
print(banner)

pipeline = ProductionDataPipeline("user_data_pipeline")
report = pipeline.run(df, "/tmp/cleaned_user_data.csv")

print("\n" + banner)
print("PIPELINE EXECUTION REPORT")
print(banner)
# default=str renders values json cannot encode natively as text
report_json = json.dumps(report, indent=2, default=str)
print(report_json)

---

## Summary

### What We've Built
This solution demonstrates:
- **Production-quality code** with proper OOP, typing, and error handling
- **Efficient data processing** using NumPy vectorization and Pandas
- **Comprehensive validation** with multiple quality checks
- **Detailed logging** for debugging and monitoring
- **Complete pipeline** that handles the full data lifecycle

### Key Takeaways
1. **Engineering mindset**: Think systems, not scripts
2. **Code quality**: Type hints, modularity, testability
3. **Performance**: Vectorization is crucial for large datasets
4. **Validation**: Always validate data at every stage
5. **Observability**: Log everything important

### Real-World Applications
This pipeline pattern is used in:
- ETL systems for data warehouses
- ML feature engineering pipelines
- Data quality monitoring
- Automated reporting systems

---