# Capstone Project 3: Cloud Data Pipeline Concepts

## Building Production-Ready Data Processing Patterns

This capstone project demonstrates how to structure data pipelines that could be deployed to cloud environments. We'll build a complete ETL (Extract, Transform, Load) pipeline with:

1. **Simulated cloud storage** (mimicking S3/Azure Blob/GCS patterns)
2. **Data ingestion patterns** for various file formats
3. **Data transformation** with Pandas
4. **Data quality validation**
5. **Results storage** with proper organization
6. **Pipeline orchestration** patterns
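
Item 6 covers orchestration: running pipeline stages in a fixed order and recording what happened at each one. The sketch below assumes each stage is a zero-argument callable. `run_steps` and `StepRecord` are hypothetical names and are not part of the framework built in this notebook.

```python
import logging
from dataclasses import dataclass
from typing import Callable, List, Tuple

logger = logging.getLogger("orchestration_sketch")


@dataclass
class StepRecord:
    """Audit entry for one pipeline step (hypothetical structure)."""
    name: str
    status: str  # "success", "failed", or "skipped"
    error: str = ""


def run_steps(steps: List[Tuple[str, Callable[[], None]]]) -> List[StepRecord]:
    """Run named steps in order; after the first failure, skip the rest."""
    records: List[StepRecord] = []
    failed = False
    for name, fn in steps:
        if failed:
            records.append(StepRecord(name, "skipped"))
            continue
        try:
            fn()
            records.append(StepRecord(name, "success"))
        except Exception as exc:  # record the step error instead of crashing
            failed = True
            records.append(StepRecord(name, "failed", error=str(exc)))
            logger.error("Step %s failed: %s", name, exc)
    return records


# Usage with stand-in steps (hypothetical)
def ingest() -> None:
    pass


def transform() -> None:
    raise ValueError("bad schema")


def load() -> None:
    pass


records = run_steps([("ingest", ingest), ("transform", transform), ("load", load)])
print([(r.name, r.status) for r in records])
# [('ingest', 'success'), ('transform', 'failed'), ('load', 'skipped')]
```

Stopping at the first failure prevents later stages from consuming partial data. The list of records also serves as a simple audit trail.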

### Why This Matters

In production environments, data pipelines need to:
- Handle failures gracefully
- Be testable and maintainable
- Follow consistent patterns
- Produce auditable outputs

### Learning Objectives
- Design modular, reusable pipeline components
- Implement proper error handling and logging
- Apply data validation patterns
- Structure code for cloud deployment
- Use configuration-driven processing

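**Note**: This notebook uses simulated cloud storage (the local filesystem) to demonstrate these patterns. To move to real cloud storage, you would swap in a backend built on the provider's SDK (boto3, azure-storage-blob, google-cloud-storage). The rest of the pipeline is written against the storage interface and should need little or no change.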

In [None]:
import os
import json
import logging
import hashlib
from pathlib import Path
from datetime import datetime, timedelta
from dataclasses import dataclass, field, asdict
from typing import Dict, List, Optional, Any, Callable, Protocol, Tuple  # Tuple is used by Transformer.clean_transactions
from abc import ABC, abstractmethod
from enum import Enum
import tempfile
import shutil

import numpy as np
import pandas as pd

# Configure logging for pipeline visibility
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger('DataPipeline')

print("Cloud Data Pipeline Framework Initialized")
print(f"Pandas version: {pd.__version__}")
print(f"NumPy version: {np.__version__}")

---
## Part 1: Cloud Storage Abstraction Layer

A key pattern in cloud development is abstracting storage operations. This allows:
- Easy testing with local storage
- Switching between cloud providers
- Consistent interface across the codebase

We'll implement a storage abstraction that mimics S3-style operations.
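
To show how the abstraction helps with testing, here is a sketch of a second backend that keeps objects in a dictionary. `InMemoryStorageBackend` is a hypothetical class and is not part of the notebook. It only assumes the five method names (`read`, `write`, `exists`, `list_objects`, `delete`) used by the backends defined below.

```python
from typing import Dict, List


class InMemoryStorageBackend:
    """Dict-backed storage exposing the same method names (hypothetical; for fast tests)."""

    def __init__(self) -> None:
        self._objects: Dict[str, bytes] = {}

    def read(self, path: str) -> bytes:
        if path not in self._objects:
            raise FileNotFoundError(f"Object not found: {path}")
        return self._objects[path]

    def write(self, path: str, data: bytes) -> None:
        self._objects[path] = data

    def exists(self, path: str) -> bool:
        return path in self._objects

    def list_objects(self, prefix: str = "") -> List[str]:
        # Flat key space: a "folder" is only a shared key prefix, as with S3 keys
        return sorted(key for key in self._objects if key.startswith(prefix))

    def delete(self, path: str) -> None:
        self._objects.pop(path, None)  # deleting a missing key is a no-op


store = InMemoryStorageBackend()
store.write("landing/a.csv", b"x,y\n1,2\n")
store.write("landing/b.csv", b"x,y\n3,4\n")
store.write("processed/c.csv", b"x\n5\n")
print(store.list_objects("landing"))  # ['landing/a.csv', 'landing/b.csv']
```

This backend differs from the local one in one deliberate way. Its `list_objects` does plain string-prefix matching, as S3 key prefixes do, so `"landing"` would also match a key such as `"landing_old/x.csv"`. The local backend instead treats the prefix as a directory.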

In [None]:
class StorageBackend(Protocol):
    """
    Protocol defining the interface for storage backends.
    
    This allows for dependency injection and easy swapping
    between local and cloud storage implementations.
    """
    
    def read(self, path: str) -> bytes:
        """Read raw bytes from storage."""
        ...
    
    def write(self, path: str, data: bytes) -> None:
        """Write raw bytes to storage."""
        ...
    
    def exists(self, path: str) -> bool:
        """Check if a path exists."""
        ...
    
    def list_objects(self, prefix: str) -> List[str]:
        """List objects with given prefix."""
        ...
    
    def delete(self, path: str) -> None:
        """Delete an object."""
        ...


class LocalStorageBackend:
    """
    Local filesystem storage backend that mimics cloud storage patterns.
    
    In production, this would be replaced with:
    - S3StorageBackend (using boto3)
    - AzureBlobBackend (using azure-storage-blob)
    - GCSBackend (using google-cloud-storage)
    
    Parameters
    ----------
    base_path : str
        Base directory for storage (like an S3 bucket)
    """
    
    def __init__(self, base_path: str):
        self.base_path = Path(base_path)
        self.base_path.mkdir(parents=True, exist_ok=True)
        logger.info(f"Initialized local storage at: {self.base_path}")
    
    def _full_path(self, path: str) -> Path:
        """Get full path for a storage key."""
        return self.base_path / path
    
    def read(self, path: str) -> bytes:
        """Read raw bytes from storage."""
        full_path = self._full_path(path)
        if not full_path.exists():
            raise FileNotFoundError(f"Object not found: {path}")
        return full_path.read_bytes()
    
    def write(self, path: str, data: bytes) -> None:
        """Write raw bytes to storage."""
        full_path = self._full_path(path)
        full_path.parent.mkdir(parents=True, exist_ok=True)
        full_path.write_bytes(data)
        logger.debug(f"Written {len(data)} bytes to {path}")
    
    def exists(self, path: str) -> bool:
        """Check if a path exists."""
        return self._full_path(path).exists()
    
    def list_objects(self, prefix: str = "") -> List[str]:
        """List all objects with given prefix."""
        search_path = self._full_path(prefix)
        if search_path.is_file():
            return [prefix]
        
        results = []
        if search_path.exists():
            for item in search_path.rglob('*'):
                if item.is_file():
                    results.append(str(item.relative_to(self.base_path)))
        return sorted(results)
    
    def delete(self, path: str) -> None:
        """Delete an object."""
        full_path = self._full_path(path)
        if full_path.exists():
            full_path.unlink()
            logger.debug(f"Deleted: {path}")


class DataLake:
    """
    High-level data lake interface supporting multiple formats.
    
    Provides convenient methods for reading/writing DataFrames
    as CSV or JSON. Parquet reading is stubbed out in this demo.
    
    Parameters
    ----------
    storage : StorageBackend
        Backend storage implementation
    """
    
    def __init__(self, storage: StorageBackend):
        self.storage = storage
    
    def read_csv(self, path: str, **kwargs) -> pd.DataFrame:
        """Read CSV file into DataFrame."""
        data = self.storage.read(path)
        from io import BytesIO
        return pd.read_csv(BytesIO(data), **kwargs)
    
    def write_csv(self, df: pd.DataFrame, path: str, **kwargs) -> None:
        """Write DataFrame to CSV."""
        data = df.to_csv(index=False, **kwargs).encode('utf-8')
        self.storage.write(path, data)
        logger.info(f"Written CSV: {path} ({len(df)} rows)")
    
    def read_json(self, path: str, **kwargs) -> pd.DataFrame:
        """Read JSON file into DataFrame."""
        data = self.storage.read(path)
        from io import BytesIO
        return pd.read_json(BytesIO(data), **kwargs)
    
    def write_json(self, df: pd.DataFrame, path: str, **kwargs) -> None:
        """Write DataFrame to JSON."""
        data = df.to_json(orient='records', **kwargs).encode('utf-8')
        self.storage.write(path, data)
        logger.info(f"Written JSON: {path} ({len(df)} rows)")
    
    def read_parquet(self, path: str) -> pd.DataFrame:
        """Read Parquet file into DataFrame."""
        # A production version would call pd.read_parquet, which needs pyarrow or fastparquet
        raise NotImplementedError("Parquet requires pyarrow - use CSV for this demo")
    
    def write_metadata(self, path: str, metadata: Dict[str, Any]) -> None:
        """Write metadata JSON file."""
        data = json.dumps(metadata, indent=2, default=str).encode('utf-8')
        self.storage.write(path, data)
    
    def read_metadata(self, path: str) -> Dict[str, Any]:
        """Read metadata JSON file."""
        data = self.storage.read(path)
        return json.loads(data.decode('utf-8'))


# Create a temporary data lake for this session
TEMP_DIR = tempfile.mkdtemp(prefix='data_pipeline_')
storage = LocalStorageBackend(TEMP_DIR)
data_lake = DataLake(storage)

print(f"\nData Lake initialized at: {TEMP_DIR}")

---
## Part 2: Data Ingestion Layer

The ingestion layer handles:
- Generating/receiving raw data
- Initial data validation
- Storing in the landing zone (raw layer)
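
The `Ingester` class in this part stores landing-zone files under date partitions. The sketch below isolates that path scheme and adds a checksum over the stored bytes. `landing_path` and `checksum` are hypothetical helpers. MD5 is used here only to detect accidental changes, not as a security measure.

```python
import hashlib
from datetime import datetime


def landing_path(source_name: str, batch_date: datetime, zone: str = "landing") -> str:
    """Return <zone>/<source>/YYYY/MM/DD/<source>_YYYYMMDD_HHMMSS.csv (hypothetical helper)."""
    partition = batch_date.strftime("%Y/%m/%d")
    filename = f"{source_name}_{batch_date.strftime('%Y%m%d_%H%M%S')}.csv"
    return f"{zone}/{source_name}/{partition}/{filename}"


def checksum(payload: bytes) -> str:
    """MD5 hex digest of the exact bytes stored; detects accidental changes only."""
    return hashlib.md5(payload).hexdigest()


path = landing_path("transactions", datetime(2024, 1, 15))
print(path)  # landing/transactions/2024/01/15/transactions_20240115_000000.csv

payload = b"transaction_id,quantity\nTXN1,3\n"
stored_digest = checksum(payload)          # saved alongside the file as metadata
print(checksum(payload) == stored_digest)  # True: file unchanged
print(checksum(payload + b"TXN2,1\n") == stored_digest)  # False: content changed
```

The checksum must be computed over the exact bytes that are written. Otherwise a reader recomputing it from the stored file will never get a match.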

We'll simulate multiple data sources with different characteristics.
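
Each source below is described by configuration (`DataSource` records), but the `Ingester` still selects a generator with an `if`/`elif` chain. One way to make source selection configuration-driven as well is a registry dictionary. This is a sketch under that assumption. `LOADERS`, `fetch`, and the loader functions are hypothetical names.

```python
from typing import Callable, Dict

import pandas as pd


# Hypothetical loaders standing in for real connectors
def load_products() -> pd.DataFrame:
    return pd.DataFrame({"product": ["Widget A", "Gadget X"], "cost": [25.0, 120.0]})


def load_regions() -> pd.DataFrame:
    return pd.DataFrame({"region": ["EU", "APAC"]})


# Adding a source means adding an entry here, not another elif branch
LOADERS: Dict[str, Callable[[], pd.DataFrame]] = {
    "products": load_products,
    "regions": load_regions,
}


def fetch(source_name: str) -> pd.DataFrame:
    """Look up the loader for a source and run it."""
    try:
        loader = LOADERS[source_name]
    except KeyError:
        raise ValueError(f"No loader registered for source: {source_name}") from None
    return loader()


print(fetch("products").shape)  # (2, 2)
```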

In [None]:
@dataclass
class DataSource:
    """
    Configuration for a data source.
    
    Attributes
    ----------
    name : str
        Unique identifier for the source
    source_type : str
        Type of source (api, database, file, stream)
    schema : Dict[str, str]
        Expected column names and data types
    required_columns : List[str]
        Columns that must be present
    refresh_frequency : str
        How often the source is refreshed (e.g. hourly, daily, weekly)
    """
    name: str
    source_type: str
    schema: Dict[str, str]
    required_columns: List[str]
    refresh_frequency: str = "daily"


class DataGenerator:
    """
    Simulates data from various sources.
    
    In production, this would be replaced with actual connectors:
    - API clients
    - Database connections
    - Message queue consumers
    - File watchers
    """
    
    def __init__(self, seed: int = 42):
        self.rng = np.random.default_rng(seed)
    
    def generate_transactions(self, n_records: int, date: datetime) -> pd.DataFrame:
        """
        Generate simulated transaction data.
        
        Parameters
        ----------
        n_records : int
            Number of records to generate
        date : datetime
            Date for the transactions
        
        Returns
        -------
        pd.DataFrame
            Simulated transaction data
        """
        products = ['Widget A', 'Widget B', 'Gadget X', 'Gadget Y', 'Service Z']
        regions = ['US-East', 'US-West', 'EU', 'APAC']
        
        # Generate timestamps throughout the day.
        # timedelta rejects numpy.int64 arguments, so cast each draw to a Python int.
        timestamps = [
            date + timedelta(
                hours=int(self.rng.integers(0, 24)),
                minutes=int(self.rng.integers(0, 60)),
                seconds=int(self.rng.integers(0, 60))
            )
            for _ in range(n_records)
        ]
        
        data = {
            'transaction_id': [f'TXN{date.strftime("%Y%m%d")}{i:06d}' for i in range(n_records)],
            'timestamp': timestamps,
            'customer_id': [f'CUST{self.rng.integers(1000, 9999)}' for _ in range(n_records)],
            'product': self.rng.choice(products, n_records),
            'quantity': self.rng.integers(1, 10, n_records),
            'unit_price': np.round(self.rng.uniform(10, 500, n_records), 2),
            'region': self.rng.choice(regions, n_records),
        }
        
        df = pd.DataFrame(data)
        df['total_amount'] = df['quantity'] * df['unit_price']
        
        # Add some data quality issues for realistic testing
        # 2% null values in region
        null_idx = self.rng.choice(n_records, int(n_records * 0.02), replace=False)
        df.loc[null_idx, 'region'] = None
        
        return df
    
    def generate_customer_data(self, n_customers: int) -> pd.DataFrame:
        """
        Generate simulated customer master data.
        
        Parameters
        ----------
        n_customers : int
            Number of customers
        
        Returns
        -------
        pd.DataFrame
            Customer dimension data
        """
        segments = ['Bronze', 'Silver', 'Gold', 'Platinum']
        segment_weights = [0.4, 0.35, 0.2, 0.05]
        
        data = {
            'customer_id': [f'CUST{i+1000}' for i in range(n_customers)],
            'customer_name': [f'Customer {i+1}' for i in range(n_customers)],
            'segment': self.rng.choice(segments, n_customers, p=segment_weights),
            'join_date': [
                datetime(2020, 1, 1) + timedelta(days=int(self.rng.integers(0, 1500)))
                for _ in range(n_customers)
            ],
            'lifetime_value': np.round(self.rng.exponential(500, n_customers), 2)
        }
        
        return pd.DataFrame(data)
    
    def generate_product_data(self) -> pd.DataFrame:
        """Generate product catalog data."""
        return pd.DataFrame({
            'product': ['Widget A', 'Widget B', 'Gadget X', 'Gadget Y', 'Service Z'],
            'category': ['Hardware', 'Hardware', 'Electronics', 'Electronics', 'Services'],
            'cost': [25.00, 45.00, 120.00, 85.00, 50.00],
            'active': [True, True, True, True, True]
        })


# Define data sources
SOURCES = {
    'transactions': DataSource(
        name='transactions',
        source_type='stream',
        schema={
            'transaction_id': 'string',
            'timestamp': 'datetime',
            'customer_id': 'string',
            'product': 'string',
            'quantity': 'int',
            'unit_price': 'float',
            'region': 'string',
            'total_amount': 'float'
        },
        required_columns=['transaction_id', 'timestamp', 'customer_id', 'product', 'quantity'],
        refresh_frequency='hourly'
    ),
    'customers': DataSource(
        name='customers',
        source_type='database',
        schema={
            'customer_id': 'string',
            'customer_name': 'string',
            'segment': 'string',
            'join_date': 'datetime',
            'lifetime_value': 'float'
        },
        required_columns=['customer_id', 'segment'],
        refresh_frequency='daily'
    ),
    'products': DataSource(
        name='products',
        source_type='file',
        schema={
            'product': 'string',
            'category': 'string',
            'cost': 'float',
            'active': 'bool'
        },
        required_columns=['product', 'category', 'cost'],
        refresh_frequency='weekly'
    )
}

print("Data Sources Configured:")
for name, source in SOURCES.items():
    print(f"  - {name}: {source.source_type} ({source.refresh_frequency})")

In [None]:
class Ingester:
    """
    Handles data ingestion with validation and landing zone storage.
    
    Parameters
    ----------
    data_lake : DataLake
        Data lake interface for storage
    landing_zone : str
        Path prefix for raw data landing zone
    """
    
    def __init__(self, data_lake: DataLake, landing_zone: str = "landing"):
        self.data_lake = data_lake
        self.landing_zone = landing_zone
        self.generator = DataGenerator()
    
    def _validate_schema(self, df: pd.DataFrame, source: DataSource) -> Dict[str, Any]:
        """
        Validate DataFrame against source schema.
        
        Parameters
        ----------
        df : pd.DataFrame
            Data to validate
        source : DataSource
            Source configuration with expected schema
        
        Returns
        -------
        dict
            Validation results
        """
        results = {
            'valid': True,
            'errors': [],
            'warnings': [],
            'row_count': len(df),
            'column_count': len(df.columns)
        }
        
        # Check required columns
        missing_cols = set(source.required_columns) - set(df.columns)
        if missing_cols:
            results['valid'] = False
            results['errors'].append(f"Missing required columns: {missing_cols}")
        
        # Check for unexpected columns
        extra_cols = set(df.columns) - set(source.schema.keys())
        if extra_cols:
            results['warnings'].append(f"Unexpected columns: {extra_cols}")
        
        # Check for null values in required columns
        for col in source.required_columns:
            if col in df.columns and df[col].isnull().any():
                null_count = df[col].isnull().sum()
                results['warnings'].append(f"Column '{col}' has {null_count} null values")
        
        return results
    
    def ingest(
        self, 
        source_name: str, 
        date: datetime,
        n_records: int = 1000
    ) -> Dict[str, Any]:
        """
        Ingest data from a source into the landing zone.
        
        Parameters
        ----------
        source_name : str
            Name of the data source
        date : datetime
            Date for the data batch
        n_records : int
            Number of records (for generated data)
        
        Returns
        -------
        dict
            Ingestion results including path and validation
        """
        source = SOURCES.get(source_name)
        if not source:
            raise ValueError(f"Unknown source: {source_name}")
        
        logger.info(f"Starting ingestion for {source_name}")
        
        # Generate/fetch data (in production, this would call actual sources)
        if source_name == 'transactions':
            df = self.generator.generate_transactions(n_records, date)
        elif source_name == 'customers':
            df = self.generator.generate_customer_data(n_records)
        elif source_name == 'products':
            df = self.generator.generate_product_data()
        else:
            raise ValueError(f"No generator for source: {source_name}")
        
        # Validate
        validation = self._validate_schema(df, source)
        
        if not validation['valid']:
            logger.error(f"Validation failed: {validation['errors']}")
            return {
                'success': False,
                'source': source_name,
                'validation': validation
            }
        
        # Generate storage path (partitioned by date)
        date_partition = date.strftime('%Y/%m/%d')
        filename = f"{source_name}_{date.strftime('%Y%m%d_%H%M%S')}.csv"
        path = f"{self.landing_zone}/{source_name}/{date_partition}/{filename}"
        
        # Write to landing zone
        self.data_lake.write_csv(df, path)
        
        # Write metadata
        metadata = {
            'source': source_name,
            'ingestion_time': datetime.now().isoformat(),
            'date_partition': date_partition,
            'row_count': len(df),
            'columns': list(df.columns),
            'validation': validation,
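            # Hash the same bytes write_csv stores (no index) so the checksum can be re-verified
            'checksum': hashlib.md5(df.to_csv(index=False).encode('utf-8')).hexdigest()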
        }
        metadata_path = path.replace('.csv', '_metadata.json')
        self.data_lake.write_metadata(metadata_path, metadata)
        
        logger.info(f"Ingested {len(df)} rows to {path}")
        
        return {
            'success': True,
            'source': source_name,
            'path': path,
            'row_count': len(df),
            'validation': validation
        }


# Create ingester and ingest some data
ingester = Ingester(data_lake)

# Ingest data for multiple dates
print("\n" + "="*60)
print("DATA INGESTION")
print("="*60)

ingestion_results = []
base_date = datetime(2024, 1, 15)

# Ingest transactions for 3 days
for day_offset in range(3):
    date = base_date + timedelta(days=day_offset)
    result = ingester.ingest('transactions', date, n_records=500)
    ingestion_results.append(result)
    print(f"  Transactions {date.date()}: {result['row_count']} rows")

# Ingest dimension tables
result = ingester.ingest('customers', base_date, n_records=200)
ingestion_results.append(result)
print(f"  Customers: {result['row_count']} rows")

result = ingester.ingest('products', base_date)
ingestion_results.append(result)
print(f"  Products: {result['row_count']} rows")

# Show landing zone contents
print("\nLanding Zone Contents:")
for obj in storage.list_objects('landing'):
    if not obj.endswith('_metadata.json'):
        print(f"  {obj}")

---
## Part 3: Data Transformation Layer

The transformation layer implements business logic:
- Data cleaning and standardization
- Business rule application
- Aggregations and calculations
- Data enrichment through joins
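
Enrichment in this notebook uses left joins, which keep every transaction even when a dimension row is missing. The sketch below uses made-up data to show two `DataFrame.merge` options that make such joins easier to audit. `indicator=True` adds a `_merge` column recording whether each row found a match. `validate="many_to_one"` raises an error if the dimension table has duplicate keys, which would otherwise silently multiply rows.

```python
import pandas as pd

transactions = pd.DataFrame({
    "transaction_id": ["T1", "T2", "T3"],
    "customer_id": ["C1", "C2", "C9"],  # C9 has no customer record
})
customers = pd.DataFrame({
    "customer_id": ["C1", "C2"],
    "segment": ["Gold", "Bronze"],
})

# how="left" keeps every transaction; indicator=True adds a _merge column
enriched = transactions.merge(
    customers, on="customer_id", how="left",
    indicator=True, validate="many_to_one",
)
match_rate = (enriched["_merge"] == "both").mean()
enriched["segment"] = enriched["segment"].fillna("Unknown")

print(enriched[["transaction_id", "segment", "_merge"]])
print(f"match rate: {match_rate:.2f}")  # match rate: 0.67
```

A match rate that drops sharply between runs is often the first visible sign of a broken upstream key.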

In [None]:
@dataclass
class TransformationResult:
    """
    Result of a transformation step.
    
    Attributes
    ----------
    success : bool
        Whether the transformation succeeded
    input_rows : int
        Number of input rows
    output_rows : int
        Number of output rows
    dropped_rows : int
        Number of rows dropped during transformation
    output_path : str
        Where the output was stored
    metrics : Dict[str, Any]
        Additional metrics from the transformation
    """
    success: bool
    input_rows: int
    output_rows: int
    dropped_rows: int = 0
    output_path: str = ""
    metrics: Dict[str, Any] = field(default_factory=dict)


class Transformer:
    """
    Handles data transformations from landing zone to processed zone.
    
    Implements the transformation layer of the data pipeline.
    
    Parameters
    ----------
    data_lake : DataLake
        Data lake interface
    landing_zone : str
        Path to landing zone
    processed_zone : str
        Path to processed zone
    """
    
    def __init__(
        self, 
        data_lake: DataLake, 
        landing_zone: str = "landing",
        processed_zone: str = "processed"
    ):
        self.data_lake = data_lake
        self.landing_zone = landing_zone
        self.processed_zone = processed_zone
    
    def _load_latest_landing_data(self, source_name: str) -> pd.DataFrame:
        """
        Load all data from landing zone for a source.
        
        Parameters
        ----------
        source_name : str
            Name of the data source
        
        Returns
        -------
        pd.DataFrame
            Combined data from all partitions
        """
        prefix = f"{self.landing_zone}/{source_name}"
        files = [
            f for f in self.data_lake.storage.list_objects(prefix)
            if f.endswith('.csv') and '_metadata' not in f
        ]
        
        if not files:
            raise FileNotFoundError(f"No data files found for {source_name}")
        
        dfs = [self.data_lake.read_csv(f) for f in files]
        return pd.concat(dfs, ignore_index=True)
    
    def clean_transactions(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        """
        Clean and standardize transaction data.
        
        Parameters
        ----------
        df : pd.DataFrame
            Raw transaction data
        
        Returns
        -------
        tuple
            (cleaned DataFrame, cleaning metrics)
        """
        metrics = {'initial_rows': len(df)}
        
        # Remove duplicates
        df = df.drop_duplicates(subset=['transaction_id'])
        metrics['after_dedup'] = len(df)
        
        # Handle missing regions (fill with 'Unknown')
        null_regions = df['region'].isnull().sum()
        df['region'] = df['region'].fillna('Unknown')
        metrics['null_regions_filled'] = null_regions
        
        # Standardize product names
        df['product'] = df['product'].str.strip().str.title()
        
        # Parse timestamp
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        
        # Add derived columns
        df['date'] = df['timestamp'].dt.date
        df['hour'] = df['timestamp'].dt.hour
        df['day_of_week'] = df['timestamp'].dt.day_name()
        df['is_weekend'] = df['timestamp'].dt.dayofweek >= 5
        
        # Remove invalid transactions (negative amounts)
        invalid_mask = df['total_amount'] < 0
        metrics['invalid_amounts_removed'] = invalid_mask.sum()
        df = df[~invalid_mask]
        
        metrics['final_rows'] = len(df)
        
        return df, metrics
    
    def enrich_transactions(
        self, 
        transactions: pd.DataFrame,
        customers: pd.DataFrame,
        products: pd.DataFrame
    ) -> pd.DataFrame:
        """
        Enrich transaction data with customer and product information.
        
        Parameters
        ----------
        transactions : pd.DataFrame
            Cleaned transaction data
        customers : pd.DataFrame
            Customer dimension data
        products : pd.DataFrame
            Product dimension data
        
        Returns
        -------
        pd.DataFrame
            Enriched transaction data
        """
        # Join with customers
        df = transactions.merge(
            customers[['customer_id', 'segment', 'lifetime_value']],
            on='customer_id',
            how='left'
        )
        
        # Fill missing customer info
        df['segment'] = df['segment'].fillna('Unknown')
        df['lifetime_value'] = df['lifetime_value'].fillna(0)
        
        # Join with products
        df = df.merge(
            products[['product', 'category', 'cost']],
            on='product',
            how='left'
        )
        
        # Calculate profit metrics
        df['profit'] = df['total_amount'] - (df['cost'].fillna(0) * df['quantity'])
        df['profit_margin'] = (df['profit'] / df['total_amount'] * 100).round(2)
        
        return df
    
    def create_daily_summary(
        self, 
        enriched_transactions: pd.DataFrame
    ) -> pd.DataFrame:
        """
        Create daily summary aggregations.
        
        Parameters
        ----------
        enriched_transactions : pd.DataFrame
            Enriched transaction data
        
        Returns
        -------
        pd.DataFrame
            Daily summary metrics
        """
        summary = enriched_transactions.groupby(['date', 'region', 'category']).agg({
            'transaction_id': 'count',
            'customer_id': 'nunique',
            'quantity': 'sum',
            'total_amount': 'sum',
            'profit': 'sum'
        }).reset_index()
        
        summary.columns = [
            'date', 'region', 'category', 
            'transaction_count', 'unique_customers',
            'total_quantity', 'revenue', 'profit'
        ]
        
        summary['avg_transaction_value'] = (summary['revenue'] / summary['transaction_count']).round(2)
        
        return summary
    
    def run_transformation_pipeline(self) -> Dict[str, TransformationResult]:
        """
        Run the complete transformation pipeline.
        
        Returns
        -------
        dict
            Results for each transformation step
        """
        results = {}
        
        logger.info("Starting transformation pipeline")
        
        # Step 1: Load raw data
        logger.info("Loading raw data from landing zone")
        raw_transactions = self._load_latest_landing_data('transactions')
        customers = self._load_latest_landing_data('customers')
        products = self._load_latest_landing_data('products')
        
        # Step 2: Clean transactions
        logger.info("Cleaning transaction data")
        cleaned_transactions, clean_metrics = self.clean_transactions(raw_transactions)
        
        clean_path = f"{self.processed_zone}/cleaned/transactions.csv"
        self.data_lake.write_csv(cleaned_transactions, clean_path)
        
        results['clean'] = TransformationResult(
            success=True,
            input_rows=clean_metrics['initial_rows'],
            output_rows=clean_metrics['final_rows'],
            dropped_rows=clean_metrics['initial_rows'] - clean_metrics['final_rows'],
            output_path=clean_path,
            metrics=clean_metrics
        )
        
        # Step 3: Enrich transactions
        logger.info("Enriching transaction data")
        enriched = self.enrich_transactions(cleaned_transactions, customers, products)
        
        enriched_path = f"{self.processed_zone}/enriched/transactions.csv"
        self.data_lake.write_csv(enriched, enriched_path)
        
        results['enrich'] = TransformationResult(
            success=True,
            input_rows=len(cleaned_transactions),
            output_rows=len(enriched),
            output_path=enriched_path,
            metrics={
                'customers_matched': (enriched['segment'] != 'Unknown').sum(),
                'products_matched': enriched['category'].notna().sum()
            }
        )
        
        # Step 4: Create aggregations
        logger.info("Creating daily summaries")
        daily_summary = self.create_daily_summary(enriched)
        
        summary_path = f"{self.processed_zone}/aggregated/daily_summary.csv"
        self.data_lake.write_csv(daily_summary, summary_path)
        
        results['aggregate'] = TransformationResult(
            success=True,
            input_rows=len(enriched),
            output_rows=len(daily_summary),
            output_path=summary_path,
            metrics={
                'total_revenue': enriched['total_amount'].sum(),
                'total_profit': enriched['profit'].sum(),
                'unique_dates': enriched['date'].nunique()
            }
        )
        
        logger.info("Transformation pipeline complete")
        return results


# Run transformations
transformer = Transformer(data_lake)

print("\n" + "="*60)
print("DATA TRANSFORMATION")
print("="*60)

transform_results = transformer.run_transformation_pipeline()

print("\nTransformation Results:")
for step, result in transform_results.items():
    print(f"\n  {step.upper()}:")
    print(f"    Input rows: {result.input_rows}")
    print(f"    Output rows: {result.output_rows}")
    if result.dropped_rows > 0:
        print(f"    Dropped rows: {result.dropped_rows}")
    print(f"    Output: {result.output_path}")

In [None]:
# View transformed data samples
print("\n" + "="*60)
print("TRANSFORMED DATA PREVIEW")
print("="*60)

# Load and display enriched transactions
enriched_df = data_lake.read_csv('processed/enriched/transactions.csv')
print(f"\nEnriched Transactions ({len(enriched_df)} rows):")
print(enriched_df.head(10).to_string())

# Load and display daily summary
summary_df = data_lake.read_csv('processed/aggregated/daily_summary.csv')
print(f"\n\nDaily Summary ({len(summary_df)} rows):")
print(summary_df.head(20).to_string())

---
## Part 4: Data Quality & Validation Layer

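Production pipelines need continuous data quality monitoring rather than blind trust in upstream sources. In the framework below, each check computes a score between 0 and 1 (for example, the fraction of non-null values in a column) and passes when that score meets its threshold. A dataset's report passes only if every check passes.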
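
Each quality check reduces to a score between 0 and 1 compared against a threshold. The sketch below computes completeness, uniqueness, and range scores on a small made-up DataFrame. It uses the default thresholds of the helper methods defined below (0.95, 1.0 and 0.99) without using the framework itself.

```python
import pandas as pd

df = pd.DataFrame({
    "transaction_id": ["T1", "T2", "T2", "T4"],  # one duplicate ID
    "region": ["EU", None, "APAC", "US-East"],   # one missing region
    "quantity": [1, 3, 7, 5],                    # all within [1, 10]
})

# Each entry is (score, threshold); a check passes when score >= threshold
checks = {
    "completeness(region)": (1 - df["region"].isnull().mean(), 0.95),
    "uniqueness(transaction_id)": (df["transaction_id"].nunique() / len(df), 1.0),
    "range(quantity in [1, 10])": (df["quantity"].between(1, 10).mean(), 0.99),
}

for name, (score, threshold) in checks.items():
    status = "PASS" if score >= threshold else "FAIL"
    print(f"{name:30s} score={score:.2f} threshold={threshold:.2f} {status}")
```

Here the first two checks fail with a score of 0.75 each, and the range check passes with 1.0.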

In [None]:
class QualityCheckType(Enum):
    """Types of data quality checks."""
    COMPLETENESS = "completeness"
    UNIQUENESS = "uniqueness"
    VALIDITY = "validity"
    CONSISTENCY = "consistency"
    TIMELINESS = "timeliness"


@dataclass
class QualityCheck:
    """
    Definition of a data quality check.
    
    Attributes
    ----------
    name : str
        Descriptive name of the check
    check_type : QualityCheckType
        Category of quality check
    column : str
        Column to check (if applicable)
    check_fn : Callable
        Function that performs the check
    threshold : float
        Minimum acceptable score (0-1)
    """
    name: str
    check_type: QualityCheckType
    column: Optional[str]
    check_fn: Callable[[pd.DataFrame], float]
    threshold: float = 0.95


@dataclass
class QualityReport:
    """
    Results of quality checks on a dataset.
    
    Attributes
    ----------
    dataset_name : str
        Name of the checked dataset
    timestamp : datetime
        When the checks were run
    overall_score : float
        Average score across all checks
    passed : bool
        Whether all checks passed their thresholds
    check_results : List[Dict]
        Detailed results for each check
    """
    dataset_name: str
    timestamp: datetime
    overall_score: float
    passed: bool
    check_results: List[Dict[str, Any]]


class DataQualityChecker:
    """
    Performs data quality checks and generates reports.
    
    Provides a framework for defining and running quality checks
    on DataFrames, producing detailed reports.
    """
    
    def __init__(self):
        self.checks: List[QualityCheck] = []
    
    def add_check(self, check: QualityCheck) -> None:
        """Add a quality check to run."""
        self.checks.append(check)
    
    def add_completeness_check(
        self, 
        column: str, 
        threshold: float = 0.95
    ) -> None:
        """Add a check for column completeness (non-null rate)."""
        self.add_check(QualityCheck(
            name=f"Completeness: {column}",
            check_type=QualityCheckType.COMPLETENESS,
            column=column,
            check_fn=lambda df, col=column: 1 - df[col].isnull().mean(),
            threshold=threshold
        ))
    
    def add_uniqueness_check(
        self, 
        column: str, 
        threshold: float = 1.0
    ) -> None:
        """Add a check for column uniqueness."""
        self.add_check(QualityCheck(
            name=f"Uniqueness: {column}",
            check_type=QualityCheckType.UNIQUENESS,
            column=column,
            check_fn=lambda df, col=column: df[col].nunique() / len(df),
            threshold=threshold
        ))
    
    def add_range_check(
        self, 
        column: str, 
        min_val: float, 
        max_val: float,
        threshold: float = 0.99
    ) -> None:
        """Add a check for values within expected range."""
        self.add_check(QualityCheck(
            name=f"Range: {column} [{min_val}, {max_val}]",
            check_type=QualityCheckType.VALIDITY,
            column=column,
            check_fn=lambda df, col=column, mn=min_val, mx=max_val: 
                ((df[col] >= mn) & (df[col] <= mx)).mean(),
            threshold=threshold
        ))
    
    def add_custom_check(
        self,
        name: str,
        check_fn: Callable[[pd.DataFrame], float],
        check_type: QualityCheckType = QualityCheckType.VALIDITY,
        threshold: float = 0.95
    ) -> None:
        """Add a custom quality check."""
        self.add_check(QualityCheck(
            name=name,
            check_type=check_type,
            column=None,
            check_fn=check_fn,
            threshold=threshold
        ))
    
    def run_checks(self, df: pd.DataFrame, dataset_name: str) -> QualityReport:
        """
        Run all configured quality checks on a DataFrame.
        
        Parameters
        ----------
        df : pd.DataFrame
            Data to check
        dataset_name : str
            Name for the report
        
        Returns
        -------
        QualityReport
            Detailed quality report
        """
        results = []
        all_passed = True
        
        for check in self.checks:
            try:
                score = check.check_fn(df)
                passed = score >= check.threshold
                status = "PASS" if passed else "FAIL"
                
                if not passed:
                    all_passed = False
                
                results.append({
                    'check_name': check.name,
                    'check_type': check.check_type.value,
                    'score': round(score, 4),
                    'threshold': check.threshold,
                    'status': status
                })
            except Exception as e:
                results.append({
                    'check_name': check.name,
                    'check_type': check.check_type.value,
                    'score': 0,
                    'threshold': check.threshold,
                    'status': f"ERROR: {str(e)}"
                })
                all_passed = False
        
        # Average over all checks; errored checks were recorded with a score of 0
        overall_score = float(np.mean([r['score'] for r in results])) if results else 0.0
        
        return QualityReport(
            dataset_name=dataset_name,
            timestamp=datetime.now(),
            overall_score=round(overall_score, 4),
            passed=all_passed,
            check_results=results
        )


# Create and configure quality checker
checker = DataQualityChecker()

# Add checks for transaction data
checker.add_uniqueness_check('transaction_id', threshold=1.0)
checker.add_completeness_check('customer_id', threshold=1.0)
checker.add_completeness_check('product', threshold=1.0)
checker.add_completeness_check('region', threshold=0.95)  # Allow some missing
checker.add_range_check('quantity', 1, 100, threshold=0.99)
checker.add_range_check('total_amount', 0, 10000, threshold=0.99)
checker.add_range_check('profit_margin', -50, 100, threshold=0.95)

# Add custom business rule checks
checker.add_custom_check(
    name="Positive revenue",
    check_fn=lambda df: (df['total_amount'] > 0).mean(),
    threshold=0.99
)

checker.add_custom_check(
    name="Valid segments",
    check_fn=lambda df: df['segment'].isin(['Bronze', 'Silver', 'Gold', 'Platinum', 'Unknown']).mean(),
    threshold=1.0
)

# Run quality checks
print("\n" + "="*60)
print("DATA QUALITY REPORT")
print("="*60)

report = checker.run_checks(enriched_df, "Enriched Transactions")

print(f"\nDataset: {report.dataset_name}")
print(f"Timestamp: {report.timestamp}")
print(f"Overall Score: {report.overall_score:.2%}")
print(f"Status: {'PASSED' if report.passed else 'FAILED'}")

print("\nDetailed Results:")
print("-" * 70)
print(f"{'Check Name':<35} {'Type':<15} {'Score':<10} {'Status':<10}")
print("-" * 70)

for result in report.check_results:
    status_icon = "[OK]" if result['status'] == 'PASS' else "[X]"
    print(f"{result['check_name']:<35} {result['check_type']:<15} {result['score']:<10.2%} {status_icon}")
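To make the scoring rules above concrete, here is a small standalone sketch that recomputes the completeness, uniqueness, and range formulas on a four-row frame invented for illustration. It also surfaces two pandas behaviors that shape the scores: `nunique()` ignores missing values, and `NaN` fails both sides of a range comparison, so a missing value counts as out of range. The empty-frame guard in `uniqueness` is an addition for this sketch; the checker above would instead record a `ZeroDivisionError` as an ERROR result.

```python
import numpy as np
import pandas as pd

# Illustrative data: one null customer, one duplicated ID, one out-of-range
# quantity, and one missing quantity
df = pd.DataFrame({
    'transaction_id': ['T1', 'T2', 'T3', 'T3'],
    'customer_id': ['C1', None, 'C2', 'C3'],
    'quantity': [5, 250, 10, np.nan],
})


def completeness(df: pd.DataFrame, col: str) -> float:
    # Share of non-null values
    return 1 - df[col].isnull().mean()


def uniqueness(df: pd.DataFrame, col: str) -> float:
    # nunique() skips NaN by default; guard against an empty frame
    return df[col].nunique() / len(df) if len(df) else 1.0


def in_range(df: pd.DataFrame, col: str, lo: float, hi: float) -> float:
    # NaN >= lo and NaN <= hi are both False, so NaN counts as out of range
    return ((df[col] >= lo) & (df[col] <= hi)).mean()


scores = {
    'completeness(customer_id)': completeness(df, 'customer_id'),
    'uniqueness(transaction_id)': uniqueness(df, 'transaction_id'),
    'range(quantity, 1..100)': in_range(df, 'quantity', 1, 100),
}
for name, score in scores.items():
    print(f"{name:<28} {score:.2%}")
```

Because missing values already lower the completeness score, a range check could drop nulls first if each null should be penalized only once.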

---
## Part 5: Pipeline Orchestration

Finally, we'll create an orchestration layer that ties everything together and provides:
- End-to-end pipeline execution
- Per-step error handling, distinguishing required from optional steps
- Execution logging and metrics
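The orchestrator in this part handles failures per step but never retries one. For transient cloud errors such as throttling or timeouts, a common recovery pattern is retrying with exponential backoff. The sketch below is a hypothetical `with_retries` helper, not part of the pipeline code; `sleep` is injectable so the example runs instantly.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def with_retries(
    fn: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> Callable[[], T]:
    """Return a zero-arg callable that retries ``fn`` with exponential backoff.

    Waits base_delay, 2*base_delay, 4*base_delay, ... between attempts and
    re-raises the last exception once ``max_attempts`` is exhausted.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def wrapped() -> T:
        for attempt in range(1, max_attempts + 1):
            try:
                return fn()
            except retry_on:
                if attempt == max_attempts:
                    raise  # out of attempts: surface the original error
                sleep(base_delay * 2 ** (attempt - 1))
        raise AssertionError("unreachable")  # the loop always returns or raises

    return wrapped


# Usage: a function that fails twice before succeeding
calls = {"n": 0}


def flaky_ingest() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient storage error")
    return "ingested"


delays = []  # record the requested waits instead of sleeping
result = with_retries(flaky_ingest, sleep=delays.append)()
print(result, calls["n"], delays)
```

Wrapping a step's callable, for example `execute_fn=with_retries(lambda: pipeline_ingester.ingest('products', processing_date))`, would leave `DataPipeline` unchanged. Retry only steps that are safe to repeat.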

In [None]:
class PipelineStatus(Enum):
    """Status of a pipeline run."""
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    PARTIAL = "partial"  # Only non-required steps failed


@dataclass
class PipelineStep:
    """
    Definition of a pipeline step.
    
    Attributes
    ----------
    name : str
        Step identifier
    description : str
        What this step does
    execute_fn : Callable
        Function to execute
    required : bool
        Whether failure should stop the pipeline
    """
    name: str
    description: str
    execute_fn: Callable[[], Any]
    required: bool = True


@dataclass
class PipelineRun:
    """
    Record of a pipeline execution.
    
    Attributes
    ----------
    run_id : str
        Unique identifier for this run
    pipeline_name : str
        Name of the pipeline
    status : PipelineStatus
        Current status
    start_time : datetime
        When the run started
    end_time : Optional[datetime]
        When the run ended (if complete)
    step_results : Dict[str, Any]
        Results from each step
    errors : List[str]
        Error messages from failed steps
    """
    run_id: str
    pipeline_name: str
    status: PipelineStatus
    start_time: datetime
    end_time: Optional[datetime] = None
    step_results: Dict[str, Any] = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)


class DataPipeline:
    """
    Orchestrates the complete data pipeline.
    
    Manages the execution of pipeline steps with proper
    error handling, logging, and state tracking.
    
    Parameters
    ----------
    name : str
        Pipeline name
    data_lake : DataLake
        Data lake interface
    """
    
    def __init__(self, name: str, data_lake: DataLake):
        self.name = name
        self.data_lake = data_lake
        self.steps: List[PipelineStep] = []
        self.runs: List[PipelineRun] = []
    
    def add_step(self, step: PipelineStep) -> None:
        """Add a step to the pipeline."""
        self.steps.append(step)
        logger.debug(f"Added step: {step.name}")
    
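    def _generate_run_id(self) -> str:
        """Generate a run ID from the pipeline name and current time."""
        # Microseconds keep IDs distinct for runs started within the same second
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')
        return f"{self.name}_{timestamp}"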
    
    def run(self) -> PipelineRun:
        """
        Execute the pipeline.
        
        Returns
        -------
        PipelineRun
            Record of the pipeline execution
        """
        run = PipelineRun(
            run_id=self._generate_run_id(),
            pipeline_name=self.name,
            status=PipelineStatus.RUNNING,
            start_time=datetime.now()
        )
        
        logger.info(f"Starting pipeline run: {run.run_id}")
        print(f"\n{'='*60}")
        print(f"PIPELINE RUN: {run.run_id}")
        print(f"{'='*60}\n")
        
        all_success = True
        
        for i, step in enumerate(self.steps, 1):
            print(f"Step {i}/{len(self.steps)}: {step.name}")
            print(f"  Description: {step.description}")
            step_start = datetime.now()
            
            try:
                result = step.execute_fn()
                step_duration = (datetime.now() - step_start).total_seconds()
                
                run.step_results[step.name] = {
                    'status': 'success',
                    'duration_seconds': step_duration,
                    'result': result
                }
                print(f"  Status: SUCCESS ({step_duration:.2f}s)\n")
                
            except Exception as e:
                step_duration = (datetime.now() - step_start).total_seconds()
                run.errors.append(f"{step.name}: {e}")
                run.step_results[step.name] = {
                    'status': 'failed',
                    'duration_seconds': step_duration,
                    'error': str(e)
                }
                print(f"  Status: FAILED - {e}\n")
                
                if step.required:
                    # Fail fast: later steps depend on this step's output
                    all_success = False
                    logger.error(f"Required step failed: {step.name}; stopping pipeline")
                    break
                
                # Optional steps degrade gracefully: log and continue
                logger.warning(f"Optional step failed: {step.name}; continuing")
        
        # Determine final status
        run.end_time = datetime.now()
        
        if len(run.errors) == 0:
            run.status = PipelineStatus.SUCCESS
        elif all_success:
            run.status = PipelineStatus.PARTIAL  # Non-required steps failed
        else:
            run.status = PipelineStatus.FAILED
        
        # Store run record
        self.runs.append(run)
        
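        # Save run metadata. asdict() leaves datetime and Enum values (and any
        # step results) as Python objects, so convert to JSON-safe types first.
        metadata_path = f"pipeline_runs/{run.run_id}_metadata.json"
        run_record = asdict(run)
        run_record['status'] = run.status.value
        run_record = json.loads(json.dumps(run_record, default=str))
        self.data_lake.write_metadata(metadata_path, run_record)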
        
        duration = (run.end_time - run.start_time).total_seconds()
        # Steps that never ran (e.g. after a required step failed)
        n_skipped = len(self.steps) - len(run.step_results)
        
        print(f"{'='*60}")
        print("PIPELINE COMPLETE")
        print(f"  Status: {run.status.value.upper()}")
        print(f"  Duration: {duration:.2f} seconds")
        print(f"  Steps: {len(self.steps)} total, {len(run.errors)} failed, {n_skipped} skipped")
        print(f"{'='*60}")
        
        return run


# Create the main pipeline
pipeline = DataPipeline("sales_etl", data_lake)

# Create a fresh ingester, transformer, and quality checker for this pipeline
pipeline_ingester = Ingester(data_lake)
pipeline_transformer = Transformer(data_lake)
pipeline_checker = DataQualityChecker()

# Configure quality checker for pipeline
pipeline_checker.add_uniqueness_check('transaction_id')
pipeline_checker.add_completeness_check('customer_id')
pipeline_checker.add_range_check('total_amount', 0, 10000)

# Define pipeline steps
processing_date = datetime(2024, 1, 20)

pipeline.add_step(PipelineStep(
    name="ingest_transactions",
    description="Ingest daily transaction data",
    execute_fn=lambda: pipeline_ingester.ingest('transactions', processing_date, n_records=300),
    required=True
))

pipeline.add_step(PipelineStep(
    name="ingest_customers",
    description="Ingest customer dimension data",
    execute_fn=lambda: pipeline_ingester.ingest('customers', processing_date, n_records=100),
    required=True
))

pipeline.add_step(PipelineStep(
    name="ingest_products",
    description="Ingest product catalog",
    execute_fn=lambda: pipeline_ingester.ingest('products', processing_date),
    required=True
))

pipeline.add_step(PipelineStep(
    name="transform_data",
    description="Run all data transformations",
    execute_fn=lambda: pipeline_transformer.run_transformation_pipeline(),
    required=True
))

pipeline.add_step(PipelineStep(
    name="quality_checks",
    description="Run data quality validation",
    execute_fn=lambda: pipeline_checker.run_checks(
        data_lake.read_csv('processed/enriched/transactions.csv'),
        "Final Output"
    ),
    required=False  # Optional: a failure here does not fail the run. Note that run_checks returns a report and does not raise when checks fail
))

# Run the pipeline
run_result = pipeline.run()
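`run_checks` reports failing checks inside the returned `QualityReport` rather than raising, so the `quality_checks` step above is recorded as a success even when `report.passed` is `False`. The step is marked failed only if `run_checks` itself raises (for example, if reading the CSV fails); exceptions inside individual checks are caught and recorded as ERROR entries. If a failing report should be visible at the orchestration level, one option is a small adapter like the hypothetical `quality_gate` below. `StubReport` is a stand-in for `QualityReport`, reduced to the two fields the gate reads.

```python
from dataclasses import dataclass
from typing import Any, Callable


class QualityGateError(RuntimeError):
    """Raised when a quality report does not meet its thresholds."""


def quality_gate(produce_report: Callable[[], Any]) -> Callable[[], Any]:
    """Wrap a zero-arg callable returning a report with ``passed`` and
    ``overall_score`` attributes so a failing report raises instead of
    being returned as an ordinary (successful) step result."""
    def gated() -> Any:
        report = produce_report()
        if not report.passed:
            raise QualityGateError(
                f"quality checks failed (overall score {report.overall_score:.2%})"
            )
        return report
    return gated


# Stand-in for QualityReport with only the fields the gate uses
@dataclass
class StubReport:
    passed: bool
    overall_score: float


ok = quality_gate(lambda: StubReport(passed=True, overall_score=1.0))()
print("passed:", ok.passed)  # prints "passed: True"

try:
    quality_gate(lambda: StubReport(passed=False, overall_score=0.8125))()
except QualityGateError as exc:
    print("gate raised:", exc)  # prints "gate raised: quality checks failed (overall score 81.25%)"
```

Used as `execute_fn=quality_gate(lambda: pipeline_checker.run_checks(...))` with `required=False`, a failing report would turn an otherwise clean run into PARTIAL instead of SUCCESS.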

In [None]:
# Final summary and data lake contents
print("\n" + "="*60)
print("FINAL DATA LAKE CONTENTS")
print("="*60)

all_objects = storage.list_objects()

# Group by zone
zones = {}
for obj in all_objects:
    zone = obj.split('/')[0] if '/' in obj else 'root'
    if zone not in zones:
        zones[zone] = []
    zones[zone].append(obj)

for zone, files in sorted(zones.items()):
    print(f"\n{zone.upper()}/")
    for f in files:
        # Get file size
        full_path = storage._full_path(f)
        size = full_path.stat().st_size if full_path.exists() else 0
        size_str = f"{size:,} bytes" if size < 1024 else f"{size/1024:.1f} KB"
        print(f"  {f.split('/', 1)[-1]} ({size_str})")

In [None]:
# Cleanup temporary directory
print("\n" + "="*60)
print("CLEANUP")
print("="*60)

# Note: In a real scenario, you'd keep this data
# For demo purposes, we can clean up
try:
    shutil.rmtree(TEMP_DIR)
    print(f"Cleaned up temporary directory: {TEMP_DIR}")
except Exception as e:
    print(f"Cleanup note: {e}")

print("\nPipeline demonstration complete!")

---
## Summary & Production Considerations

This capstone project demonstrated key patterns for building data pipelines:

### Patterns Implemented

1. **Storage Abstraction**
   - Protocol-based interface for storage backends
   - Easy switching between local and cloud storage
   - Consistent API regardless of backend

2. **Data Lake Zones**
   - Landing zone: Raw, immutable data
   - Processed zone: Cleaned and transformed data
   - Aggregated zone: Business-ready summaries

3. **Data Quality**
   - Configurable quality checks
   - Automated validation reports
   - Threshold-based pass/fail criteria

4. **Pipeline Orchestration**
   - Step-based execution
   - Per-step error handling (required vs. optional steps)
   - Execution logging and metrics

### Production Enhancements

For production deployment, you would add:

```python
# 1. Use actual cloud storage
import boto3
class S3StorageBackend(StorageBackend):
    def __init__(self, bucket: str):
        self.s3 = boto3.client('s3')
        self.bucket = bucket
    # ... implement methods

# 2. Use orchestration tools
# - Apache Airflow for scheduling
# - AWS Step Functions for serverless
# - Prefect or Dagster for modern pipelines

# 3. Add monitoring
# - CloudWatch/DataDog metrics
# - Alerting on failures
# - Dashboard for pipeline health

# 4. Implement partitioning
# - Time-based partitions (year/month/day)
# - Efficient incremental processing

# 5. Add data catalog
# - AWS Glue Catalog
# - Schema registry
# - Data lineage tracking
```
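To flesh out item 1, here is a minimal sketch of an S3 backend built on three real boto3 S3 client calls: `put_object`, `get_object`, and the `list_objects_v2` paginator (pagination matters because a single `list_objects_v2` call returns at most 1,000 keys). The method names `write_bytes`, `read_bytes`, and `list_objects` are assumptions; align them with the `StorageBackend` protocol from Part 1. Because that interface is a `Protocol`, the class does not need to subclass it to satisfy it. The client is injectable, so the in-memory `FakeS3Client` (hypothetical, for illustration only) lets the example run without AWS credentials.

```python
import io
from typing import Any, Dict, List, Optional, Tuple


class S3StorageBackend:
    """Sketch of an S3-backed store; method names are assumptions."""

    def __init__(self, bucket: str, client: Optional[Any] = None):
        if client is None:
            import boto3  # imported lazily so the class works with a fake client
            client = boto3.client("s3")
        self.s3 = client
        self.bucket = bucket

    def write_bytes(self, key: str, data: bytes) -> None:
        self.s3.put_object(Bucket=self.bucket, Key=key, Body=data)

    def read_bytes(self, key: str) -> bytes:
        response = self.s3.get_object(Bucket=self.bucket, Key=key)
        return response["Body"].read()

    def list_objects(self, prefix: str = "") -> List[str]:
        # Paginate: each list_objects_v2 page holds at most 1,000 keys
        paginator = self.s3.get_paginator("list_objects_v2")
        keys: List[str] = []
        for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
            keys.extend(obj["Key"] for obj in page.get("Contents", []))
        return keys


class _FakePaginator:
    def __init__(self, objects: Dict[Tuple[str, str], bytes]):
        self.objects = objects

    def paginate(self, Bucket: str, Prefix: str = ""):
        keys = sorted(k for (b, k) in self.objects if b == Bucket and k.startswith(Prefix))
        # Like real S3, omit "Contents" when nothing matches
        yield {"Contents": [{"Key": k} for k in keys]} if keys else {}


class FakeS3Client:
    """In-memory stand-in exposing only the calls used above."""

    def __init__(self):
        self.objects: Dict[Tuple[str, str], bytes] = {}

    def put_object(self, Bucket: str, Key: str, Body: bytes) -> None:
        self.objects[(Bucket, Key)] = Body

    def get_object(self, Bucket: str, Key: str) -> Dict[str, Any]:
        return {"Body": io.BytesIO(self.objects[(Bucket, Key)])}

    def get_paginator(self, name: str) -> _FakePaginator:
        assert name == "list_objects_v2"
        return _FakePaginator(self.objects)


backend = S3StorageBackend("demo-bucket", client=FakeS3Client())
backend.write_bytes("landing/transactions/2024-01-20.csv", b"id,amount\n1,9.99\n")
print(backend.list_objects("landing/"))
print(backend.read_bytes("landing/transactions/2024-01-20.csv"))
```

In a real deployment, omitting `client` creates a boto3 client using the standard AWS credential chain.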

### Key Takeaways

1. **Abstraction enables flexibility**: The storage abstraction allows the same code to work locally and in the cloud
2. **Data quality is critical**: Automated checks catch issues before they propagate
3. **Idempotency matters**: Re-running a pipeline on the same inputs should produce the same outputs, without duplicating data or other side effects
4. **Logging and metrics**: Essential for debugging and monitoring
5. **Error handling**: Graceful degradation vs. fail-fast depends on use case
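Takeaway 3 and the partitioning item above combine naturally: if each run writes to a partition derived from its processing date and replaces that partition wholesale, re-running a day cannot duplicate data. The following is a minimal local-filesystem sketch, assuming one file per daily partition. The `year=/month=/day=` layout follows the common Hive-style convention; the `_SHA256` marker file and both helper names are hypothetical.

```python
import hashlib
import tempfile
from datetime import date
from pathlib import Path


def partition_path(root: Path, dataset: str, day: date) -> Path:
    """Hive-style time partition: <root>/<dataset>/year=YYYY/month=MM/day=DD."""
    return root / dataset / f"year={day:%Y}" / f"month={day:%m}" / f"day={day:%d}"


def idempotent_write(root: Path, dataset: str, day: date, payload: bytes) -> bool:
    """Write payload to the day's partition, replacing any previous output.

    Returns False (writing nothing) when identical content is already there,
    so re-running the same day is a no-op rather than a duplicate.
    """
    target_dir = partition_path(root, dataset, day)
    target = target_dir / "data.csv"
    checksum = target_dir / "_SHA256"
    digest = hashlib.sha256(payload).hexdigest()

    if checksum.exists() and checksum.read_text() == digest:
        return False  # same content already written: skip

    target_dir.mkdir(parents=True, exist_ok=True)
    # Write to a temp file, then rename, so readers never see a partial file
    tmp = target.with_suffix(".tmp")
    tmp.write_bytes(payload)
    tmp.replace(target)
    checksum.write_text(digest)
    return True


with tempfile.TemporaryDirectory() as tmp_dir:
    root = Path(tmp_dir)
    day = date(2024, 1, 20)
    first = idempotent_write(root, "transactions", day, b"id,amount\n1,9.99\n")
    second = idempotent_write(root, "transactions", day, b"id,amount\n1,9.99\n")
    files = sorted(p.relative_to(root).as_posix() for p in root.rglob("*") if p.is_file())
    print(first, second, files)
```

The second call returns `False` and leaves exactly one data file in the partition. Changed input for the same day overwrites the partition instead of appending to it, which is what makes reprocessing safe.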

In [None]:
print("="*60)
print("     CLOUD DATA PIPELINE CAPSTONE COMPLETE!")
print("="*60)
print("\nYou have learned to:")
print("  [x] Create storage abstraction layers")
print("  [x] Design data ingestion patterns")
print("  [x] Implement ETL transformations")
print("  [x] Build data quality frameworks")
print("  [x] Orchestrate end-to-end pipelines")
print("  [x] Structure code for production deployment")